LCOV - code coverage report
Current view: top level - src/backend/replication/logical - decode.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 320 349 91.7 %
Date: 2019-11-15 23:07:02 Functions: 17 17 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -------------------------------------------------------------------------
       2             :  *
       3             :  * decode.c
       4             :  *      This module decodes WAL records read using xlogreader.h's APIs for the
       5             :  *      purpose of logical decoding by passing information to the
       6             :  *      reorderbuffer module (containing the actual changes) and to the
       7             :  *      snapbuild module to build a fitting catalog snapshot (to be able to
       8             :  *      properly decode the changes in the reorderbuffer).
       9             :  *
      10             :  * NOTE:
      11             :  *      This basically tries to handle all low level xlog stuff for
      12             :  *      reorderbuffer.c and snapbuild.c. There's some minor leakage where a
      13             :  *      specific record's struct is used to pass data along, but those just
      14             :  *      happen to contain the right amount of data in a convenient
      15             :  *      format. There isn't and shouldn't be much intelligence about the
      16             :  *      contents of records in here except turning them into a more usable
      17             :  *      format.
      18             :  *
      19             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
      20             :  * Portions Copyright (c) 1994, Regents of the University of California
      21             :  *
      22             :  * IDENTIFICATION
      23             :  *    src/backend/replication/logical/decode.c
      24             :  *
      25             :  * -------------------------------------------------------------------------
      26             :  */
      27             : #include "postgres.h"
      28             : 
      29             : #include "access/heapam.h"
      30             : #include "access/heapam_xlog.h"
      31             : #include "access/transam.h"
      32             : #include "access/xact.h"
      33             : #include "access/xlog_internal.h"
      34             : #include "access/xlogreader.h"
      35             : #include "access/xlogrecord.h"
      36             : #include "access/xlogutils.h"
      37             : #include "catalog/pg_control.h"
      38             : #include "replication/decode.h"
      39             : #include "replication/logical.h"
      40             : #include "replication/message.h"
      41             : #include "replication/origin.h"
      42             : #include "replication/reorderbuffer.h"
      43             : #include "replication/snapbuild.h"
      44             : #include "storage/standby.h"
      45             : 
      46             : typedef struct XLogRecordBuffer /* one WAL record plus its position, as handed to the Decode* handlers */
      47             : {
      48             :     XLogRecPtr  origptr;        /* set from ctx->reader->ReadRecPtr by LogicalDecodingProcessRecord() */
      49             :     XLogRecPtr  endptr;         /* set from ctx->reader->EndRecPtr by LogicalDecodingProcessRecord() */
      50             :     XLogReaderState *record;    /* reader state passed to LogicalDecodingProcessRecord() */
      51             : } XLogRecordBuffer;
      52             : 
      53             : /* Per-rmgr handlers, dispatched from LogicalDecodingProcessRecord() */
      54             : static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      55             : static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      56             : static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      57             : static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      58             : static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      59             : static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      60             : 
      61             : /* handlers for individual record types (or groups of record types) */
      62             : static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      63             : static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      64             : static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      65             : static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      66             : static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      67             : static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      68             : 
      69             : static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      70             :                          xl_xact_parsed_commit *parsed, TransactionId xid);
      71             : static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      72             :                         xl_xact_parsed_abort *parsed, TransactionId xid);
      73             : 
      74             : /* common function to decode tuples */
      75             : static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
      76             : 
      77             : /*
      78             :  * Take every XLogReadRecord()ed record and perform the actions required to
      79             :  * decode it using the output plugin already set up in the logical decoding
      80             :  * context.
      81             :  *
      82             :  * NB: Every record's xid needs to be processed by reorderbuffer
      83             :  * (xids contained in the content of records are not relevant for this rule).
      84             :  * That means that for records which would otherwise not go through the
      85             :  * reorderbuffer, ReorderBufferProcessXid() has to be called explicitly. We
      86             :  * don't want to call ReorderBufferProcessXid for each record type by default,
      87             :  * because e.g. empty xacts can be handled more efficiently if there's no
      88             :  * previous state for them.
      89             :  *
      90             :  * We also support the ability to fast forward through records, skipping some
      91             :  * record types completely - see individual record types for details.
      92             :  */
      93             : void
      94     3190342 : LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
      95             : {
      96             :     XLogRecordBuffer buf;
      97             : 
      98     3190342 :     buf.origptr = ctx->reader->ReadRecPtr;
      99     3190342 :     buf.endptr = ctx->reader->EndRecPtr;
     100     3190342 :     buf.record = record;
     101             : 
     102             :     /* cast to the enum type so we get a compiler warning when new rmgrs are added */
     103     3190342 :     switch ((RmgrIds) XLogRecGetRmid(record))
     104             :     {
     105             :             /*
     106             :              * Rmgrs we care about for logical decoding. Add new rmgrs in
     107             :              * rmgrlist.h's order.
     108             :              */
     109             :         case RM_XLOG_ID:
     110        2270 :             DecodeXLogOp(ctx, &buf);
     111        2270 :             break;
     112             : 
     113             :         case RM_XACT_ID:
     114        2788 :             DecodeXactOp(ctx, &buf);
     115        2788 :             break;
     116             : 
     117             :         case RM_STANDBY_ID:
     118        2886 :             DecodeStandbyOp(ctx, &buf);
     119        2886 :             break;
     120             : 
     121             :         case RM_HEAP2_ID:
     122       28140 :             DecodeHeap2Op(ctx, &buf);
     123       28140 :             break;
     124             : 
     125             :         case RM_HEAP_ID:
     126     2602734 :             DecodeHeapOp(ctx, &buf);
     127     2602734 :             break;
     128             : 
     129             :         case RM_LOGICALMSG_ID:
     130          64 :             DecodeLogicalMsgOp(ctx, &buf);
     131          64 :             break;
     132             : 
     133             :             /*
     134             :              * Rmgrs irrelevant for logical decoding; they describe stuff not
     135             :              * represented in logical decoding. Add new rmgrs in rmgrlist.h's
     136             :              * order.
     137             :              */
     138             :         case RM_SMGR_ID:
     139             :         case RM_CLOG_ID:
     140             :         case RM_DBASE_ID:
     141             :         case RM_TBLSPC_ID:
     142             :         case RM_MULTIXACT_ID:
     143             :         case RM_RELMAP_ID:
     144             :         case RM_BTREE_ID:
     145             :         case RM_HASH_ID:
     146             :         case RM_GIN_ID:
     147             :         case RM_GIST_ID:
     148             :         case RM_SEQ_ID:
     149             :         case RM_SPGIST_ID:
     150             :         case RM_BRIN_ID:
     151             :         case RM_COMMIT_TS_ID:
     152             :         case RM_REPLORIGIN_ID:
     153             :         case RM_GENERIC_ID:
     154             :             /* nothing to decode: just register the record's xid, and done */
     155      551460 :             ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
     156             :                                     buf.origptr);
     157      551460 :             break;
     158             :         case RM_NEXT_ID:        /* never expected as a record's rmgr id; raise an error */
     159           0 :             elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
     160             :     }
     161     3190342 : }
     162             : 
     163             : /*
     164             :  * Handle rmgr XLOG_ID records for LogicalDecodingProcessRecord().
     165             :  */
     166             : static void
     167        2270 : DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     168             : {
     169        2270 :     SnapBuild  *builder = ctx->snapshot_builder;
     170        2270 :     uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
     171             : 
     172        2270 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
     173             :                             buf->origptr);
     174             : 
     175        2270 :     switch (info)
     176             :     {
     177             :             /* this is also used in END_OF_RECOVERY checkpoints */
     178             :         case XLOG_CHECKPOINT_SHUTDOWN:
     179             :         case XLOG_END_OF_RECOVERY:
     180          16 :             SnapBuildSerializationPoint(builder, buf->origptr);
     181             : 
     182          16 :             break;
     183             :         case XLOG_CHECKPOINT_ONLINE:
     184             : 
     185             :             /*
     186             :              * A RUNNING_XACTS record will have been logged close to this
     187             :              * one; we can restart from there, so nothing is done here.
     188             :              */
     189          46 :             break;
     190             :         case XLOG_NOOP:
     191             :         case XLOG_NEXTOID:
     192             :         case XLOG_SWITCH:
     193             :         case XLOG_BACKUP_END:
     194             :         case XLOG_PARAMETER_CHANGE:
     195             :         case XLOG_RESTORE_POINT:
     196             :         case XLOG_FPW_CHANGE:
     197             :         case XLOG_FPI_FOR_HINT:
     198             :         case XLOG_FPI:
     199        2208 :             break;
     200             :         default:
     201           0 :             elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
     202             :     }
     203        2270 : }
     204             : 
     205             : /*
     206             :  * Handle rmgr XACT_ID records for LogicalDecodingProcessRecord().
     207             :  */
     208             : static void
     209        2788 : DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     210             : {
     211        2788 :     SnapBuild  *builder = ctx->snapshot_builder;
     212        2788 :     ReorderBuffer *reorder = ctx->reorder;
     213        2788 :     XLogReaderState *r = buf->record;
     214        2788 :     uint8       info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
     215             : 
     216             :     /*
     217             :      * If the snapshot isn't yet fully built, we cannot decode anything, so
     218             :      * bail out.
     219             :      *
     220             :      * However, it's critical to process XLOG_XACT_ASSIGNMENT records even
     221             :      * when the snapshot is being built: it is possible to get later records
     222             :      * that require subxids to be properly assigned.
     223             :      */
     224        2788 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT &&
     225             :         info != XLOG_XACT_ASSIGNMENT)
     226          10 :         return;
     227             : 
     228        2778 :     switch (info)
     229             :     {
     230             :         case XLOG_XACT_COMMIT:
     231             :         case XLOG_XACT_COMMIT_PREPARED:
     232             :             {
     233             :                 xl_xact_commit *xlrec;
     234             :                 xl_xact_parsed_commit parsed;
     235             :                 TransactionId xid;
     236             : 
     237        2508 :                 xlrec = (xl_xact_commit *) XLogRecGetData(r);
     238        2508 :                 ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     239             : 
     240        2508 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     241        2504 :                     xid = XLogRecGetXid(r);
     242             :                 else
     243           4 :                     xid = parsed.twophase_xid;
     244             : 
     245        2508 :                 DecodeCommit(ctx, buf, &parsed, xid);
     246        2508 :                 break;
     247             :             }
     248             :         case XLOG_XACT_ABORT:
     249             :         case XLOG_XACT_ABORT_PREPARED:
     250             :             {
     251             :                 xl_xact_abort *xlrec;
     252             :                 xl_xact_parsed_abort parsed;
     253             :                 TransactionId xid;
     254             : 
     255          46 :                 xlrec = (xl_xact_abort *) XLogRecGetData(r);
     256          46 :                 ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     257             : 
     258          46 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     259          44 :                     xid = XLogRecGetXid(r);
     260             :                 else
     261           2 :                     xid = parsed.twophase_xid;
     262             : 
     263          46 :                 DecodeAbort(ctx, buf, &parsed, xid);
     264          46 :                 break;
     265             :             }
     266             :         case XLOG_XACT_ASSIGNMENT:
     267             :             {
     268             :                 xl_xact_assignment *xlrec;
     269             :                 int         i;
     270             :                 TransactionId *sub_xid;
     271             : 
     272         218 :                 xlrec = (xl_xact_assignment *) XLogRecGetData(r);
     273             : 
     274         218 :                 sub_xid = &xlrec->xsub[0];
     275             : 
     276        1444 :                 for (i = 0; i < xlrec->nsubxacts; i++)
     277             :                 {
     278        2452 :                     ReorderBufferAssignChild(reorder, xlrec->xtop,
     279        1226 :                                              *(sub_xid++), buf->origptr);
     280             :                 }
     281         218 :                 break;
     282             :             }
     283             :         case XLOG_XACT_PREPARE:
     284             : 
     285             :             /*
     286             :              * Currently decoding ignores PREPARE TRANSACTION and will just
     287             :              * decode the transaction when the COMMIT PREPARED is sent or
     288             :              * throw away the transaction's contents when a ROLLBACK PREPARED
     289             :              * is received. In the future we could add code to expose prepared
     290             :              * transactions in the changestream allowing for a kind of
     291             :              * distributed 2PC.
     292             :              */
     293           6 :             ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
     294           6 :             break;
     295             :         default:
     296           0 :             elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
     297             :     }
     298             : }
     299             : 
     300             : /*
     301             :  * Handle rmgr STANDBY_ID records for LogicalDecodingProcessRecord().
     302             :  */
     303             : static void
     304        2886 : DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     305             : {
     306        2886 :     SnapBuild  *builder = ctx->snapshot_builder;
     307        2886 :     XLogReaderState *r = buf->record;
     308        2886 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     309             : 
     310        2886 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     311             : 
     312        2886 :     switch (info)
     313             :     {
     314             :         case XLOG_RUNNING_XACTS:
     315             :             {
     316         692 :                 xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
     317             : 
     318         692 :                 SnapBuildProcessRunningXacts(builder, buf->origptr, running);
     319             : 
     320             :                 /*
     321             :                  * Abort all transactions that we keep track of, that are
     322             :                  * older than the record's oldestRunningXid. This is the most
     323             :                  * convenient spot for doing so since, in contrast to shutdown
     324             :                  * or end-of-recovery checkpoints, we have information about
     325             :                  * all running transactions which includes prepared ones,
     326             :                  * while shutdown checkpoints just know that no non-prepared
     327             :                  * transactions are in progress.
     328             :                  */
     329         692 :                 ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
     330             :             }
     331         692 :             break;
     332             :         case XLOG_STANDBY_LOCK:
     333        2194 :             break;
     334             :         case XLOG_INVALIDATIONS:
     335             :             {
     336           0 :                 xl_invalidations *invalidations =
     337             :                 (xl_invalidations *) XLogRecGetData(r);
     338             : 
     339           0 :                 if (!ctx->fast_forward)
     340           0 :                     ReorderBufferImmediateInvalidation(ctx->reorder,
     341           0 :                                                        invalidations->nmsgs,
     342           0 :                                                        invalidations->msgs);
     343             :             }
     344           0 :             break;
     345             :         default:
     346           0 :             elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
     347             :     }
     348        2886 : }
     349             : 
     350             : /*
     351             :  * Handle rmgr HEAP2_ID records for LogicalDecodingProcessRecord().
     352             :  */
     353             : static void
     354       28140 : DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     355             : {
     356       28140 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     357       28140 :     TransactionId xid = XLogRecGetXid(buf->record);
     358       28140 :     SnapBuild  *builder = ctx->snapshot_builder;
     359             : 
     360       28140 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     361             : 
     362             :     /*
     363             :      * If we don't have a snapshot or we are just fast-forwarding, there is no point in decoding changes.
     364             :      * NOTE(review): past this return, the !ctx->fast_forward test under XLOG_HEAP2_MULTI_INSERT is always true.
     365             :      */
     366       56260 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
     367       28120 :         ctx->fast_forward)
     368          20 :         return;
     369             : 
     370       28120 :     switch (info)
     371             :     {
     372             :         case XLOG_HEAP2_MULTI_INSERT:
     373          32 :             if (!ctx->fast_forward &&
     374          16 :                 SnapBuildProcessChange(builder, xid, buf->origptr))
     375          16 :                 DecodeMultiInsert(ctx, buf);
     376          16 :             break;
     377             :         case XLOG_HEAP2_NEW_CID:
     378             :             {
     379             :                 xl_heap_new_cid *xlrec;
     380             : 
     381       27260 :                 xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
     382       27260 :                 SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
     383             : 
     384       27260 :                 break;
     385             :             }
     386             :         case XLOG_HEAP2_REWRITE:
     387             : 
     388             :             /*
     389             :              * Although these records only exist to serve the needs of logical
     390             :              * decoding, all the work happens as part of crash or archive
     391             :              * recovery, so we don't need to do anything here.
     392             :              */
     393         178 :             break;
     394             : 
     395             :             /*
     396             :              * Everything else here is just low level physical stuff we're not
     397             :              * interested in.
     398             :              */
     399             :         case XLOG_HEAP2_FREEZE_PAGE:
     400             :         case XLOG_HEAP2_CLEAN:
     401             :         case XLOG_HEAP2_CLEANUP_INFO:
     402             :         case XLOG_HEAP2_VISIBLE:
     403             :         case XLOG_HEAP2_LOCK_UPDATED:
     404         666 :             break;
     405             :         default:
     406           0 :             elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
     407             :     }
     408             : }
     409             : 
     410             : /*
     411             :  * Handle rmgr HEAP_ID records for LogicalDecodingProcessRecord().
     412             :  */
     413             : static void
     414     2602734 : DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     415             : {
     416     2602734 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     417     2602734 :     TransactionId xid = XLogRecGetXid(buf->record);
     418     2602734 :     SnapBuild  *builder = ctx->snapshot_builder;
     419             : 
     420     2602734 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     421             : 
     422             :     /*
     423             :      * If we don't have a snapshot or we are just fast-forwarding, there is
     424             :      * no point in decoding data changes.
     425             :      */
     426     5205460 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
     427     2602726 :         ctx->fast_forward)
     428          22 :         return;
     429             : 
     430     2602712 :     switch (info)
     431             :     {
     432             :         case XLOG_HEAP_INSERT:
     433     1925604 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     434     1925604 :                 DecodeInsert(ctx, buf);
     435     1925604 :             break;
     436             : 
     437             :             /*
     438             :              * Treat HOT updates as normal updates. There is no useful
     439             :              * information in the fact that we could make it a HOT update
     440             :              * locally and the WAL layout is compatible.
     441             :              */
     442             :         case XLOG_HEAP_HOT_UPDATE:
     443             :         case XLOG_HEAP_UPDATE:
     444      174118 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     445      174118 :                 DecodeUpdate(ctx, buf);
     446      174118 :             break;
     447             : 
     448             :         case XLOG_HEAP_DELETE:
     449      247092 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     450      247092 :                 DecodeDelete(ctx, buf);
     451      247092 :             break;
     452             : 
     453             :         case XLOG_HEAP_TRUNCATE:
     454          16 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     455          16 :                 DecodeTruncate(ctx, buf);
     456          16 :             break;
     457             : 
     458             :         case XLOG_HEAP_INPLACE:
     459             : 
     460             :             /*
     461             :              * Inplace updates are only ever performed on catalog tuples and
     462             :              * can, per definition, not change tuple visibility.  Since we
     463             :              * don't decode catalog tuples, we're not interested in the
     464             :              * record's contents.
     465             :              *
     466             :              * In-place updates can be used either by XID-bearing transactions
     467             :              * (e.g.  in CREATE INDEX CONCURRENTLY) or by XID-less
     468             :              * transactions (e.g.  VACUUM).  In the former case, the commit
     469             :              * record will include cache invalidations, so we mark the
     470             :              * transaction as catalog modifying here. Currently that's
     471             :              * redundant because the commit will do that as well, but once we
     472             :              * support decoding in-progress relations, this will be important.
     473             :              */
     474        1206 :             if (!TransactionIdIsValid(xid))
     475           6 :                 break;
     476             : 
     477        1200 :             SnapBuildProcessChange(builder, xid, buf->origptr);
     478        1200 :             ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
     479        1200 :             break;
     480             : 
     481             :         case XLOG_HEAP_CONFIRM:
     482       35800 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     483       35800 :                 DecodeSpecConfirm(ctx, buf);
     484       35800 :             break;
     485             : 
     486             :         case XLOG_HEAP_LOCK:
     487             :             /* we don't care about row level locks for now */
     488      218876 :             break;
     489             : 
     490             :         default:
     491           0 :             elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
     492             :             break;
     493             :     }
     494             : }
     495             : 
     496             : static inline bool      /* true when filter_by_origin_cb_wrapper() reports origin_id as filtered */
     497     2365442 : FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
     498             : {
     499     2365442 :     if (ctx->callbacks.filter_by_origin_cb == NULL)
     500           0 :         return false;           /* no filter_by_origin_cb set: nothing is filtered */
     501             : 
     502     2365442 :     return filter_by_origin_cb_wrapper(ctx, origin_id);
     503             : }
     504             : 
     505             : /*
     506             :  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
     507             :  */
     508             : static void
     509          64 : DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     510             : {
     511          64 :     SnapBuild  *builder = ctx->snapshot_builder;
     512          64 :     XLogReaderState *r = buf->record;
     513          64 :     TransactionId xid = XLogRecGetXid(r);
     514          64 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     515          64 :     RepOriginId origin_id = XLogRecGetOrigin(r);
     516             :     Snapshot    snapshot;
     517             :     xl_logical_message *message;
     518             : 
     519          64 :     if (info != XLOG_LOGICAL_MESSAGE)
     520           0 :         elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
     521             : 
     522          64 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     523             : 
     524             :     /*
     525             :      * If we don't have a snapshot or we are just fast-forwarding, there is
     526             :      * no point in decoding messages.
     527             :      */
     528         128 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
     529          64 :         ctx->fast_forward)
     530           0 :         return;
     531             : 
     532          64 :     message = (xl_logical_message *) XLogRecGetData(r);
     533             : 
     534         124 :     if (message->dbId != ctx->slot->data.database ||
     535          60 :         FilterByOrigin(ctx, origin_id))
     536           8 :         return;                 /* message is for another database, or its origin is filtered */
     537             : 
     538         100 :     if (message->transactional &&
     539          44 :         !SnapBuildProcessChange(builder, xid, buf->origptr))
     540           0 :         return;
     541          68 :     else if (!message->transactional &&
     542          24 :              (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
     543          12 :               SnapBuildXactNeedsSkip(builder, buf->origptr)))
     544           6 :         return;
     545             : 
     546          50 :     snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
     547         100 :     ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
     548          50 :                               message->transactional,
     549          50 :                               message->message, /* first part of message is
     550             :                                                  * prefix */
     551             :                               message->message_size,
     552          50 :                               message->message + message->prefix_size);
     553             : }
     554             : 
     555             : /*
     556             :  * Consolidated commit record handling between the different forms of commit
     557             :  * records.
     558             :  */
     559             : static void
     560        2508 : DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     561             :              xl_xact_parsed_commit *parsed, TransactionId xid)
     562             : {
     563        2508 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     564        2508 :     TimestampTz commit_time = parsed->xact_time;
     565        2508 :     RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     566             :     int         i;
     567             : 
     568        2508 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     569             :     {
     570           4 :         origin_lsn = parsed->origin_lsn;
     571           4 :         commit_time = parsed->origin_timestamp;
     572             :     }
     573             : 
     574             :     /*
     575             :      * Process invalidation messages, even if we're not interested in the
     576             :      * transaction's contents, since the various caches need to always be
     577             :      * consistent.
     578             :      */
     579        2508 :     if (parsed->nmsgs > 0)
     580             :     {
     581         902 :         if (!ctx->fast_forward)
     582        1804 :             ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
     583         902 :                                           parsed->nmsgs, parsed->msgs);
     584         902 :         ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
     585             :     }
     586             : 
     587        2508 :     SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
     588             :                        parsed->nsubxacts, parsed->subxacts);
     589             : 
     590             :     /* ----
     591             :      * Check whether we are interested in this specific transaction, and tell
     592             :      * the reorderbuffer to forget the content of the (sub-)transactions
     593             :      * if not.
     594             :      *
     595             :      * There can be several reasons we might not be interested in this
     596             :      * transaction:
     597             :      * 1) We might not be interested in decoding transactions up to this
     598             :      *    LSN. This can happen because we previously decoded it and now just
     599             :      *    are restarting or if we haven't assembled a consistent snapshot yet.
     600             :      * 2) The transaction happened in another database.
     601             :      * 3) The output plugin is not interested in the origin.
     602             :      * 4) We are doing fast-forwarding
     603             :      *
     604             :      * We can't just use ReorderBufferAbort() here, because we need to execute
     605             :      * the transaction's invalidations.  This currently won't be needed if
     606             :      * we're just skipping over the transaction because currently we only do
     607             :      * so during startup, to get to the first transaction the client needs. As
     608             :      * we have reset the catalog caches before starting to read WAL, and we
     609             :      * haven't yet touched any catalogs, there can't be anything to invalidate.
     610             :      * But if we're "forgetting" this commit because it happened in
     611             :      * another database, the invalidations might be important, because they
     612             :      * could be for shared catalogs and we might have loaded data into the
     613             :      * relevant syscaches.
     614             :      * ---
     615             :      */
     616        3416 :     if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
     617        2722 :         (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) ||
     618        1802 :         ctx->fast_forward || FilterByOrigin(ctx, origin_id))
     619             :     {
     620        3034 :         for (i = 0; i < parsed->nsubxacts; i++)
     621             :         {
     622        1420 :             ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
     623             :         }
     624        1614 :         ReorderBufferForget(ctx->reorder, xid, buf->origptr);
     625             : 
     626        1614 :         return;
     627             :     }
     628             : 
     629             :     /* tell the reorderbuffer about the surviving subtransactions */
     630        1122 :     for (i = 0; i < parsed->nsubxacts; i++)
     631             :     {
     632         228 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     633             :                                  buf->origptr, buf->endptr);
     634             :     }
     635             : 
     636             :     /* replay actions of all transaction + subtransactions in order */
     637         894 :     ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
     638             :                         commit_time, origin_id, origin_lsn);
     639             : }
     640             : 
     641             : /*
     642             :  * Get the data from the various forms of abort records and pass it on to
     643             :  * snapbuild.c and reorderbuffer.c
     644             :  */
     645             : static void
     646          46 : DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     647             :             xl_xact_parsed_abort *parsed, TransactionId xid)
     648             : {
     649             :     int         i;
     650             : 
     651          46 :     for (i = 0; i < parsed->nsubxacts; i++)
     652             :     {
     653           0 :         ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
     654           0 :                            buf->record->EndRecPtr);
     655             :     }
     656             : 
     657          46 :     ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
     658          46 : }
     659             : 
     660             : /*
     661             :  * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
     662             :  *
     663             :  * Inserts can contain the new tuple.
     664             :  */
     665             : static void
     666     1925604 : DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     667             : {
     668             :     Size        datalen;
     669             :     char       *tupledata;
     670             :     Size        tuplelen;
     671     1925604 :     XLogReaderState *r = buf->record;
     672             :     xl_heap_insert *xlrec;
     673             :     ReorderBufferChange *change;
     674             :     RelFileNode target_node;
     675             : 
     676     1925604 :     xlrec = (xl_heap_insert *) XLogRecGetData(r);
     677             : 
     678             :     /*
     679             :      * Ignore insert records without new tuples (this does happen when
     680             :      * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
     681             :      */
     682     1925604 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
     683       36324 :         return;
     684             : 
     685             :     /* only interested in our database */
     686     1907448 :     XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
     687     1907448 :     if (target_node.dbNode != ctx->slot->data.database)
     688           0 :         return;
     689             : 
     690             :     /* output plugin doesn't look for this origin, no need to queue */
     691     1907448 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     692          12 :         return;
     693             : 
     694     1907436 :     change = ReorderBufferGetChange(ctx->reorder);
     695     1907436 :     if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
     696     1871636 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
     697             :     else
     698       35800 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
     699     1907436 :     change->origin_id = XLogRecGetOrigin(r);
     700             : 
     701     1907436 :     memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
     702             : 
     703     1907436 :     tupledata = XLogRecGetBlockData(r, 0, &datalen);
     704     1907436 :     tuplelen = datalen - SizeOfHeapHeader;
     705             : 
     706     1907436 :     change->data.tp.newtuple =
     707     1907436 :         ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
     708             : 
     709     1907436 :     DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
     710             : 
     711     1907436 :     change->data.tp.clear_toast_afterwards = true;
     712             : 
     713     1907436 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
     714             : }
     715             : 
     716             : /*
     717             :  * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
     718             :  * in the record, from wal into proper tuplebufs.
     719             :  *
     720             :  * Updates can possibly contain a new tuple and the old primary key.
     721             :  */
     722             : static void
     723      174118 : DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     724             : {
     725      174118 :     XLogReaderState *r = buf->record;
     726             :     xl_heap_update *xlrec;
     727             :     ReorderBufferChange *change;
     728             :     char       *data;
     729             :     RelFileNode target_node;
     730             : 
     731      174118 :     xlrec = (xl_heap_update *) XLogRecGetData(r);
     732             : 
     733             :     /* only interested in our database */
     734      174118 :     XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
     735      174118 :     if (target_node.dbNode != ctx->slot->data.database)
     736           0 :         return;
     737             : 
     738             :     /* output plugin doesn't look for this origin, no need to queue */
     739      174118 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     740           0 :         return;
     741             : 
     742      174118 :     change = ReorderBufferGetChange(ctx->reorder);
     743      174118 :     change->action = REORDER_BUFFER_CHANGE_UPDATE;
     744      174118 :     change->origin_id = XLogRecGetOrigin(r);
     745      174118 :     memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
     746             : 
     747      174118 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
     748             :     {
     749             :         Size        datalen;
     750             :         Size        tuplelen;
     751             : 
     752      172108 :         data = XLogRecGetBlockData(r, 0, &datalen);
     753             : 
     754      172108 :         tuplelen = datalen - SizeOfHeapHeader;
     755             : 
     756      172108 :         change->data.tp.newtuple =
     757      172108 :             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
     758             : 
     759      172108 :         DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
     760             :     }
     761             : 
     762      174118 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
     763             :     {
     764             :         Size        datalen;
     765             :         Size        tuplelen;
     766             : 
     767             :         /* caution, remaining data in record is not aligned */
     768         764 :         data = XLogRecGetData(r) + SizeOfHeapUpdate;
     769         764 :         datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
     770         764 :         tuplelen = datalen - SizeOfHeapHeader;
     771             : 
     772         764 :         change->data.tp.oldtuple =
     773         764 :             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
     774             : 
     775         764 :         DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
     776             :     }
     777             : 
     778      174118 :     change->data.tp.clear_toast_afterwards = true;
     779             : 
     780      174118 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
     781             : }
     782             : 
     783             : /*
     784             :  * Parse XLOG_HEAP_DELETE from wal into proper tuplebufs.
     785             :  *
     786             :  * Deletes can possibly contain the old primary key.
     787             :  */
     788             : static void
     789      247092 : DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     790             : {
     791      247092 :     XLogReaderState *r = buf->record;
     792             :     xl_heap_delete *xlrec;
     793             :     ReorderBufferChange *change;
     794             :     RelFileNode target_node;
     795             : 
     796      247092 :     xlrec = (xl_heap_delete *) XLogRecGetData(r);
     797             : 
     798             :     /* only interested in our database */
     799      247092 :     XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
     800      247092 :     if (target_node.dbNode != ctx->slot->data.database)
     801           8 :         return;
     802             : 
     803             :     /*
     804             :      * Super deletions are irrelevant for logical decoding, it's driven by the
     805             :      * confirmation records.
     806             :      */
     807      247088 :     if (xlrec->flags & XLH_DELETE_IS_SUPER)
     808           0 :         return;
     809             : 
     810             :     /* output plugin doesn't look for this origin, no need to queue */
     811      247088 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     812           0 :         return;
     813             : 
     814      247088 :     change = ReorderBufferGetChange(ctx->reorder);
     815      247088 :     change->action = REORDER_BUFFER_CHANGE_DELETE;
     816      247088 :     change->origin_id = XLogRecGetOrigin(r);
     817             : 
     818      247088 :     memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
     819             : 
     820             :     /* old primary key stored */
     821      247088 :     if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
     822             :     {
     823      121258 :         Size        datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
     824      121258 :         Size        tuplelen = datalen - SizeOfHeapHeader;
     825             : 
     826             :         Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
     827             : 
     828      121258 :         change->data.tp.oldtuple =
     829      121258 :             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
     830             : 
     831      121258 :         DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
     832             :                         datalen, change->data.tp.oldtuple);
     833             :     }
     834             : 
     835      247088 :     change->data.tp.clear_toast_afterwards = true;
     836             : 
     837      247088 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
     838             : }
     839             : 
     840             : /*
     841             :  * Parse XLOG_HEAP_TRUNCATE from wal
     842             :  */
     843             : static void
     844          16 : DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     845             : {
     846          16 :     XLogReaderState *r = buf->record;
     847             :     xl_heap_truncate *xlrec;
     848             :     ReorderBufferChange *change;
     849             : 
     850          16 :     xlrec = (xl_heap_truncate *) XLogRecGetData(r);
     851             : 
     852             :     /* only interested in our database */
     853          16 :     if (xlrec->dbId != ctx->slot->data.database)
     854           0 :         return;
     855             : 
     856             :     /* output plugin doesn't look for this origin, no need to queue */
     857          16 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     858           0 :         return;
     859             : 
     860          16 :     change = ReorderBufferGetChange(ctx->reorder);
     861          16 :     change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
     862          16 :     change->origin_id = XLogRecGetOrigin(r);
     863          16 :     if (xlrec->flags & XLH_TRUNCATE_CASCADE)
     864           2 :         change->data.truncate.cascade = true;
     865          16 :     if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
     866           4 :         change->data.truncate.restart_seqs = true;
     867          16 :     change->data.truncate.nrelids = xlrec->nrelids;
     868          16 :     change->data.truncate.relids = ReorderBufferGetRelids(ctx->reorder,
     869          16 :                                                           xlrec->nrelids);
     870          16 :     memcpy(change->data.truncate.relids, xlrec->relids,
     871          16 :            xlrec->nrelids * sizeof(Oid));
     872          16 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
     873             :                              buf->origptr, change);
     874             : }
     875             : 
     876             : /*
     877             :  * Decode XLOG_HEAP2_MULTI_INSERT record into multiple tuplebufs.
     878             :  *
     879             :  * Currently MULTI_INSERT will always contain the full tuples.
     880             :  */
     881             : static void
     882          16 : DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     883             : {
     884          16 :     XLogReaderState *r = buf->record;
     885             :     xl_heap_multi_insert *xlrec;
     886             :     int         i;
     887             :     char       *data;
     888             :     char       *tupledata;
     889             :     Size        tuplelen;
     890             :     RelFileNode rnode;
     891             : 
     892          16 :     xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
     893             : 
     894             :     /* only interested in our database */
     895          16 :     XLogRecGetBlockTag(r, 0, &rnode, NULL, NULL);
     896          16 :     if (rnode.dbNode != ctx->slot->data.database)
     897           0 :         return;
     898             : 
     899             :     /* output plugin doesn't look for this origin, no need to queue */
     900          16 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     901           0 :         return;
     902             : 
     903             :     /*
     904             :      * As multi_insert is not used for catalogs yet, the block should always
     905             :      * have data even if a full-page write of it is taken.
     906             :      */
     907          16 :     tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
     908             :     Assert(tupledata != NULL);
     909             : 
     910          16 :     data = tupledata;
     911        1640 :     for (i = 0; i < xlrec->ntuples; i++)
     912             :     {
     913             :         ReorderBufferChange *change;
     914             :         xl_multi_insert_tuple *xlhdr;
     915             :         int         datalen;
     916             :         ReorderBufferTupleBuf *tuple;
     917             : 
     918        1624 :         change = ReorderBufferGetChange(ctx->reorder);
     919        1624 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
     920        1624 :         change->origin_id = XLogRecGetOrigin(r);
     921             : 
     922        1624 :         memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode));
     923             : 
     924        1624 :         xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
     925        1624 :         data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
     926        1624 :         datalen = xlhdr->datalen;
     927             : 
     928             :         /*
     929             :          * CONTAINS_NEW_TUPLE will always be set currently as multi_insert
     930             :          * isn't used for catalogs, but better be future proof.
     931             :          *
     932             :          * We decode the tuple in pretty much the same way as DecodeXLogTuple,
     933             :          * but since the layout is slightly different, we can't use it here.
     934             :          */
     935        1624 :         if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)
     936             :         {
     937             :             HeapTupleHeader header;
     938             : 
     939        1624 :             change->data.tp.newtuple =
     940        1624 :                 ReorderBufferGetTupleBuf(ctx->reorder, datalen);
     941             : 
     942        1624 :             tuple = change->data.tp.newtuple;
     943        1624 :             header = tuple->tuple.t_data;
     944             : 
     945             :             /* not a disk based tuple */
     946        1624 :             ItemPointerSetInvalid(&tuple->tuple.t_self);
     947             : 
     948             :             /*
     949             :              * We can only figure this out after reassembling the
     950             :              * transactions.
     951             :              */
     952        1624 :             tuple->tuple.t_tableOid = InvalidOid;
     953             : 
     954        1624 :             tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
     955             : 
     956        1624 :             memset(header, 0, SizeofHeapTupleHeader);
     957             : 
     958        1624 :             memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader,
     959             :                    (char *) data,
     960             :                    datalen);
     961        1624 :             header->t_infomask = xlhdr->t_infomask;
     962        1624 :             header->t_infomask2 = xlhdr->t_infomask2;
     963        1624 :             header->t_hoff = xlhdr->t_hoff;
     964             :         }
     965             : 
     966             :         /*
     967             :          * Reset toast reassembly state only after the last row in the last
     968             :          * xl_multi_insert_tuple record emitted by one heap_multi_insert()
     969             :          * call.
     970             :          */
     971        1872 :         if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
     972         248 :             (i + 1) == xlrec->ntuples)
     973           8 :             change->data.tp.clear_toast_afterwards = true;
     974             :         else
     975        1616 :             change->data.tp.clear_toast_afterwards = false;
     976             : 
     977        1624 :         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
     978             :                                  buf->origptr, change);
     979             : 
     980             :         /* move to the next xl_multi_insert_tuple entry */
     981        1624 :         data += datalen;
     982             :     }
     983             :     Assert(data == tupledata + tuplelen);
     984             : }
     985             : 
     986             : /*
     987             :  * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change.
     988             :  *
     989             :  * This is pretty trivial, all the state is essentially already set up by the
     990             :  * speculative insertion.
     991             :  */
     992             : static void
     993       35800 : DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     994             : {
     995       35800 :     XLogReaderState *r = buf->record;
     996             :     ReorderBufferChange *change;
     997             :     RelFileNode target_node;
     998             : 
     999             :     /* only interested in our database */
    1000       35800 :     XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
    1001       35800 :     if (target_node.dbNode != ctx->slot->data.database)
    1002           0 :         return;
    1003             : 
    1004             :     /* output plugin doesn't look for this origin, no need to queue */
    1005       35800 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1006           0 :         return;
    1007             : 
    1008       35800 :     change = ReorderBufferGetChange(ctx->reorder);
    1009       35800 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
    1010       35800 :     change->origin_id = XLogRecGetOrigin(r);
    1011             : 
    1012       35800 :     memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
    1013             : 
    1014       35800 :     change->data.tp.clear_toast_afterwards = true;
    1015             : 
    1016       35800 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
    1017             : }
    1018             : 
    1019             : 
    1020             : /*
    1021             :  * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
    1022             :  * (but not by heap_multi_insert) into a tuplebuf.
    1023             :  *
    1024             :  * The size 'len' and the pointer 'data' in the record need to be
    1025             :  * computed outside as they are record specific.
    1026             :  */
    1027             : static void
    1028     2201566 : DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
    1029             : {
    1030             :     xl_heap_header xlhdr;
    1031     2201566 :     int         datalen = len - SizeOfHeapHeader;
    1032             :     HeapTupleHeader header;
    1033             : 
    1034             :     Assert(datalen >= 0);
    1035             : 
    1036     2201566 :     tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
    1037     2201566 :     header = tuple->tuple.t_data;
    1038             : 
    1039             :     /* not a disk based tuple */
    1040     2201566 :     ItemPointerSetInvalid(&tuple->tuple.t_self);
    1041             : 
    1042             :     /* we can only figure this out after reassembling the transactions */
    1043     2201566 :     tuple->tuple.t_tableOid = InvalidOid;
    1044             : 
    1045             :     /* data is not stored aligned, copy to aligned storage */
    1046     2201566 :     memcpy((char *) &xlhdr,
    1047             :            data,
    1048             :            SizeOfHeapHeader);
    1049             : 
    1050     2201566 :     memset(header, 0, SizeofHeapTupleHeader);
    1051             : 
    1052     4403132 :     memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
    1053     2201566 :            data + SizeOfHeapHeader,
    1054             :            datalen);
    1055             : 
    1056     2201566 :     header->t_infomask = xlhdr.t_infomask;
    1057     2201566 :     header->t_infomask2 = xlhdr.t_infomask2;
    1058     2201566 :     header->t_hoff = xlhdr.t_hoff;
    1059     2201566 : }

Generated by: LCOV version 1.13