LCOV - code coverage report
Current view: top level - src/backend/replication/logical - decode.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19beta1 Lines: 93.0 % 456 424
Test Date: 2026-06-06 15:16:18 Functions: 95.2 % 21 20
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /* -------------------------------------------------------------------------
       2              :  *
       3              :  * decode.c
       4              :  *      This module decodes WAL records read using xlogreader.h's APIs for the
       5              :  *      purpose of logical decoding by passing information to the
       6              :  *      reorderbuffer module (containing the actual changes) and to the
       7              :  *      snapbuild module to build a fitting catalog snapshot (to be able to
       8              :  *      properly decode the changes in the reorderbuffer).
       9              :  *
      10              :  * NOTE:
      11              :  *      This basically tries to handle all low level xlog stuff for
      12              :  *      reorderbuffer.c and snapbuild.c. There's some minor leakage where a
      13              :  *      specific record's struct is used to pass data along, but those just
      14              :  *      happen to contain the right amount of data in a convenient
      15              :  *      format. There isn't and shouldn't be much intelligence about the
      16              :  *      contents of records in here except turning them into a more usable
      17              :  *      format.
      18              :  *
      19              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      20              :  * Portions Copyright (c) 1994, Regents of the University of California
      21              :  *
      22              :  * IDENTIFICATION
      23              :  *    src/backend/replication/logical/decode.c
      24              :  *
      25              :  * -------------------------------------------------------------------------
      26              :  */
      27              : #include "postgres.h"
      28              : 
      29              : #include "access/heapam_xlog.h"
      30              : #include "access/transam.h"
      31              : #include "access/xact.h"
      32              : #include "access/xlog_internal.h"
      33              : #include "access/xlogreader.h"
      34              : #include "access/xlogrecord.h"
      35              : #include "catalog/pg_control.h"
      36              : #include "commands/repack.h"
      37              : #include "replication/decode.h"
      38              : #include "replication/logical.h"
      39              : #include "replication/message.h"
      40              : #include "replication/reorderbuffer.h"
      41              : #include "replication/snapbuild.h"
      42              : #include "storage/standbydefs.h"
      43              : 
      44              : /* handlers for individual record types (or groups of record types) */
      45              : static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      46              : static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      47              : static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      48              : static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      49              : static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      50              : static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      51              : 
      52              : static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      53              :                          xl_xact_parsed_commit *parsed, TransactionId xid,
      54              :                          bool two_phase);
      55              : static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      56              :                         xl_xact_parsed_abort *parsed, TransactionId xid,
      57              :                         bool two_phase);
      58              : static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      59              :                           xl_xact_parsed_prepare *parsed);
      60              : 
      61              : 
      62              : /* common function to decode tuples */
      63              : static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);
      64              : 
      65              : /* helper functions for decoding transactions */
      66              : static inline bool FilterPrepare(LogicalDecodingContext *ctx,
      67              :                                  TransactionId xid, const char *gid);
      68              : static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
      69              :                               XLogRecordBuffer *buf, Oid txn_dbid,
      70              :                               ReplOriginId origin_id);
      71              : 
      72              : /*
      73              :  * Take every XLogReadRecord()ed record and perform the actions required to
      74              :  * decode it using the output plugin already setup in the logical decoding
      75              :  * context.
      76              :  *
      77              :  * NB: Note that every record's xid needs to be processed by reorderbuffer
      78              :  * (xids contained in the content of records are not relevant for this rule).
      79              :  * That means that for records which'd otherwise not go through the
      80              :  * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
      81              :  * call ReorderBufferProcessXid for each record type by default, because
      82              :  * e.g. empty xacts can be handled more efficiently if there's no previous
      83              :  * state for them.
      84              :  *
      85              :  * We also support the ability to fast forward thru records, skipping some
      86              :  * record types completely - see individual record types for details.
                      :  *
                      :  * 'ctx' supplies the reorder buffer and the reader; the reader's
                      :  * ReadRecPtr/EndRecPtr become the origptr/endptr of the XLogRecordBuffer
                      :  * that is handed to the handlers.  'record' is the record to decode.
                      :  *
                      :  * Decoding is dispatched through the rm_decode callback of the record's
                      :  * resource manager.  If the resource manager provides no such callback,
                      :  * only the record's xid is passed to ReorderBufferProcessXid().
      87              :  */
      88              : void
      89      2593155 : LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
      90              : {
      91              :     XLogRecordBuffer buf;
      92              :     TransactionId txid;
      93              :     RmgrData    rmgr;
      94              : 
      95      2593155 :     buf.origptr = ctx->reader->ReadRecPtr;
      96      2593155 :     buf.endptr = ctx->reader->EndRecPtr;
      97      2593155 :     buf.record = record;
      98              : 
      99      2593155 :     txid = XLogRecGetTopXid(record);
     100              : 
     101              :     /*
     102              :      * If the top-level xid is valid, we need to assign the subxact to the
     103              :      * top-level xact. We need to do this for all records, hence we do it
     104              :      * before the switch.
     105              :      */
     106      2593155 :     if (TransactionIdIsValid(txid))
     107              :     {
     108          687 :         ReorderBufferAssignChild(ctx->reorder,
     109              :                                  txid,
     110          687 :                                  XLogRecGetXid(record),
     111              :                                  buf.origptr);
     112              :     }
     113              : 
     114      2593155 :     rmgr = GetRmgr(XLogRecGetRmid(record));
     115              : 
     116      2593155 :     if (rmgr.rm_decode != NULL)
     117      1992592 :         rmgr.rm_decode(ctx, &buf);
     118              :     else
     119              :     {
     120              :         /* just deal with xid, and done */
     121       600563 :         ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
     122              :                                 buf.origptr);
     123              :     }
     124      2593140 : }
     125              : 
     126              : /*
     127              :  * Handle rmgr XLOG_ID records for LogicalDecodingProcessRecord().
                      :  *
                      :  * The record's xid is always passed to ReorderBufferProcessXid().  Beyond
                      :  * that, only shutdown checkpoints and end-of-recovery records trigger any
                      :  * action (SnapBuildSerializationPoint() is called for them).  A logical
                      :  * decoding status change and any unknown record type raise an error; all
                      :  * other record types listed below are deliberately ignored.
     128              :  */
     129              : void
     130         6430 : xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     131              : {
     132         6430 :     SnapBuild  *builder = ctx->snapshot_builder;
     133         6430 :     uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
     134              : 
     135         6430 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
     136              :                             buf->origptr);
     137              : 
     138         6430 :     switch (info)
     139              :     {
     140              :             /* this is also used in END_OF_RECOVERY checkpoints */
     141           82 :         case XLOG_CHECKPOINT_SHUTDOWN:
     142              :         case XLOG_END_OF_RECOVERY:
     143           82 :             SnapBuildSerializationPoint(builder, buf->origptr);
     144              : 
     145           82 :             break;
     146           81 :         case XLOG_CHECKPOINT_ONLINE:
     147              : 
     148              :             /*
     149              :              * a RUNNING_XACTS record will have been logged near to this, we
     150              :              * can restart from there.
     151              :              */
     152           81 :             break;
     153            0 :         case XLOG_LOGICAL_DECODING_STATUS_CHANGE:
     154              :             {
     155              :                 bool        logical_decoding;
     156              : 
     157            0 :                 memcpy(&logical_decoding, XLogRecGetData(buf->record), sizeof(bool));
     158              : 
     159              :                 /*
     160              :                  * Error out as we should not decode this WAL record.
     161              :                  *
     162              :                  * Logical decoding is disabled, and existing logical slots on
     163              :                  * the standby are invalidated when this WAL record is
     164              :                  * replayed. No logical decoder can process this WAL record
     165              :                  * until replay completes, and by then the slots are already
     166              :                  * invalidated. Furthermore, no new logical slots can be
     167              :                  * created while logical decoding is disabled. This cannot
     168              :                  * occur on the primary either, since it will not restart
     169              :                  * with wal_level < replica if any logical slots exist.
     170              :                  */
     171            0 :                 elog(ERROR, "unexpected logical decoding status change %d",
     172              :                      logical_decoding);
     173              : 
     174              :                 break;
     175              :             }
     176         6267 :         case XLOG_NOOP:
     177              :         case XLOG_NEXTOID:
     178              :         case XLOG_SWITCH:
     179              :         case XLOG_BACKUP_END:
     180              :         case XLOG_PARAMETER_CHANGE:
     181              :         case XLOG_RESTORE_POINT:
     182              :         case XLOG_FPW_CHANGE:
     183              :         case XLOG_FPI_FOR_HINT:
     184              :         case XLOG_FPI:
     185              :         case XLOG_OVERWRITE_CONTRECORD:
     186              :         case XLOG_CHECKPOINT_REDO:
     187         6267 :             break;
     188            0 :         default:
     189            0 :             elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
     190              :     }
     191         6430 : }
     192              : /*
                      :  * Handle rmgr XLOG2_ID records for LogicalDecodingProcessRecord().
                      :  *
                      :  * The record's xid is passed to ReorderBufferProcessXid(); nothing else is
                      :  * done.  XLOG2_CHECKSUMS records are ignored, and any other record type
                      :  * raises an error.
                      :  */
     193              : void
     194            0 : xlog2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     195              : {
     196            0 :     uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
     197              : 
     198            0 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record), buf->origptr);
     199              : 
     200            0 :     switch (info)
     201              :     {
     202            0 :         case XLOG2_CHECKSUMS:
     203            0 :             break;
     204            0 :         default:
     205            0 :             elog(ERROR, "unexpected RM_XLOG2_ID record type: %u", info);
     206              :     }
     207            0 : }
     208              : 
     209              : /*
     210              :  * Handle rmgr XACT_ID records for LogicalDecodingProcessRecord().
                      :  *
                      :  * Nothing at all is done here (not even xid processing) until the snapshot
                      :  * builder has reached at least SNAPBUILD_FULL_SNAPSHOT.
                      :  *
                      :  * For commit and abort records the xid handed to DecodeCommit() /
                      :  * DecodeAbort() is the parsed record's twophase_xid when that is valid, and
                      :  * the record's own xid otherwise.  Unknown record types raise an error.
     211              :  */
     212              : void
     213        10181 : xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     214              : {
     215        10181 :     SnapBuild  *builder = ctx->snapshot_builder;
     216        10181 :     ReorderBuffer *reorder = ctx->reorder;
     217        10181 :     XLogReaderState *r = buf->record;
     218        10181 :     uint8       info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
     219              : 
     220              :     /*
     221              :      * If the snapshot isn't yet fully built, we cannot decode anything, so
     222              :      * bail out.
     223              :      */
     224        10181 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     225           14 :         return;
     226              : 
     227        10167 :     switch (info)
     228              :     {
     229         3737 :         case XLOG_XACT_COMMIT:
     230              :         case XLOG_XACT_COMMIT_PREPARED:
     231              :             {
     232              :                 xl_xact_commit *xlrec;
     233              :                 xl_xact_parsed_commit parsed;
     234              :                 TransactionId xid;
     235         3737 :                 bool        two_phase = false;
     236              : 
     237         3737 :                 xlrec = (xl_xact_commit *) XLogRecGetData(r);
     238         3737 :                 ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     239              : 
     240         3737 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     241         3610 :                     xid = XLogRecGetXid(r);
     242              :                 else
     243          127 :                     xid = parsed.twophase_xid;
     244              : 
     245              :                 /*
     246              :                  * We would like to process the transaction in a two-phase
     247              :                  * manner iff output plugin supports two-phase commits and
     248              :                  * doesn't filter the transaction at prepare time.
     249              :                  */
     250         3737 :                 if (info == XLOG_XACT_COMMIT_PREPARED)
     251          127 :                     two_phase = !(FilterPrepare(ctx, xid,
     252          127 :                                                 parsed.twophase_gid));
     253              : 
     254         3737 :                 DecodeCommit(ctx, buf, &parsed, xid, two_phase);
     255         3724 :                 break;
     256              :             }
     257          218 :         case XLOG_XACT_ABORT:
     258              :         case XLOG_XACT_ABORT_PREPARED:
     259              :             {
     260              :                 xl_xact_abort *xlrec;
     261              :                 xl_xact_parsed_abort parsed;
     262              :                 TransactionId xid;
     263          218 :                 bool        two_phase = false;
     264              : 
     265          218 :                 xlrec = (xl_xact_abort *) XLogRecGetData(r);
     266          218 :                 ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     267              : 
     268          218 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     269          168 :                     xid = XLogRecGetXid(r);
     270              :                 else
     271           50 :                     xid = parsed.twophase_xid;
     272              : 
     273              :                 /*
     274              :                  * We would like to process the transaction in a two-phase
     275              :                  * manner iff output plugin supports two-phase commits and
     276              :                  * doesn't filter the transaction at prepare time.
     277              :                  */
     278          218 :                 if (info == XLOG_XACT_ABORT_PREPARED)
     279           50 :                     two_phase = !(FilterPrepare(ctx, xid,
     280           50 :                                                 parsed.twophase_gid));
     281              : 
     282          218 :                 DecodeAbort(ctx, buf, &parsed, xid, two_phase);
     283          218 :                 break;
     284              :             }
     285          133 :         case XLOG_XACT_ASSIGNMENT:
     286              : 
     287              :             /*
     288              :              * We assign subxact to the toplevel xact while processing each
     289              :              * record if required.  So, we don't need to do anything here. See
     290              :              * LogicalDecodingProcessRecord.
     291              :              */
     292          133 :             break;
     293         5879 :         case XLOG_XACT_INVALIDATIONS:
     294              :             {
     295              :                 TransactionId xid;
     296              :                 xl_xact_invals *invals;
     297              : 
     298         5879 :                 xid = XLogRecGetXid(r);
     299         5879 :                 invals = (xl_xact_invals *) XLogRecGetData(r);
     300              : 
     301              :                 /*
     302              :                  * Execute the invalidations for xid-less transactions,
     303              :                  * otherwise, accumulate them so that they can be processed at
     304              :                  * the commit time.
                      :                  *
                      :                  * In fast-forward mode the invalidation messages themselves
                      :                  * are not used, but a transaction with a valid xid is still
                      :                  * marked as having catalog changes.
     305              :                  */
     306         5879 :                 if (TransactionIdIsValid(xid))
     307              :                 {
     308         5856 :                     if (!ctx->fast_forward)
     309         5781 :                         ReorderBufferAddInvalidations(reorder, xid,
     310              :                                                       buf->origptr,
     311         5781 :                                                       invals->nmsgs,
     312         5781 :                                                       invals->msgs);
     313         5856 :                     ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
     314              :                                                       buf->origptr);
     315              :                 }
     316           23 :                 else if (!ctx->fast_forward)
     317           23 :                     ReorderBufferImmediateInvalidation(ctx->reorder,
     318           23 :                                                        invals->nmsgs,
     319           23 :                                                        invals->msgs);
     320              : 
     321         5879 :                 break;
     322              :             }
     323          200 :         case XLOG_XACT_PREPARE:
     324              :             {
     325              :                 xl_xact_parsed_prepare parsed;
     326              :                 xl_xact_prepare *xlrec;
     327              : 
     328              :                 /* ok, parse it */
     329          200 :                 xlrec = (xl_xact_prepare *) XLogRecGetData(r);
     330          200 :                 ParsePrepareRecord(XLogRecGetInfo(buf->record),
     331              :                                    xlrec, &parsed);
     332              : 
     333              :                 /*
     334              :                  * We would like to process the transaction in a two-phase
     335              :                  * manner iff output plugin supports two-phase commits and
     336              :                  * doesn't filter the transaction at prepare time.
                      :                  *
                      :                  * If FilterPrepare() says to filter it, only the xid is
                      :                  * processed here and DecodePrepare() is skipped.
     337              :                  */
     338          200 :                 if (FilterPrepare(ctx, parsed.twophase_xid,
     339              :                                   parsed.twophase_gid))
     340              :                 {
     341           20 :                     ReorderBufferProcessXid(reorder, parsed.twophase_xid,
     342              :                                             buf->origptr);
     343           20 :                     break;
     344              :                 }
     345              : 
     346              :                 /*
     347              :                  * Note that if the prepared transaction has locked [user]
     348              :                  * catalog tables exclusively then decoding prepare can block
     349              :                  * till the main transaction is committed because it needs to
     350              :                  * lock the catalog tables.
     351              :                  *
     352              :                  * XXX Now, this can even lead to a deadlock if the prepare
     353              :                  * transaction is waiting to get it logically replicated for
     354              :                  * distributed 2PC. This can be avoided by disallowing
     355              :                  * preparing transactions that have locked [user] catalog
     356              :                  * tables exclusively but as of now, we ask users not to do
     357              :                  * such an operation.
     358              :                  */
     359          180 :                 DecodePrepare(ctx, buf, &parsed);
     360          180 :                 break;
     361              :             }
     362            0 :         default:
     363            0 :             elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
     364              :     }
     365              : }
     366              : 
     367              : /*
     368              :  * Handle rmgr STANDBY_ID records for LogicalDecodingProcessRecord().
                      :  *
                      :  * The record's xid is always passed to ReorderBufferProcessXid().  Only
                      :  * XLOG_RUNNING_XACTS records cause further work: they are fed to the
                      :  * snapshot builder and then used to abort old transactions in the reorder
                      :  * buffer.  Lock and invalidation records are ignored, and unknown record
                      :  * types raise an error.
     369              :  */
     370              : void
     371         4324 : standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     372              : {
     373         4324 :     SnapBuild  *builder = ctx->snapshot_builder;
     374         4324 :     XLogReaderState *r = buf->record;
     375         4324 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     376              : 
     377         4324 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     378              : 
     379         4324 :     switch (info)
     380              :     {
     381         1673 :         case XLOG_RUNNING_XACTS:
     382              :             {
     383         1673 :                 xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
     384              : 
     385              :                 /*
     386              :                  * Update this decoder's idea of transactions currently
     387              :                  * running.  In doing so we will determine whether we have
     388              :                  * reached consistent status.
     389              :                  */
     390         1673 :                 SnapBuildProcessRunningXacts(builder, buf->origptr, running);
     391              : 
     392              :                 /*
     393              :                  * Abort all transactions that we keep track of, that are
     394              :                  * older than the record's oldestRunningXid. This is the most
     395              :                  * convenient spot for doing so since, in contrast to shutdown
     396              :                  * or end-of-recovery checkpoints, we have information about
     397              :                  * all running transactions which includes prepared ones,
     398              :                  * while shutdown checkpoints just know that no non-prepared
     399              :                  * transactions are in progress.
     400              :                  */
     401         1671 :                 ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
     402              :             }
     403         1671 :             break;
     404         2628 :         case XLOG_STANDBY_LOCK:
     405         2628 :             break;
     406           23 :         case XLOG_INVALIDATIONS:
     407              : 
     408              :             /*
     409              :              * We are processing the invalidations at the command level via
     410              :              * XLOG_XACT_INVALIDATIONS.  So we don't need to do anything here.
     411              :              */
     412           23 :             break;
     413            0 :         default:
     414            0 :             elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
     415              :     }
     416         4322 : }
     417              : 
     418              : /*
     419              :  * Handle rmgr HEAP2_ID records for LogicalDecodingProcessRecord().
                      :  *
                      :  * The record's xid is always passed to ReorderBufferProcessXid().  Once the
                      :  * snapshot builder has reached SNAPBUILD_FULL_SNAPSHOT, multi-insert records
                      :  * are decoded and new-cid records are passed to the snapshot builder (both
                      :  * only when not fast-forwarding); the remaining known record types are
                      :  * ignored and unknown ones raise an error.
     420              :  */
     421              : void
     422        37277 : heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     423              : {
     424        37277 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     425        37277 :     TransactionId xid = XLogRecGetXid(buf->record);
     426        37277 :     SnapBuild  *builder = ctx->snapshot_builder;
     427              : 
     428        37277 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     429              : 
     430              :     /*
     431              :      * If we don't have a snapshot yet, there is no point in decoding data
     432              :      * changes, so bail out.  The same holds when we are just fast-forwarding,
     433              :      * but that is checked per record type below rather than here: it's
     434              :      * crucial to build the base snapshot during fast-forward mode (as is done
     435              :      * in SnapBuildProcessChange()) because we require the snapshot's xmin
     436              :      * when determining the candidate catalog_xmin for the replication slot.
                      :      * See SnapBuildProcessRunningXacts().
     437              :      */
     438        37277 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     439            9 :         return;
     440              : 
     441        37268 :     switch (info)
     442              :     {
     443         6985 :         case XLOG_HEAP2_MULTI_INSERT:
     444         6985 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     445         6985 :                 !ctx->fast_forward &&
     446         6882 :                 !change_useless_for_repack(buf))
     447         6808 :                 DecodeMultiInsert(ctx, buf);
     448         6985 :             break;
     449        27828 :         case XLOG_HEAP2_NEW_CID:
     450        27828 :             if (!ctx->fast_forward)
     451              :             {
     452              :                 xl_heap_new_cid *xlrec;
     453              : 
     454        27445 :                 xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
     455        27445 :                 SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
     456              :             }
     457        27828 :             break;
     458           90 :         case XLOG_HEAP2_REWRITE:
     459              : 
     460              :             /*
     461              :              * Although these records only exist to serve the needs of logical
     462              :              * decoding, all the work happens as part of crash or archive
     463              :              * recovery, so we don't need to do anything here.
     464              :              */
     465           90 :             break;
     466              : 
     467              :             /*
     468              :              * Everything else here is just low level physical stuff we're not
     469              :              * interested in.
     470              :              */
     471         2365 :         case XLOG_HEAP2_PRUNE_ON_ACCESS:
     472              :         case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
     473              :         case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
     474              :         case XLOG_HEAP2_LOCK_UPDATED:
     475         2365 :             break;
     476            0 :         default:
     477            0 :             elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
     478              :     }
     479              : }
     480              : 
     481              : /*
     482              :  * Handle rmgr HEAP_ID records for LogicalDecodingProcessRecord().
     483              :  */
     484              : void
     485      1934282 : heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     486              : {
     487      1934282 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     488      1934282 :     TransactionId xid = XLogRecGetXid(buf->record);
     489      1934282 :     SnapBuild  *builder = ctx->snapshot_builder;
     490              : 
     491      1934282 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     492              : 
     493              :     /*
     494              :      * If we don't have snapshot or we are just fast-forwarding, there is no
     495              :      * point in decoding data changes. However, it's crucial to build the base
     496              :      * snapshot during fast-forward mode (as is done in
     497              :      * SnapBuildProcessChange()) because we require the snapshot's xmin when
     498              :      * determining the candidate catalog_xmin for the replication slot. See
     499              :      * SnapBuildProcessRunningXacts().
     500              :      */
     501      1934282 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     502            4 :         return;
     503              : 
     504      1934278 :     switch (info)
     505              :     {
     506      1248758 :         case XLOG_HEAP_INSERT:
     507      1248758 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     508      1248758 :                 !ctx->fast_forward &&
     509      1248560 :                 !change_useless_for_repack(buf))
     510      1248341 :                 DecodeInsert(ctx, buf);
     511      1248758 :             break;
     512              : 
     513              :             /*
     514              :              * Treat HOT update as normal updates. There is no useful
     515              :              * information in the fact that we could make it a HOT update
     516              :              * locally and the WAL layout is compatible.
     517              :              */
     518       207657 :         case XLOG_HEAP_HOT_UPDATE:
     519              :         case XLOG_HEAP_UPDATE:
     520       207657 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     521       207657 :                 !ctx->fast_forward &&
     522       207644 :                 !change_useless_for_repack(buf))
     523       207609 :                 DecodeUpdate(ctx, buf);
     524       207657 :             break;
     525              : 
     526       268064 :         case XLOG_HEAP_DELETE:
     527       268064 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     528       268064 :                 !ctx->fast_forward &&
     529       267970 :                 !change_useless_for_repack(buf))
     530       267945 :                 DecodeDelete(ctx, buf);
     531       268064 :             break;
     532              : 
     533           60 :         case XLOG_HEAP_TRUNCATE:
     534           60 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     535           60 :                 !ctx->fast_forward &&
     536           58 :                 !change_useless_for_repack(buf))
     537           58 :                 DecodeTruncate(ctx, buf);
     538           60 :             break;
     539              : 
     540         1217 :         case XLOG_HEAP_INPLACE:
     541              : 
     542              :             /*
     543              :              * Inplace updates are only ever performed on catalog tuples and
     544              :              * can, per definition, not change tuple visibility.  Since we
     545              :              * also don't decode catalog tuples, we're not interested in the
     546              :              * record's contents.
     547              :              */
     548         1217 :             break;
     549              : 
     550        17964 :         case XLOG_HEAP_CONFIRM:
     551        17964 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     552        17964 :                 !ctx->fast_forward &&
     553        17964 :                 !change_useless_for_repack(buf))
     554        17964 :                 DecodeSpecConfirm(ctx, buf);
     555        17964 :             break;
     556              : 
     557       190558 :         case XLOG_HEAP_LOCK:
     558              :             /* we don't care about row level locks for now */
     559       190558 :             break;
     560              : 
     561            0 :         default:
     562            0 :             elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
     563              :             break;
     564              :     }
     565              : }
     566              : 
     567              : /*
     568              :  * Ask output plugin whether we want to skip this PREPARE and send
     569              :  * this transaction as a regular commit later.
     570              :  */
     571              : static inline bool
     572          377 : FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
     573              :               const char *gid)
     574              : {
     575              :     /*
     576              :      * Skip if decoding of two-phase transactions at PREPARE time is not
     577              :      * enabled. In that case, all two-phase transactions are considered
     578              :      * filtered out and will be applied as regular transactions at COMMIT
     579              :      * PREPARED.
     580              :      */
     581          377 :     if (!ctx->twophase)
     582           23 :         return true;
     583              : 
     584              :     /*
     585              :      * The filter_prepare callback is optional. When not supplied, all
     586              :      * prepared transactions should go through.
     587              :      */
     588          354 :     if (ctx->callbacks.filter_prepare_cb == NULL)
     589          206 :         return false;
     590              : 
     591          148 :     return filter_prepare_cb_wrapper(ctx, xid, gid);
     592              : }
     593              : 
     594              : static inline bool
     595      1739285 : FilterByOrigin(LogicalDecodingContext *ctx, ReplOriginId origin_id)
     596              : {
     597      1739285 :     if (ctx->callbacks.filter_by_origin_cb == NULL)
     598          132 :         return false;
     599              : 
     600      1739153 :     return filter_by_origin_cb_wrapper(ctx, origin_id);
     601              : }
     602              : 
     603              : /*
     604              :  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
     605              :  */
     606              : void
     607           98 : logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     608              : {
     609           98 :     SnapBuild  *builder = ctx->snapshot_builder;
     610           98 :     XLogReaderState *r = buf->record;
     611           98 :     TransactionId xid = XLogRecGetXid(r);
     612           98 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     613           98 :     ReplOriginId origin_id = XLogRecGetOrigin(r);
     614           98 :     Snapshot    snapshot = NULL;
     615              :     xl_logical_message *message;
     616              : 
     617           98 :     if (info != XLOG_LOGICAL_MESSAGE)
     618            0 :         elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
     619              : 
     620           98 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     621              : 
     622              :     /* If we don't have a snapshot, there is no point in decoding messages */
     623           98 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     624            0 :         return;
     625              : 
     626           98 :     message = (xl_logical_message *) XLogRecGetData(r);
     627              : 
     628          194 :     if (message->dbId != ctx->slot->data.database ||
     629           96 :         FilterByOrigin(ctx, origin_id))
     630            4 :         return;
     631              : 
     632           94 :     if (message->transactional &&
     633           39 :         !SnapBuildProcessChange(builder, xid, buf->origptr))
     634            0 :         return;
     635          149 :     else if (!message->transactional &&
     636          110 :              (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
     637           55 :               SnapBuildXactNeedsSkip(builder, buf->origptr)))
     638           44 :         return;
     639              : 
     640              :     /*
     641              :      * We also skip decoding in fast_forward mode. This check must be last
     642              :      * because we don't want to set the processing_required flag unless we
     643              :      * have a decodable message.
     644              :      */
     645           50 :     if (ctx->fast_forward)
     646              :     {
     647              :         /*
     648              :          * We need to set processing_required flag to notify the message's
     649              :          * existence to the caller. Usually, the flag is set when either the
     650              :          * COMMIT or ABORT records are decoded, but this must be turned on
     651              :          * here because the non-transactional logical message is decoded
     652              :          * without waiting for these records.
     653              :          */
     654            3 :         if (!message->transactional)
     655            3 :             ctx->processing_required = true;
     656              : 
     657            3 :         return;
     658              :     }
     659              : 
     660              :     /*
     661              :      * If this is a non-transactional change, get the snapshot we're expected
     662              :      * to use. We only get here when the snapshot is consistent, and the
     663              :      * change is not meant to be skipped.
     664              :      *
     665              :      * For transactional changes we don't need a snapshot, we'll use the
     666              :      * regular snapshot maintained by ReorderBuffer. We just leave it NULL.
     667              :      */
     668           47 :     if (!message->transactional)
     669            8 :         snapshot = SnapBuildGetOrBuildSnapshot(builder);
     670              : 
     671           47 :     ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
     672           47 :                               message->transactional,
     673           47 :                               message->message, /* first part of message is
     674              :                                                  * prefix */
     675              :                               message->message_size,
     676           47 :                               message->message + message->prefix_size);
     677              : }
     678              : 
     679              : /*
     680              :  * Consolidated commit record handling between the different forms of commit
     681              :  * records.
     682              :  *
     683              :  * 'two_phase' indicates that caller wants to process the transaction in two
     684              :  * phases, first process prepare if not already done and then process
     685              :  * commit_prepared.
     686              :  */
     687              : static void
     688         3737 : DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     689              :              xl_xact_parsed_commit *parsed, TransactionId xid,
     690              :              bool two_phase)
     691              : {
     692         3737 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     693         3737 :     TimestampTz commit_time = parsed->xact_time;
     694         3737 :     ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
     695              :     int         i;
     696              : 
     697         3737 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     698              :     {
     699          100 :         origin_lsn = parsed->origin_lsn;
     700          100 :         commit_time = parsed->origin_timestamp;
     701              :     }
     702              : 
     703         3737 :     SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
     704              :                        parsed->nsubxacts, parsed->subxacts,
     705              :                        parsed->xinfo);
     706              : 
     707              :     /* ----
     708              :      * Check whether we are interested in this specific transaction, and tell
     709              :      * the reorderbuffer to forget the content of the (sub-)transactions
     710              :      * if not.
     711              :      *
     712              :      * We can't just use ReorderBufferAbort() here, because we need to execute
     713              :      * the transaction's invalidations.  This currently won't be needed if
     714              :      * we're just skipping over the transaction because currently we only do
     715              :      * so during startup, to get to the first transaction the client needs. As
     716              :      * we have reset the catalog caches before starting to read WAL, and we
     717              :      * haven't yet touched any catalogs, there can't be anything to invalidate.
     718              :      * But if we're "forgetting" this commit because it happened in another
     719              :      * database, the invalidations might be important, because they could be
     720              :      * for shared catalogs and we might have loaded data into the relevant
     721              :      * syscaches.
     722              :      * ---
     723              :      */
     724         3737 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     725              :     {
     726         2937 :         for (i = 0; i < parsed->nsubxacts; i++)
     727              :         {
     728          983 :             ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
     729              :         }
     730         1954 :         ReorderBufferForget(ctx->reorder, xid, buf->origptr);
     731              : 
     732         1954 :         return;
     733              :     }
     734              : 
     735              :     /* tell the reorderbuffer about the surviving subtransactions */
     736         2052 :     for (i = 0; i < parsed->nsubxacts; i++)
     737              :     {
     738          269 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     739              :                                  buf->origptr, buf->endptr);
     740              :     }
     741              : 
     742              :     /*
     743              :      * Send the final commit record if the transaction data is already
     744              :      * decoded, otherwise, process the entire transaction.
     745              :      */
     746         1783 :     if (two_phase)
     747              :     {
     748           36 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     749           36 :                                     SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
     750              :                                     commit_time, origin_id, origin_lsn,
     751           36 :                                     parsed->twophase_gid, true);
     752              :     }
     753              :     else
     754              :     {
     755         1747 :         ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
     756              :                             commit_time, origin_id, origin_lsn);
     757              :     }
     758              : 
     759              :     /*
     760              :      * Update the decoding stats at transaction prepare/commit/abort.
     761              :      * Additionally we send the stats when we spill or stream the changes to
     762              :      * avoid losing them in case the decoding is interrupted. It is not clear
     763              :      * that sending more or less frequently than this would be better.
     764              :      */
     765         1770 :     UpdateDecodingStats(ctx);
     766              : }
     767              : 
     768              : /*
     769              :  * Decode PREPARE record. Similar logic as in DecodeCommit.
     770              :  *
     771              :  * Note that we don't skip prepare even if we have detected a concurrent abort
     772              :  * because it is quite possible that we had already sent some changes before we
     773              :  * detect abort in which case we need to abort those changes in the subscriber.
     774              :  * To abort such changes, we do send the prepare and then the rollback prepared
     775              :  * which is what happened on the publisher-side as well. Now, we can invent a
     776              :  * new abort API wherein in such cases we send abort and skip sending prepared
     777              :  * and rollback prepared but then it is not that straightforward because we
     778              :  * might have streamed this transaction by that time in which case it is
     779              :  * handled when the rollback is encountered. It is not impossible to optimize
     780              :  * the concurrent abort case but it can introduce design complexity w.r.t
     781              :  * handling different cases so leaving it for now as it doesn't seem worth it.
     782              :  */
     783              : static void
     784          180 : DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     785              :               xl_xact_parsed_prepare *parsed)
     786              : {
     787          180 :     SnapBuild  *builder = ctx->snapshot_builder;
     788          180 :     XLogRecPtr  origin_lsn = parsed->origin_lsn;
     789          180 :     TimestampTz prepare_time = parsed->xact_time;
     790          180 :     ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
     791              :     int         i;
     792          180 :     TransactionId xid = parsed->twophase_xid;
     793              : 
     794          180 :     if (parsed->origin_timestamp != 0)
     795            5 :         prepare_time = parsed->origin_timestamp;
     796              : 
     797              :     /*
     798              :      * Remember the prepare info for a txn so that it can be used later in
     799              :      * commit prepared if required. See ReorderBufferFinishPrepared.
     800              :      */
     801          180 :     if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
     802              :                                           buf->endptr, prepare_time, origin_id,
     803              :                                           origin_lsn))
     804            0 :         return;
     805              : 
     806              :     /* We can't start streaming unless a consistent state is reached. */
     807          180 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
     808              :     {
     809            3 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     810            3 :         return;
     811              :     }
     812              : 
     813              :     /*
     814              :      * Check whether we need to process this transaction. See
     815              :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     816              :      * transaction.
     817              :      *
     818              :      * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
     819              :      * hasn't yet been committed, removing this txn before a commit might
     820              :      * result in the computation of an incorrect restart_lsn. See
     821              :      * SnapBuildProcessRunningXacts. But we need to process cache
     822              :      * invalidations if there are any for the reasons mentioned in
     823              :      * DecodeCommit.
     824              :      */
     825          177 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     826              :     {
     827          131 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     828          131 :         ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
     829          131 :         return;
     830              :     }
     831              : 
     832              :     /* Tell the reorderbuffer about the surviving subtransactions. */
     833           47 :     for (i = 0; i < parsed->nsubxacts; i++)
     834              :     {
     835            1 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     836              :                                  buf->origptr, buf->endptr);
     837              :     }
     838              : 
     839              :     /* replay actions of all transaction + subtransactions in order */
     840           46 :     ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
     841              : 
     842              :     /*
     843              :      * Update the decoding stats at transaction prepare/commit/abort.
     844              :      * Additionally we send the stats when we spill or stream the changes to
     845              :      * avoid losing them in case the decoding is interrupted. It is not clear
     846              :      * that sending more or less frequently than this would be better.
     847              :      */
     848           46 :     UpdateDecodingStats(ctx);
     849              : }
     850              : 
     851              : 
     852              : /*
     853              :  * Get the data from the various forms of abort records and pass it on to
     854              :  * snapbuild.c and reorderbuffer.c.
     855              :  *
     856              :  * 'two_phase' indicates that we are finishing a prepared transaction.
     857              :  */
     858              : static void
     859          218 : DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     860              :             xl_xact_parsed_abort *parsed, TransactionId xid,
     861              :             bool two_phase)
     862              : {
     863              :     int         i;
     864          218 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     865          218 :     TimestampTz abort_time = parsed->xact_time;
     866          218 :     ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
     867              :     bool        skip_xact;
     868              : 
     869          218 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     870              :     {
     871            3 :         origin_lsn = parsed->origin_lsn;
     872            3 :         abort_time = parsed->origin_timestamp;
     873              :     }
     874              : 
     875              :     /*
     876              :      * Check whether we need to process this transaction. See
     877              :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     878              :      * transaction.
     879              :      */
     880          218 :     skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
     881              : 
     882              :     /*
     883              :      * Send the final rollback record for a prepared transaction unless we
     884              :      * need to skip it. For non-two-phase xacts, simply forget the xact.
     885              :      */
     886          218 :     if (two_phase && !skip_xact)
     887              :     {
     888           11 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     889              :                                     InvalidXLogRecPtr,
     890              :                                     abort_time, origin_id, origin_lsn,
     891           11 :                                     parsed->twophase_gid, false);
     892              :     }
     893              :     else
     894              :     {
     895          213 :         for (i = 0; i < parsed->nsubxacts; i++)
     896              :         {
     897            6 :             ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
     898            6 :                                buf->record->EndRecPtr, abort_time);
     899              :         }
     900              : 
     901          207 :         ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr,
     902              :                            abort_time);
     903              :     }
     904              : 
     905              :     /* update the decoding stats */
     906          218 :     UpdateDecodingStats(ctx);
     907          218 : }
     908              : 
     909              : /*
     910              :  * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
     911              :  *
     912              :  * Inserts can contain the new tuple.
     913              :  */
     914              : static void
     915      1248341 : DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     916              : {
     917              :     Size        datalen;
     918              :     char       *tupledata;
     919              :     Size        tuplelen;
     920      1248341 :     XLogReaderState *r = buf->record;
     921              :     xl_heap_insert *xlrec;
     922              :     ReorderBufferChange *change;
     923              :     RelFileLocator target_locator;
     924              : 
     925      1248341 :     xlrec = (xl_heap_insert *) XLogRecGetData(r);
     926              : 
     927              :     /*
     928              :      * Ignore insert records without new tuples (this does happen when
     929              :      * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
     930              :      */
     931      1248341 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
     932         4625 :         return;
     933              : 
     934              :     /* only interested in our database */
     935      1243835 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     936      1243835 :     if (target_locator.dbOid != ctx->slot->data.database)
     937            0 :         return;
     938              : 
     939              :     /* output plugin doesn't look for this origin, no need to queue */
     940      1243835 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     941          119 :         return;
     942              : 
     943      1243716 :     change = ReorderBufferAllocChange(ctx->reorder);
     944      1243716 :     if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
     945      1225752 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
     946              :     else
     947        17964 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
     948      1243716 :     change->origin_id = XLogRecGetOrigin(r);
     949              : 
     950      1243716 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     951              : 
     952      1243716 :     tupledata = XLogRecGetBlockData(r, 0, &datalen);
     953      1243716 :     tuplelen = datalen - SizeOfHeapHeader;
     954              : 
     955      1243716 :     change->data.tp.newtuple =
     956      1243716 :         ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
     957              : 
     958      1243716 :     DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
     959              : 
     960      1243716 :     change->data.tp.clear_toast_afterwards = true;
     961              : 
     962      1243716 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
     963              :                              change,
     964      1243716 :                              xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
     965              : }
     966              : 
     967              : /*
     968              :  * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
     969              :  * in the record, from WAL into proper tuplebufs.
     970              :  *
     971              :  * Updates can possibly contain a new tuple and the old primary key.
     972              :  */
     973              : static void
     974       207609 : DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     975              : {
     976       207609 :     XLogReaderState *r = buf->record;
     977              :     xl_heap_update *xlrec;
     978              :     ReorderBufferChange *change;
     979              :     char       *data;
     980              :     RelFileLocator target_locator;
     981              : 
     982       207609 :     xlrec = (xl_heap_update *) XLogRecGetData(r);
     983              : 
     984              :     /* only interested in our database */
     985       207609 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     986       207609 :     if (target_locator.dbOid != ctx->slot->data.database)
     987          208 :         return;
     988              : 
     989              :     /* output plugin doesn't look for this origin, no need to queue */
     990       207426 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     991           25 :         return;
     992              : 
     993       207401 :     change = ReorderBufferAllocChange(ctx->reorder);
     994       207401 :     change->action = REORDER_BUFFER_CHANGE_UPDATE;
     995       207401 :     change->origin_id = XLogRecGetOrigin(r);
     996       207401 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     997              : 
     998       207401 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
     999              :     {
    1000              :         Size        datalen;
    1001              :         Size        tuplelen;
    1002              : 
    1003       205585 :         data = XLogRecGetBlockData(r, 0, &datalen);
    1004              : 
    1005       205585 :         tuplelen = datalen - SizeOfHeapHeader;
    1006              : 
    1007       205585 :         change->data.tp.newtuple =
    1008       205585 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
    1009              : 
    1010       205585 :         DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
    1011              :     }
    1012              : 
    1013       207401 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
    1014              :     {
    1015              :         Size        datalen;
    1016              :         Size        tuplelen;
    1017              : 
    1018              :         /* caution, remaining data in record is not aligned */
    1019          332 :         data = XLogRecGetData(r) + SizeOfHeapUpdate;
    1020          332 :         datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
    1021          332 :         tuplelen = datalen - SizeOfHeapHeader;
    1022              : 
    1023          332 :         change->data.tp.oldtuple =
    1024          332 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
    1025              : 
    1026          332 :         DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
    1027              :     }
    1028              : 
    1029       207401 :     change->data.tp.clear_toast_afterwards = true;
    1030              : 
    1031       207401 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1032              :                              change, false);
    1033              : }
    1034              : 
    1035              : /*
    1036              :  * Parse XLOG_HEAP_DELETE from WAL into proper tuplebufs.
    1037              :  *
    1038              :  * Deletes can possibly contain the old primary key.
    1039              :  */
    1040              : static void
    1041       267945 : DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1042              : {
    1043       267945 :     XLogReaderState *r = buf->record;
    1044              :     xl_heap_delete *xlrec;
    1045              :     ReorderBufferChange *change;
    1046              :     RelFileLocator target_locator;
    1047              : 
    1048       267945 :     xlrec = (xl_heap_delete *) XLogRecGetData(r);
    1049              : 
    1050              :     /*
    1051              :      * Skip changes that were marked as ignorable at origin.
    1052              :      *
    1053              :      * (This is used for changes that affect relations not visible to other
    1054              :      * transactions, such as the transient table during concurrent repack.)
    1055              :      */
    1056       267945 :     if (xlrec->flags & XLH_DELETE_NO_LOGICAL)
    1057           53 :         return;
    1058              : 
    1059              :     /* only interested in our database */
    1060       267945 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1061       267945 :     if (target_locator.dbOid != ctx->slot->data.database)
    1062           35 :         return;
    1063              : 
    1064              :     /* output plugin doesn't look for this origin, no need to queue */
    1065       267910 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1066           18 :         return;
    1067              : 
    1068       267892 :     change = ReorderBufferAllocChange(ctx->reorder);
    1069              : 
    1070       267892 :     if (xlrec->flags & XLH_DELETE_IS_SUPER)
    1071            0 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
    1072              :     else
    1073       267892 :         change->action = REORDER_BUFFER_CHANGE_DELETE;
    1074              : 
    1075       267892 :     change->origin_id = XLogRecGetOrigin(r);
    1076              : 
    1077       267892 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1078              : 
    1079              :     /* old primary key stored */
    1080       267892 :     if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
    1081              :     {
    1082       205869 :         Size        datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
    1083       205869 :         Size        tuplelen = datalen - SizeOfHeapHeader;
    1084              : 
    1085              :         Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
    1086              : 
    1087       205869 :         change->data.tp.oldtuple =
    1088       205869 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
    1089              : 
    1090       205869 :         DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
    1091              :                         datalen, change->data.tp.oldtuple);
    1092              :     }
    1093              : 
    1094       267892 :     change->data.tp.clear_toast_afterwards = true;
    1095              : 
    1096       267892 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1097              :                              change, false);
    1098              : }
    1099              : 
    1100              : /*
    1101              :  * Parse XLOG_HEAP_TRUNCATE from WAL: copy its relid array and queue a TRUNCATE change.
    1102              :  */
    1103              : static void
    1104           58 : DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1105              : {
    1106           58 :     XLogReaderState *r = buf->record;
    1107              :     xl_heap_truncate *xlrec;
    1108              :     ReorderBufferChange *change;
    1109              : 
    1110           58 :     xlrec = (xl_heap_truncate *) XLogRecGetData(r);
    1111              : 
    1112              :     /* only interested in our database (the record itself carries the database OID) */
    1113           58 :     if (xlrec->dbId != ctx->slot->data.database)
    1114            0 :         return;
    1115              : 
    1116              :     /* output plugin doesn't look for this origin, no need to queue */
    1117           58 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1118            1 :         return;
    1119              : 
    1120           57 :     change = ReorderBufferAllocChange(ctx->reorder);
    1121           57 :     change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
    1122           57 :     change->origin_id = XLogRecGetOrigin(r);
    1123           57 :     if (xlrec->flags & XLH_TRUNCATE_CASCADE)
    1124            1 :         change->data.truncate.cascade = true;
    1125           57 :     if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
    1126            2 :         change->data.truncate.restart_seqs = true;
    1127           57 :     change->data.truncate.nrelids = xlrec->nrelids;
    1128          114 :     change->data.truncate.relids = ReorderBufferAllocRelids(ctx->reorder,
    1129           57 :                                                             xlrec->nrelids);
    1130           57 :     memcpy(change->data.truncate.relids, xlrec->relids,
    1131           57 :            xlrec->nrelids * sizeof(Oid));
    1132           57 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1133              :                              buf->origptr, change, false);
    1134              : }
    1135              : 
    1136              : /*
    1137              :  * Decode an XLOG_HEAP2_MULTI_INSERT record, queueing one INSERT change (with its own tuplebuf) per tuple.
    1138              :  *
    1139              :  * Currently MULTI_INSERT will always contain the full tuples.
    1140              :  */
    1141              : static void
    1142         6808 : DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1143              : {
    1144         6808 :     XLogReaderState *r = buf->record;
    1145              :     xl_heap_multi_insert *xlrec;
    1146              :     int         i;
    1147              :     char       *data;
    1148              :     char       *tupledata;
    1149              :     Size        tuplelen;
    1150              :     RelFileLocator rlocator;
    1151              : 
    1152         6808 :     xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
    1153              : 
    1154              :     /*
    1155              :      * Ignore insert records without new tuples.  This happens when a
    1156              :      * multi_insert is done on a catalog or on a non-persistent relation.
    1157              :      */
    1158         6808 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
    1159         6794 :         return;
    1160              : 
    1161              :     /* only interested in our database: compare block 0's dbOid with the slot's */
    1162           49 :     XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
    1163           49 :     if (rlocator.dbOid != ctx->slot->data.database)
    1164           35 :         return;
    1165              : 
    1166              :     /* output plugin doesn't look for this origin, no need to queue */
    1167           14 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1168            0 :         return;
    1169              : 
    1170              :     /*
    1171              :      * We know that this multi_insert isn't for a catalog, so the block should
    1172              :      * always have data even if a full-page write of it is taken.
    1173              :      */
    1174           14 :     tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
    1175              :     Assert(tupledata != NULL);
    1176              : 
    1177           14 :     data = tupledata;
    1178         1053 :     for (i = 0; i < xlrec->ntuples; i++)
    1179              :     {
    1180              :         ReorderBufferChange *change;
    1181              :         xl_multi_insert_tuple *xlhdr;
    1182              :         int         datalen;
    1183              :         HeapTuple   tuple;
    1184              :         HeapTupleHeader header;
    1185              : 
    1186         1039 :         change = ReorderBufferAllocChange(ctx->reorder);
    1187         1039 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
    1188         1039 :         change->origin_id = XLogRecGetOrigin(r);
    1189              : 
    1190         1039 :         memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));
    1191              : 
    1192         1039 :         xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
    1193         1039 :         data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
    1194         1039 :         datalen = xlhdr->datalen;
    1195              : 
    1196         1039 :         change->data.tp.newtuple =
    1197         1039 :             ReorderBufferAllocTupleBuf(ctx->reorder, datalen);
    1198              : 
    1199         1039 :         tuple = change->data.tp.newtuple;
    1200         1039 :         header = tuple->t_data;
    1201              : 
    1202              :         /* not a disk based tuple, so mark its item pointer invalid */
    1203         1039 :         ItemPointerSetInvalid(&tuple->t_self);
    1204              : 
    1205              :         /*
    1206              :          * The table OID can only be figured out after reassembling the transactions.
    1207              :          */
    1208         1039 :         tuple->t_tableOid = InvalidOid;
    1209              : 
    1210         1039 :         tuple->t_len = datalen + SizeofHeapTupleHeader;
    1211              : 
    1212         1039 :         memset(header, 0, SizeofHeapTupleHeader);
    1213              : 
    1214         1039 :         memcpy((char *) tuple->t_data + SizeofHeapTupleHeader, data, datalen);
    1215         1039 :         header->t_infomask = xlhdr->t_infomask;
    1216         1039 :         header->t_infomask2 = xlhdr->t_infomask2;
    1217         1039 :         header->t_hoff = xlhdr->t_hoff;
    1218              : 
    1219              :         /*
    1220              :          * Reset toast reassembly state only after the last row in the last
    1221              :          * xl_multi_insert_tuple record emitted by one heap_multi_insert()
    1222              :          * call.
    1223              :          */
    1224         1039 :         if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
    1225          179 :             (i + 1) == xlrec->ntuples)
    1226            9 :             change->data.tp.clear_toast_afterwards = true;
    1227              :         else
    1228         1030 :             change->data.tp.clear_toast_afterwards = false;
    1229              : 
    1230         1039 :         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1231              :                                  buf->origptr, change, false);
    1232              : 
    1233              :         /* move to the next xl_multi_insert_tuple entry */
    1234         1039 :         data += datalen;
    1235              :     }
    1236              :     Assert(data == tupledata + tuplelen);
    1237              : }
    1238              : 
    1239              : /*
    1240              :  * Parse XLOG_HEAP_CONFIRM from WAL into a confirmation change.
    1241              :  *
    1242              :  * This is pretty trivial; all the state is essentially already set up by the
    1243              :  * speculative insertion.
    1244              :  */
    1245              : static void
    1246        17964 : DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1247              : {
    1248        17964 :     XLogReaderState *r = buf->record;
    1249              :     ReorderBufferChange *change;
    1250              :     RelFileLocator target_locator;
    1251              : 
    1252              :     /* only interested in our database: compare block 0's dbOid with the slot's */
    1253        17964 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1254        17964 :     if (target_locator.dbOid != ctx->slot->data.database)
    1255            0 :         return;
    1256              : 
    1257              :     /* output plugin doesn't look for this origin, no need to queue */
    1258        17964 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1259            0 :         return;
    1260              : 
    1261        17964 :     change = ReorderBufferAllocChange(ctx->reorder);
    1262        17964 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
    1263        17964 :     change->origin_id = XLogRecGetOrigin(r);
    1264              : 
    1265        17964 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1266              : 
    1267        17964 :     change->data.tp.clear_toast_afterwards = true;
    1268              : 
    1269        17964 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1270              :                              change, false);
    1271              : }
    1272              : 
    1273              : 
    1274              : /*
    1275              :  * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
    1276              :  * (but not by heap_multi_insert) into a tuplebuf.
    1277              :  *
    1278              :  * 'data' points at the xl_heap_header and 'len' covers that header plus the
    1279              :  * tuple data; both are record specific, so they need to be computed outside.
    1280              :  */
    1281              : static void
    1282      1655502 : DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
    1283              : {
    1284              :     xl_heap_header xlhdr;
    1285      1655502 :     int         datalen = len - SizeOfHeapHeader;
    1286              :     HeapTupleHeader header;
    1287              : 
    1288              :     Assert(datalen >= 0);
    1289              : 
    1290      1655502 :     tuple->t_len = datalen + SizeofHeapTupleHeader;
    1291      1655502 :     header = tuple->t_data;
    1292              : 
    1293              :     /* not a disk based tuple, so mark its item pointer invalid */
    1294      1655502 :     ItemPointerSetInvalid(&tuple->t_self);
    1295              : 
    1296              :     /* the table OID can only be figured out after reassembling the transactions */
    1297      1655502 :     tuple->t_tableOid = InvalidOid;
    1298              : 
    1299              :     /* data is not stored aligned, copy to aligned storage */
    1300      1655502 :     memcpy(&xlhdr, data, SizeOfHeapHeader);
    1301              : 
    1302      1655502 :     memset(header, 0, SizeofHeapTupleHeader);
    1303              : 
    1304      1655502 :     memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
    1305      1655502 :            data + SizeOfHeapHeader,
    1306              :            datalen);
    1307              : 
    1308      1655502 :     header->t_infomask = xlhdr.t_infomask;
    1309      1655502 :     header->t_infomask2 = xlhdr.t_infomask2;
    1310      1655502 :     header->t_hoff = xlhdr.t_hoff;
    1311      1655502 : }
    1312              : 
    1313              : /*
    1314              :  * Check whether this specific transaction should be skipped (returns true if so).
    1315              :  *
    1316              :  * There can be several reasons we might not be interested in this
    1317              :  * transaction:
    1318              :  * 1) We might not be interested in decoding transactions up to this
    1319              :  *    LSN. This can happen because we previously decoded it and now just
    1320              :  *    are restarting or if we haven't assembled a consistent snapshot yet.
    1321              :  * 2) The transaction happened in another database.
    1322              :  * 3) The output plugin is not interested in the origin.
    1323              :  * 4) We are doing fast-forwarding (processing_required is set in that case).
    1324              :  */
    1325              : static bool
    1326         4132 : DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
    1327              :                   Oid txn_dbid, ReplOriginId origin_id)
    1328              : {
    1329         4132 :     if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
    1330         3921 :         (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
    1331         1982 :         FilterByOrigin(ctx, origin_id))
    1332         2188 :         return true;
    1333              : 
    1334              :     /*
    1335              :      * We also skip decoding in fast_forward mode. In passing set the
    1336              :      * processing_required flag to indicate that if it were not for
    1337              :      * fast_forward mode, processing would have been required.
    1338              :      */
    1339         1944 :     if (ctx->fast_forward)
    1340              :     {
    1341           40 :         ctx->processing_required = true;
    1342           40 :         return true;
    1343              :     }
    1344              : 
    1345         1904 :     return false;
    1346              : }
        

Generated by: LCOV version 2.0-1