LCOV - code coverage report
Current view: top level - src/backend/replication/logical - decode.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 93.0 % 458 426
Test Date: 2026-04-26 21:16:32 Functions: 95.2 % 21 20
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /* -------------------------------------------------------------------------
       2              :  *
       3              :  * decode.c
       4              :  *      This module decodes WAL records read using xlogreader.h's APIs for the
       5              :  *      purpose of logical decoding by passing information to the
       6              :  *      reorderbuffer module (containing the actual changes) and to the
       7              :  *      snapbuild module to build a fitting catalog snapshot (to be able to
       8              :  *      properly decode the changes in the reorderbuffer).
       9              :  *
      10              :  * NOTE:
      11              :  *      This basically tries to handle all low level xlog stuff for
      12              :  *      reorderbuffer.c and snapbuild.c. There's some minor leakage where a
      13              :  *      specific record's struct is used to pass data along, but those just
      14              :  *      happen to contain the right amount of data in a convenient
      15              :  *      format. There isn't and shouldn't be much intelligence about the
      16              :  *      contents of records in here except turning them into a more usable
      17              :  *      format.
      18              :  *
      19              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      20              :  * Portions Copyright (c) 1994, Regents of the University of California
      21              :  *
      22              :  * IDENTIFICATION
      23              :  *    src/backend/replication/logical/decode.c
      24              :  *
      25              :  * -------------------------------------------------------------------------
      26              :  */
      27              : #include "postgres.h"
      28              : 
      29              : #include "access/heapam_xlog.h"
      30              : #include "access/transam.h"
      31              : #include "access/xact.h"
      32              : #include "access/xlog_internal.h"
      33              : #include "access/xlogreader.h"
      34              : #include "access/xlogrecord.h"
      35              : #include "catalog/pg_control.h"
      36              : #include "commands/repack.h"
      37              : #include "replication/decode.h"
      38              : #include "replication/logical.h"
      39              : #include "replication/message.h"
      40              : #include "replication/reorderbuffer.h"
      41              : #include "replication/snapbuild.h"
      42              : #include "storage/standbydefs.h"
      43              : 
      44              : /* individual record(group)'s handlers */
      45              : static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      46              : static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      47              : static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      48              : static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      49              : static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      50              : static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      51              : 
      52              : static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      53              :                          xl_xact_parsed_commit *parsed, TransactionId xid,
      54              :                          bool two_phase);
      55              : static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      56              :                         xl_xact_parsed_abort *parsed, TransactionId xid,
      57              :                         bool two_phase);
      58              : static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      59              :                           xl_xact_parsed_prepare *parsed);
      60              : 
      61              : 
      62              : /* common function to decode tuples */
      63              : static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);
      64              : 
      65              : /* helper functions for decoding transactions */
      66              : static inline bool FilterPrepare(LogicalDecodingContext *ctx,
      67              :                                  TransactionId xid, const char *gid);
      68              : static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
      69              :                               XLogRecordBuffer *buf, Oid txn_dbid,
      70              :                               ReplOriginId origin_id);
      71              : 
      72              : /*
      73              :  * Take every XLogReadRecord()ed record and perform the actions required to
      74              :  * decode it using the output plugin already setup in the logical decoding
      75              :  * context.
      76              :  *
      77              :  * NB: Note that every record's xid needs to be processed by reorderbuffer
      78              :  * (xids contained in the content of records are not relevant for this rule).
      79              :  * That means that for records which'd otherwise not go through the
      80              :  * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
      81              :  * call ReorderBufferProcessXid for each record type by default, because
      82              :  * e.g. empty xacts can be handled more efficiently if there's no previous
      83              :  * state for them.
      84              :  *
      85              :  * We also support the ability to fast forward thru records, skipping some
      86              :  * record types completely - see individual record types for details.
      87              :  */
      88              : void
      89      2358143 : LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
      90              : {
      91              :     XLogRecordBuffer buf;
      92              :     TransactionId txid;
      93              :     RmgrData    rmgr;
      94              : 
      95      2358143 :     buf.origptr = ctx->reader->ReadRecPtr;
      96      2358143 :     buf.endptr = ctx->reader->EndRecPtr;
      97      2358143 :     buf.record = record;
      98              : 
      99      2358143 :     txid = XLogRecGetTopXid(record);
     100              : 
     101              :     /*
     102              :      * If the top-level xid is valid, we need to assign the subxact to the
     103              :      * top-level xact. We need to do this for all records, hence we do it
     104              :      * before the switch.
     105              :      */
     106      2358143 :     if (TransactionIdIsValid(txid))
     107              :     {
     108          671 :         ReorderBufferAssignChild(ctx->reorder,
     109              :                                  txid,
     110          671 :                                  XLogRecGetXid(record),
     111              :                                  buf.origptr);
     112              :     }
     113              : 
     114      2358143 :     rmgr = GetRmgr(XLogRecGetRmid(record));
     115              : 
     116      2358143 :     if (rmgr.rm_decode != NULL)
     117      1801858 :         rmgr.rm_decode(ctx, &buf);
     118              :     else
     119              :     {
     120              :         /* just deal with xid, and done */
     121       556285 :         ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
     122              :                                 buf.origptr);
     123              :     }
     124      2358128 : }
     125              : 
     126              : /*
     127              :  * Handle rmgr XLOG_ID records for LogicalDecodingProcessRecord().
     128              :  */
     129              : void
     130         7562 : xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     131              : {
     132         7562 :     SnapBuild  *builder = ctx->snapshot_builder;
     133         7562 :     uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
     134              : 
     135         7562 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
     136              :                             buf->origptr);
     137              : 
     138         7562 :     switch (info)
     139              :     {
     140              :             /* this is also used in END_OF_RECOVERY checkpoints */
     141          102 :         case XLOG_CHECKPOINT_SHUTDOWN:
     142              :         case XLOG_END_OF_RECOVERY:
     143          102 :             SnapBuildSerializationPoint(builder, buf->origptr);
     144              : 
     145          102 :             break;
     146           85 :         case XLOG_CHECKPOINT_ONLINE:
     147              : 
     148              :             /*
     149              :              * a RUNNING_XACTS record will have been logged near to this, we
     150              :              * can restart from there.
     151              :              */
     152           85 :             break;
     153            0 :         case XLOG_LOGICAL_DECODING_STATUS_CHANGE:
     154              :             {
     155              :                 bool        logical_decoding;
     156              : 
     157            0 :                 memcpy(&logical_decoding, XLogRecGetData(buf->record), sizeof(bool));
     158              : 
     159              :                 /*
     160              :                  * Error out as we should not decode this WAL record.
     161              :                  *
     162              :                  * Logical decoding is disabled, and existing logical slots on
     163              :                  * the standby are invalidated when this WAL record is
     164              :                  * replayed. No logical decoder can process this WAL record
     165              :                  * until replay completes, and by then the slots are already
     166              :                  * invalidated. Furthermore, no new logical slots can be
     167              :                  * created while logical decoding is disabled. This cannot
     168              :                  * occur even on primary either, since it will not restart
     169              :                  * with wal_level < replica if any logical slots exist.
     170              :                  */
     171            0 :                 elog(ERROR, "unexpected logical decoding status change %d",
     172              :                      logical_decoding);
     173              : 
     174              :                 break;
     175              :             }
     176         7375 :         case XLOG_NOOP:
     177              :         case XLOG_NEXTOID:
     178              :         case XLOG_SWITCH:
     179              :         case XLOG_BACKUP_END:
     180              :         case XLOG_PARAMETER_CHANGE:
     181              :         case XLOG_RESTORE_POINT:
     182              :         case XLOG_FPW_CHANGE:
     183              :         case XLOG_FPI_FOR_HINT:
     184              :         case XLOG_FPI:
     185              :         case XLOG_OVERWRITE_CONTRECORD:
     186              :         case XLOG_CHECKPOINT_REDO:
     187         7375 :             break;
     188            0 :         default:
     189            0 :             elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
     190              :     }
     191         7562 : }
     192              : 
     193              : void
     194            0 : xlog2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     195              : {
     196            0 :     uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
     197              : 
     198            0 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record), buf->origptr);
     199              : 
     200            0 :     switch (info)
     201              :     {
     202            0 :         case XLOG2_CHECKSUMS:
     203            0 :             break;
     204            0 :         default:
     205            0 :             elog(ERROR, "unexpected RM_XLOG2_ID record type: %u", info);
     206              :     }
     207            0 : }
     208              : 
     209              : /*
     210              :  * Handle rmgr XACT_ID records for LogicalDecodingProcessRecord().
     211              :  */
     212              : void
     213        10688 : xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     214              : {
     215        10688 :     SnapBuild  *builder = ctx->snapshot_builder;
     216        10688 :     ReorderBuffer *reorder = ctx->reorder;
     217        10688 :     XLogReaderState *r = buf->record;
     218        10688 :     uint8       info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
     219              : 
     220              :     /*
     221              :      * If the snapshot isn't yet fully built, we cannot decode anything, so
     222              :      * bail out.
     223              :      */
     224        10688 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     225           13 :         return;
     226              : 
     227        10675 :     switch (info)
     228              :     {
     229         3999 :         case XLOG_XACT_COMMIT:
     230              :         case XLOG_XACT_COMMIT_PREPARED:
     231              :             {
     232              :                 xl_xact_commit *xlrec;
     233              :                 xl_xact_parsed_commit parsed;
     234              :                 TransactionId xid;
     235         3999 :                 bool        two_phase = false;
     236              : 
     237         3999 :                 xlrec = (xl_xact_commit *) XLogRecGetData(r);
     238         3999 :                 ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     239              : 
     240         3999 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     241         3871 :                     xid = XLogRecGetXid(r);
     242              :                 else
     243          128 :                     xid = parsed.twophase_xid;
     244              : 
     245              :                 /*
     246              :                  * We would like to process the transaction in a two-phase
     247              :                  * manner iff output plugin supports two-phase commits and
     248              :                  * doesn't filter the transaction at prepare time.
     249              :                  */
     250         3999 :                 if (info == XLOG_XACT_COMMIT_PREPARED)
     251          128 :                     two_phase = !(FilterPrepare(ctx, xid,
     252          128 :                                                 parsed.twophase_gid));
     253              : 
     254         3999 :                 DecodeCommit(ctx, buf, &parsed, xid, two_phase);
     255         3986 :                 break;
     256              :             }
     257          295 :         case XLOG_XACT_ABORT:
     258              :         case XLOG_XACT_ABORT_PREPARED:
     259              :             {
     260              :                 xl_xact_abort *xlrec;
     261              :                 xl_xact_parsed_abort parsed;
     262              :                 TransactionId xid;
     263          295 :                 bool        two_phase = false;
     264              : 
     265          295 :                 xlrec = (xl_xact_abort *) XLogRecGetData(r);
     266          295 :                 ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     267              : 
     268          295 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     269          244 :                     xid = XLogRecGetXid(r);
     270              :                 else
     271           51 :                     xid = parsed.twophase_xid;
     272              : 
     273              :                 /*
     274              :                  * We would like to process the transaction in a two-phase
     275              :                  * manner iff output plugin supports two-phase commits and
     276              :                  * doesn't filter the transaction at prepare time.
     277              :                  */
     278          295 :                 if (info == XLOG_XACT_ABORT_PREPARED)
     279           51 :                     two_phase = !(FilterPrepare(ctx, xid,
     280           51 :                                                 parsed.twophase_gid));
     281              : 
     282          295 :                 DecodeAbort(ctx, buf, &parsed, xid, two_phase);
     283          295 :                 break;
     284              :             }
     285          124 :         case XLOG_XACT_ASSIGNMENT:
     286              : 
     287              :             /*
     288              :              * We assign subxact to the toplevel xact while processing each
     289              :              * record if required.  So, we don't need to do anything here. See
     290              :              * LogicalDecodingProcessRecord.
     291              :              */
     292          124 :             break;
     293         6054 :         case XLOG_XACT_INVALIDATIONS:
     294              :             {
     295              :                 TransactionId xid;
     296              :                 xl_xact_invals *invals;
     297              : 
     298         6054 :                 xid = XLogRecGetXid(r);
     299         6054 :                 invals = (xl_xact_invals *) XLogRecGetData(r);
     300              : 
     301              :                 /*
     302              :                  * Execute the invalidations for xid-less transactions,
     303              :                  * otherwise, accumulate them so that they can be processed at
     304              :                  * the commit time.
     305              :                  */
     306         6054 :                 if (TransactionIdIsValid(xid))
     307              :                 {
     308         6020 :                     if (!ctx->fast_forward)
     309         5945 :                         ReorderBufferAddInvalidations(reorder, xid,
     310              :                                                       buf->origptr,
     311         5945 :                                                       invals->nmsgs,
     312         5945 :                                                       invals->msgs);
     313         6020 :                     ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
     314              :                                                       buf->origptr);
     315              :                 }
     316           34 :                 else if (!ctx->fast_forward)
     317           34 :                     ReorderBufferImmediateInvalidation(ctx->reorder,
     318           34 :                                                        invals->nmsgs,
     319           34 :                                                        invals->msgs);
     320              : 
     321         6054 :                 break;
     322              :             }
     323          203 :         case XLOG_XACT_PREPARE:
     324              :             {
     325              :                 xl_xact_parsed_prepare parsed;
     326              :                 xl_xact_prepare *xlrec;
     327              : 
     328              :                 /* ok, parse it */
     329          203 :                 xlrec = (xl_xact_prepare *) XLogRecGetData(r);
     330          203 :                 ParsePrepareRecord(XLogRecGetInfo(buf->record),
     331              :                                    xlrec, &parsed);
     332              : 
     333              :                 /*
     334              :                  * We would like to process the transaction in a two-phase
     335              :                  * manner iff output plugin supports two-phase commits and
     336              :                  * doesn't filter the transaction at prepare time.
     337              :                  */
     338          203 :                 if (FilterPrepare(ctx, parsed.twophase_xid,
     339              :                                   parsed.twophase_gid))
     340              :                 {
     341           20 :                     ReorderBufferProcessXid(reorder, parsed.twophase_xid,
     342              :                                             buf->origptr);
     343           20 :                     break;
     344              :                 }
     345              : 
     346              :                 /*
     347              :                  * Note that if the prepared transaction has locked [user]
     348              :                  * catalog tables exclusively then decoding prepare can block
     349              :                  * till the main transaction is committed because it needs to
     350              :                  * lock the catalog tables.
     351              :                  *
     352              :                  * XXX Now, this can even lead to a deadlock if the prepare
     353              :                  * transaction is waiting to get it logically replicated for
     354              :                  * distributed 2PC. This can be avoided by disallowing
     355              :                  * preparing transactions that have locked [user] catalog
     356              :                  * tables exclusively but as of now, we ask users not to do
     357              :                  * such an operation.
     358              :                  */
     359          183 :                 DecodePrepare(ctx, buf, &parsed);
     360          183 :                 break;
     361              :             }
     362            0 :         default:
     363            0 :             elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
     364              :     }
     365              : }
     366              : 
     367              : /*
     368              :  * Handle rmgr STANDBY_ID records for LogicalDecodingProcessRecord().
     369              :  */
     370              : void
     371         4411 : standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     372              : {
     373         4411 :     SnapBuild  *builder = ctx->snapshot_builder;
     374         4411 :     XLogReaderState *r = buf->record;
     375         4411 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     376              : 
     377         4411 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     378              : 
     379         4411 :     switch (info)
     380              :     {
     381         1742 :         case XLOG_RUNNING_XACTS:
     382              :             {
     383         1742 :                 xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
     384              : 
     385              :                 /*
     386              :                  * Update this decoder's idea of transactions currently
     387              :                  * running.  In doing so we will determine whether we have
     388              :                  * reached consistent status.
     389              :                  *
     390              :                  * If the output plugin doesn't need access to shared
     391              :                  * catalogs, we can ignore transactions in other databases.
     392              :                  */
     393         1742 :                 SnapBuildProcessRunningXacts(builder, buf->origptr, running,
     394         1742 :                                              !ctx->options.need_shared_catalogs);
     395              : 
     396              :                 /*
     397              :                  * Abort all transactions that we keep track of, that are
     398              :                  * older than the record's oldestRunningXid. This is the most
     399              :                  * convenient spot for doing so since, in contrast to shutdown
     400              :                  * or end-of-recovery checkpoints, we have information about
     401              :                  * all running transactions which includes prepared ones,
     402              :                  * while shutdown checkpoints just know that no non-prepared
     403              :                  * transactions are in progress.
     404              :                  *
     405              :                  * The database-specific records might work here too, but it's
     406              :                  * not their purpose.
     407              :                  */
     408         1740 :                 if (!OidIsValid(running->dbid))
     409         1737 :                     ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
     410              :             }
     411         1740 :             break;
     412         2635 :         case XLOG_STANDBY_LOCK:
     413         2635 :             break;
     414           34 :         case XLOG_INVALIDATIONS:
     415              : 
     416              :             /*
     417              :              * We are processing the invalidations at the command level via
     418              :              * XLOG_XACT_INVALIDATIONS.  So we don't need to do anything here.
     419              :              */
     420           34 :             break;
     421            0 :         default:
     422            0 :             elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
     423              :     }
     424         4409 : }
     425              : 
     426              : /*
     427              :  * Handle rmgr HEAP2_ID records for LogicalDecodingProcessRecord().
     428              :  */
     429              : void
     430        37040 : heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     431              : {
     432        37040 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     433        37040 :     TransactionId xid = XLogRecGetXid(buf->record);
     434        37040 :     SnapBuild  *builder = ctx->snapshot_builder;
     435              : 
     436        37040 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     437              : 
     438              :     /*
     439              :      * If we don't have snapshot or we are just fast-forwarding, there is no
     440              :      * point in decoding data changes. However, it's crucial to build the base
     441              :      * snapshot during fast-forward mode (as is done in
     442              :      * SnapBuildProcessChange()) because we require the snapshot's xmin when
     443              :      * determining the candidate catalog_xmin for the replication slot. See
     444              :      * SnapBuildProcessRunningXacts().
     445              :      */
     446        37040 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     447            8 :         return;
     448              : 
     449        37032 :     switch (info)
     450              :     {
     451         7020 :         case XLOG_HEAP2_MULTI_INSERT:
     452         7020 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     453         7020 :                 !ctx->fast_forward &&
     454         6917 :                 !change_useless_for_repack(buf))
     455         6885 :                 DecodeMultiInsert(ctx, buf);
     456         7020 :             break;
     457        28067 :         case XLOG_HEAP2_NEW_CID:
     458        28067 :             if (!ctx->fast_forward)
     459              :             {
     460              :                 xl_heap_new_cid *xlrec;
     461              : 
     462        27684 :                 xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
     463        27684 :                 SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
     464              :             }
     465        28067 :             break;
     466           90 :         case XLOG_HEAP2_REWRITE:
     467              : 
     468              :             /*
     469              :              * Although these records only exist to serve the needs of logical
     470              :              * decoding, all the work happens as part of crash or archive
     471              :              * recovery, so we don't need to do anything here.
     472              :              */
     473           90 :             break;
     474              : 
     475              :             /*
     476              :              * Everything else here is just low level physical stuff we're not
     477              :              * interested in.
     478              :              */
     479         1855 :         case XLOG_HEAP2_PRUNE_ON_ACCESS:
     480              :         case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
     481              :         case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
     482              :         case XLOG_HEAP2_LOCK_UPDATED:
     483         1855 :             break;
     484            0 :         default:
     485            0 :             elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
     486              :     }
     487              : }
     488              : 
     489              : /*
     490              :  * Handle rmgr HEAP_ID records for LogicalDecodingProcessRecord().
     491              :  */
     492              : void
     493      1742059 : heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     494              : {
     495      1742059 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     496      1742059 :     TransactionId xid = XLogRecGetXid(buf->record);
     497      1742059 :     SnapBuild  *builder = ctx->snapshot_builder;
     498              :     /* Hand the xid to the reorderbuffer before any early return below. */
     499      1742059 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     500              : 
     501              :     /*
     502              :      * If we don't have a snapshot or we are just fast-forwarding, there is no
     503              :      * point in decoding data changes. However, it's crucial to build the base
     504              :      * snapshot during fast-forward mode (as is done in
     505              :      * SnapBuildProcessChange()) because we require the snapshot's xmin when
     506              :      * determining the candidate catalog_xmin for the replication slot. See
     507              :      * SnapBuildProcessRunningXacts().
     508              :      */
     509      1742059 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     510          684 :         return;
     511              :     /* fast_forward is checked per case, after SnapBuildProcessChange(). */
     512      1741375 :     switch (info)
     513              :     {
     514      1127494 :         case XLOG_HEAP_INSERT:
     515      1127494 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     516      1127494 :                 !ctx->fast_forward &&
     517      1127287 :                 !change_useless_for_repack(buf))
     518      1127219 :                 DecodeInsert(ctx, buf);
     519      1127494 :             break;
     520              : 
     521              :             /*
     522              :              * Treat HOT update as normal updates. There is no useful
     523              :              * information in the fact that we could make it a HOT update
     524              :              * locally and the WAL layout is compatible.
     525              :              */
     526       181057 :         case XLOG_HEAP_HOT_UPDATE:
     527              :         case XLOG_HEAP_UPDATE:
     528       181057 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     529       181057 :                 !ctx->fast_forward &&
     530       181044 :                 !change_useless_for_repack(buf))
     531       181029 :                 DecodeUpdate(ctx, buf);
     532       181057 :             break;
     533              : 
     534       238069 :         case XLOG_HEAP_DELETE:
     535       238069 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     536       238069 :                 !ctx->fast_forward &&
     537       237975 :                 !change_useless_for_repack(buf))
     538       237968 :                 DecodeDelete(ctx, buf);
     539       238069 :             break;
     540              : 
     541           62 :         case XLOG_HEAP_TRUNCATE:
     542           62 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     543           62 :                 !ctx->fast_forward &&
     544           60 :                 !change_useless_for_repack(buf))
     545           60 :                 DecodeTruncate(ctx, buf);
     546           62 :             break;
     547              : 
     548         1235 :         case XLOG_HEAP_INPLACE:
     549              : 
     550              :             /*
     551              :              * Inplace updates are only ever performed on catalog tuples and
     552              :              * can, per definition, not change tuple visibility.  Since we
     553              :              * also don't decode catalog tuples, we're not interested in the
     554              :              * record's contents.
     555              :              */
     556         1235 :             break;
     557              : 
     558        17964 :         case XLOG_HEAP_CONFIRM:
     559        17964 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     560        17964 :                 !ctx->fast_forward &&
     561        17964 :                 !change_useless_for_repack(buf))
     562        17964 :                 DecodeSpecConfirm(ctx, buf);
     563        17964 :             break;
     564              : 
     565       175494 :         case XLOG_HEAP_LOCK:
     566              :             /* we don't care about row level locks for now */
     567       175494 :             break;
     568              : 
     569            0 :         default:
     570            0 :             elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
     571              :             break;
     572              :     }
     573              : }
     574              : 
     575              : /*
     576              :  * Ask the output plugin whether this PREPARE should be skipped (returns true)
     577              :  * so that the transaction is sent as a regular commit later.
     578              :  */
     579              : static inline bool
     580          382 : FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
     581              :               const char *gid)
     582              : {
     583              :     /*
     584              :      * Skip if decoding of two-phase transactions at PREPARE time is not
     585              :      * enabled. In that case, all two-phase transactions are considered
     586              :      * filtered out and will be applied as regular transactions at COMMIT
     587              :      * PREPARED.
     588              :      */
     589          382 :     if (!ctx->twophase)
     590           23 :         return true;
     591              : 
     592              :     /*
     593              :      * The filter_prepare callback is optional. When not supplied, all
     594              :      * prepared transactions should go through.
     595              :      */
     596          359 :     if (ctx->callbacks.filter_prepare_cb == NULL)
     597          211 :         return false;
     598              : 
     599          148 :     return filter_prepare_cb_wrapper(ctx, xid, gid);
     600              : }
     601              : /* Consult the optional filter_by_origin_cb; true means filter out. */
     602              : static inline bool
     603      1561617 : FilterByOrigin(LogicalDecodingContext *ctx, ReplOriginId origin_id)
     604              : {
     605      1561617 :     if (ctx->callbacks.filter_by_origin_cb == NULL)
     606           79 :         return false;
     607              :     /* A callback is registered: its answer decides. */
     608      1561538 :     return filter_by_origin_cb_wrapper(ctx, origin_id);
     609              : }
     610              : 
     611              : /*
     612              :  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
     613              :  */
     614              : void
     615           98 : logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     616              : {
     617           98 :     SnapBuild  *builder = ctx->snapshot_builder;
     618           98 :     XLogReaderState *r = buf->record;
     619           98 :     TransactionId xid = XLogRecGetXid(r);
     620           98 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     621           98 :     ReplOriginId origin_id = XLogRecGetOrigin(r);
     622           98 :     Snapshot    snapshot = NULL;
     623              :     xl_logical_message *message;
     624              : 
     625           98 :     if (info != XLOG_LOGICAL_MESSAGE)
     626            0 :         elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
     627              :     /* Hand the xid to the reorderbuffer before any early return below. */
     628           98 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     629              : 
     630              :     /* If we don't have a snapshot, there is no point in decoding messages */
     631           98 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     632            0 :         return;
     633              : 
     634           98 :     message = (xl_logical_message *) XLogRecGetData(r);
     635              :     /* Ignore messages from other databases or from filtered origins. */
     636          194 :     if (message->dbId != ctx->slot->data.database ||
     637           96 :         FilterByOrigin(ctx, origin_id))
     638            4 :         return;
     639              :     /* Transactional and non-transactional messages are gated differently. */
     640           94 :     if (message->transactional &&
     641           39 :         !SnapBuildProcessChange(builder, xid, buf->origptr))
     642            0 :         return;
     643          149 :     else if (!message->transactional &&
     644          110 :              (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
     645           55 :               SnapBuildXactNeedsSkip(builder, buf->origptr)))
     646           44 :         return;
     647              : 
     648              :     /*
     649              :      * We also skip decoding in fast_forward mode. This check must be last
     650              :      * because we don't want to set the processing_required flag unless we
     651              :      * have a decodable message.
     652              :      */
     653           50 :     if (ctx->fast_forward)
     654              :     {
     655              :         /*
     656              :          * We need to set processing_required flag to notify the message's
     657              :          * existence to the caller. Usually, the flag is set when either the
     658              :          * COMMIT or ABORT records are decoded, but this must be turned on
     659              :          * here because the non-transactional logical message is decoded
     660              :          * without waiting for these records.
     661              :          */
     662            3 :         if (!message->transactional)
     663            3 :             ctx->processing_required = true;
     664              : 
     665            3 :         return;
     666              :     }
     667              : 
     668              :     /*
     669              :      * If this is a non-transactional change, get the snapshot we're expected
     670              :      * to use. We only get here when the snapshot is consistent, and the
     671              :      * change is not meant to be skipped.
     672              :      *
     673              :      * For transactional changes we don't need a snapshot, we'll use the
     674              :      * regular snapshot maintained by ReorderBuffer. We just leave it NULL.
     675              :      */
     676           47 :     if (!message->transactional)
     677            8 :         snapshot = SnapBuildGetOrBuildSnapshot(builder);
     678              :     /* The payload is passed as the bytes after the prefix_size-long prefix. */
     679           47 :     ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
     680           47 :                               message->transactional,
     681           47 :                               message->message, /* first part of message is
     682              :                                                  * prefix */
     683              :                               message->message_size,
     684           47 :                               message->message + message->prefix_size);
     685              : }
     686              : 
     687              : /*
     688              :  * Consolidated commit record handling between the different forms of commit
     689              :  * records.
     690              :  *
     691              :  * 'two_phase' indicates that caller wants to process the transaction in two
     692              :  * phases, first process prepare if not already done and then process
     693              :  * commit_prepared.
     694              :  */
     695              : static void
     696         3999 : DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     697              :              xl_xact_parsed_commit *parsed, TransactionId xid,
     698              :              bool two_phase)
     699              : {
     700         3999 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     701         3999 :     TimestampTz commit_time = parsed->xact_time;
     702         3999 :     ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
     703              :     int         i;
     704              :     /* Prefer the origin's LSN and timestamp when the record carries them. */
     705         3999 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     706              :     {
     707          106 :         origin_lsn = parsed->origin_lsn;
     708          106 :         commit_time = parsed->origin_timestamp;
     709              :     }
     710              :     /* Snapbuild learns of the commit even if the txn is skipped below. */
     711         3999 :     SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
     712              :                        parsed->nsubxacts, parsed->subxacts,
     713              :                        parsed->xinfo);
     714              : 
     715              :     /* ----
     716              :      * Check whether we are interested in this specific transaction, and tell
     717              :      * the reorderbuffer to forget the content of the (sub-)transactions
     718              :      * if not.
     719              :      *
     720              :      * We can't just use ReorderBufferAbort() here, because we need to execute
     721              :      * the transaction's invalidations.  This currently won't be needed if
     722              :      * we're just skipping over the transaction because currently we only do
     723              :      * so during startup, to get to the first transaction the client needs. As
     724              :      * we have reset the catalog caches before starting to read WAL, and we
     725              :      * haven't yet touched any catalogs, there can't be anything to invalidate.
     726              :      * But if we're "forgetting" this commit because it happened in another
     727              :      * database, the invalidations might be important, because they could be
     728              :      * for shared catalogs and we might have loaded data into the relevant
     729              :      * syscaches.
     730              :      * ---
     731              :      */
     732         3999 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     733              :     {
     734         2972 :         for (i = 0; i < parsed->nsubxacts; i++)
     735              :         {
     736          967 :             ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
     737              :         }
     738         2005 :         ReorderBufferForget(ctx->reorder, xid, buf->origptr);
     739              : 
     740         2005 :         return;
     741              :     }
     742              : 
     743              :     /* tell the reorderbuffer about the surviving subtransactions */
     744         2263 :     for (i = 0; i < parsed->nsubxacts; i++)
     745              :     {
     746          269 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     747              :                                  buf->origptr, buf->endptr);
     748              :     }
     749              : 
     750              :     /*
     751              :      * Send the final commit record if the transaction data is already
     752              :      * decoded, otherwise, process the entire transaction.
     753              :      */
     754         1994 :     if (two_phase)
     755              :     {
     756           35 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     757           35 :                                     SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
     758              :                                     commit_time, origin_id, origin_lsn,
     759           35 :                                     parsed->twophase_gid, true);
     760              :     }
     761              :     else
     762              :     {
     763         1959 :         ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
     764              :                             commit_time, origin_id, origin_lsn);
     765              :     }
     766              : 
     767              :     /*
     768              :      * Update the decoding stats at transaction prepare/commit/abort.
     769              :      * Additionally we send the stats when we spill or stream the changes to
     770              :      * avoid losing them in case the decoding is interrupted. It is not clear
     771              :      * that sending more or less frequently than this would be better.
     772              :      */
     773         1981 :     UpdateDecodingStats(ctx);
     774              : }
     775              : 
     776              : /*
     777              :  * Decode PREPARE record. Similar logic as in DecodeCommit.
     778              :  *
     779              :  * Note that we don't skip prepare even if we have detected concurrent abort
     780              :  * because it is quite possible that we had already sent some changes before we
     781              :  * detect abort in which case we need to abort those changes in the subscriber.
     782              :  * To abort such changes, we do send the prepare and then the rollback prepared
     783              :  * which is what happened on the publisher-side as well. Now, we can invent a
     784              :  * new abort API wherein in such cases we send abort and skip sending prepared
     785              :  * and rollback prepared but then it is not that straightforward because we
     786              :  * might have streamed this transaction by that time in which case it is
     787              :  * handled when the rollback is encountered. It is not impossible to optimize
     788              :  * the concurrent abort case but it can introduce design complexity w.r.t
     789              :  * handling different cases so leaving it for now as it doesn't seem worth it.
     790              :  */
     791              : static void
     792          183 : DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     793              :               xl_xact_parsed_prepare *parsed)
     794              : {
     795          183 :     SnapBuild  *builder = ctx->snapshot_builder;
     796          183 :     XLogRecPtr  origin_lsn = parsed->origin_lsn;
     797          183 :     TimestampTz prepare_time = parsed->xact_time;
     798          183 :     ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
     799              :     int         i;
     800          183 :     TransactionId xid = parsed->twophase_xid;
     801              :     /* A nonzero origin timestamp replaces the local prepare time. */
     802          183 :     if (parsed->origin_timestamp != 0)
     803            8 :         prepare_time = parsed->origin_timestamp;
     804              : 
     805              :     /*
     806              :      * Remember the prepare info for a txn so that it can be used later in
     807              :      * commit prepared if required. See ReorderBufferFinishPrepared.
     808              :      */
     809          183 :     if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
     810              :                                           buf->endptr, prepare_time, origin_id,
     811              :                                           origin_lsn))
     812            0 :         return;
     813              : 
     814              :     /* We can't start streaming unless a consistent state is reached. */
     815          183 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
     816              :     {
     817            3 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     818            3 :         return;
     819              :     }
     820              : 
     821              :     /*
     822              :      * Check whether we need to process this transaction. See
     823              :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     824              :      * transaction.
     825              :      *
     826              :      * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
     827              :      * hasn't yet been committed, removing this txn before a commit might
     828              :      * result in the computation of an incorrect restart_lsn. See
     829              :      * SnapBuildProcessRunningXacts. But we need to process cache
     830              :      * invalidations if there are any for the reasons mentioned in
     831              :      * DecodeCommit.
     832              :      */
     833          180 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     834              :     {
     835          134 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     836          134 :         ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
     837          134 :         return;
     838              :     }
     839              : 
     840              :     /* Tell the reorderbuffer about the surviving subtransactions. */
     841           47 :     for (i = 0; i < parsed->nsubxacts; i++)
     842              :     {
     843            1 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     844              :                                  buf->origptr, buf->endptr);
     845              :     }
     846              : 
     847              :     /* replay actions of all transaction + subtransactions in order */
     848           46 :     ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
     849              : 
     850              :     /*
     851              :      * Update the decoding stats at transaction prepare/commit/abort.
     852              :      * Additionally we send the stats when we spill or stream the changes to
     853              :      * avoid losing them in case the decoding is interrupted. It is not clear
     854              :      * that sending more or less frequently than this would be better.
     855              :      */
     856           46 :     UpdateDecodingStats(ctx);
     857              : }
     858              : 
     859              : 
     860              : /*
     861              :  * Get the data from the various forms of abort records and pass it on to
     862              :  * snapbuild.c and reorderbuffer.c.
     863              :  *
     864              :  * 'two_phase' indicates that a prepared transaction is being finished.
     865              :  */
     866              : static void
     867          295 : DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     868              :             xl_xact_parsed_abort *parsed, TransactionId xid,
     869              :             bool two_phase)
     870              : {
     871              :     int         i;
     872          295 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     873          295 :     TimestampTz abort_time = parsed->xact_time;
     874          295 :     ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
     875              :     bool        skip_xact;
     876              :     /* Prefer the origin's LSN and timestamp when the record carries them. */
     877          295 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     878              :     {
     879            4 :         origin_lsn = parsed->origin_lsn;
     880            4 :         abort_time = parsed->origin_timestamp;
     881              :     }
     882              : 
     883              :     /*
     884              :      * Check whether we need to process this transaction. See
     885              :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     886              :      * transaction.
     887              :      */
     888          295 :     skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
     889              : 
     890              :     /*
     891              :      * Send the final rollback record for a prepared transaction unless we
     892              :      * need to skip it. Skipped or non-two-phase xacts are simply aborted.
     893              :      */
     894          295 :     if (two_phase && !skip_xact)
     895              :     {
     896           11 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     897              :                                     InvalidXLogRecPtr,
     898              :                                     abort_time, origin_id, origin_lsn,
     899           11 :                                     parsed->twophase_gid, false);
     900              :     }
     901              :     else
     902              :     {
     903          290 :         for (i = 0; i < parsed->nsubxacts; i++)
     904              :         {
     905            6 :             ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
     906            6 :                                buf->record->EndRecPtr, abort_time);
     907              :         }
     908              :         /* NOTE(review): EndRecPtr here vs. buf->endptr above; confirm equal. */
     909          284 :         ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr,
     910              :                            abort_time);
     911              :     }
     912              : 
     913              :     /* update the decoding stats */
     914          295 :     UpdateDecodingStats(ctx);
     915          295 : }
     916              : 
     917              : /*
     918              :  * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
     919              :  *
     920              :  * Inserts can contain the new tuple.
     921              :  */
     922              : static void
     923      1127219 : DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     924              : {
     925              :     Size        datalen;
     926              :     char       *tupledata;
     927              :     Size        tuplelen;
     928      1127219 :     XLogReaderState *r = buf->record;
     929              :     xl_heap_insert *xlrec;
     930              :     ReorderBufferChange *change;
     931              :     RelFileLocator target_locator;
     932              : 
     933      1127219 :     xlrec = (xl_heap_insert *) XLogRecGetData(r);
     934              : 
     935              :     /*
     936              :      * Ignore insert records without new tuples (this does happen when
     937              :      * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
     938              :      */
     939      1127219 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
     940         4818 :         return;
     941              : 
     942              :     /* only interested in our database */
     943      1122594 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     944      1122594 :     if (target_locator.dbOid != ctx->slot->data.database)
     945            0 :         return;
     946              : 
     947              :     /* output plugin doesn't look for this origin, no need to queue */
     948      1122594 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     949          193 :         return;
     950              :     /* Speculative inserts are queued under a distinct internal action. */
     951      1122401 :     change = ReorderBufferAllocChange(ctx->reorder);
     952      1122401 :     if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
     953      1104437 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
     954              :     else
     955        17964 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
     956      1122401 :     change->origin_id = XLogRecGetOrigin(r);
     957              : 
     958      1122401 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     959              :     /* The tuple buffer is sized as the block data minus SizeOfHeapHeader. */
     960      1122401 :     tupledata = XLogRecGetBlockData(r, 0, &datalen);
     961      1122401 :     tuplelen = datalen - SizeOfHeapHeader;
     962              : 
     963      1122401 :     change->data.tp.newtuple =
     964      1122401 :         ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
     965              : 
     966      1122401 :     DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
     967              : 
     968      1122401 :     change->data.tp.clear_toast_afterwards = true;
     969              : 
     970      1122401 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
     971              :                              change,
     972      1122401 :                              xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
     973              : }
     974              : 
     975              : /*
     976              :  * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
     977              :  * in the record, from WAL into proper tuplebufs.
     978              :  *
     979              :  * Updates can possibly contain a new tuple and the old primary key.
     980              :  */
     981              : static void
     982       181029 : DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     983              : {
     984       181029 :     XLogReaderState *r = buf->record;
     985              :     xl_heap_update *xlrec;
     986              :     ReorderBufferChange *change;
     987              :     char       *data;
     988              :     RelFileLocator target_locator;
     989              : 
     990       181029 :     xlrec = (xl_heap_update *) XLogRecGetData(r);
     991              : 
     992              :     /* only interested in our database */
     993       181029 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     994       181029 :     if (target_locator.dbOid != ctx->slot->data.database)
     995          316 :         return;
     996              : 
     997              :     /* output plugin doesn't look for this origin, no need to queue */
     998       180746 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     999           33 :         return;
    1000              : 
    1001       180713 :     change = ReorderBufferAllocChange(ctx->reorder);
    1002       180713 :     change->action = REORDER_BUFFER_CHANGE_UPDATE;
    1003       180713 :     change->origin_id = XLogRecGetOrigin(r);
    1004       180713 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1005              :     /* New tuple is in block 0's data; old tuple sits past SizeOfHeapUpdate. */
    1006       180713 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
    1007              :     {
    1008              :         Size        datalen;
    1009              :         Size        tuplelen;
    1010              : 
    1011       178921 :         data = XLogRecGetBlockData(r, 0, &datalen);
    1012              : 
    1013       178921 :         tuplelen = datalen - SizeOfHeapHeader;
    1014              : 
    1015       178921 :         change->data.tp.newtuple =
    1016       178921 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
    1017              : 
    1018       178921 :         DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
    1019              :     }
    1020              : 
    1021       180713 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
    1022              :     {
    1023              :         Size        datalen;
    1024              :         Size        tuplelen;
    1025              : 
    1026              :         /* caution, remaining data in record is not aligned */
    1027          352 :         data = XLogRecGetData(r) + SizeOfHeapUpdate;
    1028          352 :         datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
    1029          352 :         tuplelen = datalen - SizeOfHeapHeader;
    1030              : 
    1031          352 :         change->data.tp.oldtuple =
    1032          352 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
    1033              : 
    1034          352 :         DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
    1035              :     }
    1036              : 
    1037       180713 :     change->data.tp.clear_toast_afterwards = true;
    1038              : 
    1039       180713 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1040              :                              change, false);
    1041              : }
    1042              : 
    1043              : /*
    1044              :  * Parse XLOG_HEAP_DELETE from WAL into proper tuplebufs.
    1045              :  *
    1046              :  * Deletes can possibly contain the old primary key.
    1047              :  */
    1048              : static void
    1049       237968 : DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1050              : {
    1051       237968 :     XLogReaderState *r = buf->record;
    1052              :     xl_heap_delete *xlrec;
    1053              :     ReorderBufferChange *change;
    1054              :     RelFileLocator target_locator;
    1055              : 
    1056       237968 :     xlrec = (xl_heap_delete *) XLogRecGetData(r);
    1057              : 
    1058              :     /*
    1059              :      * Skip changes that were marked as ignorable at origin.
    1060              :      *
    1061              :      * (This is used for changes that affect relations not visible to other
    1062              :      * transactions, such as the transient table during concurrent repack.)
    1063              :      */
    1064       237968 :     if (xlrec->flags & XLH_DELETE_NO_LOGICAL)
    1065           51 :         return;
    1066              : 
    1067              :     /* only interested in our database */
    1068       237968 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1069       237968 :     if (target_locator.dbOid != ctx->slot->data.database)
    1070           33 :         return;
    1071              : 
    1072              :     /* output plugin doesn't look for this origin, no need to queue */
    1073       237935 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1074           18 :         return;
    1075              : 
    1076       237917 :     change = ReorderBufferAllocChange(ctx->reorder);
    1077              : 
    1078       237917 :     if (xlrec->flags & XLH_DELETE_IS_SUPER)
    1079            0 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
    1080              :     else
    1081       237917 :         change->action = REORDER_BUFFER_CHANGE_DELETE;
    1082              : 
    1083       237917 :     change->origin_id = XLogRecGetOrigin(r);
    1084              : 
    1085       237917 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1086              : 
    1087              :     /* old primary key stored */
    1088       237917 :     if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
    1089              :     {
    1090       175892 :         Size        datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
    1091       175892 :         Size        tuplelen = datalen - SizeOfHeapHeader;
    1092              : 
    1093              :         Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
    1094              : 
    1095       175892 :         change->data.tp.oldtuple =
    1096       175892 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
    1097              : 
    1098       175892 :         DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
    1099              :                         datalen, change->data.tp.oldtuple);
    1100              :     }
    1101              : 
    1102       237917 :     change->data.tp.clear_toast_afterwards = true;
    1103              : 
    1104       237917 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1105              :                              change, false);
    1106              : }
    1107              : 
    1108              : /*
    1109              :  * Parse XLOG_HEAP_TRUNCATE from wal
    1110              :  */
    1111              : static void
    1112           60 : DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1113              : {
    1114           60 :     XLogReaderState *r = buf->record;
    1115              :     xl_heap_truncate *xlrec;
    1116              :     ReorderBufferChange *change;
    1117              : 
    1118           60 :     xlrec = (xl_heap_truncate *) XLogRecGetData(r);
    1119              : 
    1120              :     /* only interested in our database */
    1121           60 :     if (xlrec->dbId != ctx->slot->data.database)
    1122            0 :         return;
    1123              : 
    1124              :     /* output plugin doesn't look for this origin, no need to queue */
    1125           60 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1126            1 :         return;
    1127              : 
    1128           59 :     change = ReorderBufferAllocChange(ctx->reorder);
    1129           59 :     change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
    1130           59 :     change->origin_id = XLogRecGetOrigin(r);
    1131           59 :     if (xlrec->flags & XLH_TRUNCATE_CASCADE)
    1132            1 :         change->data.truncate.cascade = true;
    1133           59 :     if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
    1134            2 :         change->data.truncate.restart_seqs = true;
    1135           59 :     change->data.truncate.nrelids = xlrec->nrelids;
    1136          118 :     change->data.truncate.relids = ReorderBufferAllocRelids(ctx->reorder,
    1137           59 :                                                             xlrec->nrelids);
    1138           59 :     memcpy(change->data.truncate.relids, xlrec->relids,
    1139           59 :            xlrec->nrelids * sizeof(Oid));
    1140           59 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1141              :                              buf->origptr, change, false);
    1142              : }
    1143              : 
    1144              : /*
    1145              :  * Decode XLOG_HEAP2_MULTI_INSERT record into multiple tuplebufs.
    1146              :  *
    1147              :  * Currently MULTI_INSERT will always contain the full tuples.
    1148              :  */
    1149              : static void
    1150         6885 : DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1151              : {
    1152         6885 :     XLogReaderState *r = buf->record;
    1153              :     xl_heap_multi_insert *xlrec;
    1154              :     int         i;
    1155              :     char       *data;
    1156              :     char       *tupledata;
    1157              :     Size        tuplelen;
    1158              :     RelFileLocator rlocator;
    1159              : 
    1160         6885 :     xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
    1161              : 
    1162              :     /*
    1163              :      * Ignore insert records without new tuples.  This happens when a
    1164              :      * multi_insert is done on a catalog or on a non-persistent relation.
    1165              :      */
    1166         6885 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
    1167         6871 :         return;
    1168              : 
    1169              :     /* only interested in our database */
    1170           65 :     XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
    1171           65 :     if (rlocator.dbOid != ctx->slot->data.database)
    1172           51 :         return;
    1173              : 
    1174              :     /* output plugin doesn't look for this origin, no need to queue */
    1175           14 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1176            0 :         return;
    1177              : 
    1178              :     /*
    1179              :      * We know that this multi_insert isn't for a catalog, so the block should
    1180              :      * always have data even if a full-page write of it is taken.
    1181              :      */
    1182           14 :     tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
    1183              :     Assert(tupledata != NULL);
    1184              : 
    1185           14 :     data = tupledata;
    1186         1053 :     for (i = 0; i < xlrec->ntuples; i++)
    1187              :     {
    1188              :         ReorderBufferChange *change;
    1189              :         xl_multi_insert_tuple *xlhdr;
    1190              :         int         datalen;
    1191              :         HeapTuple   tuple;
    1192              :         HeapTupleHeader header;
    1193              : 
    1194         1039 :         change = ReorderBufferAllocChange(ctx->reorder);
    1195         1039 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
    1196         1039 :         change->origin_id = XLogRecGetOrigin(r);
    1197              : 
    1198         1039 :         memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));
    1199              : 
    1200         1039 :         xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
    1201         1039 :         data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
    1202         1039 :         datalen = xlhdr->datalen;
    1203              : 
    1204         1039 :         change->data.tp.newtuple =
    1205         1039 :             ReorderBufferAllocTupleBuf(ctx->reorder, datalen);
    1206              : 
    1207         1039 :         tuple = change->data.tp.newtuple;
    1208         1039 :         header = tuple->t_data;
    1209              : 
    1210              :         /* not a disk based tuple */
    1211         1039 :         ItemPointerSetInvalid(&tuple->t_self);
    1212              : 
    1213              :         /*
    1214              :          * We can only figure this out after reassembling the transactions.
    1215              :          */
    1216         1039 :         tuple->t_tableOid = InvalidOid;
    1217              : 
    1218         1039 :         tuple->t_len = datalen + SizeofHeapTupleHeader;
    1219              : 
    1220         1039 :         memset(header, 0, SizeofHeapTupleHeader);
    1221              : 
    1222         1039 :         memcpy((char *) tuple->t_data + SizeofHeapTupleHeader, data, datalen);
    1223         1039 :         header->t_infomask = xlhdr->t_infomask;
    1224         1039 :         header->t_infomask2 = xlhdr->t_infomask2;
    1225         1039 :         header->t_hoff = xlhdr->t_hoff;
    1226              : 
    1227              :         /*
    1228              :          * Reset toast reassembly state only after the last row in the last
    1229              :          * xl_multi_insert_tuple record emitted by one heap_multi_insert()
    1230              :          * call.
    1231              :          */
    1232         1039 :         if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
    1233          179 :             (i + 1) == xlrec->ntuples)
    1234            9 :             change->data.tp.clear_toast_afterwards = true;
    1235              :         else
    1236         1030 :             change->data.tp.clear_toast_afterwards = false;
    1237              : 
    1238         1039 :         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1239              :                                  buf->origptr, change, false);
    1240              : 
    1241              :         /* move to the next xl_multi_insert_tuple entry */
    1242         1039 :         data += datalen;
    1243              :     }
    1244              :     Assert(data == tupledata + tuplelen);
    1245              : }
    1246              : 
    1247              : /*
    1248              :  * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change.
    1249              :  *
    1250              :  * This is pretty trivial, all the state essentially already setup by the
    1251              :  * speculative insertion.
    1252              :  */
    1253              : static void
    1254        17964 : DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1255              : {
    1256        17964 :     XLogReaderState *r = buf->record;
    1257              :     ReorderBufferChange *change;
    1258              :     RelFileLocator target_locator;
    1259              : 
    1260              :     /* only interested in our database */
    1261        17964 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1262        17964 :     if (target_locator.dbOid != ctx->slot->data.database)
    1263            0 :         return;
    1264              : 
    1265              :     /* output plugin doesn't look for this origin, no need to queue */
    1266        17964 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1267            0 :         return;
    1268              : 
    1269        17964 :     change = ReorderBufferAllocChange(ctx->reorder);
    1270        17964 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
    1271        17964 :     change->origin_id = XLogRecGetOrigin(r);
    1272              : 
    1273        17964 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1274              : 
    1275        17964 :     change->data.tp.clear_toast_afterwards = true;
    1276              : 
    1277        17964 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1278              :                              change, false);
    1279              : }
    1280              : 
    1281              : 
    1282              : /*
    1283              :  * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
    1284              :  * (but not by heap_multi_insert) into a tuplebuf.
    1285              :  *
    1286              :  * The size 'len' and the pointer 'data' in the record need to be
    1287              :  * computed outside as they are record specific.
    1288              :  */
    1289              : static void
    1290      1477566 : DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
    1291              : {
    1292              :     xl_heap_header xlhdr;
    1293      1477566 :     int         datalen = len - SizeOfHeapHeader;
    1294              :     HeapTupleHeader header;
    1295              : 
    1296              :     Assert(datalen >= 0);
    1297              : 
    1298      1477566 :     tuple->t_len = datalen + SizeofHeapTupleHeader;
    1299      1477566 :     header = tuple->t_data;
    1300              : 
    1301              :     /* not a disk based tuple */
    1302      1477566 :     ItemPointerSetInvalid(&tuple->t_self);
    1303              : 
    1304              :     /* we can only figure this out after reassembling the transactions */
    1305      1477566 :     tuple->t_tableOid = InvalidOid;
    1306              : 
    1307              :     /* data is not stored aligned, copy to aligned storage */
    1308      1477566 :     memcpy(&xlhdr, data, SizeOfHeapHeader);
    1309              : 
    1310      1477566 :     memset(header, 0, SizeofHeapTupleHeader);
    1311              : 
    1312      1477566 :     memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
    1313      1477566 :            data + SizeOfHeapHeader,
    1314              :            datalen);
    1315              : 
    1316      1477566 :     header->t_infomask = xlhdr.t_infomask;
    1317      1477566 :     header->t_infomask2 = xlhdr.t_infomask2;
    1318      1477566 :     header->t_hoff = xlhdr.t_hoff;
    1319      1477566 : }
    1320              : 
    1321              : /*
    1322              :  * Check whether we are interested in this specific transaction.
    1323              :  *
    1324              :  * There can be several reasons we might not be interested in this
    1325              :  * transaction:
    1326              :  * 1) We might not be interested in decoding transactions up to this
    1327              :  *    LSN. This can happen because we previously decoded it and now just
    1328              :  *    are restarting or if we haven't assembled a consistent snapshot yet.
    1329              :  * 2) The transaction happened in another database.
    1330              :  * 3) The output plugin is not interested in the origin.
    1331              :  * 4) We are doing fast-forwarding
    1332              :  */
    1333              : static bool
    1334         4474 : DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
    1335              :                   Oid txn_dbid, ReplOriginId origin_id)
    1336              : {
    1337         4474 :     if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
    1338         4358 :         (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
    1339         2208 :         FilterByOrigin(ctx, origin_id))
    1340         2304 :         return true;
    1341              : 
    1342              :     /*
    1343              :      * We also skip decoding in fast_forward mode. In passing set the
    1344              :      * processing_required flag to indicate that if it were not for
    1345              :      * fast_forward mode, processing would have been required.
    1346              :      */
    1347         2170 :     if (ctx->fast_forward)
    1348              :     {
    1349           39 :         ctx->processing_required = true;
    1350           39 :         return true;
    1351              :     }
    1352              : 
    1353         2131 :     return false;
    1354              : }
        

Generated by: LCOV version 2.0-1