LCOV - code coverage report
Current view: top level - src/bin/pg_waldump - xlogreader.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 69.2 % 725 502
Test Date: 2026-03-11 10:16:42 Functions: 89.3 % 28 25
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * xlogreader.c
       4              :  *      Generic XLog reading facility
       5              :  *
       6              :  * Portions Copyright (c) 2013-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *      src/backend/access/transam/xlogreader.c
      10              :  *
      11              :  * NOTES
      12              :  *      See xlogreader.h for more notes on this facility.
      13              :  *
      14              :  *      This file is compiled as both front-end and backend code, so it
      15              :  *      may not use ereport, server-defined static variables, etc.
      16              :  *-------------------------------------------------------------------------
      17              :  */
      18              : #include "postgres.h"
      19              : 
      20              : #include <unistd.h>
      21              : #ifdef USE_LZ4
      22              : #include <lz4.h>
      23              : #endif
      24              : #ifdef USE_ZSTD
      25              : #include <zstd.h>
      26              : #endif
      27              : 
      28              : #include "access/transam.h"
      29              : #include "access/xlog_internal.h"
      30              : #include "access/xlogreader.h"
      31              : #include "access/xlogrecord.h"
      32              : #include "catalog/pg_control.h"
      33              : #include "common/pg_lzcompress.h"
      34              : #include "replication/origin.h"
      35              : 
      36              : #ifndef FRONTEND
      37              : #include "pgstat.h"
      38              : #include "storage/bufmgr.h"
      39              : #include "utils/wait_event.h"
      40              : #else
      41              : #include "common/logging.h"
      42              : #endif
      43              : 
      44              : static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
      45              :             pg_attribute_printf(2, 3);
      46              : static void allocate_recordbuf(XLogReaderState *state, uint32 reclength);
      47              : static int  ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr,
      48              :                              int reqLen);
      49              : static void XLogReaderInvalReadState(XLogReaderState *state);
      50              : static XLogPageReadResult XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking);
      51              : static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
      52              :                                   XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess);
      53              : static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
      54              :                             XLogRecPtr recptr);
      55              : static void ResetDecoder(XLogReaderState *state);
      56              : static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
      57              :                                int segsize, const char *waldir);
      58              : 
      59              : /* size of the buffer allocated for error message. */
      60              : #define MAX_ERRORMSG_LEN 1000
      61              : 
      62              : /*
      63              :  * Default size; large enough that typical users of XLogReader won't often need
      64              :  * to use the 'oversized' memory allocation code path.
      65              :  */
      66              : #define DEFAULT_DECODE_BUFFER_SIZE (64 * 1024)
      67              : 
      68              : /*
      69              :  * Construct a string in state->errormsg_buf explaining what's wrong with
      70              :  * the current record being read.
      71              :  */
      72              : static void
      73            2 : report_invalid_record(XLogReaderState *state, const char *fmt,...)
      74              : {
      75              :     va_list     args;
      76              : 
      77            2 :     fmt = _(fmt);
      78              : 
      79            2 :     va_start(args, fmt);
      80            2 :     vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
      81            2 :     va_end(args);
      82              : 
      83            2 :     state->errormsg_deferred = true;
      84            2 : }
      85              : 
      86              : /*
      87              :  * Set the size of the decoding buffer.  A pointer to a caller supplied memory
      88              :  * region may also be passed in, in which case non-oversized records will be
      89              :  * decoded there.
      90              :  */
      91              : void
      92            0 : XLogReaderSetDecodeBuffer(XLogReaderState *state, void *buffer, size_t size)
      93              : {
      94              :     Assert(state->decode_buffer == NULL);
      95              : 
      96            0 :     state->decode_buffer = buffer;
      97            0 :     state->decode_buffer_size = size;
      98            0 :     state->decode_buffer_tail = buffer;
      99            0 :     state->decode_buffer_head = buffer;
     100            0 : }
     101              : 
     102              : /*
     103              :  * Allocate and initialize a new XLogReader.
     104              :  *
     105              :  * Returns NULL if the xlogreader couldn't be allocated.
     106              :  */
     107              : XLogReaderState *
     108           65 : XLogReaderAllocate(int wal_segment_size, const char *waldir,
     109              :                    XLogReaderRoutine *routine, void *private_data)
     110              : {
     111              :     XLogReaderState *state;
     112              : 
     113              :     state = (XLogReaderState *)
     114           65 :         palloc_extended(sizeof(XLogReaderState),
     115              :                         MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
     116           65 :     if (!state)
     117            0 :         return NULL;
     118              : 
     119              :     /* initialize caller-provided support functions */
     120           65 :     state->routine = *routine;
     121              : 
     122              :     /*
     123              :      * Permanently allocate readBuf.  We do it this way, rather than just
     124              :      * making a static array, for two reasons: (1) no need to waste the
     125              :      * storage in most instantiations of the backend; (2) a static char array
     126              :      * isn't guaranteed to have any particular alignment, whereas
     127              :      * palloc_extended() will provide MAXALIGN'd storage.
     128              :      */
     129           65 :     state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ,
     130              :                                               MCXT_ALLOC_NO_OOM);
     131           65 :     if (!state->readBuf)
     132              :     {
     133            0 :         pfree(state);
     134            0 :         return NULL;
     135              :     }
     136              : 
     137              :     /* Initialize segment info. */
     138           65 :     WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
     139              :                        waldir);
     140              : 
     141              :     /* system_identifier initialized to zeroes above */
     142           65 :     state->private_data = private_data;
     143              :     /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
     144           65 :     state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1,
     145              :                                           MCXT_ALLOC_NO_OOM);
     146           65 :     if (!state->errormsg_buf)
     147              :     {
     148            0 :         pfree(state->readBuf);
     149            0 :         pfree(state);
     150            0 :         return NULL;
     151              :     }
     152           65 :     state->errormsg_buf[0] = '\0';
     153              : 
     154              :     /*
     155              :      * Allocate an initial readRecordBuf of minimal size, which can later be
     156              :      * enlarged if necessary.
     157              :      */
     158           65 :     allocate_recordbuf(state, 0);
     159           65 :     return state;
     160              : }
     161              : 
     162              : void
     163           63 : XLogReaderFree(XLogReaderState *state)
     164              : {
     165           63 :     if (state->seg.ws_file != -1)
     166           63 :         state->routine.segment_close(state);
     167              : 
     168           63 :     if (state->decode_buffer && state->free_decode_buffer)
     169           63 :         pfree(state->decode_buffer);
     170              : 
     171           63 :     pfree(state->errormsg_buf);
     172           63 :     if (state->readRecordBuf)
     173           63 :         pfree(state->readRecordBuf);
     174           63 :     pfree(state->readBuf);
     175           63 :     pfree(state);
     176           63 : }
     177              : 
     178              : /*
     179              :  * Allocate readRecordBuf to fit a record of at least the given length.
     180              :  *
     181              :  * readRecordBufSize is set to the new buffer size.
     182              :  *
     183              :  * To avoid useless small increases, round its size to a multiple of
     184              :  * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
     185              :  * with.  (That is enough for all "normal" records, but very large commit or
     186              :  * abort records might need more space.)
     187              :  *
     188              :  * Note: This routine should *never* be called for xl_tot_len until the header
     189              :  * of the record has been fully validated.
     190              :  */
     191              : static void
     192           73 : allocate_recordbuf(XLogReaderState *state, uint32 reclength)
     193              : {
     194           73 :     uint32      newSize = reclength;
     195              : 
     196           73 :     newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
     197           73 :     newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
     198              : 
     199           73 :     if (state->readRecordBuf)
     200            8 :         pfree(state->readRecordBuf);
     201           73 :     state->readRecordBuf = (char *) palloc(newSize);
     202           73 :     state->readRecordBufSize = newSize;
     203           73 : }
     204              : 
     205              : /*
     206              :  * Initialize the passed segment structs.
     207              :  */
     208              : static void
     209           65 : WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
     210              :                    int segsize, const char *waldir)
     211              : {
     212           65 :     seg->ws_file = -1;
     213           65 :     seg->ws_segno = 0;
     214           65 :     seg->ws_tli = 0;
     215              : 
     216           65 :     segcxt->ws_segsize = segsize;
     217           65 :     if (waldir)
     218           65 :         snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir);
     219           65 : }
     220              : 
     221              : /*
     222              :  * Begin reading WAL at 'RecPtr'.
     223              :  *
     224              :  * 'RecPtr' should point to the beginning of a valid WAL record.  Pointing at
     225              :  * the beginning of a page is also OK, if there is a new record right after
     226              :  * the page header, i.e. not a continuation.
     227              :  *
     228              :  * This does not make any attempt to read the WAL yet, and hence cannot fail.
     229              :  * If the starting address is not correct, the first call to XLogReadRecord()
     230              :  * will error out.
     231              :  */
     232              : void
     233          130 : XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
     234              : {
     235              :     Assert(XLogRecPtrIsValid(RecPtr));
     236              : 
     237          130 :     ResetDecoder(state);
     238              : 
     239              :     /* Begin at the passed-in record pointer. */
     240          130 :     state->EndRecPtr = RecPtr;
     241          130 :     state->NextRecPtr = RecPtr;
     242          130 :     state->ReadRecPtr = InvalidXLogRecPtr;
     243          130 :     state->DecodeRecPtr = InvalidXLogRecPtr;
     244          130 : }
     245              : 
     246              : /*
     247              :  * Release the last record that was returned by XLogNextRecord(), if any, to
     248              :  * free up space.  Returns the LSN past the end of the record.
     249              :  */
     250              : XLogRecPtr
     251      1231504 : XLogReleasePreviousRecord(XLogReaderState *state)
     252              : {
     253              :     DecodedXLogRecord *record;
     254              :     XLogRecPtr  next_lsn;
     255              : 
     256      1231504 :     if (!state->record)
     257       615882 :         return InvalidXLogRecPtr;
     258              : 
     259              :     /*
     260              :      * Remove it from the decoded record queue.  It must be the oldest item
     261              :      * decoded, decode_queue_head.
     262              :      */
     263       615622 :     record = state->record;
     264       615622 :     next_lsn = record->next_lsn;
     265              :     Assert(record == state->decode_queue_head);
     266       615622 :     state->record = NULL;
     267       615622 :     state->decode_queue_head = record->next;
     268              : 
     269              :     /* It might also be the newest item decoded, decode_queue_tail. */
     270       615622 :     if (state->decode_queue_tail == record)
     271       615622 :         state->decode_queue_tail = NULL;
     272              : 
     273              :     /* Release the space. */
     274       615622 :     if (unlikely(record->oversized))
     275              :     {
     276              :         /* It's not in the decode buffer, so free it to release space. */
     277           12 :         pfree(record);
     278              :     }
     279              :     else
     280              :     {
     281              :         /* It must be the head (oldest) record in the decode buffer. */
     282              :         Assert(state->decode_buffer_head == (char *) record);
     283              : 
     284              :         /*
     285              :          * We need to update head to point to the next record that is in the
     286              :          * decode buffer, if any, being careful to skip oversized ones
     287              :          * (they're not in the decode buffer).
     288              :          */
     289       615610 :         record = record->next;
     290       615610 :         while (unlikely(record && record->oversized))
     291            0 :             record = record->next;
     292              : 
     293       615610 :         if (record)
     294              :         {
     295              :             /* Adjust head to release space up to the next record. */
     296            0 :             state->decode_buffer_head = (char *) record;
     297              :         }
     298              :         else
     299              :         {
     300              :             /*
     301              :              * Otherwise we might as well just reset head and tail to the
     302              :              * start of the buffer space, because we're empty.  This means
     303              :              * we'll keep overwriting the same piece of memory if we're not
     304              :              * doing any prefetching.
     305              :              */
     306       615610 :             state->decode_buffer_head = state->decode_buffer;
     307       615610 :             state->decode_buffer_tail = state->decode_buffer;
     308              :         }
     309              :     }
     310              : 
     311       615622 :     return next_lsn;
     312              : }
     313              : 
     314              : /*
     315              :  * Attempt to read an XLOG record.
     316              :  *
     317              :  * XLogBeginRead() or XLogFindNextRecord() and then XLogReadAhead() must be
     318              :  * called before the first call to XLogNextRecord().  This functions returns
     319              :  * records and errors that were put into an internal queue by XLogReadAhead().
     320              :  *
     321              :  * On success, a record is returned.
     322              :  *
     323              :  * The returned record (or *errormsg) points to an internal buffer that's
     324              :  * valid until the next call to XLogNextRecord.
     325              :  */
     326              : DecodedXLogRecord *
     327       615752 : XLogNextRecord(XLogReaderState *state, char **errormsg)
     328              : {
     329              :     /* Release the last record returned by XLogNextRecord(). */
     330       615752 :     XLogReleasePreviousRecord(state);
     331              : 
     332       615752 :     if (state->decode_queue_head == NULL)
     333              :     {
     334           64 :         *errormsg = NULL;
     335           64 :         if (state->errormsg_deferred)
     336              :         {
     337            2 :             if (state->errormsg_buf[0] != '\0')
     338            2 :                 *errormsg = state->errormsg_buf;
     339            2 :             state->errormsg_deferred = false;
     340              :         }
     341              : 
     342              :         /*
     343              :          * state->EndRecPtr is expected to have been set by the last call to
     344              :          * XLogBeginRead() or XLogNextRecord(), and is the location of the
     345              :          * error.
     346              :          */
     347              :         Assert(XLogRecPtrIsValid(state->EndRecPtr));
     348              : 
     349           64 :         return NULL;
     350              :     }
     351              : 
     352              :     /*
     353              :      * Record this as the most recent record returned, so that we'll release
     354              :      * it next time.  This also exposes it to the traditional
     355              :      * XLogRecXXX(xlogreader) macros, which work with the decoder rather than
     356              :      * the record for historical reasons.
     357              :      */
     358       615688 :     state->record = state->decode_queue_head;
     359              : 
     360              :     /*
     361              :      * Update the pointers to the beginning and one-past-the-end of this
     362              :      * record, again for the benefit of historical code that expected the
     363              :      * decoder to track this rather than accessing these fields of the record
     364              :      * itself.
     365              :      */
     366       615688 :     state->ReadRecPtr = state->record->lsn;
     367       615688 :     state->EndRecPtr = state->record->next_lsn;
     368              : 
     369       615688 :     *errormsg = NULL;
     370              : 
     371       615688 :     return state->record;
     372              : }
     373              : 
     374              : /*
     375              :  * Attempt to read an XLOG record.
     376              :  *
     377              :  * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
     378              :  * to XLogReadRecord().
     379              :  *
     380              :  * If the page_read callback fails to read the requested data, NULL is
     381              :  * returned.  The callback is expected to have reported the error; errormsg
     382              :  * is set to NULL.
     383              :  *
     384              :  * If the reading fails for some other reason, NULL is also returned, and
     385              :  * *errormsg is set to a string with details of the failure.
     386              :  *
     387              :  * The returned pointer (or *errormsg) points to an internal buffer that's
     388              :  * valid until the next call to XLogReadRecord.
     389              :  */
     390              : XLogRecord *
     391       615752 : XLogReadRecord(XLogReaderState *state, char **errormsg)
     392              : {
     393              :     DecodedXLogRecord *decoded;
     394              : 
     395              :     /*
     396              :      * Release last returned record, if there is one.  We need to do this so
     397              :      * that we can check for empty decode queue accurately.
     398              :      */
     399       615752 :     XLogReleasePreviousRecord(state);
     400              : 
     401              :     /*
     402              :      * Call XLogReadAhead() in blocking mode to make sure there is something
     403              :      * in the queue, though we don't use the result.
     404              :      */
     405       615752 :     if (!XLogReaderHasQueuedRecordOrError(state))
     406       615752 :         XLogReadAhead(state, false /* nonblocking */ );
     407              : 
     408              :     /* Consume the head record or error. */
     409       615752 :     decoded = XLogNextRecord(state, errormsg);
     410       615752 :     if (decoded)
     411              :     {
     412              :         /*
     413              :          * This function returns a pointer to the record's header, not the
     414              :          * actual decoded record.  The caller will access the decoded record
     415              :          * through the XLogRecGetXXX() macros, which reach the decoded
     416              :          * recorded as xlogreader->record.
     417              :          */
     418              :         Assert(state->record == decoded);
     419       615688 :         return &decoded->header;
     420              :     }
     421              : 
     422           64 :     return NULL;
     423              : }
     424              : 
     425              : /*
     426              :  * Allocate space for a decoded record.  The only member of the returned
     427              :  * object that is initialized is the 'oversized' flag, indicating that the
     428              :  * decoded record wouldn't fit in the decode buffer and must eventually be
     429              :  * freed explicitly.
     430              :  *
     431              :  * The caller is responsible for adjusting decode_buffer_tail with the real
     432              :  * size after successfully decoding a record into this space.  This way, if
     433              :  * decoding fails, then there is nothing to undo unless the 'oversized' flag
     434              :  * was set and pfree() must be called.
     435              :  *
     436              :  * Return NULL if there is no space in the decode buffer and allow_oversized
     437              :  * is false, or if memory allocation fails for an oversized buffer.
     438              :  */
     439              : static DecodedXLogRecord *
     440       615700 : XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversized)
     441              : {
     442       615700 :     size_t      required_space = DecodeXLogRecordRequiredSpace(xl_tot_len);
     443       615700 :     DecodedXLogRecord *decoded = NULL;
     444              : 
     445              :     /* Allocate a circular decode buffer if we don't have one already. */
     446       615700 :     if (unlikely(state->decode_buffer == NULL))
     447              :     {
     448           65 :         if (state->decode_buffer_size == 0)
     449           65 :             state->decode_buffer_size = DEFAULT_DECODE_BUFFER_SIZE;
     450           65 :         state->decode_buffer = palloc(state->decode_buffer_size);
     451           65 :         state->decode_buffer_head = state->decode_buffer;
     452           65 :         state->decode_buffer_tail = state->decode_buffer;
     453           65 :         state->free_decode_buffer = true;
     454              :     }
     455              : 
     456              :     /* Try to allocate space in the circular decode buffer. */
     457       615700 :     if (state->decode_buffer_tail >= state->decode_buffer_head)
     458              :     {
     459              :         /* Empty, or tail is to the right of head. */
     460       615700 :         if (required_space <=
     461       615700 :             state->decode_buffer_size -
     462       615700 :             (state->decode_buffer_tail - state->decode_buffer))
     463              :         {
     464              :             /*-
     465              :              * There is space between tail and end.
     466              :              *
     467              :              * +-----+--------------------+-----+
     468              :              * |     |////////////////////|here!|
     469              :              * +-----+--------------------+-----+
     470              :              *       ^                    ^
     471              :              *       |                    |
     472              :              *       h                    t
     473              :              */
     474       615676 :             decoded = (DecodedXLogRecord *) state->decode_buffer_tail;
     475       615676 :             decoded->oversized = false;
     476       615676 :             return decoded;
     477              :         }
     478           24 :         else if (required_space <
     479           24 :                  state->decode_buffer_head - state->decode_buffer)
     480              :         {
     481              :             /*-
     482              :              * There is space between start and head.
     483              :              *
     484              :              * +-----+--------------------+-----+
     485              :              * |here!|////////////////////|     |
     486              :              * +-----+--------------------+-----+
     487              :              *       ^                    ^
     488              :              *       |                    |
     489              :              *       h                    t
     490              :              */
     491            0 :             decoded = (DecodedXLogRecord *) state->decode_buffer;
     492            0 :             decoded->oversized = false;
     493            0 :             return decoded;
     494              :         }
     495              :     }
     496              :     else
     497              :     {
     498              :         /* Tail is to the left of head. */
     499            0 :         if (required_space <
     500            0 :             state->decode_buffer_head - state->decode_buffer_tail)
     501              :         {
     502              :             /*-
     503              :              * There is space between tail and head.
     504              :              *
     505              :              * +-----+--------------------+-----+
     506              :              * |/////|here!               |/////|
     507              :              * +-----+--------------------+-----+
     508              :              *       ^                    ^
     509              :              *       |                    |
     510              :              *       t                    h
     511              :              */
     512            0 :             decoded = (DecodedXLogRecord *) state->decode_buffer_tail;
     513            0 :             decoded->oversized = false;
     514            0 :             return decoded;
     515              :         }
     516              :     }
     517              : 
     518              :     /* Not enough space in the decode buffer.  Are we allowed to allocate? */
     519           24 :     if (allow_oversized)
     520              :     {
     521           12 :         decoded = palloc(required_space);
     522           12 :         decoded->oversized = true;
     523           12 :         return decoded;
     524              :     }
     525              : 
     526           12 :     return NULL;
     527              : }
     528              : 
     529              : static XLogPageReadResult
     530       615752 : XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
     531              : {
     532              :     XLogRecPtr  RecPtr;
     533              :     XLogRecord *record;
     534              :     XLogRecPtr  targetPagePtr;
     535              :     bool        randAccess;
     536              :     uint32      len,
     537              :                 total_len;
     538              :     uint32      targetRecOff;
     539              :     uint32      pageHeaderSize;
     540              :     bool        assembled;
     541              :     bool        gotheader;
     542              :     int         readOff;
     543              :     DecodedXLogRecord *decoded;
     544              :     char       *errormsg;       /* not used */
     545              : 
     546              :     /*
     547              :      * randAccess indicates whether to verify the previous-record pointer of
     548              :      * the record we're reading.  We only do this if we're reading
     549              :      * sequentially, which is what we initially assume.
     550              :      */
     551       615752 :     randAccess = false;
     552              : 
     553              :     /* reset error state */
     554       615752 :     state->errormsg_buf[0] = '\0';
     555       615752 :     decoded = NULL;
     556              : 
     557       615752 :     state->abortedRecPtr = InvalidXLogRecPtr;
     558       615752 :     state->missingContrecPtr = InvalidXLogRecPtr;
     559              : 
     560       615752 :     RecPtr = state->NextRecPtr;
     561              : 
     562       615752 :     if (XLogRecPtrIsValid(state->DecodeRecPtr))
     563              :     {
     564              :         /* read the record after the one we just read */
     565              : 
     566              :         /*
     567              :          * NextRecPtr is pointing to end+1 of the previous WAL record.  If
     568              :          * we're at a page boundary, no more records can fit on the current
     569              :          * page. We must skip over the page header, but we can't do that until
     570              :          * we've read in the page, since the header size is variable.
     571              :          */
     572              :     }
     573              :     else
     574              :     {
     575              :         /*
     576              :          * Caller supplied a position to start at.
     577              :          *
     578              :          * In this case, NextRecPtr should already be pointing either to a
     579              :          * valid record starting position or alternatively to the beginning of
     580              :          * a page. See the header comments for XLogBeginRead.
     581              :          */
     582              :         Assert(RecPtr % XLOG_BLCKSZ == 0 || XRecOffIsValid(RecPtr));
     583          130 :         randAccess = true;
     584              :     }
     585              : 
     586       615752 : restart:
     587       615752 :     state->nonblocking = nonblocking;
     588       615752 :     state->currRecPtr = RecPtr;
     589       615752 :     assembled = false;
     590              : 
     591       615752 :     targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ);
     592       615752 :     targetRecOff = RecPtr % XLOG_BLCKSZ;
     593              : 
     594              :     /*
     595              :      * Read the page containing the record into state->readBuf. Request enough
     596              :      * byte to cover the whole record header, or at least the part of it that
     597              :      * fits on the same page.
     598              :      */
     599       615752 :     readOff = ReadPageInternal(state, targetPagePtr,
     600       615752 :                                Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ));
     601       615752 :     if (readOff == XLREAD_WOULDBLOCK)
     602            0 :         return XLREAD_WOULDBLOCK;
     603       615752 :     else if (readOff < 0)
     604           62 :         goto err;
     605              : 
     606              :     /*
     607              :      * ReadPageInternal always returns at least the page header, so we can
     608              :      * examine it now.
     609              :      */
     610       615690 :     pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
     611       615690 :     if (targetRecOff == 0)
     612              :     {
     613              :         /*
     614              :          * At page start, so skip over page header.
     615              :          */
     616          567 :         RecPtr += pageHeaderSize;
     617          567 :         targetRecOff = pageHeaderSize;
     618              :     }
     619       615123 :     else if (targetRecOff < pageHeaderSize)
     620              :     {
     621            0 :         report_invalid_record(state, "invalid record offset at %X/%08X: expected at least %u, got %u",
     622            0 :                               LSN_FORMAT_ARGS(RecPtr),
     623              :                               pageHeaderSize, targetRecOff);
     624            0 :         goto err;
     625              :     }
     626              : 
     627       615690 :     if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
     628              :         targetRecOff == pageHeaderSize)
     629              :     {
     630            0 :         report_invalid_record(state, "contrecord is requested by %X/%08X",
     631            0 :                               LSN_FORMAT_ARGS(RecPtr));
     632            0 :         goto err;
     633              :     }
     634              : 
     635              :     /* ReadPageInternal has verified the page header */
     636              :     Assert(pageHeaderSize <= readOff);
     637              : 
     638              :     /*
     639              :      * Read the record length.
     640              :      *
     641              :      * NB: Even though we use an XLogRecord pointer here, the whole record
     642              :      * header might not fit on this page. xl_tot_len is the first field of the
     643              :      * struct, so it must be on this page (the records are MAXALIGNed), but we
     644              :      * cannot access any other fields until we've verified that we got the
     645              :      * whole header.
     646              :      */
     647       615690 :     record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
     648       615690 :     total_len = record->xl_tot_len;
     649              : 
     650              :     /*
     651              :      * If the whole record header is on this page, validate it immediately.
     652              :      * Otherwise do just a basic sanity check on xl_tot_len, and validate the
     653              :      * rest of the header after reading it from the next page.  The xl_tot_len
     654              :      * check is necessary here to ensure that we enter the "Need to reassemble
     655              :      * record" code path below; otherwise we might fail to apply
     656              :      * ValidXLogRecordHeader at all.
     657              :      */
     658       615690 :     if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
     659              :     {
     660       614519 :         if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, record,
     661              :                                    randAccess))
     662            2 :             goto err;
     663       614517 :         gotheader = true;
     664              :     }
     665              :     else
     666              :     {
     667              :         /* There may be no next page if it's too small. */
     668         1171 :         if (total_len < SizeOfXLogRecord)
     669              :         {
     670            0 :             report_invalid_record(state,
     671              :                                   "invalid record length at %X/%08X: expected at least %u, got %u",
     672            0 :                                   LSN_FORMAT_ARGS(RecPtr),
     673              :                                   (uint32) SizeOfXLogRecord, total_len);
     674            0 :             goto err;
     675              :         }
     676              :         /* We'll validate the header once we have the next page. */
     677         1171 :         gotheader = false;
     678              :     }
     679              : 
     680              :     /*
     681              :      * Try to find space to decode this record, if we can do so without
     682              :      * calling palloc.  If we can't, we'll try again below after we've
     683              :      * validated that total_len isn't garbage bytes from a recycled WAL page.
     684              :      */
     685       615688 :     decoded = XLogReadRecordAlloc(state,
     686              :                                   total_len,
     687              :                                   false /* allow_oversized */ );
     688       615688 :     if (decoded == NULL && nonblocking)
     689              :     {
     690              :         /*
     691              :          * There is no space in the circular decode buffer, and the caller is
     692              :          * only reading ahead.  The caller should consume existing records to
     693              :          * make space.
     694              :          */
     695            0 :         return XLREAD_WOULDBLOCK;
     696              :     }
     697              : 
     698       615688 :     len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
     699       615688 :     if (total_len > len)
     700              :     {
     701              :         /* Need to reassemble record */
     702              :         char       *contdata;
     703              :         XLogPageHeader pageHeader;
     704              :         char       *buffer;
     705              :         uint32      gotlen;
     706              : 
     707        17444 :         assembled = true;
     708              : 
     709              :         /*
     710              :          * We always have space for a couple of pages, enough to validate a
     711              :          * boundary-spanning record header.
     712              :          */
     713              :         Assert(state->readRecordBufSize >= XLOG_BLCKSZ * 2);
     714              :         Assert(state->readRecordBufSize >= len);
     715              : 
     716              :         /* Copy the first fragment of the record from the first page. */
     717        17444 :         memcpy(state->readRecordBuf,
     718        17444 :                state->readBuf + RecPtr % XLOG_BLCKSZ, len);
     719        17444 :         buffer = state->readRecordBuf + len;
     720        17444 :         gotlen = len;
     721              : 
     722              :         do
     723              :         {
     724              :             /* Calculate pointer to beginning of next page */
     725        17772 :             targetPagePtr += XLOG_BLCKSZ;
     726              : 
     727              :             /*
     728              :              * Read the page header before processing the record data, so we
     729              :              * can handle the case where the previous record ended as being a
     730              :              * partial one.
     731              :              */
     732        17772 :             readOff = ReadPageInternal(state, targetPagePtr, SizeOfXLogShortPHD);
     733        17772 :             if (readOff == XLREAD_WOULDBLOCK)
     734            0 :                 return XLREAD_WOULDBLOCK;
     735        17772 :             else if (readOff < 0)
     736            0 :                 goto err;
     737              : 
     738              :             Assert(SizeOfXLogShortPHD <= readOff);
     739              : 
     740        17772 :             pageHeader = (XLogPageHeader) state->readBuf;
     741              : 
     742              :             /*
     743              :              * If we were expecting a continuation record and got an
     744              :              * "overwrite contrecord" flag, that means the continuation record
     745              :              * was overwritten with a different record.  Restart the read by
     746              :              * assuming the address to read is the location where we found
     747              :              * this flag; but keep track of the LSN of the record we were
     748              :              * reading, for later verification.
     749              :              */
     750        17772 :             if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD)
     751              :             {
     752            0 :                 state->overwrittenRecPtr = RecPtr;
     753            0 :                 RecPtr = targetPagePtr;
     754            0 :                 goto restart;
     755              :             }
     756              : 
     757              :             /* Check that the continuation on next page looks valid */
     758        17772 :             if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD))
     759              :             {
     760            0 :                 report_invalid_record(state,
     761              :                                       "there is no contrecord flag at %X/%08X",
     762            0 :                                       LSN_FORMAT_ARGS(RecPtr));
     763            0 :                 goto err;
     764              :             }
     765              : 
     766              :             /*
     767              :              * Cross-check that xlp_rem_len agrees with how much of the record
     768              :              * we expect there to be left.
     769              :              */
     770        17772 :             if (pageHeader->xlp_rem_len == 0 ||
     771        17772 :                 total_len != (pageHeader->xlp_rem_len + gotlen))
     772              :             {
     773            0 :                 report_invalid_record(state,
     774              :                                       "invalid contrecord length %u (expected %lld) at %X/%08X",
     775              :                                       pageHeader->xlp_rem_len,
     776            0 :                                       ((long long) total_len) - gotlen,
     777            0 :                                       LSN_FORMAT_ARGS(RecPtr));
     778            0 :                 goto err;
     779              :             }
     780              : 
     781              :             /* Wait for the next page to become available */
     782        17772 :             readOff = ReadPageInternal(state, targetPagePtr,
     783        17772 :                                        Min(total_len - gotlen + SizeOfXLogShortPHD,
     784              :                                            XLOG_BLCKSZ));
     785        17772 :             if (readOff == XLREAD_WOULDBLOCK)
     786            0 :                 return XLREAD_WOULDBLOCK;
     787        17772 :             else if (readOff < 0)
     788            0 :                 goto err;
     789              : 
     790              :             /* Append the continuation from this page to the buffer */
     791        17772 :             pageHeaderSize = XLogPageHeaderSize(pageHeader);
     792              : 
     793        17772 :             if (readOff < pageHeaderSize)
     794            0 :                 readOff = ReadPageInternal(state, targetPagePtr,
     795              :                                            pageHeaderSize);
     796              : 
     797              :             Assert(pageHeaderSize <= readOff);
     798              : 
     799        17772 :             contdata = (char *) state->readBuf + pageHeaderSize;
     800        17772 :             len = XLOG_BLCKSZ - pageHeaderSize;
     801        17772 :             if (pageHeader->xlp_rem_len < len)
     802        17444 :                 len = pageHeader->xlp_rem_len;
     803              : 
     804        17772 :             if (readOff < pageHeaderSize + len)
     805            0 :                 readOff = ReadPageInternal(state, targetPagePtr,
     806            0 :                                            pageHeaderSize + len);
     807              : 
     808        17772 :             memcpy(buffer, contdata, len);
     809        17772 :             buffer += len;
     810        17772 :             gotlen += len;
     811              : 
     812              :             /* If we just reassembled the record header, validate it. */
     813        17772 :             if (!gotheader)
     814              :             {
     815         1171 :                 record = (XLogRecord *) state->readRecordBuf;
     816         1171 :                 if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr,
     817              :                                            record, randAccess))
     818            0 :                     goto err;
     819         1171 :                 gotheader = true;
     820              :             }
     821              : 
     822              :             /*
     823              :              * We might need a bigger buffer.  We have validated the record
     824              :              * header, in the case that it split over a page boundary.  We've
     825              :              * also cross-checked total_len against xlp_rem_len on the second
     826              :              * page, and verified xlp_pageaddr on both.
     827              :              */
     828        17772 :             if (total_len > state->readRecordBufSize)
     829              :             {
     830              :                 char        save_copy[XLOG_BLCKSZ * 2];
     831              : 
     832              :                 /*
     833              :                  * Save and restore the data we already had.  It can't be more
     834              :                  * than two pages.
     835              :                  */
     836              :                 Assert(gotlen <= lengthof(save_copy));
     837              :                 Assert(gotlen <= state->readRecordBufSize);
     838            8 :                 memcpy(save_copy, state->readRecordBuf, gotlen);
     839            8 :                 allocate_recordbuf(state, total_len);
     840            8 :                 memcpy(state->readRecordBuf, save_copy, gotlen);
     841            8 :                 buffer = state->readRecordBuf + gotlen;
     842              :             }
     843        17772 :         } while (gotlen < total_len);
     844              :         Assert(gotheader);
     845              : 
     846        17444 :         record = (XLogRecord *) state->readRecordBuf;
     847        17444 :         if (!ValidXLogRecord(state, record, RecPtr))
     848            0 :             goto err;
     849              : 
     850        17444 :         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf);
     851        17444 :         state->DecodeRecPtr = RecPtr;
     852        17444 :         state->NextRecPtr = targetPagePtr + pageHeaderSize
     853        17444 :             + MAXALIGN(pageHeader->xlp_rem_len);
     854              :     }
     855              :     else
     856              :     {
     857              :         /* Wait for the record data to become available */
     858       598244 :         readOff = ReadPageInternal(state, targetPagePtr,
     859       598244 :                                    Min(targetRecOff + total_len, XLOG_BLCKSZ));
     860       598244 :         if (readOff == XLREAD_WOULDBLOCK)
     861            0 :             return XLREAD_WOULDBLOCK;
     862       598244 :         else if (readOff < 0)
     863            0 :             goto err;
     864              : 
     865              :         /* Record does not cross a page boundary */
     866       598244 :         if (!ValidXLogRecord(state, record, RecPtr))
     867            0 :             goto err;
     868              : 
     869       598244 :         state->NextRecPtr = RecPtr + MAXALIGN(total_len);
     870              : 
     871       598244 :         state->DecodeRecPtr = RecPtr;
     872              :     }
     873              : 
     874              :     /*
     875              :      * Special processing if it's an XLOG SWITCH record
     876              :      */
     877       615688 :     if (record->xl_rmid == RM_XLOG_ID &&
     878        16735 :         (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
     879              :     {
     880              :         /* Pretend it extends to end of segment */
     881            7 :         state->NextRecPtr += state->segcxt.ws_segsize - 1;
     882            7 :         state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize);
     883              :     }
     884              : 
     885              :     /*
     886              :      * If we got here without a DecodedXLogRecord, it means we needed to
     887              :      * validate total_len before trusting it, but by now we've done that.
     888              :      */
     889       615688 :     if (decoded == NULL)
     890              :     {
     891              :         Assert(!nonblocking);
     892           12 :         decoded = XLogReadRecordAlloc(state,
     893              :                                       total_len,
     894              :                                       true /* allow_oversized */ );
     895              :         /* allocation should always happen under allow_oversized */
     896              :         Assert(decoded != NULL);
     897              :     }
     898              : 
     899       615688 :     if (DecodeXLogRecord(state, decoded, record, RecPtr, &errormsg))
     900              :     {
     901              :         /* Record the location of the next record. */
     902       615688 :         decoded->next_lsn = state->NextRecPtr;
     903              : 
     904              :         /*
     905              :          * If it's in the decode buffer, mark the decode buffer space as
     906              :          * occupied.
     907              :          */
     908       615688 :         if (!decoded->oversized)
     909              :         {
     910              :             /* The new decode buffer head must be MAXALIGNed. */
     911              :             Assert(decoded->size == MAXALIGN(decoded->size));
     912       615676 :             if ((char *) decoded == state->decode_buffer)
     913       615676 :                 state->decode_buffer_tail = state->decode_buffer + decoded->size;
     914              :             else
     915            0 :                 state->decode_buffer_tail += decoded->size;
     916              :         }
     917              : 
     918              :         /* Insert it into the queue of decoded records. */
     919              :         Assert(state->decode_queue_tail != decoded);
     920       615688 :         if (state->decode_queue_tail)
     921            0 :             state->decode_queue_tail->next = decoded;
     922       615688 :         state->decode_queue_tail = decoded;
     923       615688 :         if (!state->decode_queue_head)
     924       615688 :             state->decode_queue_head = decoded;
     925       615688 :         return XLREAD_SUCCESS;
     926              :     }
     927              : 
     928            0 : err:
     929           64 :     if (assembled)
     930              :     {
     931              :         /*
     932              :          * We get here when a record that spans multiple pages needs to be
     933              :          * assembled, but something went wrong -- perhaps a contrecord piece
     934              :          * was lost.  If caller is WAL replay, it will know where the aborted
     935              :          * record was and where to direct followup WAL to be written, marking
     936              :          * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will
     937              :          * in turn signal downstream WAL consumers that the broken WAL record
     938              :          * is to be ignored.
     939              :          */
     940            0 :         state->abortedRecPtr = RecPtr;
     941            0 :         state->missingContrecPtr = targetPagePtr;
     942              : 
     943              :         /*
     944              :          * If we got here without reporting an error, make sure an error is
     945              :          * queued so that XLogPrefetcherReadRecord() doesn't bring us back a
     946              :          * second time and clobber the above state.
     947              :          */
     948            0 :         state->errormsg_deferred = true;
     949              :     }
     950              : 
     951           64 :     if (decoded && decoded->oversized)
     952            0 :         pfree(decoded);
     953              : 
     954              :     /*
     955              :      * Invalidate the read state. We might read from a different source after
     956              :      * failure.
     957              :      */
     958           64 :     XLogReaderInvalReadState(state);
     959              : 
     960              :     /*
     961              :      * If an error was written to errormsg_buf, it'll be returned to the
     962              :      * caller of XLogReadRecord() after all successfully decoded records from
     963              :      * the read queue.
     964              :      */
     965              : 
     966           64 :     return XLREAD_FAIL;
     967              : }
     968              : 
     969              : /*
     970              :  * Try to decode the next available record, and return it.  The record will
     971              :  * also be returned to XLogNextRecord(), which must be called to 'consume'
     972              :  * each record.
     973              :  *
     974              :  * If nonblocking is true, may return NULL due to lack of data or WAL decoding
     975              :  * space.
     976              :  */
     977              : DecodedXLogRecord *
     978       615752 : XLogReadAhead(XLogReaderState *state, bool nonblocking)
     979              : {
     980              :     XLogPageReadResult result;
     981              : 
     982       615752 :     if (state->errormsg_deferred)
     983            0 :         return NULL;
     984              : 
     985       615752 :     result = XLogDecodeNextRecord(state, nonblocking);
     986       615752 :     if (result == XLREAD_SUCCESS)
     987              :     {
     988              :         Assert(state->decode_queue_tail != NULL);
     989       615688 :         return state->decode_queue_tail;
     990              :     }
     991              : 
     992           64 :     return NULL;
     993              : }
     994              : 
     995              : /*
     996              :  * Read a single xlog page including at least [pageptr, reqLen] of valid data
     997              :  * via the page_read() callback.
     998              :  *
     999              :  * Returns XLREAD_FAIL if the required page cannot be read for some
    1000              :  * reason; errormsg_buf is set in that case (unless the error occurs in the
    1001              :  * page_read callback).
    1002              :  *
    1003              :  * Returns XLREAD_WOULDBLOCK if the requested data can't be read without
    1004              :  * waiting.  This can be returned only if the installed page_read callback
    1005              :  * respects the state->nonblocking flag, and cannot read the requested data
    1006              :  * immediately.
    1007              :  *
    1008              :  * We fetch the page from a reader-local cache if we know we have the required
    1009              :  * data and if there hasn't been any error since caching the data.
    1010              :  */
    1011              : static int
    1012      1249670 : ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
    1013              : {
    1014              :     int         readLen;
    1015              :     uint32      targetPageOff;
    1016              :     XLogSegNo   targetSegNo;
    1017              :     XLogPageHeader hdr;
    1018              : 
    1019              :     Assert((pageptr % XLOG_BLCKSZ) == 0);
    1020              : 
    1021      1249670 :     XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
    1022      1249670 :     targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);
    1023              : 
    1024              :     /* check whether we have all the requested data already */
    1025      1249670 :     if (targetSegNo == state->seg.ws_segno &&
    1026      1249598 :         targetPageOff == state->segoff && reqLen <= state->readLen)
    1027      1231204 :         return state->readLen;
    1028              : 
    1029              :     /*
    1030              :      * Invalidate contents of internal buffer before read attempt.  Just set
    1031              :      * the length to 0, rather than a full XLogReaderInvalReadState(), so we
    1032              :      * don't forget the segment we last successfully read.
    1033              :      */
    1034        18466 :     state->readLen = 0;
    1035              : 
    1036              :     /*
    1037              :      * Data is not in our buffer.
    1038              :      *
    1039              :      * Every time we actually read the segment, even if we looked at parts of
    1040              :      * it before, we need to do verification as the page_read callback might
    1041              :      * now be rereading data from a different source.
    1042              :      *
    1043              :      * Whenever switching to a new WAL segment, we read the first page of the
    1044              :      * file and validate its header, even if that's not where the target
    1045              :      * record is.  This is so that we can check the additional identification
    1046              :      * info that is present in the first page's "long" header.
    1047              :      */
    1048        18466 :     if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
    1049              :     {
    1050           13 :         XLogRecPtr  targetSegmentPtr = pageptr - targetPageOff;
    1051              : 
    1052           13 :         readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
    1053              :                                            state->currRecPtr,
    1054              :                                            state->readBuf);
    1055           13 :         if (readLen == XLREAD_WOULDBLOCK)
    1056            0 :             return XLREAD_WOULDBLOCK;
    1057           13 :         else if (readLen < 0)
    1058            0 :             goto err;
    1059              : 
    1060              :         /* we can be sure to have enough WAL available, we scrolled back */
    1061              :         Assert(readLen == XLOG_BLCKSZ);
    1062              : 
    1063           13 :         if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
    1064              :                                           state->readBuf))
    1065            0 :             goto err;
    1066              :     }
    1067              : 
    1068              :     /*
    1069              :      * First, read the requested data length, but at least a short page header
    1070              :      * so that we can validate it.
    1071              :      */
    1072        18466 :     readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
    1073              :                                        state->currRecPtr,
    1074              :                                        state->readBuf);
    1075        18466 :     if (readLen == XLREAD_WOULDBLOCK)
    1076            0 :         return XLREAD_WOULDBLOCK;
    1077        18466 :     else if (readLen < 0)
    1078           62 :         goto err;
    1079              : 
    1080              :     Assert(readLen <= XLOG_BLCKSZ);
    1081              : 
    1082              :     /* Do we have enough data to check the header length? */
    1083        18404 :     if (readLen <= SizeOfXLogShortPHD)
    1084            0 :         goto err;
    1085              : 
    1086              :     Assert(readLen >= reqLen);
    1087              : 
    1088        18404 :     hdr = (XLogPageHeader) state->readBuf;
    1089              : 
    1090              :     /* still not enough */
    1091        18404 :     if (readLen < XLogPageHeaderSize(hdr))
    1092              :     {
    1093            0 :         readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
    1094              :                                            state->currRecPtr,
    1095              :                                            state->readBuf);
    1096            0 :         if (readLen == XLREAD_WOULDBLOCK)
    1097            0 :             return XLREAD_WOULDBLOCK;
    1098            0 :         else if (readLen < 0)
    1099            0 :             goto err;
    1100              :     }
    1101              : 
    1102              :     /*
    1103              :      * Now that we know we have the full header, validate it.
    1104              :      */
    1105        18404 :     if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
    1106            0 :         goto err;
    1107              : 
    1108              :     /* update read state information */
    1109        18404 :     state->seg.ws_segno = targetSegNo;
    1110        18404 :     state->segoff = targetPageOff;
    1111        18404 :     state->readLen = readLen;
    1112              : 
    1113        18404 :     return readLen;
    1114              : 
    1115           62 : err:
    1116           62 :     XLogReaderInvalReadState(state);
    1117              : 
    1118           62 :     return XLREAD_FAIL;
    1119              : }
    1120              : 
    1121              : /*
    1122              :  * Invalidate the xlogreader's read state to force a re-read.
    1123              :  */
    1124              : static void
    1125          126 : XLogReaderInvalReadState(XLogReaderState *state)
    1126              : {
    1127          126 :     state->seg.ws_segno = 0;
    1128          126 :     state->segoff = 0;
    1129          126 :     state->readLen = 0;
    1130          126 : }
    1131              : 
    1132              : /*
    1133              :  * Validate an XLOG record header.
    1134              :  *
    1135              :  * This is just a convenience subroutine to avoid duplicated code in
    1136              :  * XLogReadRecord.  It's not intended for use from anywhere else.
    1137              :  */
    1138              : static bool
    1139       615690 : ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
    1140              :                       XLogRecPtr PrevRecPtr, XLogRecord *record,
    1141              :                       bool randAccess)
    1142              : {
    1143       615690 :     if (record->xl_tot_len < SizeOfXLogRecord)
    1144              :     {
    1145            2 :         report_invalid_record(state,
    1146              :                               "invalid record length at %X/%08X: expected at least %u, got %u",
    1147            2 :                               LSN_FORMAT_ARGS(RecPtr),
    1148              :                               (uint32) SizeOfXLogRecord, record->xl_tot_len);
    1149            2 :         return false;
    1150              :     }
    1151       615688 :     if (!RmgrIdIsValid(record->xl_rmid))
    1152              :     {
    1153            0 :         report_invalid_record(state,
    1154              :                               "invalid resource manager ID %u at %X/%08X",
    1155            0 :                               record->xl_rmid, LSN_FORMAT_ARGS(RecPtr));
    1156            0 :         return false;
    1157              :     }
    1158       615688 :     if (randAccess)
    1159              :     {
    1160              :         /*
    1161              :          * We can't exactly verify the prev-link, but surely it should be less
    1162              :          * than the record's own address.
    1163              :          */
    1164          130 :         if (!(record->xl_prev < RecPtr))
    1165              :         {
    1166            0 :             report_invalid_record(state,
    1167              :                                   "record with incorrect prev-link %X/%08X at %X/%08X",
    1168            0 :                                   LSN_FORMAT_ARGS(record->xl_prev),
    1169            0 :                                   LSN_FORMAT_ARGS(RecPtr));
    1170            0 :             return false;
    1171              :         }
    1172              :     }
    1173              :     else
    1174              :     {
    1175              :         /*
    1176              :          * Record's prev-link should exactly match our previous location. This
    1177              :          * check guards against torn WAL pages where a stale but valid-looking
    1178              :          * WAL record starts on a sector boundary.
    1179              :          */
    1180       615558 :         if (record->xl_prev != PrevRecPtr)
    1181              :         {
    1182            0 :             report_invalid_record(state,
    1183              :                                   "record with incorrect prev-link %X/%08X at %X/%08X",
    1184            0 :                                   LSN_FORMAT_ARGS(record->xl_prev),
    1185            0 :                                   LSN_FORMAT_ARGS(RecPtr));
    1186            0 :             return false;
    1187              :         }
    1188              :     }
    1189              : 
    1190       615688 :     return true;
    1191              : }
    1192              : 
    1193              : 
    1194              : /*
    1195              :  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
    1196              :  * record (other than to the minimal extent of computing the amount of
    1197              :  * data to read in) until we've checked the CRCs.
    1198              :  *
    1199              :  * We assume all of the record (that is, xl_tot_len bytes) has been read
    1200              :  * into memory at *record.  Also, ValidXLogRecordHeader() has accepted the
    1201              :  * record's header, which means in particular that xl_tot_len is at least
    1202              :  * SizeOfXLogRecord.
    1203              :  */
    1204              : static bool
    1205       615688 : ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
    1206              : {
    1207              :     pg_crc32c   crc;
    1208              : 
    1209              :     Assert(record->xl_tot_len >= SizeOfXLogRecord);
    1210              : 
    1211              :     /* Calculate the CRC */
    1212       615688 :     INIT_CRC32C(crc);
    1213       615688 :     COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
    1214              :     /* include the record header last */
    1215       615688 :     COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
    1216       615688 :     FIN_CRC32C(crc);
    1217              : 
    1218       615688 :     if (!EQ_CRC32C(record->xl_crc, crc))
    1219              :     {
    1220            0 :         report_invalid_record(state,
    1221              :                               "incorrect resource manager data checksum in record at %X/%08X",
    1222            0 :                               LSN_FORMAT_ARGS(recptr));
    1223            0 :         return false;
    1224              :     }
    1225              : 
    1226       615688 :     return true;
    1227              : }
    1228              : 
    1229              : /*
    1230              :  * Validate a page header.
    1231              :  *
    1232              :  * Check if 'phdr' is valid as the header of the XLog page at position
    1233              :  * 'recptr'.
    1234              :  */
    1235              : bool
    1236        18417 : XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
    1237              :                              char *phdr)
    1238              : {
    1239              :     XLogSegNo   segno;
    1240              :     int32       offset;
    1241        18417 :     XLogPageHeader hdr = (XLogPageHeader) phdr;
    1242              : 
    1243              :     Assert((recptr % XLOG_BLCKSZ) == 0);
    1244              : 
    1245        18417 :     XLByteToSeg(recptr, segno, state->segcxt.ws_segsize);
    1246        18417 :     offset = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
    1247              : 
    1248        18417 :     if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
    1249              :     {
    1250              :         char        fname[MAXFNAMELEN];
    1251              : 
    1252            0 :         XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
    1253              : 
    1254            0 :         report_invalid_record(state,
    1255              :                               "invalid magic number %04X in WAL segment %s, LSN %X/%08X, offset %u",
    1256            0 :                               hdr->xlp_magic,
    1257              :                               fname,
    1258            0 :                               LSN_FORMAT_ARGS(recptr),
    1259              :                               offset);
    1260            0 :         return false;
    1261              :     }
    1262              : 
    1263        18417 :     if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
    1264              :     {
    1265              :         char        fname[MAXFNAMELEN];
    1266              : 
    1267            0 :         XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
    1268              : 
    1269            0 :         report_invalid_record(state,
    1270              :                               "invalid info bits %04X in WAL segment %s, LSN %X/%08X, offset %u",
    1271            0 :                               hdr->xlp_info,
    1272              :                               fname,
    1273            0 :                               LSN_FORMAT_ARGS(recptr),
    1274              :                               offset);
    1275            0 :         return false;
    1276              :     }
    1277              : 
    1278        18417 :     if (hdr->xlp_info & XLP_LONG_HEADER)
    1279              :     {
    1280           67 :         XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
    1281              : 
    1282           67 :         if (state->system_identifier &&
    1283            0 :             longhdr->xlp_sysid != state->system_identifier)
    1284              :         {
    1285            0 :             report_invalid_record(state,
    1286              :                                   "WAL file is from different database system: WAL file database system identifier is %" PRIu64 ", pg_control database system identifier is %" PRIu64,
    1287              :                                   longhdr->xlp_sysid,
    1288              :                                   state->system_identifier);
    1289            0 :             return false;
    1290              :         }
    1291           67 :         else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
    1292              :         {
    1293            0 :             report_invalid_record(state,
    1294              :                                   "WAL file is from different database system: incorrect segment size in page header");
    1295            0 :             return false;
    1296              :         }
    1297           67 :         else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
    1298              :         {
    1299            0 :             report_invalid_record(state,
    1300              :                                   "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
    1301            0 :             return false;
    1302              :         }
    1303              :     }
    1304        18350 :     else if (offset == 0)
    1305              :     {
    1306              :         char        fname[MAXFNAMELEN];
    1307              : 
    1308            0 :         XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
    1309              : 
    1310              :         /* hmm, first page of file doesn't have a long header? */
    1311            0 :         report_invalid_record(state,
    1312              :                               "invalid info bits %04X in WAL segment %s, LSN %X/%08X, offset %u",
    1313            0 :                               hdr->xlp_info,
    1314              :                               fname,
    1315            0 :                               LSN_FORMAT_ARGS(recptr),
    1316              :                               offset);
    1317            0 :         return false;
    1318              :     }
    1319              : 
    1320              :     /*
    1321              :      * Check that the address on the page agrees with what we expected. This
    1322              :      * check typically fails when an old WAL segment is recycled, and hasn't
    1323              :      * yet been overwritten with new data yet.
    1324              :      */
    1325        18417 :     if (hdr->xlp_pageaddr != recptr)
    1326              :     {
    1327              :         char        fname[MAXFNAMELEN];
    1328              : 
    1329            0 :         XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
    1330              : 
    1331            0 :         report_invalid_record(state,
    1332              :                               "unexpected pageaddr %X/%08X in WAL segment %s, LSN %X/%08X, offset %u",
    1333            0 :                               LSN_FORMAT_ARGS(hdr->xlp_pageaddr),
    1334              :                               fname,
    1335            0 :                               LSN_FORMAT_ARGS(recptr),
    1336              :                               offset);
    1337            0 :         return false;
    1338              :     }
    1339              : 
    1340              :     /*
    1341              :      * Since child timelines are always assigned a TLI greater than their
    1342              :      * immediate parent's TLI, we should never see TLI go backwards across
    1343              :      * successive pages of a consistent WAL sequence.
    1344              :      *
    1345              :      * Sometimes we re-read a segment that's already been (partially) read. So
    1346              :      * we only verify TLIs for pages that are later than the last remembered
    1347              :      * LSN.
    1348              :      */
    1349        18417 :     if (recptr > state->latestPagePtr)
    1350              :     {
    1351        18417 :         if (hdr->xlp_tli < state->latestPageTLI)
    1352              :         {
    1353              :             char        fname[MAXFNAMELEN];
    1354              : 
    1355            0 :             XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);
    1356              : 
    1357            0 :             report_invalid_record(state,
    1358              :                                   "out-of-sequence timeline ID %u (after %u) in WAL segment %s, LSN %X/%08X, offset %u",
    1359              :                                   hdr->xlp_tli,
    1360              :                                   state->latestPageTLI,
    1361              :                                   fname,
    1362            0 :                                   LSN_FORMAT_ARGS(recptr),
    1363              :                                   offset);
    1364            0 :             return false;
    1365              :         }
    1366              :     }
    1367        18417 :     state->latestPagePtr = recptr;
    1368        18417 :     state->latestPageTLI = hdr->xlp_tli;
    1369              : 
    1370        18417 :     return true;
    1371              : }
    1372              : 
    1373              : /*
    1374              :  * Forget about an error produced by XLogReaderValidatePageHeader().
    1375              :  */
    1376              : void
    1377            0 : XLogReaderResetError(XLogReaderState *state)
    1378              : {
    1379            0 :     state->errormsg_buf[0] = '\0';
    1380            0 :     state->errormsg_deferred = false;
    1381            0 : }
    1382              : 
    1383              : /*
    1384              :  * Find the first record with an lsn >= RecPtr.
    1385              :  *
    1386              :  * This is different from XLogBeginRead() in that RecPtr doesn't need to point
    1387              :  * to a valid record boundary.  Useful for checking whether RecPtr is a valid
    1388              :  * xlog address for reading, and to find the first valid address after some
    1389              :  * address when dumping records for debugging purposes.
    1390              :  *
    1391              :  * This positions the reader, like XLogBeginRead(), so that the next call to
    1392              :  * XLogReadRecord() will read the next valid record.
    1393              :  */
    1394              : XLogRecPtr
    1395           65 : XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
    1396              : {
    1397              :     XLogRecPtr  tmpRecPtr;
    1398           65 :     XLogRecPtr  found = InvalidXLogRecPtr;
    1399              :     XLogPageHeader header;
    1400              :     char       *errormsg;
    1401              : 
    1402              :     Assert(XLogRecPtrIsValid(RecPtr));
    1403              : 
    1404              :     /* Make sure ReadPageInternal() can't return XLREAD_WOULDBLOCK. */
    1405           65 :     state->nonblocking = false;
    1406              : 
    1407              :     /*
    1408              :      * skip over potential continuation data, keeping in mind that it may span
    1409              :      * multiple pages
    1410              :      */
    1411           65 :     tmpRecPtr = RecPtr;
    1412              :     while (true)
    1413            0 :     {
    1414              :         XLogRecPtr  targetPagePtr;
    1415              :         int         targetRecOff;
    1416              :         uint32      pageHeaderSize;
    1417              :         int         readLen;
    1418              : 
    1419              :         /*
    1420              :          * Compute targetRecOff. It should typically be equal or greater than
    1421              :          * short page-header since a valid record can't start anywhere before
    1422              :          * that, except when caller has explicitly specified the offset that
    1423              :          * falls somewhere there or when we are skipping multi-page
    1424              :          * continuation record. It doesn't matter though because
    1425              :          * ReadPageInternal() is prepared to handle that and will read at
    1426              :          * least short page-header worth of data
    1427              :          */
    1428           65 :         targetRecOff = tmpRecPtr % XLOG_BLCKSZ;
    1429              : 
    1430              :         /* scroll back to page boundary */
    1431           65 :         targetPagePtr = tmpRecPtr - targetRecOff;
    1432              : 
    1433              :         /* Read the page containing the record */
    1434           65 :         readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
    1435           65 :         if (readLen < 0)
    1436            0 :             goto err;
    1437              : 
    1438           65 :         header = (XLogPageHeader) state->readBuf;
    1439              : 
    1440           65 :         pageHeaderSize = XLogPageHeaderSize(header);
    1441              : 
    1442              :         /* make sure we have enough data for the page header */
    1443           65 :         readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
    1444           65 :         if (readLen < 0)
    1445            0 :             goto err;
    1446              : 
    1447              :         /* skip over potential continuation data */
    1448           65 :         if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
    1449              :         {
    1450              :             /*
    1451              :              * If the length of the remaining continuation data is more than
    1452              :              * what can fit in this page, the continuation record crosses over
    1453              :              * this page. Read the next page and try again. xlp_rem_len in the
    1454              :              * next page header will contain the remaining length of the
    1455              :              * continuation data
    1456              :              *
    1457              :              * Note that record headers are MAXALIGN'ed
    1458              :              */
    1459           13 :             if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
    1460            0 :                 tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
    1461              :             else
    1462              :             {
    1463              :                 /*
    1464              :                  * The previous continuation record ends in this page. Set
    1465              :                  * tmpRecPtr to point to the first valid record
    1466              :                  */
    1467           13 :                 tmpRecPtr = targetPagePtr + pageHeaderSize
    1468           13 :                     + MAXALIGN(header->xlp_rem_len);
    1469           13 :                 break;
    1470              :             }
    1471              :         }
    1472              :         else
    1473              :         {
    1474           52 :             tmpRecPtr = targetPagePtr + pageHeaderSize;
    1475           52 :             break;
    1476              :         }
    1477              :     }
    1478              : 
    1479              :     /*
    1480              :      * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
    1481              :      * because either we're at the first record after the beginning of a page
    1482              :      * or we just jumped over the remaining data of a continuation.
    1483              :      */
    1484           65 :     XLogBeginRead(state, tmpRecPtr);
    1485          313 :     while (XLogReadRecord(state, &errormsg) != NULL)
    1486              :     {
    1487              :         /* past the record we've found, break out */
    1488          313 :         if (RecPtr <= state->ReadRecPtr)
    1489              :         {
    1490              :             /* Rewind the reader to the beginning of the last record. */
    1491           65 :             found = state->ReadRecPtr;
    1492           65 :             XLogBeginRead(state, found);
    1493           65 :             return found;
    1494              :         }
    1495              :     }
    1496              : 
    1497            0 : err:
    1498            0 :     XLogReaderInvalReadState(state);
    1499              : 
    1500            0 :     return InvalidXLogRecPtr;
    1501              : }
    1502              : 
    1503              : /*
    1504              :  * Helper function to ease writing of XLogReaderRoutine->page_read callbacks.
    1505              :  * If this function is used, caller must supply a segment_open callback in
    1506              :  * 'state', as that is used here.
    1507              :  *
    1508              :  * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
    1509              :  * fetched from timeline 'tli'.
    1510              :  *
    1511              :  * Returns true if succeeded, false if an error occurs, in which case
    1512              :  * 'errinfo' receives error details.
    1513              :  */
    1514              : bool
    1515        18417 : WALRead(XLogReaderState *state,
    1516              :         char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
    1517              :         WALReadError *errinfo)
    1518              : {
    1519              :     char       *p;
    1520              :     XLogRecPtr  recptr;
    1521              :     Size        nbytes;
    1522              : #ifndef FRONTEND
    1523              :     instr_time  io_start;
    1524              : #endif
    1525              : 
    1526        18417 :     p = buf;
    1527        18417 :     recptr = startptr;
    1528        18417 :     nbytes = count;
    1529              : 
    1530        36834 :     while (nbytes > 0)
    1531              :     {
    1532              :         uint32      startoff;
    1533              :         int         segbytes;
    1534              :         int         readbytes;
    1535              : 
    1536        18417 :         startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);
    1537              : 
    1538              :         /*
    1539              :          * If the data we want is not in a segment we have open, close what we
    1540              :          * have (if anything) and open the next one, using the caller's
    1541              :          * provided segment_open callback.
    1542              :          */
    1543        18417 :         if (state->seg.ws_file < 0 ||
    1544        18352 :             !XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
    1545        18350 :             tli != state->seg.ws_tli)
    1546              :         {
    1547              :             XLogSegNo   nextSegNo;
    1548              : 
    1549           67 :             if (state->seg.ws_file >= 0)
    1550            2 :                 state->routine.segment_close(state);
    1551              : 
    1552           67 :             XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
    1553           67 :             state->routine.segment_open(state, nextSegNo, &tli);
    1554              : 
    1555              :             /* This shouldn't happen -- indicates a bug in segment_open */
    1556              :             Assert(state->seg.ws_file >= 0);
    1557              : 
    1558              :             /* Update the current segment info. */
    1559           67 :             state->seg.ws_tli = tli;
    1560           67 :             state->seg.ws_segno = nextSegNo;
    1561              :         }
    1562              : 
    1563              :         /* How many bytes are within this segment? */
    1564        18417 :         if (nbytes > (state->segcxt.ws_segsize - startoff))
    1565            0 :             segbytes = state->segcxt.ws_segsize - startoff;
    1566              :         else
    1567        18417 :             segbytes = nbytes;
    1568              : 
    1569              : #ifndef FRONTEND
    1570              :         /* Measure I/O timing when reading segment */
    1571              :         io_start = pgstat_prepare_io_time(track_wal_io_timing);
    1572              : 
    1573              :         pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
    1574              : #endif
    1575              : 
    1576              :         /* Reset errno first; eases reporting non-errno-affecting errors */
    1577        18417 :         errno = 0;
    1578        18417 :         readbytes = pg_pread(state->seg.ws_file, p, segbytes, (pgoff_t) startoff);
    1579              : 
    1580              : #ifndef FRONTEND
    1581              :         pgstat_report_wait_end();
    1582              : 
    1583              :         pgstat_count_io_op_time(IOOBJECT_WAL, IOCONTEXT_NORMAL, IOOP_READ,
    1584              :                                 io_start, 1, readbytes);
    1585              : #endif
    1586              : 
    1587        18417 :         if (readbytes <= 0)
    1588              :         {
    1589            0 :             errinfo->wre_errno = errno;
    1590            0 :             errinfo->wre_req = segbytes;
    1591            0 :             errinfo->wre_read = readbytes;
    1592            0 :             errinfo->wre_off = startoff;
    1593            0 :             errinfo->wre_seg = state->seg;
    1594            0 :             return false;
    1595              :         }
    1596              : 
    1597              :         /* Update state for read */
    1598        18417 :         recptr += readbytes;
    1599        18417 :         nbytes -= readbytes;
    1600        18417 :         p += readbytes;
    1601              :     }
    1602              : 
    1603        18417 :     return true;
    1604              : }
    1605              : 
    1606              : /* ----------------------------------------
    1607              :  * Functions for decoding the data and block references in a record.
    1608              :  * ----------------------------------------
    1609              :  */
    1610              : 
    1611              : /*
    1612              :  * Private function to reset the state, forgetting all decoded records, if we
    1613              :  * are asked to move to a new read position.
    1614              :  */
    1615              : static void
    1616          130 : ResetDecoder(XLogReaderState *state)
    1617              : {
    1618              :     DecodedXLogRecord *r;
    1619              : 
    1620              :     /* Reset the decoded record queue, freeing any oversized records. */
    1621          325 :     while ((r = state->decode_queue_head) != NULL)
    1622              :     {
    1623           65 :         state->decode_queue_head = r->next;
    1624           65 :         if (r->oversized)
    1625            0 :             pfree(r);
    1626              :     }
    1627          130 :     state->decode_queue_tail = NULL;
    1628          130 :     state->decode_queue_head = NULL;
    1629          130 :     state->record = NULL;
    1630              : 
    1631              :     /* Reset the decode buffer to empty. */
    1632          130 :     state->decode_buffer_tail = state->decode_buffer;
    1633          130 :     state->decode_buffer_head = state->decode_buffer;
    1634              : 
    1635              :     /* Clear error state. */
    1636          130 :     state->errormsg_buf[0] = '\0';
    1637          130 :     state->errormsg_deferred = false;
    1638          130 : }
    1639              : 
    1640              : /*
    1641              :  * Compute the maximum possible amount of padding that could be required to
    1642              :  * decode a record, given xl_tot_len from the record's header.  This is the
    1643              :  * amount of output buffer space that we need to decode a record, though we
    1644              :  * might not finish up using it all.
    1645              :  *
    1646              :  * This computation is pessimistic and assumes the maximum possible number of
    1647              :  * blocks, due to lack of better information.
    1648              :  */
    1649              : size_t
    1650       615700 : DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
    1651              : {
    1652       615700 :     size_t      size = 0;
    1653              : 
    1654              :     /* Account for the fixed size part of the decoded record struct. */
    1655       615700 :     size += offsetof(DecodedXLogRecord, blocks[0]);
    1656              :     /* Account for the flexible blocks array of maximum possible size. */
    1657       615700 :     size += sizeof(DecodedBkpBlock) * (XLR_MAX_BLOCK_ID + 1);
    1658              :     /* Account for all the raw main and block data. */
    1659       615700 :     size += xl_tot_len;
    1660              :     /* We might insert padding before main_data. */
    1661       615700 :     size += (MAXIMUM_ALIGNOF - 1);
    1662              :     /* We might insert padding before each block's data. */
    1663       615700 :     size += (MAXIMUM_ALIGNOF - 1) * (XLR_MAX_BLOCK_ID + 1);
    1664              :     /* We might insert padding at the end. */
    1665       615700 :     size += (MAXIMUM_ALIGNOF - 1);
    1666              : 
    1667       615700 :     return size;
    1668              : }
    1669              : 
    1670              : /*
    1671              :  * Decode a record.  "decoded" must point to a MAXALIGNed memory area that has
    1672              :  * space for at least DecodeXLogRecordRequiredSpace(record) bytes.  On
    1673              :  * success, decoded->size contains the actual space occupied by the decoded
    1674              :  * record, which may turn out to be less.
    1675              :  *
    1676              :  * Only decoded->oversized member must be initialized already, and will not be
    1677              :  * modified.  Other members will be initialized as required.
    1678              :  *
    1679              :  * On error, a human-readable error message is returned in *errormsg, and
    1680              :  * the return value is false.
    1681              :  */
    1682              : bool
    1683       615688 : DecodeXLogRecord(XLogReaderState *state,
    1684              :                  DecodedXLogRecord *decoded,
    1685              :                  XLogRecord *record,
    1686              :                  XLogRecPtr lsn,
    1687              :                  char **errormsg)
    1688              : {
    1689              :     /*
    1690              :      * read next _size bytes from record buffer, but check for overrun first.
    1691              :      */
    1692              : #define COPY_HEADER_FIELD(_dst, _size)          \
    1693              :     do {                                        \
    1694              :         if (remaining < _size)                   \
    1695              :             goto shortdata_err;                 \
    1696              :         memcpy(_dst, ptr, _size);               \
    1697              :         ptr += _size;                           \
    1698              :         remaining -= _size;                     \
    1699              :     } while(0)
    1700              : 
    1701              :     char       *ptr;
    1702              :     char       *out;
    1703              :     uint32      remaining;
    1704              :     uint32      datatotal;
    1705       615688 :     RelFileLocator *rlocator = NULL;
    1706              :     uint8       block_id;
    1707              : 
    1708       615688 :     decoded->header = *record;
    1709       615688 :     decoded->lsn = lsn;
    1710       615688 :     decoded->next = NULL;
    1711       615688 :     decoded->record_origin = InvalidReplOriginId;
    1712       615688 :     decoded->toplevel_xid = InvalidTransactionId;
    1713       615688 :     decoded->main_data = NULL;
    1714       615688 :     decoded->main_data_len = 0;
    1715       615688 :     decoded->max_block_id = -1;
    1716       615688 :     ptr = (char *) record;
    1717       615688 :     ptr += SizeOfXLogRecord;
    1718       615688 :     remaining = record->xl_tot_len - SizeOfXLogRecord;
    1719              : 
    1720              :     /* Decode the headers */
    1721       615688 :     datatotal = 0;
    1722      1300432 :     while (remaining > datatotal)
    1723              :     {
    1724      1284110 :         COPY_HEADER_FIELD(&block_id, sizeof(uint8));
    1725              : 
    1726      1284110 :         if (block_id == XLR_BLOCK_ID_DATA_SHORT)
    1727              :         {
    1728              :             /* XLogRecordDataHeaderShort */
    1729              :             uint8       main_data_len;
    1730              : 
    1731       598351 :             COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
    1732              : 
    1733       598351 :             decoded->main_data_len = main_data_len;
    1734       598351 :             datatotal += main_data_len;
    1735       598351 :             break;              /* by convention, the main data fragment is
    1736              :                                  * always last */
    1737              :         }
    1738       685759 :         else if (block_id == XLR_BLOCK_ID_DATA_LONG)
    1739              :         {
    1740              :             /* XLogRecordDataHeaderLong */
    1741              :             uint32      main_data_len;
    1742              : 
    1743         1015 :             COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
    1744         1015 :             decoded->main_data_len = main_data_len;
    1745         1015 :             datatotal += main_data_len;
    1746         1015 :             break;              /* by convention, the main data fragment is
    1747              :                                  * always last */
    1748              :         }
    1749       684744 :         else if (block_id == XLR_BLOCK_ID_ORIGIN)
    1750              :         {
    1751            0 :             COPY_HEADER_FIELD(&decoded->record_origin, sizeof(ReplOriginId));
    1752              :         }
    1753       684744 :         else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
    1754              :         {
    1755            0 :             COPY_HEADER_FIELD(&decoded->toplevel_xid, sizeof(TransactionId));
    1756              :         }
    1757       684744 :         else if (block_id <= XLR_MAX_BLOCK_ID)
    1758              :         {
    1759              :             /* XLogRecordBlockHeader */
    1760              :             DecodedBkpBlock *blk;
    1761              :             uint8       fork_flags;
    1762              : 
    1763              :             /* mark any intervening block IDs as not in use */
    1764       684913 :             for (int i = decoded->max_block_id + 1; i < block_id; ++i)
    1765          169 :                 decoded->blocks[i].in_use = false;
    1766              : 
    1767       684744 :             if (block_id <= decoded->max_block_id)
    1768              :             {
    1769            0 :                 report_invalid_record(state,
    1770              :                                       "out-of-order block_id %u at %X/%08X",
    1771              :                                       block_id,
    1772            0 :                                       LSN_FORMAT_ARGS(state->ReadRecPtr));
    1773            0 :                 goto err;
    1774              :             }
    1775       684744 :             decoded->max_block_id = block_id;
    1776              : 
    1777       684744 :             blk = &decoded->blocks[block_id];
    1778       684744 :             blk->in_use = true;
    1779       684744 :             blk->apply_image = false;
    1780              : 
    1781       684744 :             COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
    1782       684744 :             blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
    1783       684744 :             blk->flags = fork_flags;
    1784       684744 :             blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
    1785       684744 :             blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
    1786              : 
    1787       684744 :             blk->prefetch_buffer = InvalidBuffer;
    1788              : 
    1789       684744 :             COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
    1790              :             /* cross-check that the HAS_DATA flag is set iff data_length > 0 */
    1791       684744 :             if (blk->has_data && blk->data_len == 0)
    1792              :             {
    1793            0 :                 report_invalid_record(state,
    1794              :                                       "BKPBLOCK_HAS_DATA set, but no data included at %X/%08X",
    1795            0 :                                       LSN_FORMAT_ARGS(state->ReadRecPtr));
    1796            0 :                 goto err;
    1797              :             }
    1798       684744 :             if (!blk->has_data && blk->data_len != 0)
    1799              :             {
    1800            0 :                 report_invalid_record(state,
    1801              :                                       "BKPBLOCK_HAS_DATA not set, but data length is %d at %X/%08X",
    1802            0 :                                       blk->data_len,
    1803            0 :                                       LSN_FORMAT_ARGS(state->ReadRecPtr));
    1804            0 :                 goto err;
    1805              :             }
    1806       684744 :             datatotal += blk->data_len;
    1807              : 
    1808       684744 :             if (blk->has_image)
    1809              :             {
    1810        19427 :                 COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
    1811        19427 :                 COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
    1812        19427 :                 COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
    1813              : 
    1814        19427 :                 blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
    1815              : 
    1816        19427 :                 if (BKPIMAGE_COMPRESSED(blk->bimg_info))
    1817              :                 {
    1818            0 :                     if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
    1819            0 :                         COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
    1820              :                     else
    1821            0 :                         blk->hole_length = 0;
    1822              :                 }
    1823              :                 else
    1824        19427 :                     blk->hole_length = BLCKSZ - blk->bimg_len;
    1825        19427 :                 datatotal += blk->bimg_len;
    1826              : 
    1827              :                 /*
    1828              :                  * cross-check that hole_offset > 0, hole_length > 0 and
    1829              :                  * bimg_len < BLCKSZ if the HAS_HOLE flag is set.
    1830              :                  */
    1831        19427 :                 if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
    1832        17614 :                     (blk->hole_offset == 0 ||
    1833        17614 :                      blk->hole_length == 0 ||
    1834        17614 :                      blk->bimg_len == BLCKSZ))
    1835              :                 {
    1836            0 :                     report_invalid_record(state,
    1837              :                                           "BKPIMAGE_HAS_HOLE set, but hole offset %d length %d block image length %d at %X/%08X",
    1838            0 :                                           blk->hole_offset,
    1839            0 :                                           blk->hole_length,
    1840            0 :                                           blk->bimg_len,
    1841            0 :                                           LSN_FORMAT_ARGS(state->ReadRecPtr));
    1842            0 :                     goto err;
    1843              :                 }
    1844              : 
    1845              :                 /*
    1846              :                  * cross-check that hole_offset == 0 and hole_length == 0 if
    1847              :                  * the HAS_HOLE flag is not set.
    1848              :                  */
    1849        19427 :                 if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
    1850         1813 :                     (blk->hole_offset != 0 || blk->hole_length != 0))
    1851              :                 {
    1852            0 :                     report_invalid_record(state,
    1853              :                                           "BKPIMAGE_HAS_HOLE not set, but hole offset %d length %d at %X/%08X",
    1854            0 :                                           blk->hole_offset,
    1855            0 :                                           blk->hole_length,
    1856            0 :                                           LSN_FORMAT_ARGS(state->ReadRecPtr));
    1857            0 :                     goto err;
    1858              :                 }
    1859              : 
    1860              :                 /*
    1861              :                  * Cross-check that bimg_len < BLCKSZ if it is compressed.
    1862              :                  */
    1863        19427 :                 if (BKPIMAGE_COMPRESSED(blk->bimg_info) &&
    1864            0 :                     blk->bimg_len == BLCKSZ)
    1865              :                 {
    1866            0 :                     report_invalid_record(state,
    1867              :                                           "BKPIMAGE_COMPRESSED set, but block image length %d at %X/%08X",
    1868            0 :                                           blk->bimg_len,
    1869            0 :                                           LSN_FORMAT_ARGS(state->ReadRecPtr));
    1870            0 :                     goto err;
    1871              :                 }
    1872              : 
    1873              :                 /*
    1874              :                  * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE is
    1875              :                  * set nor COMPRESSED().
    1876              :                  */
    1877        19427 :                 if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
    1878         1813 :                     !BKPIMAGE_COMPRESSED(blk->bimg_info) &&
    1879         1813 :                     blk->bimg_len != BLCKSZ)
    1880              :                 {
    1881            0 :                     report_invalid_record(state,
    1882              :                                           "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %d at %X/%08X",
    1883            0 :                                           blk->data_len,
    1884            0 :                                           LSN_FORMAT_ARGS(state->ReadRecPtr));
    1885            0 :                     goto err;
    1886              :                 }
    1887              :             }
    1888       684744 :             if (!(fork_flags & BKPBLOCK_SAME_REL))
    1889              :             {
    1890       596377 :                 COPY_HEADER_FIELD(&blk->rlocator, sizeof(RelFileLocator));
    1891       596377 :                 rlocator = &blk->rlocator;
    1892              :             }
    1893              :             else
    1894              :             {
    1895        88367 :                 if (rlocator == NULL)
    1896              :                 {
    1897            0 :                     report_invalid_record(state,
    1898              :                                           "BKPBLOCK_SAME_REL set but no previous rel at %X/%08X",
    1899            0 :                                           LSN_FORMAT_ARGS(state->ReadRecPtr));
    1900            0 :                     goto err;
    1901              :                 }
    1902              : 
    1903        88367 :                 blk->rlocator = *rlocator;
    1904              :             }
    1905       684744 :             COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
    1906              :         }
    1907              :         else
    1908              :         {
    1909            0 :             report_invalid_record(state,
    1910              :                                   "invalid block_id %u at %X/%08X",
    1911            0 :                                   block_id, LSN_FORMAT_ARGS(state->ReadRecPtr));
    1912            0 :             goto err;
    1913              :         }
    1914              :     }
    1915              : 
    1916       615688 :     if (remaining != datatotal)
    1917            0 :         goto shortdata_err;
    1918              : 
    1919              :     /*
    1920              :      * Ok, we've parsed the fragment headers, and verified that the total
    1921              :      * length of the payload in the fragments is equal to the amount of data
    1922              :      * left.  Copy the data of each fragment to contiguous space after the
    1923              :      * blocks array, inserting alignment padding before the data fragments so
    1924              :      * they can be cast to struct pointers by REDO routines.
    1925              :      */
    1926       615688 :     out = ((char *) decoded) +
    1927       615688 :         offsetof(DecodedXLogRecord, blocks) +
    1928       615688 :         sizeof(decoded->blocks[0]) * (decoded->max_block_id + 1);
    1929              : 
    1930              :     /* block data first */
    1931      1300601 :     for (block_id = 0; block_id <= decoded->max_block_id; block_id++)
    1932              :     {
    1933       684913 :         DecodedBkpBlock *blk = &decoded->blocks[block_id];
    1934              : 
    1935       684913 :         if (!blk->in_use)
    1936          169 :             continue;
    1937              : 
    1938              :         Assert(blk->has_image || !blk->apply_image);
    1939              : 
    1940       684744 :         if (blk->has_image)
    1941              :         {
    1942              :             /* no need to align image */
    1943        19427 :             blk->bkp_image = out;
    1944        19427 :             memcpy(out, ptr, blk->bimg_len);
    1945        19427 :             ptr += blk->bimg_len;
    1946        19427 :             out += blk->bimg_len;
    1947              :         }
    1948       684744 :         if (blk->has_data)
    1949              :         {
    1950       499637 :             out = (char *) MAXALIGN(out);
    1951       499637 :             blk->data = out;
    1952       499637 :             memcpy(blk->data, ptr, blk->data_len);
    1953       499637 :             ptr += blk->data_len;
    1954       499637 :             out += blk->data_len;
    1955              :         }
    1956              :     }
    1957              : 
    1958              :     /* and finally, the main data */
    1959       615688 :     if (decoded->main_data_len > 0)
    1960              :     {
    1961       599366 :         out = (char *) MAXALIGN(out);
    1962       599366 :         decoded->main_data = out;
    1963       599366 :         memcpy(decoded->main_data, ptr, decoded->main_data_len);
    1964       599366 :         ptr += decoded->main_data_len;
    1965       599366 :         out += decoded->main_data_len;
    1966              :     }
    1967              : 
    1968              :     /* Report the actual size we used. */
    1969       615688 :     decoded->size = MAXALIGN(out - (char *) decoded);
    1970              :     Assert(DecodeXLogRecordRequiredSpace(record->xl_tot_len) >=
    1971              :            decoded->size);
    1972              : 
    1973       615688 :     return true;
    1974              : 
    1975            0 : shortdata_err:
    1976            0 :     report_invalid_record(state,
    1977              :                           "record with invalid length at %X/%08X",
    1978            0 :                           LSN_FORMAT_ARGS(state->ReadRecPtr));
    1979            0 : err:
    1980            0 :     *errormsg = state->errormsg_buf;
    1981              : 
    1982            0 :     return false;
    1983              : }
    1984              : 
    1985              : /*
    1986              :  * Returns information about the block that a block reference refers to.
    1987              :  *
    1988              :  * This is like XLogRecGetBlockTagExtended, except that the block reference
    1989              :  * must exist and there's no access to prefetch_buffer.
    1990              :  */
    1991              : void
    1992            0 : XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
    1993              :                    RelFileLocator *rlocator, ForkNumber *forknum,
    1994              :                    BlockNumber *blknum)
    1995              : {
    1996            0 :     if (!XLogRecGetBlockTagExtended(record, block_id, rlocator, forknum,
    1997              :                                     blknum, NULL))
    1998              :     {
    1999              : #ifndef FRONTEND
    2000              :         elog(ERROR, "could not locate backup block with ID %d in WAL record",
    2001              :              block_id);
    2002              : #else
    2003            0 :         pg_fatal("could not locate backup block with ID %d in WAL record",
    2004              :                  block_id);
    2005              : #endif
    2006              :     }
    2007            0 : }
    2008              : 
    2009              : /*
    2010              :  * Returns information about the block that a block reference refers to,
    2011              :  * optionally including the buffer that the block may already be in.
    2012              :  *
    2013              :  * If the WAL record contains a block reference with the given ID, *rlocator,
    2014              :  * *forknum, *blknum and *prefetch_buffer are filled in (if not NULL), and
    2015              :  * returns true.  Otherwise returns false.
    2016              :  */
    2017              : bool
    2018       427717 : XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id,
    2019              :                            RelFileLocator *rlocator, ForkNumber *forknum,
    2020              :                            BlockNumber *blknum,
    2021              :                            Buffer *prefetch_buffer)
    2022              : {
    2023              :     DecodedBkpBlock *bkpb;
    2024              : 
    2025       427717 :     if (!XLogRecHasBlockRef(record, block_id))
    2026          117 :         return false;
    2027              : 
    2028       427600 :     bkpb = &record->record->blocks[block_id];
    2029       427600 :     if (rlocator)
    2030       427600 :         *rlocator = bkpb->rlocator;
    2031       427600 :     if (forknum)
    2032       427600 :         *forknum = bkpb->forknum;
    2033       427600 :     if (blknum)
    2034       427600 :         *blknum = bkpb->blkno;
    2035       427600 :     if (prefetch_buffer)
    2036            0 :         *prefetch_buffer = bkpb->prefetch_buffer;
    2037       427600 :     return true;
    2038              : }
    2039              : 
    2040              : /*
    2041              :  * Returns the data associated with a block reference, or NULL if there is
    2042              :  * no data (e.g. because a full-page image was taken instead). The returned
    2043              :  * pointer points to a MAXALIGNed buffer.
    2044              :  */
    2045              : char *
    2046         1384 : XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
    2047              : {
    2048              :     DecodedBkpBlock *bkpb;
    2049              : 
    2050         1384 :     if (block_id > record->record->max_block_id ||
    2051         1384 :         !record->record->blocks[block_id].in_use)
    2052            0 :         return NULL;
    2053              : 
    2054         1384 :     bkpb = &record->record->blocks[block_id];
    2055              : 
    2056         1384 :     if (!bkpb->has_data)
    2057              :     {
    2058            0 :         if (len)
    2059            0 :             *len = 0;
    2060            0 :         return NULL;
    2061              :     }
    2062              :     else
    2063              :     {
    2064         1384 :         if (len)
    2065         1224 :             *len = bkpb->data_len;
    2066         1384 :         return bkpb->data;
    2067              :     }
    2068              : }
    2069              : 
    2070              : /*
    2071              :  * Restore a full-page image from a backup block attached to an XLOG record.
    2072              :  *
    2073              :  * Returns true if a full-page image is restored, and false on failure with
    2074              :  * an error to be consumed by the caller.
    2075              :  */
    2076              : bool
    2077            1 : RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
    2078              : {
    2079              :     DecodedBkpBlock *bkpb;
    2080              :     char       *ptr;
    2081              :     PGAlignedBlock tmp;
    2082              : 
    2083            1 :     if (block_id > record->record->max_block_id ||
    2084            1 :         !record->record->blocks[block_id].in_use)
    2085              :     {
    2086            0 :         report_invalid_record(record,
    2087              :                               "could not restore image at %X/%08X with invalid block %d specified",
    2088            0 :                               LSN_FORMAT_ARGS(record->ReadRecPtr),
    2089              :                               block_id);
    2090            0 :         return false;
    2091              :     }
    2092            1 :     if (!record->record->blocks[block_id].has_image)
    2093              :     {
    2094            0 :         report_invalid_record(record, "could not restore image at %X/%08X with invalid state, block %d",
    2095            0 :                               LSN_FORMAT_ARGS(record->ReadRecPtr),
    2096              :                               block_id);
    2097            0 :         return false;
    2098              :     }
    2099              : 
    2100            1 :     bkpb = &record->record->blocks[block_id];
    2101            1 :     ptr = bkpb->bkp_image;
    2102              : 
    2103            1 :     if (BKPIMAGE_COMPRESSED(bkpb->bimg_info))
    2104              :     {
    2105              :         /* If a backup block image is compressed, decompress it */
    2106            0 :         bool        decomp_success = true;
    2107              : 
    2108            0 :         if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
    2109              :         {
    2110            0 :             if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
    2111            0 :                                 BLCKSZ - bkpb->hole_length, true) < 0)
    2112            0 :                 decomp_success = false;
    2113              :         }
    2114            0 :         else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
    2115              :         {
    2116              : #ifdef USE_LZ4
    2117            0 :             if (LZ4_decompress_safe(ptr, tmp.data,
    2118            0 :                                     bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0)
    2119            0 :                 decomp_success = false;
    2120              : #else
    2121              :             report_invalid_record(record, "could not restore image at %X/%08X compressed with %s not supported by build, block %d",
    2122              :                                   LSN_FORMAT_ARGS(record->ReadRecPtr),
    2123              :                                   "LZ4",
    2124              :                                   block_id);
    2125              :             return false;
    2126              : #endif
    2127              :         }
    2128            0 :         else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
    2129              :         {
    2130              : #ifdef USE_ZSTD
    2131              :             size_t      decomp_result = ZSTD_decompress(tmp.data,
    2132              :                                                         BLCKSZ - bkpb->hole_length,
    2133              :                                                         ptr, bkpb->bimg_len);
    2134              : 
    2135              :             if (ZSTD_isError(decomp_result))
    2136              :                 decomp_success = false;
    2137              : #else
    2138            0 :             report_invalid_record(record, "could not restore image at %X/%08X compressed with %s not supported by build, block %d",
    2139            0 :                                   LSN_FORMAT_ARGS(record->ReadRecPtr),
    2140              :                                   "zstd",
    2141              :                                   block_id);
    2142            0 :             return false;
    2143              : #endif
    2144              :         }
    2145              :         else
    2146              :         {
    2147            0 :             report_invalid_record(record, "could not restore image at %X/%08X compressed with unknown method, block %d",
    2148            0 :                                   LSN_FORMAT_ARGS(record->ReadRecPtr),
    2149              :                                   block_id);
    2150            0 :             return false;
    2151              :         }
    2152              : 
    2153            0 :         if (!decomp_success)
    2154              :         {
    2155            0 :             report_invalid_record(record, "could not decompress image at %X/%08X, block %d",
    2156            0 :                                   LSN_FORMAT_ARGS(record->ReadRecPtr),
    2157              :                                   block_id);
    2158            0 :             return false;
    2159              :         }
    2160              : 
    2161            0 :         ptr = tmp.data;
    2162              :     }
    2163              : 
    2164              :     /* generate page, taking into account hole if necessary */
    2165            1 :     if (bkpb->hole_length == 0)
    2166              :     {
    2167            0 :         memcpy(page, ptr, BLCKSZ);
    2168              :     }
    2169              :     else
    2170              :     {
    2171            1 :         memcpy(page, ptr, bkpb->hole_offset);
    2172              :         /* must zero-fill the hole */
    2173            1 :         MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
    2174            1 :         memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
    2175            1 :                ptr + bkpb->hole_offset,
    2176            1 :                BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
    2177              :     }
    2178              : 
    2179            1 :     return true;
    2180              : }
    2181              : 
    2182              : #ifndef FRONTEND
    2183              : 
    2184              : /*
    2185              :  * Extract the FullTransactionId from a WAL record.
    2186              :  */
    2187              : FullTransactionId
    2188              : XLogRecGetFullXid(XLogReaderState *record)
    2189              : {
    2190              :     /*
    2191              :      * This function is only safe during replay, because it depends on the
    2192              :      * replay state.  See AdvanceNextFullTransactionIdPastXid() for more.
    2193              :      */
    2194              :     Assert(AmStartupProcess() || !IsUnderPostmaster);
    2195              : 
    2196              :     return FullTransactionIdFromAllowableAt(TransamVariables->nextXid,
    2197              :                                             XLogRecGetXid(record));
    2198              : }
    2199              : 
    2200              : #endif
        

Generated by: LCOV version 2.0-1