LCOV - code coverage report
Current view: top level - src/backend/replication/logical - decode.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 92.9 % 448 416
Test Date: 2026-04-06 09:16:18 Functions: 95.2 % 21 20
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /* -------------------------------------------------------------------------
       2              :  *
       3              :  * decode.c
       4              :  *      This module decodes WAL records read using xlogreader.h's APIs for the
       5              :  *      purpose of logical decoding by passing information to the
       6              :  *      reorderbuffer module (containing the actual changes) and to the
       7              :  *      snapbuild module to build a fitting catalog snapshot (to be able to
       8              :  *      properly decode the changes in the reorderbuffer).
       9              :  *
      10              :  * NOTE:
      11              :  *      This basically tries to handle all low level xlog stuff for
      12              :  *      reorderbuffer.c and snapbuild.c. There's some minor leakage where a
      13              :  *      specific record's struct is used to pass data along, but those just
      14              :  *      happen to contain the right amount of data in a convenient
      15              :  *      format. There isn't and shouldn't be much intelligence about the
      16              :  *      contents of records in here except turning them into a more usable
      17              :  *      format.
      18              :  *
      19              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      20              :  * Portions Copyright (c) 1994, Regents of the University of California
      21              :  *
      22              :  * IDENTIFICATION
      23              :  *    src/backend/replication/logical/decode.c
      24              :  *
      25              :  * -------------------------------------------------------------------------
      26              :  */
      27              : #include "postgres.h"
      28              : 
      29              : #include "access/heapam_xlog.h"
      30              : #include "access/transam.h"
      31              : #include "access/xact.h"
      32              : #include "access/xlog_internal.h"
      33              : #include "access/xlogreader.h"
      34              : #include "access/xlogrecord.h"
      35              : #include "catalog/pg_control.h"
      36              : #include "replication/decode.h"
      37              : #include "replication/logical.h"
      38              : #include "replication/message.h"
      39              : #include "replication/reorderbuffer.h"
      40              : #include "replication/snapbuild.h"
      41              : #include "storage/standbydefs.h"
      42              : 
      43              : /* individual record(group)'s handlers */
      44              : static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      45              : static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      46              : static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      47              : static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      48              : static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      49              : static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      50              : 
      51              : static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      52              :                          xl_xact_parsed_commit *parsed, TransactionId xid,
      53              :                          bool two_phase);
      54              : static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      55              :                         xl_xact_parsed_abort *parsed, TransactionId xid,
      56              :                         bool two_phase);
      57              : static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      58              :                           xl_xact_parsed_prepare *parsed);
      59              : 
      60              : 
      61              : /* common function to decode tuples */
      62              : static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);
      63              : 
      64              : /* helper functions for decoding transactions */
      65              : static inline bool FilterPrepare(LogicalDecodingContext *ctx,
      66              :                                  TransactionId xid, const char *gid);
      67              : static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
      68              :                               XLogRecordBuffer *buf, Oid txn_dbid,
      69              :                               ReplOriginId origin_id);
      70              : 
      71              : /*
      72              :  * Take every XLogReadRecord()ed record and perform the actions required to
      73              :  * decode it using the output plugin already setup in the logical decoding
      74              :  * context.
      75              :  *
      76              :  * NB: Note that every record's xid needs to be processed by reorderbuffer
      77              :  * (xids contained in the content of records are not relevant for this rule).
      78              :  * That means that for records which'd otherwise not go through the
      79              :  * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
      80              :  * call ReorderBufferProcessXid for each record type by default, because
      81              :  * e.g. empty xacts can be handled more efficiently if there's no previous
      82              :  * state for them.
      83              :  *
      84              :  * We also support the ability to fast forward thru records, skipping some
      85              :  * record types completely - see individual record types for details.
      86              :  */
      87              : void
      88      2219310 : LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
      89              : {
      90              :     XLogRecordBuffer buf;
      91              :     TransactionId txid;
      92              :     RmgrData    rmgr;
      93              : 
      94      2219310 :     buf.origptr = ctx->reader->ReadRecPtr;
      95      2219310 :     buf.endptr = ctx->reader->EndRecPtr;
      96      2219310 :     buf.record = record;
      97              : 
      98      2219310 :     txid = XLogRecGetTopXid(record);
      99              : 
     100              :     /*
     101              :      * If the top-level xid is valid, we need to assign the subxact to the
     102              :      * top-level xact. We need to do this for all records, hence we do it
     103              :      * before the switch.
     104              :      */
     105      2219310 :     if (TransactionIdIsValid(txid))
     106              :     {
     107          667 :         ReorderBufferAssignChild(ctx->reorder,
     108              :                                  txid,
     109          667 :                                  XLogRecGetXid(record),
     110              :                                  buf.origptr);
     111              :     }
     112              : 
     113      2219310 :     rmgr = GetRmgr(XLogRecGetRmid(record));
     114              : 
     115      2219310 :     if (rmgr.rm_decode != NULL)
     116      1709018 :         rmgr.rm_decode(ctx, &buf);
     117              :     else
     118              :     {
     119              :         /* just deal with xid, and done */
     120       510292 :         ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
     121              :                                 buf.origptr);
     122              :     }
     123      2219298 : }
     124              : 
     125              : /*
     126              :  * Handle rmgr XLOG_ID records for LogicalDecodingProcessRecord().
     127              :  */
     128              : void
     129         6427 : xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     130              : {
     131         6427 :     SnapBuild  *builder = ctx->snapshot_builder;
     132         6427 :     uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
     133              : 
     134         6427 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
     135              :                             buf->origptr);
     136              : 
     137         6427 :     switch (info)
     138              :     {
     139              :             /* this is also used in END_OF_RECOVERY checkpoints */
     140           81 :         case XLOG_CHECKPOINT_SHUTDOWN:
     141              :         case XLOG_END_OF_RECOVERY:
     142           81 :             SnapBuildSerializationPoint(builder, buf->origptr);
     143              : 
     144           81 :             break;
     145           80 :         case XLOG_CHECKPOINT_ONLINE:
     146              : 
     147              :             /*
     148              :              * a RUNNING_XACTS record will have been logged near to this, we
     149              :              * can restart from there.
     150              :              */
     151           80 :             break;
     152            0 :         case XLOG_LOGICAL_DECODING_STATUS_CHANGE:
     153              :             {
     154              :                 bool        logical_decoding;
     155              : 
     156            0 :                 memcpy(&logical_decoding, XLogRecGetData(buf->record), sizeof(bool));
     157              : 
     158              :                 /*
     159              :                  * Error out as we should not decode this WAL record.
     160              :                  *
     161              :                  * Logical decoding is disabled, and existing logical slots on
     162              :                  * the standby are invalidated when this WAL record is
     163              :                  * replayed. No logical decoder can process this WAL record
     164              :                  * until replay completes, and by then the slots are already
     165              :                  * invalidated. Furthermore, no new logical slots can be
     166              :                  * created while logical decoding is disabled. This cannot
     167              :                  * occur even on primary either, since it will not restart
     168              :                  * with wal_level < replica if any logical slots exist.
     169              :                  */
     170            0 :                 elog(ERROR, "unexpected logical decoding status change %d",
     171              :                      logical_decoding);
     172              : 
     173              :                 break;
     174              :             }
     175         6266 :         case XLOG_NOOP:
     176              :         case XLOG_NEXTOID:
     177              :         case XLOG_SWITCH:
     178              :         case XLOG_BACKUP_END:
     179              :         case XLOG_PARAMETER_CHANGE:
     180              :         case XLOG_RESTORE_POINT:
     181              :         case XLOG_FPW_CHANGE:
     182              :         case XLOG_FPI_FOR_HINT:
     183              :         case XLOG_FPI:
     184              :         case XLOG_OVERWRITE_CONTRECORD:
     185              :         case XLOG_CHECKPOINT_REDO:
     186         6266 :             break;
     187            0 :         default:
     188            0 :             elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
     189              :     }
     190         6427 : }
     191              : 
     192              : void
     193            0 : xlog2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     194              : {
     195            0 :     uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
     196              : 
     197            0 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record), buf->origptr);
     198              : 
     199            0 :     switch (info)
     200              :     {
     201            0 :         case XLOG2_CHECKSUMS:
     202            0 :             break;
     203            0 :         default:
     204            0 :             elog(ERROR, "unexpected RM_XLOG2_ID record type: %u", info);
     205              :     }
     206            0 : }
     207              : 
     208              : /*
     209              :  * Handle rmgr XACT_ID records for LogicalDecodingProcessRecord().
     210              :  */
     211              : void
     212        10469 : xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     213              : {
     214        10469 :     SnapBuild  *builder = ctx->snapshot_builder;
     215        10469 :     ReorderBuffer *reorder = ctx->reorder;
     216        10469 :     XLogReaderState *r = buf->record;
     217        10469 :     uint8       info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
     218              : 
     219              :     /*
     220              :      * If the snapshot isn't yet fully built, we cannot decode anything, so
     221              :      * bail out.
     222              :      */
     223        10469 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     224           12 :         return;
     225              : 
     226        10457 :     switch (info)
     227              :     {
     228         3860 :         case XLOG_XACT_COMMIT:
     229              :         case XLOG_XACT_COMMIT_PREPARED:
     230              :             {
     231              :                 xl_xact_commit *xlrec;
     232              :                 xl_xact_parsed_commit parsed;
     233              :                 TransactionId xid;
     234         3860 :                 bool        two_phase = false;
     235              : 
     236         3860 :                 xlrec = (xl_xact_commit *) XLogRecGetData(r);
     237         3860 :                 ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     238              : 
     239         3860 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     240         3740 :                     xid = XLogRecGetXid(r);
     241              :                 else
     242          120 :                     xid = parsed.twophase_xid;
     243              : 
     244              :                 /*
     245              :                  * We would like to process the transaction in a two-phase
     246              :                  * manner iff output plugin supports two-phase commits and
     247              :                  * doesn't filter the transaction at prepare time.
     248              :                  */
     249         3860 :                 if (info == XLOG_XACT_COMMIT_PREPARED)
     250          120 :                     two_phase = !(FilterPrepare(ctx, xid,
     251          120 :                                                 parsed.twophase_gid));
     252              : 
     253         3860 :                 DecodeCommit(ctx, buf, &parsed, xid, two_phase);
     254         3850 :                 break;
     255              :             }
     256          206 :         case XLOG_XACT_ABORT:
     257              :         case XLOG_XACT_ABORT_PREPARED:
     258              :             {
     259              :                 xl_xact_abort *xlrec;
     260              :                 xl_xact_parsed_abort parsed;
     261              :                 TransactionId xid;
     262          206 :                 bool        two_phase = false;
     263              : 
     264          206 :                 xlrec = (xl_xact_abort *) XLogRecGetData(r);
     265          206 :                 ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     266              : 
     267          206 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     268          161 :                     xid = XLogRecGetXid(r);
     269              :                 else
     270           45 :                     xid = parsed.twophase_xid;
     271              : 
     272              :                 /*
     273              :                  * We would like to process the transaction in a two-phase
     274              :                  * manner iff output plugin supports two-phase commits and
     275              :                  * doesn't filter the transaction at prepare time.
     276              :                  */
     277          206 :                 if (info == XLOG_XACT_ABORT_PREPARED)
     278           45 :                     two_phase = !(FilterPrepare(ctx, xid,
     279           45 :                                                 parsed.twophase_gid));
     280              : 
     281          206 :                 DecodeAbort(ctx, buf, &parsed, xid, two_phase);
     282          206 :                 break;
     283              :             }
     284          123 :         case XLOG_XACT_ASSIGNMENT:
     285              : 
     286              :             /*
     287              :              * We assign subxact to the toplevel xact while processing each
     288              :              * record if required.  So, we don't need to do anything here. See
     289              :              * LogicalDecodingProcessRecord.
     290              :              */
     291          123 :             break;
     292         6081 :         case XLOG_XACT_INVALIDATIONS:
     293              :             {
     294              :                 TransactionId xid;
     295              :                 xl_xact_invals *invals;
     296              : 
     297         6081 :                 xid = XLogRecGetXid(r);
     298         6081 :                 invals = (xl_xact_invals *) XLogRecGetData(r);
     299              : 
     300              :                 /*
     301              :                  * Execute the invalidations for xid-less transactions,
     302              :                  * otherwise, accumulate them so that they can be processed at
     303              :                  * the commit time.
     304              :                  */
     305         6081 :                 if (TransactionIdIsValid(xid))
     306              :                 {
     307         6057 :                     if (!ctx->fast_forward)
     308         5982 :                         ReorderBufferAddInvalidations(reorder, xid,
     309              :                                                       buf->origptr,
     310         5982 :                                                       invals->nmsgs,
     311         5982 :                                                       invals->msgs);
     312         6057 :                     ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
     313              :                                                       buf->origptr);
     314              :                 }
     315           24 :                 else if (!ctx->fast_forward)
     316           24 :                     ReorderBufferImmediateInvalidation(ctx->reorder,
     317           24 :                                                        invals->nmsgs,
     318           24 :                                                        invals->msgs);
     319              : 
     320         6081 :                 break;
     321              :             }
     322          187 :         case XLOG_XACT_PREPARE:
     323              :             {
     324              :                 xl_xact_parsed_prepare parsed;
     325              :                 xl_xact_prepare *xlrec;
     326              : 
     327              :                 /* ok, parse it */
     328          187 :                 xlrec = (xl_xact_prepare *) XLogRecGetData(r);
     329          187 :                 ParsePrepareRecord(XLogRecGetInfo(buf->record),
     330              :                                    xlrec, &parsed);
     331              : 
     332              :                 /*
     333              :                  * We would like to process the transaction in a two-phase
     334              :                  * manner iff output plugin supports two-phase commits and
     335              :                  * doesn't filter the transaction at prepare time.
     336              :                  */
     337          187 :                 if (FilterPrepare(ctx, parsed.twophase_xid,
     338              :                                   parsed.twophase_gid))
     339              :                 {
     340           20 :                     ReorderBufferProcessXid(reorder, parsed.twophase_xid,
     341              :                                             buf->origptr);
     342           20 :                     break;
     343              :                 }
     344              : 
     345              :                 /*
     346              :                  * Note that if the prepared transaction has locked [user]
     347              :                  * catalog tables exclusively then decoding prepare can block
     348              :                  * till the main transaction is committed because it needs to
     349              :                  * lock the catalog tables.
     350              :                  *
     351              :                  * XXX Now, this can even lead to a deadlock if the prepare
     352              :                  * transaction is waiting to get it logically replicated for
     353              :                  * distributed 2PC. This can be avoided by disallowing
     354              :                  * preparing transactions that have locked [user] catalog
     355              :                  * tables exclusively but as of now, we ask users not to do
     356              :                  * such an operation.
     357              :                  */
     358          167 :                 DecodePrepare(ctx, buf, &parsed);
     359          167 :                 break;
     360              :             }
     361            0 :         default:
     362            0 :             elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
     363              :     }
     364              : }
     365              : 
     366              : /*
     367              :  * Handle rmgr STANDBY_ID records for LogicalDecodingProcessRecord().
     368              :  */
     369              : void
     370         4433 : standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     371              : {
     372         4433 :     SnapBuild  *builder = ctx->snapshot_builder;
     373         4433 :     XLogReaderState *r = buf->record;
     374         4433 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     375              : 
     376         4433 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     377              : 
     378         4433 :     switch (info)
     379              :     {
     380         1674 :         case XLOG_RUNNING_XACTS:
     381              :             {
     382         1674 :                 xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
     383              : 
     384         1674 :                 SnapBuildProcessRunningXacts(builder, buf->origptr, running);
     385              : 
     386              :                 /*
     387              :                  * Abort all transactions that we keep track of, that are
     388              :                  * older than the record's oldestRunningXid. This is the most
     389              :                  * convenient spot for doing so since, in contrast to shutdown
     390              :                  * or end-of-recovery checkpoints, we have information about
     391              :                  * all running transactions which includes prepared ones,
     392              :                  * while shutdown checkpoints just know that no non-prepared
     393              :                  * transactions are in progress.
     394              :                  */
     395         1672 :                 ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
     396              :             }
     397         1672 :             break;
     398         2735 :         case XLOG_STANDBY_LOCK:
     399         2735 :             break;
     400           24 :         case XLOG_INVALIDATIONS:
     401              : 
     402              :             /*
     403              :              * We are processing the invalidations at the command level via
     404              :              * XLOG_XACT_INVALIDATIONS.  So we don't need to do anything here.
     405              :              */
     406           24 :             break;
     407            0 :         default:
     408            0 :             elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
     409              :     }
     410         4431 : }
     411              : 
     412              : /*
     413              :  * Handle rmgr HEAP2_ID records for LogicalDecodingProcessRecord().
     414              :  */
     415              : void
     416        36686 : heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     417              : {
     418        36686 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     419        36686 :     TransactionId xid = XLogRecGetXid(buf->record);
     420        36686 :     SnapBuild  *builder = ctx->snapshot_builder;
     421              : 
     422        36686 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     423              : 
     424              :     /*
     425              :      * If we don't have snapshot or we are just fast-forwarding, there is no
     426              :      * point in decoding data changes. However, it's crucial to build the base
     427              :      * snapshot during fast-forward mode (as is done in
     428              :      * SnapBuildProcessChange()) because we require the snapshot's xmin when
     429              :      * determining the candidate catalog_xmin for the replication slot. See
     430              :      * SnapBuildProcessRunningXacts().
     431              :      */
     432        36686 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     433            8 :         return;
     434              : 
     435        36678 :     switch (info)
     436              :     {
     437         6972 :         case XLOG_HEAP2_MULTI_INSERT:
     438         6972 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     439         6972 :                 !ctx->fast_forward)
     440         6868 :                 DecodeMultiInsert(ctx, buf);
     441         6972 :             break;
     442        28178 :         case XLOG_HEAP2_NEW_CID:
     443        28178 :             if (!ctx->fast_forward)
     444              :             {
     445              :                 xl_heap_new_cid *xlrec;
     446              : 
     447        27795 :                 xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
     448        27795 :                 SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
     449              :             }
     450        28178 :             break;
     451           90 :         case XLOG_HEAP2_REWRITE:
     452              : 
     453              :             /*
     454              :              * Although these records only exist to serve the needs of logical
     455              :              * decoding, all the work happens as part of crash or archive
     456              :              * recovery, so we don't need to do anything here.
     457              :              */
     458           90 :             break;
     459              : 
     460              :             /*
     461              :              * Everything else here is just low level physical stuff we're not
     462              :              * interested in.
     463              :              */
     464         1438 :         case XLOG_HEAP2_PRUNE_ON_ACCESS:
     465              :         case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
     466              :         case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
     467              :         case XLOG_HEAP2_LOCK_UPDATED:
     468         1438 :             break;
     469            0 :         default:
     470            0 :             elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
     471              :     }
     472              : }
     473              : 
     474              : /*
     475              :  * Handle rmgr HEAP_ID records for LogicalDecodingProcessRecord().
     476              :  */
     477              : void
     478      1650905 : heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     479              : {
     480      1650905 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     481      1650905 :     TransactionId xid = XLogRecGetXid(buf->record);
     482      1650905 :     SnapBuild  *builder = ctx->snapshot_builder;
     483              : 
     484      1650905 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     485              : 
     486              :     /*
     487              :      * If we don't have snapshot or we are just fast-forwarding, there is no
     488              :      * point in decoding data changes. However, it's crucial to build the base
     489              :      * snapshot during fast-forward mode (as is done in
     490              :      * SnapBuildProcessChange()) because we require the snapshot's xmin when
     491              :      * determining the candidate catalog_xmin for the replication slot. See
     492              :      * SnapBuildProcessRunningXacts().
     493              :      */
     494      1650905 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     495            3 :         return;
     496              : 
     497      1650902 :     switch (info)
     498              :     {
     499      1108751 :         case XLOG_HEAP_INSERT:
     500      1108751 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     501      1108751 :                 !ctx->fast_forward)
     502      1108553 :                 DecodeInsert(ctx, buf);
     503      1108751 :             break;
     504              : 
     505              :             /*
     506              :              * Treat HOT update as normal updates. There is no useful
     507              :              * information in the fact that we could make it a HOT update
     508              :              * locally and the WAL layout is compatible.
     509              :              */
     510       154448 :         case XLOG_HEAP_HOT_UPDATE:
     511              :         case XLOG_HEAP_UPDATE:
     512       154448 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     513       154448 :                 !ctx->fast_forward)
     514       154435 :                 DecodeUpdate(ctx, buf);
     515       154448 :             break;
     516              : 
     517       208083 :         case XLOG_HEAP_DELETE:
     518       208083 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     519       208083 :                 !ctx->fast_forward)
     520       207989 :                 DecodeDelete(ctx, buf);
     521       208083 :             break;
     522              : 
     523           75 :         case XLOG_HEAP_TRUNCATE:
     524           75 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     525           75 :                 !ctx->fast_forward)
     526           73 :                 DecodeTruncate(ctx, buf);
     527           75 :             break;
     528              : 
     529         1217 :         case XLOG_HEAP_INPLACE:
     530              : 
     531              :             /*
     532              :              * Inplace updates are only ever performed on catalog tuples and
     533              :              * can, per definition, not change tuple visibility.  Since we
     534              :              * also don't decode catalog tuples, we're not interested in the
     535              :              * record's contents.
     536              :              */
     537         1217 :             break;
     538              : 
     539        17964 :         case XLOG_HEAP_CONFIRM:
     540        17964 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     541        17964 :                 !ctx->fast_forward)
     542        17964 :                 DecodeSpecConfirm(ctx, buf);
     543        17964 :             break;
     544              : 
     545       160364 :         case XLOG_HEAP_LOCK:
     546              :             /* we don't care about row level locks for now */
     547       160364 :             break;
     548              : 
     549            0 :         default:
     550            0 :             elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
     551              :             break;
     552              :     }
     553              : }
     554              : 
     555              : /*
     556              :  * Ask output plugin whether we want to skip this PREPARE and send
     557              :  * this transaction as a regular commit later.
     558              :  */
     559              : static inline bool
     560          352 : FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
     561              :               const char *gid)
     562              : {
     563              :     /*
     564              :      * Skip if decoding of two-phase transactions at PREPARE time is not
     565              :      * enabled. In that case, all two-phase transactions are considered
     566              :      * filtered out and will be applied as regular transactions at COMMIT
     567              :      * PREPARED.
     568              :      */
     569          352 :     if (!ctx->twophase)
     570           23 :         return true;
     571              : 
     572              :     /*
     573              :      * The filter_prepare callback is optional. When not supplied, all
     574              :      * prepared transactions should go through.
     575              :      */
     576          329 :     if (ctx->callbacks.filter_prepare_cb == NULL)
     577          181 :         return false;
     578              : 
     579          148 :     return filter_prepare_cb_wrapper(ctx, xid, gid);
     580              : }
     581              : 
     582              : static inline bool
     583      1486288 : FilterByOrigin(LogicalDecodingContext *ctx, ReplOriginId origin_id)
     584              : {
     585      1486288 :     if (ctx->callbacks.filter_by_origin_cb == NULL)
     586           42 :         return false;
     587              : 
     588      1486246 :     return filter_by_origin_cb_wrapper(ctx, origin_id);
     589              : }
     590              : 
     591              : /*
     592              :  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
     593              :  */
     594              : void
     595           98 : logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     596              : {
     597           98 :     SnapBuild  *builder = ctx->snapshot_builder;
     598           98 :     XLogReaderState *r = buf->record;
     599           98 :     TransactionId xid = XLogRecGetXid(r);
     600           98 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     601           98 :     ReplOriginId origin_id = XLogRecGetOrigin(r);
     602           98 :     Snapshot    snapshot = NULL;
     603              :     xl_logical_message *message;
     604              : 
     605           98 :     if (info != XLOG_LOGICAL_MESSAGE)
     606            0 :         elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
     607              : 
     608           98 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     609              : 
     610              :     /* If we don't have snapshot, there is no point in decoding messages */
     611           98 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     612            0 :         return;
     613              : 
     614           98 :     message = (xl_logical_message *) XLogRecGetData(r);
     615              : 
     616          194 :     if (message->dbId != ctx->slot->data.database ||
     617           96 :         FilterByOrigin(ctx, origin_id))
     618            4 :         return;
     619              : 
     620           94 :     if (message->transactional &&
     621           39 :         !SnapBuildProcessChange(builder, xid, buf->origptr))
     622            0 :         return;
     623          149 :     else if (!message->transactional &&
     624          110 :              (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
     625           55 :               SnapBuildXactNeedsSkip(builder, buf->origptr)))
     626           44 :         return;
     627              : 
     628              :     /*
     629              :      * We also skip decoding in fast_forward mode. This check must be last
     630              :      * because we don't want to set the processing_required flag unless we
     631              :      * have a decodable message.
     632              :      */
     633           50 :     if (ctx->fast_forward)
     634              :     {
     635              :         /*
     636              :          * We need to set processing_required flag to notify the message's
     637              :          * existence to the caller. Usually, the flag is set when either the
     638              :          * COMMIT or ABORT records are decoded, but this must be turned on
     639              :          * here because the non-transactional logical message is decoded
     640              :          * without waiting for these records.
     641              :          */
     642            3 :         if (!message->transactional)
     643            3 :             ctx->processing_required = true;
     644              : 
     645            3 :         return;
     646              :     }
     647              : 
     648              :     /*
     649              :      * If this is a non-transactional change, get the snapshot we're expected
     650              :      * to use. We only get here when the snapshot is consistent, and the
     651              :      * change is not meant to be skipped.
     652              :      *
     653              :      * For transactional changes we don't need a snapshot, we'll use the
     654              :      * regular snapshot maintained by ReorderBuffer. We just leave it NULL.
     655              :      */
     656           47 :     if (!message->transactional)
     657            8 :         snapshot = SnapBuildGetOrBuildSnapshot(builder);
     658              : 
     659           47 :     ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
     660           47 :                               message->transactional,
     661           47 :                               message->message, /* first part of message is
     662              :                                                  * prefix */
     663              :                               message->message_size,
     664           47 :                               message->message + message->prefix_size);
     665              : }
     666              : 
     667              : /*
     668              :  * Consolidated commit record handling between the different form of commit
     669              :  * records.
     670              :  *
     671              :  * 'two_phase' indicates that caller wants to process the transaction in two
     672              :  * phases, first process prepare if not already done and then process
     673              :  * commit_prepared.
     674              :  */
     675              : static void
     676         3860 : DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     677              :              xl_xact_parsed_commit *parsed, TransactionId xid,
     678              :              bool two_phase)
     679              : {
     680         3860 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     681         3860 :     TimestampTz commit_time = parsed->xact_time;
     682         3860 :     ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
     683              :     int         i;
     684              : 
     685         3860 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     686              :     {
     687          104 :         origin_lsn = parsed->origin_lsn;
     688          104 :         commit_time = parsed->origin_timestamp;
     689              :     }
     690              : 
     691         3860 :     SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
     692              :                        parsed->nsubxacts, parsed->subxacts,
     693              :                        parsed->xinfo);
     694              : 
     695              :     /* ----
     696              :      * Check whether we are interested in this specific transaction, and tell
     697              :      * the reorderbuffer to forget the content of the (sub-)transactions
     698              :      * if not.
     699              :      *
     700              :      * We can't just use ReorderBufferAbort() here, because we need to execute
     701              :      * the transaction's invalidations.  This currently won't be needed if
     702              :      * we're just skipping over the transaction because currently we only do
     703              :      * so during startup, to get to the first transaction the client needs. As
     704              :      * we have reset the catalog caches before starting to read WAL, and we
     705              :      * haven't yet touched any catalogs, there can't be anything to invalidate.
     706              :      * But if we're "forgetting" this commit because it happened in another
     707              :      * database, the invalidations might be important, because they could be
     708              :      * for shared catalogs and we might have loaded data into the relevant
     709              :      * syscaches.
     710              :      * ---
     711              :      */
     712         3860 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     713              :     {
     714         3027 :         for (i = 0; i < parsed->nsubxacts; i++)
     715              :         {
     716          967 :             ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
     717              :         }
     718         2060 :         ReorderBufferForget(ctx->reorder, xid, buf->origptr);
     719              : 
     720         2060 :         return;
     721              :     }
     722              : 
     723              :     /* tell the reorderbuffer about the surviving subtransactions */
     724         2066 :     for (i = 0; i < parsed->nsubxacts; i++)
     725              :     {
     726          266 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     727              :                                  buf->origptr, buf->endptr);
     728              :     }
     729              : 
     730              :     /*
     731              :      * Send the final commit record if the transaction data is already
     732              :      * decoded, otherwise, process the entire transaction.
     733              :      */
     734         1800 :     if (two_phase)
     735              :     {
     736           35 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     737           35 :                                     SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
     738              :                                     commit_time, origin_id, origin_lsn,
     739           35 :                                     parsed->twophase_gid, true);
     740              :     }
     741              :     else
     742              :     {
     743         1765 :         ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
     744              :                             commit_time, origin_id, origin_lsn);
     745              :     }
     746              : 
     747              :     /*
     748              :      * Update the decoding stats at transaction prepare/commit/abort.
     749              :      * Additionally we send the stats when we spill or stream the changes to
     750              :      * avoid losing them in case the decoding is interrupted. It is not clear
     751              :      * that sending more or less frequently than this would be better.
     752              :      */
     753         1790 :     UpdateDecodingStats(ctx);
     754              : }
     755              : 
     756              : /*
     757              :  * Decode PREPARE record. Similar logic as in DecodeCommit.
     758              :  *
     759              :  * Note that we don't skip prepare even if have detected concurrent abort
     760              :  * because it is quite possible that we had already sent some changes before we
     761              :  * detect abort in which case we need to abort those changes in the subscriber.
     762              :  * To abort such changes, we do send the prepare and then the rollback prepared
     763              :  * which is what happened on the publisher-side as well. Now, we can invent a
     764              :  * new abort API wherein in such cases we send abort and skip sending prepared
     765              :  * and rollback prepared but then it is not that straightforward because we
     766              :  * might have streamed this transaction by that time in which case it is
     767              :  * handled when the rollback is encountered. It is not impossible to optimize
     768              :  * the concurrent abort case but it can introduce design complexity w.r.t
     769              :  * handling different cases so leaving it for now as it doesn't seem worth it.
     770              :  */
     771              : static void
     772          167 : DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     773              :               xl_xact_parsed_prepare *parsed)
     774              : {
     775          167 :     SnapBuild  *builder = ctx->snapshot_builder;
     776          167 :     XLogRecPtr  origin_lsn = parsed->origin_lsn;
     777          167 :     TimestampTz prepare_time = parsed->xact_time;
     778          167 :     ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
     779              :     int         i;
     780          167 :     TransactionId xid = parsed->twophase_xid;
     781              : 
     782          167 :     if (parsed->origin_timestamp != 0)
     783            8 :         prepare_time = parsed->origin_timestamp;
     784              : 
     785              :     /*
     786              :      * Remember the prepare info for a txn so that it can be used later in
     787              :      * commit prepared if required. See ReorderBufferFinishPrepared.
     788              :      */
     789          167 :     if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
     790              :                                           buf->endptr, prepare_time, origin_id,
     791              :                                           origin_lsn))
     792            0 :         return;
     793              : 
     794              :     /* We can't start streaming unless a consistent state is reached. */
     795          167 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
     796              :     {
     797            3 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     798            3 :         return;
     799              :     }
     800              : 
     801              :     /*
     802              :      * Check whether we need to process this transaction. See
     803              :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     804              :      * transaction.
     805              :      *
     806              :      * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
     807              :      * hasn't yet been committed, removing this txn before a commit might
     808              :      * result in the computation of an incorrect restart_lsn. See
     809              :      * SnapBuildProcessRunningXacts. But we need to process cache
     810              :      * invalidations if there are any for the reasons mentioned in
     811              :      * DecodeCommit.
     812              :      */
     813          164 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     814              :     {
     815          120 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     816          120 :         ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
     817          120 :         return;
     818              :     }
     819              : 
     820              :     /* Tell the reorderbuffer about the surviving subtransactions. */
     821           45 :     for (i = 0; i < parsed->nsubxacts; i++)
     822              :     {
     823            1 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     824              :                                  buf->origptr, buf->endptr);
     825              :     }
     826              : 
     827              :     /* replay actions of all transaction + subtransactions in order */
     828           44 :     ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
     829              : 
     830              :     /*
     831              :      * Update the decoding stats at transaction prepare/commit/abort.
     832              :      * Additionally we send the stats when we spill or stream the changes to
     833              :      * avoid losing them in case the decoding is interrupted. It is not clear
     834              :      * that sending more or less frequently than this would be better.
     835              :      */
     836           44 :     UpdateDecodingStats(ctx);
     837              : }
     838              : 
     839              : 
     840              : /*
     841              :  * Get the data from the various forms of abort records and pass it on to
     842              :  * snapbuild.c and reorderbuffer.c.
     843              :  *
     844              :  * 'two_phase' indicates to finish prepared transaction.
     845              :  */
     846              : static void
     847          206 : DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     848              :             xl_xact_parsed_abort *parsed, TransactionId xid,
     849              :             bool two_phase)
     850              : {
     851              :     int         i;
     852          206 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     853          206 :     TimestampTz abort_time = parsed->xact_time;
     854          206 :     ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
     855              :     bool        skip_xact;
     856              : 
     857          206 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     858              :     {
     859            4 :         origin_lsn = parsed->origin_lsn;
     860            4 :         abort_time = parsed->origin_timestamp;
     861              :     }
     862              : 
     863              :     /*
     864              :      * Check whether we need to process this transaction. See
     865              :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     866              :      * transaction.
     867              :      */
     868          206 :     skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
     869              : 
     870              :     /*
     871              :      * Send the final rollback record for a prepared transaction unless we
     872              :      * need to skip it. For non-two-phase xacts, simply forget the xact.
     873              :      */
     874          206 :     if (two_phase && !skip_xact)
     875              :     {
     876           10 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     877              :                                     InvalidXLogRecPtr,
     878              :                                     abort_time, origin_id, origin_lsn,
     879           10 :                                     parsed->twophase_gid, false);
     880              :     }
     881              :     else
     882              :     {
     883          202 :         for (i = 0; i < parsed->nsubxacts; i++)
     884              :         {
     885            6 :             ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
     886            6 :                                buf->record->EndRecPtr, abort_time);
     887              :         }
     888              : 
     889          196 :         ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr,
     890              :                            abort_time);
     891              :     }
     892              : 
     893              :     /* update the decoding stats */
     894          206 :     UpdateDecodingStats(ctx);
     895          206 : }
     896              : 
     897              : /*
     898              :  * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
     899              :  *
     900              :  * Inserts can contain the new tuple.
     901              :  */
     902              : static void
     903      1108553 : DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     904              : {
     905              :     Size        datalen;
     906              :     char       *tupledata;
     907              :     Size        tuplelen;
     908      1108553 :     XLogReaderState *r = buf->record;
     909              :     xl_heap_insert *xlrec;
     910              :     ReorderBufferChange *change;
     911              :     RelFileLocator target_locator;
     912              : 
     913      1108553 :     xlrec = (xl_heap_insert *) XLogRecGetData(r);
     914              : 
     915              :     /*
     916              :      * Ignore insert records without new tuples (this does happen when
     917              :      * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
     918              :      */
     919      1108553 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
     920         4712 :         return;
     921              : 
     922              :     /* only interested in our database */
     923      1103956 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     924      1103956 :     if (target_locator.dbOid != ctx->slot->data.database)
     925            0 :         return;
     926              : 
     927              :     /* output plugin doesn't look for this origin, no need to queue */
     928      1103956 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     929          115 :         return;
     930              : 
     931      1103841 :     change = ReorderBufferAllocChange(ctx->reorder);
     932      1103841 :     if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
     933      1085877 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
     934              :     else
     935        17964 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
     936      1103841 :     change->origin_id = XLogRecGetOrigin(r);
     937              : 
     938      1103841 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     939              : 
     940      1103841 :     tupledata = XLogRecGetBlockData(r, 0, &datalen);
     941      1103841 :     tuplelen = datalen - SizeOfHeapHeader;
     942              : 
     943      1103841 :     change->data.tp.newtuple =
     944      1103841 :         ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
     945              : 
     946      1103841 :     DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
     947              : 
     948      1103841 :     change->data.tp.clear_toast_afterwards = true;
     949              : 
     950      1103841 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
     951              :                              change,
     952      1103841 :                              xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
     953              : }
     954              : 
     955              : /*
     956              :  * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
     957              :  * in the record, from wal into proper tuplebufs.
     958              :  *
     959              :  * Updates can possibly contain a new tuple and the old primary key.
     960              :  */
     961              : static void
     962       154435 : DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     963              : {
     964       154435 :     XLogReaderState *r = buf->record;
     965              :     xl_heap_update *xlrec;
     966              :     ReorderBufferChange *change;
     967              :     char       *data;
     968              :     RelFileLocator target_locator;
     969              : 
     970       154435 :     xlrec = (xl_heap_update *) XLogRecGetData(r);
     971              : 
     972              :     /* only interested in our database */
     973       154435 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     974       154435 :     if (target_locator.dbOid != ctx->slot->data.database)
     975          236 :         return;
     976              : 
     977              :     /* output plugin doesn't look for this origin, no need to queue */
     978       154232 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     979           33 :         return;
     980              : 
     981       154199 :     change = ReorderBufferAllocChange(ctx->reorder);
     982       154199 :     change->action = REORDER_BUFFER_CHANGE_UPDATE;
     983       154199 :     change->origin_id = XLogRecGetOrigin(r);
     984       154199 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     985              : 
     986       154199 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
     987              :     {
     988              :         Size        datalen;
     989              :         Size        tuplelen;
     990              : 
     991       152269 :         data = XLogRecGetBlockData(r, 0, &datalen);
     992              : 
     993       152269 :         tuplelen = datalen - SizeOfHeapHeader;
     994              : 
     995       152269 :         change->data.tp.newtuple =
     996       152269 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
     997              : 
     998       152269 :         DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
     999              :     }
    1000              : 
    1001       154199 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
    1002              :     {
    1003              :         Size        datalen;
    1004              :         Size        tuplelen;
    1005              : 
    1006              :         /* caution, remaining data in record is not aligned */
    1007          383 :         data = XLogRecGetData(r) + SizeOfHeapUpdate;
    1008          383 :         datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
    1009          383 :         tuplelen = datalen - SizeOfHeapHeader;
    1010              : 
    1011          383 :         change->data.tp.oldtuple =
    1012          383 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
    1013              : 
    1014          383 :         DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
    1015              :     }
    1016              : 
    1017       154199 :     change->data.tp.clear_toast_afterwards = true;
    1018              : 
    1019       154199 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1020              :                              change, false);
    1021              : }
    1022              : 
    1023              : /*
    1024              :  * Parse XLOG_HEAP_DELETE from wal into proper tuplebufs.
    1025              :  *
    1026              :  * Deletes can possibly contain the old primary key.
    1027              :  */
    1028              : static void
    1029       207989 : DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1030              : {
    1031       207989 :     XLogReaderState *r = buf->record;
    1032              :     xl_heap_delete *xlrec;
    1033              :     ReorderBufferChange *change;
    1034              :     RelFileLocator target_locator;
    1035              : 
    1036       207989 :     xlrec = (xl_heap_delete *) XLogRecGetData(r);
    1037              : 
    1038              :     /* only interested in our database */
    1039       207989 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1040       207989 :     if (target_locator.dbOid != ctx->slot->data.database)
    1041           49 :         return;
    1042              : 
    1043              :     /* output plugin doesn't look for this origin, no need to queue */
    1044       207958 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1045           18 :         return;
    1046              : 
    1047       207940 :     change = ReorderBufferAllocChange(ctx->reorder);
    1048              : 
    1049       207940 :     if (xlrec->flags & XLH_DELETE_IS_SUPER)
    1050            0 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
    1051              :     else
    1052       207940 :         change->action = REORDER_BUFFER_CHANGE_DELETE;
    1053              : 
    1054       207940 :     change->origin_id = XLogRecGetOrigin(r);
    1055              : 
    1056       207940 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1057              : 
    1058              :     /* old primary key stored */
    1059       207940 :     if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
    1060              :     {
    1061       145897 :         Size        datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
    1062       145897 :         Size        tuplelen = datalen - SizeOfHeapHeader;
    1063              : 
    1064              :         Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
    1065              : 
    1066       145897 :         change->data.tp.oldtuple =
    1067       145897 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
    1068              : 
    1069       145897 :         DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
    1070              :                         datalen, change->data.tp.oldtuple);
    1071              :     }
    1072              : 
    1073       207940 :     change->data.tp.clear_toast_afterwards = true;
    1074              : 
    1075       207940 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1076              :                              change, false);
    1077              : }
    1078              : 
    1079              : /*
    1080              :  * Parse XLOG_HEAP_TRUNCATE from wal
    1081              :  */
    1082              : static void
    1083           73 : DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1084              : {
    1085           73 :     XLogReaderState *r = buf->record;
    1086              :     xl_heap_truncate *xlrec;
    1087              :     ReorderBufferChange *change;
    1088              : 
    1089           73 :     xlrec = (xl_heap_truncate *) XLogRecGetData(r);
    1090              : 
    1091              :     /* only interested in our database */
    1092           73 :     if (xlrec->dbId != ctx->slot->data.database)
    1093            0 :         return;
    1094              : 
    1095              :     /* output plugin doesn't look for this origin, no need to queue */
    1096           73 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1097            1 :         return;
    1098              : 
    1099           72 :     change = ReorderBufferAllocChange(ctx->reorder);
    1100           72 :     change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
    1101           72 :     change->origin_id = XLogRecGetOrigin(r);
    1102           72 :     if (xlrec->flags & XLH_TRUNCATE_CASCADE)
    1103            1 :         change->data.truncate.cascade = true;
    1104           72 :     if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
    1105            2 :         change->data.truncate.restart_seqs = true;
    1106           72 :     change->data.truncate.nrelids = xlrec->nrelids;
    1107          144 :     change->data.truncate.relids = ReorderBufferAllocRelids(ctx->reorder,
    1108           72 :                                                             xlrec->nrelids);
    1109           72 :     memcpy(change->data.truncate.relids, xlrec->relids,
    1110           72 :            xlrec->nrelids * sizeof(Oid));
    1111           72 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1112              :                              buf->origptr, change, false);
    1113              : }
    1114              : 
    1115              : /*
    1116              :  * Decode XLOG_HEAP2_MULTI_INSERT record into multiple tuplebufs.
    1117              :  *
    1118              :  * Currently MULTI_INSERT will always contain the full tuples.
    1119              :  */
    1120              : static void
    1121         6868 : DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1122              : {
    1123         6868 :     XLogReaderState *r = buf->record;
    1124              :     xl_heap_multi_insert *xlrec;
    1125              :     int         i;
    1126              :     char       *data;
    1127              :     char       *tupledata;
    1128              :     Size        tuplelen;
    1129              :     RelFileLocator rlocator;
    1130              : 
    1131         6868 :     xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
    1132              : 
    1133              :     /*
    1134              :      * Ignore insert records without new tuples.  This happens when a
    1135              :      * multi_insert is done on a catalog or on a non-persistent relation.
    1136              :      */
    1137         6868 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
    1138         6854 :         return;
    1139              : 
    1140              :     /* only interested in our database */
    1141           33 :     XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
    1142           33 :     if (rlocator.dbOid != ctx->slot->data.database)
    1143           19 :         return;
    1144              : 
    1145              :     /* output plugin doesn't look for this origin, no need to queue */
    1146           14 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1147            0 :         return;
    1148              : 
    1149              :     /*
    1150              :      * We know that this multi_insert isn't for a catalog, so the block should
    1151              :      * always have data even if a full-page write of it is taken.
    1152              :      */
    1153           14 :     tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
    1154              :     Assert(tupledata != NULL);
    1155              : 
    1156           14 :     data = tupledata;
    1157         1053 :     for (i = 0; i < xlrec->ntuples; i++)
    1158              :     {
    1159              :         ReorderBufferChange *change;
    1160              :         xl_multi_insert_tuple *xlhdr;
    1161              :         int         datalen;
    1162              :         HeapTuple   tuple;
    1163              :         HeapTupleHeader header;
    1164              : 
    1165         1039 :         change = ReorderBufferAllocChange(ctx->reorder);
    1166         1039 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
    1167         1039 :         change->origin_id = XLogRecGetOrigin(r);
    1168              : 
    1169         1039 :         memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));
    1170              : 
    1171         1039 :         xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
    1172         1039 :         data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
    1173         1039 :         datalen = xlhdr->datalen;
    1174              : 
    1175         1039 :         change->data.tp.newtuple =
    1176         1039 :             ReorderBufferAllocTupleBuf(ctx->reorder, datalen);
    1177              : 
    1178         1039 :         tuple = change->data.tp.newtuple;
    1179         1039 :         header = tuple->t_data;
    1180              : 
    1181              :         /* not a disk based tuple */
    1182         1039 :         ItemPointerSetInvalid(&tuple->t_self);
    1183              : 
    1184              :         /*
    1185              :          * We can only figure this out after reassembling the transactions.
    1186              :          */
    1187         1039 :         tuple->t_tableOid = InvalidOid;
    1188              : 
    1189         1039 :         tuple->t_len = datalen + SizeofHeapTupleHeader;
    1190              : 
    1191         1039 :         memset(header, 0, SizeofHeapTupleHeader);
    1192              : 
    1193         1039 :         memcpy((char *) tuple->t_data + SizeofHeapTupleHeader, data, datalen);
    1194         1039 :         header->t_infomask = xlhdr->t_infomask;
    1195         1039 :         header->t_infomask2 = xlhdr->t_infomask2;
    1196         1039 :         header->t_hoff = xlhdr->t_hoff;
    1197              : 
    1198              :         /*
    1199              :          * Reset toast reassembly state only after the last row in the last
    1200              :          * xl_multi_insert_tuple record emitted by one heap_multi_insert()
    1201              :          * call.
    1202              :          */
    1203         1039 :         if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
    1204          179 :             (i + 1) == xlrec->ntuples)
    1205            9 :             change->data.tp.clear_toast_afterwards = true;
    1206              :         else
    1207         1030 :             change->data.tp.clear_toast_afterwards = false;
    1208              : 
    1209         1039 :         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1210              :                                  buf->origptr, change, false);
    1211              : 
    1212              :         /* move to the next xl_multi_insert_tuple entry */
    1213         1039 :         data += datalen;
    1214              :     }
    1215              :     Assert(data == tupledata + tuplelen);
    1216              : }
    1217              : 
    1218              : /*
    1219              :  * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change.
    1220              :  *
    1221              :  * This is pretty trivial, all the state essentially already setup by the
    1222              :  * speculative insertion.
    1223              :  */
    1224              : static void
    1225        17964 : DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1226              : {
    1227        17964 :     XLogReaderState *r = buf->record;
    1228              :     ReorderBufferChange *change;
    1229              :     RelFileLocator target_locator;
    1230              : 
    1231              :     /* only interested in our database */
    1232        17964 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1233        17964 :     if (target_locator.dbOid != ctx->slot->data.database)
    1234            0 :         return;
    1235              : 
    1236              :     /* output plugin doesn't look for this origin, no need to queue */
    1237        17964 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1238            0 :         return;
    1239              : 
    1240        17964 :     change = ReorderBufferAllocChange(ctx->reorder);
    1241        17964 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
    1242        17964 :     change->origin_id = XLogRecGetOrigin(r);
    1243              : 
    1244        17964 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1245              : 
    1246        17964 :     change->data.tp.clear_toast_afterwards = true;
    1247              : 
    1248        17964 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1249              :                              change, false);
    1250              : }
    1251              : 
    1252              : 
    1253              : /*
    1254              :  * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
    1255              :  * (but not by heap_multi_insert) into a tuplebuf.
    1256              :  *
    1257              :  * The size 'len' and the pointer 'data' in the record need to be
    1258              :  * computed outside as they are record specific.
    1259              :  */
    1260              : static void
    1261      1402390 : DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
    1262              : {
    1263              :     xl_heap_header xlhdr;
    1264      1402390 :     int         datalen = len - SizeOfHeapHeader;
    1265              :     HeapTupleHeader header;
    1266              : 
    1267              :     Assert(datalen >= 0);
    1268              : 
    1269      1402390 :     tuple->t_len = datalen + SizeofHeapTupleHeader;
    1270      1402390 :     header = tuple->t_data;
    1271              : 
    1272              :     /* not a disk based tuple */
    1273      1402390 :     ItemPointerSetInvalid(&tuple->t_self);
    1274              : 
    1275              :     /* we can only figure this out after reassembling the transactions */
    1276      1402390 :     tuple->t_tableOid = InvalidOid;
    1277              : 
    1278              :     /* data is not stored aligned, copy to aligned storage */
    1279      1402390 :     memcpy(&xlhdr, data, SizeOfHeapHeader);
    1280              : 
    1281      1402390 :     memset(header, 0, SizeofHeapTupleHeader);
    1282              : 
    1283      1402390 :     memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
    1284      1402390 :            data + SizeOfHeapHeader,
    1285              :            datalen);
    1286              : 
    1287      1402390 :     header->t_infomask = xlhdr.t_infomask;
    1288      1402390 :     header->t_infomask2 = xlhdr.t_infomask2;
    1289      1402390 :     header->t_hoff = xlhdr.t_hoff;
    1290      1402390 : }
    1291              : 
    1292              : /*
    1293              :  * Check whether we are interested in this specific transaction.
    1294              :  *
    1295              :  * There can be several reasons we might not be interested in this
    1296              :  * transaction:
    1297              :  * 1) We might not be interested in decoding transactions up to this
    1298              :  *    LSN. This can happen because we previously decoded it and now just
    1299              :  *    are restarting or if we haven't assembled a consistent snapshot yet.
    1300              :  * 2) The transaction happened in another database.
    1301              :  * 3) The output plugin is not interested in the origin.
    1302              :  * 4) We are doing fast-forwarding
    1303              :  */
    1304              : static bool
    1305         4230 : DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
    1306              :                   Oid txn_dbid, ReplOriginId origin_id)
    1307              : {
    1308         4230 :     if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
    1309         3948 :         (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
    1310         1995 :         FilterByOrigin(ctx, origin_id))
    1311         2274 :         return true;
    1312              : 
    1313              :     /*
    1314              :      * We also skip decoding in fast_forward mode. In passing set the
    1315              :      * processing_required flag to indicate that if it were not for
    1316              :      * fast_forward mode, processing would have been required.
    1317              :      */
    1318         1956 :     if (ctx->fast_forward)
    1319              :     {
    1320           39 :         ctx->processing_required = true;
    1321           39 :         return true;
    1322              :     }
    1323              : 
    1324         1917 :     return false;
    1325              : }
        

Generated by: LCOV version 2.0-1