LCOV - code coverage report
Current view: top level - src/backend/replication/logical - decode.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19beta1 Lines: 93.0 % 456 424
Test Date: 2026-06-26 23:16:30 Functions: 95.2 % 21 20
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /* -------------------------------------------------------------------------
       2              :  *
       3              :  * decode.c
       4              :  *      This module decodes WAL records read using xlogreader.h's APIs for the
       5              :  *      purpose of logical decoding by passing information to the
       6              :  *      reorderbuffer module (containing the actual changes) and to the
       7              :  *      snapbuild module to build a fitting catalog snapshot (to be able to
       8              :  *      properly decode the changes in the reorderbuffer).
       9              :  *
      10              :  * NOTE:
      11              :  *      This basically tries to handle all low level xlog stuff for
      12              :  *      reorderbuffer.c and snapbuild.c. There's some minor leakage where a
      13              :  *      specific record's struct is used to pass data along, but those just
      14              :  *      happen to contain the right amount of data in a convenient
      15              :  *      format. There isn't and shouldn't be much intelligence about the
      16              :  *      contents of records in here except turning them into a more usable
      17              :  *      format.
      18              :  *
      19              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      20              :  * Portions Copyright (c) 1994, Regents of the University of California
      21              :  *
      22              :  * IDENTIFICATION
      23              :  *    src/backend/replication/logical/decode.c
      24              :  *
      25              :  * -------------------------------------------------------------------------
      26              :  */
      27              : #include "postgres.h"
      28              : 
      29              : #include "access/heapam_xlog.h"
      30              : #include "access/transam.h"
      31              : #include "access/xact.h"
      32              : #include "access/xlog_internal.h"
      33              : #include "access/xlogreader.h"
      34              : #include "access/xlogrecord.h"
      35              : #include "catalog/pg_control.h"
      36              : #include "commands/repack.h"
      37              : #include "replication/decode.h"
      38              : #include "replication/logical.h"
      39              : #include "replication/message.h"
      40              : #include "replication/reorderbuffer.h"
      41              : #include "replication/snapbuild.h"
      42              : #include "storage/standbydefs.h"
      43              : 
      44              : /* individual record(group)'s handlers */
      45              : static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      46              : static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      47              : static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      48              : static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      49              : static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      50              : static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      51              : 
      52              : static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      53              :                          xl_xact_parsed_commit *parsed, TransactionId xid,
      54              :                          bool two_phase);
      55              : static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      56              :                         xl_xact_parsed_abort *parsed, TransactionId xid,
      57              :                         bool two_phase);
      58              : static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      59              :                           xl_xact_parsed_prepare *parsed);
      60              : 
      61              : 
      62              : /* common function to decode tuples */
      63              : static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);
      64              : 
      65              : /* helper functions for decoding transactions */
      66              : static inline bool FilterPrepare(LogicalDecodingContext *ctx,
      67              :                                  TransactionId xid, const char *gid);
      68              : static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
      69              :                               XLogRecordBuffer *buf, Oid txn_dbid,
      70              :                               ReplOriginId origin_id);
      71              : 
      72              : /*
      73              :  * Take every XLogReadRecord()ed record and dispatch it to its resource
      74              :  * manager's rm_decode callback, which performs the actions required to decode
      75              :  * it using the output plugin already set up in the logical decoding context.
      76              :  *
      77              :  * NB: Every record's xid needs to be processed by the reorderbuffer
      78              :  * (xids contained in the content of records are not relevant for this rule).
      79              :  * That means that for records which would otherwise not go through the
      80              :  * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
      81              :  * call ReorderBufferProcessXid for each record type by default, because
      82              :  * e.g. empty xacts can be handled more efficiently if there's no previous
      83              :  * state for them.
      84              :  *
      85              :  * We also support the ability to fast-forward through records, skipping some
      86              :  * record types completely - see individual record types for details.
      87              :  */
      88              : void
      89      2354285 : LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
      90              : {
      91              :     XLogRecordBuffer buf;
      92              :     TransactionId txid;
      93              :     RmgrData    rmgr;
      94              : 
      95      2354285 :     buf.origptr = ctx->reader->ReadRecPtr;
      96      2354285 :     buf.endptr = ctx->reader->EndRecPtr;
      97      2354285 :     buf.record = record;
      98              : 
      99      2354285 :     txid = XLogRecGetTopXid(record);
     100              : 
     101              :     /*
     102              :      * If the top-level xid is valid, this record was written by a subxact
     103              :      * and we need to assign that subxact to its top-level xact. We need to
     104              :      * do this for all records, hence we do it before the rmgr dispatch.
     105              :      */
     106      2354285 :     if (TransactionIdIsValid(txid))
     107              :     {
     108          671 :         ReorderBufferAssignChild(ctx->reorder,
     109              :                                  txid,
     110          671 :                                  XLogRecGetXid(record),
     111              :                                  buf.origptr);
     112              :     }
     113              : 
     114      2354285 :     rmgr = GetRmgr(XLogRecGetRmid(record));
     115              : 
     116      2354285 :     if (rmgr.rm_decode != NULL)
     117      1798613 :         rmgr.rm_decode(ctx, &buf);
     118              :     else
     119              :     {
     120              :         /* no decode callback for this rmgr: just deal with xid, and done */
     121       555672 :         ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
     122              :                                 buf.origptr);
     123              :     }
     124      2354273 : }
     125              : 
     126              : /*
     127              :  * Handle rmgr XLOG_ID records for LogicalDecodingProcessRecord().
     128              :  */
     129              : void
     130         6530 : xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     131              : {
     132         6530 :     SnapBuild  *builder = ctx->snapshot_builder;
     133         6530 :     uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
     134              : 
     135         6530 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
     136              :                             buf->origptr);
     137              : 
     138         6530 :     switch (info)
     139              :     {
     140              :             /* this is also used in END_OF_RECOVERY checkpoints */
     141           93 :         case XLOG_CHECKPOINT_SHUTDOWN:
     142              :         case XLOG_END_OF_RECOVERY:
     143           93 :             SnapBuildSerializationPoint(builder, buf->origptr);
     144              : 
     145           93 :             break;
     146           80 :         case XLOG_CHECKPOINT_ONLINE:
     147              : 
     148              :             /*
     149              :              * A RUNNING_XACTS record will have been logged close to this
     150              :              * checkpoint; we can restart from there, so nothing to do here.
     151              :              */
     152           80 :             break;
     153            0 :         case XLOG_LOGICAL_DECODING_STATUS_CHANGE:
     154              :             {
     155              :                 bool        logical_decoding;
     156              : 
     157            0 :                 memcpy(&logical_decoding, XLogRecGetData(buf->record), sizeof(bool));
     158              : 
     159              :                 /*
     160              :                  * Error out as we should not decode this WAL record.
     161              :                  *
     162              :                  * Logical decoding is disabled, and existing logical slots on
     163              :                  * the standby are invalidated when this WAL record is
     164              :                  * replayed. No logical decoder can process this WAL record
     165              :                  * until replay completes, and by then the slots are already
     166              :                  * invalidated. Furthermore, no new logical slots can be
     167              :                  * created while logical decoding is disabled. This cannot
     168              :                  * occur on the primary either, since it will not restart
     169              :                  * with wal_level < replica if any logical slots exist.
     170              :                  */
     171            0 :                 elog(ERROR, "unexpected logical decoding status change %d",
     172              :                      logical_decoding);
     173              : 
     174              :                 break;
     175              :             }
     176         6357 :         case XLOG_NOOP:         /* the record types below need no action here */
     177              :         case XLOG_NEXTOID:
     178              :         case XLOG_SWITCH:
     179              :         case XLOG_BACKUP_END:
     180              :         case XLOG_PARAMETER_CHANGE:
     181              :         case XLOG_RESTORE_POINT:
     182              :         case XLOG_FPW_CHANGE:
     183              :         case XLOG_FPI_FOR_HINT:
     184              :         case XLOG_FPI:
     185              :         case XLOG_OVERWRITE_CONTRECORD:
     186              :         case XLOG_CHECKPOINT_REDO:
     187         6357 :             break;
     188            0 :         default:
     189            0 :             elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
     190              :     }
     191         6530 : }
     192              : /* Handle rmgr XLOG2_ID records; presumably invoked via rm_decode like the sibling handlers -- TODO confirm. */
     193              : void
     194            0 : xlog2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     195              : {
     196            0 :     uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
     197              : 
     198            0 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record), buf->origptr);
     199              : 
     200            0 :     switch (info)
     201              :     {
     202            0 :         case XLOG2_CHECKSUMS:   /* no action beyond registering the xid above */
     203            0 :             break;
     204            0 :         default:
     205            0 :             elog(ERROR, "unexpected RM_XLOG2_ID record type: %u", info);
     206              :     }
     207            0 : }
     208              : 
     209              : /*
     210              :  * Handle rmgr XACT_ID records for LogicalDecodingProcessRecord().
     211              :  */
     212              : void
     213        10577 : xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     214              : {
     215        10577 :     SnapBuild  *builder = ctx->snapshot_builder;
     216        10577 :     ReorderBuffer *reorder = ctx->reorder;
     217        10577 :     XLogReaderState *r = buf->record;
     218        10577 :     uint8       info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
     219              : 
     220              :     /*
     221              :      * If the snapshot isn't yet fully built, we cannot decode anything, so
     222              :      * bail out (without touching the reorderbuffer at all).
     223              :      */
     224        10577 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     225           13 :         return;
     226              : 
     227        10564 :     switch (info)
     228              :     {
     229         3967 :         case XLOG_XACT_COMMIT:
     230              :         case XLOG_XACT_COMMIT_PREPARED:
     231              :             {
     232              :                 xl_xact_commit *xlrec;
     233              :                 xl_xact_parsed_commit parsed;
     234              :                 TransactionId xid;
     235         3967 :                 bool        two_phase = false;
     236              : 
     237         3967 :                 xlrec = (xl_xact_commit *) XLogRecGetData(r);
     238         3967 :                 ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     239              : 
     240         3967 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     241         3839 :                     xid = XLogRecGetXid(r); /* no two-phase xid parsed: use the record's xid */
     242              :                 else
     243          128 :                     xid = parsed.twophase_xid;
     244              : 
     245              :                 /*
     246              :                  * We would like to process the transaction in a two-phase
     247              :                  * manner iff output plugin supports two-phase commits and
     248              :                  * doesn't filter the transaction at prepare time.
     249              :                  */
     250         3967 :                 if (info == XLOG_XACT_COMMIT_PREPARED)
     251          128 :                     two_phase = !(FilterPrepare(ctx, xid,
     252          128 :                                                 parsed.twophase_gid));
     253              : 
     254         3967 :                 DecodeCommit(ctx, buf, &parsed, xid, two_phase);
     255         3957 :                 break;
     256              :             }
     257          260 :         case XLOG_XACT_ABORT:
     258              :         case XLOG_XACT_ABORT_PREPARED:
     259              :             {
     260              :                 xl_xact_abort *xlrec;
     261              :                 xl_xact_parsed_abort parsed;
     262              :                 TransactionId xid;
     263          260 :                 bool        two_phase = false;
     264              : 
     265          260 :                 xlrec = (xl_xact_abort *) XLogRecGetData(r);
     266          260 :                 ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     267              : 
     268          260 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     269          209 :                     xid = XLogRecGetXid(r); /* no two-phase xid parsed: use the record's xid */
     270              :                 else
     271           51 :                     xid = parsed.twophase_xid;
     272              : 
     273              :                 /*
     274              :                  * We would like to process the transaction in a two-phase
     275              :                  * manner iff output plugin supports two-phase commits and
     276              :                  * doesn't filter the transaction at prepare time.
     277              :                  */
     278          260 :                 if (info == XLOG_XACT_ABORT_PREPARED)
     279           51 :                     two_phase = !(FilterPrepare(ctx, xid,
     280           51 :                                                 parsed.twophase_gid));
     281              : 
     282          260 :                 DecodeAbort(ctx, buf, &parsed, xid, two_phase);
     283          260 :                 break;
     284              :             }
     285          124 :         case XLOG_XACT_ASSIGNMENT:
     286              : 
     287              :             /*
     288              :              * We assign subxact to the toplevel xact while processing each
     289              :              * record if required.  So, we don't need to do anything here. See
     290              :              * LogicalDecodingProcessRecord.
     291              :              */
     292          124 :             break;
     293         6011 :         case XLOG_XACT_INVALIDATIONS:
     294              :             {
     295              :                 TransactionId xid;
     296              :                 xl_xact_invals *invals;
     297              : 
     298         6011 :                 xid = XLogRecGetXid(r);
     299         6011 :                 invals = (xl_xact_invals *) XLogRecGetData(r);
     300              : 
     301              :                 /*
     302              :                  * Execute invalidations of xid-less transactions right away;
     303              :                  * otherwise accumulate them for commit time.  When fast-forwarding
     304              :                  * the messages are skipped, but catalog changes are still flagged.
     305              :                  */
     306         6011 :                 if (TransactionIdIsValid(xid))
     307              :                 {
     308         5979 :                     if (!ctx->fast_forward)
     309         5904 :                         ReorderBufferAddInvalidations(reorder, xid,
     310              :                                                       buf->origptr,
     311         5904 :                                                       invals->nmsgs,
     312         5904 :                                                       invals->msgs);
     313         5979 :                     ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
     314              :                                                       buf->origptr);
     315              :                 }
     316           32 :                 else if (!ctx->fast_forward)
     317           32 :                     ReorderBufferImmediateInvalidation(ctx->reorder,
     318           32 :                                                        invals->nmsgs,
     319           32 :                                                        invals->msgs);
     320              : 
     321         6011 :                 break;
     322              :             }
     323          202 :         case XLOG_XACT_PREPARE:
     324              :             {
     325              :                 xl_xact_parsed_prepare parsed;
     326              :                 xl_xact_prepare *xlrec;
     327              : 
     328              :                 /* ok, parse it */
     329          202 :                 xlrec = (xl_xact_prepare *) XLogRecGetData(r);
     330          202 :                 ParsePrepareRecord(XLogRecGetInfo(buf->record),
     331              :                                    xlrec, &parsed);
     332              : 
     333              :                 /*
     334              :                  * We would like to process the transaction in a two-phase
     335              :                  * manner iff output plugin supports two-phase commits and
     336              :                  * doesn't filter it at prepare time; else just note the xid.
     337              :                  */
     338          202 :                 if (FilterPrepare(ctx, parsed.twophase_xid,
     339              :                                   parsed.twophase_gid))
     340              :                 {
     341           20 :                     ReorderBufferProcessXid(reorder, parsed.twophase_xid,
     342              :                                             buf->origptr);
     343           20 :                     break;
     344              :                 }
     345              : 
     346              :                 /*
     347              :                  * Note that if the prepared transaction has locked [user]
     348              :                  * catalog tables exclusively then decoding prepare can block
     349              :                  * till the main transaction is committed because it needs to
     350              :                  * lock the catalog tables.
     351              :                  *
     352              :                  * XXX Now, this can even lead to a deadlock if the prepare
     353              :                  * transaction is waiting to get it logically replicated for
     354              :                  * distributed 2PC. This can be avoided by disallowing
     355              :                  * preparing transactions that have locked [user] catalog
     356              :                  * tables exclusively but as of now, we ask users not to do
     357              :                  * such an operation.
     358              :                  */
     359          182 :                 DecodePrepare(ctx, buf, &parsed);
     360          182 :                 break;
     361              :             }
     362            0 :         default:
     363            0 :             elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
     364              :     }
     365              : }
     366              : 
     367              : /*
     368              :  * Handle rmgr STANDBY_ID records for LogicalDecodingProcessRecord().
     369              :  */
     370              : void
     371         4383 : standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     372              : {
     373         4383 :     SnapBuild  *builder = ctx->snapshot_builder;
     374         4383 :     XLogReaderState *r = buf->record;
     375         4383 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     376              : 
     377         4383 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     378              : 
     379         4383 :     switch (info)
     380              :     {
     381         1703 :         case XLOG_RUNNING_XACTS:
     382              :             {
     383         1703 :                 xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
     384              : 
     385              :                 /*
     386              :                  * Update this decoder's idea of transactions currently
     387              :                  * running.  In doing so we will determine whether we have
     388              :                  * reached consistent status.
     389              :                  */
     390         1703 :                 SnapBuildProcessRunningXacts(builder, buf->origptr, running);
     391              : 
     392              :                 /*
     393              :                  * Abort all transactions that we keep track of, that are
     394              :                  * older than the record's oldestRunningXid. This is the most
     395              :                  * convenient spot for doing so since, in contrast to shutdown
     396              :                  * or end-of-recovery checkpoints, we have information about
     397              :                  * all running transactions which includes prepared ones,
     398              :                  * while shutdown checkpoints just know that no non-prepared
     399              :                  * transactions are in progress.
     400              :                  */
     401         1701 :                 ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
     402              :             }
     403         1701 :             break;
     404         2648 :         case XLOG_STANDBY_LOCK:
     405         2648 :             break;              /* no action taken for lock records */
     406           32 :         case XLOG_INVALIDATIONS:
     407              : 
     408              :             /*
     409              :              * Invalidations are processed at the command level via
     410              :              * XLOG_XACT_INVALIDATIONS (see xact_decode()), so nothing to do.
     411              :              */
     412           32 :             break;
     413            0 :         default:
     414            0 :             elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
     415              :     }
     416         4381 : }
     417              : 
     418              : /*
     419              :  * Handle rmgr HEAP2_ID records for LogicalDecodingProcessRecord().
     420              :  */
     421              : void
     422        36944 : heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     423              : {
     424        36944 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     425        36944 :     TransactionId xid = XLogRecGetXid(buf->record);
     426        36944 :     SnapBuild  *builder = ctx->snapshot_builder;
     427              : 
     428        36944 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     429              : 
     430              :     /*
     431              :      * Without a full snapshot there is no point in decoding data changes, so
     432              :      * bail out.  Fast-forward mode is deliberately not tested here: the base
     433              :      * snapshot must still be built in that mode (as is done in
     434              :      * SnapBuildProcessChange()) because we require the snapshot's xmin when
     435              :      * determining the candidate catalog_xmin for the replication slot. See
     436              :      * SnapBuildProcessRunningXacts().
     437              :      */
     438        36944 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     439            8 :         return;
     440              : 
     441        36936 :     switch (info)
     442              :     {
     443         6958 :         case XLOG_HEAP2_MULTI_INSERT:   /* SnapBuildProcessChange() is evaluated first, so it runs even when fast-forwarding */
     444         6958 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     445         6958 :                 !ctx->fast_forward &&
     446         6855 :                 !change_useless_for_repack(buf))
     447         6781 :                 DecodeMultiInsert(ctx, buf);
     448         6958 :             break;
     449        27981 :         case XLOG_HEAP2_NEW_CID:
     450        27981 :             if (!ctx->fast_forward)
     451              :             {
     452              :                 xl_heap_new_cid *xlrec;
     453              : 
     454        27598 :                 xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
     455        27598 :                 SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
     456              :             }
     457        27981 :             break;
     458           90 :         case XLOG_HEAP2_REWRITE:
     459              : 
     460              :             /*
     461              :              * Although these records only exist to serve the needs of logical
     462              :              * decoding, all the work happens as part of crash or archive
     463              :              * recovery, so we don't need to do anything here.
     464              :              */
     465           90 :             break;
     466              : 
     467              :             /*
     468              :              * Everything else here is just low level physical stuff we're not
     469              :              * interested in.
     470              :              */
     471         1907 :         case XLOG_HEAP2_PRUNE_ON_ACCESS:
     472              :         case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
     473              :         case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
     474              :         case XLOG_HEAP2_LOCK_UPDATED:
     475         1907 :             break;
     476            0 :         default:
     477            0 :             elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
     478              :     }
     479              : }
     480              : 
     481              : /*
     482              :  * Handle rmgr HEAP_ID records for LogicalDecodingProcessRecord().
     483              :  */
     484              : void
     485      1740081 : heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     486              : {
     487      1740081 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     488      1740081 :     TransactionId xid = XLogRecGetXid(buf->record);
     489      1740081 :     SnapBuild  *builder = ctx->snapshot_builder;
     490              :     /* Report the xid to the reorderbuffer before any early exit below. */
     491      1740081 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     492              : 
     493              :     /*
     494              :      * If we don't have a snapshot or we are just fast-forwarding, there is no
     495              :      * point in decoding data changes. However, it's crucial to build the base
     496              :      * snapshot during fast-forward mode (as is done in SnapBuildProcessChange())
     497              :      * because we require the snapshot's xmin when determining the candidate
     498              :      * catalog_xmin for the replication slot; see SnapBuildProcessRunningXacts().
     499              :      * Hence fast_forward is only tested after SnapBuildProcessChange() below.
     500              :      */
     501      1740081 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     502         1009 :         return;
     503              :     /* Per type: SnapBuildProcessChange() runs first; decoding may be skipped. */
     504      1739072 :     switch (info)
     505              :     {
     506      1125138 :         case XLOG_HEAP_INSERT:
     507      1125138 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     508      1125138 :                 !ctx->fast_forward &&
     509      1124940 :                 !change_useless_for_repack(buf))
     510      1124721 :                 DecodeInsert(ctx, buf);
     511      1125138 :             break;
     512              : 
     513              :             /*
     514              :              * Treat HOT updates as normal updates. There is no useful
     515              :              * information in the fact that we could make it a HOT update
     516              :              * locally and the WAL layout is compatible.
     517              :              */
     518       181103 :         case XLOG_HEAP_HOT_UPDATE:
     519              :         case XLOG_HEAP_UPDATE:
     520       181103 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     521       181103 :                 !ctx->fast_forward &&
     522       181090 :                 !change_useless_for_repack(buf))
     523       181055 :                 DecodeUpdate(ctx, buf);
     524       181103 :             break;
     525              : 
     526       238110 :         case XLOG_HEAP_DELETE:
     527       238110 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     528       238110 :                 !ctx->fast_forward &&
     529       238016 :                 !change_useless_for_repack(buf))
     530       237991 :                 DecodeDelete(ctx, buf);
     531       238110 :             break;
     532              : 
     533           64 :         case XLOG_HEAP_TRUNCATE:
     534           64 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     535           64 :                 !ctx->fast_forward &&
     536           62 :                 !change_useless_for_repack(buf))
     537           62 :                 DecodeTruncate(ctx, buf);
     538           64 :             break;
     539              : 
     540         1228 :         case XLOG_HEAP_INPLACE:
     541              : 
     542              :             /*
     543              :              * Inplace updates are only ever performed on catalog tuples and
     544              :              * can, per definition, not change tuple visibility.  Since we
     545              :              * also don't decode catalog tuples, we're not interested in the
     546              :              * record's contents.
     547              :              */
     548         1228 :             break;
     549              : 
     550        17965 :         case XLOG_HEAP_CONFIRM:
     551        17965 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     552        17965 :                 !ctx->fast_forward &&
     553        17965 :                 !change_useless_for_repack(buf))
     554        17965 :                 DecodeSpecConfirm(ctx, buf);
     555        17965 :             break;
     556              : 
     557       175464 :         case XLOG_HEAP_LOCK:
     558              :             /* we don't care about row-level locks for now */
     559       175464 :             break;
     560              : 
     561            0 :         default:
     562            0 :             elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
     563              :             break;
     564              :     }
     565              : }
     566              : 
     567              : /*
     568              :  * Ask the output plugin whether we want to skip this PREPARE and send this
     569              :  * transaction as a regular commit later. Returns true to skip (filter) it.
     570              :  */
     571              : static inline bool
     572          381 : FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
     573              :               const char *gid)
     574              : {
     575              :     /*
     576              :      * Skip if decoding of two-phase transactions at PREPARE time is not
     577              :      * enabled. In that case, all two-phase transactions are considered
     578              :      * filtered out and will be applied as regular transactions at COMMIT
     579              :      * PREPARED.
     580              :      */
     581          381 :     if (!ctx->twophase)
     582           23 :         return true;
     583              : 
     584              :     /*
     585              :      * The filter_prepare callback is optional. When not supplied, all
     586              :      * prepared transactions should go through.
     587              :      */
     588          358 :     if (ctx->callbacks.filter_prepare_cb == NULL)
     589          210 :         return false;
     590              :     /* Otherwise defer to the plugin's filter_prepare callback (via wrapper). */
     591          148 :     return filter_prepare_cb_wrapper(ctx, xid, gid);
     592              : }
     593              : 
     594              : static inline bool
     595      1559254 : FilterByOrigin(LogicalDecodingContext *ctx, ReplOriginId origin_id)
     596              : {
     597      1559254 :     if (ctx->callbacks.filter_by_origin_cb == NULL)
     598          131 :         return false;   /* callback not supplied: nothing is filtered */
     599              :     /* Otherwise defer to the plugin's callback; true means "filter out". */
     600      1559123 :     return filter_by_origin_cb_wrapper(ctx, origin_id);
     601              : }
     602              : 
     603              : /*
     604              :  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
     605              :  */
     606              : void
     607           98 : logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     608              : {
     609           98 :     SnapBuild  *builder = ctx->snapshot_builder;
     610           98 :     XLogReaderState *r = buf->record;
     611           98 :     TransactionId xid = XLogRecGetXid(r);
     612           98 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     613           98 :     ReplOriginId origin_id = XLogRecGetOrigin(r);
     614           98 :     Snapshot    snapshot = NULL;
     615              :     xl_logical_message *message;
     616              : 
     617           98 :     if (info != XLOG_LOGICAL_MESSAGE)
     618            0 :         elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
     619              : 
     620           98 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     621              : 
     622              :     /* If we don't have a snapshot, there is no point in decoding messages */
     623           98 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     624            0 :         return;
     625              : 
     626           98 :     message = (xl_logical_message *) XLogRecGetData(r);
     627              :     /* Ignore messages from other databases or from filtered origins. */
     628          194 :     if (message->dbId != ctx->slot->data.database ||
     629           96 :         FilterByOrigin(ctx, origin_id))
     630            4 :         return;
     631              :     /* Transactional and non-transactional messages are skipped differently. */
     632           94 :     if (message->transactional &&
     633           39 :         !SnapBuildProcessChange(builder, xid, buf->origptr))
     634            0 :         return;
     635          149 :     else if (!message->transactional &&
     636          110 :              (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
     637           55 :               SnapBuildXactNeedsSkip(builder, buf->origptr)))
     638           44 :         return;
     639              : 
     640              :     /*
     641              :      * We also skip decoding in fast_forward mode. This check must be last
     642              :      * because we don't want to set the processing_required flag unless we
     643              :      * have a decodable message.
     644              :      */
     645           50 :     if (ctx->fast_forward)
     646              :     {
     647              :         /*
     648              :          * We need to set processing_required flag to notify the message's
     649              :          * existence to the caller. Usually, the flag is set when either the
     650              :          * COMMIT or ABORT records are decoded, but this must be turned on
     651              :          * here because the non-transactional logical message is decoded
     652              :          * without waiting for these records.
     653              :          */
     654            3 :         if (!message->transactional)
     655            3 :             ctx->processing_required = true;
     656              : 
     657            3 :         return;
     658              :     }
     659              : 
     660              :     /*
     661              :      * If this is a non-transactional change, get the snapshot we're expected
     662              :      * to use. We only get here when the snapshot is consistent, and the
     663              :      * change is not meant to be skipped.
     664              :      *
     665              :      * For transactional changes we don't need a snapshot, we'll use the
     666              :      * regular snapshot maintained by ReorderBuffer. We just leave it NULL.
     667              :      */
     668           47 :     if (!message->transactional)
     669            8 :         snapshot = SnapBuildGetOrBuildSnapshot(builder);
     670              : 
     671           47 :     ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
     672           47 :                               message->transactional,
     673           47 :                               message->message, /* first part of message is
     674              :                                                  * prefix */
     675              :                               message->message_size,
     676           47 :                               message->message + message->prefix_size);
     677              : }
     678              : 
     679              : /*
     680              :  * Consolidated commit record handling between the different forms of commit
     681              :  * records.
     682              :  *
     683              :  * 'two_phase' indicates that caller wants to process the transaction in two
     684              :  * phases, first process prepare if not already done and then process
     685              :  * commit_prepared.
     686              :  */
     687              : static void
     688         3967 : DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     689              :              xl_xact_parsed_commit *parsed, TransactionId xid,
     690              :              bool two_phase)
     691              : {
     692         3967 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     693         3967 :     TimestampTz commit_time = parsed->xact_time;
     694         3967 :     ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
     695              :     int         i;
     696              :     /* If the record carries origin info, use the origin's LSN and timestamp. */
     697         3967 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     698              :     {
     699          105 :         origin_lsn = parsed->origin_lsn;
     700          105 :         commit_time = parsed->origin_timestamp;
     701              :     }
     702              :     /* snapbuild is told about the commit even if the xact is skipped below. */
     703         3967 :     SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
     704              :                        parsed->nsubxacts, parsed->subxacts,
     705              :                        parsed->xinfo);
     706              : 
     707              :     /* ----
     708              :      * Check whether we are interested in this specific transaction, and tell
     709              :      * the reorderbuffer to forget the content of the (sub-)transactions
     710              :      * if not.
     711              :      *
     712              :      * We can't just use ReorderBufferAbort() here, because we need to execute
     713              :      * the transaction's invalidations.  This currently won't be needed if
     714              :      * we're just skipping over the transaction because currently we only do
     715              :      * so during startup, to get to the first transaction the client needs. As
     716              :      * we have reset the catalog caches before starting to read WAL, and we
     717              :      * haven't yet touched any catalogs, there can't be anything to invalidate.
     718              :      * But if we're "forgetting" this commit because it happened in another
     719              :      * database, the invalidations might be important, because they could be
     720              :      * for shared catalogs and we might have loaded data into the relevant
     721              :      * syscaches.
     722              :      * ---
     723              :      */
     724         3967 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     725              :     {
     726         2946 :         for (i = 0; i < parsed->nsubxacts; i++)
     727              :         {
     728          967 :             ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
     729              :         }
     730         1979 :         ReorderBufferForget(ctx->reorder, xid, buf->origptr);
     731              : 
     732         1979 :         return;
     733              :     }
     734              : 
     735              :     /* tell the reorderbuffer about the surviving subtransactions */
     736         2257 :     for (i = 0; i < parsed->nsubxacts; i++)
     737              :     {
     738          269 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     739              :                                  buf->origptr, buf->endptr);
     740              :     }
     741              : 
     742              :     /*
     743              :      * Send the final commit record if the transaction data is already
     744              :      * decoded, otherwise, process the entire transaction.
     745              :      */
     746         1988 :     if (two_phase)
     747              :     {
     748           35 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     749           35 :                                     SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
     750              :                                     commit_time, origin_id, origin_lsn,
     751           35 :                                     parsed->twophase_gid, true);
     752              :     }
     753              :     else
     754              :     {
     755         1953 :         ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
     756              :                             commit_time, origin_id, origin_lsn);
     757              :     }
     758              : 
     759              :     /*
     760              :      * Update the decoding stats at transaction prepare/commit/abort.
     761              :      * Additionally we send the stats when we spill or stream the changes to
     762              :      * avoid losing them in case the decoding is interrupted. It is not clear
     763              :      * that sending more or less frequently than this would be better.
     764              :      */
     765         1978 :     UpdateDecodingStats(ctx);
     766              : }
     767              : 
     768              : /*
     769              :  * Decode PREPARE record. Similar logic as in DecodeCommit.
     770              :  *
     771              :  * Note that we don't skip prepare even if we have detected a concurrent abort
     772              :  * because it is quite possible that we had already sent some changes before we
     773              :  * detect abort in which case we need to abort those changes in the subscriber.
     774              :  * To abort such changes, we do send the prepare and then the rollback prepared
     775              :  * which is what happened on the publisher-side as well. Now, we can invent a
     776              :  * new abort API wherein in such cases we send abort and skip sending prepared
     777              :  * and rollback prepared but then it is not that straightforward because we
     778              :  * might have streamed this transaction by that time in which case it is
     779              :  * handled when the rollback is encountered. It is not impossible to optimize
     780              :  * the concurrent abort case but it can introduce design complexity w.r.t
     781              :  * handling different cases so leaving it for now as it doesn't seem worth it.
     782              :  */
     783              : static void
     784          182 : DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     785              :               xl_xact_parsed_prepare *parsed)
     786              : {
     787          182 :     SnapBuild  *builder = ctx->snapshot_builder;
     788          182 :     XLogRecPtr  origin_lsn = parsed->origin_lsn;
     789          182 :     TimestampTz prepare_time = parsed->xact_time;
     790          182 :     ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
     791              :     int         i;
     792          182 :     TransactionId xid = parsed->twophase_xid;
     793              :     /* A non-zero origin timestamp replaces the local prepare time. */
     794          182 :     if (parsed->origin_timestamp != 0)
     795            8 :         prepare_time = parsed->origin_timestamp;
     796              : 
     797              :     /*
     798              :      * Remember the prepare info for a txn so that it can be used later in
     799              :      * commit prepared if required. See ReorderBufferFinishPrepared.
     800              :      */
     801          182 :     if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
     802              :                                           buf->endptr, prepare_time, origin_id,
     803              :                                           origin_lsn))
     804            0 :         return;
     805              : 
     806              :     /* We can't start streaming unless a consistent state is reached. */
     807          182 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
     808              :     {
     809            3 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     810            3 :         return;
     811              :     }
     812              : 
     813              :     /*
     814              :      * Check whether we need to process this transaction. See
     815              :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     816              :      * transaction.
     817              :      *
     818              :      * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
     819              :      * hasn't yet been committed, removing this txn before a commit might
     820              :      * result in the computation of an incorrect restart_lsn. See
     821              :      * SnapBuildProcessRunningXacts. But we need to process cache
     822              :      * invalidations if there are any for the reasons mentioned in
     823              :      * DecodeCommit.
     824              :      */
     825          179 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     826              :     {
     827          134 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     828          134 :         ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
     829          134 :         return;
     830              :     }
     831              : 
     832              :     /* Tell the reorderbuffer about the surviving subtransactions. */
     833           46 :     for (i = 0; i < parsed->nsubxacts; i++)
     834              :     {
     835            1 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     836              :                                  buf->origptr, buf->endptr);
     837              :     }
     838              : 
     839              :     /* replay actions of all transaction + subtransactions in order */
     840           45 :     ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
     841              : 
     842              :     /*
     843              :      * Update the decoding stats at transaction prepare/commit/abort.
     844              :      * Additionally we send the stats when we spill or stream the changes to
     845              :      * avoid losing them in case the decoding is interrupted. It is not clear
     846              :      * that sending more or less frequently than this would be better.
     847              :      */
     848           45 :     UpdateDecodingStats(ctx);
     849              : }
     850              : 
     851              : 
     852              : /*
     853              :  * Get the data from the various forms of abort records and pass it on to
     854              :  * snapbuild.c and reorderbuffer.c.
     855              :  *
     856              :  * 'two_phase' indicates to finish prepared transaction.
     857              :  */
     858              : static void
     859          260 : DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     860              :             xl_xact_parsed_abort *parsed, TransactionId xid,
     861              :             bool two_phase)
     862              : {
     863              :     int         i;
     864          260 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     865          260 :     TimestampTz abort_time = parsed->xact_time;
     866          260 :     ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
     867              :     bool        skip_xact;
     868              :     /* If the record carries origin info, use the origin's LSN and timestamp. */
     869          260 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     870              :     {
     871            4 :         origin_lsn = parsed->origin_lsn;
     872            4 :         abort_time = parsed->origin_timestamp;
     873              :     }
     874              : 
     875              :     /*
     876              :      * Check whether we need to process this transaction. See
     877              :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     878              :      * transaction.
     879              :      */
     880          260 :     skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
     881              : 
     882              :     /*
     883              :      * Send the final rollback record for a prepared transaction unless we need
     884              :      * to skip it. Otherwise abort all subxacts and then the xact itself.
     885              :      */
     886          260 :     if (two_phase && !skip_xact)
     887              :     {
     888           11 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     889              :                                     InvalidXLogRecPtr,
     890              :                                     abort_time, origin_id, origin_lsn,
     891           11 :                                     parsed->twophase_gid, false);
     892              :     }
     893              :     else
     894              :     {
     895          255 :         for (i = 0; i < parsed->nsubxacts; i++)
     896              :         {
     897            6 :             ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
     898            6 :                                buf->record->EndRecPtr, abort_time);
     899              :         }
     900              : 
     901          249 :         ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr,
     902              :                            abort_time);
     903              :     }
     904              : 
     905              :     /* update the decoding stats */
     906          260 :     UpdateDecodingStats(ctx);
     907          260 : }
     908              : 
     909              : /*
     910              :  * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
     911              :  *
     912              :  * Inserts can contain the new tuple.
     913              :  */
     914              : static void
     915      1124721 : DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     916              : {
     917              :     Size        datalen;
     918              :     char       *tupledata;
     919              :     Size        tuplelen;
     920      1124721 :     XLogReaderState *r = buf->record;
     921              :     xl_heap_insert *xlrec;
     922              :     ReorderBufferChange *change;
     923              :     RelFileLocator target_locator;
     924              : 
     925      1124721 :     xlrec = (xl_heap_insert *) XLogRecGetData(r);
     926              : 
     927              :     /*
     928              :      * Ignore insert records without new tuples (this does happen when
     929              :      * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
     930              :      */
     931      1124721 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
     932         4712 :         return;
     933              : 
     934              :     /* only interested in our database */
     935      1120168 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     936      1120168 :     if (target_locator.dbOid != ctx->slot->data.database)
     937            0 :         return;
     938              : 
     939              :     /* output plugin doesn't look for this origin, no need to queue */
     940      1120168 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     941          159 :         return;
     942              : 
     943      1120009 :     change = ReorderBufferAllocChange(ctx->reorder);
     944      1120009 :     if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
     945      1102044 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
     946              :     else
     947        17965 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
     948      1120009 :     change->origin_id = XLogRecGetOrigin(r);
     949              : 
     950      1120009 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     951              :     /* Size the tuple buffer for block 0's data minus SizeOfHeapHeader. */
     952      1120009 :     tupledata = XLogRecGetBlockData(r, 0, &datalen);
     953      1120009 :     tuplelen = datalen - SizeOfHeapHeader;
     954              : 
     955      1120009 :     change->data.tp.newtuple =
     956      1120009 :         ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
     957              : 
     958      1120009 :     DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
     959              : 
     960      1120009 :     change->data.tp.clear_toast_afterwards = true;
     961              :     /* Queue the change, passing along the ON_TOAST_RELATION flag bit. */
     962      1120009 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
     963              :                              change,
     964      1120009 :                              xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
     965              : }
     966              : 
     967              : /*
     968              :  * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
     969              :  * in the record, from wal into proper tuplebufs.
     970              :  *
     971              :  * Updates can possibly contain a new tuple and the old primary key.
     972              :  */
     973              : static void
     974       181055 : DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     975              : {
     976       181055 :     XLogReaderState *r = buf->record;
     977              :     xl_heap_update *xlrec;
     978              :     ReorderBufferChange *change;
     979              :     char       *data;
     980              :     RelFileLocator target_locator;
     981              : 
     982       181055 :     xlrec = (xl_heap_update *) XLogRecGetData(r);
     983              : 
     984              :     /* only interested in our database */
     985       181055 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     986       181055 :     if (target_locator.dbOid != ctx->slot->data.database)
     987          286 :         return;
     988              : 
     989              :     /* output plugin doesn't look for this origin, no need to queue */
     990       180794 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     991           25 :         return;
     992              : 
     993       180769 :     change = ReorderBufferAllocChange(ctx->reorder);
     994       180769 :     change->action = REORDER_BUFFER_CHANGE_UPDATE;
     995       180769 :     change->origin_id = XLogRecGetOrigin(r);
     996       180769 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     997              :     /* The new tuple, if the record contains one, is read from block 0's data. */
     998       180769 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
     999              :     {
    1000              :         Size        datalen;
    1001              :         Size        tuplelen;
    1002              : 
    1003       178966 :         data = XLogRecGetBlockData(r, 0, &datalen);
    1004              : 
    1005       178966 :         tuplelen = datalen - SizeOfHeapHeader;
    1006              : 
    1007       178966 :         change->data.tp.newtuple =
    1008       178966 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
    1009              : 
    1010       178966 :         DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
    1011              :     }
    1012              : 
    1013       180769 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
    1014              :     {
    1015              :         Size        datalen;
    1016              :         Size        tuplelen;
    1017              : 
    1018              :         /* caution, the data after SizeOfHeapUpdate in the record is not aligned */
    1019          362 :         data = XLogRecGetData(r) + SizeOfHeapUpdate;
    1020          362 :         datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
    1021          362 :         tuplelen = datalen - SizeOfHeapHeader;
    1022              : 
    1023          362 :         change->data.tp.oldtuple =
    1024          362 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
    1025              : 
    1026          362 :         DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
    1027              :     }
    1028              : 
    1029       180769 :     change->data.tp.clear_toast_afterwards = true;
    1030              : 
    1031       180769 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1032              :                              change, false);
    1033              : }
    1034              : 
    1035              : /*
    1036              :  * Parse XLOG_HEAP_DELETE from wal into proper tuplebufs.
    1037              :  * The resulting change is queued in ctx->reorder under the record's xid; nothing is queued when the record is flagged XLH_DELETE_NO_LOGICAL, targets another database, or is filtered by origin.
    1038              :  * Deletes can possibly contain the old primary key (only when XLH_DELETE_CONTAINS_OLD is set; otherwise no old tuple is attached here).
    1039              :  */
    1040              : static void
    1041       237991 : DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1042              : {
    1043       237991 :     XLogReaderState *r = buf->record;
    1044              :     xl_heap_delete *xlrec;
    1045              :     ReorderBufferChange *change;
    1046              :     RelFileLocator target_locator;
    1047              : 
    1048       237991 :     xlrec = (xl_heap_delete *) XLogRecGetData(r);
    1049              : 
    1050              :     /*
    1051              :      * Skip changes that were marked as ignorable at origin.
    1052              :      *
    1053              :      * (This is used for changes that affect relations not visible to other
    1054              :      * transactions, such as the transient table during concurrent repack.)
    1055              :      */
    1056       237991 :     if (xlrec->flags & XLH_DELETE_NO_LOGICAL)
    1057           50 :         return;
    1058              : 
    1059              :     /* only interested in our database (taken from the block 0 tag of the record) */
    1060       237991 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1061       237991 :     if (target_locator.dbOid != ctx->slot->data.database)
    1062           32 :         return;
    1063              : 
    1064              :     /* output plugin doesn't look for this origin, no need to queue */
    1065       237959 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1066           18 :         return;
    1067              : 
    1068       237941 :     change = ReorderBufferAllocChange(ctx->reorder);
    1069              : 
    1070       237941 :     if (xlrec->flags & XLH_DELETE_IS_SUPER) /* flagged deletes become a speculative-abort change, not a DELETE */
    1071            0 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
    1072              :     else
    1073       237941 :         change->action = REORDER_BUFFER_CHANGE_DELETE;
    1074              : 
    1075       237941 :     change->origin_id = XLogRecGetOrigin(r);
    1076              : 
    1077       237941 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1078              : 
    1079              :     /* old primary key stored */
    1080       237941 :     if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
    1081              :     {
    1082       175895 :         Size        datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete; /* bytes following the first SizeOfHeapDelete bytes of the record data */
    1083       175895 :         Size        tuplelen = datalen - SizeOfHeapHeader; /* same, minus the SizeOfHeapHeader bytes that precede the tuple payload */
    1084              : 
    1085              :         Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader)); /* i.e. tuplelen computed above is strictly positive */
    1086              : 
    1087       175895 :         change->data.tp.oldtuple =
    1088       175895 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
    1089              : 
    1090       175895 :         DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
    1091              :                         datalen, change->data.tp.oldtuple); /* datalen still includes the header; DecodeXLogTuple() subtracts it itself */
    1092              :     }
    1093              : 
    1094       237941 :     change->data.tp.clear_toast_afterwards = true;
    1095              : 
    1096       237941 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1097              :                              change, false);
    1098              : }
    1099              : 
    1100              : /*
    1101              :  * Parse XLOG_HEAP_TRUNCATE from wal into a single TRUNCATE change that carries the cascade/restart_seqs flags and a copy of the record's relation OID array, and queue it in the reorder buffer under the record's xid.
    1102              :  */
    1103              : static void
    1104           62 : DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1105              : {
    1106           62 :     XLogReaderState *r = buf->record;
    1107              :     xl_heap_truncate *xlrec;
    1108              :     ReorderBufferChange *change;
    1109              : 
    1110           62 :     xlrec = (xl_heap_truncate *) XLogRecGetData(r);
    1111              : 
    1112              :     /* only interested in our database (here the database OID is read from the record itself, not from a block tag) */
    1113           62 :     if (xlrec->dbId != ctx->slot->data.database)
    1114            0 :         return;
    1115              : 
    1116              :     /* output plugin doesn't look for this origin, no need to queue */
    1117           62 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1118            1 :         return;
    1119              : 
    1120           61 :     change = ReorderBufferAllocChange(ctx->reorder);
    1121           61 :     change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
    1122           61 :     change->origin_id = XLogRecGetOrigin(r);
    1123           61 :     if (xlrec->flags & XLH_TRUNCATE_CASCADE) /* NOTE(review): cascade/restart_seqs are only ever set to true here; presumably ReorderBufferAllocChange() hands back a zeroed change - confirm */
    1124            1 :         change->data.truncate.cascade = true;
    1125           61 :     if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
    1126            2 :         change->data.truncate.restart_seqs = true;
    1127           61 :     change->data.truncate.nrelids = xlrec->nrelids;
    1128          122 :     change->data.truncate.relids = ReorderBufferAllocRelids(ctx->reorder,
    1129           61 :                                                             xlrec->nrelids);
    1130           61 :     memcpy(change->data.truncate.relids, xlrec->relids, /* copy the OIDs out of the WAL record into the array just allocated */
    1131           61 :            xlrec->nrelids * sizeof(Oid));
    1132           61 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1133              :                              buf->origptr, change, false);
    1134              : }
    1135              : 
    1136              : /*
    1137              :  * Decode XLOG_HEAP2_MULTI_INSERT record into multiple tuplebufs.
    1138              :  * One INSERT change is allocated and queued per tuple in the record (xlrec->ntuples of them), all under the record's xid and origptr.
    1139              :  * Currently MULTI_INSERT will always contain the full tuples.
    1140              :  */
    1141              : static void
    1142         6781 : DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1143              : {
    1144         6781 :     XLogReaderState *r = buf->record;
    1145              :     xl_heap_multi_insert *xlrec;
    1146              :     int         i;
    1147              :     char       *data;
    1148              :     char       *tupledata;
    1149              :     Size        tuplelen;
    1150              :     RelFileLocator rlocator;
    1151              : 
    1152         6781 :     xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
    1153              : 
    1154              :     /*
    1155              :      * Ignore insert records without new tuples.  This happens when a
    1156              :      * multi_insert is done on a catalog or on a non-persistent relation.
    1157              :      */
    1158         6781 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
    1159         6766 :         return;
    1160              : 
    1161              :     /* only interested in our database */
    1162           53 :     XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
    1163           53 :     if (rlocator.dbOid != ctx->slot->data.database)
    1164           38 :         return;
    1165              : 
    1166              :     /* output plugin doesn't look for this origin, no need to queue */
    1167           15 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1168            0 :         return;
    1169              : 
    1170              :     /*
    1171              :      * We know that this multi_insert isn't for a catalog, so the block should
    1172              :      * always have data even if a full-page write of it is taken.
    1173              :      */
    1174           15 :     tupledata = XLogRecGetBlockData(r, 0, &tuplelen); /* block 0 data holds all per-tuple entries back to back; tuplelen receives its total length */
    1175              :     Assert(tupledata != NULL);
    1176              : 
    1177           15 :     data = tupledata;
    1178         1064 :     for (i = 0; i < xlrec->ntuples; i++)
    1179              :     {
    1180              :         ReorderBufferChange *change;
    1181              :         xl_multi_insert_tuple *xlhdr;
    1182              :         int         datalen;
    1183              :         HeapTuple   tuple;
    1184              :         HeapTupleHeader header;
    1185              : 
    1186         1049 :         change = ReorderBufferAllocChange(ctx->reorder);
    1187         1049 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
    1188         1049 :         change->origin_id = XLogRecGetOrigin(r);
    1189              : 
    1190         1049 :         memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));
    1191              : 
    1192         1049 :         xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data); /* each per-tuple header is read from the SHORTALIGN'ed cursor position */
    1193         1049 :         data = ((char *) xlhdr) + SizeOfMultiInsertTuple; /* cursor now points at this tuple's payload */
    1194         1049 :         datalen = xlhdr->datalen;
    1195              : 
    1196         1049 :         change->data.tp.newtuple =
    1197         1049 :             ReorderBufferAllocTupleBuf(ctx->reorder, datalen);
    1198              : 
    1199         1049 :         tuple = change->data.tp.newtuple;
    1200         1049 :         header = tuple->t_data;
    1201              : 
    1202              :         /* not a disk based tuple */
    1203         1049 :         ItemPointerSetInvalid(&tuple->t_self);
    1204              : 
    1205              :         /*
    1206              :          * We can only figure this out after reassembling the transactions.
    1207              :          */
    1208         1049 :         tuple->t_tableOid = InvalidOid;
    1209              : 
    1210         1049 :         tuple->t_len = datalen + SizeofHeapTupleHeader; /* payload plus the header rebuilt below */
    1211              : 
    1212         1049 :         memset(header, 0, SizeofHeapTupleHeader); /* header starts zeroed; only the three fields assigned below are restored from the WAL entry */
    1213              : 
    1214         1049 :         memcpy((char *) tuple->t_data + SizeofHeapTupleHeader, data, datalen);
    1215         1049 :         header->t_infomask = xlhdr->t_infomask;
    1216         1049 :         header->t_infomask2 = xlhdr->t_infomask2;
    1217         1049 :         header->t_hoff = xlhdr->t_hoff;
    1218              : 
    1219              :         /*
    1220              :          * Reset toast reassembly state only after the last row in the last
    1221              :          * xl_multi_insert_tuple record emitted by one heap_multi_insert()
    1222              :          * call.
    1223              :          */
    1224         1049 :         if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
    1225          189 :             (i + 1) == xlrec->ntuples)
    1226           10 :             change->data.tp.clear_toast_afterwards = true;
    1227              :         else
    1228         1039 :             change->data.tp.clear_toast_afterwards = false;
    1229              : 
    1230         1049 :         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1231              :                                  buf->origptr, change, false);
    1232              : 
    1233              :         /* move to the next xl_multi_insert_tuple entry */
    1234         1049 :         data += datalen;
    1235              :     }
    1236              :     Assert(data == tupledata + tuplelen); /* the loop must have consumed exactly the block data */
    1237              : }
    1238              : 
    1239              : /*
    1240              :  * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change.
    1241              :  *
    1242              :  * This is pretty trivial, as all the state is essentially already set up by
    1243              :  * the speculative insertion; the change queued here carries no tuple data, only the relation locator and origin.
    1244              :  */
    1245              : static void
    1246        17965 : DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1247              : {
    1248        17965 :     XLogReaderState *r = buf->record;
    1249              :     ReorderBufferChange *change;
    1250              :     RelFileLocator target_locator;
    1251              : 
    1252              :     /* only interested in our database */
    1253        17965 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1254        17965 :     if (target_locator.dbOid != ctx->slot->data.database)
    1255            0 :         return;
    1256              : 
    1257              :     /* output plugin doesn't look for this origin, no need to queue */
    1258        17965 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1259            0 :         return;
    1260              : 
    1261        17965 :     change = ReorderBufferAllocChange(ctx->reorder);
    1262        17965 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
    1263        17965 :     change->origin_id = XLogRecGetOrigin(r);
    1264              : 
    1265        17965 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1266              : 
    1267        17965 :     change->data.tp.clear_toast_afterwards = true;
    1268              : 
    1269        17965 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1270              :                              change, false);
    1271              : }
    1272              : 
    1273              : 
    1274              : /*
    1275              :  * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
    1276              :  * (but not by heap_multi_insert) into a tuplebuf.
    1277              :  * 'data' points at SizeOfHeapHeader bytes of xl_heap_header followed by the tuple payload, and 'len' covers both; 'tuple' is filled in place (nothing is allocated here), so it must already have room for len - SizeOfHeapHeader payload bytes after its header.
    1278              :  * The size 'len' and the pointer 'data' in the record need to be
    1279              :  * computed outside as they are record specific.
    1280              :  */
    1281              : static void
    1282      1475232 : DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
    1283              : {
    1284              :     xl_heap_header xlhdr;
    1285      1475232 :     int         datalen = len - SizeOfHeapHeader; /* payload length once the leading xl_heap_header is excluded */
    1286              :     HeapTupleHeader header;
    1287              : 
    1288              :     Assert(datalen >= 0);
    1289              : 
    1290      1475232 :     tuple->t_len = datalen + SizeofHeapTupleHeader;
    1291      1475232 :     header = tuple->t_data;
    1292              : 
    1293              :     /* not a disk based tuple */
    1294      1475232 :     ItemPointerSetInvalid(&tuple->t_self);
    1295              : 
    1296              :     /* we can only figure this out after reassembling the transactions */
    1297      1475232 :     tuple->t_tableOid = InvalidOid;
    1298              : 
    1299              :     /* data is not stored aligned, copy to aligned storage */
    1300      1475232 :     memcpy(&xlhdr, data, SizeOfHeapHeader);
    1301              : 
    1302      1475232 :     memset(header, 0, SizeofHeapTupleHeader); /* header starts zeroed; only infomask, infomask2 and hoff are restored below */
    1303              : 
    1304      1475232 :     memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
    1305      1475232 :            data + SizeOfHeapHeader,
    1306              :            datalen);
    1307              : 
    1308      1475232 :     header->t_infomask = xlhdr.t_infomask;
    1309      1475232 :     header->t_infomask2 = xlhdr.t_infomask2;
    1310      1475232 :     header->t_hoff = xlhdr.t_hoff;
    1311      1475232 : }
    1312              : 
    1313              : /*
    1314              :  * Check whether we are interested in this specific transaction.
    1315              :  * Returns true if the transaction should be skipped, false if it has to be decoded.
    1316              :  * There can be several reasons we might not be interested in this
    1317              :  * transaction:
    1318              :  * 1) We might not be interested in decoding transactions up to this
    1319              :  *    LSN. This can happen because we previously decoded it and now just
    1320              :  *    are restarting or if we haven't assembled a consistent snapshot yet.
    1321              :  * 2) The transaction happened in another database.
    1322              :  * 3) The output plugin is not interested in the origin.
    1323              :  * 4) We are doing fast-forwarding (in which case ctx->processing_required is set as a side effect).
    1324              :  */
    1325              : static bool
    1326         4406 : DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
    1327              :                   Oid txn_dbid, ReplOriginId origin_id)
    1328              : {
    1329         4406 :     if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
    1330         4337 :         (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) || /* an InvalidOid txn_dbid bypasses the database check */
    1331         2195 :         FilterByOrigin(ctx, origin_id))
    1332         2249 :         return true;
    1333              : 
    1334              :     /*
    1335              :      * We also skip decoding in fast_forward mode. In passing set the
    1336              :      * processing_required flag to indicate that if it were not for
    1337              :      * fast_forward mode, processing would have been required.
    1338              :      */
    1339         2157 :     if (ctx->fast_forward)
    1340              :     {
    1341           39 :         ctx->processing_required = true;
    1342           39 :         return true;
    1343              :     }
    1344              : 
    1345         2118 :     return false;
    1346              : }
        

Generated by: LCOV version 2.0-1