LCOV - code coverage report
Current view: top level - src/backend/replication/logical - decode.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 415 441 94.1 %
Date: 2024-04-26 14:11:25 Functions: 20 20 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /* -------------------------------------------------------------------------
       2             :  *
       3             :  * decode.c
       4             :  *      This module decodes WAL records read using xlogreader.h's APIs for the
       5             :  *      purpose of logical decoding by passing information to the
       6             :  *      reorderbuffer module (containing the actual changes) and to the
       7             :  *      snapbuild module to build a fitting catalog snapshot (to be able to
       8             :  *      properly decode the changes in the reorderbuffer).
       9             :  *
      10             :  * NOTE:
      11             :  *      This basically tries to handle all low level xlog stuff for
      12             :  *      reorderbuffer.c and snapbuild.c. There's some minor leakage where a
      13             :  *      specific record's struct is used to pass data along, but those just
      14             :  *      happen to contain the right amount of data in a convenient
      15             :  *      format. There isn't and shouldn't be much intelligence about the
      16             :  *      contents of records in here except turning them into a more usable
      17             :  *      format.
      18             :  *
      19             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
      20             :  * Portions Copyright (c) 1994, Regents of the University of California
      21             :  *
      22             :  * IDENTIFICATION
      23             :  *    src/backend/replication/logical/decode.c
      24             :  *
      25             :  * -------------------------------------------------------------------------
      26             :  */
      27             : #include "postgres.h"
      28             : 
      29             : #include "access/heapam_xlog.h"
      30             : #include "access/transam.h"
      31             : #include "access/xact.h"
      32             : #include "access/xlog_internal.h"
      33             : #include "access/xlogreader.h"
      34             : #include "access/xlogrecord.h"
      35             : #include "catalog/pg_control.h"
      36             : #include "replication/decode.h"
      37             : #include "replication/logical.h"
      38             : #include "replication/message.h"
      39             : #include "replication/reorderbuffer.h"
      40             : #include "replication/snapbuild.h"
      41             : #include "storage/standbydefs.h"
      42             : 
      43             : /* individual record(group)'s handlers */
      44             : static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      45             : static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      46             : static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      47             : static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      48             : static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      49             : static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      50             : 
      51             : static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      52             :                          xl_xact_parsed_commit *parsed, TransactionId xid,
      53             :                          bool two_phase);
      54             : static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      55             :                         xl_xact_parsed_abort *parsed, TransactionId xid,
      56             :                         bool two_phase);
      57             : static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      58             :                           xl_xact_parsed_prepare *parsed);
      59             : 
      60             : 
      61             : /* common function to decode tuples */
      62             : static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);
      63             : 
      64             : /* helper functions for decoding transactions */
      65             : static inline bool FilterPrepare(LogicalDecodingContext *ctx,
      66             :                                  TransactionId xid, const char *gid);
      67             : static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
      68             :                               XLogRecordBuffer *buf, Oid txn_dbid,
      69             :                               RepOriginId origin_id);
      70             : 
      71             : /*
      72             :  * Take every XLogReadRecord()ed record and perform the actions required to
      73             :  * decode it using the output plugin already setup in the logical decoding
      74             :  * context.
      75             :  *
      76             :  * NB: Note that every record's xid needs to be processed by reorderbuffer
      77             :  * (xids contained in the content of records are not relevant for this rule).
      78             :  * That means that for records which'd otherwise not go through the
      79             :  * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
      80             :  * call ReorderBufferProcessXid for each record type by default, because
      81             :  * e.g. empty xacts can be handled more efficiently if there's no previous
      82             :  * state for them.
      83             :  *
      84             :  * We also support the ability to fast forward thru records, skipping some
      85             :  * record types completely - see individual record types for details.
      86             :  */
      87             : void
      88     4946102 : LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
      89             : {
      90             :     XLogRecordBuffer buf;
      91             :     TransactionId txid;
      92             :     RmgrData    rmgr;
      93             : 
      94     4946102 :     buf.origptr = ctx->reader->ReadRecPtr;
      95     4946102 :     buf.endptr = ctx->reader->EndRecPtr;
      96     4946102 :     buf.record = record;
      97             : 
      98     4946102 :     txid = XLogRecGetTopXid(record);
      99             : 
     100             :     /*
     101             :      * If the top-level xid is valid, we need to assign the subxact to the
     102             :      * top-level xact. We need to do this for all records, hence we do it
     103             :      * before the switch.
     104             :      */
     105     4946102 :     if (TransactionIdIsValid(txid))
     106             :     {
     107        1354 :         ReorderBufferAssignChild(ctx->reorder,
     108             :                                  txid,
     109        1354 :                                  XLogRecGetXid(record),
     110             :                                  buf.origptr);
     111             :     }
     112             : 
     113     4946102 :     rmgr = GetRmgr(XLogRecGetRmid(record));
     114             : 
     115     4946102 :     if (rmgr.rm_decode != NULL)
     116     3849042 :         rmgr.rm_decode(ctx, &buf);
     117             :     else
     118             :     {
     119             :         /* just deal with xid, and done */
     120     1097060 :         ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
     121             :                                 buf.origptr);
     122             :     }
     123     4946090 : }
     124             : 
     125             : /*
     126             :  * Handle rmgr XLOG_ID records for LogicalDecodingProcessRecord().
     127             :  */
     128             : void
     129       11022 : xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     130             : {
     131       11022 :     SnapBuild  *builder = ctx->snapshot_builder;
     132       11022 :     uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
     133             : 
     134       11022 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
     135             :                             buf->origptr);
     136             : 
     137       11022 :     switch (info)
     138             :     {
     139             :             /* this is also used in END_OF_RECOVERY checkpoints */
     140          74 :         case XLOG_CHECKPOINT_SHUTDOWN:
     141             :         case XLOG_END_OF_RECOVERY:
     142          74 :             SnapBuildSerializationPoint(builder, buf->origptr);
     143             : 
     144          74 :             break;
     145         104 :         case XLOG_CHECKPOINT_ONLINE:
     146             : 
     147             :             /*
     148             :              * a RUNNING_XACTS record will have been logged near to this, we
     149             :              * can restart from there.
     150             :              */
     151         104 :             break;
     152           0 :         case XLOG_PARAMETER_CHANGE:
     153             :             {
     154           0 :                 xl_parameter_change *xlrec =
     155           0 :                     (xl_parameter_change *) XLogRecGetData(buf->record);
     156             : 
     157             :                 /*
     158             :                  * If wal_level on the primary is reduced to less than
     159             :                  * logical, we want to prevent existing logical slots from
     160             :                  * being used.  Existing logical slots on the standby get
     161             :                  * invalidated when this WAL record is replayed; and further,
     162             :                  * slot creation fails when wal_level is not sufficient; but
     163             :                  * all these operations are not synchronized, so a logical
     164             :                  * slot may creep in while the wal_level is being reduced.
     165             :                  * Hence this extra check.
     166             :                  */
     167           0 :                 if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
     168             :                 {
     169             :                     /*
     170             :                      * This can occur only on a standby, as a primary would
     171             :                      * not allow to restart after changing wal_level < logical
     172             :                      * if there is pre-existing logical slot.
     173             :                      */
     174             :                     Assert(RecoveryInProgress());
     175           0 :                     ereport(ERROR,
     176             :                             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     177             :                              errmsg("logical decoding on standby requires wal_level >= logical on the primary")));
     178             :                 }
     179           0 :                 break;
     180             :             }
     181       10844 :         case XLOG_NOOP:
     182             :         case XLOG_NEXTOID:
     183             :         case XLOG_SWITCH:
     184             :         case XLOG_BACKUP_END:
     185             :         case XLOG_RESTORE_POINT:
     186             :         case XLOG_FPW_CHANGE:
     187             :         case XLOG_FPI_FOR_HINT:
     188             :         case XLOG_FPI:
     189             :         case XLOG_OVERWRITE_CONTRECORD:
     190             :         case XLOG_CHECKPOINT_REDO:
     191       10844 :             break;
     192           0 :         default:
     193           0 :             elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
     194             :     }
     195       11022 : }
     196             : 
     197             : /*
     198             :  * Handle rmgr XACT_ID records for LogicalDecodingProcessRecord().
     199             :  */
     200             : void
     201       14818 : xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     202             : {
     203       14818 :     SnapBuild  *builder = ctx->snapshot_builder;
     204       14818 :     ReorderBuffer *reorder = ctx->reorder;
     205       14818 :     XLogReaderState *r = buf->record;
     206       14818 :     uint8       info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
     207             : 
     208             :     /*
     209             :      * If the snapshot isn't yet fully built, we cannot decode anything, so
     210             :      * bail out.
     211             :      */
     212       14818 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     213          34 :         return;
     214             : 
     215       14784 :     switch (info)
     216             :     {
     217        5194 :         case XLOG_XACT_COMMIT:
     218             :         case XLOG_XACT_COMMIT_PREPARED:
     219             :             {
     220             :                 xl_xact_commit *xlrec;
     221             :                 xl_xact_parsed_commit parsed;
     222             :                 TransactionId xid;
     223        5194 :                 bool        two_phase = false;
     224             : 
     225        5194 :                 xlrec = (xl_xact_commit *) XLogRecGetData(r);
     226        5194 :                 ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     227             : 
     228        5194 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     229        5024 :                     xid = XLogRecGetXid(r);
     230             :                 else
     231         170 :                     xid = parsed.twophase_xid;
     232             : 
     233             :                 /*
     234             :                  * We would like to process the transaction in a two-phase
     235             :                  * manner iff output plugin supports two-phase commits and
     236             :                  * doesn't filter the transaction at prepare time.
     237             :                  */
     238        5194 :                 if (info == XLOG_XACT_COMMIT_PREPARED)
     239         170 :                     two_phase = !(FilterPrepare(ctx, xid,
     240         170 :                                                 parsed.twophase_gid));
     241             : 
     242        5194 :                 DecodeCommit(ctx, buf, &parsed, xid, two_phase);
     243        5186 :                 break;
     244             :             }
     245         216 :         case XLOG_XACT_ABORT:
     246             :         case XLOG_XACT_ABORT_PREPARED:
     247             :             {
     248             :                 xl_xact_abort *xlrec;
     249             :                 xl_xact_parsed_abort parsed;
     250             :                 TransactionId xid;
     251         216 :                 bool        two_phase = false;
     252             : 
     253         216 :                 xlrec = (xl_xact_abort *) XLogRecGetData(r);
     254         216 :                 ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     255             : 
     256         216 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     257         144 :                     xid = XLogRecGetXid(r);
     258             :                 else
     259          72 :                     xid = parsed.twophase_xid;
     260             : 
     261             :                 /*
     262             :                  * We would like to process the transaction in a two-phase
     263             :                  * manner iff output plugin supports two-phase commits and
     264             :                  * doesn't filter the transaction at prepare time.
     265             :                  */
     266         216 :                 if (info == XLOG_XACT_ABORT_PREPARED)
     267          72 :                     two_phase = !(FilterPrepare(ctx, xid,
     268          72 :                                                 parsed.twophase_gid));
     269             : 
     270         216 :                 DecodeAbort(ctx, buf, &parsed, xid, two_phase);
     271         216 :                 break;
     272             :             }
     273         260 :         case XLOG_XACT_ASSIGNMENT:
     274             : 
     275             :             /*
     276             :              * We assign subxact to the toplevel xact while processing each
     277             :              * record if required.  So, we don't need to do anything here. See
     278             :              * LogicalDecodingProcessRecord.
     279             :              */
     280         260 :             break;
     281        8840 :         case XLOG_XACT_INVALIDATIONS:
     282             :             {
     283             :                 TransactionId xid;
     284             :                 xl_xact_invals *invals;
     285             : 
     286        8840 :                 xid = XLogRecGetXid(r);
     287        8840 :                 invals = (xl_xact_invals *) XLogRecGetData(r);
     288             : 
     289             :                 /*
     290             :                  * Execute the invalidations for xid-less transactions,
     291             :                  * otherwise, accumulate them so that they can be processed at
     292             :                  * the commit time.
     293             :                  */
     294        8840 :                 if (TransactionIdIsValid(xid))
     295             :                 {
     296        8836 :                     if (!ctx->fast_forward)
     297        8800 :                         ReorderBufferAddInvalidations(reorder, xid,
     298             :                                                       buf->origptr,
     299        8800 :                                                       invals->nmsgs,
     300        8800 :                                                       invals->msgs);
     301        8836 :                     ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
     302             :                                                       buf->origptr);
     303             :                 }
     304           4 :                 else if ((!ctx->fast_forward))
     305           4 :                     ReorderBufferImmediateInvalidation(ctx->reorder,
     306           4 :                                                        invals->nmsgs,
     307           4 :                                                        invals->msgs);
     308             :             }
     309        8840 :             break;
     310         274 :         case XLOG_XACT_PREPARE:
     311             :             {
     312             :                 xl_xact_parsed_prepare parsed;
     313             :                 xl_xact_prepare *xlrec;
     314             : 
     315             :                 /* ok, parse it */
     316         274 :                 xlrec = (xl_xact_prepare *) XLogRecGetData(r);
     317         274 :                 ParsePrepareRecord(XLogRecGetInfo(buf->record),
     318             :                                    xlrec, &parsed);
     319             : 
     320             :                 /*
     321             :                  * We would like to process the transaction in a two-phase
     322             :                  * manner iff output plugin supports two-phase commits and
     323             :                  * doesn't filter the transaction at prepare time.
     324             :                  */
     325         274 :                 if (FilterPrepare(ctx, parsed.twophase_xid,
     326             :                                   parsed.twophase_gid))
     327             :                 {
     328          20 :                     ReorderBufferProcessXid(reorder, parsed.twophase_xid,
     329             :                                             buf->origptr);
     330          20 :                     break;
     331             :                 }
     332             : 
     333             :                 /*
     334             :                  * Note that if the prepared transaction has locked [user]
     335             :                  * catalog tables exclusively then decoding prepare can block
     336             :                  * till the main transaction is committed because it needs to
     337             :                  * lock the catalog tables.
     338             :                  *
     339             :                  * XXX Now, this can even lead to a deadlock if the prepare
     340             :                  * transaction is waiting to get it logically replicated for
     341             :                  * distributed 2PC. This can be avoided by disallowing
     342             :                  * preparing transactions that have locked [user] catalog
     343             :                  * tables exclusively but as of now, we ask users not to do
     344             :                  * such an operation.
     345             :                  */
     346         254 :                 DecodePrepare(ctx, buf, &parsed);
     347         254 :                 break;
     348             :             }
     349           0 :         default:
     350           0 :             elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
     351             :     }
     352             : }
     353             : 
     354             : /*
     355             :  * Handle rmgr STANDBY_ID records for LogicalDecodingProcessRecord().
     356             :  */
     357             : void
     358        6368 : standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     359             : {
     360        6368 :     SnapBuild  *builder = ctx->snapshot_builder;
     361        6368 :     XLogReaderState *r = buf->record;
     362        6368 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     363             : 
     364        6368 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     365             : 
     366        6368 :     switch (info)
     367             :     {
     368        2430 :         case XLOG_RUNNING_XACTS:
     369             :             {
     370        2430 :                 xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
     371             : 
     372        2430 :                 SnapBuildProcessRunningXacts(builder, buf->origptr, running);
     373             : 
     374             :                 /*
     375             :                  * Abort all transactions that we keep track of, that are
     376             :                  * older than the record's oldestRunningXid. This is the most
     377             :                  * convenient spot for doing so since, in contrast to shutdown
     378             :                  * or end-of-recovery checkpoints, we have information about
     379             :                  * all running transactions which includes prepared ones,
     380             :                  * while shutdown checkpoints just know that no non-prepared
     381             :                  * transactions are in progress.
     382             :                  */
     383        2426 :                 ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
     384             :             }
     385        2426 :             break;
     386        3934 :         case XLOG_STANDBY_LOCK:
     387        3934 :             break;
     388           4 :         case XLOG_INVALIDATIONS:
     389             : 
     390             :             /*
     391             :              * We are processing the invalidations at the command level via
     392             :              * XLOG_XACT_INVALIDATIONS.  So we don't need to do anything here.
     393             :              */
     394           4 :             break;
     395           0 :         default:
     396           0 :             elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
     397             :     }
     398        6364 : }
     399             : 
     400             : /*
     401             :  * Handle rmgr HEAP2_ID records for LogicalDecodingProcessRecord().
     402             :  */
     403             : void
     404       59218 : heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     405             : {
     406       59218 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     407       59218 :     TransactionId xid = XLogRecGetXid(buf->record);
     408       59218 :     SnapBuild  *builder = ctx->snapshot_builder;
     409             : 
     410       59218 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     411             : 
     412             :     /*
     413             :      * If we don't have snapshot or we are just fast-forwarding, there is no
     414             :      * point in decoding changes.
     415             :      */
     416       59218 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
     417       59188 :         ctx->fast_forward)
     418         274 :         return;
     419             : 
     420       58944 :     switch (info)
     421             :     {
     422       10638 :         case XLOG_HEAP2_MULTI_INSERT:
     423       10638 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     424       10638 :                 DecodeMultiInsert(ctx, buf);
     425       10638 :             break;
     426       43466 :         case XLOG_HEAP2_NEW_CID:
     427             :             {
     428             :                 xl_heap_new_cid *xlrec;
     429             : 
     430       43466 :                 xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
     431       43466 :                 SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
     432             : 
     433       43466 :                 break;
     434             :             }
     435         178 :         case XLOG_HEAP2_REWRITE:
     436             : 
     437             :             /*
     438             :              * Although these records only exist to serve the needs of logical
     439             :              * decoding, all the work happens as part of crash or archive
     440             :              * recovery, so we don't need to do anything here.
     441             :              */
     442         178 :             break;
     443             : 
     444             :             /*
     445             :              * Everything else here is just low level physical stuff we're not
     446             :              * interested in.
     447             :              */
     448        4662 :         case XLOG_HEAP2_PRUNE_ON_ACCESS:
     449             :         case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
     450             :         case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
     451             :         case XLOG_HEAP2_VISIBLE:
     452             :         case XLOG_HEAP2_LOCK_UPDATED:
     453        4662 :             break;
     454           0 :         default:
     455           0 :             elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
     456             :     }
     457             : }
     458             : 
     459             : /*
     460             :  * Handle rmgr HEAP_ID records for LogicalDecodingProcessRecord().
     461             :  */
     462             : void
     463     3757504 : heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     464             : {
     465     3757504 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     466     3757504 :     TransactionId xid = XLogRecGetXid(buf->record);
     467     3757504 :     SnapBuild  *builder = ctx->snapshot_builder;
     468             : 
     469     3757504 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     470             : 
     471             :     /*
     472             :      * If we don't have snapshot or we are just fast-forwarding, there is no
     473             :      * point in decoding data changes.
     474             :      */
     475     3757504 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
     476     3757494 :         ctx->fast_forward)
     477         234 :         return;
     478             : 
     479     3757270 :     switch (info)
     480             :     {
     481     2391052 :         case XLOG_HEAP_INSERT:
     482     2391052 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     483     2391052 :                 DecodeInsert(ctx, buf);
     484     2391052 :             break;
     485             : 
     486             :             /*
     487             :              * Treat HOT update as normal updates. There is no useful
     488             :              * information in the fact that we could make it a HOT update
     489             :              * locally and the WAL layout is compatible.
     490             :              */
     491      414206 :         case XLOG_HEAP_HOT_UPDATE:
     492             :         case XLOG_HEAP_UPDATE:
     493      414206 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     494      414206 :                 DecodeUpdate(ctx, buf);
     495      414206 :             break;
     496             : 
     497      533576 :         case XLOG_HEAP_DELETE:
     498      533576 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     499      533576 :                 DecodeDelete(ctx, buf);
     500      533576 :             break;
     501             : 
     502          88 :         case XLOG_HEAP_TRUNCATE:
     503          88 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     504          88 :                 DecodeTruncate(ctx, buf);
     505          88 :             break;
     506             : 
     507        1900 :         case XLOG_HEAP_INPLACE:
     508             : 
     509             :             /*
     510             :              * Inplace updates are only ever performed on catalog tuples and
     511             :              * can, per definition, not change tuple visibility.  Since we
     512             :              * don't decode catalog tuples, we're not interested in the
     513             :              * record's contents.
     514             :              *
     515             :              * In-place updates can be used either by XID-bearing transactions
     516             :              * (e.g.  in CREATE INDEX CONCURRENTLY) or by XID-less
     517             :              * transactions (e.g.  VACUUM).  In the former case, the commit
     518             :              * record will include cache invalidations, so we mark the
     519             :              * transaction as catalog modifying here. Currently that's
     520             :              * redundant because the commit will do that as well, but once we
     521             :              * support decoding in-progress relations, this will be important.
     522             :              */
     523        1900 :             if (!TransactionIdIsValid(xid))
     524          10 :                 break;
     525             : 
     526        1890 :             (void) SnapBuildProcessChange(builder, xid, buf->origptr);
     527        1890 :             ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
     528        1890 :             break;
     529             : 
     530       35832 :         case XLOG_HEAP_CONFIRM:
     531       35832 :             if (SnapBuildProcessChange(builder, xid, buf->origptr))
     532       35832 :                 DecodeSpecConfirm(ctx, buf);
     533       35832 :             break;
     534             : 
     535      380616 :         case XLOG_HEAP_LOCK:
     536             :             /* we don't care about row level locks for now */
     537      380616 :             break;
     538             : 
     539           0 :         default:
     540           0 :             elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
     541             :             break;
     542             :     }
     543             : }
     544             : 
     545             : /*
     546             :  * Ask output plugin whether we want to skip this PREPARE and send
     547             :  * this transaction as a regular commit later.
     548             :  */
     549             : static inline bool
     550         516 : FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
     551             :               const char *gid)
     552             : {
     553             :     /*
     554             :      * Skip if decoding of two-phase transactions at PREPARE time is not
     555             :      * enabled. In that case, all two-phase transactions are considered
     556             :      * filtered out and will be applied as regular transactions at COMMIT
     557             :      * PREPARED.
     558             :      */
     559         516 :     if (!ctx->twophase)
     560          20 :         return true;
     561             : 
     562             :     /*
     563             :      * The filter_prepare callback is optional. When not supplied, all
     564             :      * prepared transactions should go through.
     565             :      */
     566         496 :     if (ctx->callbacks.filter_prepare_cb == NULL)
     567         262 :         return false;
     568             : 
     569         234 :     return filter_prepare_cb_wrapper(ctx, xid, gid);
     570             : }
     571             : 
     572             : static inline bool
     573     3370684 : FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
     574             : {
     575     3370684 :     if (ctx->callbacks.filter_by_origin_cb == NULL)
     576          46 :         return false;
     577             : 
     578     3370638 :     return filter_by_origin_cb_wrapper(ctx, origin_id);
     579             : }
     580             : 
     581             : /*
     582             :  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
     583             :  */
     584             : void
     585         112 : logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     586             : {
     587         112 :     SnapBuild  *builder = ctx->snapshot_builder;
     588         112 :     XLogReaderState *r = buf->record;
     589         112 :     TransactionId xid = XLogRecGetXid(r);
     590         112 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     591         112 :     RepOriginId origin_id = XLogRecGetOrigin(r);
     592         112 :     Snapshot    snapshot = NULL;
     593             :     xl_logical_message *message;
     594             : 
     595         112 :     if (info != XLOG_LOGICAL_MESSAGE)
     596           0 :         elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
     597             : 
     598         112 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     599             : 
     600             :     /* If we don't have a snapshot, there is no point in decoding messages */
     601         112 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     602           0 :         return;
     603             : 
     604         112 :     message = (xl_logical_message *) XLogRecGetData(r);
     605             : 
     606         220 :     if (message->dbId != ctx->slot->data.database ||
     607         108 :         FilterByOrigin(ctx, origin_id))
     608           8 :         return;
     609             : 
     610         104 :     if (message->transactional &&
     611          76 :         !SnapBuildProcessChange(builder, xid, buf->origptr))
     612           0 :         return;
     613         132 :     else if (!message->transactional &&
     614          56 :              (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
     615          28 :               SnapBuildXactNeedsSkip(builder, buf->origptr)))
     616           8 :         return;
     617             : 
     618             :     /*
     619             :      * We also skip decoding in fast_forward mode. This check must be last
     620             :      * because we don't want to set the processing_required flag unless we
     621             :      * have a decodable message.
     622             :      */
     623          96 :     if (ctx->fast_forward)
     624             :     {
     625             :         /*
     626             :          * We need to set processing_required flag to notify the message's
     627             :          * existence to the caller. Usually, the flag is set when either the
     628             :          * COMMIT or ABORT records are decoded, but this must be turned on
     629             :          * here because the non-transactional logical message is decoded
     630             :          * without waiting for these records.
     631             :          */
     632           4 :         if (!message->transactional)
     633           4 :             ctx->processing_required = true;
     634             : 
     635           4 :         return;
     636             :     }
     637             : 
     638             :     /*
     639             :      * If this is a non-transactional change, get the snapshot we're expected
     640             :      * to use. We only get here when the snapshot is consistent, and the
     641             :      * change is not meant to be skipped.
     642             :      *
     643             :      * For transactional changes we don't need a snapshot, we'll use the
     644             :      * regular snapshot maintained by ReorderBuffer. We just leave it NULL.
     645             :      */
     646          92 :     if (!message->transactional)
     647          16 :         snapshot = SnapBuildGetOrBuildSnapshot(builder);
     648             : 
     649          92 :     ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
     650          92 :                               message->transactional,
     651          92 :                               message->message, /* first part of message is
     652             :                                                  * prefix */
     653             :                               message->message_size,
     654          92 :                               message->message + message->prefix_size);
     655             : }
     656             : 
     657             : /*
     658             :  * Consolidated commit record handling between the different forms of commit
     659             :  * records.
     660             :  *
     661             :  * 'two_phase' indicates that caller wants to process the transaction in two
     662             :  * phases, first process prepare if not already done and then process
     663             :  * commit_prepared.
     664             :  */
     665             : static void
     666        5194 : DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     667             :              xl_xact_parsed_commit *parsed, TransactionId xid,
     668             :              bool two_phase)
     669             : {
     670        5194 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     671        5194 :     TimestampTz commit_time = parsed->xact_time;
     672        5194 :     RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     673             :     int         i;
     674             : 
     675        5194 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     676             :     {
     677          94 :         origin_lsn = parsed->origin_lsn;
     678          94 :         commit_time = parsed->origin_timestamp;
     679             :     }
     680             : 
     681        5194 :     SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
     682             :                        parsed->nsubxacts, parsed->subxacts,
     683             :                        parsed->xinfo);
     684             : 
     685             :     /* ----
     686             :      * Check whether we are interested in this specific transaction, and tell
     687             :      * the reorderbuffer to forget the content of the (sub-)transactions
     688             :      * if not.
     689             :      *
     690             :      * We can't just use ReorderBufferAbort() here, because we need to execute
     691             :      * the transaction's invalidations.  This currently won't be needed if
     692             :      * we're just skipping over the transaction because currently we only do
     693             :      * so during startup, to get to the first transaction the client needs. As
     694             :      * we have reset the catalog caches before starting to read WAL, and we
     695             :      * haven't yet touched any catalogs, there can't be anything to invalidate.
     696             :      * But if we're "forgetting" this commit because it happened in another
     697             :      * database, the invalidations might be important, because they could be
     698             :      * for shared catalogs and we might have loaded data into the relevant
     699             :      * syscaches.
     700             :      * ---
     701             :      */
     702        5194 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     703             :     {
     704        4860 :         for (i = 0; i < parsed->nsubxacts; i++)
     705             :         {
     706        1964 :             ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
     707             :         }
     708        2896 :         ReorderBufferForget(ctx->reorder, xid, buf->origptr);
     709             : 
     710        2896 :         return;
     711             :     }
     712             : 
     713             :     /* tell the reorderbuffer about the surviving subtransactions */
     714        2830 :     for (i = 0; i < parsed->nsubxacts; i++)
     715             :     {
     716         532 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     717             :                                  buf->origptr, buf->endptr);
     718             :     }
     719             : 
     720             :     /*
     721             :      * Send the final commit record if the transaction data is already
     722             :      * decoded, otherwise, process the entire transaction.
     723             :      */
     724        2298 :     if (two_phase)
     725             :     {
     726          60 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     727          60 :                                     SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
     728             :                                     commit_time, origin_id, origin_lsn,
     729          60 :                                     parsed->twophase_gid, true);
     730             :     }
     731             :     else
     732             :     {
     733        2238 :         ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
     734             :                             commit_time, origin_id, origin_lsn);
     735             :     }
     736             : 
     737             :     /*
     738             :      * Update the decoding stats at transaction prepare/commit/abort.
     739             :      * Additionally we send the stats when we spill or stream the changes to
     740             :      * avoid losing them in case the decoding is interrupted. It is not clear
     741             :      * that sending more or less frequently than this would be better.
     742             :      */
     743        2290 :     UpdateDecodingStats(ctx);
     744             : }
     745             : 
     746             : /*
     747             :  * Decode PREPARE record. Similar logic as in DecodeCommit.
     748             :  *
     749             :  * Note that we don't skip prepare even if we have detected a concurrent abort,
     750             :  * because it is quite possible that we had already sent some changes before we
     751             :  * detected the abort, in which case we need to abort those changes in the subscriber.
     752             :  * To abort such changes, we do send the prepare and then the rollback prepared
     753             :  * which is what happened on the publisher-side as well. Now, we can invent a
     754             :  * new abort API wherein in such cases we send abort and skip sending prepared
     755             :  * and rollback prepared but then it is not that straightforward because we
     756             :  * might have streamed this transaction by that time in which case it is
     757             :  * handled when the rollback is encountered. It is not impossible to optimize
     758             :  * the concurrent abort case but it can introduce design complexity w.r.t
     759             :  * handling different cases so leaving it for now as it doesn't seem worth it.
     760             :  */
     761             : static void
     762         254 : DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     763             :               xl_xact_parsed_prepare *parsed)
     764             : {
     765         254 :     SnapBuild  *builder = ctx->snapshot_builder;
     766         254 :     XLogRecPtr  origin_lsn = parsed->origin_lsn;
     767         254 :     TimestampTz prepare_time = parsed->xact_time;
     768         254 :     RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     769             :     int         i;
     770         254 :     TransactionId xid = parsed->twophase_xid;
     771             : 
     772         254 :     if (parsed->origin_timestamp != 0)
     773          16 :         prepare_time = parsed->origin_timestamp;
     774             : 
     775             :     /*
     776             :      * Remember the prepare info for a txn so that it can be used later in
     777             :      * commit prepared if required. See ReorderBufferFinishPrepared.
     778             :      */
     779         254 :     if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
     780             :                                           buf->endptr, prepare_time, origin_id,
     781             :                                           origin_lsn))
     782           0 :         return;
     783             : 
     784             :     /* We can't start streaming unless a consistent state is reached. */
     785         254 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
     786             :     {
     787           6 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     788           6 :         return;
     789             :     }
     790             : 
     791             :     /*
     792             :      * Check whether we need to process this transaction. See
     793             :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     794             :      * transaction.
     795             :      *
     796             :      * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
     797             :      * hasn't yet been committed; removing this txn before a commit might
     798             :      * result in the computation of an incorrect restart_lsn. See
     799             :      * SnapBuildProcessRunningXacts. But we need to process cache
     800             :      * invalidations if there are any for the reasons mentioned in
     801             :      * DecodeCommit.
     802             :      */
     803         248 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     804             :     {
     805         170 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     806         170 :         ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
     807         170 :         return;
     808             :     }
     809             : 
     810             :     /* Tell the reorderbuffer about the surviving subtransactions. */
     811          80 :     for (i = 0; i < parsed->nsubxacts; i++)
     812             :     {
     813           2 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     814             :                                  buf->origptr, buf->endptr);
     815             :     }
     816             : 
     817             :     /* replay actions of all transaction + subtransactions in order */
     818          78 :     ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
     819             : 
     820             :     /*
     821             :      * Update the decoding stats at transaction prepare/commit/abort.
     822             :      * Additionally we send the stats when we spill or stream the changes to
     823             :      * avoid losing them in case the decoding is interrupted. It is not clear
     824             :      * that sending more or less frequently than this would be better.
     825             :      */
     826          78 :     UpdateDecodingStats(ctx);
     827             : }
     828             : 
     829             : 
     830             : /*
     831             :  * Get the data from the various forms of abort records and pass it on to
     832             :  * snapbuild.c and reorderbuffer.c.
     833             :  *
     834             :  * 'two_phase' indicates that a prepared transaction is to be finished.
     835             :  */
     836             : static void
     837         216 : DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     838             :             xl_xact_parsed_abort *parsed, TransactionId xid,
     839             :             bool two_phase)
     840             : {
     841             :     int         i;
     842         216 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     843         216 :     TimestampTz abort_time = parsed->xact_time;
     844         216 :     RepOriginId origin_id = XLogRecGetOrigin(buf->record);
     845             :     bool        skip_xact;
     846             : 
     847         216 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     848             :     {
     849           8 :         origin_lsn = parsed->origin_lsn;
     850           8 :         abort_time = parsed->origin_timestamp;
     851             :     }
     852             : 
     853             :     /*
     854             :      * Check whether we need to process this transaction. See
     855             :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     856             :      * transaction.
     857             :      */
     858         216 :     skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
     859             : 
     860             :     /*
     861             :      * Send the final rollback record for a prepared transaction unless we
     862             :      * need to skip it. For non-two-phase xacts, simply forget the xact.
     863             :      */
     864         216 :     if (two_phase && !skip_xact)
     865             :     {
     866          20 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     867             :                                     InvalidXLogRecPtr,
     868             :                                     abort_time, origin_id, origin_lsn,
     869          20 :                                     parsed->twophase_gid, false);
     870             :     }
     871             :     else
     872             :     {
     873         208 :         for (i = 0; i < parsed->nsubxacts; i++)
     874             :         {
     875          12 :             ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
     876          12 :                                buf->record->EndRecPtr, abort_time);
     877             :         }
     878             : 
     879         196 :         ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr,
     880             :                            abort_time);
     881             :     }
     882             : 
     883             :     /* update the decoding stats */
     884         216 :     UpdateDecodingStats(ctx);
     885         216 : }
     886             : 
     887             : /*
     888             :  * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
     889             :  *
     890             :  * Inserts can contain the new tuple.
     891             :  */
     892             : static void
     893     2391052 : DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     894             : {
     895             :     Size        datalen;
     896             :     char       *tupledata;
     897             :     Size        tuplelen;
     898     2391052 :     XLogReaderState *r = buf->record;
     899             :     xl_heap_insert *xlrec;
     900             :     ReorderBufferChange *change;
     901             :     RelFileLocator target_locator;
     902             : 
     903     2391052 :     xlrec = (xl_heap_insert *) XLogRecGetData(r);
     904             : 
     905             :     /*
     906             :      * Ignore insert records without new tuples (this does happen when
     907             :      * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
     908             :      */
     909     2391052 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
     910        6728 :         return;
     911             : 
     912             :     /* only interested in our database */
     913     2384344 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     914     2384344 :     if (target_locator.dbOid != ctx->slot->data.database)
     915           0 :         return;
     916             : 
     917             :     /* output plugin doesn't look for this origin, no need to queue */
     918     2384344 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     919          20 :         return;
     920             : 
     921     2384324 :     change = ReorderBufferGetChange(ctx->reorder);
     922     2384324 :     if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
     923     2348492 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
     924             :     else
     925       35832 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
     926     2384324 :     change->origin_id = XLogRecGetOrigin(r);
     927             : 
     928     2384324 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     929             : 
     930     2384324 :     tupledata = XLogRecGetBlockData(r, 0, &datalen);
     931     2384324 :     tuplelen = datalen - SizeOfHeapHeader;
     932             : 
     933     2384324 :     change->data.tp.newtuple =
     934     2384324 :         ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
     935             : 
     936     2384324 :     DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
     937             : 
     938     2384324 :     change->data.tp.clear_toast_afterwards = true;
     939             : 
     940     2384324 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
     941             :                              change,
     942     2384324 :                              xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
     943             : }
     944             : 
     945             : /*
     946             :  * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
     947             :  * in the record, from WAL into proper tuplebufs.
     948             :  *
     949             :  * Updates can possibly contain a new tuple and the old primary key.
     950             :  */
     951             : static void
     952      414206 : DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     953             : {
     954      414206 :     XLogReaderState *r = buf->record;
     955             :     xl_heap_update *xlrec;
     956             :     ReorderBufferChange *change;
     957             :     char       *data;
     958             :     RelFileLocator target_locator;
     959             : 
     960      414206 :     xlrec = (xl_heap_update *) XLogRecGetData(r);
     961             : 
     962             :     /* only interested in our database */
     963      414206 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     964      414206 :     if (target_locator.dbOid != ctx->slot->data.database)
     965          54 :         return;
     966             : 
     967             :     /* output plugin doesn't look for this origin, no need to queue */
     968      414188 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     969          36 :         return;
     970             : 
     971      414152 :     change = ReorderBufferGetChange(ctx->reorder);
     972      414152 :     change->action = REORDER_BUFFER_CHANGE_UPDATE;
     973      414152 :     change->origin_id = XLogRecGetOrigin(r);
     974      414152 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     975             : 
     976      414152 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
     977             :     {
     978             :         Size        datalen;
     979             :         Size        tuplelen;
     980             : 
     981      410962 :         data = XLogRecGetBlockData(r, 0, &datalen);
     982             : 
     983      410962 :         tuplelen = datalen - SizeOfHeapHeader;
     984             : 
     985      410962 :         change->data.tp.newtuple =
     986      410962 :             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
     987             : 
     988      410962 :         DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
     989             :     }
     990             : 
     991      414152 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
     992             :     {
     993             :         Size        datalen;
     994             :         Size        tuplelen;
     995             : 
     996             :         /* caution, remaining data in record is not aligned */
     997         680 :         data = XLogRecGetData(r) + SizeOfHeapUpdate;
     998         680 :         datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
     999         680 :         tuplelen = datalen - SizeOfHeapHeader;
    1000             : 
    1001         680 :         change->data.tp.oldtuple =
    1002         680 :             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
    1003             : 
    1004         680 :         DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
    1005             :     }
    1006             : 
    1007      414152 :     change->data.tp.clear_toast_afterwards = true;
    1008             : 
    1009      414152 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1010             :                              change, false);
    1011             : }
    1012             : 
    1013             : /*
    1014             :  * Parse XLOG_HEAP_DELETE from WAL into proper tuplebufs.
    1015             :  *
    1016             :  * Deletes can possibly contain the old primary key.
    1017             :  */
    1018             : static void
    1019      533576 : DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1020             : {
    1021      533576 :     XLogReaderState *r = buf->record;
    1022             :     xl_heap_delete *xlrec;
    1023             :     ReorderBufferChange *change;
    1024             :     RelFileLocator target_locator;
    1025             : 
    1026      533576 :     xlrec = (xl_heap_delete *) XLogRecGetData(r);
    1027             : 
    1028             :     /* only interested in our database */
    1029      533576 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1030      533576 :     if (target_locator.dbOid != ctx->slot->data.database)
    1031          52 :         return;
    1032             : 
    1033             :     /* output plugin doesn't look for this origin, no need to queue */
    1034      533528 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1035           4 :         return;
    1036             : 
    1037      533524 :     change = ReorderBufferGetChange(ctx->reorder);
    1038             : 
    1039      533524 :     if (xlrec->flags & XLH_DELETE_IS_SUPER)
    1040           0 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
    1041             :     else
    1042      533524 :         change->action = REORDER_BUFFER_CHANGE_DELETE;
    1043             : 
    1044      533524 :     change->origin_id = XLogRecGetOrigin(r);
    1045             : 
    1046      533524 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1047             : 
    1048             :     /* old primary key stored */
    1049      533524 :     if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
    1050             :     {
    1051      411374 :         Size        datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
    1052      411374 :         Size        tuplelen = datalen - SizeOfHeapHeader;
    1053             : 
    1054             :         Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
    1055             : 
    1056      411374 :         change->data.tp.oldtuple =
    1057      411374 :             ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
    1058             : 
    1059      411374 :         DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
    1060             :                         datalen, change->data.tp.oldtuple);
    1061             :     }
    1062             : 
    1063      533524 :     change->data.tp.clear_toast_afterwards = true;
    1064             : 
    1065      533524 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1066             :                              change, false);
    1067             : }
    1068             : 
    1069             : /*
    1070             :  * Parse XLOG_HEAP_TRUNCATE from WAL
    1071             :  */
    1072             : static void
    1073          88 : DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1074             : {
    1075          88 :     XLogReaderState *r = buf->record;
    1076             :     xl_heap_truncate *xlrec;
    1077             :     ReorderBufferChange *change;
    1078             : 
    1079          88 :     xlrec = (xl_heap_truncate *) XLogRecGetData(r);
    1080             : 
    1081             :     /* only interested in our database */
    1082          88 :     if (xlrec->dbId != ctx->slot->data.database)
    1083           0 :         return;
    1084             : 
    1085             :     /* output plugin doesn't look for this origin, no need to queue */
    1086          88 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1087           0 :         return;
    1088             : 
    1089          88 :     change = ReorderBufferGetChange(ctx->reorder);
    1090          88 :     change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
    1091          88 :     change->origin_id = XLogRecGetOrigin(r);
    1092          88 :     if (xlrec->flags & XLH_TRUNCATE_CASCADE)
    1093           2 :         change->data.truncate.cascade = true;
    1094          88 :     if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
    1095           4 :         change->data.truncate.restart_seqs = true;
    1096          88 :     change->data.truncate.nrelids = xlrec->nrelids;
    1097         176 :     change->data.truncate.relids = ReorderBufferGetRelids(ctx->reorder,
    1098          88 :                                                           xlrec->nrelids);
    1099          88 :     memcpy(change->data.truncate.relids, xlrec->relids,
    1100          88 :            xlrec->nrelids * sizeof(Oid));
    1101          88 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1102             :                              buf->origptr, change, false);
    1103             : }
    1104             : 
    1105             : /*
    1106             :  * Decode XLOG_HEAP2_MULTI_INSERT record into multiple tuplebufs, queueing one REORDER_BUFFER_CHANGE_INSERT change per contained tuple.
    1107             :  *
    1108             :  * Currently MULTI_INSERT will always contain the full tuples.
    1109             :  */
    1110             : static void
    1111       10638 : DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1112             : {
    1113       10638 :     XLogReaderState *r = buf->record;
    1114             :     xl_heap_multi_insert *xlrec;
    1115             :     int         i;
    1116             :     char       *data;
    1117             :     char       *tupledata;
    1118             :     Size        tuplelen;
    1119             :     RelFileLocator rlocator;
    1120             : 
    1121       10638 :     xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
    1122             : 
    1123             :     /*
    1124             :      * Ignore insert records without new tuples.  This happens when a
    1125             :      * multi_insert is done on a catalog or on a non-persistent relation.
    1126             :      */
    1127       10638 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
    1128       10610 :         return;
    1129             : 
    1130             :     /* only interested in our database */
    1131         132 :     XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
    1132         132 :     if (rlocator.dbOid != ctx->slot->data.database)
    1133         102 :         return;
    1134             : 
    1135             :     /* output plugin doesn't look for this origin, no need to queue */
    1136          30 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1137           2 :         return;
    1138             : 
    1139             :     /*
    1140             :      * We know that this multi_insert isn't for a catalog, so the block should
    1141             :      * always have data even if a full-page write of it is taken.
    1142             :      */
    1143          28 :     tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
    1144             :     Assert(tupledata != NULL);
    1145             : 
    1146          28 :     data = tupledata;
    1147        2106 :     for (i = 0; i < xlrec->ntuples; i++)
    1148             :     {
    1149             :         ReorderBufferChange *change;
    1150             :         xl_multi_insert_tuple *xlhdr;
    1151             :         int         datalen;
    1152             :         HeapTuple   tuple;
    1153             :         HeapTupleHeader header;
    1154             : 
    1155        2078 :         change = ReorderBufferGetChange(ctx->reorder);
    1156        2078 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
    1157        2078 :         change->origin_id = XLogRecGetOrigin(r);
    1158             : 
    1159        2078 :         memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));
    1160             : 
    1161        2078 :         xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);   /* each per-tuple header is read from a SHORTALIGN'ed position in the block data */
    1162        2078 :         data = ((char *) xlhdr) + SizeOfMultiInsertTuple;   /* tuple data starts right after the per-tuple header */
    1163        2078 :         datalen = xlhdr->datalen;
    1164             : 
    1165        2078 :         change->data.tp.newtuple =
    1166        2078 :             ReorderBufferGetTupleBuf(ctx->reorder, datalen);
    1167             : 
    1168        2078 :         tuple = change->data.tp.newtuple;
    1169        2078 :         header = tuple->t_data;
    1170             : 
    1171             :         /* not a disk based tuple, so there is no valid item pointer */
    1172        2078 :         ItemPointerSetInvalid(&tuple->t_self);
    1173             : 
    1174             :         /*
    1175             :          * We can only figure this out after reassembling the transactions.
    1176             :          */
    1177        2078 :         tuple->t_tableOid = InvalidOid;
    1178             : 
    1179        2078 :         tuple->t_len = datalen + SizeofHeapTupleHeader;
    1180             : 
    1181        2078 :         memset(header, 0, SizeofHeapTupleHeader);   /* zero the header; only the three fields copied from xlhdr below are filled in */
    1182             : 
    1183        2078 :         memcpy((char *) tuple->t_data + SizeofHeapTupleHeader,
    1184             :                (char *) data,
    1185             :                datalen);
    1186        2078 :         header->t_infomask = xlhdr->t_infomask;
    1187        2078 :         header->t_infomask2 = xlhdr->t_infomask2;
    1188        2078 :         header->t_hoff = xlhdr->t_hoff;
    1189             : 
    1190             :         /*
    1191             :          * Reset toast reassembly state only after the last row in the last
    1192             :          * xl_multi_insert_tuple record emitted by one heap_multi_insert()
    1193             :          * call.
    1194             :          */
    1195        2078 :         if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
    1196         358 :             (i + 1) == xlrec->ntuples)
    1197          18 :             change->data.tp.clear_toast_afterwards = true;
    1198             :         else
    1199        2060 :             change->data.tp.clear_toast_afterwards = false;
    1200             : 
    1201        2078 :         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1202             :                                  buf->origptr, change, false);
    1203             : 
    1204             :         /* step past this tuple's data; the next header is located at the top of the loop */
    1205        2078 :         data += datalen;
    1206             :     }
    1207             :     Assert(data == tupledata + tuplelen);   /* the loop must have consumed the block data exactly */
    1208             : }
    1209             : 
    1210             : /*
    1211             :  * Parse XLOG_HEAP_CONFIRM from WAL into a REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM change and queue it.
    1212             :  *
    1213             :  * This is pretty trivial, all the state is essentially already set up by the
    1214             :  * speculative insertion.
    1215             :  */
    1216             : static void
    1217       35832 : DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1218             : {
    1219       35832 :     XLogReaderState *r = buf->record;
    1220             :     ReorderBufferChange *change;
    1221             :     RelFileLocator target_locator;
    1222             : 
    1223             :     /* only interested in our database */
    1224       35832 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1225       35832 :     if (target_locator.dbOid != ctx->slot->data.database)
    1226           0 :         return;
    1227             : 
    1228             :     /* output plugin doesn't look for this origin, no need to queue */
    1229       35832 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1230           0 :         return;
    1231             : 
    1232       35832 :     change = ReorderBufferGetChange(ctx->reorder);
    1233       35832 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
    1234       35832 :     change->origin_id = XLogRecGetOrigin(r);
    1235             : 
    1236       35832 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));   /* no tuple data is attached here, only the relation locator */
    1237             : 
    1238       35832 :     change->data.tp.clear_toast_afterwards = true;
    1239             : 
    1240       35832 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1241             :                              change, false);
    1242             : }
    1243             : 
    1244             : 
    1245             : /*
    1246             :  * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
    1247             :  * (but not by heap_multi_insert) into a tuplebuf.
    1248             :  *
    1249             :  * The size 'len' and the pointer 'data' in the record need to be computed outside as they are record specific; 'len' covers the xl_heap_header plus the tuple data following it.
    1250             :  * 'tuple->t_data' must already point at room for SizeofHeapTupleHeader + (len - SizeOfHeapHeader) bytes, since that much is written here.
    1251             :  */
    1252             : static void
    1253     3207340 : DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
    1254             : {
    1255             :     xl_heap_header xlhdr;
    1256     3207340 :     int         datalen = len - SizeOfHeapHeader;   /* tuple data bytes after the xl_heap_header; NOTE(review): Size is presumably unsigned, so the Assert below relies on this narrowing to int -- confirm */
    1257             :     HeapTupleHeader header;
    1258             : 
    1259             :     Assert(datalen >= 0);
    1260             : 
    1261     3207340 :     tuple->t_len = datalen + SizeofHeapTupleHeader;
    1262     3207340 :     header = tuple->t_data;
    1263             : 
    1264             :     /* not a disk based tuple, so there is no valid item pointer */
    1265     3207340 :     ItemPointerSetInvalid(&tuple->t_self);
    1266             : 
    1267             :     /* we can only figure this out after reassembling the transactions */
    1268     3207340 :     tuple->t_tableOid = InvalidOid;
    1269             : 
    1270             :     /* data is not stored aligned, copy to aligned storage */
    1271     3207340 :     memcpy((char *) &xlhdr,
    1272             :            data,
    1273             :            SizeOfHeapHeader);
    1274             : 
    1275     3207340 :     memset(header, 0, SizeofHeapTupleHeader);   /* zero the header; only the three fields copied from xlhdr below are filled in */
    1276             : 
    1277     3207340 :     memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
    1278     3207340 :            data + SizeOfHeapHeader,
    1279             :            datalen);
    1280             : 
    1281     3207340 :     header->t_infomask = xlhdr.t_infomask;
    1282     3207340 :     header->t_infomask2 = xlhdr.t_infomask2;
    1283     3207340 :     header->t_hoff = xlhdr.t_hoff;
    1284     3207340 : }
    1285             : 
    1286             : /*
    1287             :  * Check whether we are interested in this specific transaction; returns true if it should be skipped, false if it has to be decoded.
    1288             :  *
    1289             :  * There can be several reasons we might not be interested in this
    1290             :  * transaction:
    1291             :  * 1) We might not be interested in decoding transactions up to this
    1292             :  *    LSN. This can happen because we previously decoded it and now just
    1293             :  *    are restarting or if we haven't assembled a consistent snapshot yet.
    1294             :  * 2) The transaction happened in another database.
    1295             :  * 3) The output plugin is not interested in the origin.
    1296             :  * 4) We are doing fast-forwarding.
    1297             :  */
    1298             : static bool
    1299        5658 : DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
    1300             :                   Oid txn_dbid, RepOriginId origin_id)
    1301             : {
    1302        5658 :     if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
    1303        5106 :         (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||   /* a txn_dbid of InvalidOid is never filtered by database */
    1304        2566 :         FilterByOrigin(ctx, origin_id))
    1305        3140 :         return true;
    1306             : 
    1307             :     /*
    1308             :      * We also skip decoding in fast_forward mode. In passing set the
    1309             :      * processing_required flag to indicate that if it were not for
    1310             :      * fast_forward mode, processing would have been required.
    1311             :      */
    1312        2518 :     if (ctx->fast_forward)
    1313             :     {
    1314          42 :         ctx->processing_required = true;
    1315          42 :         return true;
    1316             :     }
    1317             : 
    1318        2476 :     return false;
    1319             : }

Generated by: LCOV version 1.14