LCOV - code coverage report
Current view: top level - src/bin/pg_rewind - xlogreader.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 52.4 % 725 380
Test Date: 2026-03-03 14:15:12 Functions: 67.9 % 28 19
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * xlogreader.c
       4              :  *      Generic XLog reading facility
       5              :  *
       6              :  * Portions Copyright (c) 2013-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *      src/backend/access/transam/xlogreader.c
      10              :  *
      11              :  * NOTES
      12              :  *      See xlogreader.h for more notes on this facility.
      13              :  *
      14              :  *      This file is compiled as both front-end and backend code, so it
      15              :  *      may not use ereport, server-defined static variables, etc.
      16              :  *-------------------------------------------------------------------------
      17              :  */
      18              : #include "postgres.h"
      19              : 
      20              : #include <unistd.h>
      21              : #ifdef USE_LZ4
      22              : #include <lz4.h>
      23              : #endif
      24              : #ifdef USE_ZSTD
      25              : #include <zstd.h>
      26              : #endif
      27              : 
      28              : #include "access/transam.h"
      29              : #include "access/xlog_internal.h"
      30              : #include "access/xlogreader.h"
      31              : #include "access/xlogrecord.h"
      32              : #include "catalog/pg_control.h"
      33              : #include "common/pg_lzcompress.h"
      34              : #include "replication/origin.h"
      35              : 
      36              : #ifndef FRONTEND
      37              : #include "pgstat.h"
      38              : #include "storage/bufmgr.h"
      39              : #else
      40              : #include "common/logging.h"
      41              : #endif
      42              : 
      43              : static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
      44              :             pg_attribute_printf(2, 3);
      45              : static void allocate_recordbuf(XLogReaderState *state, uint32 reclength);
      46              : static int  ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
      47              :                              int reqLen);
      48              : static void XLogReaderInvalReadState(XLogReaderState *state);
      49              : static XLogPageReadResult XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking);
      50              : static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
      51              :                                   XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
      52              : static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
      53              :                             XLogRecPtr recptr);
      54              : static void ResetDecoder(XLogReaderState *state);
      55              : static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
      56              :                                int segsize, const char *waldir);
      57              : 
      58              : /* size of the buffer allocated for error message. */
      59              : #define MAX_ERRORMSG_LEN 1000
      60              : 
      61              : /*
      62              :  * Default size; large enough that typical users of XLogReader won't often need
      63              :  * to use the 'oversized' memory allocation code path.
      64              :  */
      65              : #define DEFAULT_DECODE_BUFFER_SIZE (64 * 1024)
      66              : 
      67              : /*
      68              :  * Construct a string in state->errormsg_buf explaining what's wrong with
      69              :  * the current record being read.
      70              :  */
      71              : static void
      72            0 : report_invalid_record(XLogReaderState *state, const char *fmt,...)
      73              : {
      74              :     va_list     args;
      75              : 
      76            0 :     fmt = _(fmt);
      77              : 
      78            0 :     va_start(args, fmt);
      79            0 :     vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
      80            0 :     va_end(args);
      81              : 
      82            0 :     state->errormsg_deferred = true;
      83            0 : }
      84              : 
      85              : /*
      86              :  * Set the size of the decoding buffer.  A pointer to a caller supplied memory
      87              :  * region may also be passed in, in which case non-oversized records will be
      88              :  * decoded there.
      89              :  */
      90              : void
      91            0 : XLogReaderSetDecodeBuffer(XLogReaderState *state, void *buffer, size_t size)
      92              : {
      93              :     Assert(state->decode_buffer == NULL);
      94              : 
      95            0 :     state->decode_buffer = buffer;
      96            0 :     state->decode_buffer_size = size;
      97            0 :     state->decode_buffer_tail = buffer;
      98            0 :     state->decode_buffer_head = buffer;
      99            0 : }
     100              : 
     101              : /*
     102              :  * Allocate and initialize a new XLogReader.
     103              :  *
     104              :  * Returns NULL if the xlogreader couldn't be allocated.
     105              :  */
     106              : XLogReaderState *
     107           45 : XLogReaderAllocate(int wal_segment_size, const char *waldir,
     108              :                    XLogReaderRoutine *routine, void *private_data)
     109              : {
     110              :     XLogReaderState *state;
     111              : 
     112              :     state = (XLogReaderState *)
     113           45 :         palloc_extended(sizeof(XLogReaderState),
     114              :                         MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
     115           45 :     if (!state)
     116            0 :         return NULL;
     117              : 
     118              :     /* initialize caller-provided support functions */
     119           45 :     state->routine = *routine;
     120              : 
     121              :     /*
     122              :      * Permanently allocate readBuf.  We do it this way, rather than just
     123              :      * making a static array, for two reasons: (1) no need to waste the
     124              :      * storage in most instantiations of the backend; (2) a static char array
     125              :      * isn't guaranteed to have any particular alignment, whereas
     126              :      * palloc_extended() will provide MAXALIGN'd storage.
     127              :      */
     128           45 :     state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ,
     129              :                                               MCXT_ALLOC_NO_OOM);
     130           45 :     if (!state->readBuf)
     131              :     {
     132            0 :         pfree(state);
     133            0 :         return NULL;
     134              :     }
     135              : 
     136              :     /* Initialize segment info. */
     137           45 :     WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
     138              :                        waldir);
     139              : 
     140              :     /* system_identifier initialized to zeroes above */
     141           45 :     state->private_data = private_data;
     142              :     /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
     143           45 :     state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
     144              :                                           MCXT_ALLOC_NO_OOM);
     145           45 :     if (!state->errormsg_buf)
     146              :     {
     147            0 :         pfree(state->readBuf);
     148            0 :         pfree(state);
     149            0 :         return NULL;
     150              :     }
     151           45 :     state->errormsg_buf[0] = '\0';
     152              : 
     153              :     /*
     154              :      * Allocate an initial readRecordBuf of minimal size, which can later be
     155              :      * enlarged if necessary.
     156              :      */
     157           45 :     allocate_recordbuf(state, 0);
     158           45 :     return state;
     159              : }
     160              : 
     161              : void
     162           45 : XLogReaderFree(XLogReaderState *state)
     163              : {
     164           45 :     if (state->seg.ws_file != -1)
     165            0 :         state->routine.segment_close(state);
     166              : 
     167           45 :     if (state->decode_buffer && state->free_decode_buffer)
     168           45 :         pfree(state->decode_buffer);
     169              : 
     170           45 :     pfree(state->errormsg_buf);
     171           45 :     if (state->readRecordBuf)
     172           45 :         pfree(state->readRecordBuf);
     173           45 :     pfree(state->readBuf);
     174           45 :     pfree(state);
     175           45 : }
     176              : 
     177              : /*
     178              :  * Allocate readRecordBuf to fit a record of at least the given length.
     179              :  *
     180              :  * readRecordBufSize is set to the new buffer size.
     181              :  *
     182              :  * To avoid useless small increases, round its size to a multiple of
     183              :  * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
     184              :  * with.  (That is enough for all "normal" records, but very large commit or
     185              :  * abort records might need more space.)
     186              :  *
     187              :  * Note: This routine should *never* be called for xl_tot_len until the header
     188              :  * of the record has been fully validated.
     189              :  */
     190              : static void
     191           45 : allocate_recordbuf(XLogReaderState *state, uint32 reclength)
     192              : {
     193           45 :     uint32      newSize = reclength;
     194              : 
     195           45 :     newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
     196           45 :     newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
     197              : 
     198           45 :     if (state->readRecordBuf)
     199            0 :         pfree(state->readRecordBuf);
     200           45 :     state->readRecordBuf = (char *) palloc(newSize);
     201           45 :     state->readRecordBufSize = newSize;
     202           45 : }
     203              : 
     204              : /*
     205              :  * Initialize the passed segment structs.
     206              :  */
     207              : static void
     208           45 : WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
     209              :                    int segsize, const char *waldir)
     210              : {
     211           45 :     seg->ws_file = -1;
     212           45 :     seg->ws_segno = 0;
     213           45 :     seg->ws_tli = 0;
     214              : 
     215           45 :     segcxt->ws_segsize = segsize;
     216           45 :     if (waldir)
     217           45 :         snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
     218           45 : }
     219              : 
     220              : /*
     221              :  * Begin reading WAL at 'RecPtr'.
     222              :  *
     223              :  * 'RecPtr' should point to the beginning of a valid WAL record.  Pointing at
     224              :  * the beginning of a page is also OK, if there is a new record right after
     225              :  * the page header, i.e. not a continuation.
     226              :  *
     227              :  * This does not make any attempt to read the WAL yet, and hence cannot fail.
     228              :  * If the starting address is not correct, the first call to XLogReadRecord()
     229              :  * will error out.
     230              :  */
     231              : void
     232         2692 : XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
     233              : {
     234              :     Assert(XLogRecPtrIsValid(RecPtr));
     235              : 
     236         2692 :     ResetDecoder(state);
     237              : 
     238              :     /* Begin at the passed-in record pointer. */
     239         2692 :     state->EndRecPtr = RecPtr;
     240         2692 :     state->NextRecPtr = RecPtr;
     241         2692 :     state->ReadRecPtr = InvalidXLogRecPtr;
     242         2692 :     state->DecodeRecPtr = InvalidXLogRecPtr;
     243         2692 : }
     244              : 
     245              : /*
     246              :  * Release the last record that was returned by XLogNextRecord(), if any, to
     247              :  * free up space.  Returns the LSN past the end of the record.
     248              :  */
     249              : XLogRecPtr
     250       178336 : XLogReleasePreviousRecord(XLogReaderState *state)
     251              : {
     252              :     DecodedXLogRecord *record;
     253              :     XLogRecPtr  next_lsn;
     254              : 
     255       178336 :     if (!state->record)
     256        91860 :         return InvalidXLogRecPtr;
     257              : 
     258              :     /*
     259              :      * Remove it from the decoded record queue.  It must be the oldest item
     260              :      * decoded, decode_queue_head.
     261              :      */
     262        86476 :     record = state->record;
     263        86476 :     next_lsn = record->next_lsn;
     264              :     Assert(record == state->decode_queue_head);
     265        86476 :     state->record = NULL;
     266        86476 :     state->decode_queue_head = record->next;
     267              : 
     268              :     /* It might also be the newest item decoded, decode_queue_tail. */
     269        86476 :     if (state->decode_queue_tail == record)
     270        86476 :         state->decode_queue_tail = NULL;
     271              : 
     272              :     /* Release the space. */
     273        86476 :     if (unlikely(record->oversized))
     274              :     {
     275              :         /* It's not in the decode buffer, so free it to release space. */
     276            0 :         pfree(record);
     277              :     }
     278              :     else
     279              :     {
     280              :         /* It must be the head (oldest) record in the decode buffer. */
     281              :         Assert(state->decode_buffer_head == (char *) record);
     282              : 
     283              :         /*
     284              :          * We need to update head to point to the next record that is in the
     285              :          * decode buffer, if any, being careful to skip oversized ones
     286              :          * (they're not in the decode buffer).
     287              :          */
     288        86476 :         record = record->next;
     289        86476 :         while (unlikely(record && record->oversized))
     290            0 :             record = record->next;
     291              : 
     292        86476 :         if (record)
     293              :         {
     294              :             /* Adjust head to release space up to the next record. */
     295            0 :             state->decode_buffer_head = (char *) record;
     296              :         }
     297              :         else
     298              :         {
     299              :             /*
     300              :              * Otherwise we might as well just reset head and tail to the
     301              :              * start of the buffer space, because we're empty.  This means
     302              :              * we'll keep overwriting the same piece of memory if we're not
     303              :              * doing any prefetching.
     304              :              */
     305        86476 :             state->decode_buffer_head = state->decode_buffer;
     306        86476 :             state->decode_buffer_tail = state->decode_buffer;
     307              :         }
     308              :     }
     309              : 
     310        86476 :     return next_lsn;
     311              : }
     312              : 
     313              : /*
     314              :  * Attempt to read an XLOG record.
     315              :  *
     316              :  * XLogBeginRead() or XLogFindNextRecord() and then XLogReadAhead() must be
     317              :  * called before the first call to XLogNextRecord().  This function returns
     318              :  * records and errors that were put into an internal queue by XLogReadAhead().
     319              :  *
     320              :  * On success, a record is returned.
     321              :  *
     322              :  * The returned record (or *errormsg) points to an internal buffer that's
     323              :  * valid until the next call to XLogNextRecord.
     324              :  */
     325              : DecodedXLogRecord *
     326        89168 : XLogNextRecord(XLogReaderState *state, char **errormsg)
     327              : {
     328              :     /* Release the last record returned by XLogNextRecord(). */
     329        89168 :     XLogReleasePreviousRecord(state);
     330              : 
     331        89168 :     if (state->decode_queue_head == NULL)
     332              :     {
     333            0 :         *errormsg = NULL;
     334            0 :         if (state->errormsg_deferred)
     335              :         {
     336            0 :             if (state->errormsg_buf[0] != '\0')
     337            0 :                 *errormsg = state->errormsg_buf;
     338            0 :             state->errormsg_deferred = false;
     339              :         }
     340              : 
     341              :         /*
     342              :          * state->EndRecPtr is expected to have been set by the last call to
     343              :          * XLogBeginRead() or XLogNextRecord(), and is the location of the
     344              :          * error.
     345              :          */
     346              :         Assert(XLogRecPtrIsValid(state->EndRecPtr));
     347              : 
     348            0 :         return NULL;
     349              :     }
     350              : 
     351              :     /*
     352              :      * Record this as the most recent record returned, so that we'll release
     353              :      * it next time.  This also exposes it to the traditional
     354              :      * XLogRecXXX(xlogreader) macros, which work with the decoder rather than
     355              :      * the record for historical reasons.
     356              :      */
     357        89168 :     state->record = state->decode_queue_head;
     358              : 
     359              :     /*
     360              :      * Update the pointers to the beginning and one-past-the-end of this
     361              :      * record, again for the benefit of historical code that expected the
     362              :      * decoder to track this rather than accessing these fields of the record
     363              :      * itself.
     364              :      */
     365        89168 :     state->ReadRecPtr = state->record->lsn;
     366        89168 :     state->EndRecPtr = state->record->next_lsn;
     367              : 
     368        89168 :     *errormsg = NULL;
     369              : 
     370        89168 :     return state->record;
     371              : }
     372              : 
     373              : /*
     374              :  * Attempt to read an XLOG record.
     375              :  *
     376              :  * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
     377              :  * to XLogReadRecord().
     378              :  *
     379              :  * If the page_read callback fails to read the requested data, NULL is
     380              :  * returned.  The callback is expected to have reported the error; errormsg
     381              :  * is set to NULL.
     382              :  *
     383              :  * If the reading fails for some other reason, NULL is also returned, and
     384              :  * *errormsg is set to a string with details of the failure.
     385              :  *
     386              :  * The returned pointer (or *errormsg) points to an internal buffer that's
     387              :  * valid until the next call to XLogReadRecord.
     388              :  */
     389              : XLogRecord *
     390        89168 : XLogReadRecord(XLogReaderState *state, char **errormsg)
     391              : {
     392              :     DecodedXLogRecord *decoded;
     393              : 
     394              :     /*
     395              :      * Release last returned record, if there is one.  We need to do this so
     396              :      * that we can check for empty decode queue accurately.
     397              :      */
     398        89168 :     XLogReleasePreviousRecord(state);
     399              : 
     400              :     /*
     401              :      * Call XLogReadAhead() in blocking mode to make sure there is something
     402              :      * in the queue, though we don't use the result.
     403              :      */
     404        89168 :     if (!XLogReaderHasQueuedRecordOrError(state))
     405        89168 :         XLogReadAhead(state, false /* nonblocking */ );
     406              : 
     407              :     /* Consume the head record or error. */
     408        89168 :     decoded = XLogNextRecord(state, errormsg);
     409        89168 :     if (decoded)
     410              :     {
     411              :         /*
     412              :          * This function returns a pointer to the record's header, not the
     413              :          * actual decoded record.  The caller will access the decoded record
     414              :          * through the XLogRecGetXXX() macros, which reach the decoded
     415              :          * record as xlogreader->record.
     416              :          */
     417              :         Assert(state->record == decoded);
     418        89168 :         return &decoded->header;
     419              :     }
     420              : 
     421            0 :     return NULL;
     422              : }
     423              : 
     424              : /*
     425              :  * Allocate space for a decoded record.  The only member of the returned
     426              :  * object that is initialized is the 'oversized' flag, indicating that the
     427              :  * decoded record wouldn't fit in the decode buffer and must eventually be
     428              :  * freed explicitly.
     429              :  *
     430              :  * The caller is responsible for adjusting decode_buffer_tail with the real
     431              :  * size after successfully decoding a record into this space.  This way, if
     432              :  * decoding fails, then there is nothing to undo unless the 'oversized' flag
     433              :  * was set and pfree() must be called.
     434              :  *
     435              :  * Return NULL if there is no space in the decode buffer and allow_oversized
     436              :  * is false, or if memory allocation fails for an oversized buffer.
     437              :  */
     438              : static DecodedXLogRecord *
     439        89168 : XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversized)
     440              : {
     441        89168 :     size_t      required_space = DecodeXLogRecordRequiredSpace(xl_tot_len);
     442        89168 :     DecodedXLogRecord *decoded = NULL;
     443              : 
     444              :     /* Allocate a circular decode buffer if we don't have one already. */
     445        89168 :     if (unlikely(state->decode_buffer == NULL))
     446              :     {
     447           45 :         if (state->decode_buffer_size == 0)
     448           45 :             state->decode_buffer_size = DEFAULT_DECODE_BUFFER_SIZE;
     449           45 :         state->decode_buffer = palloc(state->decode_buffer_size);
     450           45 :         state->decode_buffer_head = state->decode_buffer;
     451           45 :         state->decode_buffer_tail = state->decode_buffer;
     452           45 :         state->free_decode_buffer = true;
     453              :     }
     454              : 
     455              :     /* Try to allocate space in the circular decode buffer. */
     456        89168 :     if (state->decode_buffer_tail >= state->decode_buffer_head)
     457              :     {
     458              :         /* Empty, or tail is to the right of head. */
     459        89168 :         if (required_space <=
     460        89168 :             state->decode_buffer_size -
     461        89168 :             (state->decode_buffer_tail - state->decode_buffer))
     462              :         {
     463              :             /*-
     464              :              * There is space between tail and end.
     465              :              *
     466              :              * +-----+--------------------+-----+
     467              :              * |     |////////////////////|here!|
     468              :              * +-----+--------------------+-----+
     469              :              *       ^                    ^
     470              :              *       |                    |
     471              :              *       h                    t
     472              :              */
     473        89168 :             decoded = (DecodedXLogRecord *) state->decode_buffer_tail;
     474        89168 :             decoded->oversized = false;
     475        89168 :             return decoded;
     476              :         }
     477            0 :         else if (required_space <
     478            0 :                  state->decode_buffer_head - state->decode_buffer)
     479              :         {
     480              :             /*-
     481              :              * There is space between start and head.
     482              :              *
     483              :              * +-----+--------------------+-----+
     484              :              * |here!|////////////////////|     |
     485              :              * +-----+--------------------+-----+
     486              :              *       ^                    ^
     487              :              *       |                    |
     488              :              *       h                    t
     489              :              */
     490            0 :             decoded = (DecodedXLogRecord *) state->decode_buffer;
     491            0 :             decoded->oversized = false;
     492            0 :             return decoded;
     493              :         }
     494              :     }
     495              :     else
     496              :     {
     497              :         /* Tail is to the left of head. */
     498            0 :         if (required_space <
     499            0 :             state->decode_buffer_head - state->decode_buffer_tail)
     500              :         {
     501              :             /*-
     502              :              * There is space between tail and head.
     503              :              *
     504              :              * +-----+--------------------+-----+
     505              :              * |/////|here!               |/////|
     506              :              * +-----+--------------------+-----+
     507              :              *       ^                    ^
     508              :              *       |                    |
     509              :              *       t                    h
     510              :              */
     511            0 :             decoded = (DecodedXLogRecord *) state->decode_buffer_tail;
     512            0 :             decoded->oversized = false;
     513            0 :             return decoded;
     514              :         }
     515              :     }
     516              : 
     517              :     /* Not enough space in the decode buffer.  Are we allowed to allocate? */
     518            0 :     if (allow_oversized)
     519              :     {
     520            0 :         decoded = palloc(required_space);
     521            0 :         decoded->oversized = true;
     522            0 :         return decoded;
     523              :     }
     524              : 
     525            0 :     return NULL;
     526              : }
     527              : 
     528              : static XLogPageReadResult
     529        89168 : XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
     530              : {
     531              :     XLogRecPtr  RecPtr;
     532              :     XLogRecord *record;
     533              :     XLogRecPtr  targetPagePtr;
     534              :     bool        randAccess;
     535              :     uint32      len,
     536              :                 total_len;
     537              :     uint32      targetRecOff;
     538              :     uint32      pageHeaderSize;
     539              :     bool        assembled;
     540              :     bool        gotheader;
     541              :     int         readOff;
     542              :     DecodedXLogRecord *decoded;
     543              :     char       *errormsg;       /* not used */
     544              : 
     545              :     /*
     546              :      * randAccess indicates whether to verify the previous-record pointer of
     547              :      * the record we're reading.  We only do this if we're reading
     548              :      * sequentially, which is what we initially assume.
     549              :      */
     550        89168 :     randAccess = false;
     551              : 
     552              :     /* reset error state */
     553        89168 :     state->errormsg_buf[0] = '\0';
     554        89168 :     decoded = NULL;
     555              : 
     556        89168 :     state->abortedRecPtr = InvalidXLogRecPtr;
     557        89168 :     state->missingContrecPtr = InvalidXLogRecPtr;
     558              : 
     559        89168 :     RecPtr = state->NextRecPtr;
     560              : 
     561        89168 :     if (XLogRecPtrIsValid(state->DecodeRecPtr))
     562              :     {
     563              :         /* read the record after the one we just read */
     564              : 
     565              :         /*
     566              :          * NextRecPtr is pointing to end+1 of the previous WAL record.  If
     567              :          * we're at a page boundary, no more records can fit on the current
     568              :          * page. We must skip over the page header, but we can't do that until
     569              :          * we've read in the page, since the header size is variable.
     570              :          */
     571              :     }
     572              :     else
     573              :     {
     574              :         /*
     575              :          * Caller supplied a position to start at.
     576              :          *
     577              :          * In this case, NextRecPtr should already be pointing either to a
     578              :          * valid record starting position or alternatively to the beginning of
     579              :          * a page. See the header comments for XLogBeginRead.
     580              :          */
     581              :         Assert(RecPtr % XLOG_BLCKSZ == 0 || XRecOffIsValid(RecPtr));
     582         2692 :         randAccess = true;
     583              :     }
     584              : 
     585        89168 : restart:
     586        89168 :     state->nonblocking = nonblocking;
     587        89168 :     state->currRecPtr = RecPtr;
     588        89168 :     assembled = false;
     589              : 
     590        89168 :     targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
     591        89168 :     targetRecOff = RecPtr % XLOG_BLCKSZ;
     592              : 
     593              :     /*
     594              :      * Read the page containing the record into state->readBuf. Request enough
     595              :      * byte to cover the whole record header, or at least the part of it that
     596              :      * fits on the same page.
     597              :      */
     598        89168 :     readOff = ReadPageInternal(state, targetPagePtr,
     599        89168 :                                Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
     600        89168 :     if (readOff == XLREAD_WOULDBLOCK)
     601            0 :         return XLREAD_WOULDBLOCK;
     602        89168 :     else if (readOff < 0)
     603            0 :         goto err;
     604              : 
     605              :     /*
     606              :      * ReadPageInternal always returns at least the page header, so we can
     607              :      * examine it now.
     608              :      */
     609        89168 :     pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
     610        89168 :     if (targetRecOff == 0)
     611              :     {
     612              :         /*
     613              :          * At page start, so skip over page header.
     614              :          */
     615           91 :         RecPtr += pageHeaderSize;
     616           91 :         targetRecOff = pageHeaderSize;
     617              :     }
     618        89077 :     else if (targetRecOff < pageHeaderSize)
     619              :     {
     620            0 :         report_invalid_record(state, "invalid record offset at %X/%08X: expected at least %u, got %u",
     621            0 :                               LSN_FORMAT_ARGS(RecPtr),
     622              :                               pageHeaderSize, targetRecOff);
     623            0 :         goto err;
     624              :     }
     625              : 
     626        89168 :     if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
     627              :         targetRecOff == pageHeaderSize)
     628              :     {
     629            0 :         report_invalid_record(state, "contrecord is requested by %X/%08X",
     630            0 :                               LSN_FORMAT_ARGS(RecPtr));
     631            0 :         goto err;
     632              :     }
     633              : 
     634              :     /* ReadPageInternal has verified the page header */
     635              :     Assert(pageHeaderSize <= readOff);
     636              : 
     637              :     /*
     638              :      * Read the record length.
     639              :      *
     640              :      * NB: Even though we use an XLogRecord pointer here, the whole record
     641              :      * header might not fit on this page. xl_tot_len is the first field of the
     642              :      * struct, so it must be on this page (the records are MAXALIGNed), but we
     643              :      * cannot access any other fields until we've verified that we got the
     644              :      * whole header.
     645              :      */
     646        89168 :     record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
     647        89168 :     total_len = record->xl_tot_len;
     648              : 
     649              :     /*
     650              :      * If the whole record header is on this page, validate it immediately.
     651              :      * Otherwise do just a basic sanity check on xl_tot_len, and validate the
     652              :      * rest of the header after reading it from the next page.  The xl_tot_len
     653              :      * check is necessary here to ensure that we enter the "Need to reassemble
     654              :      * record" code path below; otherwise we might fail to apply
     655              :      * ValidXLogRecordHeader at all.
     656              :      */
     657        89168 :     if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
     658              :     {
     659        88998 :         if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, record,
     660              :                                    randAccess))
     661            0 :             goto err;
     662        88998 :         gotheader = true;
     663              :     }
     664              :     else
     665              :     {
     666              :         /* There may be no next page if it's too small. */
     667          170 :         if (total_len < SizeOfXLogRecord)
     668              :         {
     669            0 :             report_invalid_record(state,
     670              :                                   "invalid record length at %X/%08X: expected at least %u, got %u",
     671            0 :                                   LSN_FORMAT_ARGS(RecPtr),
     672              :                                   (uint32) SizeOfXLogRecord, total_len);
     673            0 :             goto err;
     674              :         }
     675              :         /* We'll validate the header once we have the next page. */
     676          170 :         gotheader = false;
     677              :     }
     678              : 
     679              :     /*
     680              :      * Try to find space to decode this record, if we can do so without
     681              :      * calling palloc.  If we can't, we'll try again below after we've
     682              :      * validated that total_len isn't garbage bytes from a recycled WAL page.
     683              :      */
     684        89168 :     decoded = XLogReadRecordAlloc(state,
     685              :                                   total_len,
     686              :                                   false /* allow_oversized */ );
     687        89168 :     if (decoded == NULL && nonblocking)
     688              :     {
     689              :         /*
     690              :          * There is no space in the circular decode buffer, and the caller is
     691              :          * only reading ahead.  The caller should consume existing records to
     692              :          * make space.
     693              :          */
     694            0 :         return XLREAD_WOULDBLOCK;
     695              :     }
     696              : 
     697        89168 :     len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
     698        89168 :     if (total_len > len)
     699              :     {
     700              :         /* Need to reassemble record */
     701              :         char       *contdata;
     702              :         XLogPageHeader pageHeader;
     703              :         char       *buffer;
     704              :         uint32      gotlen;
     705              : 
     706         4406 :         assembled = true;
     707              : 
     708              :         /*
     709              :          * We always have space for a couple of pages, enough to validate a
     710              :          * boundary-spanning record header.
     711              :          */
     712              :         Assert(state->readRecordBufSize >= XLOG_BLCKSZ * 2);
     713              :         Assert(state->readRecordBufSize >= len);
     714              : 
     715              :         /* Copy the first fragment of the record from the first page. */
     716         4406 :         memcpy(state->readRecordBuf,
     717         4406 :                state->readBuf + RecPtr % XLOG_BLCKSZ, len);
     718         4406 :         buffer = state->readRecordBuf + len;
     719         4406 :         gotlen = len;
     720              : 
     721              :         do
     722              :         {
     723              :             /* Calculate pointer to beginning of next page */
     724         4410 :             targetPagePtr += XLOG_BLCKSZ;
     725              : 
     726              :             /*
     727              :              * Read the page header before processing the record data, so we
     728              :              * can handle the case where the previous record ended as being a
     729              :              * partial one.
     730              :              */
     731         4410 :             readOff = ReadPageInternal(state, targetPagePtr, SizeOfXLogShortPHD);
     732         4410 :             if (readOff == XLREAD_WOULDBLOCK)
     733            0 :                 return XLREAD_WOULDBLOCK;
     734         4410 :             else if (readOff < 0)
     735            0 :                 goto err;
     736              : 
     737              :             Assert(SizeOfXLogShortPHD <= readOff);
     738              : 
     739         4410 :             pageHeader = (XLogPageHeader) state->readBuf;
     740              : 
     741              :             /*
     742              :              * If we were expecting a continuation record and got an
     743              :              * "overwrite contrecord" flag, that means the continuation record
     744              :              * was overwritten with a different record.  Restart the read by
     745              :              * assuming the address to read is the location where we found
     746              :              * this flag; but keep track of the LSN of the record we were
     747              :              * reading, for later verification.
     748              :              */
     749         4410 :             if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
     750              :             {
     751            0 :                 state->overwrittenRecPtr = RecPtr;
     752            0 :                 RecPtr = targetPagePtr;
     753            0 :                 goto restart;
     754              :             }
     755              : 
     756              :             /* Check that the continuation on next page looks valid */
     757         4410 :             if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
     758              :             {
     759            0 :                 report_invalid_record(state,
     760              :                                       "there is no contrecord flag at %X/%08X",
     761            0 :                                       LSN_FORMAT_ARGS(RecPtr));
     762            0 :                 goto err;
     763              :             }
     764              : 
     765              :             /*
     766              :              * Cross-check that xlp_rem_len agrees with how much of the record
     767              :              * we expect there to be left.
     768              :              */
     769         4410 :             if (pageHeader->xlp_rem_len == 0 ||
     770         4410 :                 total_len != (pageHeader->xlp_rem_len + gotlen))
     771              :             {
     772            0 :                 report_invalid_record(state,
     773              :                                       "invalid contrecord length %u (expected %lld) at %X/%08X",
     774              :                                       pageHeader->xlp_rem_len,
     775            0 :                                       ((long long) total_len) - gotlen,
     776            0 :                                       LSN_FORMAT_ARGS(RecPtr));
     777            0 :                 goto err;
     778              :             }
     779              : 
     780              :             /* Wait for the next page to become available */
     781         4410 :             readOff = ReadPageInternal(state, targetPagePtr,
     782         4410 :                                        Min(total_len - gotlen + SizeOfXLogShortPHD,
     783              :                                            XLOG_BLCKSZ));
     784         4410 :             if (readOff == XLREAD_WOULDBLOCK)
     785            0 :                 return XLREAD_WOULDBLOCK;
     786         4410 :             else if (readOff < 0)
     787            0 :                 goto err;
     788              : 
     789              :             /* Append the continuation from this page to the buffer */
     790         4410 :             pageHeaderSize = XLogPageHeaderSize(pageHeader);
     791              : 
     792         4410 :             if (readOff < pageHeaderSize)
     793            0 :                 readOff = ReadPageInternal(state, targetPagePtr,
     794              :                                            pageHeaderSize);
     795              : 
     796              :             Assert(pageHeaderSize <= readOff);
     797              : 
     798         4410 :             contdata = (char *) state->readBuf + pageHeaderSize;
     799         4410 :             len = XLOG_BLCKSZ - pageHeaderSize;
     800         4410 :             if (pageHeader->xlp_rem_len < len)
     801         4406 :                 len = pageHeader->xlp_rem_len;
     802              : 
     803         4410 :             if (readOff < pageHeaderSize + len)
     804            0 :                 readOff = ReadPageInternal(state, targetPagePtr,
     805            0 :                                            pageHeaderSize + len);
     806              : 
     807         4410 :             memcpy(buffer, contdata, len);
     808         4410 :             buffer += len;
     809         4410 :             gotlen += len;
     810              : 
     811              :             /* If we just reassembled the record header, validate it. */
     812         4410 :             if (!gotheader)
     813              :             {
     814          170 :                 record = (XLogRecord *) state->readRecordBuf;
     815          170 :                 if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr,
     816              :                                            record, randAccess))
     817            0 :                     goto err;
     818          170 :                 gotheader = true;
     819              :             }
     820              : 
     821              :             /*
     822              :              * We might need a bigger buffer.  We have validated the record
     823              :              * header, in the case that it split over a page boundary.  We've
     824              :              * also cross-checked total_len against xlp_rem_len on the second
     825              :              * page, and verified xlp_pageaddr on both.
     826              :              */
     827         4410 :             if (total_len > state->readRecordBufSize)
     828              :             {
     829              :                 char        save_copy[XLOG_BLCKSZ * 2];
     830              : 
     831              :                 /*
     832              :                  * Save and restore the data we already had.  It can't be more
     833              :                  * than two pages.
     834              :                  */
     835              :                 Assert(gotlen <= lengthof(save_copy));
     836              :                 Assert(gotlen <= state->readRecordBufSize);
     837            0 :                 memcpy(save_copy, state->readRecordBuf, gotlen);
     838            0 :                 allocate_recordbuf(state, total_len);
     839            0 :                 memcpy(state->readRecordBuf, save_copy, gotlen);
     840            0 :                 buffer = state->readRecordBuf + gotlen;
     841              :             }
     842         4410 :         } while (gotlen < total_len);
     843              :         Assert(gotheader);
     844              : 
     845         4406 :         record = (XLogRecord *) state->readRecordBuf;
     846         4406 :         if (!ValidXLogRecord(state, record, RecPtr))
     847            0 :             goto err;
     848              : 
     849         4406 :         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
     850         4406 :         state->DecodeRecPtr = RecPtr;
     851         4406 :         state->NextRecPtr = targetPagePtr + pageHeaderSize
     852         4406 :             + MAXALIGN(pageHeader->xlp_rem_len);
     853              :     }
     854              :     else
     855              :     {
     856              :         /* Wait for the record data to become available */
     857        84762 :         readOff = ReadPageInternal(state, targetPagePtr,
     858        84762 :                                    Min(targetRecOff + total_len, XLOG_BLCKSZ));
     859        84762 :         if (readOff == XLREAD_WOULDBLOCK)
     860            0 :             return XLREAD_WOULDBLOCK;
     861        84762 :         else if (readOff < 0)
     862            0 :             goto err;
     863              : 
     864              :         /* Record does not cross a page boundary */
     865        84762 :         if (!ValidXLogRecord(state, record, RecPtr))
     866            0 :             goto err;
     867              : 
     868        84762 :         state->NextRecPtr = RecPtr + MAXALIGN(total_len);
     869              : 
     870        84762 :         state->DecodeRecPtr = RecPtr;
     871              :     }
     872              : 
     873              :     /*
     874              :      * Special processing if it's an XLOG SWITCH record
     875              :      */
     876        89168 :     if (record->xl_rmid == RM_XLOG_ID &&
     877         6181 :         (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
     878              :     {
     879              :         /* Pretend it extends to end of segment */
     880           10 :         state->NextRecPtr += state->segcxt.ws_segsize - 1;
     881           10 :         state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize);
     882              :     }
     883              : 
     884              :     /*
     885              :      * If we got here without a DecodedXLogRecord, it means we needed to
     886              :      * validate total_len before trusting it, but by now we've done that.
     887              :      */
     888        89168 :     if (decoded == NULL)
     889              :     {
     890              :         Assert(!nonblocking);
     891            0 :         decoded = XLogReadRecordAlloc(state,
     892              :                                       total_len,
     893              :                                       true /* allow_oversized */ );
     894              :         /* allocation should always happen under allow_oversized */
     895              :         Assert(decoded != NULL);
     896              :     }
     897              : 
     898        89168 :     if (DecodeXLogRecord(state, decoded, record, RecPtr, &errormsg))
     899              :     {
     900              :         /* Record the location of the next record. */
     901        89168 :         decoded->next_lsn = state->NextRecPtr;
     902              : 
     903              :         /*
     904              :          * If it's in the decode buffer, mark the decode buffer space as
     905              :          * occupied.
     906              :          */
     907        89168 :         if (!decoded->oversized)
     908              :         {
     909              :             /* The new decode buffer head must be MAXALIGNed. */
     910              :             Assert(decoded->size == MAXALIGN(decoded->size));
     911        89168 :             if ((char *) decoded == state->decode_buffer)
     912        89168 :                 state->decode_buffer_tail = state->decode_buffer + decoded->size;
     913              :             else
     914            0 :                 state->decode_buffer_tail += decoded->size;
     915              :         }
     916              : 
     917              :         /* Insert it into the queue of decoded records. */
     918              :         Assert(state->decode_queue_tail != decoded);
     919        89168 :         if (state->decode_queue_tail)
     920            0 :             state->decode_queue_tail->next = decoded;
     921        89168 :         state->decode_queue_tail = decoded;
     922        89168 :         if (!state->decode_queue_head)
     923        89168 :             state->decode_queue_head = decoded;
     924        89168 :         return XLREAD_SUCCESS;
     925              :     }
     926              : 
     927            0 : err:
     928            0 :     if (assembled)
     929              :     {
     930              :         /*
     931              :          * We get here when a record that spans multiple pages needs to be
     932              :          * assembled, but something went wrong -- perhaps a contrecord piece
     933              :          * was lost.  If caller is WAL replay, it will know where the aborted
     934              :          * record was and where to direct followup WAL to be written, marking
     935              :          * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will
     936              :          * in turn signal downstream WAL consumers that the broken WAL record
     937              :          * is to be ignored.
     938              :          */
     939            0 :         state->abortedRecPtr = RecPtr;
     940            0 :         state->missingContrecPtr = targetPagePtr;
     941              : 
     942              :         /*
     943              :          * If we got here without reporting an error, make sure an error is
     944              :          * queued so that XLogPrefetcherReadRecord() doesn't bring us back a
     945              :          * second time and clobber the above state.
     946              :          */
     947            0 :         state->errormsg_deferred = true;
     948              :     }
     949              : 
     950            0 :     if (decoded && decoded->oversized)
     951            0 :         pfree(decoded);
     952              : 
     953              :     /*
     954              :      * Invalidate the read state. We might read from a different source after
     955              :      * failure.
     956              :      */
     957            0 :     XLogReaderInvalReadState(state);
     958              : 
     959              :     /*
     960              :      * If an error was written to errormsg_buf, it'll be returned to the
     961              :      * caller of XLogReadRecord() after all successfully decoded records from
     962              :      * the read queue.
     963              :      */
     964              : 
     965            0 :     return XLREAD_FAIL;
     966              : }
     967              : 
     968              : /*
     969              :  * Try to decode the next available record, and return it.  The record will
     970              :  * also be returned to XLogNextRecord(), which must be called to 'consume'
     971              :  * each record.  On success the result is the tail of the decode queue.
     972              :  *
     973              :  * Returns NULL if an error is already deferred, or if decoding did not
     974              :  * succeed; with nonblocking true that covers lack of data or decoding space.
     975              :  */
     976              : DecodedXLogRecord *
     977        89168 : XLogReadAhead(XLogReaderState *state, bool nonblocking)
     978              : {
     979              :     XLogPageReadResult result;
     980              : 
     981        89168 :     if (state->errormsg_deferred)
     982            0 :         return NULL;
     983              : 
     984        89168 :     result = XLogDecodeNextRecord(state, nonblocking);
     985        89168 :     if (result == XLREAD_SUCCESS)
     986              :     {
     987              :         Assert(state->decode_queue_tail != NULL);
     988        89168 :         return state->decode_queue_tail;
     989              :     }
     990              : 
     991            0 :     return NULL;                /* decoding failed or would block */
     992              : }
     993              : 
     994              : /*
     995              :  * Read a single xlog page including at least [pageptr, reqLen] of valid data
     996              :  * via the page_read() callback.  pageptr must be page-aligned.  On success,
     997              :  * returns the buffered length in state->readBuf (asserted >= reqLen).
     998              :  *
     999              :  * Returns XLREAD_FAIL if the required page cannot be read for some
    1000              :  * reason; errormsg_buf is set in that case (unless the error occurs in the
    1001              :  * page_read callback).  The cached read state is invalidated on failure.
    1002              :  *
    1003              :  * Returns XLREAD_WOULDBLOCK if the requested data can't be read without
    1004              :  * waiting; this is possible only if the installed page_read callback
    1005              :  * respects the state->nonblocking flag.
    1006              :  *
    1007              :  * We fetch the page from a reader-local cache if we know we have the required
    1008              :  * data and if there hasn't been any error since caching the data.
    1009              :  */
    1010              : static int
    1011       182750 : ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
    1012              : {
    1013              :     int         readLen;
    1014              :     uint32      targetPageOff;
    1015              :     XLogSegNo   targetSegNo;
    1016              :     XLogPageHeader hdr;
    1017              : 
    1018              :     Assert((pageptr % XLOG_BLCKSZ) == 0);
    1019              : 
    1020       182750 :     XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
    1021       182750 :     targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
    1022              : 
    1023              :     /* check whether we have all the requested data already */
    1024       182750 :     if (targetSegNo == state->seg.ws_segno &&
    1025       182695 :         targetPageOff == state->segoff && reqLen <= state->readLen)
    1026       176856 :         return state->readLen;
    1027              : 
    1028              :     /*
    1029              :      * Invalidate contents of internal buffer before read attempt.  Just set
    1030              :      * the length to 0, rather than a full XLogReaderInvalReadState(), so we
    1031              :      * don't forget the segment we last successfully read.
    1032              :      */
    1033         5894 :     state->readLen = 0;
    1034              : 
    1035              :     /*
    1036              :      * Data is not in our buffer.
    1037              :      *
    1038              :      * Every time we actually read the segment, even if we looked at parts of
    1039              :      * it before, we need to do verification as the page_read callback might
    1040              :      * now be rereading data from a different source.
    1041              :      *
    1042              :      * Whenever switching to a new WAL segment, we read the first page of the
    1043              :      * file and validate its header, even if that's not where the target
    1044              :      * record is.  This is so that we can check the additional identification
    1045              :      * info that is present in the first page's "long" header.
    1046              :      */
    1047         5894 :     if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
    1048              :     {
    1049           26 :         XLogRecPtr  targetSegmentPtr = pageptr - targetPageOff;
    1050              : 
    1051           26 :         readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
    1052              :                                            state->currRecPtr,
    1053              :                                            state->readBuf);
    1054           26 :         if (readLen == XLREAD_WOULDBLOCK)
    1055            0 :             return XLREAD_WOULDBLOCK;
    1056           26 :         else if (readLen < 0)
    1057            0 :             goto err;
    1058              : 
    1059              :         /* we can be sure to have enough WAL available, we scrolled back */
    1060              :         Assert(readLen == XLOG_BLCKSZ);
    1061              : 
    1062           26 :         if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
    1063              :                                           state->readBuf))
    1064            0 :             goto err;
    1065              :     }
    1066              : 
    1067              :     /*
    1068              :      * First, read the requested data length, but at least a short page header
    1069              :      * so that we can validate it.
    1070              :      */
    1071         5894 :     readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
    1072              :                                        state->currRecPtr,
    1073              :                                        state->readBuf);
    1074         5894 :     if (readLen == XLREAD_WOULDBLOCK)
    1075            0 :         return XLREAD_WOULDBLOCK;
    1076         5894 :     else if (readLen < 0)
    1077            0 :         goto err;
    1078              : 
    1079              :     Assert(readLen <= XLOG_BLCKSZ);
    1080              : 
    1081              :     /* Do we have enough data to check the header length? */
    1082         5894 :     if (readLen <= SizeOfXLogShortPHD)
    1083            0 :         goto err;
    1084              : 
    1085              :     Assert(readLen >= reqLen);
    1086              : 
    1087         5894 :     hdr = (XLogPageHeader) state->readBuf;
    1088              : 
    1089              :     /* still not enough to cover the whole page header; read again */
    1090         5894 :     if (readLen < XLogPageHeaderSize(hdr))
    1091              :     {
    1092            0 :         readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
    1093              :                                            state->currRecPtr,
    1094              :                                            state->readBuf);
    1095            0 :         if (readLen == XLREAD_WOULDBLOCK)
    1096            0 :             return XLREAD_WOULDBLOCK;
    1097            0 :         else if (readLen < 0)
    1098            0 :             goto err;
    1099              :     }
    1100              : 
    1101              :     /*
    1102              :      * Now that we know we have the full header, validate it.
    1103              :      */
    1104         5894 :     if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
    1105            0 :         goto err;
    1106              : 
    1107              :     /* update read state information */
    1108         5894 :     state->seg.ws_segno = targetSegNo;
    1109         5894 :     state->segoff = targetPageOff;
    1110         5894 :     state->readLen = readLen;
    1111              : 
    1112         5894 :     return readLen;
    1113              : 
    1114            0 : err:
    1115            0 :     XLogReaderInvalReadState(state);
    1116              : 
    1117            0 :     return XLREAD_FAIL;
    1118              : }
    1119              : 
    1120              : /*
    1121              :  * Invalidate the xlogreader's read state (segment number, segment offset and read length are zeroed) to force a re-read.
    1122              :  */
    1123              : static void
    1124            0 : XLogReaderInvalReadState(XLogReaderState *state)
    1125              : {
    1126            0 :     state->seg.ws_segno = 0;
    1127            0 :     state->segoff = 0;
    1128            0 :     state->readLen = 0;
    1129            0 : }
    1130              : 
    1131              : /*
    1132              :  * Validate an XLOG record header (total length, resource manager ID and prev-link); reports the problem and returns false if invalid.
    1133              :  *
    1134              :  * This is just a convenience subroutine to avoid duplicated code in
    1135              :  * XLogReadRecord.  It's not intended for use from anywhere else.
    1136              :  */
    1137              : static bool
    1138        89168 : ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
    1139              :                       XLogRecPtr PrevRecPtr, XLogRecord *record,
    1140              :                       bool randAccess)
    1141              : {
    1142        89168 :     if (record->xl_tot_len < SizeOfXLogRecord)
    1143              :     {
    1144            0 :         report_invalid_record(state,
    1145              :                               "invalid record length at %X/%08X: expected at least %u, got %u",
    1146            0 :                               LSN_FORMAT_ARGS(RecPtr),
    1147              :                               (uint32) SizeOfXLogRecord, record->xl_tot_len);
    1148            0 :         return false;
    1149              :     }
    1150        89168 :     if (!RmgrIdIsValid(record->xl_rmid))
    1151              :     {
    1152            0 :         report_invalid_record(state,
    1153              :                               "invalid resource manager ID %u at %X/%08X",
    1154            0 :                               record->xl_rmid, LSN_FORMAT_ARGS(RecPtr));
    1155            0 :         return false;
    1156              :     }
    1157        89168 :     if (randAccess)
    1158              :     {
    1159              :         /*
    1160              :          * We can't exactly verify the prev-link, but surely it should be less
    1161              :          * than the record's own address.
    1162              :          */
    1163         2692 :         if (!(record->xl_prev < RecPtr))
    1164              :         {
    1165            0 :             report_invalid_record(state,
    1166              :                                   "record with incorrect prev-link %X/%08X at %X/%08X",
    1167            0 :                                   LSN_FORMAT_ARGS(record->xl_prev),
    1168            0 :                                   LSN_FORMAT_ARGS(RecPtr));
    1169            0 :             return false;
    1170              :         }
    1171              :     }
    1172              :     else
    1173              :     {
    1174              :         /*
    1175              :          * Record's prev-link should exactly match our previous location. This
    1176              :          * check guards against torn WAL pages where a stale but valid-looking
    1177              :          * WAL record starts on a sector boundary.
    1178              :          */
    1179        86476 :         if (record->xl_prev != PrevRecPtr)
    1180              :         {
    1181            0 :             report_invalid_record(state,
    1182              :                                   "record with incorrect prev-link %X/%08X at %X/%08X",
    1183            0 :                                   LSN_FORMAT_ARGS(record->xl_prev),
    1184            0 :                                   LSN_FORMAT_ARGS(RecPtr));
    1185            0 :             return false;
    1186              :         }
    1187              :     }
    1188              : 
    1189        89168 :     return true;
    1190              : }
    1191              : 
    1192              : 
    1193              : /*
    1194              :  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
    1195              :  * record (other than to the minimal extent of computing the amount of
    1196              :  * data to read in) until we've checked the CRCs.
    1197              :  *
    1198              :  * We assume all of the record (that is, xl_tot_len bytes) has been read
    1199              :  * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
    1200              :  * record's header, which means in particular that xl_tot_len is at least
    1201              :  * SizeOfXLogRecord.
    1202              :  */
    1203              : static bool
    1204        89168 : ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
    1205              : {
    1206              :     pg_crc32c   crc;
    1207              : 
    1208              :     Assert(record->xl_tot_len >= SizeOfXLogRecord);
    1209              : 
    1210              :     /* Calculate the CRC, starting with the data that follows the first SizeOfXLogRecord bytes */
    1211        89168 :     INIT_CRC32C(crc);
    1212        89168 :     COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
    1213              :     /* include the record header last, up to (but not including) the xl_crc field */
    1214        89168 :     COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
    1215        89168 :     FIN_CRC32C(crc);
    1216              : 
    1217        89168 :     if (!EQ_CRC32C(record->xl_crc, crc))
    1218              :     {
    1219            0 :         report_invalid_record(state,
    1220              :                               "incorrect resource manager data checksum in record at %X/%08X",
    1221            0 :                               LSN_FORMAT_ARGS(recptr));
    1222            0 :         return false;
    1223              :     }
    1224              : 
    1225        89168 :     return true;
    1226              : }
    1227              : 
    1228              : /*
    1229              :  * Validate a page header.
    1230              :  *
    1231              :  * Check if 'phdr' is valid as the header of the XLog page at position
    1232              :  * 'recptr'.  On failure an error is reported and false is returned; on success state->latestPagePtr and state->latestPageTLI are updated.
    1233              :  */
    1234              : bool
    1235         5920 : XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
    1236              :                              char *phdr)
    1237              : {
    1238              :     XLogSegNo   segno;
    1239              :     int32       offset;
    1240         5920 :     XLogPageHeader hdr = (XLogPageHeader) phdr;
    1241              : 
    1242              :     Assert((recptr % XLOG_BLCKSZ) == 0);
    1243              : 
    1244         5920 :     XLByteToSeg(recptr, segno, state->segcxt.ws_segsize);
    1245         5920 :     offset = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
    1246              : 
    1247         5920 :     if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
    1248              :     {
    1249              :         char        fname[MAXFNAMELEN];
    1250              : 
    1251            0 :         XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
    1252              : 
    1253            0 :         report_invalid_record(state,
    1254              :                               "invalid magic number %04X in WAL segment %s, LSN %X/%08X, offset %u",
    1255            0 :                               hdr->xlp_magic,
    1256              :                               fname,
    1257            0 :                               LSN_FORMAT_ARGS(recptr),
    1258              :                               offset);
    1259            0 :         return false;
    1260              :     }
    1261              : 
    1262         5920 :     if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
    1263              :     {
    1264              :         char        fname[MAXFNAMELEN];
    1265              : 
    1266            0 :         XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
    1267              : 
    1268            0 :         report_invalid_record(state,
    1269              :                               "invalid info bits %04X in WAL segment %s, LSN %X/%08X, offset %u",
    1270            0 :                               hdr->xlp_info,
    1271              :                               fname,
    1272            0 :                               LSN_FORMAT_ARGS(recptr),
    1273              :                               offset);
    1274            0 :         return false;
    1275              :     }
    1276              : 
    1277         5920 :     if (hdr->xlp_info & XLP_LONG_HEADER)
    1278              :     {
    1279           69 :         XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
    1280              : 
    1281           69 :         if (state->system_identifier &&
    1282            0 :             longhdr->xlp_sysid != state->system_identifier)
    1283              :         {
    1284            0 :             report_invalid_record(state,
    1285              :                                   "WAL file is from different database system: WAL file database system identifier is %" PRIu64 ", pg_control database system identifier is %" PRIu64,
    1286              :                                   longhdr->xlp_sysid,
    1287              :                                   state->system_identifier);
    1288            0 :             return false;
    1289              :         }
    1290           69 :         else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
    1291              :         {
    1292            0 :             report_invalid_record(state,
    1293              :                                   "WAL file is from different database system: incorrect segment size in page header");
    1294            0 :             return false;
    1295              :         }
    1296           69 :         else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
    1297              :         {
    1298            0 :             report_invalid_record(state,
    1299              :                                   "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
    1300            0 :             return false;
    1301              :         }
    1302              :     }
    1303         5851 :     else if (offset == 0)
    1304              :     {
    1305              :         char        fname[MAXFNAMELEN];
    1306              : 
    1307            0 :         XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
    1308              : 
    1309              :         /* hmm, first page of file doesn't have a long header? */
    1310            0 :         report_invalid_record(state,
    1311              :                               "invalid info bits %04X in WAL segment %s, LSN %X/%08X, offset %u",
    1312            0 :                               hdr->xlp_info,
    1313              :                               fname,
    1314            0 :                               LSN_FORMAT_ARGS(recptr),
    1315              :                               offset);
    1316            0 :         return false;
    1317              :     }
    1318              : 
    1319              :     /*
    1320              :      * Check that the address on the page agrees with what we expected. This
    1321              :      * check typically fails when an old WAL segment is recycled, and hasn't
    1322              :      * been overwritten with new data yet.
    1323              :      */
    1324         5920 :     if (hdr->xlp_pageaddr != recptr)
    1325              :     {
    1326              :         char        fname[MAXFNAMELEN];
    1327              : 
    1328            0 :         XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
    1329              : 
    1330            0 :         report_invalid_record(state,
    1331              :                               "unexpected pageaddr %X/%08X in WAL segment %s, LSN %X/%08X, offset %u",
    1332            0 :                               LSN_FORMAT_ARGS(hdr->xlp_pageaddr),
    1333              :                               fname,
    1334            0 :                               LSN_FORMAT_ARGS(recptr),
    1335              :                               offset);
    1336            0 :         return false;
    1337              :     }
    1338              : 
    1339              :     /*
    1340              :      * Since child timelines are always assigned a TLI greater than their
    1341              :      * immediate parent's TLI, we should never see TLI go backwards across
    1342              :      * successive pages of a consistent WAL sequence.
    1343              :      *
    1344              :      * Sometimes we re-read a segment that's already been (partially) read. So
    1345              :      * we only verify TLIs for pages that are later than the last remembered
    1346              :      * LSN.
    1347              :      */
    1348         5920 :     if (recptr > state->latestPagePtr)
    1349              :     {
    1350         4572 :         if (hdr->xlp_tli < state->latestPageTLI)
    1351              :         {
    1352              :             char        fname[MAXFNAMELEN];
    1353              : 
    1354            0 :             XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
    1355              : 
    1356            0 :             report_invalid_record(state,
    1357              :                                   "out-of-sequence timeline ID %u (after %u) in WAL segment %s, LSN %X/%08X, offset %u",
    1358              :                                   hdr->xlp_tli,
    1359              :                                   state->latestPageTLI,
    1360              :                                   fname,
    1361            0 :                                   LSN_FORMAT_ARGS(recptr),
    1362              :                                   offset);
    1363            0 :             return false;
    1364              :         }
    1365              :     }
    1366         5920 :     state->latestPagePtr = recptr;
    1367         5920 :     state->latestPageTLI = hdr->xlp_tli;
    1368              : 
    1369         5920 :     return true;
    1370              : }
    1371              : 
    1372              : /*
    1373              :  * Forget about an error produced by XLogReaderValidatePageHeader(), by emptying the error message buffer and clearing the deferred-error flag.
    1374              :  */
    1375              : void
    1376            0 : XLogReaderResetError(XLogReaderState *state)
    1377              : {
    1378            0 :     state->errormsg_buf[0] = '\0';
    1379            0 :     state->errormsg_deferred = false;
    1380            0 : }
    1381              : 
    1382              : /*
    1383              :  * Find the first record with an lsn >= RecPtr.
    1384              :  *
    1385              :  * This is different from XLogBeginRead() in that RecPtr doesn't need to point
    1386              :  * to a valid record boundary.  Useful for checking whether RecPtr is a valid
    1387              :  * xlog address for reading, and to find the first valid address after some
    1388              :  * address when dumping records for debugging purposes.
    1389              :  *
    1390              :  * This positions the reader, like XLogBeginRead(), so that the next call to
    1391              :  * XLogReadRecord() will read the next valid record.  Returns that record's LSN, or InvalidXLogRecPtr if no such record could be read.
    1392              :  */
    1393              : XLogRecPtr
    1394            0 : XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
    1395              : {
    1396              :     XLogRecPtr  tmpRecPtr;
    1397            0 :     XLogRecPtr  found = InvalidXLogRecPtr;
    1398              :     XLogPageHeader header;
    1399              :     char       *errormsg;
    1400              : 
    1401              :     Assert(XLogRecPtrIsValid(RecPtr));
    1402              : 
    1403              :     /* Make sure ReadPageInternal() can't return XLREAD_WOULDBLOCK. */
    1404            0 :     state->nonblocking = false;
    1405              : 
    1406              :     /*
    1407              :      * skip over potential continuation data, keeping in mind that it may span
    1408              :      * multiple pages
    1409              :      */
    1410            0 :     tmpRecPtr = RecPtr;
    1411              :     while (true)
    1412            0 :     {
    1413              :         XLogRecPtr  targetPagePtr;
    1414              :         int         targetRecOff;
    1415              :         uint32      pageHeaderSize;
    1416              :         int         readLen;
    1417              : 
    1418              :         /*
    1419              :          * Compute targetRecOff. It should typically be equal or greater than
    1420              :          * short page-header since a valid record can't start anywhere before
    1421              :          * that, except when caller has explicitly specified the offset that
    1422              :          * falls somewhere there or when we are skipping multi-page
    1423              :          * continuation record. It doesn't matter though because
    1424              :          * ReadPageInternal() is prepared to handle that and will read at
    1425              :          * least short page-header worth of data.
    1426              :          */
    1427            0 :         targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
    1428              : 
    1429              :         /* scroll back to page boundary */
    1430            0 :         targetPagePtr = tmpRecPtr - targetRecOff;
    1431              : 
    1432              :         /* Read the page containing the record */
    1433            0 :         readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
    1434            0 :         if (readLen < 0)
    1435            0 :             goto err;
    1436              : 
    1437            0 :         header = (XLogPageHeader) state->readBuf;
    1438              : 
    1439            0 :         pageHeaderSize = XLogPageHeaderSize(header);
    1440              : 
    1441              :         /* make sure we have enough data for the page header */
    1442            0 :         readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
    1443            0 :         if (readLen < 0)
    1444            0 :             goto err;
    1445              : 
    1446              :         /* skip over potential continuation data */
    1447            0 :         if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
    1448              :         {
    1449              :             /*
    1450              :              * If the length of the remaining continuation data is more than
    1451              :              * what can fit in this page, the continuation record crosses over
    1452              :              * this page. Read the next page and try again. xlp_rem_len in the
    1453              :              * next page header will contain the remaining length of the
    1454              :              * continuation data
    1455              :              *
    1456              :              * Note that record headers are MAXALIGN'ed
    1457              :              */
    1458            0 :             if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
    1459            0 :                 tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
    1460              :             else
    1461              :             {
    1462              :                 /*
    1463              :                  * The previous continuation record ends in this page. Set
    1464              :                  * tmpRecPtr to point to the first valid record
    1465              :                  */
    1466            0 :                 tmpRecPtr = targetPagePtr + pageHeaderSize
    1467            0 :                     + MAXALIGN(header->xlp_rem_len);
    1468            0 :                 break;
    1469              :             }
    1470              :         }
    1471              :         else
    1472              :         {
    1473            0 :             tmpRecPtr = targetPagePtr + pageHeaderSize;
    1474            0 :             break;
    1475              :         }
    1476              :     }
    1477              : 
    1478              :     /*
    1479              :      * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
    1480              :      * because either we're at the first record after the beginning of a page
    1481              :      * or we just jumped over the remaining data of a continuation.
    1482              :      */
    1483            0 :     XLogBeginRead(state, tmpRecPtr);
    1484            0 :     while (XLogReadRecord(state, &errormsg) != NULL)
    1485              :     {
    1486              :         /* the record just read starts at or after RecPtr: it is the one we want */
    1487            0 :         if (RecPtr <= state->ReadRecPtr)
    1488              :         {
    1489              :             /* Rewind the reader to the beginning of the last record. */
    1490            0 :             found = state->ReadRecPtr;
    1491            0 :             XLogBeginRead(state, found);
    1492            0 :             return found;
    1493              :         }
    1494              :     }
    1495              : 
    1496            0 : err:
    1497            0 :     XLogReaderInvalReadState(state);
    1498              : 
    1499            0 :     return InvalidXLogRecPtr;
    1500              : }
    1501              : 
    1502              : /*
    1503              :  * Helper function to ease writing of XLogReaderRoutine->page_read callbacks.
    1504              :  * If this function is used, caller must supply a segment_open callback in
    1505              :  * 'state', as that is used here.
    1506              :  *
    1507              :  * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
    1508              :  * fetched from timeline 'tli'.
    1509              :  *
    1510              :  * Returns true if succeeded, false if an error occurs, in which case
    1511              :  * 'errinfo' receives error details.  A read that returns zero bytes is treated as an error too.
    1512              :  */
    1513              : bool
    1514            0 : WALRead(XLogReaderState *state,
    1515              :         char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
    1516              :         WALReadError *errinfo)
    1517              : {
    1518              :     char       *p;
    1519              :     XLogRecPtr  recptr;
    1520              :     Size        nbytes;
    1521              : #ifndef FRONTEND
    1522              :     instr_time  io_start;
    1523              : #endif
    1524              : 
    1525            0 :     p = buf;
    1526            0 :     recptr = startptr;
    1527            0 :     nbytes = count;
    1528              : 
    1529            0 :     while (nbytes > 0)
    1530              :     {
    1531              :         uint32      startoff;
    1532              :         int         segbytes;
    1533              :         int         readbytes;
    1534              : 
    1535            0 :         startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
    1536              : 
    1537              :         /*
    1538              :          * If the data we want is not in a segment we have open, close what we
    1539              :          * have (if anything) and open the next one, using the caller's
    1540              :          * provided segment_open callback.
    1541              :          */
    1542            0 :         if (state->seg.ws_file < 0 ||
    1543            0 :             !XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
    1544            0 :             tli != state->seg.ws_tli)
    1545              :         {
    1546              :             XLogSegNo   nextSegNo;
    1547              : 
    1548            0 :             if (state->seg.ws_file >= 0)
    1549            0 :                 state->routine.segment_close(state);
    1550              : 
    1551            0 :             XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
    1552            0 :             state->routine.segment_open(state, nextSegNo, &tli);
    1553              : 
    1554              :             /* This shouldn't happen -- indicates a bug in segment_open */
    1555              :             Assert(state->seg.ws_file >= 0);
    1556              : 
    1557              :             /* Update the current segment info. */
    1558            0 :             state->seg.ws_tli = tli;
    1559            0 :             state->seg.ws_segno = nextSegNo;
    1560              :         }
    1561              : 
    1562              :         /* How many bytes are within this segment? */
    1563            0 :         if (nbytes > (state->segcxt.ws_segsize - startoff))
    1564            0 :             segbytes = state->segcxt.ws_segsize - startoff;
    1565              :         else
    1566            0 :             segbytes = nbytes;
    1567              : 
    1568              : #ifndef FRONTEND
    1569              :         /* Measure I/O timing when reading segment */
    1570              :         io_start = pgstat_prepare_io_time(track_wal_io_timing);
    1571              : 
    1572              :         pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
    1573              : #endif
    1574              : 
    1575              :         /* Reset errno first; eases reporting non-errno-affecting errors */
    1576            0 :         errno = 0;
    1577            0 :         readbytes = pg_pread(state->seg.ws_file, p, segbytes, (pgoff_t) startoff);
    1578              : 
    1579              : #ifndef FRONTEND
    1580              :         pgstat_report_wait_end();
    1581              : 
    1582              :         pgstat_count_io_op_time(IOOBJECT_WAL, IOCONTEXT_NORMAL, IOOP_READ,
    1583              :                                 io_start, 1, readbytes);
    1584              : #endif
    1585              : 
    1586            0 :         if (readbytes <= 0)
    1587              :         {
    1588            0 :             errinfo->wre_errno = errno;
    1589            0 :             errinfo->wre_req = segbytes;
    1590            0 :             errinfo->wre_read = readbytes;
    1591            0 :             errinfo->wre_off = startoff;
    1592            0 :             errinfo->wre_seg = state->seg;
    1593            0 :             return false;
    1594              :         }
    1595              : 
    1596              :         /* Advance past the bytes just read; the loop continues if the read was short */
    1597            0 :         recptr += readbytes;
    1598            0 :         nbytes -= readbytes;
    1599            0 :         p += readbytes;
    1600              :     }
    1601              : 
    1602            0 :     return true;
    1603              : }
    1604              : 
    1605              : /* ----------------------------------------
    1606              :  * Functions for decoding the data and block references in a record.
    1607              :  * ----------------------------------------
    1608              :  */
    1609              : 
    1610              : /*
    1611              :  * Private function to reset the state, forgetting all decoded records, if we
    1612              :  * are asked to move to a new read position.
    1613              :  */
    1614              : static void
    1615         2692 : ResetDecoder(XLogReaderState *state)
    1616              : {
    1617              :     DecodedXLogRecord *r;
    1618              : 
    1619              :     /* Reset the decoded record queue, freeing any oversized records. */
    1620         8031 :     while ((r = state->decode_queue_head) != NULL)
    1621              :     {
    1622         2647 :         state->decode_queue_head = r->next;
    1623         2647 :         if (r->oversized)
    1624            0 :             pfree(r);
    1625              :     }
    1626         2692 :     state->decode_queue_tail = NULL;
    1627         2692 :     state->decode_queue_head = NULL;
    1628         2692 :     state->record = NULL;
    1629              : 
    1630              :     /* Reset the decode buffer to empty. */
    1631         2692 :     state->decode_buffer_tail = state->decode_buffer;
    1632         2692 :     state->decode_buffer_head = state->decode_buffer;
    1633              : 
    1634              :     /* Clear error state (message buffer and deferred-error flag). */
    1635         2692 :     state->errormsg_buf[0] = '\0';
    1636         2692 :     state->errormsg_deferred = false;
    1637         2692 : }
    1638              : 
    1639              : /*
    1640              :  * Compute the maximum possible amount of space (worst-case padding included)
    1641              :  * that could be required to decode a record, given xl_tot_len from the
    1642              :  * record's header.  This is the amount of output buffer space that we need to
    1643              :  * decode a record, though we might not finish up using it all.
    1644              :  *
    1645              :  * This computation is pessimistic and assumes the maximum possible number of
    1646              :  * blocks, due to lack of better information.
    1647              :  */
    1648              : size_t
    1649        89168 : DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
    1650              : {
    1651        89168 :     size_t      size = 0;
    1652              : 
    1653              :     /* Account for the fixed size part of the decoded record struct. */
    1654        89168 :     size += offsetof(DecodedXLogRecord, blocks[0]);
    1655              :     /* Account for the flexible blocks array of maximum possible size. */
    1656        89168 :     size += sizeof(DecodedBkpBlock) * (XLR_MAX_BLOCK_ID + 1);
    1657              :     /* Account for all the raw main and block data. */
    1658        89168 :     size += xl_tot_len;
    1659              :     /* We might insert padding before main_data. */
    1660        89168 :     size += (MAXIMUM_ALIGNOF - 1);
    1661              :     /* We might insert padding before each block's data. */
    1662        89168 :     size += (MAXIMUM_ALIGNOF - 1) * (XLR_MAX_BLOCK_ID + 1);
    1663              :     /* We might insert padding at the end. */
    1664        89168 :     size += (MAXIMUM_ALIGNOF - 1);
    1665              : 
    1666        89168 :     return size;
    1667              : }
    1668              : 
    1669              : /*
    1670              :  * Decode a record.  "decoded" must point to a MAXALIGNed memory area that has
    1671              :  * space for at least DecodeXLogRecordRequiredSpace(record) bytes.  On
    1672              :  * success, decoded->size contains the actual space occupied by the decoded
    1673              :  * record, which may turn out to be less.
    1674              :  *
    1675              :  * Only decoded->oversized member must be initialized already, and will not be
    1676              :  * modified.  Other members will be initialized as required.
    1677              :  *
    1678              :  * On error, a human-readable error message is returned in *errormsg, and
    1679              :  * the return value is false.
    1680              :  */
    1681              : bool
    1682        89168 : DecodeXLogRecord(XLogReaderState *state,
    1683              :                  DecodedXLogRecord *decoded,
    1684              :                  XLogRecord *record,
    1685              :                  XLogRecPtr lsn,
    1686              :                  char **errormsg)
    1687              : {
    1688              :     /*
    1689              :      * read next _size bytes from record buffer, but check for overrun first.
    1690              :      */
    1691              : #define COPY_HEADER_FIELD(_dst, _size)          \
    1692              :     do {                                        \
    1693              :         if (remaining < _size)                   \
    1694              :             goto shortdata_err;                 \
    1695              :         memcpy(_dst, ptr, _size);               \
    1696              :         ptr += _size;                           \
    1697              :         remaining -= _size;                     \
    1698              :     } while(0)
    1699              : 
    1700              :     char       *ptr;
    1701              :     char       *out;
    1702              :     uint32      remaining;
    1703              :     uint32      datatotal;
    1704        89168 :     RelFileLocator *rlocator = NULL;
    1705              :     uint8       block_id;
    1706              : 
    1707        89168 :     decoded->header = *record;
    1708        89168 :     decoded->lsn = lsn;
    1709        89168 :     decoded->next = NULL;
    1710        89168 :     decoded->record_origin = InvalidReplOriginId;
    1711        89168 :     decoded->toplevel_xid = InvalidTransactionId;
    1712        89168 :     decoded->main_data = NULL;
    1713        89168 :     decoded->main_data_len = 0;
    1714        89168 :     decoded->max_block_id = -1;
    1715        89168 :     ptr = (char *) record;
    1716        89168 :     ptr += SizeOfXLogRecord;
    1717        89168 :     remaining = record->xl_tot_len - SizeOfXLogRecord;
    1718              : 
    1719              :     /* Decode the headers */
    1720        89168 :     datatotal = 0;
    1721       176690 :     while (remaining > datatotal)
    1722              :     {
    1723       170591 :         COPY_HEADER_FIELD(&block_id, sizeof(uint8));
    1724              : 
    1725       170591 :         if (block_id == XLR_BLOCK_ID_DATA_SHORT)
    1726              :         {
    1727              :             /* XLogRecordDataHeaderShort */
    1728              :             uint8       main_data_len;
    1729              : 
    1730        83051 :             COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
    1731              : 
    1732        83051 :             decoded->main_data_len = main_data_len;
    1733        83051 :             datatotal += main_data_len;
    1734        83051 :             break;              /* by convention, the main data fragment is
    1735              :                                  * always last */
    1736              :         }
    1737        87540 :         else if (block_id == XLR_BLOCK_ID_DATA_LONG)
    1738              :         {
    1739              :             /* XLogRecordDataHeaderLong */
    1740              :             uint32      main_data_len;
    1741              : 
    1742           18 :             COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
    1743           18 :             decoded->main_data_len = main_data_len;
    1744           18 :             datatotal += main_data_len;
    1745           18 :             break;              /* by convention, the main data fragment is
    1746              :                                  * always last */
    1747              :         }
    1748        87522 :         else if (block_id == XLR_BLOCK_ID_ORIGIN)
    1749              :         {
    1750            0 :             COPY_HEADER_FIELD(&decoded->record_origin, sizeof(ReplOriginId));
    1751              :         }
    1752        87522 :         else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
    1753              :         {
    1754            0 :             COPY_HEADER_FIELD(&decoded->toplevel_xid, sizeof(TransactionId));
    1755              :         }
    1756        87522 :         else if (block_id <= XLR_MAX_BLOCK_ID)
    1757              :         {
    1758              :             /* XLogRecordBlockHeader */
    1759              :             DecodedBkpBlock *blk;
    1760              :             uint8       fork_flags;
    1761              : 
    1762              :             /* mark any intervening block IDs as not in use */
    1763        87522 :             for (int i = decoded->max_block_id + 1; i < block_id; ++i)
    1764            0 :                 decoded->blocks[i].in_use = false;
    1765              : 
    1766        87522 :             if (block_id <= decoded->max_block_id)
    1767              :             {
    1768            0 :                 report_invalid_record(state,
    1769              :                                       "out-of-order block_id %u at %X/%08X",
    1770              :                                       block_id,
    1771            0 :                                       LSN_FORMAT_ARGS(state->ReadRecPtr));
    1772            0 :                 goto err;
    1773              :             }
    1774        87522 :             decoded->max_block_id = block_id;
    1775              : 
    1776        87522 :             blk = &decoded->blocks[block_id];
    1777        87522 :             blk->in_use = true;
    1778        87522 :             blk->apply_image = false;
    1779              : 
    1780        87522 :             COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
    1781        87522 :             blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
    1782        87522 :             blk->flags = fork_flags;
    1783        87522 :             blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
    1784        87522 :             blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
    1785              : 
    1786        87522 :             blk->prefetch_buffer = InvalidBuffer;
    1787              : 
    1788        87522 :             COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
    1789              :             /* cross-check that the HAS_DATA flag is set iff data_length > 0 */
    1790        87522 :             if (blk->has_data && blk->data_len == 0)
    1791              :             {
    1792            0 :                 report_invalid_record(state,
    1793              :                                       "BKPBLOCK_HAS_DATA set, but no data included at %X/%08X",
    1794            0 :                                       LSN_FORMAT_ARGS(state->ReadRecPtr));
    1795            0 :                 goto err;
    1796              :             }
    1797        87522 :             if (!blk->has_data && blk->data_len != 0)
    1798              :             {
    1799            0 :                 report_invalid_record(state,
    1800              :                                       "BKPBLOCK_HAS_DATA not set, but data length is %d at %X/%08X",
    1801            0 :                                       blk->data_len,
    1802            0 :                                       LSN_FORMAT_ARGS(state->ReadRecPtr));
    1803            0 :                 goto err;
    1804              :             }
    1805        87522 :             datatotal += blk->data_len;
    1806              : 
    1807        87522 :             if (blk->has_image)
    1808              :             {
    1809         6165 :                 COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
    1810         6165 :                 COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
    1811         6165 :                 COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
    1812              : 
    1813         6165 :                 blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
    1814              : 
    1815         6165 :                 if (BKPIMAGE_COMPRESSED(blk->bimg_info))
    1816              :                 {
    1817            0 :                     if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
    1818            0 :                         COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
    1819              :                     else
    1820            0 :                         blk->hole_length = 0;
    1821              :                 }
    1822              :                 else
    1823         6165 :                     blk->hole_length = BLCKSZ - blk->bimg_len;
    1824         6165 :                 datatotal += blk->bimg_len;
    1825              : 
    1826              :                 /*
    1827              :                  * cross-check that hole_offset > 0, hole_length > 0 and
    1828              :                  * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
    1829              :                  */
    1830         6165 :                 if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
    1831         6111 :                     (blk->hole_offset == 0 ||
    1832         6111 :                      blk->hole_length == 0 ||
    1833         6111 :                      blk->bimg_len == BLCKSZ))
    1834              :                 {
    1835            0 :                     report_invalid_record(state,
    1836              :                                           "BKPIMAGE_HAS_HOLE set, but hole offset %d length %d block image length %d at %X/%08X",
    1837            0 :                                           blk->hole_offset,
    1838            0 :                                           blk->hole_length,
    1839            0 :                                           blk->bimg_len,
    1840            0 :                                           LSN_FORMAT_ARGS(state->ReadRecPtr));
    1841            0 :                     goto err;
    1842              :                 }
    1843              : 
    1844              :                 /*
    1845              :                  * cross-check that hole_offset == 0 and hole_length == 0 if
    1846              :                  * the HAS_HOLE flag is not set.
    1847              :                  */
    1848         6165 :                 if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
    1849           54 :                     (blk->hole_offset != 0 || blk->hole_length != 0))
    1850              :                 {
    1851            0 :                     report_invalid_record(state,
    1852              :                                           "BKPIMAGE_HAS_HOLE not set, but hole offset %d length %d at %X/%08X",
    1853            0 :                                           blk->hole_offset,
    1854            0 :                                           blk->hole_length,
    1855            0 :                                           LSN_FORMAT_ARGS(state->ReadRecPtr));
    1856            0 :                     goto err;
    1857              :                 }
    1858              : 
    1859              :                 /*
    1860              :                  * Cross-check that bimg_len < BLCKSZ if it is compressed.
    1861              :                  */
    1862         6165 :                 if (BKPIMAGE_COMPRESSED(blk->bimg_info) &&
    1863            0 :                     blk->bimg_len == BLCKSZ)
    1864              :                 {
    1865            0 :                     report_invalid_record(state,
    1866              :                                           "BKPIMAGE_COMPRESSED set, but block image length %d at %X/%08X",
    1867            0 :                                           blk->bimg_len,
    1868            0 :                                           LSN_FORMAT_ARGS(state->ReadRecPtr));
    1869            0 :                     goto err;
    1870              :                 }
    1871              : 
    1872              :                 /*
    1873              :                  * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE is
    1874              :                  * set nor COMPRESSED().
    1875              :                  */
    1876         6165 :                 if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
    1877           54 :                     !BKPIMAGE_COMPRESSED(blk->bimg_info) &&
    1878           54 :                     blk->bimg_len != BLCKSZ)
    1879              :                 {
    1880            0 :                     report_invalid_record(state,
    1881              :                                           "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %d at %X/%08X",
    1882            0 :                                           blk->data_len,
    1883            0 :                                           LSN_FORMAT_ARGS(state->ReadRecPtr));
    1884            0 :                     goto err;
    1885              :                 }
    1886              :             }
    1887        87522 :             if (!(fork_flags & BKPBLOCK_SAME_REL))
    1888              :             {
    1889        87186 :                 COPY_HEADER_FIELD(&blk->rlocator, sizeof(RelFileLocator));
    1890        87186 :                 rlocator = &blk->rlocator;
    1891              :             }
    1892              :             else
    1893              :             {
    1894          336 :                 if (rlocator == NULL)
    1895              :                 {
    1896            0 :                     report_invalid_record(state,
    1897              :                                           "BKPBLOCK_SAME_REL set but no previous rel at %X/%08X",
    1898            0 :                                           LSN_FORMAT_ARGS(state->ReadRecPtr));
    1899            0 :                     goto err;
    1900              :                 }
    1901              : 
    1902          336 :                 blk->rlocator = *rlocator;
    1903              :             }
    1904        87522 :             COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
    1905              :         }
    1906              :         else
    1907              :         {
    1908            0 :             report_invalid_record(state,
    1909              :                                   "invalid block_id %u at %X/%08X",
    1910            0 :                                   block_id, LSN_FORMAT_ARGS(state->ReadRecPtr));
    1911            0 :             goto err;
    1912              :         }
    1913              :     }
    1914              : 
    1915        89168 :     if (remaining != datatotal)
    1916            0 :         goto shortdata_err;
    1917              : 
    1918              :     /*
    1919              :      * Ok, we've parsed the fragment headers, and verified that the total
    1920              :      * length of the payload in the fragments is equal to the amount of data
    1921              :      * left.  Copy the data of each fragment to contiguous space after the
    1922              :      * blocks array, inserting alignment padding before the data fragments so
    1923              :      * they can be cast to struct pointers by REDO routines.
    1924              :      */
    1925        89168 :     out = ((char *) decoded) +
    1926        89168 :         offsetof(DecodedXLogRecord, blocks) +
    1927        89168 :         sizeof(decoded->blocks[0]) * (decoded->max_block_id + 1);
    1928              : 
    1929              :     /* block data first */
    1930       176690 :     for (block_id = 0; block_id <= decoded->max_block_id; block_id++)
    1931              :     {
    1932        87522 :         DecodedBkpBlock *blk = &decoded->blocks[block_id];
    1933              : 
    1934        87522 :         if (!blk->in_use)
    1935            0 :             continue;
    1936              : 
    1937              :         Assert(blk->has_image || !blk->apply_image);
    1938              : 
    1939        87522 :         if (blk->has_image)
    1940              :         {
    1941              :             /* no need to align image */
    1942         6165 :             blk->bkp_image = out;
    1943         6165 :             memcpy(out, ptr, blk->bimg_len);
    1944         6165 :             ptr += blk->bimg_len;
    1945         6165 :             out += blk->bimg_len;
    1946              :         }
    1947        87522 :         if (blk->has_data)
    1948              :         {
    1949        40609 :             out = (char *) MAXALIGN(out);
    1950        40609 :             blk->data = out;
    1951        40609 :             memcpy(blk->data, ptr, blk->data_len);
    1952        40609 :             ptr += blk->data_len;
    1953        40609 :             out += blk->data_len;
    1954              :         }
    1955              :     }
    1956              : 
    1957              :     /* and finally, the main data */
    1958        89168 :     if (decoded->main_data_len > 0)
    1959              :     {
    1960        83069 :         out = (char *) MAXALIGN(out);
    1961        83069 :         decoded->main_data = out;
    1962        83069 :         memcpy(decoded->main_data, ptr, decoded->main_data_len);
    1963        83069 :         ptr += decoded->main_data_len;
    1964        83069 :         out += decoded->main_data_len;
    1965              :     }
    1966              : 
    1967              :     /* Report the actual size we used. */
    1968        89168 :     decoded->size = MAXALIGN(out - (char *) decoded);
    1969              :     Assert(DecodeXLogRecordRequiredSpace(record->xl_tot_len) >=
    1970              :            decoded->size);
    1971              : 
    1972        89168 :     return true;
    1973              : 
    1974            0 : shortdata_err:
    1975            0 :     report_invalid_record(state,
    1976              :                           "record with invalid length at %X/%08X",
    1977            0 :                           LSN_FORMAT_ARGS(state->ReadRecPtr));
    1978            0 : err:
    1979            0 :     *errormsg = state->errormsg_buf;
    1980              : 
    1981            0 :     return false;
    1982              : }
    1983              : 
    1984              : /*
    1985              :  * Returns information about the block that a block reference refers to.
    1986              :  *
    1987              :  * This is like XLogRecGetBlockTagExtended, except that the block reference
    1988              :  * must exist and there's no access to prefetch_buffer.
    1989              :  */
    1990              : void
    1991            0 : XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
    1992              :                    RelFileLocator *rlocator, ForkNumber *forknum,
    1993              :                    BlockNumber *blknum)
    1994              : {
    1995            0 :     if (!XLogRecGetBlockTagExtended(record, block_id, rlocator, forknum,
    1996              :                                     blknum, NULL))
    1997              :     {
    1998              : #ifndef FRONTEND
    1999              :         elog(ERROR, "could not locate backup block with ID %d in WAL record",
    2000              :              block_id);
    2001              : #else
    2002            0 :         pg_fatal("could not locate backup block with ID %d in WAL record",
    2003              :                  block_id);
    2004              : #endif
    2005              :     }
    2006            0 : }
    2007              : 
    2008              : /*
    2009              :  * Returns information about the block that a block reference refers to,
    2010              :  * optionally including the buffer that the block may already be in.
    2011              :  *
    2012              :  * If the WAL record contains a block reference with the given ID, *rlocator,
    2013              :  * *forknum, *blknum and *prefetch_buffer are filled in (if not NULL), and
    2014              :  * returns true.  Otherwise returns false.
    2015              :  */
    2016              : bool
    2017        85499 : XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id,
    2018              :                            RelFileLocator *rlocator, ForkNumber *forknum,
    2019              :                            BlockNumber *blknum,
    2020              :                            Buffer *prefetch_buffer)
    2021              : {
    2022              :     DecodedBkpBlock *bkpb;
    2023              : 
    2024        85499 :     if (!XLogRecHasBlockRef(record, block_id))
    2025            0 :         return false;
    2026              : 
    2027        85499 :     bkpb = &record->record->blocks[block_id];
    2028        85499 :     if (rlocator)
    2029        85499 :         *rlocator = bkpb->rlocator;
    2030        85499 :     if (forknum)
    2031        85499 :         *forknum = bkpb->forknum;
    2032        85499 :     if (blknum)
    2033        85499 :         *blknum = bkpb->blkno;
    2034        85499 :     if (prefetch_buffer)
    2035            0 :         *prefetch_buffer = bkpb->prefetch_buffer;
    2036        85499 :     return true;
    2037              : }
    2038              : 
    2039              : /*
    2040              :  * Returns the data associated with a block reference, or NULL if there is
    2041              :  * no data (e.g. because a full-page image was taken instead). The returned
    2042              :  * pointer points to a MAXALIGNed buffer.
    2043              :  */
    2044              : char *
    2045            0 : XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
    2046              : {
    2047              :     DecodedBkpBlock *bkpb;
    2048              : 
    2049            0 :     if (block_id > record->record->max_block_id ||
    2050            0 :         !record->record->blocks[block_id].in_use)
    2051            0 :         return NULL;
    2052              : 
    2053            0 :     bkpb = &record->record->blocks[block_id];
    2054              : 
    2055            0 :     if (!bkpb->has_data)
    2056              :     {
    2057            0 :         if (len)
    2058            0 :             *len = 0;
    2059            0 :         return NULL;
    2060              :     }
    2061              :     else
    2062              :     {
    2063            0 :         if (len)
    2064            0 :             *len = bkpb->data_len;
    2065            0 :         return bkpb->data;
    2066              :     }
    2067              : }
    2068              : 
    2069              : /*
    2070              :  * Restore a full-page image from a backup block attached to an XLOG record.
    2071              :  *
    2072              :  * Returns true if a full-page image is restored, and false on failure with
    2073              :  * an error to be consumed by the caller.
    2074              :  */
    2075              : bool
    2076            0 : RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
    2077              : {
    2078              :     DecodedBkpBlock *bkpb;
    2079              :     char       *ptr;
    2080              :     PGAlignedBlock tmp;
    2081              : 
    2082            0 :     if (block_id > record->record->max_block_id ||
    2083            0 :         !record->record->blocks[block_id].in_use)
    2084              :     {
    2085            0 :         report_invalid_record(record,
    2086              :                               "could not restore image at %X/%08X with invalid block %d specified",
    2087            0 :                               LSN_FORMAT_ARGS(record->ReadRecPtr),
    2088              :                               block_id);
    2089            0 :         return false;
    2090              :     }
    2091            0 :     if (!record->record->blocks[block_id].has_image)
    2092              :     {
    2093            0 :         report_invalid_record(record, "could not restore image at %X/%08X with invalid state, block %d",
    2094            0 :                               LSN_FORMAT_ARGS(record->ReadRecPtr),
    2095              :                               block_id);
    2096            0 :         return false;
    2097              :     }
    2098              : 
    2099            0 :     bkpb = &record->record->blocks[block_id];
    2100            0 :     ptr = bkpb->bkp_image;
    2101              : 
    2102            0 :     if (BKPIMAGE_COMPRESSED(bkpb->bimg_info))
    2103              :     {
    2104              :         /* If a backup block image is compressed, decompress it */
    2105            0 :         bool        decomp_success = true;
    2106              : 
    2107            0 :         if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
    2108              :         {
    2109            0 :             if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
    2110            0 :                                 BLCKSZ - bkpb->hole_length, true) < 0)
    2111            0 :                 decomp_success = false;
    2112              :         }
    2113            0 :         else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
    2114              :         {
    2115              : #ifdef USE_LZ4
    2116            0 :             if (LZ4_decompress_safe(ptr, tmp.data,
    2117            0 :                                     bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0)
    2118            0 :                 decomp_success = false;
    2119              : #else
    2120              :             report_invalid_record(record, "could not restore image at %X/%08X compressed with %s not supported by build, block %d",
    2121              :                                   LSN_FORMAT_ARGS(record->ReadRecPtr),
    2122              :                                   "LZ4",
    2123              :                                   block_id);
    2124              :             return false;
    2125              : #endif
    2126              :         }
    2127            0 :         else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
    2128              :         {
    2129              : #ifdef USE_ZSTD
    2130              :             size_t      decomp_result = ZSTD_decompress(tmp.data,
    2131              :                                                         BLCKSZ - bkpb->hole_length,
    2132              :                                                         ptr, bkpb->bimg_len);
    2133              : 
    2134              :             if (ZSTD_isError(decomp_result))
    2135              :                 decomp_success = false;
    2136              : #else
    2137            0 :             report_invalid_record(record, "could not restore image at %X/%08X compressed with %s not supported by build, block %d",
    2138            0 :                                   LSN_FORMAT_ARGS(record->ReadRecPtr),
    2139              :                                   "zstd",
    2140              :                                   block_id);
    2141            0 :             return false;
    2142              : #endif
    2143              :         }
    2144              :         else
    2145              :         {
    2146            0 :             report_invalid_record(record, "could not restore image at %X/%08X compressed with unknown method, block %d",
    2147            0 :                                   LSN_FORMAT_ARGS(record->ReadRecPtr),
    2148              :                                   block_id);
    2149            0 :             return false;
    2150              :         }
    2151              : 
    2152            0 :         if (!decomp_success)
    2153              :         {
    2154            0 :             report_invalid_record(record, "could not decompress image at %X/%08X, block %d",
    2155            0 :                                   LSN_FORMAT_ARGS(record->ReadRecPtr),
    2156              :                                   block_id);
    2157            0 :             return false;
    2158              :         }
    2159              : 
    2160            0 :         ptr = tmp.data;
    2161              :     }
    2162              : 
    2163              :     /* generate page, taking into account hole if necessary */
    2164            0 :     if (bkpb->hole_length == 0)
    2165              :     {
    2166            0 :         memcpy(page, ptr, BLCKSZ);
    2167              :     }
    2168              :     else
    2169              :     {
    2170            0 :         memcpy(page, ptr, bkpb->hole_offset);
    2171              :         /* must zero-fill the hole */
    2172            0 :         MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
    2173            0 :         memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
    2174            0 :                ptr + bkpb->hole_offset,
    2175            0 :                BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
    2176              :     }
    2177              : 
    2178            0 :     return true;
    2179              : }
    2180              : 
    2181              : #ifndef FRONTEND
    2182              : 
    2183              : /*
    2184              :  * Extract the FullTransactionId from a WAL record.
    2185              :  */
    2186              : FullTransactionId
    2187              : XLogRecGetFullXid(XLogReaderState *record)
    2188              : {
    2189              :     /*
    2190              :      * This function is only safe during replay, because it depends on the
    2191              :      * replay state.  See AdvanceNextFullTransactionIdPastXid() for more.
    2192              :      */
    2193              :     Assert(AmStartupProcess() || !IsUnderPostmaster);
    2194              : 
    2195              :     return FullTransactionIdFromAllowableAt(TransamVariables->nextXid,
    2196              :                                             XLogRecGetXid(record));
    2197              : }
    2198              : 
    2199              : #endif
        

Generated by: LCOV version 2.0-1