LCOV - code coverage report
Current view: top level - src/backend/replication/logical - decode.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 94.8 % 439 416
Test Date: 2026-03-14 14:14:39 Functions: 100.0 % 20 20
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /* -------------------------------------------------------------------------
       2              :  *
       3              :  * decode.c
       4              :  *      This module decodes WAL records read using xlogreader.h's APIs for the
       5              :  *      purpose of logical decoding by passing information to the
       6              :  *      reorderbuffer module (containing the actual changes) and to the
       7              :  *      snapbuild module to build a fitting catalog snapshot (to be able to
       8              :  *      properly decode the changes in the reorderbuffer).
       9              :  *
      10              :  * NOTE:
      11              :  *      This basically tries to handle all low level xlog stuff for
      12              :  *      reorderbuffer.c and snapbuild.c. There's some minor leakage where a
      13              :  *      specific record's struct is used to pass data along, but those just
      14              :  *      happen to contain the right amount of data in a convenient
      15              :  *      format. There isn't and shouldn't be much intelligence about the
      16              :  *      contents of records in here except turning them into a more usable
      17              :  *      format.
      18              :  *
      19              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      20              :  * Portions Copyright (c) 1994, Regents of the University of California
      21              :  *
      22              :  * IDENTIFICATION
      23              :  *    src/backend/replication/logical/decode.c
      24              :  *
      25              :  * -------------------------------------------------------------------------
      26              :  */
      27              : #include "postgres.h"
      28              : 
      29              : #include "access/heapam_xlog.h"
      30              : #include "access/transam.h"
      31              : #include "access/xact.h"
      32              : #include "access/xlog_internal.h"
      33              : #include "access/xlogreader.h"
      34              : #include "access/xlogrecord.h"
      35              : #include "catalog/pg_control.h"
      36              : #include "replication/decode.h"
      37              : #include "replication/logical.h"
      38              : #include "replication/message.h"
      39              : #include "replication/reorderbuffer.h"
      40              : #include "replication/snapbuild.h"
      41              : #include "storage/standbydefs.h"
      42              : 
      43              : /* individual record(group)'s handlers */
      44              : static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      45              : static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      46              : static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      47              : static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      48              : static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      49              : static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
      50              : 
      51              : static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      52              :                          xl_xact_parsed_commit *parsed, TransactionId xid,
      53              :                          bool two_phase);
      54              : static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      55              :                         xl_xact_parsed_abort *parsed, TransactionId xid,
      56              :                         bool two_phase);
      57              : static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
      58              :                           xl_xact_parsed_prepare *parsed);
      59              : 
      60              : 
      61              : /* common function to decode tuples */
      62              : static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);
      63              : 
      64              : /* helper functions for decoding transactions */
      65              : static inline bool FilterPrepare(LogicalDecodingContext *ctx,
      66              :                                  TransactionId xid, const char *gid);
      67              : static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
      68              :                               XLogRecordBuffer *buf, Oid txn_dbid,
      69              :                               ReplOriginId origin_id);
      70              : 
      71              : /*
      72              :  * Take every XLogReadRecord()ed record and perform the actions required to
      73              :  * decode it using the output plugin already setup in the logical decoding
      74              :  * context.
      75              :  *
      76              :  * NB: Every record's xid needs to be processed by reorderbuffer
      77              :  * (xids contained in the content of records are not relevant for this rule).
      78              :  * That means that for records which'd otherwise not go through the
      79              :  * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
      80              :  * call ReorderBufferProcessXid for each record type by default, because
      81              :  * e.g. empty xacts can be handled more efficiently if there's no previous
      82              :  * state for them.
      83              :  *
      84              :  * We also support the ability to fast forward through records, skipping some
      85              :  * record types completely - see individual record types for details.
      86              :  */
      87              : void
      88      2351590 : LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
      89              : {
      90              :     XLogRecordBuffer buf;
      91              :     TransactionId txid;
      92              :     RmgrData    rmgr;
      93              : 
      94      2351590 :     buf.origptr = ctx->reader->ReadRecPtr;
      95      2351590 :     buf.endptr = ctx->reader->EndRecPtr;
      96      2351590 :     buf.record = record;
      97              : 
      98      2351590 :     txid = XLogRecGetTopXid(record);
      99              : 
     100              :     /*
     101              :      * If the top-level xid is valid, we need to assign the subxact to the
     102              :      * top-level xact. We need to do this for all records, hence we do it
     103              :      * before the switch.
     104              :      */
     105      2351590 :     if (TransactionIdIsValid(txid))
     106              :     {
     107          683 :         ReorderBufferAssignChild(ctx->reorder,
     108              :                                  txid,
     109          683 :                                  XLogRecGetXid(record),
     110              :                                  buf.origptr);
     111              :     }
     112              : 
     113      2351590 :     rmgr = GetRmgr(XLogRecGetRmid(record));
     114              : 
     115      2351590 :     if (rmgr.rm_decode != NULL)
     116      1840173 :         rmgr.rm_decode(ctx, &buf);
     117              :     else
     118              :     {
     119              :         /* just deal with xid, and done */
     120       511417 :         ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
     121              :                                 buf.origptr);
     122              :     }
     123      2351578 : }
     124              : 
     125              : /*
     126              :  * Handle rmgr XLOG_ID records for LogicalDecodingProcessRecord().
     127              :  */
     128              : void
     129         7171 : xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     130              : {
     131         7171 :     SnapBuild  *builder = ctx->snapshot_builder;
     132         7171 :     uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
     133              : 
     134         7171 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
     135              :                             buf->origptr);
     136              : 
     137         7171 :     switch (info)
     138              :     {
     139              :             /* this is also used in END_OF_RECOVERY checkpoints */
     140           79 :         case XLOG_CHECKPOINT_SHUTDOWN:
     141              :         case XLOG_END_OF_RECOVERY:
     142           79 :             SnapBuildSerializationPoint(builder, buf->origptr);
     143              : 
     144           79 :             break;
     145           82 :         case XLOG_CHECKPOINT_ONLINE:
     146              : 
     147              :             /*
     148              :              * a RUNNING_XACTS record will have been logged near to this, we
     149              :              * can restart from there.
     150              :              */
     151           82 :             break;
     152            0 :         case XLOG_LOGICAL_DECODING_STATUS_CHANGE:
     153              :             {
     154              :                 bool        logical_decoding;
     155              : 
     156            0 :                 memcpy(&logical_decoding, XLogRecGetData(buf->record), sizeof(bool));
     157              : 
     158              :                 /*
     159              :                  * Error out as we should not decode this WAL record.
     160              :                  *
     161              :                  * Logical decoding is disabled, and existing logical slots on
     162              :                  * the standby are invalidated when this WAL record is
     163              :                  * replayed. No logical decoder can process this WAL record
     164              :                  * until replay completes, and by then the slots are already
     165              :                  * invalidated. Furthermore, no new logical slots can be
     166              :                  * created while logical decoding is disabled. This cannot
     167              :                  * occur even on primary either, since it will not restart
     168              :                  * occur on the primary either, since it will not restart
     169              :                  */
     170            0 :                 elog(ERROR, "unexpected logical decoding status change %d",
     171              :                      logical_decoding);
     172              : 
     173              :                 break;
     174              :             }
     175         7010 :         case XLOG_NOOP:
     176              :         case XLOG_NEXTOID:
     177              :         case XLOG_SWITCH:
     178              :         case XLOG_BACKUP_END:
     179              :         case XLOG_PARAMETER_CHANGE:
     180              :         case XLOG_RESTORE_POINT:
     181              :         case XLOG_FPW_CHANGE:
     182              :         case XLOG_FPI_FOR_HINT:
     183              :         case XLOG_FPI:
     184              :         case XLOG_OVERWRITE_CONTRECORD:
     185              :         case XLOG_CHECKPOINT_REDO:
     186         7010 :             break;
     187            0 :         default:
     188            0 :             elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info);
     189              :     }
     190         7171 : }
     191              : 
     192              : /*
     193              :  * Handle rmgr XACT_ID records for LogicalDecodingProcessRecord().
     194              :  */
     195              : void
     196         9702 : xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     197              : {
     198         9702 :     SnapBuild  *builder = ctx->snapshot_builder;
     199         9702 :     ReorderBuffer *reorder = ctx->reorder;
     200         9702 :     XLogReaderState *r = buf->record;
     201         9702 :     uint8       info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
     202              : 
     203              :     /*
     204              :      * If the snapshot isn't yet fully built, we cannot decode anything, so
     205              :      * bail out.
     206              :      */
     207         9702 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     208           13 :         return;
     209              : 
     210         9689 :     switch (info)
     211              :     {
     212         3540 :         case XLOG_XACT_COMMIT:
     213              :         case XLOG_XACT_COMMIT_PREPARED:
     214              :             {
     215              :                 xl_xact_commit *xlrec;
     216              :                 xl_xact_parsed_commit parsed;
     217              :                 TransactionId xid;
     218         3540 :                 bool        two_phase = false;
     219              : 
     220         3540 :                 xlrec = (xl_xact_commit *) XLogRecGetData(r);
     221         3540 :                 ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     222              : 
     223         3540 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     224         3419 :                     xid = XLogRecGetXid(r);
     225              :                 else
     226          121 :                     xid = parsed.twophase_xid;
     227              : 
     228              :                 /*
     229              :                  * We would like to process the transaction in a two-phase
     230              :                  * manner iff output plugin supports two-phase commits and
     231              :                  * doesn't filter the transaction at prepare time.
     232              :                  */
     233         3540 :                 if (info == XLOG_XACT_COMMIT_PREPARED)
     234          121 :                     two_phase = !(FilterPrepare(ctx, xid,
     235          121 :                                                 parsed.twophase_gid));
     236              : 
     237         3540 :                 DecodeCommit(ctx, buf, &parsed, xid, two_phase);
     238         3530 :                 break;
     239              :             }
     240          206 :         case XLOG_XACT_ABORT:
     241              :         case XLOG_XACT_ABORT_PREPARED:
     242              :             {
     243              :                 xl_xact_abort *xlrec;
     244              :                 xl_xact_parsed_abort parsed;
     245              :                 TransactionId xid;
     246          206 :                 bool        two_phase = false;
     247              : 
     248          206 :                 xlrec = (xl_xact_abort *) XLogRecGetData(r);
     249          206 :                 ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed);
     250              : 
     251          206 :                 if (!TransactionIdIsValid(parsed.twophase_xid))
     252          159 :                     xid = XLogRecGetXid(r);
     253              :                 else
     254           47 :                     xid = parsed.twophase_xid;
     255              : 
     256              :                 /*
     257              :                  * We would like to process the transaction in a two-phase
     258              :                  * manner iff output plugin supports two-phase commits and
     259              :                  * doesn't filter the transaction at prepare time.
     260              :                  */
     261          206 :                 if (info == XLOG_XACT_ABORT_PREPARED)
     262           47 :                     two_phase = !(FilterPrepare(ctx, xid,
     263           47 :                                                 parsed.twophase_gid));
     264              : 
     265          206 :                 DecodeAbort(ctx, buf, &parsed, xid, two_phase);
     266          206 :                 break;
     267              :             }
     268          132 :         case XLOG_XACT_ASSIGNMENT:
     269              : 
     270              :             /*
     271              :              * We assign subxact to the toplevel xact while processing each
     272              :              * record if required.  So, we don't need to do anything here. See
     273              :              * LogicalDecodingProcessRecord.
     274              :              */
     275          132 :             break;
     276         5620 :         case XLOG_XACT_INVALIDATIONS:
     277              :             {
     278              :                 TransactionId xid;
     279              :                 xl_xact_invals *invals;
     280              : 
     281         5620 :                 xid = XLogRecGetXid(r);
     282         5620 :                 invals = (xl_xact_invals *) XLogRecGetData(r);
     283              : 
     284              :                 /*
     285              :                  * Execute the invalidations for xid-less transactions,
     286              :                  * otherwise, accumulate them so that they can be processed at
     287              :                  * the commit time.
     288              :                  */
     289         5620 :                 if (TransactionIdIsValid(xid))
     290              :                 {
     291         5598 :                     if (!ctx->fast_forward)
     292         5529 :                         ReorderBufferAddInvalidations(reorder, xid,
     293              :                                                       buf->origptr,
     294         5529 :                                                       invals->nmsgs,
     295         5529 :                                                       invals->msgs);
     296         5598 :                     ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
     297              :                                                       buf->origptr);
     298              :                 }
     299           22 :                 else if (!ctx->fast_forward)
     300           22 :                     ReorderBufferImmediateInvalidation(ctx->reorder,
     301           22 :                                                        invals->nmsgs,
     302           22 :                                                        invals->msgs);
     303              : 
     304         5620 :                 break;
     305              :             }
     306          191 :         case XLOG_XACT_PREPARE:
     307              :             {
     308              :                 xl_xact_parsed_prepare parsed;
     309              :                 xl_xact_prepare *xlrec;
     310              : 
     311              :                 /* ok, parse it */
     312          191 :                 xlrec = (xl_xact_prepare *) XLogRecGetData(r);
     313          191 :                 ParsePrepareRecord(XLogRecGetInfo(buf->record),
     314              :                                    xlrec, &parsed);
     315              : 
     316              :                 /*
     317              :                  * We would like to process the transaction in a two-phase
     318              :                  * manner iff output plugin supports two-phase commits and
     319              :                  * doesn't filter the transaction at prepare time.
     320              :                  */
     321          191 :                 if (FilterPrepare(ctx, parsed.twophase_xid,
     322              :                                   parsed.twophase_gid))
     323              :                 {
     324           21 :                     ReorderBufferProcessXid(reorder, parsed.twophase_xid,
     325              :                                             buf->origptr);
     326           21 :                     break;
     327              :                 }
     328              : 
     329              :                 /*
     330              :                  * Note that if the prepared transaction has locked [user]
     331              :                  * catalog tables exclusively then decoding prepare can block
     332              :                  * till the main transaction is committed because it needs to
     333              :                  * lock the catalog tables.
     334              :                  *
     335              :                  * XXX Now, this can even lead to a deadlock if the prepare
     336              :                  * transaction is waiting to get it logically replicated for
     337              :                  * distributed 2PC. This can be avoided by disallowing
     338              :                  * preparing transactions that have locked [user] catalog
     339              :                  * tables exclusively but as of now, we ask users not to do
     340              :                  * such an operation.
     341              :                  */
     342          170 :                 DecodePrepare(ctx, buf, &parsed);
     343          170 :                 break;
     344              :             }
     345            0 :         default:
     346            0 :             elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
     347              :     }
     348              : }
     349              : 
     350              : /*
     351              :  * Handle rmgr STANDBY_ID records for LogicalDecodingProcessRecord().
     352              :  */
     353              : void
     354         4178 : standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     355              : {
     356         4178 :     SnapBuild  *builder = ctx->snapshot_builder;
     357         4178 :     XLogReaderState *r = buf->record;
     358         4178 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     359              : 
     360         4178 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     361              : 
     362         4178 :     switch (info)
     363              :     {
     364         1622 :         case XLOG_RUNNING_XACTS:
     365              :             {
     366         1622 :                 xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
     367              : 
     368         1622 :                 SnapBuildProcessRunningXacts(builder, buf->origptr, running);
     369              : 
     370              :                 /*
     371              :                  * Abort all transactions that we keep track of, that are
     372              :                  * older than the record's oldestRunningXid. This is the most
     373              :                  * convenient spot for doing so since, in contrast to shutdown
     374              :                  * or end-of-recovery checkpoints, we have information about
     375              :                  * all running transactions which includes prepared ones,
     376              :                  * while shutdown checkpoints just know that no non-prepared
     377              :                  * transactions are in progress.
     378              :                  */
     379         1620 :                 ReorderBufferAbortOld(ctx->reorder, running->oldestRunningXid);
     380              :             }
     381         1620 :             break;
     382         2534 :         case XLOG_STANDBY_LOCK:
     383         2534 :             break;
     384           22 :         case XLOG_INVALIDATIONS:
     385              : 
     386              :             /*
     387              :              * We are processing the invalidations at the command level via
     388              :              * XLOG_XACT_INVALIDATIONS.  So we don't need to do anything here.
     389              :              */
     390           22 :             break;
     391            0 :         default:
     392            0 :             elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
     393              :     }
     394         4176 : }
     395              : 
     396              : /*
     397              :  * Handle rmgr HEAP2_ID records for LogicalDecodingProcessRecord().
     398              :  */
     399              : void
     400        34360 : heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     401              : {
     402        34360 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     403        34360 :     TransactionId xid = XLogRecGetXid(buf->record);
     404        34360 :     SnapBuild  *builder = ctx->snapshot_builder;
     405              : 
     406        34360 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     407              : 
     408              :     /*
     409              :      * If we don't have a snapshot or we are just fast-forwarding, there is no
     410              :      * point in decoding data changes. However, it's crucial to build the base
     411              :      * snapshot during fast-forward mode (as is done in
     412              :      * SnapBuildProcessChange()) because we require the snapshot's xmin when
     413              :      * determining the candidate catalog_xmin for the replication slot. See
     414              :      * SnapBuildProcessRunningXacts().
     415              :      */
     416        34360 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     417            8 :         return;
     418              : 
     419        34352 :     switch (info)
     420              :     {
     421         6359 :         case XLOG_HEAP2_MULTI_INSERT:
     422         6359 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     423         6359 :                 !ctx->fast_forward)
     424         6263 :                 DecodeMultiInsert(ctx, buf);
     425         6359 :             break;
     426        25962 :         case XLOG_HEAP2_NEW_CID:
     427        25962 :             if (!ctx->fast_forward)
     428              :             {
     429              :                 xl_heap_new_cid *xlrec;
     430              : 
     431        25611 :                 xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
     432        25611 :                 SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
     433              :             }
     434        25962 :             break;
     435           90 :         case XLOG_HEAP2_REWRITE:
     436              : 
     437              :             /*
     438              :              * Although these records only exist to serve the needs of logical
     439              :              * decoding, all the work happens as part of crash or archive
     440              :              * recovery, so we don't need to do anything here.
     441              :              */
     442           90 :             break;
     443              : 
     444              :             /*
     445              :              * Everything else here is just low level physical stuff we're not
     446              :              * interested in.
     447              :              */
     448         1941 :         case XLOG_HEAP2_PRUNE_ON_ACCESS:
     449              :         case XLOG_HEAP2_PRUNE_VACUUM_SCAN:
     450              :         case XLOG_HEAP2_PRUNE_VACUUM_CLEANUP:
     451              :         case XLOG_HEAP2_VISIBLE:
     452              :         case XLOG_HEAP2_LOCK_UPDATED:
     453         1941 :             break;
     454            0 :         default:
     455            0 :             elog(ERROR, "unexpected RM_HEAP2_ID record type: %u", info);
     456              :     }
     457              : }
     458              : 
     459              : /*
     460              :  * Handle rmgr HEAP_ID records for LogicalDecodingProcessRecord().
     461              :  */
     462              : void
     463      1784667 : heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     464              : {
     465      1784667 :     uint8       info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
     466      1784667 :     TransactionId xid = XLogRecGetXid(buf->record);
     467      1784667 :     SnapBuild  *builder = ctx->snapshot_builder;
     468              : 
     469      1784667 :     ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
     470              : 
     471              :     /*
     472              :      * If we don't have a snapshot or we are just fast-forwarding, there is no
     473              :      * point in decoding data changes. However, it's crucial to build the base
     474              :      * snapshot during fast-forward mode (as is done in
     475              :      * SnapBuildProcessChange()) because we require the snapshot's xmin when
     476              :      * determining the candidate catalog_xmin for the replication slot. See
     477              :      * SnapBuildProcessRunningXacts().
     478              :      */
     479      1784667 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     480         2026 :         return;
     481              : 
     482      1782641 :     switch (info)
     483              :     {
     484      1169565 :         case XLOG_HEAP_INSERT:
     485      1169565 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     486      1169565 :                 !ctx->fast_forward)
     487      1169380 :                 DecodeInsert(ctx, buf);
     488      1169565 :             break;
     489              : 
     490              :             /*
     491              :              * Treat HOT update as normal updates. There is no useful
     492              :              * information in the fact that we could make it a HOT update
     493              :              * locally and the WAL layout is compatible.
     494              :              */
     495       180870 :         case XLOG_HEAP_HOT_UPDATE:
     496              :         case XLOG_HEAP_UPDATE:
     497       180870 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     498       180870 :                 !ctx->fast_forward)
     499       180857 :                 DecodeUpdate(ctx, buf);
     500       180870 :             break;
     501              : 
     502       237759 :         case XLOG_HEAP_DELETE:
     503       237759 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     504       237759 :                 !ctx->fast_forward)
     505       237681 :                 DecodeDelete(ctx, buf);
     506       237759 :             break;
     507              : 
     508           71 :         case XLOG_HEAP_TRUNCATE:
     509           71 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     510           71 :                 !ctx->fast_forward)
     511           69 :                 DecodeTruncate(ctx, buf);
     512           71 :             break;
     513              : 
     514         1095 :         case XLOG_HEAP_INPLACE:
     515              : 
     516              :             /*
     517              :              * Inplace updates are only ever performed on catalog tuples and
     518              :              * can, per definition, not change tuple visibility.  Since we
     519              :              * also don't decode catalog tuples, we're not interested in the
     520              :              * record's contents.
     521              :              */
     522         1095 :             break;
     523              : 
     524        17916 :         case XLOG_HEAP_CONFIRM:
     525        17916 :             if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
     526        17916 :                 !ctx->fast_forward)
     527        17916 :                 DecodeSpecConfirm(ctx, buf);
     528        17916 :             break;
     529              : 
     530       175365 :         case XLOG_HEAP_LOCK:
     531              :             /* we don't care about row level locks for now */
     532       175365 :             break;
     533              : 
     534            0 :         default:
     535            0 :             elog(ERROR, "unexpected RM_HEAP_ID record type: %u", info);
     536              :             break;
     537              :     }
     538              : }
     539              : 
     540              : /*
     541              :  * Ask the output plugin whether to skip this PREPARE and send the transaction
     542              :  * as a regular commit later.  Returns true to skip, false to decode it now.
     543              :  */
     544              : static inline bool
     545          359 : FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
     546              :               const char *gid)
     547              : {
     548              :     /*
     549              :      * Skip if decoding of two-phase transactions at PREPARE time is not
     550              :      * enabled. In that case, all two-phase transactions are considered
     551              :      * filtered out and will be applied as regular transactions at COMMIT
     552              :      * PREPARED.
     553              :      */
     554          359 :     if (!ctx->twophase)
     555           24 :         return true;
     556              : 
     557              :     /*
     558              :      * The filter_prepare callback is optional. When it is not supplied, no
     559              :      * prepared transaction is filtered: all of them should go through.
     560              :      */
     561          335 :     if (ctx->callbacks.filter_prepare_cb == NULL)
     562          187 :         return false;
     563              : 
     564          148 :     return filter_prepare_cb_wrapper(ctx, xid, gid);    /* plugin decides */
     565              : }
     566              : /* Returns true if the output plugin wants changes from origin_id filtered. */
     567              : static inline bool
     568      1603572 : FilterByOrigin(LogicalDecodingContext *ctx, ReplOriginId origin_id)
     569              : {
     570      1603572 :     if (ctx->callbacks.filter_by_origin_cb == NULL)
     571           40 :         return false;           /* callback not supplied: filter nothing */
     572              : 
     573      1603532 :     return filter_by_origin_cb_wrapper(ctx, origin_id);
     574              : }
     575              : 
     576              : /*
     577              :  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
     578              :  */
     579              : void
     580           95 : logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     581              : {
     582           95 :     SnapBuild  *builder = ctx->snapshot_builder;
     583           95 :     XLogReaderState *r = buf->record;
     584           95 :     TransactionId xid = XLogRecGetXid(r);
     585           95 :     uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
     586           95 :     ReplOriginId origin_id = XLogRecGetOrigin(r);
     587           95 :     Snapshot    snapshot = NULL;
     588              :     xl_logical_message *message;
     589              : 
     590           95 :     if (info != XLOG_LOGICAL_MESSAGE)
     591            0 :         elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
     592              : 
     593           95 :     ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
     594              : 
     595              :     /* Before we have a full snapshot there is no point in decoding messages */
     596           95 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
     597            0 :         return;
     598              : 
     599           95 :     message = (xl_logical_message *) XLogRecGetData(r);
     600              : 
     601          188 :     if (message->dbId != ctx->slot->data.database ||
     602           93 :         FilterByOrigin(ctx, origin_id))
     603            4 :         return;                 /* other database, or origin filtered */
     604              : 
     605           91 :     if (message->transactional &&
     606           39 :         !SnapBuildProcessChange(builder, xid, buf->origptr))
     607            0 :         return;                 /* SnapBuildProcessChange() returned false */
     608          143 :     else if (!message->transactional &&
     609          104 :              (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
     610           52 :               SnapBuildXactNeedsSkip(builder, buf->origptr)))
     611           40 :         return;                 /* not consistent yet, or to be skipped */
     612              : 
     613              :     /*
     614              :      * We also skip decoding in fast_forward mode. This check must be last
     615              :      * because we don't want to set the processing_required flag unless we
     616              :      * have a decodable message.
     617              :      */
     618           51 :     if (ctx->fast_forward)
     619              :     {
     620              :         /*
     621              :          * We need to set the processing_required flag to notify the caller
     622              :          * of the message's existence. Usually, the flag is set when either
     623              :          * the COMMIT or ABORT records are decoded, but it must be turned on
     624              :          * here because a non-transactional logical message is decoded
     625              :          * without waiting for these records.
     626              :          */
     627            3 :         if (!message->transactional)
     628            3 :             ctx->processing_required = true;
     629              : 
     630            3 :         return;
     631              :     }
     632              : 
     633              :     /*
     634              :      * If this is a non-transactional change, get the snapshot we're expected
     635              :      * to use. We only get here when the snapshot is consistent, and the
     636              :      * change is not meant to be skipped.
     637              :      *
     638              :      * For transactional changes we don't need a snapshot, we'll use the
     639              :      * regular snapshot maintained by ReorderBuffer. We just leave it NULL.
     640              :      */
     641           48 :     if (!message->transactional)
     642            9 :         snapshot = SnapBuildGetOrBuildSnapshot(builder);
     643              : 
     644           48 :     ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
     645           48 :                               message->transactional,
     646           48 :                               message->message, /* first part of message is
     647              :                                                  * prefix */
     648              :                               message->message_size,
     649           48 :                               message->message + message->prefix_size);
     650              : }
     651              : 
     652              : /*
     653              :  * Consolidated commit record handling for the different forms of commit
     654              :  * records.
     655              :  *
     656              :  * 'two_phase' indicates that the caller wants to process the transaction in
     657              :  * two phases: first process the prepare if not already done, and then process
     658              :  * commit_prepared.
     659              :  */
     660              : static void
     661         3540 : DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     662              :              xl_xact_parsed_commit *parsed, TransactionId xid,
     663              :              bool two_phase)
     664              : {
     665         3540 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     666         3540 :     TimestampTz commit_time = parsed->xact_time;
     667         3540 :     ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
     668              :     int         i;
     669              : 
     670         3540 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     671              :     {
     672           99 :         origin_lsn = parsed->origin_lsn;
     673           99 :         commit_time = parsed->origin_timestamp; /* overrides xact_time */
     674              :     }
     675              : 
     676         3540 :     SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
     677              :                        parsed->nsubxacts, parsed->subxacts,
     678              :                        parsed->xinfo);
     679              : 
     680              :     /* ----
     681              :      * Check whether we are interested in this specific transaction, and tell
     682              :      * the reorderbuffer to forget the content of the (sub-)transactions
     683              :      * if not.
     684              :      *
     685              :      * We can't just use ReorderBufferAbort() here, because we need to execute
     686              :      * the transaction's invalidations.  This currently won't be needed if
     687              :      * we're just skipping over the transaction because currently we only do
     688              :      * so during startup, to get to the first transaction the client needs. As
     689              :      * we have reset the catalog caches before starting to read WAL, and we
     690              :      * haven't yet touched any catalogs, there can't be anything to invalidate.
     691              :      * But if we're "forgetting" this commit because it happened in another
     692              :      * database, the invalidations might be important, because they could be
     693              :      * for shared catalogs and we might have loaded data into the relevant
     694              :      * syscaches.
     695              :      * ---
     696              :      */
     697         3540 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     698              :     {
     699         2789 :         for (i = 0; i < parsed->nsubxacts; i++)
     700              :         {
     701          983 :             ReorderBufferForget(ctx->reorder, parsed->subxacts[i], buf->origptr);
     702              :         }
     703         1806 :         ReorderBufferForget(ctx->reorder, xid, buf->origptr);
     704              : 
     705         1806 :         return;
     706              :     }
     707              : 
     708              :     /* Tell the reorderbuffer about the surviving subtransactions. */
     709         2000 :     for (i = 0; i < parsed->nsubxacts; i++)
     710              :     {
     711          266 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     712              :                                  buf->origptr, buf->endptr);
     713              :     }
     714              : 
     715              :     /*
     716              :      * Send the final commit record if the transaction data is already
     717              :      * decoded; otherwise, process the entire transaction.
     718              :      */
     719         1734 :     if (two_phase)
     720              :     {
     721           35 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     722           35 :                                     SnapBuildGetTwoPhaseAt(ctx->snapshot_builder),
     723              :                                     commit_time, origin_id, origin_lsn,
     724           35 :                                     parsed->twophase_gid, true);
     725              :     }
     726              :     else
     727              :     {
     728         1699 :         ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
     729              :                             commit_time, origin_id, origin_lsn);
     730              :     }
     731              : 
     732              :     /*
     733              :      * Update the decoding stats at transaction prepare/commit/abort.
     734              :      * Additionally we send the stats when we spill or stream the changes to
     735              :      * avoid losing them in case the decoding is interrupted. It is not clear
     736              :      * that sending more or less frequently than this would be better.
     737              :      */
     738         1724 :     UpdateDecodingStats(ctx);
     739              : }
     740              : 
     741              : /*
     742              :  * Decode a PREPARE record, using logic similar to DecodeCommit.
     743              :  *
     744              :  * Note that we don't skip the prepare even if we have detected a concurrent
     745              :  * abort, because we may already have sent some changes before detecting the
     746              :  * abort, in which case we need to abort those changes in the subscriber. To
     747              :  * abort such changes, we send the prepare and then the rollback prepared,
     748              :  * which is what happened on the publisher side as well. We could invent a new
     749              :  * abort API wherein in such cases we send abort and skip sending prepared and
     750              :  * rollback prepared, but that is not straightforward because we might have
     751              :  * streamed this transaction by that time, in which case it is handled when the
     752              :  * rollback is encountered. It is not impossible to optimize the concurrent
     753              :  * abort case, but it can introduce design complexity w.r.t. handling different
     754              :  * cases, so we leave it for now as it doesn't seem worth it.
     755              :  */
     756              : static void
     757          170 : DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     758              :               xl_xact_parsed_prepare *parsed)
     759              : {
     760          170 :     SnapBuild  *builder = ctx->snapshot_builder;
     761          170 :     XLogRecPtr  origin_lsn = parsed->origin_lsn;
     762          170 :     TimestampTz prepare_time = parsed->xact_time;
     763          170 :     ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
     764              :     int         i;
     765          170 :     TransactionId xid = parsed->twophase_xid;
     766              : 
     767          170 :     if (parsed->origin_timestamp != 0)
     768            8 :         prepare_time = parsed->origin_timestamp;    /* overrides xact_time */
     769              : 
     770              :     /*
     771              :      * Remember the prepare info for a txn so that it can be used later in
     772              :      * commit prepared if required. See ReorderBufferFinishPrepared.
     773              :      */
     774          170 :     if (!ReorderBufferRememberPrepareInfo(ctx->reorder, xid, buf->origptr,
     775              :                                           buf->endptr, prepare_time, origin_id,
     776              :                                           origin_lsn))
     777            0 :         return;
     778              : 
     779              :     /* We can't start streaming unless a consistent state is reached. */
     780          170 :     if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
     781              :     {
     782            3 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     783            3 :         return;
     784              :     }
     785              : 
     786              :     /*
     787              :      * Check whether we need to process this transaction. See
     788              :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     789              :      * transaction.
     790              :      *
     791              :      * We can't call ReorderBufferForget as we did in DecodeCommit as the txn
     792              :      * hasn't yet been committed; removing this txn before a commit might
     793              :      * result in the computation of an incorrect restart_lsn. See
     794              :      * SnapBuildProcessRunningXacts. But we need to process cache
     795              :      * invalidations if there are any for the reasons mentioned in
     796              :      * DecodeCommit.
     797              :      */
     798          167 :     if (DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id))
     799              :     {
     800          123 :         ReorderBufferSkipPrepare(ctx->reorder, xid);
     801          123 :         ReorderBufferInvalidate(ctx->reorder, xid, buf->origptr);
     802          123 :         return;
     803              :     }
     804              : 
     805              :     /* Tell the reorderbuffer about the surviving subtransactions. */
     806           45 :     for (i = 0; i < parsed->nsubxacts; i++)
     807              :     {
     808            1 :         ReorderBufferCommitChild(ctx->reorder, xid, parsed->subxacts[i],
     809              :                                  buf->origptr, buf->endptr);
     810              :     }
     811              : 
     812              :     /* Replay actions of the transaction and its subtransactions in order. */
     813           44 :     ReorderBufferPrepare(ctx->reorder, xid, parsed->twophase_gid);
     814              : 
     815              :     /*
     816              :      * Update the decoding stats at transaction prepare/commit/abort.
     817              :      * Additionally we send the stats when we spill or stream the changes to
     818              :      * avoid losing them in case the decoding is interrupted. It is not clear
     819              :      * that sending more or less frequently than this would be better.
     820              :      */
     821           44 :     UpdateDecodingStats(ctx);
     822              : }
     823              : 
     824              : 
     825              : /*
     826              :  * Get the data from the various forms of abort records and pass it on to
     827              :  * snapbuild.c and reorderbuffer.c.
     828              :  *
     829              :  * 'two_phase' indicates that a prepared transaction is being finished.
     830              :  */
     831              : static void
     832          206 : DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
     833              :             xl_xact_parsed_abort *parsed, TransactionId xid,
     834              :             bool two_phase)
     835              : {
     836              :     int         i;
     837          206 :     XLogRecPtr  origin_lsn = InvalidXLogRecPtr;
     838          206 :     TimestampTz abort_time = parsed->xact_time;
     839          206 :     ReplOriginId origin_id = XLogRecGetOrigin(buf->record);
     840              :     bool        skip_xact;
     841              : 
     842          206 :     if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
     843              :     {
     844            4 :         origin_lsn = parsed->origin_lsn;
     845            4 :         abort_time = parsed->origin_timestamp;  /* overrides xact_time */
     846              :     }
     847              : 
     848              :     /*
     849              :      * Check whether we need to process this transaction. See
     850              :      * DecodeTXNNeedSkip for the reasons why we sometimes want to skip the
     851              :      * transaction.
     852              :      */
     853          206 :     skip_xact = DecodeTXNNeedSkip(ctx, buf, parsed->dbId, origin_id);
     854              : 
     855              :     /*
     856              :      * Send the final rollback record for a prepared transaction unless we
     857              :      * need to skip it. Otherwise (not two-phase, or skipped), simply abort
     858              :      * the xact and its subxacts in the reorderbuffer.
     859          206 :     if (two_phase && !skip_xact)
     860              :     {
     861           11 :         ReorderBufferFinishPrepared(ctx->reorder, xid, buf->origptr, buf->endptr,
     862              :                                     InvalidXLogRecPtr,
     863              :                                     abort_time, origin_id, origin_lsn,
     864           11 :                                     parsed->twophase_gid, false);
     865              :     }
     866              :     else
     867              :     {
     868          201 :         for (i = 0; i < parsed->nsubxacts; i++)
     869              :         {
     870            6 :             ReorderBufferAbort(ctx->reorder, parsed->subxacts[i],
     871            6 :                                buf->record->EndRecPtr, abort_time);
     872              :         }
     873              : 
     874          195 :         ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr,
     875              :                            abort_time);
     876              :     }
     877              : 
     878              :     /* update the decoding stats */
     879          206 :     UpdateDecodingStats(ctx);
     880          206 : }
     881              : 
     882              : /*
     883              :  * Parse XLOG_HEAP_INSERT (not MULTI_INSERT!) records into tuplebufs.
     884              :  *
     885              :  * Inserts can contain the new tuple.
     886              :  */
     887              : static void
     888      1169380 : DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     889              : {
     890              :     Size        datalen;
     891              :     char       *tupledata;
     892              :     Size        tuplelen;
     893      1169380 :     XLogReaderState *r = buf->record;
     894              :     xl_heap_insert *xlrec;
     895              :     ReorderBufferChange *change;
     896              :     RelFileLocator target_locator;
     897              : 
     898      1169380 :     xlrec = (xl_heap_insert *) XLogRecGetData(r);
     899              : 
     900              :     /*
     901              :      * Ignore insert records without new tuples (this does happen when
     902              :      * raw_heap_insert marks the TOAST record as HEAP_INSERT_NO_LOGICAL).
     903              :      */
     904      1169380 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
     905         4263 :         return;
     906              : 
     907              :     /* only interested in our database */
     908      1165229 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     909      1165229 :     if (target_locator.dbOid != ctx->slot->data.database)
     910            0 :         return;
     911              : 
     912              :     /* output plugin doesn't look for this origin, no need to queue */
     913      1165229 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     914          112 :         return;
     915              : 
     916      1165117 :     change = ReorderBufferAllocChange(ctx->reorder);
     917      1165117 :     if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
     918      1147201 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
     919              :     else
     920        17916 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
     921      1165117 :     change->origin_id = XLogRecGetOrigin(r);
     922              : 
     923      1165117 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     924              : 
     925      1165117 :     tupledata = XLogRecGetBlockData(r, 0, &datalen);
     926      1165117 :     tuplelen = datalen - SizeOfHeapHeader;  /* block data minus heap header */
     927              : 
     928      1165117 :     change->data.tp.newtuple =
     929      1165117 :         ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
     930              : 
     931      1165117 :     DecodeXLogTuple(tupledata, datalen, change->data.tp.newtuple);
     932              : 
     933      1165117 :     change->data.tp.clear_toast_afterwards = true;
     934              : 
     935      1165117 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
     936              :                              change,
     937      1165117 :                              xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
     938              : }
     939              : 
     940              : /*
     941              :  * Parse XLOG_HEAP_UPDATE and XLOG_HEAP_HOT_UPDATE, which have the same layout
     942              :  * in the record, from WAL into proper tuplebufs.
     943              :  *
     944              :  * Updates can possibly contain a new tuple and the old primary key.
     945              :  */
     946              : static void
     947       180857 : DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
     948              : {
     949       180857 :     XLogReaderState *r = buf->record;
     950              :     xl_heap_update *xlrec;
     951              :     ReorderBufferChange *change;
     952              :     char       *data;
     953              :     RelFileLocator target_locator;
     954              : 
     955       180857 :     xlrec = (xl_heap_update *) XLogRecGetData(r);
     956              : 
     957              :     /* only interested in our database */
     958       180857 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
     959       180857 :     if (target_locator.dbOid != ctx->slot->data.database)
     960          215 :         return;
     961              : 
     962              :     /* output plugin doesn't look for this origin, no need to queue */
     963       180674 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
     964           32 :         return;
     965              : 
     966       180642 :     change = ReorderBufferAllocChange(ctx->reorder);
     967       180642 :     change->action = REORDER_BUFFER_CHANGE_UPDATE;
     968       180642 :     change->origin_id = XLogRecGetOrigin(r);
     969       180642 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
     970              : 
     971       180642 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
     972              :     {
     973              :         Size        datalen;
     974              :         Size        tuplelen;
     975              : 
     976       178831 :         data = XLogRecGetBlockData(r, 0, &datalen); /* new tuple: block 0 */
     977              : 
     978       178831 :         tuplelen = datalen - SizeOfHeapHeader;
     979              : 
     980       178831 :         change->data.tp.newtuple =
     981       178831 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
     982              : 
     983       178831 :         DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
     984              :     }
     985              : 
     986       180642 :     if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
     987              :     {
     988              :         Size        datalen;
     989              :         Size        tuplelen;
     990              : 
     991              :         /* caution, remaining data in record is not aligned */
     992          310 :         data = XLogRecGetData(r) + SizeOfHeapUpdate;    /* old tuple data */
     993          310 :         datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
     994          310 :         tuplelen = datalen - SizeOfHeapHeader;
     995              : 
     996          310 :         change->data.tp.oldtuple =
     997          310 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
     998              : 
     999          310 :         DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
    1000              :     }
    1001              : 
    1002       180642 :     change->data.tp.clear_toast_afterwards = true;
    1003              : 
    1004       180642 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1005              :                              change, false);
    1006              : }
    1007              : 
    1008              : /*
    1009              :  * Parse XLOG_HEAP_DELETE from WAL into proper tuplebufs.
    1010              :  *
    1011              :  * Deletes can possibly contain the old primary key.
    1012              :  */
    1013              : static void
    1014       237681 : DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1015              : {
    1016       237681 :     XLogReaderState *r = buf->record;
    1017              :     xl_heap_delete *xlrec;
    1018              :     ReorderBufferChange *change;
    1019              :     RelFileLocator target_locator;
    1020              : 
    1021       237681 :     xlrec = (xl_heap_delete *) XLogRecGetData(r);
    1022              : 
    1023              :     /* only interested in our database */
    1024       237681 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1025       237681 :     if (target_locator.dbOid != ctx->slot->data.database)
    1026           48 :         return;
    1027              : 
    1028              :     /* output plugin doesn't look for this origin, no need to queue */
    1029       237650 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1030           17 :         return;
    1031              : 
    1032       237633 :     change = ReorderBufferAllocChange(ctx->reorder);
    1033              : 
    1034       237633 :     if (xlrec->flags & XLH_DELETE_IS_SUPER)
    1035            0 :         change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT;
    1036              :     else
    1037       237633 :         change->action = REORDER_BUFFER_CHANGE_DELETE;
    1038              : 
    1039       237633 :     change->origin_id = XLogRecGetOrigin(r);
    1040              : 
    1041       237633 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1042              : 
    1043              :     /* the old primary key is stored in the record, after the delete header */
    1044       237633 :     if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
    1045              :     {
    1046       175832 :         Size        datalen = XLogRecGetDataLen(r) - SizeOfHeapDelete;
    1047       175832 :         Size        tuplelen = datalen - SizeOfHeapHeader;
    1048              : 
    1049              :         Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
    1050              : 
    1051       175832 :         change->data.tp.oldtuple =
    1052       175832 :             ReorderBufferAllocTupleBuf(ctx->reorder, tuplelen);
    1053              : 
    1054       175832 :         DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
    1055              :                         datalen, change->data.tp.oldtuple);
    1056              :     }
    1057              : 
    1058       237633 :     change->data.tp.clear_toast_afterwards = true;
    1059              : 
    1060       237633 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1061              :                              change, false);
    1062              : }
    1063              : 
    1064              : /*
    1065              :  * Parse XLOG_HEAP_TRUNCATE from wal
    1066              :  */
    1067              : static void
    1068           69 : DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1069              : {
    1070           69 :     XLogReaderState *r = buf->record;
    1071              :     xl_heap_truncate *xlrec;
    1072              :     ReorderBufferChange *change;
    1073              : 
    1074           69 :     xlrec = (xl_heap_truncate *) XLogRecGetData(r);
    1075              : 
    1076              :     /* only interested in our database */
    1077           69 :     if (xlrec->dbId != ctx->slot->data.database)
    1078            0 :         return;
    1079              : 
    1080              :     /* output plugin doesn't look for this origin, no need to queue */
    1081           69 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1082            1 :         return;
    1083              : 
    1084           68 :     change = ReorderBufferAllocChange(ctx->reorder);
    1085           68 :     change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
    1086           68 :     change->origin_id = XLogRecGetOrigin(r);
    1087           68 :     if (xlrec->flags & XLH_TRUNCATE_CASCADE)
    1088            1 :         change->data.truncate.cascade = true;
    1089           68 :     if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
    1090            2 :         change->data.truncate.restart_seqs = true;
    1091           68 :     change->data.truncate.nrelids = xlrec->nrelids;
    1092          136 :     change->data.truncate.relids = ReorderBufferAllocRelids(ctx->reorder,
    1093           68 :                                                             xlrec->nrelids);
    1094           68 :     memcpy(change->data.truncate.relids, xlrec->relids,
    1095           68 :            xlrec->nrelids * sizeof(Oid));
    1096           68 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1097              :                              buf->origptr, change, false);
    1098              : }
    1099              : 
    1100              : /*
    1101              :  * Decode XLOG_HEAP2_MULTI_INSERT record into multiple tuplebufs.
    1102              :  *
    1103              :  * Currently MULTI_INSERT will always contain the full tuples.
    1104              :  */
    1105              : static void
    1106         6263 : DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1107              : {
    1108         6263 :     XLogReaderState *r = buf->record;
    1109              :     xl_heap_multi_insert *xlrec;
    1110              :     int         i;
    1111              :     char       *data;
    1112              :     char       *tupledata;
    1113              :     Size        tuplelen;
    1114              :     RelFileLocator rlocator;
    1115              : 
    1116         6263 :     xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
    1117              : 
    1118              :     /*
    1119              :      * Ignore insert records without new tuples.  This happens when a
    1120              :      * multi_insert is done on a catalog or on a non-persistent relation.
    1121              :      */
    1122         6263 :     if (!(xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE))
    1123         6248 :         return;
    1124              : 
    1125              :     /* only interested in our database */
    1126           53 :     XLogRecGetBlockTag(r, 0, &rlocator, NULL, NULL);
    1127           53 :     if (rlocator.dbOid != ctx->slot->data.database)
    1128           38 :         return;
    1129              : 
    1130              :     /* output plugin doesn't look for this origin, no need to queue */
    1131           15 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1132            0 :         return;
    1133              : 
    1134              :     /*
    1135              :      * We know that this multi_insert isn't for a catalog, so the block should
    1136              :      * always have data even if a full-page write of it is taken.
    1137              :      */
    1138           15 :     tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
    1139              :     Assert(tupledata != NULL);
    1140              : 
    1141           15 :     data = tupledata;
    1142         1064 :     for (i = 0; i < xlrec->ntuples; i++)
    1143              :     {
    1144              :         ReorderBufferChange *change;
    1145              :         xl_multi_insert_tuple *xlhdr;
    1146              :         int         datalen;
    1147              :         HeapTuple   tuple;
    1148              :         HeapTupleHeader header;
    1149              : 
    1150         1049 :         change = ReorderBufferAllocChange(ctx->reorder);
    1151         1049 :         change->action = REORDER_BUFFER_CHANGE_INSERT;
    1152         1049 :         change->origin_id = XLogRecGetOrigin(r);
    1153              : 
    1154         1049 :         memcpy(&change->data.tp.rlocator, &rlocator, sizeof(RelFileLocator));
    1155              : 
    1156         1049 :         xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(data);
    1157         1049 :         data = ((char *) xlhdr) + SizeOfMultiInsertTuple;
    1158         1049 :         datalen = xlhdr->datalen;
    1159              : 
    1160         1049 :         change->data.tp.newtuple =
    1161         1049 :             ReorderBufferAllocTupleBuf(ctx->reorder, datalen);
    1162              : 
    1163         1049 :         tuple = change->data.tp.newtuple;
    1164         1049 :         header = tuple->t_data;
    1165              : 
    1166              :         /* not a disk based tuple */
    1167         1049 :         ItemPointerSetInvalid(&tuple->t_self);
    1168              : 
    1169              :         /*
    1170              :          * We can only figure this out after reassembling the transactions.
    1171              :          */
    1172         1049 :         tuple->t_tableOid = InvalidOid;
    1173              : 
    1174         1049 :         tuple->t_len = datalen + SizeofHeapTupleHeader;
    1175              : 
    1176         1049 :         memset(header, 0, SizeofHeapTupleHeader);
    1177              : 
    1178         1049 :         memcpy((char *) tuple->t_data + SizeofHeapTupleHeader, data, datalen);
    1179         1049 :         header->t_infomask = xlhdr->t_infomask;
    1180         1049 :         header->t_infomask2 = xlhdr->t_infomask2;
    1181         1049 :         header->t_hoff = xlhdr->t_hoff;
    1182              : 
    1183              :         /*
    1184              :          * Reset toast reassembly state only after the last row in the last
    1185              :          * xl_multi_insert_tuple record emitted by one heap_multi_insert()
    1186              :          * call.
    1187              :          */
    1188         1049 :         if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
    1189          189 :             (i + 1) == xlrec->ntuples)
    1190           10 :             change->data.tp.clear_toast_afterwards = true;
    1191              :         else
    1192         1039 :             change->data.tp.clear_toast_afterwards = false;
    1193              : 
    1194         1049 :         ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
    1195              :                                  buf->origptr, change, false);
    1196              : 
    1197              :         /* move to the next xl_multi_insert_tuple entry */
    1198         1049 :         data += datalen;
    1199              :     }
    1200              :     Assert(data == tupledata + tuplelen);
    1201              : }
    1202              : 
    1203              : /*
    1204              :  * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change.
    1205              :  *
    1206              :  * This is pretty trivial, all the state essentially already setup by the
    1207              :  * speculative insertion.
    1208              :  */
    1209              : static void
    1210        17916 : DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    1211              : {
    1212        17916 :     XLogReaderState *r = buf->record;
    1213              :     ReorderBufferChange *change;
    1214              :     RelFileLocator target_locator;
    1215              : 
    1216              :     /* only interested in our database */
    1217        17916 :     XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
    1218        17916 :     if (target_locator.dbOid != ctx->slot->data.database)
    1219            0 :         return;
    1220              : 
    1221              :     /* output plugin doesn't look for this origin, no need to queue */
    1222        17916 :     if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
    1223            0 :         return;
    1224              : 
    1225        17916 :     change = ReorderBufferAllocChange(ctx->reorder);
    1226        17916 :     change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
    1227        17916 :     change->origin_id = XLogRecGetOrigin(r);
    1228              : 
    1229        17916 :     memcpy(&change->data.tp.rlocator, &target_locator, sizeof(RelFileLocator));
    1230              : 
    1231        17916 :     change->data.tp.clear_toast_afterwards = true;
    1232              : 
    1233        17916 :     ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
    1234              :                              change, false);
    1235              : }
    1236              : 
    1237              : 
    1238              : /*
    1239              :  * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
    1240              :  * (but not by heap_multi_insert) into a tuplebuf.
    1241              :  *
    1242              :  * The size 'len' and the pointer 'data' in the record need to be
    1243              :  * computed outside as they are record specific.
    1244              :  */
    1245              : static void
    1246      1520090 : DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
    1247              : {
    1248              :     xl_heap_header xlhdr;
    1249      1520090 :     int         datalen = len - SizeOfHeapHeader;
    1250              :     HeapTupleHeader header;
    1251              : 
    1252              :     Assert(datalen >= 0);
    1253              : 
    1254      1520090 :     tuple->t_len = datalen + SizeofHeapTupleHeader;
    1255      1520090 :     header = tuple->t_data;
    1256              : 
    1257              :     /* not a disk based tuple */
    1258      1520090 :     ItemPointerSetInvalid(&tuple->t_self);
    1259              : 
    1260              :     /* we can only figure this out after reassembling the transactions */
    1261      1520090 :     tuple->t_tableOid = InvalidOid;
    1262              : 
    1263              :     /* data is not stored aligned, copy to aligned storage */
    1264      1520090 :     memcpy(&xlhdr, data, SizeOfHeapHeader);
    1265              : 
    1266      1520090 :     memset(header, 0, SizeofHeapTupleHeader);
    1267              : 
    1268      1520090 :     memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
    1269      1520090 :            data + SizeOfHeapHeader,
    1270              :            datalen);
    1271              : 
    1272      1520090 :     header->t_infomask = xlhdr.t_infomask;
    1273      1520090 :     header->t_infomask2 = xlhdr.t_infomask2;
    1274      1520090 :     header->t_hoff = xlhdr.t_hoff;
    1275      1520090 : }
    1276              : 
    1277              : /*
    1278              :  * Check whether we are interested in this specific transaction.
    1279              :  *
    1280              :  * There can be several reasons we might not be interested in this
    1281              :  * transaction:
    1282              :  * 1) We might not be interested in decoding transactions up to this
    1283              :  *    LSN. This can happen because we previously decoded it and now just
    1284              :  *    are restarting or if we haven't assembled a consistent snapshot yet.
    1285              :  * 2) The transaction happened in another database.
    1286              :  * 3) The output plugin is not interested in the origin.
    1287              :  * 4) We are doing fast-forwarding
    1288              :  */
    1289              : static bool
    1290         3913 : DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
    1291              :                   Oid txn_dbid, ReplOriginId origin_id)
    1292              : {
    1293         3913 :     if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
    1294         3810 :         (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
    1295         1926 :         FilterByOrigin(ctx, origin_id))
    1296         2024 :         return true;
    1297              : 
    1298              :     /*
    1299              :      * We also skip decoding in fast_forward mode. In passing set the
    1300              :      * processing_required flag to indicate that if it were not for
    1301              :      * fast_forward mode, processing would have been required.
    1302              :      */
    1303         1889 :     if (ctx->fast_forward)
    1304              :     {
    1305           37 :         ctx->processing_required = true;
    1306           37 :         return true;
    1307              :     }
    1308              : 
    1309         1852 :     return false;
    1310              : }
        

Generated by: LCOV version 2.0-1