LCOV - code coverage report
Current view: top level - src/backend/replication/logical - decode.c (source / functions) Hit Total Coverage
Test: PostgreSQL 15devel Lines: 414 446 92.8 %
Date: 2021-12-09 04:09:06 Functions: 20 20 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -------------------------------------------------------------------------
       2             :  *
       3             :  * decode.c
       4             :  *      This module decodes WAL records read using xlogreader.h's APIs for the
       5             :  *      purpose of logical decoding by passing information to the
       6             :  *      reorderbuffer module (containing the actual changes) and to the
       7             :  *      snapbuild module to build a fitting catalog snapshot (to be able to
       8             :  *      properly decode the changes in the reorderbuffer).
       9             :  *
      10             :  * NOTE:
      11             :  *      This basically tries to handle all low level xlog stuff for
      12             :  *      reorderbuffer.c and snapbuild.c. There's some minor leakage where a
      13             :  *      specific record's struct is used to pass data along, but those just
      14             :  *      happen to contain the right amount of data in a convenient
      15             :  *      format. There isn't and shouldn't be much intelligence about the
      16             :  *      contents of records in here except turning them into a more usable
      17             :  *      format.
      18             :  *
      19             :  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
      20             :  * Portions Copyright (c) 1994, Regents of the University of California
      21             :  *
      22             :  * IDENTIFICATION
      23             :  *    src/backend/replication/logical/decode.c
      24             :  *
      25             :  * -------------------------------------------------------------------------
      26             :  */
      27             : #include "postgres.h"
      28             : 
      29             : #include "access/heapam.h"
      30             : #include "access/heapam_xlog.h"
      31             : #include "access/transam.h"
      32             : #include "access/xact.h"
      33             : #include "access/xlog_internal.h"
      34             : #include "access/xlogreader.h"
      35             : #include "access/xlogrecord.h"
      36             : #include "access/xlogutils.h"
      37             : #include "catalog/pg_control.h"
      38             : #include "replication/decode.h"
      39             : #include "replication/logical.h"
      40             : #include "replication/message.h"
      41             : #include "replication/origin.h"
      42             : #include "replication/reorderbuffer.h"
      43             : #include "replication/snapbuild.h"
      44             : #include "storage/standby.h"
      45             : 
      46             : typedef struct XLogRecordBuffer
      47             : {
      48             :     XLogRecPtr  origptr;
      49             :     XLogRecPtr  endptr;
      50             :     XLogReaderState *record;
      51             : } XLogRecordBuffer;
      52             : 
      53             : /* RMGR Handlers */
      54             : static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      55             : static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      56             : static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      57             : static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      58             : static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      59             : static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      60             : 
      61             : /* individual record(group)'s handlers */
      62             : static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      63             : static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      64             : static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      65             : static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      66             : static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      67             : static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      68             : 
      69             : static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      70             :                          xl_xact_parsed_commit *parsed, TransactionId xid,
      71             :                          bool two_phase);
      72             : static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      73             :                         xl_xact_parsed_abort *parsed, TransactionId xid,
      74             :                         bool two_phase);
      75             : static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      76             :                           xl_xact_parsed_prepare *parsed);
      77             : 
      78             : 
      79             : /* common function to decode tuples */
      80             : static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
      81             : 
      82             : /* helper functions for decoding transactions */
      83             : static inline bool FilterPrepare(LogicalDecodingContext *ctx,
      84             :                                  TransactionId xid, const char *gid);
      85             : static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
      86             :                               XLogRecordBuffer *buf, Oid dbId,
      87             :                               RepOriginId origin_id);
      88             : 
      89             : /*
      90             :  * Take every XLogReadRecord()ed record and perform the actions required to
      91             :  * decode it using the output plugin already setup in the logical decoding
      92             :  * context.
      93             :  *
      94             :  * NB: Note that every record's xid needs to be processed by reorderbuffer
      95             :  * (xids contained in the content of records are not relevant for this rule).
      96             :  * That means that for records which'd otherwise not go through the
      97             :  * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
      98             :  * call ReorderBufferProcessXid for each record type by default, because
      99             :  * e.g. empty xacts can be handled more efficiently if there's no previous
     100             :  * state for them.
     101             :  *
     102             :  * We also support the ability to fast forward thru records, skipping some
     103             :  * record types completely - see individual record types for details.
     104             :  */
     105             : void
     106     4058202 : LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
     107             : {
     108             :     XLogRecordBuffer buf;
     109             :     TransactionId txid;
     110             : 
     111     4058202 :     buf.origptr = ctx->reader->ReadRecPtr;
     112     4058202 :     buf.endptr = ctx->reader->EndRecPtr;
     113     4058202 :     buf.record = record;
     114             : 
     115     4058202 :     txid = XLogRecGetTopXid(record);
     116             : 
     117             :     /*
     118             :      * If the top-level xid is valid, we need to assign the subxact to the
     119             :      * top-level xact. We need to do this for all records, hence we do it
     120             :      * before the switch.
     121             :      */
     122     4058202 :     if (TransactionIdIsValid(txid))
     123             :     {
     124        1280 :         ReorderBufferAssignChild(ctx->reorder,
     125             :                                  txid,
     126        1280 :                                  record->decoded_record->xl_xid,
     127             :                                  buf.origptr);
     128             :     }
     129             : 
     130             :     /* cast so we get a warning when new rmgrs are added */
     131     4058202 :     switch ((RmgrId) XLogRecGetRmid(record))
     132             :     {
     133             :             /*
     134             :              * Rmgrs we care about for logical decoding. Add new rmgrs in
     135             :              * rmgrlist.h's order.
     136             :              */
     137        3188 :         case RM_XLOG_ID:
     138        3188 :             DecodeXLogOp(ctx, &buf);
     139        3188 :             break;
     140             : 
     141       10948 :         case RM_XACT_ID:
     142       10948 :             DecodeXactOp(ctx, &buf);
     143       10944 :             break;
     144             : 
     145        4510 :         case RM_STANDBY_ID:
     146        4510 :             DecodeStandbyOp(ctx, &buf);
     147        4510 :             break;
     148             : 
     149       45734 :         case RM_HEAP2_ID:
     150       45734 :             DecodeHeap2Op(ctx, &buf);
     151       45734 :             break;
     152             : 
     153     3104162 :         case RM_HEAP_ID:
     154     3104162 :             DecodeHeapOp(ctx, &buf);
     155     3104160 :             break;
     156             : 
     157         104 :         case RM_LOGICALMSG_ID:
     158         104 :             DecodeLogicalMsgOp(ctx, &buf);
     159         104 :             break;
     160             : 
     161             :             /*
     162             :              * Rmgrs irrelevant for logical decoding; they describe stuff not
     163             :              * represented in logical decoding. Add new rmgrs in rmgrlist.h's
     164             :              * order.
     165             :              */
     166      889556 :         case RM_SMGR_ID:
     167             :         case RM_CLOG_ID:
     168             :         case RM_DBASE_ID:
     169             :         case RM_TBLSPC_ID:
     170             :         case RM_MULTIXACT_ID:
     171             :         case RM_RELMAP_ID:
     172             :         case RM_BTREE_ID:
     173             :         case RM_HASH_ID:
     174             :         case RM_GIN_ID:
     175             :         case RM_GIST_ID:
     176             :         case RM_SEQ_ID:
     177             :         case RM_SPGIST_ID:
     178             :         case RM_BRIN_ID:
     179             :         case RM_COMMIT_TS_ID:
     180             :         case RM_REPLORIGIN_ID:
     181             :         case RM_GENERIC_ID:
     182             :             /* just deal with xid, and done */
     183      889556 :             ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
     184             :                                     buf.origptr);
     185      889556 :             break;
     186           0 :         case RM_NEXT_ID:
     187           0 :             elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
     188             :     }
     189     4058196 : }
     190             : 
     191             : /*
     192             :  * Handle rmgr XLOG_ID records for DecodeRecordIntoReorderBuffer().
     193             :  */
     194             : static void
     195        3188 : DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     196             : {
     197        3188 :     SnapBuild  *builder = ctx->snapshot_builder;
     198        3188 :     uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
     199             : 
     200        3188 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
     201             :                             buf->origptr);
     202             : 
     203        3188 :     switch (info)
     204             :     {
     205             :             /* this is also used in END_OF_RECOVERY checkpoints */
     206          30 :         case XLOG_CHECKPOINT_SHUTDOWN:
     207             :         case XLOG_END_OF_RECOVERY:
     208          30 :             SnapBuildSerializationPoint(builder, buf->origptr);
     209             : 
     210          30 :             break;
     211          58 :         case XLOG_CHECKPOINT_ONLINE:
     212             : 
     213             :             /*
     214             :              * a RUNNING_XACTS record will have been logged near to this, we
     215             :              * can restart from there.
     216             :              */
     217          58 :             break;
     218        3100 :         case XLOG_NOOP:
     219             :         case XLOG_NEXTOID:
     220             :         case XLOG_SWITCH:
     221             :         case XLOG_BACKUP_END:
     222             :         case XLOG_PARAMETER_CHANGE:
     223             :         case XLOG_RESTORE_POINT:
     224             :         case XLOG_FPW_CHANGE:
     225             :         case XLOG_FPI_FOR_HINT:
     226             :         case XLOG_FPI:
     227             :         case XLOG_OVERWRITE_CONTRECORD:
     228        3100 :             break;
     229           0 :         default:
     230           0 :             elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
     231             :     }
     232        3188 : }
     233             : 
     234             : /*
     235             :  * Handle rmgr XACT_ID records for DecodeRecordIntoReorderBuffer().
     236             :  */
     237             : static void
     238       10948 : DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     239             : {
     240       10948 :     SnapBuild  *builder = ctx->snapshot_builder;
     241       10948 :     ReorderBuffer *reorder = ctx->reorder;
     242       10948 :     XLogReaderState *r = buf->record;
     243       10948 :     uint8       info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
     244             : 
     245             :     /*
     246             :      * If the snapshot isn't yet fully built, we cannot decode anything, so
     247             :      * bail out.
     248             :      */
     249       10948 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     250          28 :         return;
     251             : 
     252       10920 :     switch (info)
     253             :     {
     254        3570 :         case XLOG_XACT_COMMIT:
     255             :         case XLOG_XACT_COMMIT_PREPARED:
     256             :             {
     257             :                 xl_xact_commit *xlrec;
     258             :                 xl_xact_parsed_commit parsed;
     259             :                 TransactionId xid;
     260        3570 :                 bool        two_phase = false;
     261             : 
     262        3570 :                 xlrec = (xl_xact_commit *) XLogRecGetData(r);
     263        3570 :                 ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     264             : 
     265        3570 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     266        3430 :                     xid = XLogRecGetXid(r);
     267             :                 else
     268         140 :                     xid = parsed.twophase_xid;
     269             : 
     270             :                 /*
     271             :                  * We would like to process the transaction in a two-phase
     272             :                  * manner iff output plugin supports two-phase commits and
     273             :                  * doesn't filter the transaction at prepare time.
     274             :                  */
     275        3570 :                 if (info == XLOG_XACT_COMMIT_PREPARED)
     276         140 :                     two_phase = !(FilterPrepare(ctx, xid,
     277         140 :                                                 parsed.twophase_gid));
     278             : 
     279        3570 :                 DecodeCommit(ctx, buf, &parsed, xid, two_phase);
     280        3566 :                 break;
     281             :             }
     282         156 :         case XLOG_XACT_ABORT:
     283             :         case XLOG_XACT_ABORT_PREPARED:
     284             :             {
     285             :                 xl_xact_abort *xlrec;
     286             :                 xl_xact_parsed_abort parsed;
     287             :                 TransactionId xid;
     288         156 :                 bool        two_phase = false;
     289             : 
     290         156 :                 xlrec = (xl_xact_abort *) XLogRecGetData(r);
     291         156 :                 ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     292             : 
     293         156 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     294         100 :                     xid = XLogRecGetXid(r);
     295             :                 else
     296          56 :                     xid = parsed.twophase_xid;
     297             : 
     298             :                 /*
     299             :                  * We would like to process the transaction in a two-phase
     300             :                  * manner iff output plugin supports two-phase commits and
     301             :                  * doesn't filter the transaction at prepare time.
     302             :                  */
     303         156 :                 if (info == XLOG_XACT_ABORT_PREPARED)
     304          56 :                     two_phase = !(FilterPrepare(ctx, xid,
     305          56 :                                                 parsed.twophase_gid));
     306             : 
     307         156 :                 DecodeAbort(ctx, buf, &parsed, xid, two_phase);
     308         156 :                 break;
     309             :             }
     310         252 :         case XLOG_XACT_ASSIGNMENT:
     311             : 
     312             :             /*
     313             :              * We assign subxact to the toplevel xact while processing each
     314             :              * record if required.  So, we don't need to do anything here. See
     315             :              * LogicalDecodingProcessRecord.
     316             :              */
     317         252 :             break;
     318        6718 :         case XLOG_XACT_INVALIDATIONS:
     319             :             {
     320             :                 TransactionId xid;
     321             :                 xl_xact_invals *invals;
     322             : 
     323        6718 :                 xid = XLogRecGetXid(r);
     324        6718 :                 invals = (xl_xact_invals *) XLogRecGetData(r);
     325             : 
     326             :                 /*
     327             :                  * Execute the invalidations for xid-less transactions,
     328             :                  * otherwise, accumulate them so that they can be processed at
     329             :                  * the commit time.
     330             :                  */
     331        6718 :                 if (TransactionIdIsValid(xid))
     332             :                 {
     333        6718 :                     if (!ctx->fast_forward)
     334        6716 :                         ReorderBufferAddInvalidations(reorder, xid,
     335             :                                                       buf->origptr,
     336        6716 :                                                       invals->nmsgs,
     337        6716 :                                                       invals->msgs);
     338        6718 :                     ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
     339             :                                                       buf->origptr);
     340             :                 }
     341           0 :                 else if ((!ctx->fast_forward))
     342           0 :                     ReorderBufferImmediateInvalidation(ctx->reorder,
     343           0 :                                                        invals->nmsgs,
     344           0 :                                                        invals->msgs);
     345             :             }
     346        6718 :             break;
     347         224 :         case XLOG_XACT_PREPARE:
     348             :             {
     349             :                 xl_xact_parsed_prepare parsed;
     350             :                 xl_xact_prepare *xlrec;
     351             : 
     352             :                 /* ok, parse it */
     353         224 :                 xlrec = (xl_xact_prepare *) XLogRecGetData(r);
     354         224 :                 ParsePrepareRecord(XLogRecGetInfo(buf->record),
     355             :                                    xlrec, &parsed);
     356             : 
     357             :                 /*
     358             :                  * We would like to process the transaction in a two-phase
     359             :                  * manner iff output plugin supports two-phase commits and
     360             :                  * doesn't filter the transaction at prepare time.
     361             :                  */
     362         224 :                 if (FilterPrepare(ctx, parsed.twophase_xid,
     363             :                                   parsed.twophase_gid))
     364             :                 {
     365          16 :                     ReorderBufferProcessXid(reorder, parsed.twophase_xid,
     366             :                                             buf->origptr);
     367          16 :                     break;
     368             :                 }
     369             : 
     370             :                 /*
     371             :                  * Note that if the prepared transaction has locked [user]
     372             :                  * catalog tables exclusively then decoding prepare can block
     373             :                  * till the main transaction is committed because it needs to
     374             :                  * lock the catalog tables.
     375             :                  *
     376             :                  * XXX Now, this can even lead to a deadlock if the prepare
     377             :                  * transaction is waiting to get it logically replicated for
     378             :                  * distributed 2PC. This can be avoided by disallowing
     379             :                  * preparing transactions that have locked [user] catalog
     380             :                  * tables exclusively but as of now, we ask users not to do
     381             :                  * such an operation.
     382             :                  */
     383         208 :                 DecodePrepare(ctx, buf, &parsed);
     384         208 :                 break;
     385             :             }
     386           0 :         default:
     387           0 :             elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
     388             :     }
     389             : }
     390             : 
     391             : /*
     392             :  * Handle rmgr STANDBY_ID records for DecodeRecordIntoReorderBuffer().
     393             :  */
     394             : static void
     395        4510 : DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     396             : {
     397        4510 :     SnapBuild  *builder = ctx->snapshot_builder;
     398        4510 :     XLogReaderState *r = buf->record;
     399        4510 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     400             : 
     401        4510 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     402             : 
     403        4510 :     switch (info)
     404             :     {
     405        1336 :         case XLOG_RUNNING_XACTS:
     406             :             {
     407        1336 :                 xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
     408             : 
     409        1336 :                 SnapBuildProcessRunningXacts(builder, buf->origptr, running);
     410             : 
     411             :                 /*
     412             :                  * Abort all transactions that we keep track of, that are
     413             :                  * older than the record's oldestRunningXid. This is the most
     414             :                  * convenient spot for doing so since, in contrast to shutdown
     415             :                  * or end-of-recovery checkpoints, we have information about
     416             :                  * all running transactions which includes prepared ones,
     417             :                  * while shutdown checkpoints just know that no non-prepared
     418             :                  * transactions are in progress.
     419             :                  */
     420        1336 :                 ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
     421             :             }
     422        1336 :             break;
     423        3174 :         case XLOG_STANDBY_LOCK:
     424        3174 :             break;
     425           0 :         case XLOG_INVALIDATIONS:
     426             : 
     427             :             /*
     428             :              * We are processing the invalidations at the command level via
     429             :              * XLOG_XACT_INVALIDATIONS.  So we don't need to do anything here.
     430             :              */
     431           0 :             break;
     432           0 :         default:
     433           0 :             elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
     434             :     }
     435        4510 : }
     436             : 
     437             : /*
     438             :  * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer().
     439             :  */
     440             : static void
     441       45734 : DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     442             : {
     443       45734 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     444       45734 :     TransactionId xid = XLogRecGetXid(buf->record);
     445       45734 :     SnapBuild  *builder = ctx->snapshot_builder;
     446             : 
     447       45734 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     448             : 
     449             :     /*
     450             :      * If we don't have snapshot or we are just fast-forwarding, there is no
     451             :      * point in decoding changes.
     452             :      */
     453       45734 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
     454       45706 :         ctx->fast_forward)
     455          64 :         return;
     456             : 
     457       45670 :     switch (info)
     458             :     {
     459        8608 :         case XLOG_HEAP2_MULTI_INSERT:
     460       17216 :             if (!ctx->fast_forward &&
     461        8608 :                 SnapBuildProcessChange(builder, xid, buf->origptr))
     462        8608 :                 DecodeMultiInsert(ctx, buf);
     463        8608 :             break;
     464       34822 :         case XLOG_HEAP2_NEW_CID:
     465             :             {
     466             :                 xl_heap_new_cid *xlrec;
     467             : 
     468       34822 :                 xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
     469       34822 :                 SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
     470             : 
     471       34822 :                 break;
     472             :             }
     473         178 :         case XLOG_HEAP2_REWRITE:
     474             : 
     475             :             /*
     476             :              * Although these records only exist to serve the needs of logical
     477             :              * decoding, all the work happens as part of crash or archive
     478             :              * recovery, so we don't need to do anything here.
     479             :              */
     480         178 :             break;
     481             : 
     482             :             /*
     483             :              * Everything else here is just low level physical stuff we're not
     484             :              * interested in.
     485             :              */
     486        2062 :         case XLOG_HEAP2_FREEZE_PAGE:
     487             :         case XLOG_HEAP2_PRUNE:
     488             :         case XLOG_HEAP2_VACUUM:
     489             :         case XLOG_HEAP2_VISIBLE:
     490             :         case XLOG_HEAP2_LOCK_UPDATED:
     491        2062 :             break;
     492           0 :         default:
     493           0 :             elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
     494             :     }
     495             : }
     496             : 
     497             : /*
     498             :  * Handle rmgr HEAP_ID records for DecodeRecordIntoReorderBuffer().
     499             :  */
     500             : static void
     501     3104162 : DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     502             : {
     503     3104162 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     504     3104162 :     TransactionId xid = XLogRecGetXid(buf->record);
     505     3104162 :     SnapBuild  *builder = ctx->snapshot_builder;
     506             : 
     507     3104162 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     508             : 
     509             :     /*
     510             :      * If we don't have snapshot or we are just fast-forwarding, there is no
     511             :      * point in decoding data changes.
     512             :      */
     513     3104162 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
     514     3104154 :         ctx->fast_forward)
     515          48 :         return;
     516             : 
     517     3104114 :     switch (info)
     518             :     {
     519     2144106 :         case XLOG_HEAP_INSERT:
     520     2144106 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     521     2144106 :                 DecodeInsert(ctx, buf);
     522     2144106 :             break;
     523             : 
     524             :             /*
     525             :              * Treat HOT update as normal updates. There is no useful
     526             :              * information in the fact that we could make it a HOT update
     527             :              * locally and the WAL layout is compatible.
     528             :              */
     529      266892 :         case XLOG_HEAP_HOT_UPDATE:
     530             :         case XLOG_HEAP_UPDATE:
     531      266892 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     532      266892 :                 DecodeUpdate(ctx, buf);
     533      266892 :             break;
     534             : 
     535      340868 :         case XLOG_HEAP_DELETE:
     536      340868 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     537      340868 :                 DecodeDelete(ctx, buf);
     538      340866 :             break;
     539             : 
     540          52 :         case XLOG_HEAP_TRUNCATE:
     541          52 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     542          52 :                 DecodeTruncate(ctx, buf);
     543          52 :             break;
     544             : 
     545        1644 :         case XLOG_HEAP_INPLACE:
     546             : 
     547             :             /*
     548             :              * Inplace updates are only ever performed on catalog tuples and
     549             :              * can, per definition, not change tuple visibility.  Since we
     550             :              * don't decode catalog tuples, we're not interested in the
     551             :              * record's contents.
     552             :              *
     553             :              * In-place updates can be used either by XID-bearing transactions
     554             :              * (e.g.  in CREATE INDEX CONCURRENTLY) or by XID-less
     555             :              * transactions (e.g.  VACUUM).  In the former case, the commit
     556             :              * record will include cache invalidations, so we mark the
     557             :              * transaction as catalog modifying here. Currently that's
     558             :              * redundant because the commit will do that as well, but once we
     559             :              * support decoding in-progress relations, this will be important.
     560             :              */
     561        1644 :             if (!TransactionIdIsValid(xid))
     562           6 :                 break;
     563             : 
     564        1638 :             SnapBuildProcessChange(builder, xid, buf->origptr);
     565        1638 :             ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
     566        1638 :             break;
     567             : 
     568       35800 :         case XLOG_HEAP_CONFIRM:
     569       35800 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     570       35800 :                 DecodeSpecConfirm(ctx, buf);
     571       35800 :             break;
     572             : 
     573      314752 :         case XLOG_HEAP_LOCK:
     574             :             /* we don't care about row level locks for now */
     575      314752 :             break;
     576             : 
     577           0 :         default:
     578           0 :             elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
     579             :             break;
     580             :     }
     581             : }
     582             : 
     583             : /*
     584             :  * Ask output plugin whether we want to skip this PREPARE and send
     585             :  * this transaction as a regular commit later.
     586             :  */
     587             : static inline bool
     588         420 : FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
     589             :               const char *gid)
     590             : {
     591             :     /*
     592             :      * Skip if decoding of two-phase transactions at PREPARE time is not
     593             :      * enabled. In that case, all two-phase transactions are considered
     594             :      * filtered out and will be applied as regular transactions at COMMIT
     595             :      * PREPARED.
     596             :      */
     597         420 :     if (!ctx->twophase)
     598          12 :         return true;
     599             : 
     600             :     /*
     601             :      * The filter_prepare callback is optional. When not supplied, all
     602             :      * prepared transactions should go through.
     603             :      */
     604         408 :     if (ctx->callbacks.filter_prepare_cb == NULL)
     605         174 :         return false;
     606             : 
     607         234 :     return filter_prepare_cb_wrapper(ctx, xid, gid);
     608             : }
     609             : 
     610             : static inline bool
     611     2784148 : FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
     612             : {
     613     2784148 :     if (ctx->callbacks.filter_by_origin_cb == NULL)
     614           0 :         return false;
     615             : 
     616     2784148 :     return filter_by_origin_cb_wrapper(ctx, origin_id);
     617             : }
     618             : 
     619             : /*
     620             :  * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
     621             :  */
     622             : static void
     623         104 : DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     624             : {
     625         104 :     SnapBuild  *builder = ctx->snapshot_builder;
     626         104 :     XLogReaderState *r = buf->record;
     627         104 :     TransactionId xid = XLogRecGetXid(r);
     628         104 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     629         104 :     RepOriginId origin_id = XLogRecGetOrigin(r);
     630             :     Snapshot    snapshot;
     631             :     xl_logical_message *message;
     632             : 
     633         104 :     if (info != XLOG_LOGICAL_MESSAGE)
     634           0 :         elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
     635             : 
     636         104 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     637             : 
     638             :     /*
     639             :      * If we don't have snapshot or we are just fast-forwarding, there is no
     640             :      * point in decoding messages.
     641             :      */
     642         104 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
     643         104 :         ctx->fast_forward)
     644           0 :         return;
     645             : 
     646         104 :     message = (xl_logical_message *) XLogRecGetData(r);
     647             : 
     648         204 :     if (message->dbId != ctx->slot->data.database ||
     649         100 :         FilterByOrigin(ctx, origin_id))
     650           8 :         return;
     651             : 
     652          96 :     if (message->transactional &&
     653          76 :         !SnapBuildProcessChange(builder, xid, buf->origptr))
     654           0 :         return;
     655         116 :     else if (!message->transactional &&
     656          40 :              (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
     657          20 :               SnapBuildXactNeedsSkip(builder, buf->origptr)))
     658           8 :         return;
     659             : 
     660          88 :     snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
     661          88 :     ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
     662          88 :                               message->transactional,
     663          88 :                               message->message, /* first part of message is
     664             :                                                  * prefix */
     665             :                               message->message_size,
     666          88 :                               message->message + message->prefix_size);
     667             : }
     668             : 
     669             : /*
     670             :  * Consolidated commit record handling between the different form of commit
     671             :  * records.
     672             :  *
     673             :  * 'two_phase' indicates that caller wants to process the transaction in two
     674             :  * phases, first process prepare if not already done and then process
     675             :  * commit_prepared.
     676             :  */
     677             : static void
     678        3570 : DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     679             :              xl_xact_parsed_commit *parsed, TransactionId xid,
     680             :              bool two_phase)
     681             : {
     682        3570 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     683        3570 :     TimestampTz commit_time = parsed->xact_time;
     684        3570 :     RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     685             :     int         i;
     686             : 
     687        3570 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     688             :     {
     689          38 :         origin_lsn = parsed->origin_lsn;
     690          38 :         commit_time = parsed->origin_timestamp;
     691             :     }
     692             : 
     693        3570 :     SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
     694             :                        parsed->nsubxacts, parsed->subxacts);
     695             : 
     696             :     /* ----
     697             :      * Check whether we are interested in this specific transaction, and tell
     698             :      * the reorderbuffer to forget the content of the (sub-)transactions
     699             :      * if not.
     700             :      *
     701             :      * We can't just use ReorderBufferAbort() here, because we need to execute
     702             :      * the transaction's invalidations.  This currently won't be needed if
     703             :      * we're just skipping over the transaction because currently we only do
     704             :      * so during startup, to get to the first transaction the client needs. As
     705             :      * we have reset the catalog caches before starting to read WAL, and we
     706             :      * haven't yet touched any catalogs, there can't be anything to invalidate.
     707             :      * But if we're "forgetting" this commit because it happened in another
     708             :      * database, the invalidations might be important, because they could be
     709             :      * for shared catalogs and we might have loaded data into the relevant
     710             :      * syscaches.
     711             :      * ---
     712             :      */
     713        3570 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     714             :     {
     715        4144 :         for (i = 0; i < parsed->nsubxacts; i++)
     716             :         {
     717        1946 :             ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
     718             :         }
     719        2198 :         ReorderBufferForget(ctx->reorder, xid, buf->origptr);
     720             : 
     721        2198 :         return;
     722             :     }
     723             : 
     724             :     /* tell the reorderbuffer about the surviving subtransactions */
     725        1886 :     for (i = 0; i < parsed->nsubxacts; i++)
     726             :     {
     727         514 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     728             :                                  buf->origptr, buf->endptr);
     729             :     }
     730             : 
     731             :     /*
     732             :      * Send the final commit record if the transaction data is already
     733             :      * decoded, otherwise, process the entire transaction.
     734             :      */
     735        1372 :     if (two_phase)
     736             :     {
     737          48 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     738          48 :                                     SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
     739             :                                     commit_time, origin_id, origin_lsn,
     740          48 :                                     parsed->twophase_gid, true);
     741             :     }
     742             :     else
     743             :     {
     744        1324 :         ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
     745             :                             commit_time, origin_id, origin_lsn);
     746             :     }
     747             : 
     748             :     /*
     749             :      * Update the decoding stats at transaction prepare/commit/abort.
     750             :      * Additionally we send the stats when we spill or stream the changes to
     751             :      * avoid losing them in case the decoding is interrupted. It is not clear
     752             :      * that sending more or less frequently than this would be better.
     753             :      */
     754        1368 :     UpdateDecodingStats(ctx);
     755             : }
     756             : 
     757             : /*
     758             :  * Decode PREPARE record. Similar logic as in DecodeCommit.
     759             :  *
     760             :  * Note that we don't skip prepare even if have detected concurrent abort
     761             :  * because it is quite possible that we had already sent some changes before we
     762             :  * detect abort in which case we need to abort those changes in the subscriber.
     763             :  * To abort such changes, we do send the prepare and then the rollback prepared
     764             :  * which is what happened on the publisher-side as well. Now, we can invent a
     765             :  * new abort API wherein in such cases we send abort and skip sending prepared
     766             :  * and rollback prepared but then it is not that straightforward because we
     767             :  * might have streamed this transaction by that time in which case it is
     768             :  * handled when the rollback is encountered. It is not impossible to optimize
     769             :  * the concurrent abort case but it can introduce design complexity w.r.t
     770             :  * handling different cases so leaving it for now as it doesn't seem worth it.
     771             :  */
     772             : static void
     773         208 : DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     774             :               xl_xact_parsed_prepare *parsed)
     775             : {
     776         208 :     SnapBuild  *builder = ctx->snapshot_builder;
     777         208 :     XLogRecPtr  origin_lsn = parsed->origin_lsn;
     778         208 :     TimestampTz prepare_time = parsed->xact_time;
     779         208 :     XLogRecPtr  origin_id = XLogRecGetOrigin(buf->record);
     780             :     int         i;
     781         208 :     TransactionId xid = parsed->twophase_xid;
     782             : 
     783         208 :     if (parsed->origin_timestamp != 0)
     784          16 :         prepare_time = parsed->origin_timestamp;
     785             : 
     786             :     /*
     787             :      * Remember the prepare info for a txn so that it can be used later in
     788             :      * commit prepared if required. See ReorderBufferFinishPrepared.
     789             :      */
     790         208 :     if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
     791             :                                           buf->endptr, prepare_time, origin_id,
     792             :                                           origin_lsn))
     793           0 :         return;
     794             : 
     795             :     /* We can't start streaming unless a consistent state is reached. */
     796         208 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
     797             :     {
     798           6 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     799           6 :         return;
     800             :     }
     801             : 
     802             :     /*
     803             :      * Check whether we need to process this transaction. See
     804             :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     805             :      * transaction.
     806             :      *
     807             :      * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
     808             :      * hasn't yet been committed, removing this txn before a commit might
     809             :      * result in the computation of an incorrect restart_lsn. See
     810             :      * SnapBuildProcessRunningXacts. But we need to process cache
     811             :      * invalidations if there are any for the reasons mentioned in
     812             :      * DecodeCommit.
     813             :      */
     814         202 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     815             :     {
     816         140 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     817         140 :         ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
     818         140 :         return;
     819             :     }
     820             : 
     821             :     /* Tell the reorderbuffer about the surviving subtransactions. */
     822          64 :     for (i = 0; i < parsed->nsubxacts; i++)
     823             :     {
     824           2 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     825             :                                  buf->origptr, buf->endptr);
     826             :     }
     827             : 
     828             :     /* replay actions of all transaction + subtransactions in order */
     829          62 :     ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
     830             : 
     831             :     /*
     832             :      * Update the decoding stats at transaction prepare/commit/abort.
     833             :      * Additionally we send the stats when we spill or stream the changes to
     834             :      * avoid losing them in case the decoding is interrupted. It is not clear
     835             :      * that sending more or less frequently than this would be better.
     836             :      */
     837          62 :     UpdateDecodingStats(ctx);
     838             : }
     839             : 
     840             : 
     841             : /*
     842             :  * Get the data from the various forms of abort records and pass it on to
     843             :  * snapbuild.c and reorderbuffer.c.
     844             :  *
     845             :  * 'two_phase' indicates to finish prepared transaction.
     846             :  */
     847             : static void
     848         156 : DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     849             :             xl_xact_parsed_abort *parsed, TransactionId xid,
     850             :             bool two_phase)
     851             : {
     852             :     int         i;
     853         156 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     854         156 :     TimestampTz abort_time = parsed->xact_time;
     855         156 :     XLogRecPtr  origin_id = XLogRecGetOrigin(buf->record);
     856             :     bool        skip_xact;
     857             : 
     858         156 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     859             :     {
     860           4 :         origin_lsn = parsed->origin_lsn;
     861           4 :         abort_time = parsed->origin_timestamp;
     862             :     }
     863             : 
     864             :     /*
     865             :      * Check whether we need to process this transaction. See
     866             :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     867             :      * transaction.
     868             :      */
     869         156 :     skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
     870             : 
     871             :     /*
     872             :      * Send the final rollback record for a prepared transaction unless we
     873             :      * need to skip it. For non-two-phase xacts, simply forget the xact.
     874             :      */
     875         156 :     if (two_phase && !skip_xact)
     876             :     {
     877          16 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     878             :                                     InvalidXLogRecPtr,
     879             :                                     abort_time, origin_id, origin_lsn,
     880          16 :                                     parsed->twophase_gid, false);
     881             :     }
     882             :     else
     883             :     {
     884         144 :         for (i = 0; i < parsed->nsubxacts; i++)
     885             :         {
     886           4 :             ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
     887           4 :                                buf->record->EndRecPtr);
     888             :         }
     889             : 
     890         140 :         ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
     891             :     }
     892             : 
     893             :     /* update the decoding stats */
     894         156 :     UpdateDecodingStats(ctx);
     895         156 : }
     896             : 
     897             : /*
     898             :  * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
     899             :  *
     900             :  * Deletes can contain the new tuple.
     901             :  */
     902             : static void
     903     2144106 : DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     904             : {
     905             :     Size        datalen;
     906             :     char       *tupledata;
     907             :     Size        tuplelen;
     908     2144106 :     XLogReaderState *r = buf->record;
     909             :     xl_heap_insert *xlrec;
     910             :     ReorderBufferChange *change;
     911             :     RelFileNode target_node;
     912             : 
     913     2144106 :     xlrec = (xl_heap_insert *) XLogRecGetData(r);
     914             : 
     915             :     /*
     916             :      * Ignore insert records without new tuples (this does happen when
     917             :      * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
     918             :      */
     919     2144106 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
     920        5178 :         return;
     921             : 
     922             :     /* only interested in our database */
     923     2138940 :     XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
     924     2138940 :     if (target_node.dbNode != ctx->slot->data.database)
     925           0 :         return;
     926             : 
     927             :     /* output plugin doesn't look for this origin, no need to queue */
     928     2138940 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     929          12 :         return;
     930             : 
     931     2138928 :     change = ReorderBufferGetChange(ctx->reorder);
     932     2138928 :     if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
     933     2103128 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
     934             :     else
     935       35800 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
     936     2138928 :     change->origin_id = XLogRecGetOrigin(r);
     937             : 
     938     2138928 :     memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
     939             : 
     940     2138928 :     tupledata = XLogRecGetBlockData(r, 0, &datalen);
     941     2138928 :     tuplelen = datalen - SizeOfHeapHeader;
     942             : 
     943     2138928 :     change->data.tp.newtuple =
     944     2138928 :         ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
     945             : 
     946     2138928 :     DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
     947             : 
     948     2138928 :     change->data.tp.clear_toast_afterwards = true;
     949             : 
     950     2138928 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
     951             :                              change,
     952     2138928 :                              xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
     953             : }
     954             : 
     955             : /*
     956             :  * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
     957             :  * in the record, from wal into proper tuplebufs.
     958             :  *
     959             :  * Updates can possibly contain a new tuple and the old primary key.
     960             :  */
     961             : static void
     962      266892 : DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     963             : {
     964      266892 :     XLogReaderState *r = buf->record;
     965             :     xl_heap_update *xlrec;
     966             :     ReorderBufferChange *change;
     967             :     char       *data;
     968             :     RelFileNode target_node;
     969             : 
     970      266892 :     xlrec = (xl_heap_update *) XLogRecGetData(r);
     971             : 
     972             :     /* only interested in our database */
     973      266892 :     XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
     974      266892 :     if (target_node.dbNode != ctx->slot->data.database)
     975          18 :         return;
     976             : 
     977             :     /* output plugin doesn't look for this origin, no need to queue */
     978      266874 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     979           0 :         return;
     980             : 
     981      266874 :     change = ReorderBufferGetChange(ctx->reorder);
     982      266874 :     change->action = REORDER_BUFFER_CHANGE_UPDATE;
     983      266874 :     change->origin_id = XLogRecGetOrigin(r);
     984      266874 :     memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
     985             : 
     986      266874 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
     987             :     {
     988             :         Size        datalen;
     989             :         Size        tuplelen;
     990             : 
     991      264180 :         data = XLogRecGetBlockData(r, 0, &datalen);
     992             : 
     993      264180 :         tuplelen = datalen - SizeOfHeapHeader;
     994             : 
     995      264180 :         change->data.tp.newtuple =
     996      264180 :             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
     997             : 
     998      264180 :         DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
     999             :     }
    1000             : 
    1001      266874 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
    1002             :     {
    1003             :         Size        datalen;
    1004             :         Size        tuplelen;
    1005             : 
    1006             :         /* caution, remaining data in record is not aligned */
    1007         506 :         data = XLogRecGetData(r) + SizeOfHeapUpdate;
    1008         506 :         datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
    1009         506 :         tuplelen = datalen - SizeOfHeapHeader;
    1010             : 
    1011         506 :         change->data.tp.oldtuple =
    1012         506 :             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
    1013             : 
    1014         506 :         DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
    1015             :     }
    1016             : 
    1017      266874 :     change->data.tp.clear_toast_afterwards = true;
    1018             : 
    1019      266874 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1020             :                              change, false);
    1021             : }
    1022             : 
    1023             : /*
    1024             :  * Parse XLOG_HEAP_DELETE from wal into proper tuplebufs.
    1025             :  *
    1026             :  * Deletes can possibly contain the old primary key.
    1027             :  */
    1028             : static void
    1029      340868 : DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1030             : {
    1031      340868 :     XLogReaderState *r = buf->record;
    1032             :     xl_heap_delete *xlrec;
    1033             :     ReorderBufferChange *change;
    1034             :     RelFileNode target_node;
    1035             : 
    1036      340868 :     xlrec = (xl_heap_delete *) XLogRecGetData(r);
    1037             : 
    1038             :     /* only interested in our database */
    1039      340868 :     XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
    1040      340868 :     if (target_node.dbNode != ctx->slot->data.database)
    1041          14 :         return;
    1042             : 
    1043             :     /* output plugin doesn't look for this origin, no need to queue */
    1044      340854 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1045           0 :         return;
    1046             : 
    1047      340854 :     change = ReorderBufferGetChange(ctx->reorder);
    1048             : 
    1049      340854 :     if (xlrec->flags & XLH_DELETE_IS_SUPER)
    1050           0 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
    1051             :     else
    1052      340854 :         change->action = REORDER_BUFFER_CHANGE_DELETE;
    1053             : 
    1054      340854 :     change->origin_id = XLogRecGetOrigin(r);
    1055             : 
    1056      340854 :     memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
    1057             : 
    1058             :     /* old primary key stored */
    1059      340854 :     if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
    1060             :     {
    1061      221318 :         Size        datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
    1062      221318 :         Size        tuplelen = datalen - SizeOfHeapHeader;
    1063             : 
    1064             :         Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
    1065             : 
    1066      221318 :         change->data.tp.oldtuple =
    1067      221318 :             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
    1068             : 
    1069      221318 :         DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
    1070             :                         datalen, change->data.tp.oldtuple);
    1071             :     }
    1072             : 
    1073      340854 :     change->data.tp.clear_toast_afterwards = true;
    1074             : 
    1075      340854 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1076             :                              change, false);
    1077             : }
    1078             : 
    1079             : /*
    1080             :  * Parse XLOG_HEAP_TRUNCATE from wal
    1081             :  */
    1082             : static void
    1083          52 : DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1084             : {
    1085          52 :     XLogReaderState *r = buf->record;
    1086             :     xl_heap_truncate *xlrec;
    1087             :     ReorderBufferChange *change;
    1088             : 
    1089          52 :     xlrec = (xl_heap_truncate *) XLogRecGetData(r);
    1090             : 
    1091             :     /* only interested in our database */
    1092          52 :     if (xlrec->dbId != ctx->slot->data.database)
    1093           0 :         return;
    1094             : 
    1095             :     /* output plugin doesn't look for this origin, no need to queue */
    1096          52 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1097           0 :         return;
    1098             : 
    1099          52 :     change = ReorderBufferGetChange(ctx->reorder);
    1100          52 :     change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
    1101          52 :     change->origin_id = XLogRecGetOrigin(r);
    1102          52 :     if (xlrec->flags & XLH_TRUNCATE_CASCADE)
    1103           2 :         change->data.truncate.cascade = true;
    1104          52 :     if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
    1105           4 :         change->data.truncate.restart_seqs = true;
    1106          52 :     change->data.truncate.nrelids = xlrec->nrelids;
    1107         104 :     change->data.truncate.relids = ReorderBufferGetRelids(ctx->reorder,
    1108          52 :                                                           xlrec->nrelids);
    1109          52 :     memcpy(change->data.truncate.relids, xlrec->relids,
    1110          52 :            xlrec->nrelids * sizeof(Oid));
    1111          52 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1112             :                              buf->origptr, change, false);
    1113             : }
    1114             : 
    1115             : /*
    1116             :  * Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs.
    1117             :  *
    1118             :  * Currently MULTI_INSERT will always contain the full tuples.
    1119             :  */
    1120             : static void
    1121        8608 : DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1122             : {
    1123        8608 :     XLogReaderState *r = buf->record;
    1124             :     xl_heap_multi_insert *xlrec;
    1125             :     int         i;
    1126             :     char       *data;
    1127             :     char       *tupledata;
    1128             :     Size        tuplelen;
    1129             :     RelFileNode rnode;
    1130             : 
    1131        8608 :     xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
    1132             : 
    1133             :     /*
    1134             :      * Ignore insert records without new tuples.  This happens when a
    1135             :      * multi_insert is done on a catalog or on a non-persistent relation.
    1136             :      */
    1137        8608 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
    1138        8582 :         return;
    1139             : 
    1140             :     /* only interested in our database */
    1141         122 :     XLogRecGetBlockTag(r, 0, &rnode, NULL, NULL);
    1142         122 :     if (rnode.dbNode != ctx->slot->data.database)
    1143          96 :         return;
    1144             : 
    1145             :     /* output plugin doesn't look for this origin, no need to queue */
    1146          26 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1147           0 :         return;
    1148             : 
    1149             :     /*
    1150             :      * We know that this multi_insert isn't for a catalog, so the block should
    1151             :      * always have data even if a full-page write of it is taken.
    1152             :      */
    1153          26 :     tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
    1154             :     Assert(tupledata != NULL);
    1155             : 
    1156          26 :     data = tupledata;
    1157        2100 :     for (i = 0; i < xlrec->ntuples; i++)
    1158             :     {
    1159             :         ReorderBufferChange *change;
    1160             :         xl_multi_insert_tuple *xlhdr;
    1161             :         int         datalen;
    1162             :         ReorderBufferTupleBuf *tuple;
    1163             :         HeapTupleHeader header;
    1164             : 
    1165        2074 :         change = ReorderBufferGetChange(ctx->reorder);
    1166        2074 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
    1167        2074 :         change->origin_id = XLogRecGetOrigin(r);
    1168             : 
    1169        2074 :         memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode));
    1170             : 
    1171        2074 :         xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
    1172        2074 :         data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
    1173        2074 :         datalen = xlhdr->datalen;
    1174             : 
    1175        2074 :         change->data.tp.newtuple =
    1176        2074 :             ReorderBufferGetTupleBuf(ctx->reorder, datalen);
    1177             : 
    1178        2074 :         tuple = change->data.tp.newtuple;
    1179        2074 :         header = tuple->tuple.t_data;
    1180             : 
    1181             :         /* not a disk based tuple */
    1182        2074 :         ItemPointerSetInvalid(&tuple->tuple.t_self);
    1183             : 
    1184             :         /*
    1185             :          * We can only figure this out after reassembling the transactions.
    1186             :          */
    1187        2074 :         tuple->tuple.t_tableOid = InvalidOid;
    1188             : 
    1189        2074 :         tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
    1190             : 
    1191        2074 :         memset(header, 0, SizeofHeapTupleHeader);
    1192             : 
    1193        2074 :         memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader,
    1194             :                (char *) data,
    1195             :                datalen);
    1196        2074 :         header->t_infomask = xlhdr->t_infomask;
    1197        2074 :         header->t_infomask2 = xlhdr->t_infomask2;
    1198        2074 :         header->t_hoff = xlhdr->t_hoff;
    1199             : 
    1200             :         /*
    1201             :          * Reset toast reassembly state only after the last row in the last
    1202             :          * xl_multi_insert_tuple record emitted by one heap_multi_insert()
    1203             :          * call.
    1204             :          */
    1205        2074 :         if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
    1206         354 :             (i + 1) == xlrec->ntuples)
    1207          16 :             change->data.tp.clear_toast_afterwards = true;
    1208             :         else
    1209        2058 :             change->data.tp.clear_toast_afterwards = false;
    1210             : 
    1211        2074 :         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1212             :                                  buf->origptr, change, false);
    1213             : 
    1214             :         /* move to the next xl_multi_insert_tuple entry */
    1215        2074 :         data += datalen;
    1216             :     }
    1217             :     Assert(data == tupledata + tuplelen);
    1218             : }
    1219             : 
    1220             : /*
    1221             :  * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change.
    1222             :  *
    1223             :  * This is pretty trivial, all the state essentially already setup by the
    1224             :  * speculative insertion.
    1225             :  */
    1226             : static void
    1227       35800 : DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1228             : {
    1229       35800 :     XLogReaderState *r = buf->record;
    1230             :     ReorderBufferChange *change;
    1231             :     RelFileNode target_node;
    1232             : 
    1233             :     /* only interested in our database */
    1234       35800 :     XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
    1235       35800 :     if (target_node.dbNode != ctx->slot->data.database)
    1236           0 :         return;
    1237             : 
    1238             :     /* output plugin doesn't look for this origin, no need to queue */
    1239       35800 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1240           0 :         return;
    1241             : 
    1242       35800 :     change = ReorderBufferGetChange(ctx->reorder);
    1243       35800 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
    1244       35800 :     change->origin_id = XLogRecGetOrigin(r);
    1245             : 
    1246       35800 :     memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
    1247             : 
    1248       35800 :     change->data.tp.clear_toast_afterwards = true;
    1249             : 
    1250       35800 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1251             :                              change, false);
    1252             : }
    1253             : 
    1254             : 
    1255             : /*
    1256             :  * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
    1257             :  * (but not by heap_multi_insert) into a tuplebuf.
    1258             :  *
    1259             :  * The size 'len' and the pointer 'data' in the record need to be
    1260             :  * computed outside as they are record specific.
    1261             :  */
    1262             : static void
    1263     2624932 : DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
    1264             : {
    1265             :     xl_heap_header xlhdr;
    1266     2624932 :     int         datalen = len - SizeOfHeapHeader;
    1267             :     HeapTupleHeader header;
    1268             : 
    1269             :     Assert(datalen >= 0);
    1270             : 
    1271     2624932 :     tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
    1272     2624932 :     header = tuple->tuple.t_data;
    1273             : 
    1274             :     /* not a disk based tuple */
    1275     2624932 :     ItemPointerSetInvalid(&tuple->tuple.t_self);
    1276             : 
    1277             :     /* we can only figure this out after reassembling the transactions */
    1278     2624932 :     tuple->tuple.t_tableOid = InvalidOid;
    1279             : 
    1280             :     /* data is not stored aligned, copy to aligned storage */
    1281     2624932 :     memcpy((char *) &xlhdr,
    1282             :            data,
    1283             :            SizeOfHeapHeader);
    1284             : 
    1285     2624932 :     memset(header, 0, SizeofHeapTupleHeader);
    1286             : 
    1287     2624932 :     memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
    1288     2624932 :            data + SizeOfHeapHeader,
    1289             :            datalen);
    1290             : 
    1291     2624932 :     header->t_infomask = xlhdr.t_infomask;
    1292     2624932 :     header->t_infomask2 = xlhdr.t_infomask2;
    1293     2624932 :     header->t_hoff = xlhdr.t_hoff;
    1294     2624932 : }
    1295             : 
    1296             : /*
    1297             :  * Check whether we are interested in this specific transaction.
    1298             :  *
    1299             :  * There can be several reasons we might not be interested in this
    1300             :  * transaction:
    1301             :  * 1) We might not be interested in decoding transactions up to this
    1302             :  *    LSN. This can happen because we previously decoded it and now just
    1303             :  *    are restarting or if we haven't assembled a consistent snapshot yet.
    1304             :  * 2) The transaction happened in another database.
    1305             :  * 3) The output plugin is not interested in the origin.
    1306             :  * 4) We are doing fast-forwarding
    1307             :  */
    1308             : static bool
    1309        3928 : DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
    1310             :                   Oid txn_dbid, RepOriginId origin_id)
    1311             : {
    1312        5456 :     return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
    1313        1480 :             (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
    1314        5456 :             ctx->fast_forward || FilterByOrigin(ctx, origin_id));
    1315             : }

Generated by: LCOV version 1.14