LCOV - code coverage report
Current view: top level - src/backend/replication/logical - decode.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18beta1 Lines: 419 441 95.0 %
Date: 2025-05-11 13:15:18 Functions: 20 20 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -------------------------------------------------------------------------
       2             :  *
       3             :  * decode.c
       4             :  *      This module decodes WAL records read using xlogreader.h's APIs for the
       5             :  *      purpose of logical decoding by passing information to the
       6             :  *      reorderbuffer module (containing the actual changes) and to the
       7             :  *      snapbuild module to build a fitting catalog snapshot (to be able to
       8             :  *      properly decode the changes in the reorderbuffer).
       9             :  *
      10             :  * NOTE:
      11             :  *      This basically tries to handle all low level xlog stuff for
      12             :  *      reorderbuffer.c and snapbuild.c. There's some minor leakage where a
      13             :  *      specific record's struct is used to pass data along, but those just
      14             :  *      happen to contain the right amount of data in a convenient
      15             :  *      format. There isn't and shouldn't be much intelligence about the
      16             :  *      contents of records in here except turning them into a more usable
      17             :  *      format.
      18             :  *
      19             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      20             :  * Portions Copyright (c) 1994, Regents of the University of California
      21             :  *
      22             :  * IDENTIFICATION
      23             :  *    src/backend/replication/logical/decode.c
      24             :  *
      25             :  * -------------------------------------------------------------------------
      26             :  */
      27             : #include "postgres.h"
      28             : 
      29             : #include "access/heapam_xlog.h"
      30             : #include "access/transam.h"
      31             : #include "access/xact.h"
      32             : #include "access/xlog_internal.h"
      33             : #include "access/xlogreader.h"
      34             : #include "access/xlogrecord.h"
      35             : #include "catalog/pg_control.h"
      36             : #include "replication/decode.h"
      37             : #include "replication/logical.h"
      38             : #include "replication/message.h"
      39             : #include "replication/reorderbuffer.h"
      40             : #include "replication/snapbuild.h"
      41             : #include "storage/standbydefs.h"
      42             : 
      43             : /* individual record(group)'s handlers */
      44             : static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      45             : static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      46             : static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      47             : static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      48             : static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      49             : static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      50             : 
      51             : static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      52             :                          xl_xact_parsed_commit *parsed, TransactionId xid,
      53             :                          bool two_phase);
      54             : static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      55             :                         xl_xact_parsed_abort *parsed, TransactionId xid,
      56             :                         bool two_phase);
      57             : static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      58             :                           xl_xact_parsed_prepare *parsed);
      59             : 
      60             : 
      61             : /* common function to decode tuples */
      62             : static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);
      63             : 
      64             : /* helper functions for decoding transactions */
      65             : static inline bool FilterPrepare(LogicalDecodingContext *ctx,
      66             :                                  TransactionId xid, const char *gid);
      67             : static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
      68             :                               XLogRecordBuffer *buf, Oid txn_dbid,
      69             :                               RepOriginId origin_id);
      70             : 
      71             : /*
      72             :  * Take every XLogReadRecord()ed record and perform the actions required to
      73             :  * decode it using the output plugin already setup in the logical decoding
      74             :  * context.
      75             :  *
      76             :  * NB: Note that every record's xid needs to be processed by reorderbuffer
      77             :  * (xids contained in the content of records are not relevant for this rule).
      78             :  * That means that for records which'd otherwise not go through the
      79             :  * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
      80             :  * call ReorderBufferProcessXid for each record type by default, because
      81             :  * e.g. empty xacts can be handled more efficiently if there's no previous
      82             :  * state for them.
      83             :  *
      84             :  * We also support the ability to fast forward thru records, skipping some
      85             :  * record types completely - see individual record types for details.
      86             :  */
      87             : void
      88     4804852 : LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
      89             : {
      90             :     XLogRecordBuffer buf;
      91             :     TransactionId txid;
      92             :     RmgrData    rmgr;
      93             : 
      94     4804852 :     buf.origptr = ctx->reader->ReadRecPtr;
      95     4804852 :     buf.endptr = ctx->reader->EndRecPtr;
      96     4804852 :     buf.record = record;
      97             : 
      98     4804852 :     txid = XLogRecGetTopXid(record);
      99             : 
     100             :     /*
     101             :      * If the top-level xid is valid, we need to assign the subxact to the
     102             :      * top-level xact. We need to do this for all records, hence we do it
     103             :      * before the switch.
     104             :      */
     105     4804852 :     if (TransactionIdIsValid(txid))
     106             :     {
     107        1334 :         ReorderBufferAssignChild(ctx->reorder,
     108             :                                  txid,
     109        1334 :                                  XLogRecGetXid(record),
     110             :                                  buf.origptr);
     111             :     }
     112             : 
     113     4804852 :     rmgr = GetRmgr(XLogRecGetRmid(record));
     114             : 
     115     4804852 :     if (rmgr.rm_decode != NULL)
     116     3700232 :         rmgr.rm_decode(ctx, &buf);
     117             :     else
     118             :     {
     119             :         /* just deal with xid, and done */
     120     1104620 :         ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
     121             :                                 buf.origptr);
     122             :     }
     123     4804836 : }
     124             : 
     125             : /*
     126             :  * Handle rmgr XLOG_ID records for LogicalDecodingProcessRecord().
     127             :  */
     128             : void
     129       11720 : xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     130             : {
     131       11720 :     SnapBuild  *builder = ctx->snapshot_builder;
     132       11720 :     uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
     133             : 
     134       11720 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
     135             :                             buf->origptr);
     136             : 
     137       11720 :     switch (info)
     138             :     {
     139             :             /* this is also used in END_OF_RECOVERY checkpoints */
     140          80 :         case XLOG_CHECKPOINT_SHUTDOWN:
     141             :         case XLOG_END_OF_RECOVERY:
     142          80 :             SnapBuildSerializationPoint(builder, buf->origptr);
     143             : 
     144          80 :             break;
     145         132 :         case XLOG_CHECKPOINT_ONLINE:
     146             : 
     147             :             /*
     148             :              * a RUNNING_XACTS record will have been logged near to this, we
     149             :              * can restart from there.
     150             :              */
     151         132 :             break;
     152           2 :         case XLOG_PARAMETER_CHANGE:
     153             :             {
     154           2 :                 xl_parameter_change *xlrec =
     155           2 :                     (xl_parameter_change *) XLogRecGetData(buf->record);
     156             : 
     157             :                 /*
     158             :                  * If wal_level on the primary is reduced to less than
     159             :                  * logical, we want to prevent existing logical slots from
     160             :                  * being used.  Existing logical slots on the standby get
     161             :                  * invalidated when this WAL record is replayed; and further,
     162             :                  * slot creation fails when wal_level is not sufficient; but
     163             :                  * all these operations are not synchronized, so a logical
     164             :                  * slot may creep in while the wal_level is being reduced.
     165             :                  * Hence this extra check.
     166             :                  */
     167           2 :                 if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
     168             :                 {
     169             :                     /*
     170             :                      * This can occur only on a standby, as a primary would
     171             :                      * not allow to restart after changing wal_level < logical
     172             :                      * if there is pre-existing logical slot.
     173             :                      */
     174             :                     Assert(RecoveryInProgress());
     175           0 :                     ereport(ERROR,
     176             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     177             :                              errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary")));
     178             :                 }
     179           2 :                 break;
     180             :             }
     181       11506 :         case XLOG_NOOP:
     182             :         case XLOG_NEXTOID:
     183             :         case XLOG_SWITCH:
     184             :         case XLOG_BACKUP_END:
     185             :         case XLOG_RESTORE_POINT:
     186             :         case XLOG_FPW_CHANGE:
     187             :         case XLOG_FPI_FOR_HINT:
     188             :         case XLOG_FPI:
     189             :         case XLOG_OVERWRITE_CONTRECORD:
     190             :         case XLOG_CHECKPOINT_REDO:
     191       11506 :             break;
     192           0 :         default:
     193           0 :             elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
     194             :     }
     195       11720 : }
     196             : 
     197             : /*
     198             :  * Handle rmgr XACT_ID records for LogicalDecodingProcessRecord().
     199             :  */
     200             : void
     201       15974 : xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     202             : {
     203       15974 :     SnapBuild  *builder = ctx->snapshot_builder;
     204       15974 :     ReorderBuffer *reorder = ctx->reorder;
     205       15974 :     XLogReaderState *r = buf->record;
     206       15974 :     uint8       info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
     207             : 
     208             :     /*
     209             :      * If the snapshot isn't yet fully built, we cannot decode anything, so
     210             :      * bail out.
     211             :      */
     212       15974 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     213          28 :         return;
     214             : 
     215       15946 :     switch (info)
     216             :     {
     217        5498 :         case XLOG_XACT_COMMIT:
     218             :         case XLOG_XACT_COMMIT_PREPARED:
     219             :             {
     220             :                 xl_xact_commit *xlrec;
     221             :                 xl_xact_parsed_commit parsed;
     222             :                 TransactionId xid;
     223        5498 :                 bool        two_phase = false;
     224             : 
     225        5498 :                 xlrec = (xl_xact_commit *) XLogRecGetData(r);
     226        5498 :                 ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     227             : 
     228        5498 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     229        5294 :                     xid = XLogRecGetXid(r);
     230             :                 else
     231         204 :                     xid = parsed.twophase_xid;
     232             : 
     233             :                 /*
     234             :                  * We would like to process the transaction in a two-phase
     235             :                  * manner iff output plugin supports two-phase commits and
     236             :                  * doesn't filter the transaction at prepare time.
     237             :                  */
     238        5498 :                 if (info == XLOG_XACT_COMMIT_PREPARED)
     239         204 :                     two_phase = !(FilterPrepare(ctx, xid,
     240         204 :                                                 parsed.twophase_gid));
     241             : 
     242        5498 :                 DecodeCommit(ctx, buf, &parsed, xid, two_phase);
     243        5486 :                 break;
     244             :             }
     245         230 :         case XLOG_XACT_ABORT:
     246             :         case XLOG_XACT_ABORT_PREPARED:
     247             :             {
     248             :                 xl_xact_abort *xlrec;
     249             :                 xl_xact_parsed_abort parsed;
     250             :                 TransactionId xid;
     251         230 :                 bool        two_phase = false;
     252             : 
     253         230 :                 xlrec = (xl_xact_abort *) XLogRecGetData(r);
     254         230 :                 ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     255             : 
     256         230 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     257         152 :                     xid = XLogRecGetXid(r);
     258             :                 else
     259          78 :                     xid = parsed.twophase_xid;
     260             : 
     261             :                 /*
     262             :                  * We would like to process the transaction in a two-phase
     263             :                  * manner iff output plugin supports two-phase commits and
     264             :                  * doesn't filter the transaction at prepare time.
     265             :                  */
     266         230 :                 if (info == XLOG_XACT_ABORT_PREPARED)
     267          78 :                     two_phase = !(FilterPrepare(ctx, xid,
     268          78 :                                                 parsed.twophase_gid));
     269             : 
     270         230 :                 DecodeAbort(ctx, buf, &parsed, xid, two_phase);
     271         230 :                 break;
     272             :             }
     273         246 :         case XLOG_XACT_ASSIGNMENT:
     274             : 
     275             :             /*
     276             :              * We assign subxact to the toplevel xact while processing each
     277             :              * record if required.  So, we don't need to do anything here. See
     278             :              * LogicalDecodingProcessRecord.
     279             :              */
     280         246 :             break;
     281        9646 :         case XLOG_XACT_INVALIDATIONS:
     282             :             {
     283             :                 TransactionId xid;
     284             :                 xl_xact_invals *invals;
     285             : 
     286        9646 :                 xid = XLogRecGetXid(r);
     287        9646 :                 invals = (xl_xact_invals *) XLogRecGetData(r);
     288             : 
     289             :                 /*
     290             :                  * Execute the invalidations for xid-less transactions,
     291             :                  * otherwise, accumulate them so that they can be processed at
     292             :                  * the commit time.
     293             :                  */
     294        9646 :                 if (TransactionIdIsValid(xid))
     295             :                 {
     296        9642 :                     if (!ctx->fast_forward)
     297        9576 :                         ReorderBufferAddInvalidations(reorder, xid,
     298             :                                                       buf->origptr,
     299        9576 :                                                       invals->nmsgs,
     300        9576 :                                                       invals->msgs);
     301        9642 :                     ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
     302             :                                                       buf->origptr);
     303             :                 }
     304           4 :                 else if (!ctx->fast_forward)
     305           4 :                     ReorderBufferImmediateInvalidation(ctx->reorder,
     306           4 :                                                        invals->nmsgs,
     307           4 :                                                        invals->msgs);
     308             : 
     309        9646 :                 break;
     310             :             }
     311         326 :         case XLOG_XACT_PREPARE:
     312             :             {
     313             :                 xl_xact_parsed_prepare parsed;
     314             :                 xl_xact_prepare *xlrec;
     315             : 
     316             :                 /* ok, parse it */
     317         326 :                 xlrec = (xl_xact_prepare *) XLogRecGetData(r);
     318         326 :                 ParsePrepareRecord(XLogRecGetInfo(buf->record),
     319             :                                    xlrec, &parsed);
     320             : 
     321             :                 /*
     322             :                  * We would like to process the transaction in a two-phase
     323             :                  * manner iff output plugin supports two-phase commits and
     324             :                  * doesn't filter the transaction at prepare time.
     325             :                  */
     326         326 :                 if (FilterPrepare(ctx, parsed.twophase_xid,
     327             :                                   parsed.twophase_gid))
     328             :                 {
     329          34 :                     ReorderBufferProcessXid(reorder, parsed.twophase_xid,
     330             :                                             buf->origptr);
     331          34 :                     break;
     332             :                 }
     333             : 
     334             :                 /*
     335             :                  * Note that if the prepared transaction has locked [user]
     336             :                  * catalog tables exclusively then decoding prepare can block
     337             :                  * till the main transaction is committed because it needs to
     338             :                  * lock the catalog tables.
     339             :                  *
     340             :                  * XXX Now, this can even lead to a deadlock if the prepare
     341             :                  * transaction is waiting to get it logically replicated for
     342             :                  * distributed 2PC. This can be avoided by disallowing
     343             :                  * preparing transactions that have locked [user] catalog
     344             :                  * tables exclusively but as of now, we ask users not to do
     345             :                  * such an operation.
     346             :                  */
     347         292 :                 DecodePrepare(ctx, buf, &parsed);
     348         292 :                 break;
     349             :             }
     350           0 :         default:
     351           0 :             elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
     352             :     }
     353             : }
     354             : 
     355             : /*
     356             :  * Handle rmgr STANDBY_ID records for LogicalDecodingProcessRecord().
     357             :  */
     358             : void
     359        7162 : standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     360             : {
     361        7162 :     SnapBuild  *builder = ctx->snapshot_builder;
     362        7162 :     XLogReaderState *r = buf->record;
     363        7162 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     364             : 
     365        7162 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     366             : 
     367        7162 :     switch (info)
     368             :     {
     369        2678 :         case XLOG_RUNNING_XACTS:
     370             :             {
     371        2678 :                 xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
     372             : 
     373        2678 :                 SnapBuildProcessRunningXacts(builder, buf->origptr, running);
     374             : 
     375             :                 /*
     376             :                  * Abort all transactions that we keep track of, that are
     377             :                  * older than the record's oldestRunningXid. This is the most
     378             :                  * convenient spot for doing so since, in contrast to shutdown
     379             :                  * or end-of-recovery checkpoints, we have information about
     380             :                  * all running transactions which includes prepared ones,
     381             :                  * while shutdown checkpoints just know that no non-prepared
     382             :                  * transactions are in progress.
     383             :                  */
     384        2674 :                 ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
     385             :             }
     386        2674 :             break;
     387        4480 :         case XLOG_STANDBY_LOCK:
     388        4480 :             break;
     389           4 :         case XLOG_INVALIDATIONS:
     390             : 
     391             :             /*
     392             :              * We are processing the invalidations at the command level via
     393             :              * XLOG_XACT_INVALIDATIONS.  So we don't need to do anything here.
     394             :              */
     395           4 :             break;
     396           0 :         default:
     397           0 :             elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
     398             :     }
     399        7158 : }
     400             : 
     401             : /*
     402             :  * Handle rmgr HEAP2_ID records for LogicalDecodingProcessRecord().
     403             :  */
     404             : void
     405       64466 : heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     406             : {
     407       64466 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     408       64466 :     TransactionId xid = XLogRecGetXid(buf->record);
     409       64466 :     SnapBuild  *builder = ctx->snapshot_builder;
     410             : 
     411       64466 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     412             : 
     413             :     /*
     414             :      * If we don't have snapshot or we are just fast-forwarding, there is no
     415             :      * point in decoding data changes. However, it's crucial to build the base
     416             :      * snapshot during fast-forward mode (as is done in
     417             :      * SnapBuildProcessChange()) because we require the snapshot's xmin when
     418             :      * determining the candidate catalog_xmin for the replication slot. See
     419             :      * SnapBuildProcessRunningXacts().
     420             :      */
     421       64466 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     422          22 :         return;
     423             : 
     424       64444 :     switch (info)
     425             :     {
     426       11760 :         case XLOG_HEAP2_MULTI_INSERT:
     427       11760 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     428       11760 :                 !ctx->fast_forward)
     429       11630 :                 DecodeMultiInsert(ctx, buf);
     430       11760 :             break;
     431       47718 :         case XLOG_HEAP2_NEW_CID:
     432       47718 :             if (!ctx->fast_forward)
     433             :             {
     434             :                 xl_heap_new_cid *xlrec;
     435             : 
     436       47356 :                 xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
     437       47356 :                 SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
     438             : 
     439       47356 :                 break;
     440             :             }
     441             :         case XLOG_HEAP2_REWRITE:
     442             : 
     443             :             /*
     444             :              * Although these records only exist to serve the needs of logical
     445             :              * decoding, all the work happens as part of crash or archive
     446             :              * recovery, so we don't need to do anything here.
     447             :              */
     448         542 :             break;
     449             : 
     450             :             /*
     451             :              * Everything else here is just low level physical stuff we're not
     452             :              * interested in.
     453             :              */
     454        4786 :         case XLOG_HEAP2_PRUNE_ON_ACCESS:
     455             :         case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
     456             :         case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
     457             :         case XLOG_HEAP2_VISIBLE:
     458             :         case XLOG_HEAP2_LOCK_UPDATED:
     459        4786 :             break;
     460           0 :         default:
     461           0 :             elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
     462             :     }
     463             : }
     464             : 
     465             : /*
     466             :  * Handle rmgr HEAP_ID records for LogicalDecodingProcessRecord().
     467             :  */
     468             : void
     469     3600796 : heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     470             : {
     471     3600796 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     472     3600796 :     TransactionId xid = XLogRecGetXid(buf->record);
     473     3600796 :     SnapBuild  *builder = ctx->snapshot_builder;
     474             : 
     475     3600796 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     476             : 
     477             :     /*
     478             :      * If we don't have snapshot or we are just fast-forwarding, there is no
     479             :      * point in decoding data changes. However, it's crucial to build the base
     480             :      * snapshot during fast-forward mode (as is done in
     481             :      * SnapBuildProcessChange()) because we require the snapshot's xmin when
     482             :      * determining the candidate catalog_xmin for the replication slot. See
     483             :      * SnapBuildProcessRunningXacts().
     484             :      */
     485     3600796 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     486           8 :         return;
     487             : 
     488     3600788 :     switch (info)
     489             :     {
     490     2232844 :         case XLOG_HEAP_INSERT:
     491     2232844 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     492     2232844 :                 !ctx->fast_forward)
     493     2232596 :                 DecodeInsert(ctx, buf);
     494     2232844 :             break;
     495             : 
     496             :             /*
     497             :              * Treat HOT update as normal updates. There is no useful
     498             :              * information in the fact that we could make it a HOT update
     499             :              * locally and the WAL layout is compatible.
     500             :              */
     501      414328 :         case XLOG_HEAP_HOT_UPDATE:
     502             :         case XLOG_HEAP_UPDATE:
     503      414328 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     504      414328 :                 !ctx->fast_forward)
     505      414314 :                 DecodeUpdate(ctx, buf);
     506      414328 :             break;
     507             : 
     508      534854 :         case XLOG_HEAP_DELETE:
     509      534854 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     510      534854 :                 !ctx->fast_forward)
     511      534848 :                 DecodeDelete(ctx, buf);
     512      534854 :             break;
     513             : 
     514          88 :         case XLOG_HEAP_TRUNCATE:
     515          88 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     516          88 :                 !ctx->fast_forward)
     517          86 :                 DecodeTruncate(ctx, buf);
     518          88 :             break;
     519             : 
     520        2032 :         case XLOG_HEAP_INPLACE:
     521             : 
     522             :             /*
     523             :              * Inplace updates are only ever performed on catalog tuples and
     524             :              * can, per definition, not change tuple visibility.  Inplace
     525             :              * updates don't affect storage or interpretation of table rows,
     526             :              * so they don't affect logicalrep_write_tuple() outcomes.  Hence,
     527             :              * we don't process invalidations from the original operation.  If
     528             :              * inplace updates did affect those things, invalidations wouldn't
     529             :              * make it work, since there are no snapshot-specific versions of
     530             :              * inplace-updated values.  Since we also don't decode catalog
     531             :              * tuples, we're not interested in the record's contents.
     532             :              *
     533             :              * WAL contains likely-unnecessary commit-time invals from the
     534             :              * CacheInvalidateHeapTuple() call in
     535             :              * heap_inplace_update_and_unlock(). Excess invalidation is safe.
     536             :              */
     537        2032 :             break;
     538             : 
     539       35832 :         case XLOG_HEAP_CONFIRM:
     540       35832 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     541       35832 :                 !ctx->fast_forward)
     542       35832 :                 DecodeSpecConfirm(ctx, buf);
     543       35832 :             break;
     544             : 
     545      380810 :         case XLOG_HEAP_LOCK:
     546             :             /* we don't care about row level locks for now */
     547      380810 :             break;
     548             : 
     549           0 :         default:
     550           0 :             elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
     551             :             break;
     552             :     }
     553             : }
     554             : 
     555             : /*
     556             :  * Ask output plugin whether we want to skip this PREPARE and send
     557             :  * this transaction as a regular commit later.
     558             :  */
     559             : static inline bool
     560         608 : FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
     561             :               const char *gid)
     562             : {
     563             :     /*
     564             :      * Skip if decoding of two-phase transactions at PREPARE time is not
     565             :      * enabled. In that case, all two-phase transactions are considered
     566             :      * filtered out and will be applied as regular transactions at COMMIT
     567             :      * PREPARED.
     568             :      */
     569         608 :     if (!ctx->twophase)
     570          34 :         return true;
     571             : 
     572             :     /*
     573             :      * The filter_prepare callback is optional. When not supplied, all
     574             :      * prepared transactions should go through.
     575             :      */
     576         574 :     if (ctx->callbacks.filter_prepare_cb == NULL)
     577         278 :         return false;
     578             : 
     579         296 :     return filter_prepare_cb_wrapper(ctx, xid, gid);
     580             : }
     581             : 
     582             : static inline bool
     583     3213134 : FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
     584             : {
     585     3213134 :     if (ctx->callbacks.filter_by_origin_cb == NULL)
     586          50 :         return false;
     587             : 
     588     3213084 :     return filter_by_origin_cb_wrapper(ctx, origin_id);
     589             : }
     590             : 
     591             : /*
     592             :  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
     593             :  */
     594             : void
     595         114 : logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     596             : {
     597         114 :     SnapBuild  *builder = ctx->snapshot_builder;
     598         114 :     XLogReaderState *r = buf->record;
     599         114 :     TransactionId xid = XLogRecGetXid(r);
     600         114 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     601         114 :     RepOriginId origin_id = XLogRecGetOrigin(r);
     602         114 :     Snapshot    snapshot = NULL;
     603             :     xl_logical_message *message;
     604             : 
     605         114 :     if (info != XLOG_LOGICAL_MESSAGE)
     606           0 :         elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
     607             : 
     608         114 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     609             : 
     610             :     /* If we don't have snapshot, there is no point in decoding messages */
     611         114 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     612           0 :         return;
     613             : 
     614         114 :     message = (xl_logical_message *) XLogRecGetData(r);
     615             : 
     616         224 :     if (message->dbId != ctx->slot->data.database ||
     617         110 :         FilterByOrigin(ctx, origin_id))
     618           8 :         return;
     619             : 
     620         106 :     if (message->transactional &&
     621          78 :         !SnapBuildProcessChange(builder, xid, buf->origptr))
     622           0 :         return;
     623         134 :     else if (!message->transactional &&
     624          56 :              (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
     625          28 :               SnapBuildXactNeedsSkip(builder, buf->origptr)))
     626           8 :         return;
     627             : 
     628             :     /*
     629             :      * We also skip decoding in fast_forward mode. This check must be last
     630             :      * because we don't want to set the processing_required flag unless we
     631             :      * have a decodable message.
     632             :      */
     633          98 :     if (ctx->fast_forward)
     634             :     {
     635             :         /*
     636             :          * We need to set processing_required flag to notify the message's
     637             :          * existence to the caller. Usually, the flag is set when either the
     638             :          * COMMIT or ABORT records are decoded, but this must be turned on
     639             :          * here because the non-transactional logical message is decoded
     640             :          * without waiting for these records.
     641             :          */
     642           4 :         if (!message->transactional)
     643           4 :             ctx->processing_required = true;
     644             : 
     645           4 :         return;
     646             :     }
     647             : 
     648             :     /*
     649             :      * If this is a non-transactional change, get the snapshot we're expected
     650             :      * to use. We only get here when the snapshot is consistent, and the
     651             :      * change is not meant to be skipped.
     652             :      *
     653             :      * For transactional changes we don't need a snapshot, we'll use the
     654             :      * regular snapshot maintained by ReorderBuffer. We just leave it NULL.
     655             :      */
     656          94 :     if (!message->transactional)
     657          16 :         snapshot = SnapBuildGetOrBuildSnapshot(builder);
     658             : 
     659          94 :     ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
     660          94 :                               message->transactional,
     661          94 :                               message->message, /* first part of message is
     662             :                                                  * prefix */
     663             :                               message->message_size,
     664          94 :                               message->message + message->prefix_size);
     665             : }
     666             : 
     667             : /*
     668             :  * Consolidated commit record handling between the different form of commit
     669             :  * records.
     670             :  *
     671             :  * 'two_phase' indicates that caller wants to process the transaction in two
     672             :  * phases, first process prepare if not already done and then process
     673             :  * commit_prepared.
     674             :  */
     675             : static void
     676        5498 : DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     677             :              xl_xact_parsed_commit *parsed, TransactionId xid,
     678             :              bool two_phase)
     679             : {
     680        5498 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     681        5498 :     TimestampTz commit_time = parsed->xact_time;
     682        5498 :     RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     683             :     int         i;
     684             : 
     685        5498 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     686             :     {
     687         124 :         origin_lsn = parsed->origin_lsn;
     688         124 :         commit_time = parsed->origin_timestamp;
     689             :     }
     690             : 
     691        5498 :     SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
     692             :                        parsed->nsubxacts, parsed->subxacts,
     693             :                        parsed->xinfo);
     694             : 
     695             :     /* ----
     696             :      * Check whether we are interested in this specific transaction, and tell
     697             :      * the reorderbuffer to forget the content of the (sub-)transactions
     698             :      * if not.
     699             :      *
     700             :      * We can't just use ReorderBufferAbort() here, because we need to execute
     701             :      * the transaction's invalidations.  This currently won't be needed if
     702             :      * we're just skipping over the transaction because currently we only do
     703             :      * so during startup, to get to the first transaction the client needs. As
     704             :      * we have reset the catalog caches before starting to read WAL, and we
     705             :      * haven't yet touched any catalogs, there can't be anything to invalidate.
     706             :      * But if we're "forgetting" this commit because it happened in another
     707             :      * database, the invalidations might be important, because they could be
     708             :      * for shared catalogs and we might have loaded data into the relevant
     709             :      * syscaches.
     710             :      * ---
     711             :      */
     712        5498 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     713             :     {
     714        4858 :         for (i = 0; i < parsed->nsubxacts; i++)
     715             :         {
     716        1934 :             ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
     717             :         }
     718        2924 :         ReorderBufferForget(ctx->reorder, xid, buf->origptr);
     719             : 
     720        2924 :         return;
     721             :     }
     722             : 
     723             :     /* tell the reorderbuffer about the surviving subtransactions */
     724        3106 :     for (i = 0; i < parsed->nsubxacts; i++)
     725             :     {
     726         532 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     727             :                                  buf->origptr, buf->endptr);
     728             :     }
     729             : 
     730             :     /*
     731             :      * Send the final commit record if the transaction data is already
     732             :      * decoded, otherwise, process the entire transaction.
     733             :      */
     734        2574 :     if (two_phase)
     735             :     {
     736          66 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     737          66 :                                     SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
     738             :                                     commit_time, origin_id, origin_lsn,
     739          66 :                                     parsed->twophase_gid, true);
     740             :     }
     741             :     else
     742             :     {
     743        2508 :         ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
     744             :                             commit_time, origin_id, origin_lsn);
     745             :     }
     746             : 
     747             :     /*
     748             :      * Update the decoding stats at transaction prepare/commit/abort.
     749             :      * Additionally we send the stats when we spill or stream the changes to
     750             :      * avoid losing them in case the decoding is interrupted. It is not clear
     751             :      * that sending more or less frequently than this would be better.
     752             :      */
     753        2562 :     UpdateDecodingStats(ctx);
     754             : }
     755             : 
     756             : /*
     757             :  * Decode PREPARE record. Similar logic as in DecodeCommit.
     758             :  *
     759             :  * Note that we don't skip prepare even if have detected concurrent abort
     760             :  * because it is quite possible that we had already sent some changes before we
     761             :  * detect abort in which case we need to abort those changes in the subscriber.
     762             :  * To abort such changes, we do send the prepare and then the rollback prepared
     763             :  * which is what happened on the publisher-side as well. Now, we can invent a
     764             :  * new abort API wherein in such cases we send abort and skip sending prepared
     765             :  * and rollback prepared but then it is not that straightforward because we
     766             :  * might have streamed this transaction by that time in which case it is
     767             :  * handled when the rollback is encountered. It is not impossible to optimize
     768             :  * the concurrent abort case but it can introduce design complexity w.r.t
     769             :  * handling different cases so leaving it for now as it doesn't seem worth it.
     770             :  */
     771             : static void
     772         292 : DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     773             :               xl_xact_parsed_prepare *parsed)
     774             : {
     775         292 :     SnapBuild  *builder = ctx->snapshot_builder;
     776         292 :     XLogRecPtr  origin_lsn = parsed->origin_lsn;
     777         292 :     TimestampTz prepare_time = parsed->xact_time;
     778         292 :     RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     779             :     int         i;
     780         292 :     TransactionId xid = parsed->twophase_xid;
     781             : 
     782         292 :     if (parsed->origin_timestamp != 0)
     783          16 :         prepare_time = parsed->origin_timestamp;
     784             : 
     785             :     /*
     786             :      * Remember the prepare info for a txn so that it can be used later in
     787             :      * commit prepared if required. See ReorderBufferFinishPrepared.
     788             :      */
     789         292 :     if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
     790             :                                           buf->endptr, prepare_time, origin_id,
     791             :                                           origin_lsn))
     792           0 :         return;
     793             : 
     794             :     /* We can't start streaming unless a consistent state is reached. */
     795         292 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
     796             :     {
     797           6 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     798           6 :         return;
     799             :     }
     800             : 
     801             :     /*
     802             :      * Check whether we need to process this transaction. See
     803             :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     804             :      * transaction.
     805             :      *
     806             :      * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
     807             :      * hasn't yet been committed, removing this txn before a commit might
     808             :      * result in the computation of an incorrect restart_lsn. See
     809             :      * SnapBuildProcessRunningXacts. But we need to process cache
     810             :      * invalidations if there are any for the reasons mentioned in
     811             :      * DecodeCommit.
     812             :      */
     813         286 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     814             :     {
     815         202 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     816         202 :         ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
     817         202 :         return;
     818             :     }
     819             : 
     820             :     /* Tell the reorderbuffer about the surviving subtransactions. */
     821          86 :     for (i = 0; i < parsed->nsubxacts; i++)
     822             :     {
     823           2 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     824             :                                  buf->origptr, buf->endptr);
     825             :     }
     826             : 
     827             :     /* replay actions of all transaction + subtransactions in order */
     828          84 :     ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
     829             : 
     830             :     /*
     831             :      * Update the decoding stats at transaction prepare/commit/abort.
     832             :      * Additionally we send the stats when we spill or stream the changes to
     833             :      * avoid losing them in case the decoding is interrupted. It is not clear
     834             :      * that sending more or less frequently than this would be better.
     835             :      */
     836          84 :     UpdateDecodingStats(ctx);
     837             : }
     838             : 
     839             : 
     840             : /*
     841             :  * Get the data from the various forms of abort records and pass it on to
     842             :  * snapbuild.c and reorderbuffer.c.
     843             :  *
     844             :  * 'two_phase' indicates to finish prepared transaction.
     845             :  */
     846             : static void
     847         230 : DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     848             :             xl_xact_parsed_abort *parsed, TransactionId xid,
     849             :             bool two_phase)
     850             : {
     851             :     int         i;
     852         230 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     853         230 :     TimestampTz abort_time = parsed->xact_time;
     854         230 :     RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     855             :     bool        skip_xact;
     856             : 
     857         230 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     858             :     {
     859           8 :         origin_lsn = parsed->origin_lsn;
     860           8 :         abort_time = parsed->origin_timestamp;
     861             :     }
     862             : 
     863             :     /*
     864             :      * Check whether we need to process this transaction. See
     865             :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     866             :      * transaction.
     867             :      */
     868         230 :     skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
     869             : 
     870             :     /*
     871             :      * Send the final rollback record for a prepared transaction unless we
     872             :      * need to skip it. For non-two-phase xacts, simply forget the xact.
     873             :      */
     874         230 :     if (two_phase && !skip_xact)
     875             :     {
     876          22 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     877             :                                     InvalidXLogRecPtr,
     878             :                                     abort_time, origin_id, origin_lsn,
     879          22 :                                     parsed->twophase_gid, false);
     880             :     }
     881             :     else
     882             :     {
     883         220 :         for (i = 0; i < parsed->nsubxacts; i++)
     884             :         {
     885          12 :             ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
     886          12 :                                buf->record->EndRecPtr, abort_time);
     887             :         }
     888             : 
     889         208 :         ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr,
     890             :                            abort_time);
     891             :     }
     892             : 
     893             :     /* update the decoding stats */
     894         230 :     UpdateDecodingStats(ctx);
     895         230 : }
     896             : 
     897             : /*
     898             :  * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
     899             :  *
     900             :  * Inserts can contain the new tuple.
     901             :  */
     902             : static void
     903     2232596 : DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     904             : {
     905             :     Size        datalen;
     906             :     char       *tupledata;
     907             :     Size        tuplelen;
     908     2232596 :     XLogReaderState *r = buf->record;
     909             :     xl_heap_insert *xlrec;
     910             :     ReorderBufferChange *change;
     911             :     RelFileLocator target_locator;
     912             : 
     913     2232596 :     xlrec = (xl_heap_insert *) XLogRecGetData(r);
     914             : 
     915             :     /*
     916             :      * Ignore insert records without new tuples (this does happen when
     917             :      * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
     918             :      */
     919     2232596 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
     920        7510 :         return;
     921             : 
     922             :     /* only interested in our database */
     923     2225120 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     924     2225120 :     if (target_locator.dbOid != ctx->slot->data.database)
     925           0 :         return;
     926             : 
     927             :     /* output plugin doesn't look for this origin, no need to queue */
     928     2225120 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     929          34 :         return;
     930             : 
     931     2225086 :     change = ReorderBufferAllocChange(ctx->reorder);
     932     2225086 :     if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
     933     2189254 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
     934             :     else
     935       35832 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
     936     2225086 :     change->origin_id = XLogRecGetOrigin(r);
     937             : 
     938     2225086 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     939             : 
     940     2225086 :     tupledata = XLogRecGetBlockData(r, 0, &datalen);
     941     2225086 :     tuplelen = datalen - SizeOfHeapHeader;
     942             : 
     943     2225086 :     change->data.tp.newtuple =
     944     2225086 :         ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
     945             : 
     946     2225086 :     DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
     947             : 
     948     2225086 :     change->data.tp.clear_toast_afterwards = true;
     949             : 
     950     2225086 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
     951             :                              change,
     952     2225086 :                              xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
     953             : }
     954             : 
     955             : /*
     956             :  * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
     957             :  * in the record, from wal into proper tuplebufs.
     958             :  *
     959             :  * Updates can possibly contain a new tuple and the old primary key.
     960             :  */
     961             : static void
     962      414314 : DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     963             : {
     964      414314 :     XLogReaderState *r = buf->record;
     965             :     xl_heap_update *xlrec;
     966             :     ReorderBufferChange *change;
     967             :     char       *data;
     968             :     RelFileLocator target_locator;
     969             : 
     970      414314 :     xlrec = (xl_heap_update *) XLogRecGetData(r);
     971             : 
     972             :     /* only interested in our database */
     973      414314 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     974      414314 :     if (target_locator.dbOid != ctx->slot->data.database)
     975          64 :         return;
     976             : 
     977             :     /* output plugin doesn't look for this origin, no need to queue */
     978      414302 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     979          52 :         return;
     980             : 
     981      414250 :     change = ReorderBufferAllocChange(ctx->reorder);
     982      414250 :     change->action = REORDER_BUFFER_CHANGE_UPDATE;
     983      414250 :     change->origin_id = XLogRecGetOrigin(r);
     984      414250 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     985             : 
     986      414250 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
     987             :     {
     988             :         Size        datalen;
     989             :         Size        tuplelen;
     990             : 
     991      411000 :         data = XLogRecGetBlockData(r, 0, &datalen);
     992             : 
     993      411000 :         tuplelen = datalen - SizeOfHeapHeader;
     994             : 
     995      411000 :         change->data.tp.newtuple =
     996      411000 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
     997             : 
     998      411000 :         DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
     999             :     }
    1000             : 
    1001      414250 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
    1002             :     {
    1003             :         Size        datalen;
    1004             :         Size        tuplelen;
    1005             : 
    1006             :         /* caution, remaining data in record is not aligned */
    1007         706 :         data = XLogRecGetData(r) + SizeOfHeapUpdate;
    1008         706 :         datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
    1009         706 :         tuplelen = datalen - SizeOfHeapHeader;
    1010             : 
    1011         706 :         change->data.tp.oldtuple =
    1012         706 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
    1013             : 
    1014         706 :         DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
    1015             :     }
    1016             : 
    1017      414250 :     change->data.tp.clear_toast_afterwards = true;
    1018             : 
    1019      414250 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1020             :                              change, false);
    1021             : }
    1022             : 
    1023             : /*
    1024             :  * Parse XLOG_HEAP_DELETE from wal into proper tuplebufs.
    1025             :  *
    1026             :  * Deletes can possibly contain the old primary key.
    1027             :  */
    1028             : static void
    1029      534848 : DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1030             : {
    1031      534848 :     XLogReaderState *r = buf->record;
    1032             :     xl_heap_delete *xlrec;
    1033             :     ReorderBufferChange *change;
    1034             :     RelFileLocator target_locator;
    1035             : 
    1036      534848 :     xlrec = (xl_heap_delete *) XLogRecGetData(r);
    1037             : 
    1038             :     /* only interested in our database */
    1039      534848 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1040      534848 :     if (target_locator.dbOid != ctx->slot->data.database)
    1041          72 :         return;
    1042             : 
    1043             :     /* output plugin doesn't look for this origin, no need to queue */
    1044      534788 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1045          12 :         return;
    1046             : 
    1047      534776 :     change = ReorderBufferAllocChange(ctx->reorder);
    1048             : 
    1049      534776 :     if (xlrec->flags & XLH_DELETE_IS_SUPER)
    1050           0 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
    1051             :     else
    1052      534776 :         change->action = REORDER_BUFFER_CHANGE_DELETE;
    1053             : 
    1054      534776 :     change->origin_id = XLogRecGetOrigin(r);
    1055             : 
    1056      534776 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1057             : 
    1058             :     /* old primary key stored */
    1059      534776 :     if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
    1060             :     {
    1061      411396 :         Size        datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
    1062      411396 :         Size        tuplelen = datalen - SizeOfHeapHeader;
    1063             : 
    1064             :         Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
    1065             : 
    1066      411396 :         change->data.tp.oldtuple =
    1067      411396 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
    1068             : 
    1069      411396 :         DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
    1070             :                         datalen, change->data.tp.oldtuple);
    1071             :     }
    1072             : 
    1073      534776 :     change->data.tp.clear_toast_afterwards = true;
    1074             : 
    1075      534776 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1076             :                              change, false);
    1077             : }
    1078             : 
    1079             : /*
    1080             :  * Parse XLOG_HEAP_TRUNCATE from wal
    1081             :  */
    1082             : static void
    1083          86 : DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1084             : {
    1085          86 :     XLogReaderState *r = buf->record;
    1086             :     xl_heap_truncate *xlrec;
    1087             :     ReorderBufferChange *change;
    1088             : 
    1089          86 :     xlrec = (xl_heap_truncate *) XLogRecGetData(r);
    1090             : 
    1091             :     /* only interested in our database */
    1092          86 :     if (xlrec->dbId != ctx->slot->data.database)
    1093           0 :         return;
    1094             : 
    1095             :     /* output plugin doesn't look for this origin, no need to queue */
    1096          86 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1097           0 :         return;
    1098             : 
    1099          86 :     change = ReorderBufferAllocChange(ctx->reorder);
    1100          86 :     change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
    1101          86 :     change->origin_id = XLogRecGetOrigin(r);
    1102          86 :     if (xlrec->flags & XLH_TRUNCATE_CASCADE)
    1103           2 :         change->data.truncate.cascade = true;
    1104          86 :     if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
    1105           4 :         change->data.truncate.restart_seqs = true;
    1106          86 :     change->data.truncate.nrelids = xlrec->nrelids;
    1107         172 :     change->data.truncate.relids = ReorderBufferAllocRelids(ctx->reorder,
    1108          86 :                                                             xlrec->nrelids);
    1109          86 :     memcpy(change->data.truncate.relids, xlrec->relids,
    1110          86 :            xlrec->nrelids * sizeof(Oid));
    1111          86 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1112             :                              buf->origptr, change, false);
    1113             : }
    1114             : 
    1115             : /*
    1116             :  * Decode XLOG_HEAP2_MULTI_INSERT record into multiple tuplebufs.
    1117             :  *
    1118             :  * Currently MULTI_INSERT will always contain the full tuples.
    1119             :  */
    1120             : static void
    1121       11630 : DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1122             : {
    1123       11630 :     XLogReaderState *r = buf->record;
    1124             :     xl_heap_multi_insert *xlrec;
    1125             :     int         i;
    1126             :     char       *data;
    1127             :     char       *tupledata;
    1128             :     Size        tuplelen;
    1129             :     RelFileLocator rlocator;
    1130             : 
    1131       11630 :     xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
    1132             : 
    1133             :     /*
    1134             :      * Ignore insert records without new tuples.  This happens when a
    1135             :      * multi_insert is done on a catalog or on a non-persistent relation.
    1136             :      */
    1137       11630 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
    1138       11598 :         return;
    1139             : 
    1140             :     /* only interested in our database */
    1141         108 :     XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
    1142         108 :     if (rlocator.dbOid != ctx->slot->data.database)
    1143          76 :         return;
    1144             : 
    1145             :     /* output plugin doesn't look for this origin, no need to queue */
    1146          32 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1147           0 :         return;
    1148             : 
    1149             :     /*
    1150             :      * We know that this multi_insert isn't for a catalog, so the block should
    1151             :      * always have data even if a full-page write of it is taken.
    1152             :      */
    1153          32 :     tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
    1154             :     Assert(tupledata != NULL);
    1155             : 
    1156          32 :     data = tupledata;
    1157        2134 :     for (i = 0; i < xlrec->ntuples; i++)
    1158             :     {
    1159             :         ReorderBufferChange *change;
    1160             :         xl_multi_insert_tuple *xlhdr;
    1161             :         int         datalen;
    1162             :         HeapTuple   tuple;
    1163             :         HeapTupleHeader header;
    1164             : 
    1165        2102 :         change = ReorderBufferAllocChange(ctx->reorder);
    1166        2102 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
    1167        2102 :         change->origin_id = XLogRecGetOrigin(r);
    1168             : 
    1169        2102 :         memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));
    1170             : 
    1171        2102 :         xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
    1172        2102 :         data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
    1173        2102 :         datalen = xlhdr->datalen;
    1174             : 
    1175        2102 :         change->data.tp.newtuple =
    1176        2102 :             ReorderBufferAllocTupleBuf(ctx->reorder, datalen);
    1177             : 
    1178        2102 :         tuple = change->data.tp.newtuple;
    1179        2102 :         header = tuple->t_data;
    1180             : 
    1181             :         /* not a disk based tuple */
    1182        2102 :         ItemPointerSetInvalid(&tuple->t_self);
    1183             : 
    1184             :         /*
    1185             :          * We can only figure this out after reassembling the transactions.
    1186             :          */
    1187        2102 :         tuple->t_tableOid = InvalidOid;
    1188             : 
    1189        2102 :         tuple->t_len = datalen + SizeofHeapTupleHeader;
    1190             : 
    1191        2102 :         memset(header, 0, SizeofHeapTupleHeader);
    1192             : 
    1193        2102 :         memcpy((char *) tuple->t_data + SizeofHeapTupleHeader, data, datalen);
    1194        2102 :         header->t_infomask = xlhdr->t_infomask;
    1195        2102 :         header->t_infomask2 = xlhdr->t_infomask2;
    1196        2102 :         header->t_hoff = xlhdr->t_hoff;
    1197             : 
    1198             :         /*
    1199             :          * Reset toast reassembly state only after the last row in the last
    1200             :          * xl_multi_insert_tuple record emitted by one heap_multi_insert()
    1201             :          * call.
    1202             :          */
    1203        2102 :         if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
    1204         382 :             (i + 1) == xlrec->ntuples)
    1205          22 :             change->data.tp.clear_toast_afterwards = true;
    1206             :         else
    1207        2080 :             change->data.tp.clear_toast_afterwards = false;
    1208             : 
    1209        2102 :         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1210             :                                  buf->origptr, change, false);
    1211             : 
    1212             :         /* move to the next xl_multi_insert_tuple entry */
    1213        2102 :         data += datalen;
    1214             :     }
    1215             :     Assert(data == tupledata + tuplelen);
    1216             : }
    1217             : 
    1218             : /*
    1219             :  * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change.
    1220             :  *
    1221             :  * This is pretty trivial, all the state essentially already setup by the
    1222             :  * speculative insertion.
    1223             :  */
    1224             : static void
    1225       35832 : DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1226             : {
    1227       35832 :     XLogReaderState *r = buf->record;
    1228             :     ReorderBufferChange *change;
    1229             :     RelFileLocator target_locator;
    1230             : 
    1231             :     /* only interested in our database */
    1232       35832 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1233       35832 :     if (target_locator.dbOid != ctx->slot->data.database)
    1234           0 :         return;
    1235             : 
    1236             :     /* output plugin doesn't look for this origin, no need to queue */
    1237       35832 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1238           0 :         return;
    1239             : 
    1240       35832 :     change = ReorderBufferAllocChange(ctx->reorder);
    1241       35832 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
    1242       35832 :     change->origin_id = XLogRecGetOrigin(r);
    1243             : 
    1244       35832 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1245             : 
    1246       35832 :     change->data.tp.clear_toast_afterwards = true;
    1247             : 
    1248       35832 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1249             :                              change, false);
    1250             : }
    1251             : 
    1252             : 
    1253             : /*
    1254             :  * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
    1255             :  * (but not by heap_multi_insert) into a tuplebuf.
    1256             :  *
    1257             :  * The size 'len' and the pointer 'data' in the record need to be
    1258             :  * computed outside as they are record specific.
    1259             :  */
    1260             : static void
    1261     3048188 : DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
    1262             : {
    1263             :     xl_heap_header xlhdr;
    1264     3048188 :     int         datalen = len - SizeOfHeapHeader;
    1265             :     HeapTupleHeader header;
    1266             : 
    1267             :     Assert(datalen >= 0);
    1268             : 
    1269     3048188 :     tuple->t_len = datalen + SizeofHeapTupleHeader;
    1270     3048188 :     header = tuple->t_data;
    1271             : 
    1272             :     /* not a disk based tuple */
    1273     3048188 :     ItemPointerSetInvalid(&tuple->t_self);
    1274             : 
    1275             :     /* we can only figure this out after reassembling the transactions */
    1276     3048188 :     tuple->t_tableOid = InvalidOid;
    1277             : 
    1278             :     /* data is not stored aligned, copy to aligned storage */
    1279     3048188 :     memcpy(&xlhdr, data, SizeOfHeapHeader);
    1280             : 
    1281     3048188 :     memset(header, 0, SizeofHeapTupleHeader);
    1282             : 
    1283     3048188 :     memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
    1284     3048188 :            data + SizeOfHeapHeader,
    1285             :            datalen);
    1286             : 
    1287     3048188 :     header->t_infomask = xlhdr.t_infomask;
    1288     3048188 :     header->t_infomask2 = xlhdr.t_infomask2;
    1289     3048188 :     header->t_hoff = xlhdr.t_hoff;
    1290     3048188 : }
    1291             : 
    1292             : /*
    1293             :  * Check whether we are interested in this specific transaction.
    1294             :  *
    1295             :  * There can be several reasons we might not be interested in this
    1296             :  * transaction:
    1297             :  * 1) We might not be interested in decoding transactions up to this
    1298             :  *    LSN. This can happen because we previously decoded it and now just
    1299             :  *    are restarting or if we haven't assembled a consistent snapshot yet.
    1300             :  * 2) The transaction happened in another database.
    1301             :  * 3) The output plugin is not interested in the origin.
    1302             :  * 4) We are doing fast-forwarding
    1303             :  */
    1304             : static bool
    1305        6014 : DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
    1306             :                   Oid txn_dbid, RepOriginId origin_id)
    1307             : {
    1308        6014 :     if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
    1309        5688 :         (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
    1310        2864 :         FilterByOrigin(ctx, origin_id))
    1311        3206 :         return true;
    1312             : 
    1313             :     /*
    1314             :      * We also skip decoding in fast_forward mode. In passing set the
    1315             :      * processing_required flag to indicate that if it were not for
    1316             :      * fast_forward mode, processing would have been required.
    1317             :      */
    1318        2808 :     if (ctx->fast_forward)
    1319             :     {
    1320          46 :         ctx->processing_required = true;
    1321          46 :         return true;
    1322             :     }
    1323             : 
    1324        2762 :     return false;
    1325             : }

Generated by: LCOV version 1.14