LCOV - code coverage report
Current view: top level - src/backend/replication/logical - decode.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 415 438 94.7 %
Date: 2026-01-18 04:17:21 Functions: 20 20 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -------------------------------------------------------------------------
       2             :  *
       3             :  * decode.c
       4             :  *      This module decodes WAL records read using xlogreader.h's APIs for the
       5             :  *      purpose of logical decoding by passing information to the
       6             :  *      reorderbuffer module (containing the actual changes) and to the
       7             :  *      snapbuild module to build a fitting catalog snapshot (to be able to
       8             :  *      properly decode the changes in the reorderbuffer).
       9             :  *
      10             :  * NOTE:
      11             :  *      This basically tries to handle all low level xlog stuff for
      12             :  *      reorderbuffer.c and snapbuild.c. There's some minor leakage where a
      13             :  *      specific record's struct is used to pass data along, but those just
      14             :  *      happen to contain the right amount of data in a convenient
      15             :  *      format. There isn't and shouldn't be much intelligence about the
      16             :  *      contents of records in here except turning them into a more usable
      17             :  *      format.
      18             :  *
      19             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      20             :  * Portions Copyright (c) 1994, Regents of the University of California
      21             :  *
      22             :  * IDENTIFICATION
      23             :  *    src/backend/replication/logical/decode.c
      24             :  *
      25             :  * -------------------------------------------------------------------------
      26             :  */
      27             : #include "postgres.h"
      28             : 
      29             : #include "access/heapam_xlog.h"
      30             : #include "access/transam.h"
      31             : #include "access/xact.h"
      32             : #include "access/xlog_internal.h"
      33             : #include "access/xlogreader.h"
      34             : #include "access/xlogrecord.h"
      35             : #include "catalog/pg_control.h"
      36             : #include "replication/decode.h"
      37             : #include "replication/logical.h"
      38             : #include "replication/message.h"
      39             : #include "replication/reorderbuffer.h"
      40             : #include "replication/snapbuild.h"
      41             : #include "storage/standbydefs.h"
      42             : 
      43             : /* individual record(group)'s handlers */
      44             : static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      45             : static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      46             : static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      47             : static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      48             : static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      49             : static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      50             : 
      51             : static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      52             :                          xl_xact_parsed_commit *parsed, TransactionId xid,
      53             :                          bool two_phase);
      54             : static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      55             :                         xl_xact_parsed_abort *parsed, TransactionId xid,
      56             :                         bool two_phase);
      57             : static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      58             :                           xl_xact_parsed_prepare *parsed);
      59             : 
      60             : 
      61             : /* common function to decode tuples */
      62             : static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);
      63             : 
      64             : /* helper functions for decoding transactions */
      65             : static inline bool FilterPrepare(LogicalDecodingContext *ctx,
      66             :                                  TransactionId xid, const char *gid);
      67             : static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
      68             :                               XLogRecordBuffer *buf, Oid txn_dbid,
      69             :                               RepOriginId origin_id);
      70             : 
      71             : /*
      72             :  * Take every XLogReadRecord()ed record and perform the actions required to
      73             :  * decode it using the output plugin already setup in the logical decoding
      74             :  * context.
      75             :  *
      76             :  * NB: Note that every record's xid needs to be processed by reorderbuffer
      77             :  * (xids contained in the content of records are not relevant for this rule).
      78             :  * That means that for records which'd otherwise not go through the
      79             :  * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
      80             :  * call ReorderBufferProcessXid for each record type by default, because
      81             :  * e.g. empty xacts can be handled more efficiently if there's no previous
      82             :  * state for them.
      83             :  *
      84             :  * We also support the ability to fast forward thru records, skipping some
      85             :  * record types completely - see individual record types for details.
      86             :  */
      87             : void
      88     3972762 : LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
      89             : {
      90             :     XLogRecordBuffer buf;
      91             :     TransactionId txid;
      92             :     RmgrData    rmgr;
      93             : 
      94     3972762 :     buf.origptr = ctx->reader->ReadRecPtr;
      95     3972762 :     buf.endptr = ctx->reader->EndRecPtr;
      96     3972762 :     buf.record = record;
      97             : 
      98     3972762 :     txid = XLogRecGetTopXid(record);
      99             : 
     100             :     /*
     101             :      * If the top-level xid is valid, we need to assign the subxact to the
     102             :      * top-level xact. We need to do this for all records, hence we do it
     103             :      * before the switch.
     104             :      */
     105     3972762 :     if (TransactionIdIsValid(txid))
     106             :     {
     107        1294 :         ReorderBufferAssignChild(ctx->reorder,
     108             :                                  txid,
     109        1294 :                                  XLogRecGetXid(record),
     110             :                                  buf.origptr);
     111             :     }
     112             : 
     113     3972762 :     rmgr = GetRmgr(XLogRecGetRmid(record));
     114             : 
     115     3972762 :     if (rmgr.rm_decode != NULL)
     116     3046996 :         rmgr.rm_decode(ctx, &buf);
     117             :     else
     118             :     {
     119             :         /* just deal with xid, and done */
     120      925766 :         ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
     121             :                                 buf.origptr);
     122             :     }
     123     3972714 : }
     124             : 
     125             : /*
     126             :  * Handle rmgr XLOG_ID records for LogicalDecodingProcessRecord().
     127             :  */
     128             : void
     129       12020 : xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     130             : {
     131       12020 :     SnapBuild  *builder = ctx->snapshot_builder;
     132       12020 :     uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
     133             : 
     134       12020 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
     135             :                             buf->origptr);
     136             : 
     137       12020 :     switch (info)
     138             :     {
     139             :             /* this is also used in END_OF_RECOVERY checkpoints */
     140         114 :         case XLOG_CHECKPOINT_SHUTDOWN:
     141             :         case XLOG_END_OF_RECOVERY:
     142         114 :             SnapBuildSerializationPoint(builder, buf->origptr);
     143             : 
     144         114 :             break;
     145         126 :         case XLOG_CHECKPOINT_ONLINE:
     146             : 
     147             :             /*
     148             :              * a RUNNING_XACTS record will have been logged near to this, we
     149             :              * can restart from there.
     150             :              */
     151         126 :             break;
     152           0 :         case XLOG_LOGICAL_DECODING_STATUS_CHANGE:
     153             :             {
     154             :                 bool        logical_decoding;
     155             : 
     156           0 :                 memcpy(&logical_decoding, XLogRecGetData(buf->record), sizeof(bool));
     157             : 
     158             :                 /*
     159             :                  * Error out as we should not decode this WAL record.
     160             :                  *
     161             :                  * Logical decoding is disabled, and existing logical slots on
     162             :                  * the standby are invalidated when this WAL record is
     163             :                  * replayed. No logical decoder can process this WAL record
     164             :                  * until replay completes, and by then the slots are already
     165             :                  * invalidated. Furthermore, no new logical slots can be
     166             :                  * created while logical decoding is disabled. This cannot
     167             :                  * occur even on primary either, since it will not restart
     168             :                  * with wal_level < replica if any logical slots exist.
     169             :                  */
     170           0 :                 elog(ERROR, "unexpected logical decoding status change %d",
     171             :                      logical_decoding);
     172             : 
     173             :                 break;
     174             :             }
     175       11780 :         case XLOG_NOOP:
     176             :         case XLOG_NEXTOID:
     177             :         case XLOG_SWITCH:
     178             :         case XLOG_BACKUP_END:
     179             :         case XLOG_PARAMETER_CHANGE:
     180             :         case XLOG_RESTORE_POINT:
     181             :         case XLOG_FPW_CHANGE:
     182             :         case XLOG_FPI_FOR_HINT:
     183             :         case XLOG_FPI:
     184             :         case XLOG_OVERWRITE_CONTRECORD:
     185             :         case XLOG_CHECKPOINT_REDO:
     186       11780 :             break;
     187           0 :         default:
     188           0 :             elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
     189             :     }
     190       12020 : }
     191             : 
     192             : /*
     193             :  * Handle rmgr XACT_ID records for LogicalDecodingProcessRecord().
     194             :  */
     195             : void
     196       17638 : xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     197             : {
     198       17638 :     SnapBuild  *builder = ctx->snapshot_builder;
     199       17638 :     ReorderBuffer *reorder = ctx->reorder;
     200       17638 :     XLogReaderState *r = buf->record;
     201       17638 :     uint8       info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
     202             : 
     203             :     /*
     204             :      * If the snapshot isn't yet fully built, we cannot decode anything, so
     205             :      * bail out.
     206             :      */
     207       17638 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     208          32 :         return;
     209             : 
     210       17606 :     switch (info)
     211             :     {
     212        6252 :         case XLOG_XACT_COMMIT:
     213             :         case XLOG_XACT_COMMIT_PREPARED:
     214             :             {
     215             :                 xl_xact_commit *xlrec;
     216             :                 xl_xact_parsed_commit parsed;
     217             :                 TransactionId xid;
     218        6252 :                 bool        two_phase = false;
     219             : 
     220        6252 :                 xlrec = (xl_xact_commit *) XLogRecGetData(r);
     221        6252 :                 ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     222             : 
     223        6252 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     224        6022 :                     xid = XLogRecGetXid(r);
     225             :                 else
     226         230 :                     xid = parsed.twophase_xid;
     227             : 
     228             :                 /*
     229             :                  * We would like to process the transaction in a two-phase
     230             :                  * manner iff output plugin supports two-phase commits and
     231             :                  * doesn't filter the transaction at prepare time.
     232             :                  */
     233        6252 :                 if (info == XLOG_XACT_COMMIT_PREPARED)
     234         230 :                     two_phase = !(FilterPrepare(ctx, xid,
     235         230 :                                                 parsed.twophase_gid));
     236             : 
     237        6252 :                 DecodeCommit(ctx, buf, &parsed, xid, two_phase);
     238        6208 :                 break;
     239             :             }
     240         344 :         case XLOG_XACT_ABORT:
     241             :         case XLOG_XACT_ABORT_PREPARED:
     242             :             {
     243             :                 xl_xact_abort *xlrec;
     244             :                 xl_xact_parsed_abort parsed;
     245             :                 TransactionId xid;
     246         344 :                 bool        two_phase = false;
     247             : 
     248         344 :                 xlrec = (xl_xact_abort *) XLogRecGetData(r);
     249         344 :                 ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     250             : 
     251         344 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     252         260 :                     xid = XLogRecGetXid(r);
     253             :                 else
     254          84 :                     xid = parsed.twophase_xid;
     255             : 
     256             :                 /*
     257             :                  * We would like to process the transaction in a two-phase
     258             :                  * manner iff output plugin supports two-phase commits and
     259             :                  * doesn't filter the transaction at prepare time.
     260             :                  */
     261         344 :                 if (info == XLOG_XACT_ABORT_PREPARED)
     262          84 :                     two_phase = !(FilterPrepare(ctx, xid,
     263          84 :                                                 parsed.twophase_gid));
     264             : 
     265         344 :                 DecodeAbort(ctx, buf, &parsed, xid, two_phase);
     266         344 :                 break;
     267             :             }
     268         222 :         case XLOG_XACT_ASSIGNMENT:
     269             : 
     270             :             /*
     271             :              * We assign subxact to the toplevel xact while processing each
     272             :              * record if required.  So, we don't need to do anything here. See
     273             :              * LogicalDecodingProcessRecord.
     274             :              */
     275         222 :             break;
     276       10432 :         case XLOG_XACT_INVALIDATIONS:
     277             :             {
     278             :                 TransactionId xid;
     279             :                 xl_xact_invals *invals;
     280             : 
     281       10432 :                 xid = XLogRecGetXid(r);
     282       10432 :                 invals = (xl_xact_invals *) XLogRecGetData(r);
     283             : 
     284             :                 /*
     285             :                  * Execute the invalidations for xid-less transactions,
     286             :                  * otherwise, accumulate them so that they can be processed at
     287             :                  * the commit time.
     288             :                  */
     289       10432 :                 if (TransactionIdIsValid(xid))
     290             :                 {
     291       10410 :                     if (!ctx->fast_forward)
     292       10342 :                         ReorderBufferAddInvalidations(reorder, xid,
     293             :                                                       buf->origptr,
     294       10342 :                                                       invals->nmsgs,
     295       10342 :                                                       invals->msgs);
     296       10410 :                     ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
     297             :                                                       buf->origptr);
     298             :                 }
     299          22 :                 else if (!ctx->fast_forward)
     300          22 :                     ReorderBufferImmediateInvalidation(ctx->reorder,
     301          22 :                                                        invals->nmsgs,
     302          22 :                                                        invals->msgs);
     303             : 
     304       10432 :                 break;
     305             :             }
     306         356 :         case XLOG_XACT_PREPARE:
     307             :             {
     308             :                 xl_xact_parsed_prepare parsed;
     309             :                 xl_xact_prepare *xlrec;
     310             : 
     311             :                 /* ok, parse it */
     312         356 :                 xlrec = (xl_xact_prepare *) XLogRecGetData(r);
     313         356 :                 ParsePrepareRecord(XLogRecGetInfo(buf->record),
     314             :                                    xlrec, &parsed);
     315             : 
     316             :                 /*
     317             :                  * We would like to process the transaction in a two-phase
     318             :                  * manner iff output plugin supports two-phase commits and
     319             :                  * doesn't filter the transaction at prepare time.
     320             :                  */
     321         356 :                 if (FilterPrepare(ctx, parsed.twophase_xid,
     322             :                                   parsed.twophase_gid))
     323             :                 {
     324          40 :                     ReorderBufferProcessXid(reorder, parsed.twophase_xid,
     325             :                                             buf->origptr);
     326          40 :                     break;
     327             :                 }
     328             : 
     329             :                 /*
     330             :                  * Note that if the prepared transaction has locked [user]
     331             :                  * catalog tables exclusively then decoding prepare can block
     332             :                  * till the main transaction is committed because it needs to
     333             :                  * lock the catalog tables.
     334             :                  *
     335             :                  * XXX Now, this can even lead to a deadlock if the prepare
     336             :                  * transaction is waiting to get it logically replicated for
     337             :                  * distributed 2PC. This can be avoided by disallowing
     338             :                  * preparing transactions that have locked [user] catalog
     339             :                  * tables exclusively but as of now, we ask users not to do
     340             :                  * such an operation.
     341             :                  */
     342         316 :                 DecodePrepare(ctx, buf, &parsed);
     343         316 :                 break;
     344             :             }
     345           0 :         default:
     346           0 :             elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
     347             :     }
     348             : }
     349             : 
     350             : /*
     351             :  * Handle rmgr STANDBY_ID records for LogicalDecodingProcessRecord().
     352             :  */
     353             : void
     354        7606 : standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     355             : {
     356        7606 :     SnapBuild  *builder = ctx->snapshot_builder;
     357        7606 :     XLogReaderState *r = buf->record;
     358        7606 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     359             : 
     360        7606 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     361             : 
     362        7606 :     switch (info)
     363             :     {
     364        2804 :         case XLOG_RUNNING_XACTS:
     365             :             {
     366        2804 :                 xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
     367             : 
     368        2804 :                 SnapBuildProcessRunningXacts(builder, buf->origptr, running);
     369             : 
     370             :                 /*
     371             :                  * Abort all transactions that we keep track of, that are
     372             :                  * older than the record's oldestRunningXid. This is the most
     373             :                  * convenient spot for doing so since, in contrast to shutdown
     374             :                  * or end-of-recovery checkpoints, we have information about
     375             :                  * all running transactions which includes prepared ones,
     376             :                  * while shutdown checkpoints just know that no non-prepared
     377             :                  * transactions are in progress.
     378             :                  */
     379        2800 :                 ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
     380             :             }
     381        2800 :             break;
     382        4780 :         case XLOG_STANDBY_LOCK:
     383        4780 :             break;
     384          22 :         case XLOG_INVALIDATIONS:
     385             : 
     386             :             /*
     387             :              * We are processing the invalidations at the command level via
     388             :              * XLOG_XACT_INVALIDATIONS.  So we don't need to do anything here.
     389             :              */
     390          22 :             break;
     391           0 :         default:
     392           0 :             elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
     393             :     }
     394        7602 : }
     395             : 
     396             : /*
     397             :  * Handle rmgr HEAP2_ID records for LogicalDecodingProcessRecord().
     398             :  */
     399             : void
     400       64246 : heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     401             : {
     402       64246 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     403       64246 :     TransactionId xid = XLogRecGetXid(buf->record);
     404       64246 :     SnapBuild  *builder = ctx->snapshot_builder;
     405             : 
     406       64246 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     407             : 
     408             :     /*
     409             :      * If we don't have snapshot or we are just fast-forwarding, there is no
     410             :      * point in decoding data changes. However, it's crucial to build the base
     411             :      * snapshot during fast-forward mode (as is done in
     412             :      * SnapBuildProcessChange()) because we require the snapshot's xmin when
     413             :      * determining the candidate catalog_xmin for the replication slot. See
     414             :      * SnapBuildProcessRunningXacts().
     415             :      */
     416       64246 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     417          20 :         return;
     418             : 
     419       64226 :     switch (info)
     420             :     {
     421       11894 :         case XLOG_HEAP2_MULTI_INSERT:
     422       11894 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     423       11894 :                 !ctx->fast_forward)
     424       11758 :                 DecodeMultiInsert(ctx, buf);
     425       11894 :             break;
     426       49216 :         case XLOG_HEAP2_NEW_CID:
     427       49216 :             if (!ctx->fast_forward)
     428             :             {
     429             :                 xl_heap_new_cid *xlrec;
     430             : 
     431       48828 :                 xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
     432       48828 :                 SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
     433             : 
     434       48828 :                 break;
     435             :             }
     436             :         case XLOG_HEAP2_REWRITE:
     437             : 
     438             :             /*
     439             :              * Although these records only exist to serve the needs of logical
     440             :              * decoding, all the work happens as part of crash or archive
     441             :              * recovery, so we don't need to do anything here.
     442             :              */
     443         568 :             break;
     444             : 
     445             :             /*
     446             :              * Everything else here is just low level physical stuff we're not
     447             :              * interested in.
     448             :              */
     449        2936 :         case XLOG_HEAP2_PRUNE_ON_ACCESS:
     450             :         case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
     451             :         case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
     452             :         case XLOG_HEAP2_VISIBLE:
     453             :         case XLOG_HEAP2_LOCK_UPDATED:
     454        2936 :             break;
     455           0 :         default:
     456           0 :             elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
     457             :     }
     458             : }
     459             : 
     460             : /*
     461             :  * Handle rmgr HEAP_ID records for LogicalDecodingProcessRecord().
     462             :  */
     463             : void
     464     2945372 : heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     465             : {
     466     2945372 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     467     2945372 :     TransactionId xid = XLogRecGetXid(buf->record);
     468     2945372 :     SnapBuild  *builder = ctx->snapshot_builder;
     469             : 
     470     2945372 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     471             : 
     472             :     /*
     473             :      * If we don't have snapshot or we are just fast-forwarding, there is no
     474             :      * point in decoding data changes. However, it's crucial to build the base
     475             :      * snapshot during fast-forward mode (as is done in
     476             :      * SnapBuildProcessChange()) because we require the snapshot's xmin when
     477             :      * determining the candidate catalog_xmin for the replication slot. See
     478             :      * SnapBuildProcessRunningXacts().
     479             :      */
     480     2945372 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     481          10 :         return;
     482             : 
     483     2945362 :     switch (info)
     484             :     {
     485     1863680 :         case XLOG_HEAP_INSERT:
     486     1863680 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     487     1863680 :                 !ctx->fast_forward)
     488     1863446 :                 DecodeInsert(ctx, buf);
     489     1863680 :             break;
     490             : 
     491             :             /*
     492             :              * Treat HOT update as normal updates. There is no useful
     493             :              * information in the fact that we could make it a HOT update
     494             :              * locally and the WAL layout is compatible.
     495             :              */
     496      308044 :         case XLOG_HEAP_HOT_UPDATE:
     497             :         case XLOG_HEAP_UPDATE:
     498      308044 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     499      308044 :                 !ctx->fast_forward)
     500      308030 :                 DecodeUpdate(ctx, buf);
     501      308044 :             break;
     502             : 
     503      415294 :         case XLOG_HEAP_DELETE:
     504      415294 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     505      415294 :                 !ctx->fast_forward)
     506      415288 :                 DecodeDelete(ctx, buf);
     507      415294 :             break;
     508             : 
     509         122 :         case XLOG_HEAP_TRUNCATE:
     510         122 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     511         122 :                 !ctx->fast_forward)
     512         120 :                 DecodeTruncate(ctx, buf);
     513         122 :             break;
     514             : 
     515        2046 :         case XLOG_HEAP_INPLACE:
     516             : 
     517             :             /*
     518             :              * Inplace updates are only ever performed on catalog tuples and
     519             :              * can, per definition, not change tuple visibility.  Since we
     520             :              * also don't decode catalog tuples, we're not interested in the
     521             :              * record's contents.
     522             :              */
     523        2046 :             break;
     524             : 
     525       35832 :         case XLOG_HEAP_CONFIRM:
     526       35832 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     527       35832 :                 !ctx->fast_forward)
     528       35832 :                 DecodeSpecConfirm(ctx, buf);
     529       35832 :             break;
     530             : 
     531      320344 :         case XLOG_HEAP_LOCK:
     532             :             /* we don't care about row level locks for now */
     533      320344 :             break;
     534             : 
     535           0 :         default:
     536           0 :             elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
     537             :             break;
     538             :     }
     539             : }
     540             : 
     541             : /*
     542             :  * Ask output plugin whether we want to skip this PREPARE and send
     543             :  * this transaction as a regular commit later.
     544             :  */
     545             : static inline bool
     546         670 : FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
     547             :               const char *gid)
     548             : {
     549             :     /*
     550             :      * Skip if decoding of two-phase transactions at PREPARE time is not
     551             :      * enabled. In that case, all two-phase transactions are considered
     552             :      * filtered out and will be applied as regular transactions at COMMIT
     553             :      * PREPARED.
     554             :      */
     555         670 :     if (!ctx->twophase)
     556          46 :         return true;
     557             : 
     558             :     /*
     559             :      * The filter_prepare callback is optional. When not supplied, all
     560             :      * prepared transactions should go through.
     561             :      */
     562         624 :     if (ctx->callbacks.filter_prepare_cb == NULL)
     563         328 :         return false;
     564             : 
     565         296 :     return filter_prepare_cb_wrapper(ctx, xid, gid);
     566             : }
     567             : 
     568             : static inline bool
     569     2618184 : FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
     570             : {
     571     2618184 :     if (ctx->callbacks.filter_by_origin_cb == NULL)
     572          48 :         return false;
     573             : 
     574     2618136 :     return filter_by_origin_cb_wrapper(ctx, origin_id);
     575             : }
     576             : 
     577             : /*
     578             :  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
     579             :  */
     580             : void
     581         114 : logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     582             : {
     583         114 :     SnapBuild  *builder = ctx->snapshot_builder;
     584         114 :     XLogReaderState *r = buf->record;
     585         114 :     TransactionId xid = XLogRecGetXid(r);
     586         114 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     587         114 :     RepOriginId origin_id = XLogRecGetOrigin(r);
     588         114 :     Snapshot    snapshot = NULL;
     589             :     xl_logical_message *message;
     590             : 
     591         114 :     if (info != XLOG_LOGICAL_MESSAGE)
     592           0 :         elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
     593             : 
     594         114 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     595             : 
     596             :     /* If we don't have snapshot, there is no point in decoding messages */
     597         114 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     598           0 :         return;
     599             : 
     600         114 :     message = (xl_logical_message *) XLogRecGetData(r);
     601             : 
     602         224 :     if (message->dbId != ctx->slot->data.database ||
     603         110 :         FilterByOrigin(ctx, origin_id))
     604           8 :         return;
     605             : 
     606         106 :     if (message->transactional &&
     607          78 :         !SnapBuildProcessChange(builder, xid, buf->origptr))
     608           0 :         return;
     609         134 :     else if (!message->transactional &&
     610          56 :              (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
     611          28 :               SnapBuildXactNeedsSkip(builder, buf->origptr)))
     612           8 :         return;
     613             : 
     614             :     /*
     615             :      * We also skip decoding in fast_forward mode. This check must be last
     616             :      * because we don't want to set the processing_required flag unless we
     617             :      * have a decodable message.
     618             :      */
     619          98 :     if (ctx->fast_forward)
     620             :     {
     621             :         /*
     622             :          * We need to set processing_required flag to notify the message's
     623             :          * existence to the caller. Usually, the flag is set when either the
     624             :          * COMMIT or ABORT records are decoded, but this must be turned on
     625             :          * here because the non-transactional logical message is decoded
     626             :          * without waiting for these records.
     627             :          */
     628           4 :         if (!message->transactional)
     629           4 :             ctx->processing_required = true;
     630             : 
     631           4 :         return;
     632             :     }
     633             : 
     634             :     /*
     635             :      * If this is a non-transactional change, get the snapshot we're expected
     636             :      * to use. We only get here when the snapshot is consistent, and the
     637             :      * change is not meant to be skipped.
     638             :      *
     639             :      * For transactional changes we don't need a snapshot, we'll use the
     640             :      * regular snapshot maintained by ReorderBuffer. We just leave it NULL.
     641             :      */
     642          94 :     if (!message->transactional)
     643          16 :         snapshot = SnapBuildGetOrBuildSnapshot(builder);
     644             : 
     645          94 :     ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
     646          94 :                               message->transactional,
     647          94 :                               message->message, /* first part of message is
     648             :                                                  * prefix */
     649             :                               message->message_size,
     650          94 :                               message->message + message->prefix_size);
     651             : }
     652             : 
     653             : /*
     654             :  * Consolidated commit record handling between the different form of commit
     655             :  * records.
     656             :  *
     657             :  * 'two_phase' indicates that caller wants to process the transaction in two
     658             :  * phases, first process prepare if not already done and then process
     659             :  * commit_prepared.
     660             :  */
     661             : static void
     662        6252 : DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     663             :              xl_xact_parsed_commit *parsed, TransactionId xid,
     664             :              bool two_phase)
     665             : {
     666        6252 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     667        6252 :     TimestampTz commit_time = parsed->xact_time;
     668        6252 :     RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     669             :     int         i;
     670             : 
     671        6252 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     672             :     {
     673         220 :         origin_lsn = parsed->origin_lsn;
     674         220 :         commit_time = parsed->origin_timestamp;
     675             :     }
     676             : 
     677        6252 :     SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
     678             :                        parsed->nsubxacts, parsed->subxacts,
     679             :                        parsed->xinfo);
     680             : 
     681             :     /* ----
     682             :      * Check whether we are interested in this specific transaction, and tell
     683             :      * the reorderbuffer to forget the content of the (sub-)transactions
     684             :      * if not.
     685             :      *
     686             :      * We can't just use ReorderBufferAbort() here, because we need to execute
     687             :      * the transaction's invalidations.  This currently won't be needed if
     688             :      * we're just skipping over the transaction because currently we only do
     689             :      * so during startup, to get to the first transaction the client needs. As
     690             :      * we have reset the catalog caches before starting to read WAL, and we
     691             :      * haven't yet touched any catalogs, there can't be anything to invalidate.
     692             :      * But if we're "forgetting" this commit because it happened in another
     693             :      * database, the invalidations might be important, because they could be
     694             :      * for shared catalogs and we might have loaded data into the relevant
     695             :      * syscaches.
     696             :      * ---
     697             :      */
     698        6252 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     699             :     {
     700        5258 :         for (i = 0; i < parsed->nsubxacts; i++)
     701             :         {
     702        1894 :             ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
     703             :         }
     704        3364 :         ReorderBufferForget(ctx->reorder, xid, buf->origptr);
     705             : 
     706        3364 :         return;
     707             :     }
     708             : 
     709             :     /* tell the reorderbuffer about the surviving subtransactions */
     710        3420 :     for (i = 0; i < parsed->nsubxacts; i++)
     711             :     {
     712         532 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     713             :                                  buf->origptr, buf->endptr);
     714             :     }
     715             : 
     716             :     /*
     717             :      * Send the final commit record if the transaction data is already
     718             :      * decoded, otherwise, process the entire transaction.
     719             :      */
     720        2888 :     if (two_phase)
     721             :     {
     722          66 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     723          66 :                                     SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
     724             :                                     commit_time, origin_id, origin_lsn,
     725          66 :                                     parsed->twophase_gid, true);
     726             :     }
     727             :     else
     728             :     {
     729        2822 :         ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
     730             :                             commit_time, origin_id, origin_lsn);
     731             :     }
     732             : 
     733             :     /*
     734             :      * Update the decoding stats at transaction prepare/commit/abort.
     735             :      * Additionally we send the stats when we spill or stream the changes to
     736             :      * avoid losing them in case the decoding is interrupted. It is not clear
     737             :      * that sending more or less frequently than this would be better.
     738             :      */
     739        2844 :     UpdateDecodingStats(ctx);
     740             : }
     741             : 
     742             : /*
     743             :  * Decode PREPARE record. Similar logic as in DecodeCommit.
     744             :  *
     745             :  * Note that we don't skip prepare even if have detected concurrent abort
     746             :  * because it is quite possible that we had already sent some changes before we
     747             :  * detect abort in which case we need to abort those changes in the subscriber.
     748             :  * To abort such changes, we do send the prepare and then the rollback prepared
     749             :  * which is what happened on the publisher-side as well. Now, we can invent a
     750             :  * new abort API wherein in such cases we send abort and skip sending prepared
     751             :  * and rollback prepared but then it is not that straightforward because we
     752             :  * might have streamed this transaction by that time in which case it is
     753             :  * handled when the rollback is encountered. It is not impossible to optimize
     754             :  * the concurrent abort case but it can introduce design complexity w.r.t
     755             :  * handling different cases so leaving it for now as it doesn't seem worth it.
     756             :  */
     757             : static void
     758         316 : DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     759             :               xl_xact_parsed_prepare *parsed)
     760             : {
     761         316 :     SnapBuild  *builder = ctx->snapshot_builder;
     762         316 :     XLogRecPtr  origin_lsn = parsed->origin_lsn;
     763         316 :     TimestampTz prepare_time = parsed->xact_time;
     764         316 :     RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     765             :     int         i;
     766         316 :     TransactionId xid = parsed->twophase_xid;
     767             : 
     768         316 :     if (parsed->origin_timestamp != 0)
     769          16 :         prepare_time = parsed->origin_timestamp;
     770             : 
     771             :     /*
     772             :      * Remember the prepare info for a txn so that it can be used later in
     773             :      * commit prepared if required. See ReorderBufferFinishPrepared.
     774             :      */
     775         316 :     if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
     776             :                                           buf->endptr, prepare_time, origin_id,
     777             :                                           origin_lsn))
     778           0 :         return;
     779             : 
     780             :     /* We can't start streaming unless a consistent state is reached. */
     781         316 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
     782             :     {
     783           6 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     784           6 :         return;
     785             :     }
     786             : 
     787             :     /*
     788             :      * Check whether we need to process this transaction. See
     789             :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     790             :      * transaction.
     791             :      *
     792             :      * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
     793             :      * hasn't yet been committed, removing this txn before a commit might
     794             :      * result in the computation of an incorrect restart_lsn. See
     795             :      * SnapBuildProcessRunningXacts. But we need to process cache
     796             :      * invalidations if there are any for the reasons mentioned in
     797             :      * DecodeCommit.
     798             :      */
     799         310 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     800             :     {
     801         230 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     802         230 :         ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
     803         230 :         return;
     804             :     }
     805             : 
     806             :     /* Tell the reorderbuffer about the surviving subtransactions. */
     807          82 :     for (i = 0; i < parsed->nsubxacts; i++)
     808             :     {
     809           2 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     810             :                                  buf->origptr, buf->endptr);
     811             :     }
     812             : 
     813             :     /* replay actions of all transaction + subtransactions in order */
     814          80 :     ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
     815             : 
     816             :     /*
     817             :      * Update the decoding stats at transaction prepare/commit/abort.
     818             :      * Additionally we send the stats when we spill or stream the changes to
     819             :      * avoid losing them in case the decoding is interrupted. It is not clear
     820             :      * that sending more or less frequently than this would be better.
     821             :      */
     822          80 :     UpdateDecodingStats(ctx);
     823             : }
     824             : 
     825             : 
     826             : /*
     827             :  * Get the data from the various forms of abort records and pass it on to
     828             :  * snapbuild.c and reorderbuffer.c.
     829             :  *
     830             :  * 'two_phase' indicates to finish prepared transaction.
     831             :  */
     832             : static void
     833         344 : DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     834             :             xl_xact_parsed_abort *parsed, TransactionId xid,
     835             :             bool two_phase)
     836             : {
     837             :     int         i;
     838         344 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     839         344 :     TimestampTz abort_time = parsed->xact_time;
     840         344 :     RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     841             :     bool        skip_xact;
     842             : 
     843         344 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     844             :     {
     845           8 :         origin_lsn = parsed->origin_lsn;
     846           8 :         abort_time = parsed->origin_timestamp;
     847             :     }
     848             : 
     849             :     /*
     850             :      * Check whether we need to process this transaction. See
     851             :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     852             :      * transaction.
     853             :      */
     854         344 :     skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
     855             : 
     856             :     /*
     857             :      * Send the final rollback record for a prepared transaction unless we
     858             :      * need to skip it. For non-two-phase xacts, simply forget the xact.
     859             :      */
     860         344 :     if (two_phase && !skip_xact)
     861             :     {
     862          18 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     863             :                                     InvalidXLogRecPtr,
     864             :                                     abort_time, origin_id, origin_lsn,
     865          18 :                                     parsed->twophase_gid, false);
     866             :     }
     867             :     else
     868             :     {
     869         338 :         for (i = 0; i < parsed->nsubxacts; i++)
     870             :         {
     871          12 :             ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
     872          12 :                                buf->record->EndRecPtr, abort_time);
     873             :         }
     874             : 
     875         326 :         ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr,
     876             :                            abort_time);
     877             :     }
     878             : 
     879             :     /* update the decoding stats */
     880         344 :     UpdateDecodingStats(ctx);
     881         344 : }
     882             : 
     883             : /*
     884             :  * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
     885             :  *
     886             :  * Inserts can contain the new tuple.
     887             :  */
     888             : static void
     889     1863446 : DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     890             : {
     891             :     Size        datalen;
     892             :     char       *tupledata;
     893             :     Size        tuplelen;
     894     1863446 :     XLogReaderState *r = buf->record;
     895             :     xl_heap_insert *xlrec;
     896             :     ReorderBufferChange *change;
     897             :     RelFileLocator target_locator;
     898             : 
     899     1863446 :     xlrec = (xl_heap_insert *) XLogRecGetData(r);
     900             : 
     901             :     /*
     902             :      * Ignore insert records without new tuples (this does happen when
     903             :      * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
     904             :      */
     905     1863446 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
     906        7816 :         return;
     907             : 
     908             :     /* only interested in our database */
     909     1855800 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     910     1855800 :     if (target_locator.dbOid != ctx->slot->data.database)
     911           0 :         return;
     912             : 
     913             :     /* output plugin doesn't look for this origin, no need to queue */
     914     1855800 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     915         170 :         return;
     916             : 
     917     1855630 :     change = ReorderBufferAllocChange(ctx->reorder);
     918     1855630 :     if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
     919     1819798 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
     920             :     else
     921       35832 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
     922     1855630 :     change->origin_id = XLogRecGetOrigin(r);
     923             : 
     924     1855630 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     925             : 
     926     1855630 :     tupledata = XLogRecGetBlockData(r, 0, &datalen);
     927     1855630 :     tuplelen = datalen - SizeOfHeapHeader;
     928             : 
     929     1855630 :     change->data.tp.newtuple =
     930     1855630 :         ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
     931             : 
     932     1855630 :     DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
     933             : 
     934     1855630 :     change->data.tp.clear_toast_afterwards = true;
     935             : 
     936     1855630 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
     937             :                              change,
     938     1855630 :                              xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
     939             : }
     940             : 
     941             : /*
     942             :  * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
     943             :  * in the record, from wal into proper tuplebufs.
     944             :  *
     945             :  * Updates can possibly contain a new tuple and the old primary key.
     946             :  */
     947             : static void
     948      308030 : DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     949             : {
     950      308030 :     XLogReaderState *r = buf->record;
     951             :     xl_heap_update *xlrec;
     952             :     ReorderBufferChange *change;
     953             :     char       *data;
     954             :     RelFileLocator target_locator;
     955             : 
     956      308030 :     xlrec = (xl_heap_update *) XLogRecGetData(r);
     957             : 
     958             :     /* only interested in our database */
     959      308030 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     960      308030 :     if (target_locator.dbOid != ctx->slot->data.database)
     961         260 :         return;
     962             : 
     963             :     /* output plugin doesn't look for this origin, no need to queue */
     964      307864 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     965          94 :         return;
     966             : 
     967      307770 :     change = ReorderBufferAllocChange(ctx->reorder);
     968      307770 :     change->action = REORDER_BUFFER_CHANGE_UPDATE;
     969      307770 :     change->origin_id = XLogRecGetOrigin(r);
     970      307770 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     971             : 
     972      307770 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
     973             :     {
     974             :         Size        datalen;
     975             :         Size        tuplelen;
     976             : 
     977      304254 :         data = XLogRecGetBlockData(r, 0, &datalen);
     978             : 
     979      304254 :         tuplelen = datalen - SizeOfHeapHeader;
     980             : 
     981      304254 :         change->data.tp.newtuple =
     982      304254 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
     983             : 
     984      304254 :         DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
     985             :     }
     986             : 
     987      307770 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
     988             :     {
     989             :         Size        datalen;
     990             :         Size        tuplelen;
     991             : 
     992             :         /* caution, remaining data in record is not aligned */
     993         582 :         data = XLogRecGetData(r) + SizeOfHeapUpdate;
     994         582 :         datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
     995         582 :         tuplelen = datalen - SizeOfHeapHeader;
     996             : 
     997         582 :         change->data.tp.oldtuple =
     998         582 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
     999             : 
    1000         582 :         DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
    1001             :     }
    1002             : 
    1003      307770 :     change->data.tp.clear_toast_afterwards = true;
    1004             : 
    1005      307770 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1006             :                              change, false);
    1007             : }
    1008             : 
    1009             : /*
    1010             :  * Parse XLOG_HEAP_DELETE from wal into proper tuplebufs.
    1011             :  *
    1012             :  * Deletes can possibly contain the old primary key.
    1013             :  */
    1014             : static void
    1015      415288 : DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1016             : {
    1017      415288 :     XLogReaderState *r = buf->record;
    1018             :     xl_heap_delete *xlrec;
    1019             :     ReorderBufferChange *change;
    1020             :     RelFileLocator target_locator;
    1021             : 
    1022      415288 :     xlrec = (xl_heap_delete *) XLogRecGetData(r);
    1023             : 
    1024             :     /* only interested in our database */
    1025      415288 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1026      415288 :     if (target_locator.dbOid != ctx->slot->data.database)
    1027         112 :         return;
    1028             : 
    1029             :     /* output plugin doesn't look for this origin, no need to queue */
    1030      415212 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1031          36 :         return;
    1032             : 
    1033      415176 :     change = ReorderBufferAllocChange(ctx->reorder);
    1034             : 
    1035      415176 :     if (xlrec->flags & XLH_DELETE_IS_SUPER)
    1036           0 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
    1037             :     else
    1038      415176 :         change->action = REORDER_BUFFER_CHANGE_DELETE;
    1039             : 
    1040      415176 :     change->origin_id = XLogRecGetOrigin(r);
    1041             : 
    1042      415176 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1043             : 
    1044             :     /* old primary key stored */
    1045      415176 :     if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
    1046             :     {
    1047      291590 :         Size        datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
    1048      291590 :         Size        tuplelen = datalen - SizeOfHeapHeader;
    1049             : 
    1050             :         Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
    1051             : 
    1052      291590 :         change->data.tp.oldtuple =
    1053      291590 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
    1054             : 
    1055      291590 :         DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
    1056             :                         datalen, change->data.tp.oldtuple);
    1057             :     }
    1058             : 
    1059      415176 :     change->data.tp.clear_toast_afterwards = true;
    1060             : 
    1061      415176 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1062             :                              change, false);
    1063             : }
    1064             : 
    1065             : /*
    1066             :  * Parse XLOG_HEAP_TRUNCATE from wal
    1067             :  */
    1068             : static void
    1069         120 : DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1070             : {
    1071         120 :     XLogReaderState *r = buf->record;
    1072             :     xl_heap_truncate *xlrec;
    1073             :     ReorderBufferChange *change;
    1074             : 
    1075         120 :     xlrec = (xl_heap_truncate *) XLogRecGetData(r);
    1076             : 
    1077             :     /* only interested in our database */
    1078         120 :     if (xlrec->dbId != ctx->slot->data.database)
    1079           0 :         return;
    1080             : 
    1081             :     /* output plugin doesn't look for this origin, no need to queue */
    1082         120 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1083           2 :         return;
    1084             : 
    1085         118 :     change = ReorderBufferAllocChange(ctx->reorder);
    1086         118 :     change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
    1087         118 :     change->origin_id = XLogRecGetOrigin(r);
    1088         118 :     if (xlrec->flags & XLH_TRUNCATE_CASCADE)
    1089           2 :         change->data.truncate.cascade = true;
    1090         118 :     if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
    1091           4 :         change->data.truncate.restart_seqs = true;
    1092         118 :     change->data.truncate.nrelids = xlrec->nrelids;
    1093         236 :     change->data.truncate.relids = ReorderBufferAllocRelids(ctx->reorder,
    1094         118 :                                                             xlrec->nrelids);
    1095         118 :     memcpy(change->data.truncate.relids, xlrec->relids,
    1096         118 :            xlrec->nrelids * sizeof(Oid));
    1097         118 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1098             :                              buf->origptr, change, false);
    1099             : }
    1100             : 
    1101             : /*
    1102             :  * Decode XLOG_HEAP2_MULTI_INSERT record into multiple tuplebufs.
    1103             :  *
    1104             :  * Currently MULTI_INSERT will always contain the full tuples.
    1105             :  */
    1106             : static void
    1107       11758 : DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1108             : {
    1109       11758 :     XLogReaderState *r = buf->record;
    1110             :     xl_heap_multi_insert *xlrec;
    1111             :     int         i;
    1112             :     char       *data;
    1113             :     char       *tupledata;
    1114             :     Size        tuplelen;
    1115             :     RelFileLocator rlocator;
    1116             : 
    1117       11758 :     xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
    1118             : 
    1119             :     /*
    1120             :      * Ignore insert records without new tuples.  This happens when a
    1121             :      * multi_insert is done on a catalog or on a non-persistent relation.
    1122             :      */
    1123       11758 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
    1124       11726 :         return;
    1125             : 
    1126             :     /* only interested in our database */
    1127         108 :     XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
    1128         108 :     if (rlocator.dbOid != ctx->slot->data.database)
    1129          76 :         return;
    1130             : 
    1131             :     /* output plugin doesn't look for this origin, no need to queue */
    1132          32 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1133           0 :         return;
    1134             : 
    1135             :     /*
    1136             :      * We know that this multi_insert isn't for a catalog, so the block should
    1137             :      * always have data even if a full-page write of it is taken.
    1138             :      */
    1139          32 :     tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
    1140             :     Assert(tupledata != NULL);
    1141             : 
    1142          32 :     data = tupledata;
    1143        2150 :     for (i = 0; i < xlrec->ntuples; i++)
    1144             :     {
    1145             :         ReorderBufferChange *change;
    1146             :         xl_multi_insert_tuple *xlhdr;
    1147             :         int         datalen;
    1148             :         HeapTuple   tuple;
    1149             :         HeapTupleHeader header;
    1150             : 
    1151        2118 :         change = ReorderBufferAllocChange(ctx->reorder);
    1152        2118 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
    1153        2118 :         change->origin_id = XLogRecGetOrigin(r);
    1154             : 
    1155        2118 :         memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));
    1156             : 
    1157        2118 :         xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
    1158        2118 :         data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
    1159        2118 :         datalen = xlhdr->datalen;
    1160             : 
    1161        2118 :         change->data.tp.newtuple =
    1162        2118 :             ReorderBufferAllocTupleBuf(ctx->reorder, datalen);
    1163             : 
    1164        2118 :         tuple = change->data.tp.newtuple;
    1165        2118 :         header = tuple->t_data;
    1166             : 
    1167             :         /* not a disk based tuple */
    1168        2118 :         ItemPointerSetInvalid(&tuple->t_self);
    1169             : 
    1170             :         /*
    1171             :          * We can only figure this out after reassembling the transactions.
    1172             :          */
    1173        2118 :         tuple->t_tableOid = InvalidOid;
    1174             : 
    1175        2118 :         tuple->t_len = datalen + SizeofHeapTupleHeader;
    1176             : 
    1177        2118 :         memset(header, 0, SizeofHeapTupleHeader);
    1178             : 
    1179        2118 :         memcpy((char *) tuple->t_data + SizeofHeapTupleHeader, data, datalen);
    1180        2118 :         header->t_infomask = xlhdr->t_infomask;
    1181        2118 :         header->t_infomask2 = xlhdr->t_infomask2;
    1182        2118 :         header->t_hoff = xlhdr->t_hoff;
    1183             : 
    1184             :         /*
    1185             :          * Reset toast reassembly state only after the last row in the last
    1186             :          * xl_multi_insert_tuple record emitted by one heap_multi_insert()
    1187             :          * call.
    1188             :          */
    1189        2118 :         if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
    1190         398 :             (i + 1) == xlrec->ntuples)
    1191          22 :             change->data.tp.clear_toast_afterwards = true;
    1192             :         else
    1193        2096 :             change->data.tp.clear_toast_afterwards = false;
    1194             : 
    1195        2118 :         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1196             :                                  buf->origptr, change, false);
    1197             : 
    1198             :         /* move to the next xl_multi_insert_tuple entry */
    1199        2118 :         data += datalen;
    1200             :     }
    1201             :     Assert(data == tupledata + tuplelen);
    1202             : }
    1203             : 
    1204             : /*
    1205             :  * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change.
    1206             :  *
    1207             :  * This is pretty trivial, all the state essentially already setup by the
    1208             :  * speculative insertion.
    1209             :  */
    1210             : static void
    1211       35832 : DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1212             : {
    1213       35832 :     XLogReaderState *r = buf->record;
    1214             :     ReorderBufferChange *change;
    1215             :     RelFileLocator target_locator;
    1216             : 
    1217             :     /* only interested in our database */
    1218       35832 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1219       35832 :     if (target_locator.dbOid != ctx->slot->data.database)
    1220           0 :         return;
    1221             : 
    1222             :     /* output plugin doesn't look for this origin, no need to queue */
    1223       35832 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1224           0 :         return;
    1225             : 
    1226       35832 :     change = ReorderBufferAllocChange(ctx->reorder);
    1227       35832 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
    1228       35832 :     change->origin_id = XLogRecGetOrigin(r);
    1229             : 
    1230       35832 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1231             : 
    1232       35832 :     change->data.tp.clear_toast_afterwards = true;
    1233             : 
    1234       35832 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1235             :                              change, false);
    1236             : }
    1237             : 
    1238             : 
    1239             : /*
    1240             :  * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
    1241             :  * (but not by heap_multi_insert) into a tuplebuf.
    1242             :  *
    1243             :  * The size 'len' and the pointer 'data' in the record need to be
    1244             :  * computed outside as they are record specific.
    1245             :  */
    1246             : static void
    1247     2452056 : DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
    1248             : {
    1249             :     xl_heap_header xlhdr;
    1250     2452056 :     int         datalen = len - SizeOfHeapHeader;
    1251             :     HeapTupleHeader header;
    1252             : 
    1253             :     Assert(datalen >= 0);
    1254             : 
    1255     2452056 :     tuple->t_len = datalen + SizeofHeapTupleHeader;
    1256     2452056 :     header = tuple->t_data;
    1257             : 
    1258             :     /* not a disk based tuple */
    1259     2452056 :     ItemPointerSetInvalid(&tuple->t_self);
    1260             : 
    1261             :     /* we can only figure this out after reassembling the transactions */
    1262     2452056 :     tuple->t_tableOid = InvalidOid;
    1263             : 
    1264             :     /* data is not stored aligned, copy to aligned storage */
    1265     2452056 :     memcpy(&xlhdr, data, SizeOfHeapHeader);
    1266             : 
    1267     2452056 :     memset(header, 0, SizeofHeapTupleHeader);
    1268             : 
    1269     2452056 :     memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
    1270     2452056 :            data + SizeOfHeapHeader,
    1271             :            datalen);
    1272             : 
    1273     2452056 :     header->t_infomask = xlhdr.t_infomask;
    1274     2452056 :     header->t_infomask2 = xlhdr.t_infomask2;
    1275     2452056 :     header->t_hoff = xlhdr.t_hoff;
    1276     2452056 : }
    1277             : 
    1278             : /*
    1279             :  * Check whether we are interested in this specific transaction.
    1280             :  *
    1281             :  * There can be several reasons we might not be interested in this
    1282             :  * transaction:
    1283             :  * 1) We might not be interested in decoding transactions up to this
    1284             :  *    LSN. This can happen because we previously decoded it and now just
    1285             :  *    are restarting or if we haven't assembled a consistent snapshot yet.
    1286             :  * 2) The transaction happened in another database.
    1287             :  * 3) The output plugin is not interested in the origin.
    1288             :  * 4) We are doing fast-forwarding
    1289             :  */
    1290             : static bool
    1291        6906 : DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
    1292             :                   Oid txn_dbid, RepOriginId origin_id)
    1293             : {
    1294        6906 :     if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
    1295        6362 :         (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
    1296        3214 :         FilterByOrigin(ctx, origin_id))
    1297        3768 :         return true;
    1298             : 
    1299             :     /*
    1300             :      * We also skip decoding in fast_forward mode. In passing set the
    1301             :      * processing_required flag to indicate that if it were not for
    1302             :      * fast_forward mode, processing would have been required.
    1303             :      */
    1304        3138 :     if (ctx->fast_forward)
    1305             :     {
    1306          44 :         ctx->processing_required = true;
    1307          44 :         return true;
    1308             :     }
    1309             : 
    1310        3094 :     return false;
    1311             : }

Generated by: LCOV version 1.16