LCOV - code coverage report
Current view: top level - src/backend/replication/logical - decode.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 415 437 95.0 %
Date: 2025-01-18 04:15:08 Functions: 20 20 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -------------------------------------------------------------------------
       2             :  *
       3             :  * decode.c
       4             :  *      This module decodes WAL records read using xlogreader.h's APIs for the
       5             :  *      purpose of logical decoding by passing information to the
       6             :  *      reorderbuffer module (containing the actual changes) and to the
       7             :  *      snapbuild module to build a fitting catalog snapshot (to be able to
       8             :  *      properly decode the changes in the reorderbuffer).
       9             :  *
      10             :  * NOTE:
      11             :  *      This basically tries to handle all low level xlog stuff for
      12             :  *      reorderbuffer.c and snapbuild.c. There's some minor leakage where a
      13             :  *      specific record's struct is used to pass data along, but those just
      14             :  *      happen to contain the right amount of data in a convenient
      15             :  *      format. There isn't and shouldn't be much intelligence about the
      16             :  *      contents of records in here except turning them into a more usable
      17             :  *      format.
      18             :  *
      19             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      20             :  * Portions Copyright (c) 1994, Regents of the University of California
      21             :  *
      22             :  * IDENTIFICATION
      23             :  *    src/backend/replication/logical/decode.c
      24             :  *
      25             :  * -------------------------------------------------------------------------
      26             :  */
      27             : #include "postgres.h"
      28             : 
      29             : #include "access/heapam_xlog.h"
      30             : #include "access/transam.h"
      31             : #include "access/xact.h"
      32             : #include "access/xlog_internal.h"
      33             : #include "access/xlogreader.h"
      34             : #include "access/xlogrecord.h"
      35             : #include "catalog/pg_control.h"
      36             : #include "replication/decode.h"
      37             : #include "replication/logical.h"
      38             : #include "replication/message.h"
      39             : #include "replication/reorderbuffer.h"
      40             : #include "replication/snapbuild.h"
      41             : #include "storage/standbydefs.h"
      42             : 
      43             : /* individual record(group)'s handlers */
      44             : static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      45             : static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      46             : static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      47             : static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      48             : static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      49             : static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      50             : 
      51             : static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      52             :                          xl_xact_parsed_commit *parsed, TransactionId xid,
      53             :                          bool two_phase);
      54             : static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      55             :                         xl_xact_parsed_abort *parsed, TransactionId xid,
      56             :                         bool two_phase);
      57             : static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      58             :                           xl_xact_parsed_prepare *parsed);
      59             : 
      60             : 
      61             : /* common function to decode tuples */
      62             : static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);
      63             : 
      64             : /* helper functions for decoding transactions */
      65             : static inline bool FilterPrepare(LogicalDecodingContext *ctx,
      66             :                                  TransactionId xid, const char *gid);
      67             : static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
      68             :                               XLogRecordBuffer *buf, Oid txn_dbid,
      69             :                               RepOriginId origin_id);
      70             : 
      71             : /*
      72             :  * Take every XLogReadRecord()ed record and perform the actions required to
      73             :  * decode it using the output plugin already setup in the logical decoding
      74             :  * context.
      75             :  *
      76             :  * NB: Every record's xid needs to be processed by reorderbuffer
      77             :  * (xids contained in the content of records are not relevant for this rule).
      78             :  * That means that for records which would otherwise not go through the
      79             :  * reorderbuffer, ReorderBufferProcessXid() has to be called. We don't want to
      80             :  * call ReorderBufferProcessXid for each record type by default, because
      81             :  * e.g. empty xacts can be handled more efficiently if there's no previous
      82             :  * state for them.
      83             :  *
      84             :  * We also support the ability to fast forward through records, skipping some
      85             :  * record types completely - see individual record types for details.
      86             :  */
      87             : void
      88     4964982 : LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
      89             : {
      90             :     XLogRecordBuffer buf;
      91             :     TransactionId txid;
      92             :     RmgrData    rmgr;
      93             : 
      94     4964982 :     buf.origptr = ctx->reader->ReadRecPtr;
      95     4964982 :     buf.endptr = ctx->reader->EndRecPtr;
      96     4964982 :     buf.record = record;
      97             : 
      98     4964982 :     txid = XLogRecGetTopXid(record);
      99             : 
     100             :     /*
     101             :      * If the top-level xid is valid, we need to assign the subxact to the
     102             :      * top-level xact. We need to do this for all records, hence we do it
     103             :      * before the switch.
     104             :      */
     105     4964982 :     if (TransactionIdIsValid(txid))
     106             :     {
     107        1368 :         ReorderBufferAssignChild(ctx->reorder,
     108             :                                  txid,
     109        1368 :                                  XLogRecGetXid(record),
     110             :                                  buf.origptr);
     111             :     }
     112             : 
     113     4964982 :     rmgr = GetRmgr(XLogRecGetRmid(record));
     114             : 
     115     4964982 :     if (rmgr.rm_decode != NULL)
     116     3860386 :         rmgr.rm_decode(ctx, &buf);
     117             :     else
     118             :     {
     119             :         /* just deal with xid, and done */
     120     1104596 :         ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
     121             :                                 buf.origptr);
     122             :     }
     123     4964970 : }
     124             : 
     125             : /*
     126             :  * Handle rmgr XLOG_ID records for LogicalDecodingProcessRecord().
     127             :  */
     128             : void
     129       11664 : xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     130             : {
     131       11664 :     SnapBuild  *builder = ctx->snapshot_builder;
     132       11664 :     uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
     133             : 
     134       11664 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
     135             :                             buf->origptr);
     136             : 
     137       11664 :     switch (info)
     138             :     {
     139             :             /* this is also used in END_OF_RECOVERY checkpoints */
     140          76 :         case XLOG_CHECKPOINT_SHUTDOWN:
     141             :         case XLOG_END_OF_RECOVERY:
     142          76 :             SnapBuildSerializationPoint(builder, buf->origptr);
     143             : 
     144          76 :             break;
     145         126 :         case XLOG_CHECKPOINT_ONLINE:
     146             : 
     147             :             /*
     148             :              * a RUNNING_XACTS record will have been logged near to this; we
     149             :              * can restart from there.
     150             :              */
     151         126 :             break;
     152           2 :         case XLOG_PARAMETER_CHANGE:
     153             :             {
     154           2 :                 xl_parameter_change *xlrec =
     155           2 :                     (xl_parameter_change *) XLogRecGetData(buf->record);
     156             : 
     157             :                 /*
     158             :                  * If wal_level on the primary is reduced to less than
     159             :                  * logical, we want to prevent existing logical slots from
     160             :                  * being used.  Existing logical slots on the standby get
     161             :                  * invalidated when this WAL record is replayed; and further,
     162             :                  * slot creation fails when wal_level is not sufficient; but
     163             :                  * all these operations are not synchronized, so a logical
     164             :                  * slot may creep in while the wal_level is being reduced.
     165             :                  * Hence this extra check.
     166             :                  */
     167           2 :                 if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
     168             :                 {
     169             :                     /*
     170             :                      * This can occur only on a standby, as a primary would
     171             :                      * not restart with wal_level < logical if there is a
     172             :                      * pre-existing logical slot.
     173             :                      */
     174             :                     Assert(RecoveryInProgress());
     175           0 :                     ereport(ERROR,
     176             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     177             :                              errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary")));
     178             :                 }
     179           2 :                 break;
     180             :             }
     181       11460 :         case XLOG_NOOP:
     182             :         case XLOG_NEXTOID:
     183             :         case XLOG_SWITCH:
     184             :         case XLOG_BACKUP_END:
     185             :         case XLOG_RESTORE_POINT:
     186             :         case XLOG_FPW_CHANGE:
     187             :         case XLOG_FPI_FOR_HINT:
     188             :         case XLOG_FPI:
     189             :         case XLOG_OVERWRITE_CONTRECORD:
     190             :         case XLOG_CHECKPOINT_REDO:
     191       11460 :             break;
     192           0 :         default:
     193           0 :             elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
     194             :     }
     195       11664 : }
     196             : 
     197             : /*
     198             :  * Handle rmgr XACT_ID records for LogicalDecodingProcessRecord().
     199             :  */
     200             : void
     201       16264 : xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     202             : {
     203       16264 :     SnapBuild  *builder = ctx->snapshot_builder;
     204       16264 :     ReorderBuffer *reorder = ctx->reorder;
     205       16264 :     XLogReaderState *r = buf->record;
     206       16264 :     uint8       info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
     207             : 
     208             :     /*
     209             :      * If the snapshot isn't yet fully built, we cannot decode anything, so
     210             :      * bail out.
     211             :      */
     212       16264 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     213          28 :         return;
     214             : 
     215       16236 :     switch (info)
     216             :     {
     217        5544 :         case XLOG_XACT_COMMIT:
     218             :         case XLOG_XACT_COMMIT_PREPARED:
     219             :             {
     220             :                 xl_xact_commit *xlrec;
     221             :                 xl_xact_parsed_commit parsed;
     222             :                 TransactionId xid;
     223        5544 :                 bool        two_phase = false;
     224             : 
     225        5544 :                 xlrec = (xl_xact_commit *) XLogRecGetData(r);
     226        5544 :                 ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     227             : 
     228        5544 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     229        5344 :                     xid = XLogRecGetXid(r);
     230             :                 else
     231         200 :                     xid = parsed.twophase_xid;
     232             : 
     233             :                 /*
     234             :                  * We would like to process the transaction in a two-phase
     235             :                  * manner iff output plugin supports two-phase commits and
     236             :                  * doesn't filter the transaction at prepare time.
     237             :                  */
     238        5544 :                 if (info == XLOG_XACT_COMMIT_PREPARED)
     239         200 :                     two_phase = !(FilterPrepare(ctx, xid,
     240         200 :                                                 parsed.twophase_gid));
     241             : 
     242        5544 :                 DecodeCommit(ctx, buf, &parsed, xid, two_phase);
     243        5536 :                 break;
     244             :             }
     245         228 :         case XLOG_XACT_ABORT:
     246             :         case XLOG_XACT_ABORT_PREPARED:
     247             :             {
     248             :                 xl_xact_abort *xlrec;
     249             :                 xl_xact_parsed_abort parsed;
     250             :                 TransactionId xid;
     251         228 :                 bool        two_phase = false;
     252             : 
     253         228 :                 xlrec = (xl_xact_abort *) XLogRecGetData(r);
     254         228 :                 ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     255             : 
     256         228 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     257         152 :                     xid = XLogRecGetXid(r);
     258             :                 else
     259          76 :                     xid = parsed.twophase_xid;
     260             : 
     261             :                 /*
     262             :                  * We would like to process the transaction in a two-phase
     263             :                  * manner iff output plugin supports two-phase commits and
     264             :                  * doesn't filter the transaction at prepare time.
     265             :                  */
     266         228 :                 if (info == XLOG_XACT_ABORT_PREPARED)
     267          76 :                     two_phase = !(FilterPrepare(ctx, xid,
     268          76 :                                                 parsed.twophase_gid));
     269             : 
     270         228 :                 DecodeAbort(ctx, buf, &parsed, xid, two_phase);
     271         228 :                 break;
     272             :             }
     273         266 :         case XLOG_XACT_ASSIGNMENT:
     274             : 
     275             :             /*
     276             :              * We assign subxact to the toplevel xact while processing each
     277             :              * record if required.  So, we don't need to do anything here. See
     278             :              * LogicalDecodingProcessRecord.
     279             :              */
     280         266 :             break;
     281        9884 :         case XLOG_XACT_INVALIDATIONS:
     282             :             {
     283             :                 TransactionId xid;
     284             :                 xl_xact_invals *invals;
     285             : 
     286        9884 :                 xid = XLogRecGetXid(r);
     287        9884 :                 invals = (xl_xact_invals *) XLogRecGetData(r);
     288             : 
     289             :                 /*
     290             :                  * Execute the invalidations for xid-less transactions,
     291             :                  * otherwise, accumulate them so that they can be processed at
     292             :                  * the commit time.
     293             :                  */
     294        9884 :                 if (TransactionIdIsValid(xid))
     295             :                 {
     296        9880 :                     if (!ctx->fast_forward)
     297        9848 :                         ReorderBufferAddInvalidations(reorder, xid,
     298             :                                                       buf->origptr,
     299        9848 :                                                       invals->nmsgs,
     300        9848 :                                                       invals->msgs);
     301        9880 :                     ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
     302             :                                                       buf->origptr);
     303             :                 }
     304           4 :                 else if (!ctx->fast_forward)
     305           4 :                     ReorderBufferImmediateInvalidation(ctx->reorder,
     306           4 :                                                        invals->nmsgs,
     307           4 :                                                        invals->msgs);
     308             : 
     309        9884 :                 break;
     310             :             }
     311         314 :         case XLOG_XACT_PREPARE:
     312             :             {
     313             :                 xl_xact_parsed_prepare parsed;
     314             :                 xl_xact_prepare *xlrec;
     315             : 
     316             :                 /* ok, parse it */
     317         314 :                 xlrec = (xl_xact_prepare *) XLogRecGetData(r);
     318         314 :                 ParsePrepareRecord(XLogRecGetInfo(buf->record),
     319             :                                    xlrec, &parsed);
     320             : 
     321             :                 /*
     322             :                  * We would like to process the transaction in a two-phase
     323             :                  * manner iff output plugin supports two-phase commits and
     324             :                  * doesn't filter the transaction at prepare time.
     325             :                  */
     326         314 :                 if (FilterPrepare(ctx, parsed.twophase_xid,
     327             :                                   parsed.twophase_gid))
     328             :                 {
     329          28 :                     ReorderBufferProcessXid(reorder, parsed.twophase_xid,
     330             :                                             buf->origptr);
     331          28 :                     break;
     332             :                 }
     333             : 
     334             :                 /*
     335             :                  * Note that if the prepared transaction has locked [user]
     336             :                  * catalog tables exclusively then decoding prepare can block
     337             :                  * till the main transaction is committed because it needs to
     338             :                  * lock the catalog tables.
     339             :                  *
     340             :                  * XXX Now, this can even lead to a deadlock if the prepared
     341             :                  * transaction is waiting to be logically replicated for
     342             :                  * distributed 2PC. This can be avoided by disallowing
     343             :                  * preparing transactions that have locked [user] catalog
     344             :                  * tables exclusively but as of now, we ask users not to do
     345             :                  * such an operation.
     346             :                  */
     347         286 :                 DecodePrepare(ctx, buf, &parsed);
     348         286 :                 break;
     349             :             }
     350           0 :         default:
     351           0 :             elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
     352             :     }
     353             : }
     354             : 
     355             : /*
     356             :  * Handle rmgr STANDBY_ID records for LogicalDecodingProcessRecord().
     357             :  */
     358             : void
     359        7338 : standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     360             : {
     361        7338 :     SnapBuild  *builder = ctx->snapshot_builder;
     362        7338 :     XLogReaderState *r = buf->record;
     363        7338 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     364             : 
     365        7338 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     366             : 
     367        7338 :     switch (info)
     368             :     {
     369        2678 :         case XLOG_RUNNING_XACTS:
     370             :             {
     371        2678 :                 xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
     372             : 
     373        2678 :                 SnapBuildProcessRunningXacts(builder, buf->origptr, running);
     374             : 
     375             :                 /*
     376             :                  * Abort all transactions that we keep track of, that are
     377             :                  * older than the record's oldestRunningXid. This is the most
     378             :                  * convenient spot for doing so since, in contrast to shutdown
     379             :                  * or end-of-recovery checkpoints, we have information about
     380             :                  * all running transactions which includes prepared ones,
     381             :                  * while shutdown checkpoints just know that no non-prepared
     382             :                  * transactions are in progress.
     383             :                  */
     384        2674 :                 ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
     385             :             }
     386        2674 :             break;
     387        4656 :         case XLOG_STANDBY_LOCK:
     388        4656 :             break;
     389           4 :         case XLOG_INVALIDATIONS:
     390             : 
     391             :             /*
     392             :              * We are processing the invalidations at the command level via
     393             :              * XLOG_XACT_INVALIDATIONS.  So we don't need to do anything here.
     394             :              */
     395           4 :             break;
     396           0 :         default:
     397           0 :             elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
     398             :     }
     399        7334 : }
     400             : 
     401             : /*
     402             :  * Handle rmgr HEAP2_ID records for LogicalDecodingProcessRecord().
     403             :  */
     404             : void
     405       64208 : heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     406             : {
     407       64208 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     408       64208 :     TransactionId xid = XLogRecGetXid(buf->record);
     409       64208 :     SnapBuild  *builder = ctx->snapshot_builder;
     410             : 
     411       64208 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     412             : 
     413             :     /*
     414             :      * If we don't have a snapshot or we are just fast-forwarding, there is no
     415             :      * point in decoding changes.
     416             :      */
     417       64208 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
     418       64190 :         ctx->fast_forward)
     419         260 :         return;
     420             : 
     421       63948 :     switch (info)
     422             :     {
     423       11586 :         case XLOG_HEAP2_MULTI_INSERT:
     424       11586 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     425       11586 :                 DecodeMultiInsert(ctx, buf);
     426       11586 :             break;
     427       47442 :         case XLOG_HEAP2_NEW_CID:
     428             :             {
     429             :                 xl_heap_new_cid *xlrec;
     430             : 
     431       47442 :                 xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
     432       47442 :                 SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
     433             : 
     434       47442 :                 break;
     435             :             }
     436         180 :         case XLOG_HEAP2_REWRITE:
     437             : 
     438             :             /*
     439             :              * Although these records only exist to serve the needs of logical
     440             :              * decoding, all the work happens as part of crash or archive
     441             :              * recovery, so we don't need to do anything here.
     442             :              */
     443         180 :             break;
     444             : 
     445             :             /*
     446             :              * Everything else here is just low level physical stuff we're not
     447             :              * interested in.
     448             :              */
     449        4740 :         case XLOG_HEAP2_PRUNE_ON_ACCESS:
     450             :         case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
     451             :         case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
     452             :         case XLOG_HEAP2_VISIBLE:
     453             :         case XLOG_HEAP2_LOCK_UPDATED:
     454        4740 :             break;
     455           0 :         default:
     456           0 :             elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
     457             :     }
     458             : }
     459             : 
     460             : /*
     461             :  * Handle rmgr HEAP_ID records for LogicalDecodingProcessRecord().
     462             :  */
     463             : void
     464     3760798 : heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     465             : {
     466     3760798 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     467     3760798 :     TransactionId xid = XLogRecGetXid(buf->record);
     468     3760798 :     SnapBuild  *builder = ctx->snapshot_builder;
     469             : 
     470     3760798 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     471             : 
     472             :     /*
     473             :      * If we don't have a snapshot or we are just fast-forwarding, there is no
     474             :      * point in decoding data changes.
     475             :      */
     476     3760798 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
     477     3760790 :         ctx->fast_forward)
     478         188 :         return;
     479             : 
     480     3760610 :     switch (info)
     481             :     {
     482     2392614 :         case XLOG_HEAP_INSERT:
     483     2392614 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     484     2392614 :                 DecodeInsert(ctx, buf);
     485     2392614 :             break;
     486             : 
     487             :             /*
     488             :              * Treat HOT update as normal updates. There is no useful
     489             :              * information in the fact that we could make it a HOT update
     490             :              * locally and the WAL layout is compatible.
     491             :              */
     492      414356 :         case XLOG_HEAP_HOT_UPDATE:
     493             :         case XLOG_HEAP_UPDATE:
     494      414356 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     495      414356 :                 DecodeUpdate(ctx, buf);
     496      414356 :             break;
     497             : 
     498      534884 :         case XLOG_HEAP_DELETE:
     499      534884 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     500      534884 :                 DecodeDelete(ctx, buf);
     501      534884 :             break;
     502             : 
     503         110 :         case XLOG_HEAP_TRUNCATE:
     504         110 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     505         110 :                 DecodeTruncate(ctx, buf);
     506         110 :             break;
     507             : 
     508        2020 :         case XLOG_HEAP_INPLACE:
     509             : 
     510             :             /*
     511             :              * Inplace updates are only ever performed on catalog tuples and
     512             :              * can, per definition, not change tuple visibility.  Inplace
     513             :              * updates don't affect storage or interpretation of table rows,
     514             :              * so they don't affect logicalrep_write_tuple() outcomes.  Hence,
     515             :              * we don't process invalidations from the original operation.  If
     516             :              * inplace updates did affect those things, invalidations wouldn't
     517             :              * make it work, since there are no snapshot-specific versions of
     518             :              * inplace-updated values.  Since we also don't decode catalog
     519             :              * tuples, we're not interested in the record's contents.
     520             :              *
     521             :              * WAL contains likely-unnecessary commit-time invals from the
     522             :              * CacheInvalidateHeapTuple() call in
     523             :              * heap_inplace_update_and_unlock(). Excess invalidation is safe.
     524             :              */
     525        2020 :             break;
     526             : 
     527       35832 :         case XLOG_HEAP_CONFIRM:
     528       35832 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     529       35832 :                 DecodeSpecConfirm(ctx, buf);
     530       35832 :             break;
     531             : 
     532      380794 :         case XLOG_HEAP_LOCK:
     533             :             /* we don't care about row level locks for now */
     534      380794 :             break;
     535             : 
     536           0 :         default:
     537           0 :             elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
     538             :             break;
     539             :     }
     540             : }
     541             : 
     542             : /*
     543             :  * Ask output plugin whether we want to skip this PREPARE and send
     544             :  * this transaction as a regular commit later.
     545             :  */
     546             : static inline bool
     547         590 : FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
     548             :               const char *gid)
     549             : {
     550             :     /*
     551             :      * Skip if decoding of two-phase transactions at PREPARE time is not
     552             :      * enabled. In that case, all two-phase transactions are considered
     553             :      * filtered out and will be applied as regular transactions at COMMIT
     554             :      * PREPARED.
     555             :      */
     556         590 :     if (!ctx->twophase)
     557          26 :         return true;
     558             : 
     559             :     /*
     560             :      * The filter_prepare callback is optional. When not supplied, all
     561             :      * prepared transactions should go through.
     562             :      */
     563         564 :     if (ctx->callbacks.filter_prepare_cb == NULL)
     564         272 :         return false;
     565             : 
     566         292 :     return filter_prepare_cb_wrapper(ctx, xid, gid);
     567             : }
     568             : 
     569             : static inline bool
     570     3373216 : FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
     571             : {
     572     3373216 :     if (ctx->callbacks.filter_by_origin_cb == NULL)
     573          42 :         return false;
     574             : 
     575     3373174 :     return filter_by_origin_cb_wrapper(ctx, origin_id);
     576             : }
     577             : 
     578             : /*
     579             :  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
     580             :  */
     581             : void
     582         114 : logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     583             : {
     584         114 :     SnapBuild  *builder = ctx->snapshot_builder;
     585         114 :     XLogReaderState *r = buf->record;
     586         114 :     TransactionId xid = XLogRecGetXid(r);
     587         114 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     588         114 :     RepOriginId origin_id = XLogRecGetOrigin(r);
     589         114 :     Snapshot    snapshot = NULL;
     590             :     xl_logical_message *message;
     591             : 
     592         114 :     if (info != XLOG_LOGICAL_MESSAGE)
     593           0 :         elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
     594             : 
     595         114 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     596             : 
     597             :     /* If we don't have a snapshot, there is no point in decoding messages */
     598         114 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     599           0 :         return;
     600             : 
     601         114 :     message = (xl_logical_message *) XLogRecGetData(r);
     602             : 
     603         224 :     if (message->dbId != ctx->slot->data.database ||
     604         110 :         FilterByOrigin(ctx, origin_id))
     605           8 :         return;
     606             : 
     607         106 :     if (message->transactional &&
     608          78 :         !SnapBuildProcessChange(builder, xid, buf->origptr))
     609           0 :         return;
     610         134 :     else if (!message->transactional &&
     611          56 :              (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
     612          28 :               SnapBuildXactNeedsSkip(builder, buf->origptr)))
     613           8 :         return;
     614             : 
     615             :     /*
     616             :      * We also skip decoding in fast_forward mode. This check must be last
     617             :      * because we don't want to set the processing_required flag unless we
     618             :      * have a decodable message.
     619             :      */
     620          98 :     if (ctx->fast_forward)
     621             :     {
     622             :         /*
     623             :          * We need to set processing_required flag to notify the message's
     624             :          * existence to the caller. Usually, the flag is set when either the
     625             :          * COMMIT or ABORT records are decoded, but this must be turned on
     626             :          * here because the non-transactional logical message is decoded
     627             :          * without waiting for these records.
     628             :          */
     629           4 :         if (!message->transactional)
     630           4 :             ctx->processing_required = true;
     631             : 
     632           4 :         return;
     633             :     }
     634             : 
     635             :     /*
     636             :      * If this is a non-transactional change, get the snapshot we're expected
     637             :      * to use. We only get here when the snapshot is consistent, and the
     638             :      * change is not meant to be skipped.
     639             :      *
     640             :      * For transactional changes we don't need a snapshot, we'll use the
     641             :      * regular snapshot maintained by ReorderBuffer. We just leave it NULL.
     642             :      */
     643          94 :     if (!message->transactional)
     644          16 :         snapshot = SnapBuildGetOrBuildSnapshot(builder);
     645             : 
     646          94 :     ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
     647          94 :                               message->transactional,
     648          94 :                               message->message, /* first part of message is
     649             :                                                  * prefix */
     650             :                               message->message_size,
     651          94 :                               message->message + message->prefix_size);
     652             : }
     653             : 
     654             : /*
     655             :  * Consolidated commit record handling between the different forms of commit
     656             :  * records.
     657             :  *
     658             :  * 'two_phase' indicates that caller wants to process the transaction in two
     659             :  * phases, first process prepare if not already done and then process
     660             :  * commit_prepared.
     661             :  */
     662             : static void
     663        5544 : DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     664             :              xl_xact_parsed_commit *parsed, TransactionId xid,
     665             :              bool two_phase)
     666             : {
     667        5544 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     668        5544 :     TimestampTz commit_time = parsed->xact_time;
     669        5544 :     RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     670             :     int         i;
     671             : 
     672        5544 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     673             :     {
     674         136 :         origin_lsn = parsed->origin_lsn;
     675         136 :         commit_time = parsed->origin_timestamp;
     676             :     }
     677             : 
     678        5544 :     SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
     679             :                        parsed->nsubxacts, parsed->subxacts,
     680             :                        parsed->xinfo);
     681             : 
     682             :     /* ----
     683             :      * Check whether we are interested in this specific transaction, and tell
     684             :      * the reorderbuffer to forget the content of the (sub-)transactions
     685             :      * if not.
     686             :      *
     687             :      * We can't just use ReorderBufferAbort() here, because we need to execute
     688             :      * the transaction's invalidations.  This currently won't be needed if
     689             :      * we're just skipping over the transaction because currently we only do
     690             :      * so during startup, to get to the first transaction the client needs. As
     691             :      * we have reset the catalog caches before starting to read WAL, and we
     692             :      * haven't yet touched any catalogs, there can't be anything to invalidate.
     693             :      * But if we're "forgetting" this commit because it happened in another
     694             :      * database, the invalidations might be important, because they could be
     695             :      * for shared catalogs and we might have loaded data into the relevant
     696             :      * syscaches.
     697             :      * ---
     698             :      */
     699        5544 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     700             :     {
     701        5030 :         for (i = 0; i < parsed->nsubxacts; i++)
     702             :         {
     703        1968 :             ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
     704             :         }
     705        3062 :         ReorderBufferForget(ctx->reorder, xid, buf->origptr);
     706             : 
     707        3062 :         return;
     708             :     }
     709             : 
     710             :     /* tell the reorderbuffer about the surviving subtransactions */
     711        3016 :     for (i = 0; i < parsed->nsubxacts; i++)
     712             :     {
     713         534 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     714             :                                  buf->origptr, buf->endptr);
     715             :     }
     716             : 
     717             :     /*
     718             :      * Send the final commit record if the transaction data is already
     719             :      * decoded, otherwise, process the entire transaction.
     720             :      */
     721        2482 :     if (two_phase)
     722             :     {
     723          64 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     724          64 :                                     SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
     725             :                                     commit_time, origin_id, origin_lsn,
     726          64 :                                     parsed->twophase_gid, true);
     727             :     }
     728             :     else
     729             :     {
     730        2418 :         ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
     731             :                             commit_time, origin_id, origin_lsn);
     732             :     }
     733             : 
     734             :     /*
     735             :      * Update the decoding stats at transaction prepare/commit/abort.
     736             :      * Additionally we send the stats when we spill or stream the changes to
     737             :      * avoid losing them in case the decoding is interrupted. It is not clear
     738             :      * that sending more or less frequently than this would be better.
     739             :      */
     740        2474 :     UpdateDecodingStats(ctx);
     741             : }
     742             : 
     743             : /*
     744             :  * Decode PREPARE record. Similar logic as in DecodeCommit.
     745             :  *
     746             :  * Note that we don't skip prepare even if we have detected a concurrent abort
     747             :  * because it is quite possible that we had already sent some changes before we
     748             :  * detect abort in which case we need to abort those changes in the subscriber.
     749             :  * To abort such changes, we do send the prepare and then the rollback prepared
     750             :  * which is what happened on the publisher-side as well. Now, we can invent a
     751             :  * new abort API wherein in such cases we send abort and skip sending prepared
     752             :  * and rollback prepared but then it is not that straightforward because we
     753             :  * might have streamed this transaction by that time in which case it is
     754             :  * handled when the rollback is encountered. It is not impossible to optimize
     755             :  * the concurrent abort case but it can introduce design complexity w.r.t
     756             :  * handling different cases so leaving it for now as it doesn't seem worth it.
     757             :  */
     758             : static void
     759         286 : DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     760             :               xl_xact_parsed_prepare *parsed)
     761             : {
     762         286 :     SnapBuild  *builder = ctx->snapshot_builder;
     763         286 :     XLogRecPtr  origin_lsn = parsed->origin_lsn;
     764         286 :     TimestampTz prepare_time = parsed->xact_time;
     765         286 :     RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     766             :     int         i;
     767         286 :     TransactionId xid = parsed->twophase_xid;
     768             : 
     769         286 :     if (parsed->origin_timestamp != 0)
     770          16 :         prepare_time = parsed->origin_timestamp;
     771             : 
     772             :     /*
     773             :      * Remember the prepare info for a txn so that it can be used later in
     774             :      * commit prepared if required. See ReorderBufferFinishPrepared.
     775             :      */
     776         286 :     if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
     777             :                                           buf->endptr, prepare_time, origin_id,
     778             :                                           origin_lsn))
     779           0 :         return;
     780             : 
     781             :     /* We can't start streaming unless a consistent state is reached. */
     782         286 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
     783             :     {
     784           6 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     785           6 :         return;
     786             :     }
     787             : 
     788             :     /*
     789             :      * Check whether we need to process this transaction. See
     790             :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     791             :      * transaction.
     792             :      *
     793             :      * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
     794             :      * hasn't yet been committed, removing this txn before a commit might
     795             :      * result in the computation of an incorrect restart_lsn. See
     796             :      * SnapBuildProcessRunningXacts. But we need to process cache
     797             :      * invalidations if there are any for the reasons mentioned in
     798             :      * DecodeCommit.
     799             :      */
     800         280 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     801             :     {
     802         198 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     803         198 :         ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
     804         198 :         return;
     805             :     }
     806             : 
     807             :     /* Tell the reorderbuffer about the surviving subtransactions. */
     808          84 :     for (i = 0; i < parsed->nsubxacts; i++)
     809             :     {
     810           2 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     811             :                                  buf->origptr, buf->endptr);
     812             :     }
     813             : 
     814             :     /* replay actions of all transaction + subtransactions in order */
     815          82 :     ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
     816             : 
     817             :     /*
     818             :      * Update the decoding stats at transaction prepare/commit/abort.
     819             :      * Additionally we send the stats when we spill or stream the changes to
     820             :      * avoid losing them in case the decoding is interrupted. It is not clear
     821             :      * that sending more or less frequently than this would be better.
     822             :      */
     823          82 :     UpdateDecodingStats(ctx);
     824             : }
     825             : 
     826             : 
     827             : /*
     828             :  * Get the data from the various forms of abort records and pass it on to
     829             :  * snapbuild.c and reorderbuffer.c.
     830             :  *
     831             :  * 'two_phase' indicates that this abort finishes a prepared transaction.
     832             :  */
     833             : static void
     834         228 : DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     835             :             xl_xact_parsed_abort *parsed, TransactionId xid,
     836             :             bool two_phase)
     837             : {
     838             :     int         i;
     839         228 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     840         228 :     TimestampTz abort_time = parsed->xact_time;
     841         228 :     RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     842             :     bool        skip_xact;
     843             : 
     844         228 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     845             :     {
     846           8 :         origin_lsn = parsed->origin_lsn;
     847           8 :         abort_time = parsed->origin_timestamp;
     848             :     }
     849             : 
     850             :     /*
     851             :      * Check whether we need to process this transaction. See
     852             :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     853             :      * transaction.
     854             :      */
     855         228 :     skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
     856             : 
     857             :     /*
     858             :      * Send the final rollback record for a prepared transaction unless we
     859             :      * need to skip it. For non-two-phase xacts, simply forget the xact.
     860             :      */
     861         228 :     if (two_phase && !skip_xact)
     862             :     {
     863          20 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     864             :                                     InvalidXLogRecPtr,
     865             :                                     abort_time, origin_id, origin_lsn,
     866          20 :                                     parsed->twophase_gid, false);
     867             :     }
     868             :     else
     869             :     {
     870         220 :         for (i = 0; i < parsed->nsubxacts; i++)
     871             :         {
     872          12 :             ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
     873          12 :                                buf->record->EndRecPtr, abort_time);
     874             :         }
     875             : 
     876         208 :         ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr,
     877             :                            abort_time);
     878             :     }
     879             : 
     880             :     /* update the decoding stats */
     881         228 :     UpdateDecodingStats(ctx);
     882         228 : }
     883             : 
     884             : /*
     885             :  * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
     886             :  *
     887             :  * Inserts can contain the new tuple.
     888             :  */
     889             : static void
     890     2392614 : DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     891             : {
     892             :     Size        datalen;
     893             :     char       *tupledata;
     894             :     Size        tuplelen;
     895     2392614 :     XLogReaderState *r = buf->record;
     896             :     xl_heap_insert *xlrec;
     897             :     ReorderBufferChange *change;
     898             :     RelFileLocator target_locator;
     899             : 
     900     2392614 :     xlrec = (xl_heap_insert *) XLogRecGetData(r);
     901             : 
     902             :     /*
     903             :      * Ignore insert records without new tuples (this does happen when
     904             :      * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
     905             :      */
     906     2392614 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
     907        7444 :         return;
     908             : 
     909             :     /* only interested in our database */
     910     2385204 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     911     2385204 :     if (target_locator.dbOid != ctx->slot->data.database)
     912           0 :         return;
     913             : 
     914             :     /* output plugin doesn't look for this origin, no need to queue */
     915     2385204 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     916          34 :         return;
     917             : 
     918     2385170 :     change = ReorderBufferGetChange(ctx->reorder);
     919     2385170 :     if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
     920     2349338 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
     921             :     else
     922       35832 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
     923     2385170 :     change->origin_id = XLogRecGetOrigin(r);
     924             : 
     925     2385170 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     926             : 
     927     2385170 :     tupledata = XLogRecGetBlockData(r, 0, &datalen);
     928     2385170 :     tuplelen = datalen - SizeOfHeapHeader;
     929             : 
     930     2385170 :     change->data.tp.newtuple =
     931     2385170 :         ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
     932             : 
     933     2385170 :     DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
     934             : 
     935     2385170 :     change->data.tp.clear_toast_afterwards = true;
     936             : 
     937     2385170 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
     938             :                              change,
     939     2385170 :                              xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
     940             : }
     941             : 
     942             : /*
     943             :  * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
     944             :  * in the record, from wal into proper tuplebufs.
     945             :  *
     946             :  * Updates can possibly contain a new tuple and the old primary key.
     947             :  */
     948             : static void
     949      414356 : DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     950             : {
     951      414356 :     XLogReaderState *r = buf->record;
     952             :     xl_heap_update *xlrec;
     953             :     ReorderBufferChange *change;
     954             :     char       *data;
     955             :     RelFileLocator target_locator;
     956             : 
     957      414356 :     xlrec = (xl_heap_update *) XLogRecGetData(r);
     958             : 
     959             :     /* only interested in our database */
     960      414356 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     961      414356 :     if (target_locator.dbOid != ctx->slot->data.database)
     962          74 :         return;
     963             : 
     964             :     /* output plugin doesn't look for this origin, no need to queue */
     965      414334 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     966          52 :         return;
     967             : 
     968      414282 :     change = ReorderBufferGetChange(ctx->reorder);
     969      414282 :     change->action = REORDER_BUFFER_CHANGE_UPDATE;
     970      414282 :     change->origin_id = XLogRecGetOrigin(r);
     971      414282 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     972             : 
     973      414282 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
     974             :     {
     975             :         Size        datalen;
     976             :         Size        tuplelen;
     977             : 
     978      410846 :         data = XLogRecGetBlockData(r, 0, &datalen);
     979             : 
     980      410846 :         tuplelen = datalen - SizeOfHeapHeader;
     981             : 
     982      410846 :         change->data.tp.newtuple =
     983      410846 :             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
     984             : 
     985      410846 :         DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
     986             :     }
     987             : 
     988      414282 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
     989             :     {
     990             :         Size        datalen;
     991             :         Size        tuplelen;
     992             : 
     993             :         /* caution, remaining data in record is not aligned */
     994         560 :         data = XLogRecGetData(r) + SizeOfHeapUpdate;
     995         560 :         datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
     996         560 :         tuplelen = datalen - SizeOfHeapHeader;
     997             : 
     998         560 :         change->data.tp.oldtuple =
     999         560 :             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
    1000             : 
    1001         560 :         DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
    1002             :     }
    1003             : 
    1004      414282 :     change->data.tp.clear_toast_afterwards = true;
    1005             : 
    1006      414282 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1007             :                              change, false);
    1008             : }
    1009             : 
    1010             : /*
    1011             :  * Parse XLOG_HEAP_DELETE from wal into proper tuplebufs.
    1012             :  *
    1013             :  * Deletes can possibly contain the old primary key.
    1014             :  */
    1015             : static void
    1016      534884 : DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1017             : {
    1018      534884 :     XLogReaderState *r = buf->record;
    1019             :     xl_heap_delete *xlrec;
    1020             :     ReorderBufferChange *change;
    1021             :     RelFileLocator target_locator;
    1022             : 
    1023      534884 :     xlrec = (xl_heap_delete *) XLogRecGetData(r);
    1024             : 
    1025             :     /* only interested in our database */
    1026      534884 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1027      534884 :     if (target_locator.dbOid != ctx->slot->data.database)
    1028          64 :         return;
    1029             : 
    1030             :     /* output plugin doesn't look for this origin, no need to queue */
    1031      534832 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1032          12 :         return;
    1033             : 
    1034      534820 :     change = ReorderBufferGetChange(ctx->reorder);
    1035             : 
    1036      534820 :     if (xlrec->flags & XLH_DELETE_IS_SUPER)
    1037           0 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
    1038             :     else
    1039      534820 :         change->action = REORDER_BUFFER_CHANGE_DELETE;
    1040             : 
    1041      534820 :     change->origin_id = XLogRecGetOrigin(r);
    1042             : 
    1043      534820 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1044             : 
    1045             :     /* old primary key stored */
    1046      534820 :     if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
    1047             :     {
    1048      411474 :         Size        datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
    1049      411474 :         Size        tuplelen = datalen - SizeOfHeapHeader;
    1050             : 
    1051             :         Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
    1052             : 
    1053      411474 :         change->data.tp.oldtuple =
    1054      411474 :             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
    1055             : 
    1056      411474 :         DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
    1057             :                         datalen, change->data.tp.oldtuple);
    1058             :     }
    1059             : 
    1060      534820 :     change->data.tp.clear_toast_afterwards = true;
    1061             : 
    1062      534820 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1063             :                              change, false);
    1064             : }
    1065             : 
    1066             : /*
    1067             :  * Parse XLOG_HEAP_TRUNCATE from WAL and queue it as one TRUNCATE change.
    1068             :  */
    1069             : static void
    1070         110 : DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1071             : {
    1072         110 :     XLogReaderState *r = buf->record;
    1073             :     xl_heap_truncate *xlrec;
    1074             :     ReorderBufferChange *change;
    1075             : 
    1076         110 :     xlrec = (xl_heap_truncate *) XLogRecGetData(r);
    1077             : 
    1078             :     /* only interested in our database */
    1079         110 :     if (xlrec->dbId != ctx->slot->data.database)
    1080           0 :         return;
    1081             : 
    1082             :     /* output plugin doesn't look for this origin, no need to queue */
    1083         110 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1084           0 :         return;
    1085             :     /* NOTE(review): flags below are only set, never cleared; assumes a zeroed change -- confirm */
    1086         110 :     change = ReorderBufferGetChange(ctx->reorder);
    1087         110 :     change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
    1088         110 :     change->origin_id = XLogRecGetOrigin(r);
    1089         110 :     if (xlrec->flags & XLH_TRUNCATE_CASCADE)
    1090           2 :         change->data.truncate.cascade = true;
    1091         110 :     if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
    1092           4 :         change->data.truncate.restart_seqs = true;
    1093         110 :     change->data.truncate.nrelids = xlrec->nrelids;
    1094         220 :     change->data.truncate.relids = ReorderBufferGetRelids(ctx->reorder,
    1095         110 :                                                           xlrec->nrelids);
    1096         110 :     memcpy(change->data.truncate.relids, xlrec->relids,
    1097         110 :            xlrec->nrelids * sizeof(Oid));   /* buffer was requested for nrelids entries */
    1098         110 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1099             :                              buf->origptr, change, false);
    1100             : }
    1101             : 
    1102             : /*
    1103             :  * Decode XLOG_HEAP2_MULTI_INSERT record into multiple tuplebufs.
    1104             :  *
    1105             :  * Currently MULTI_INSERT will always contain the full tuples.
    1106             :  */
    1107             : static void
    1108       11586 : DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1109             : {
    1110       11586 :     XLogReaderState *r = buf->record;
    1111             :     xl_heap_multi_insert *xlrec;
    1112             :     int         i;
    1113             :     char       *data;
    1114             :     char       *tupledata;
    1115             :     Size        tuplelen;
    1116             :     RelFileLocator rlocator;
    1117             : 
    1118       11586 :     xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
    1119             : 
    1120             :     /*
    1121             :      * Ignore insert records without new tuples.  This happens when a
    1122             :      * multi_insert is done on a catalog or on a non-persistent relation.
    1123             :      */
    1124       11586 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
    1125       11552 :         return;
    1126             : 
    1127             :     /* only interested in our database */
    1128         136 :     XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
    1129         136 :     if (rlocator.dbOid != ctx->slot->data.database)
    1130         102 :         return;
    1131             : 
    1132             :     /* output plugin doesn't look for this origin, no need to queue */
    1133          34 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1134           0 :         return;
    1135             : 
    1136             :     /*
    1137             :      * We know that this multi_insert isn't for a catalog, so the block should
    1138             :      * always have data even if a full-page write of it is taken.
    1139             :      */
    1140          34 :     tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
    1141             :     Assert(tupledata != NULL);
    1142             : 
    1143          34 :     data = tupledata;
    1144        2140 :     for (i = 0; i < xlrec->ntuples; i++)
    1145             :     {
    1146             :         ReorderBufferChange *change;
    1147             :         xl_multi_insert_tuple *xlhdr;
    1148             :         int         datalen;
    1149             :         HeapTuple   tuple;
    1150             :         HeapTupleHeader header;
    1151             : 
    1152        2106 :         change = ReorderBufferGetChange(ctx->reorder);
    1153        2106 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
    1154        2106 :         change->origin_id = XLogRecGetOrigin(r);
    1155             : 
    1156        2106 :         memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));
    1157             :         /* per-tuple headers sit at SHORTALIGN'ed addresses within the block data */
    1158        2106 :         xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
    1159        2106 :         data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
    1160        2106 :         datalen = xlhdr->datalen;
    1161             :         /* datalen counts only the bytes following the xl_multi_insert_tuple header */
    1162        2106 :         change->data.tp.newtuple =
    1163        2106 :             ReorderBufferGetTupleBuf(ctx->reorder, datalen);
    1164             : 
    1165        2106 :         tuple = change->data.tp.newtuple;
    1166        2106 :         header = tuple->t_data;
    1167             : 
    1168             :         /* not a disk based tuple */
    1169        2106 :         ItemPointerSetInvalid(&tuple->t_self);
    1170             : 
    1171             :         /*
    1172             :          * We can only figure this out after reassembling the transactions.
    1173             :          */
    1174        2106 :         tuple->t_tableOid = InvalidOid;
    1175             : 
    1176        2106 :         tuple->t_len = datalen + SizeofHeapTupleHeader;
    1177             :         /* zero the header; only the three fields set below are taken from WAL */
    1178        2106 :         memset(header, 0, SizeofHeapTupleHeader);
    1179             : 
    1180        2106 :         memcpy((char *) tuple->t_data + SizeofHeapTupleHeader,
    1181             :                (char *) data,
    1182             :                datalen);
    1183        2106 :         header->t_infomask = xlhdr->t_infomask;
    1184        2106 :         header->t_infomask2 = xlhdr->t_infomask2;
    1185        2106 :         header->t_hoff = xlhdr->t_hoff;
    1186             : 
    1187             :         /*
    1188             :          * Reset toast reassembly state only after the last row in the last
    1189             :          * xl_multi_insert_tuple record emitted by one heap_multi_insert()
    1190             :          * call.
    1191             :          */
    1192        2106 :         if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
    1193         386 :             (i + 1) == xlrec->ntuples)
    1194          24 :             change->data.tp.clear_toast_afterwards = true;
    1195             :         else
    1196        2082 :             change->data.tp.clear_toast_afterwards = false;
    1197             : 
    1198        2106 :         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1199             :                                  buf->origptr, change, false);
    1200             : 
    1201             :         /* move to the next xl_multi_insert_tuple entry */
    1202        2106 :         data += datalen;
    1203             :     }
    1204             :     Assert(data == tupledata + tuplelen);   /* the walk must consume the block data exactly */
    1205             : }
    1206             : 
    1207             : /*
    1208             :  * Parse XLOG_HEAP_CONFIRM from WAL into a confirmation change.
    1209             :  *
    1210             :  * This is pretty trivial, as all the state has essentially already been set
    1211             :  * up by the speculative insertion.
    1212             :  */
    1213             : static void
    1214       35832 : DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1215             : {
    1216       35832 :     XLogReaderState *r = buf->record;
    1217             :     ReorderBufferChange *change;
    1218             :     RelFileLocator target_locator;
    1219             : 
    1220             :     /* only interested in our database */
    1221       35832 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1222       35832 :     if (target_locator.dbOid != ctx->slot->data.database)
    1223           0 :         return;
    1224             : 
    1225             :     /* output plugin doesn't look for this origin, no need to queue */
    1226       35832 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1227           0 :         return;
    1228             : 
    1229       35832 :     change = ReorderBufferGetChange(ctx->reorder);
    1230       35832 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
    1231       35832 :     change->origin_id = XLogRecGetOrigin(r);
    1232             : 
    1233       35832 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1234             :     /* no tuple is attached here; the change carries only the locator and this flag */
    1235       35832 :     change->data.tp.clear_toast_afterwards = true;
    1236             : 
    1237       35832 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1238             :                              change, false);
    1239             : }
    1240             : 
    1241             : 
    1242             : /*
    1243             :  * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
    1244             :  * (but not by heap_multi_insert) into a tuplebuf.
    1245             :  *
    1246             :  * 'data' must point at the xl_heap_header and 'len' must cover that header
    1247             :  * plus the tuple data; both are record specific, so callers compute them.
    1248             :  */
    1249             : static void
    1250     3208050 : DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
    1251             : {
    1252             :     xl_heap_header xlhdr;
    1253     3208050 :     int         datalen = len - SizeOfHeapHeader;
    1254             :     HeapTupleHeader header;
    1255             : 
    1256             :     Assert(datalen >= 0);
    1257             :     /* 'tuple' must have room for datalen bytes behind its tuple header */
    1258     3208050 :     tuple->t_len = datalen + SizeofHeapTupleHeader;
    1259     3208050 :     header = tuple->t_data;
    1260             : 
    1261             :     /* not a disk based tuple */
    1262     3208050 :     ItemPointerSetInvalid(&tuple->t_self);
    1263             : 
    1264             :     /* we can only figure this out after reassembling the transactions */
    1265     3208050 :     tuple->t_tableOid = InvalidOid;
    1266             : 
    1267             :     /* data is not stored aligned, copy to aligned storage */
    1268     3208050 :     memcpy((char *) &xlhdr,
    1269             :            data,
    1270             :            SizeOfHeapHeader);
    1271             :     /* zero the header; only the three fields set below are taken from WAL */
    1272     3208050 :     memset(header, 0, SizeofHeapTupleHeader);
    1273             : 
    1274     3208050 :     memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
    1275     3208050 :            data + SizeOfHeapHeader,
    1276             :            datalen);
    1277             : 
    1278     3208050 :     header->t_infomask = xlhdr.t_infomask;
    1279     3208050 :     header->t_infomask2 = xlhdr.t_infomask2;
    1280     3208050 :     header->t_hoff = xlhdr.t_hoff;
    1281     3208050 : }
    1282             : 
    1283             : /*
    1284             :  * Check whether this transaction can be skipped; returns true if so.
    1285             :  *
    1286             :  * There can be several reasons we might not be interested in this
    1287             :  * transaction:
    1288             :  * 1) We might not be interested in decoding transactions up to this
    1289             :  *    LSN. This can happen because we previously decoded it and now just
    1290             :  *    are restarting or if we haven't assembled a consistent snapshot yet.
    1291             :  * 2) The transaction happened in another database (InvalidOid is exempt).
    1292             :  * 3) The output plugin is not interested in the origin.
    1293             :  * 4) We are doing fast-forwarding.
    1294             :  */
    1295             : static bool
    1296        6052 : DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
    1297             :                   Oid txn_dbid, RepOriginId origin_id)
    1298             : {
    1299        6052 :     if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
    1300        5492 :         (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
    1301        2760 :         FilterByOrigin(ctx, origin_id))
    1302        3348 :         return true;
    1303             : 
    1304             :     /*
    1305             :      * We also skip decoding in fast_forward mode. In passing set the
    1306             :      * processing_required flag to indicate that if it were not for
    1307             :      * fast_forward mode, processing would have been required.
    1308             :      */
    1309        2704 :     if (ctx->fast_forward)
    1310             :     {
    1311          38 :         ctx->processing_required = true;
    1312          38 :         return true;
    1313             :     }
    1314             : 
    1315        2666 :     return false;
    1316             : }

Generated by: LCOV version 1.14