LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logical.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 699 751 93.1 %
Date: 2026-01-12 05:17:19 Functions: 40 41 97.6 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * logical.c
       3             :  *     PostgreSQL logical decoding coordination
       4             :  *
       5             :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/logical.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file coordinates interaction between the various modules that
      12             :  *    together provide logical decoding, primarily by providing so
      13             :  *    called LogicalDecodingContexts. The goal is to encapsulate most of the
      14             :  *    internal complexity for consumers of logical decoding, so they can
      15             :  *    create and consume a changestream with a low amount of code. Builtin
      16             :  *    consumers are the walsender and SQL SRF interface, but it's possible to
      17             :  *    add further ones without changing core code, e.g. to consume changes in
      18             :  *    a bgworker.
      19             :  *
      20             :  *    The idea is that a consumer provides three callbacks, one to read WAL,
      21             :  *    one to prepare a data write, and a final one for actually writing since
      22             :  *    their implementation depends on the type of consumer.  Check
      23             :  *    logicalfuncs.c for an example implementation of a fairly simple consumer
      24             :  *    and an implementation of a WAL reading callback that's suitable for
      25             :  *    simple consumers.
      26             :  *-------------------------------------------------------------------------
      27             :  */
      28             : 
      29             : #include "postgres.h"
      30             : 
      31             : #include "access/xact.h"
      32             : #include "access/xlog_internal.h"
      33             : #include "access/xlogutils.h"
      34             : #include "fmgr.h"
      35             : #include "miscadmin.h"
      36             : #include "pgstat.h"
      37             : #include "replication/decode.h"
      38             : #include "replication/logical.h"
      39             : #include "replication/reorderbuffer.h"
      40             : #include "replication/slotsync.h"
      41             : #include "replication/snapbuild.h"
      42             : #include "storage/proc.h"
      43             : #include "storage/procarray.h"
      44             : #include "utils/builtins.h"
      45             : #include "utils/injection_point.h"
      46             : #include "utils/inval.h"
      47             : #include "utils/memutils.h"
      48             : 
      49             : /* data for errcontext callback */
      50             : typedef struct LogicalErrorCallbackState
      51             : {
      52             :     LogicalDecodingContext *ctx;
      53             :     const char *callback_name;
      54             :     XLogRecPtr  report_location;
      55             : } LogicalErrorCallbackState;
      56             : 
      57             : /* wrappers around output plugin callbacks */
      58             : static void output_plugin_error_callback(void *arg);
      59             : static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      60             :                                bool is_init);
      61             : static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
      62             : static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
      63             : static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      64             :                               XLogRecPtr commit_lsn);
      65             : static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
      66             : static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      67             :                                XLogRecPtr prepare_lsn);
      68             : static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      69             :                                        XLogRecPtr commit_lsn);
      70             : static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      71             :                                          XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
      72             : static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      73             :                               Relation relation, ReorderBufferChange *change);
      74             : static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      75             :                                 int nrelations, Relation relations[], ReorderBufferChange *change);
      76             : static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      77             :                                XLogRecPtr message_lsn, bool transactional,
      78             :                                const char *prefix, Size message_size, const char *message);
      79             : 
      80             : /* streaming callbacks */
      81             : static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      82             :                                     XLogRecPtr first_lsn);
      83             : static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      84             :                                    XLogRecPtr last_lsn);
      85             : static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      86             :                                     XLogRecPtr abort_lsn);
      87             : static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      88             :                                       XLogRecPtr prepare_lsn);
      89             : static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      90             :                                      XLogRecPtr commit_lsn);
      91             : static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      92             :                                      Relation relation, ReorderBufferChange *change);
      93             : static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      94             :                                       XLogRecPtr message_lsn, bool transactional,
      95             :                                       const char *prefix, Size message_size, const char *message);
      96             : static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      97             :                                        int nrelations, Relation relations[], ReorderBufferChange *change);
      98             : 
      99             : /* callback to update txn's progress */
     100             : static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
     101             :                                            ReorderBufferTXN *txn,
     102             :                                            XLogRecPtr lsn);
     103             : 
     104             : static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
     105             : 
     106             : /*
     107             :  * Make sure the current settings & environment are capable of doing logical
     108             :  * decoding.
     109             :  */
     110             : void
     111        3182 : CheckLogicalDecodingRequirements(void)
     112             : {
     113        3182 :     CheckSlotRequirements();
     114             : 
     115             :     /*
     116             :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
     117             :      * needs the same check.
     118             :      */
     119             : 
     120        3182 :     if (MyDatabaseId == InvalidOid)
     121           2 :         ereport(ERROR,
     122             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     123             :                  errmsg("logical decoding requires a database connection")));
     124             : 
     125             :     /* CheckSlotRequirements() has already checked if wal_level >= 'replica' */
     126             :     Assert(wal_level >= WAL_LEVEL_REPLICA);
     127             : 
     128             :     /* Check if logical decoding is available on standby */
     129        3180 :     if (RecoveryInProgress() && !IsLogicalDecodingEnabled())
     130           4 :         ereport(ERROR,
     131             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     132             :                  errmsg("logical decoding on standby requires \"effective_wal_level\" >= \"logical\" on the primary"),
     133             :                  errhint("Set \"wal_level\" >= \"logical\" or create at least one logical slot when \"wal_level\" = \"replica\".")));
     134        3176 : }
     135             : 
     136             : /*
     137             :  * Helper function for CreateInitDecodingContext() and
     138             :  * CreateDecodingContext() performing common tasks.
     139             :  */
     140             : static LogicalDecodingContext *
     141        2232 : StartupDecodingContext(List *output_plugin_options,
     142             :                        XLogRecPtr start_lsn,
     143             :                        TransactionId xmin_horizon,
     144             :                        bool need_full_snapshot,
     145             :                        bool fast_forward,
     146             :                        bool in_create,
     147             :                        XLogReaderRoutine *xl_routine,
     148             :                        LogicalOutputPluginWriterPrepareWrite prepare_write,
     149             :                        LogicalOutputPluginWriterWrite do_write,
     150             :                        LogicalOutputPluginWriterUpdateProgress update_progress)
     151             : {
     152             :     ReplicationSlot *slot;
     153             :     MemoryContext context,
     154             :                 old_context;
     155             :     LogicalDecodingContext *ctx;
     156             : 
     157             :     /* shorter lines... */
     158        2232 :     slot = MyReplicationSlot;
     159             : 
     160        2232 :     context = AllocSetContextCreate(CurrentMemoryContext,
     161             :                                     "Logical decoding context",
     162             :                                     ALLOCSET_DEFAULT_SIZES);
     163        2232 :     old_context = MemoryContextSwitchTo(context);
     164        2232 :     ctx = palloc0_object(LogicalDecodingContext);
     165             : 
     166        2232 :     ctx->context = context;
     167             : 
     168             :     /*
     169             :      * (re-)load output plugins, so we detect a bad (removed) output plugin
     170             :      * now.
     171             :      */
     172        2232 :     if (!fast_forward)
     173        2186 :         LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
     174             : 
     175             :     /*
     176             :      * Now that the slot's xmin has been set, we can announce ourselves as a
     177             :      * logical decoding backend which doesn't need to be checked individually
     178             :      * when computing the xmin horizon because the xmin is enforced via
     179             :      * replication slots.
     180             :      *
     181             :      * We can only do so if we're outside of a transaction (i.e. the case when
     182             :      * streaming changes via walsender), otherwise an already setup
     183             :      * snapshot/xid would end up being ignored. That's not a particularly
     184             :      * bothersome restriction since the SQL interface can't be used for
     185             :      * streaming anyway.
     186             :      */
     187        2230 :     if (!IsTransactionOrTransactionBlock())
     188             :     {
     189        1110 :         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     190        1110 :         MyProc->statusFlags |= PROC_IN_LOGICAL_DECODING;
     191        1110 :         ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
     192        1110 :         LWLockRelease(ProcArrayLock);
     193             :     }
     194             : 
     195        2230 :     ctx->slot = slot;
     196             : 
     197        2230 :     ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
     198        2230 :     if (!ctx->reader)
     199           0 :         ereport(ERROR,
     200             :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     201             :                  errmsg("out of memory"),
     202             :                  errdetail("Failed while allocating a WAL reading processor.")));
     203             : 
     204        2230 :     ctx->reorder = ReorderBufferAllocate();
     205        2230 :     ctx->snapshot_builder =
     206        2230 :         AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
     207             :                                 need_full_snapshot, in_create, slot->data.two_phase_at);
     208             : 
     209        2230 :     ctx->reorder->private_data = ctx;
     210             : 
     211             :     /* wrap output plugin callbacks, so we can add error context information */
     212        2230 :     ctx->reorder->begin = begin_cb_wrapper;
     213        2230 :     ctx->reorder->apply_change = change_cb_wrapper;
     214        2230 :     ctx->reorder->apply_truncate = truncate_cb_wrapper;
     215        2230 :     ctx->reorder->commit = commit_cb_wrapper;
     216        2230 :     ctx->reorder->message = message_cb_wrapper;
     217             : 
     218             :     /*
     219             :      * To support streaming, we require start/stop/abort/commit/change
     220             :      * callbacks. The message and truncate callbacks are optional, similar to
     221             :      * regular output plugins. We however enable streaming when at least one
     222             :      * of the methods is enabled so that we can easily identify missing
     223             :      * methods.
     224             :      *
     225             :      * We decide it here, but only check it later in the wrappers.
     226             :      */
     227        4506 :     ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) ||
     228          46 :         (ctx->callbacks.stream_stop_cb != NULL) ||
     229          46 :         (ctx->callbacks.stream_abort_cb != NULL) ||
     230          46 :         (ctx->callbacks.stream_commit_cb != NULL) ||
     231          46 :         (ctx->callbacks.stream_change_cb != NULL) ||
     232        2322 :         (ctx->callbacks.stream_message_cb != NULL) ||
     233          46 :         (ctx->callbacks.stream_truncate_cb != NULL);
     234             : 
     235             :     /*
     236             :      * streaming callbacks
     237             :      *
     238             :      * stream_message and stream_truncate callbacks are optional, so we do not
     239             :      * fail with ERROR when missing, but the wrappers simply do nothing. We
     240             :      * must set the ReorderBuffer callbacks to something, otherwise the calls
     241             :      * from there will crash (we don't want to move the checks there).
     242             :      */
     243        2230 :     ctx->reorder->stream_start = stream_start_cb_wrapper;
     244        2230 :     ctx->reorder->stream_stop = stream_stop_cb_wrapper;
     245        2230 :     ctx->reorder->stream_abort = stream_abort_cb_wrapper;
     246        2230 :     ctx->reorder->stream_prepare = stream_prepare_cb_wrapper;
     247        2230 :     ctx->reorder->stream_commit = stream_commit_cb_wrapper;
     248        2230 :     ctx->reorder->stream_change = stream_change_cb_wrapper;
     249        2230 :     ctx->reorder->stream_message = stream_message_cb_wrapper;
     250        2230 :     ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
     251             : 
     252             : 
     253             :     /*
     254             :      * To support two-phase logical decoding, we require
     255             :      * begin_prepare/prepare/commit-prepare/abort-prepare callbacks. The
     256             :      * filter_prepare callback is optional. We however enable two-phase
     257             :      * logical decoding when at least one of the methods is enabled so that we
     258             :      * can easily identify missing methods.
     259             :      *
     260             :      * We decide it here, but only check it later in the wrappers.
     261             :      */
     262        4506 :     ctx->twophase = (ctx->callbacks.begin_prepare_cb != NULL) ||
     263          46 :         (ctx->callbacks.prepare_cb != NULL) ||
     264          46 :         (ctx->callbacks.commit_prepared_cb != NULL) ||
     265          46 :         (ctx->callbacks.rollback_prepared_cb != NULL) ||
     266        2322 :         (ctx->callbacks.stream_prepare_cb != NULL) ||
     267          46 :         (ctx->callbacks.filter_prepare_cb != NULL);
     268             : 
     269             :     /*
     270             :      * Callback to support decoding at prepare time.
     271             :      */
     272        2230 :     ctx->reorder->begin_prepare = begin_prepare_cb_wrapper;
     273        2230 :     ctx->reorder->prepare = prepare_cb_wrapper;
     274        2230 :     ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
     275        2230 :     ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
     276             : 
     277             :     /*
     278             :      * Callback to support updating progress during sending data of a
     279             :      * transaction (and its subtransactions) to the output plugin.
     280             :      */
     281        2230 :     ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
     282             : 
     283        2230 :     ctx->out = makeStringInfo();
     284        2230 :     ctx->prepare_write = prepare_write;
     285        2230 :     ctx->write = do_write;
     286        2230 :     ctx->update_progress = update_progress;
     287             : 
     288        2230 :     ctx->output_plugin_options = output_plugin_options;
     289             : 
     290        2230 :     ctx->fast_forward = fast_forward;
     291             : 
     292        2230 :     MemoryContextSwitchTo(old_context);
     293             : 
     294        2230 :     return ctx;
     295             : }
     296             : 
     297             : /*
     298             :  * Create a new decoding context, for a new logical slot.
     299             :  *
     300             :  * plugin -- contains the name of the output plugin
     301             :  * output_plugin_options -- contains options passed to the output plugin
     302             :  * need_full_snapshot -- if true, must obtain a snapshot able to read all
     303             :  *      tables; if false, one that can read only catalogs is acceptable.
     304             :  * restart_lsn -- if given as invalid, it's this routine's responsibility to
     305             :  *      mark WAL as reserved by setting a convenient restart_lsn for the slot.
     306             :  *      Otherwise, we set for decoding to start from the given LSN without
     307             :  *      marking WAL reserved beforehand.  In that scenario, it's up to the
     308             :  *      caller to guarantee that WAL remains available.
     309             :  * xl_routine -- XLogReaderRoutine for underlying XLogReader
     310             :  * prepare_write, do_write, update_progress --
     311             :  *      callbacks that perform the use-case dependent, actual, work.
     312             :  *
     313             :  * Needs to be called while in a memory context that's at least as long lived
     314             :  * as the decoding context because further memory contexts will be created
     315             :  * inside it.
     316             :  *
     317             :  * Returns an initialized decoding context after calling the output plugin's
     318             :  * startup function.
     319             :  */
     320             : LogicalDecodingContext *
     321         948 : CreateInitDecodingContext(const char *plugin,
     322             :                           List *output_plugin_options,
     323             :                           bool need_full_snapshot,
     324             :                           XLogRecPtr restart_lsn,
     325             :                           XLogReaderRoutine *xl_routine,
     326             :                           LogicalOutputPluginWriterPrepareWrite prepare_write,
     327             :                           LogicalOutputPluginWriterWrite do_write,
     328             :                           LogicalOutputPluginWriterUpdateProgress update_progress)
     329             : {
     330         948 :     TransactionId xmin_horizon = InvalidTransactionId;
     331             :     ReplicationSlot *slot;
     332             :     NameData    plugin_name;
     333             :     LogicalDecodingContext *ctx;
     334             :     MemoryContext old_context;
     335             : 
     336             :     /*
     337             :      * On a standby, this check is also required while creating the slot.
     338             :      * Check the comments in the function.
     339             :      */
     340         948 :     CheckLogicalDecodingRequirements();
     341             : 
     342             :     /* shorter lines... */
     343         948 :     slot = MyReplicationSlot;
     344             : 
     345             :     /* first some sanity checks that are unlikely to be violated */
     346         948 :     if (slot == NULL)
     347           0 :         elog(ERROR, "cannot perform logical decoding without an acquired slot");
     348             : 
     349         948 :     if (plugin == NULL)
     350           0 :         elog(ERROR, "cannot initialize logical decoding without a specified plugin");
     351             : 
     352             :     /* Make sure the passed slot is suitable. These are user facing errors. */
     353         948 :     if (SlotIsPhysical(slot))
     354           0 :         ereport(ERROR,
     355             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     356             :                  errmsg("cannot use physical replication slot for logical decoding")));
     357             : 
     358         948 :     if (slot->data.database != MyDatabaseId)
     359           0 :         ereport(ERROR,
     360             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     361             :                  errmsg("replication slot \"%s\" was not created in this database",
     362             :                         NameStr(slot->data.name))));
     363             : 
     364        1610 :     if (IsTransactionState() &&
     365         662 :         GetTopTransactionIdIfAny() != InvalidTransactionId)
     366           4 :         ereport(ERROR,
     367             :                 (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
     368             :                  errmsg("cannot create logical replication slot in transaction that has performed writes")));
     369             : 
     370             :     /*
     371             :      * Register output plugin name with slot.  We need the mutex to avoid
     372             :      * concurrent reading of a partially copied string.  But we don't want any
     373             :      * complicated code while holding a spinlock, so do namestrcpy() outside.
     374             :      */
     375         944 :     namestrcpy(&plugin_name, plugin);
     376         944 :     SpinLockAcquire(&slot->mutex);
     377         944 :     slot->data.plugin = plugin_name;
     378         944 :     SpinLockRelease(&slot->mutex);
     379             : 
     380         944 :     if (!XLogRecPtrIsValid(restart_lsn))
     381         930 :         ReplicationSlotReserveWal();
     382             :     else
     383             :     {
     384          14 :         SpinLockAcquire(&slot->mutex);
     385          14 :         slot->data.restart_lsn = restart_lsn;
     386          14 :         SpinLockRelease(&slot->mutex);
     387             :     }
     388             : 
     389             :     /* ----
     390             :      * This is a bit tricky: We need to determine a safe xmin horizon to start
     391             :      * decoding from, to avoid starting from a running xacts record referring
     392             :      * to xids whose rows have been vacuumed or pruned
     393             :      * already. GetOldestSafeDecodingTransactionId() returns such a value, but
     394             :      * without further interlock its return value might immediately be out of
     395             :      * date.
     396             :      *
     397             :      * So we have to acquire both the ReplicationSlotControlLock and the
     398             :      * ProcArrayLock to prevent concurrent computation and update of new xmin
     399             :      * horizons by other backends, get the safe decoding xid, and inform the
     400             :      * slot machinery about the new limit. Once that's done both locks can be
     401             :      * released as the slot machinery now is protecting against vacuum.
     402             :      *
     403             :      * Note that, temporarily, the data, not just the catalog, xmin has to be
     404             :      * reserved if a data snapshot is to be exported.  Otherwise the initial
     405             :      * data snapshot created here is not guaranteed to be valid. After that
     406             :      * the data xmin doesn't need to be managed anymore and the global xmin
     407             :      * should be recomputed. As we are fine with losing the pegged data xmin
     408             :      * after crash - no chance a snapshot would get exported anymore - we can
     409             :      * get away with just setting the slot's
     410             :      * effective_xmin. ReplicationSlotRelease will reset it again.
     411             :      *
     412             :      * ----
     413             :      */
     414         944 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     415         944 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     416             : 
     417         944 :     xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
     418             : 
     419         944 :     SpinLockAcquire(&slot->mutex);
     420         944 :     slot->effective_catalog_xmin = xmin_horizon;
     421         944 :     slot->data.catalog_xmin = xmin_horizon;
     422         944 :     if (need_full_snapshot)
     423         398 :         slot->effective_xmin = xmin_horizon;
     424         944 :     SpinLockRelease(&slot->mutex);
     425             : 
     426         944 :     ReplicationSlotsComputeRequiredXmin(true);
     427             : 
     428         944 :     LWLockRelease(ProcArrayLock);
     429         944 :     LWLockRelease(ReplicationSlotControlLock);
     430             : 
     431         944 :     ReplicationSlotMarkDirty();
     432         944 :     ReplicationSlotSave();
     433             : 
     434         944 :     ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
     435             :                                  need_full_snapshot, false, true,
     436             :                                  xl_routine, prepare_write, do_write,
     437             :                                  update_progress);
     438             : 
     439             :     /* call output plugin initialization callback */
     440         942 :     old_context = MemoryContextSwitchTo(ctx->context);
     441         942 :     if (ctx->callbacks.startup_cb != NULL)
     442         942 :         startup_cb_wrapper(ctx, &ctx->options, true);
     443         942 :     MemoryContextSwitchTo(old_context);
     444             : 
     445             :     /*
     446             :      * We allow decoding of prepared transactions when the two_phase is
     447             :      * enabled at the time of slot creation, or when the two_phase option is
     448             :      * given at the streaming start, provided the plugin supports all the
     449             :      * callbacks for two-phase.
     450             :      */
     451         942 :     ctx->twophase &= slot->data.two_phase;
     452             : 
     453         942 :     ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
     454             : 
     455         942 :     return ctx;
     456             : }
     457             : 
     458             : /*
     459             :  * Create a new decoding context, for a logical slot that has previously been
     460             :  * used already.
     461             :  *
     462             :  * start_lsn
     463             :  *      The LSN at which to start decoding.  If InvalidXLogRecPtr, restart
     464             :  *      from the slot's confirmed_flush; otherwise, start from the specified
     465             :  *      location (but move it forwards to confirmed_flush if it's older than
     466             :  *      that, see below).
     467             :  *
     468             :  * output_plugin_options
     469             :  *      options passed to the output plugin.
     470             :  *
     471             :  * fast_forward
     472             :  *      bypass the generation of logical changes.
     473             :  *
     474             :  * xl_routine
     475             :  *      XLogReaderRoutine used by underlying xlogreader
     476             :  *
     477             :  * prepare_write, do_write, update_progress
     478             :  *      callbacks that have to be filled to perform the use-case dependent,
     479             :  *      actual work.
     480             :  *
     481             :  * Needs to be called while in a memory context that's at least as long lived
     482             :  * as the decoding context because further memory contexts will be created
     483             :  * inside it.
     484             :  *
     485             :  * Returns an initialized decoding context after calling the output plugin's
     486             :  * startup function.
     487             :  */
     488             : LogicalDecodingContext *
     489        1298 : CreateDecodingContext(XLogRecPtr start_lsn,
     490             :                       List *output_plugin_options,
     491             :                       bool fast_forward,
     492             :                       XLogReaderRoutine *xl_routine,
     493             :                       LogicalOutputPluginWriterPrepareWrite prepare_write,
     494             :                       LogicalOutputPluginWriterWrite do_write,
     495             :                       LogicalOutputPluginWriterUpdateProgress update_progress)
     496             : {
     497             :     LogicalDecodingContext *ctx;
     498             :     ReplicationSlot *slot;
     499             :     MemoryContext old_context;
     500             : 
     501             :     /* shorter lines... */
     502        1298 :     slot = MyReplicationSlot;
     503             : 
     504             :     /* first some sanity checks that are unlikely to be violated */
     505        1298 :     if (slot == NULL)
     506           0 :         elog(ERROR, "cannot perform logical decoding without an acquired slot");
     507             : 
     508             :     /* make sure the passed slot is suitable, these are user facing errors */
     509        1298 :     if (SlotIsPhysical(slot))
     510           2 :         ereport(ERROR,
     511             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     512             :                  errmsg("cannot use physical replication slot for logical decoding")));
     513             : 
     514             :     /*
     515             :      * We need to access the system tables during decoding to build the
     516             :      * logical changes unless we are in fast_forward mode where no changes are
     517             :      * generated.
     518             :      */
     519        1296 :     if (slot->data.database != MyDatabaseId && !fast_forward)
     520           6 :         ereport(ERROR,
     521             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     522             :                  errmsg("replication slot \"%s\" was not created in this database",
     523             :                         NameStr(slot->data.name))));
     524             : 
     525             :     /*
     526             :      * The slots being synced from the primary can't be used for decoding as
     527             :      * they are used after failover. However, we do allow advancing the LSNs
     528             :      * during the synchronization of slots. See update_local_synced_slot.
     529             :      */
     530        1290 :     if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
     531           2 :         ereport(ERROR,
     532             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     533             :                 errmsg("cannot use replication slot \"%s\" for logical decoding",
     534             :                        NameStr(slot->data.name)),
     535             :                 errdetail("This replication slot is being synchronized from the primary server."),
     536             :                 errhint("Specify another replication slot."));
     537             : 
     538             :     /* slot must be valid to allow decoding */
     539             :     Assert(slot->data.invalidated == RS_INVAL_NONE);
     540             :     Assert(XLogRecPtrIsValid(slot->data.restart_lsn));
     541             : 
     542        1288 :     if (!XLogRecPtrIsValid(start_lsn))
     543             :     {
     544             :         /* continue from last position */
     545         750 :         start_lsn = slot->data.confirmed_flush;
     546             :     }
     547         538 :     else if (start_lsn < slot->data.confirmed_flush)
     548             :     {
     549             :         /*
     550             :          * It might seem like we should error out in this case, but it's
     551             :          * pretty common for a client to acknowledge a LSN it doesn't have to
     552             :          * do anything for, and thus didn't store persistently, because the
     553             :          * xlog records didn't result in anything relevant for logical
     554             :          * decoding. Clients have to be able to do that to support synchronous
     555             :          * replication.
     556             :          *
     557             :          * Starting at a different LSN than requested might not catch certain
     558             :          * kinds of client errors; so the client may wish to check that
     559             :          * confirmed_flush_lsn matches its expectations.
     560             :          */
     561          80 :         elog(LOG, "%X/%08X has been already streamed, forwarding to %X/%08X",
     562             :              LSN_FORMAT_ARGS(start_lsn),
     563             :              LSN_FORMAT_ARGS(slot->data.confirmed_flush));
     564             : 
     565          80 :         start_lsn = slot->data.confirmed_flush;
     566             :     }
     567             : 
     568        1288 :     ctx = StartupDecodingContext(output_plugin_options,
     569             :                                  start_lsn, InvalidTransactionId, false,
     570             :                                  fast_forward, false, xl_routine, prepare_write,
     571             :                                  do_write, update_progress);
     572             : 
     573             :     /* call output plugin initialization callback */
     574        1288 :     old_context = MemoryContextSwitchTo(ctx->context);
     575        1288 :     if (ctx->callbacks.startup_cb != NULL)
     576        1242 :         startup_cb_wrapper(ctx, &ctx->options, false);
     577        1282 :     MemoryContextSwitchTo(old_context);
     578             : 
     579             :     /*
     580             :      * We allow decoding of prepared transactions when the two_phase is
     581             :      * enabled at the time of slot creation, or when the two_phase option is
     582             :      * given at the streaming start, provided the plugin supports all the
     583             :      * callbacks for two-phase.
     584             :      */
     585        1282 :     ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given);
     586             : 
     587             :     /* Mark slot to allow two_phase decoding if not already marked */
     588        1282 :     if (ctx->twophase && !slot->data.two_phase)
     589             :     {
     590          16 :         SpinLockAcquire(&slot->mutex);
     591          16 :         slot->data.two_phase = true;
     592          16 :         slot->data.two_phase_at = start_lsn;
     593          16 :         SpinLockRelease(&slot->mutex);
     594          16 :         ReplicationSlotMarkDirty();
     595          16 :         ReplicationSlotSave();
     596          16 :         SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
     597             :     }
     598             : 
     599        1282 :     ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
     600             : 
     601        1282 :     ereport(LOG,
     602             :             (errmsg("starting logical decoding for slot \"%s\"",
     603             :                     NameStr(slot->data.name)),
     604             :              errdetail("Streaming transactions committing after %X/%08X, reading WAL from %X/%08X.",
     605             :                        LSN_FORMAT_ARGS(slot->data.confirmed_flush),
     606             :                        LSN_FORMAT_ARGS(slot->data.restart_lsn))));
     607             : 
     608        1282 :     return ctx;
     609             : }
     610             : 
     611             : /*
     612             :  * Returns true if a consistent initial decoding snapshot has been built.
     613             :  */
     614             : bool
     615        1030 : DecodingContextReady(LogicalDecodingContext *ctx)
     616             : {
     617        1030 :     return SnapBuildCurrentState(ctx->snapshot_builder) == SNAPBUILD_CONSISTENT;
     618             : }
     619             : 
     620             : /*
     621             :  * Read from the decoding slot, until it is ready to start extracting changes.
     622             :  */
     623             : void
     624         928 : DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
     625             : {
     626         928 :     ReplicationSlot *slot = ctx->slot;
     627             : 
     628             :     /* Initialize from where to start reading WAL. */
     629         928 :     XLogBeginRead(ctx->reader, slot->data.restart_lsn);
     630             : 
     631         928 :     elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%08X",
     632             :          LSN_FORMAT_ARGS(slot->data.restart_lsn));
     633             : 
     634             :     /* Wait for a consistent starting point */
     635             :     for (;;)
     636          94 :     {
     637             :         XLogRecord *record;
     638        1022 :         char       *err = NULL;
     639             : 
     640             :         /* the read_page callback waits for new WAL */
     641        1022 :         record = XLogReadRecord(ctx->reader, &err);
     642        1022 :         if (err)
     643           0 :             elog(ERROR, "could not find logical decoding starting point: %s", err);
     644        1022 :         if (!record)
     645           0 :             elog(ERROR, "could not find logical decoding starting point");
     646             : 
     647        1022 :         LogicalDecodingProcessRecord(ctx, ctx->reader);
     648             : 
     649             :         /* only continue till we found a consistent spot */
     650        1018 :         if (DecodingContextReady(ctx))
     651         924 :             break;
     652             : 
     653          94 :         CHECK_FOR_INTERRUPTS();
     654             :     }
     655             : 
     656         924 :     SpinLockAcquire(&slot->mutex);
     657         924 :     slot->data.confirmed_flush = ctx->reader->EndRecPtr;
     658         924 :     if (slot->data.two_phase)
     659          18 :         slot->data.two_phase_at = ctx->reader->EndRecPtr;
     660         924 :     SpinLockRelease(&slot->mutex);
     661         924 : }
     662             : 
     663             : /*
     664             :  * Free a previously allocated decoding context, invoking the shutdown
     665             :  * callback if necessary.
     666             :  */
     667             : void
     668        1780 : FreeDecodingContext(LogicalDecodingContext *ctx)
     669             : {
     670        1780 :     if (ctx->callbacks.shutdown_cb != NULL)
     671        1734 :         shutdown_cb_wrapper(ctx);
     672             : 
     673        1780 :     ReorderBufferFree(ctx->reorder);
     674        1780 :     FreeSnapshotBuilder(ctx->snapshot_builder);
     675        1780 :     XLogReaderFree(ctx->reader);
     676        1780 :     MemoryContextDelete(ctx->context);
     677        1780 : }
     678             : 
     679             : /*
     680             :  * Prepare a write using the context's output routine.
     681             :  */
     682             : void
     683      672414 : OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
     684             : {
     685      672414 :     if (!ctx->accept_writes)
     686           0 :         elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
     687             : 
     688      672414 :     ctx->prepare_write(ctx, ctx->write_location, ctx->write_xid, last_write);
     689      672414 :     ctx->prepared_write = true;
     690      672414 : }
     691             : 
     692             : /*
     693             :  * Perform a write using the context's output routine.
     694             :  */
     695             : void
     696      672414 : OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
     697             : {
     698      672414 :     if (!ctx->prepared_write)
     699           0 :         elog(ERROR, "OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
     700             : 
     701      672414 :     ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write);
     702      672378 :     ctx->prepared_write = false;
     703      672378 : }
     704             : 
     705             : /*
     706             :  * Update progress tracking (if supported).
     707             :  */
     708             : void
     709        8314 : OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx,
     710             :                            bool skipped_xact)
     711             : {
     712        8314 :     if (!ctx->update_progress)
     713        3180 :         return;
     714             : 
     715        5134 :     ctx->update_progress(ctx, ctx->write_location, ctx->write_xid,
     716             :                          skipped_xact);
     717             : }
     718             : 
     719             : /*
     720             :  * Load the output plugin, lookup its output plugin init function, and check
     721             :  * that it provides the required callbacks.
     722             :  */
     723             : static void
     724        2186 : LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin)
     725             : {
     726             :     LogicalOutputPluginInit plugin_init;
     727             : 
     728        2184 :     plugin_init = (LogicalOutputPluginInit)
     729        2186 :         load_external_function(plugin, "_PG_output_plugin_init", false, NULL);
     730             : 
     731        2184 :     if (plugin_init == NULL)
     732           0 :         elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");
     733             : 
     734             :     /* ask the output plugin to fill the callback struct */
     735        2184 :     plugin_init(callbacks);
     736             : 
     737        2184 :     if (callbacks->begin_cb == NULL)
     738           0 :         elog(ERROR, "output plugins have to register a begin callback");
     739        2184 :     if (callbacks->change_cb == NULL)
     740           0 :         elog(ERROR, "output plugins have to register a change callback");
     741        2184 :     if (callbacks->commit_cb == NULL)
     742           0 :         elog(ERROR, "output plugins have to register a commit callback");
     743        2184 : }
     744             : 
     745             : static void
     746         278 : output_plugin_error_callback(void *arg)
     747             : {
     748         278 :     LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;
     749             : 
     750             :     /* not all callbacks have an associated LSN  */
     751         278 :     if (XLogRecPtrIsValid(state->report_location))
     752         272 :         errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%08X",
     753         272 :                    NameStr(state->ctx->slot->data.name),
     754         272 :                    NameStr(state->ctx->slot->data.plugin),
     755             :                    state->callback_name,
     756         272 :                    LSN_FORMAT_ARGS(state->report_location));
     757             :     else
     758           6 :         errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
     759           6 :                    NameStr(state->ctx->slot->data.name),
     760           6 :                    NameStr(state->ctx->slot->data.plugin),
     761             :                    state->callback_name);
     762         278 : }
     763             : 
     764             : static void
     765        2184 : startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
     766             : {
     767             :     LogicalErrorCallbackState state;
     768             :     ErrorContextCallback errcallback;
     769             : 
     770             :     Assert(!ctx->fast_forward);
     771             : 
     772             :     /* Push callback + info on the error context stack */
     773        2184 :     state.ctx = ctx;
     774        2184 :     state.callback_name = "startup";
     775        2184 :     state.report_location = InvalidXLogRecPtr;
     776        2184 :     errcallback.callback = output_plugin_error_callback;
     777        2184 :     errcallback.arg = &state;
     778        2184 :     errcallback.previous = error_context_stack;
     779        2184 :     error_context_stack = &errcallback;
     780             : 
     781             :     /* set output state */
     782        2184 :     ctx->accept_writes = false;
     783        2184 :     ctx->end_xact = false;
     784             : 
     785             :     /* do the actual work: call callback */
     786        2184 :     ctx->callbacks.startup_cb(ctx, opt, is_init);
     787             : 
     788             :     /* Pop the error context stack */
     789        2178 :     error_context_stack = errcallback.previous;
     790        2178 : }
     791             : 
     792             : static void
     793        1734 : shutdown_cb_wrapper(LogicalDecodingContext *ctx)
     794             : {
     795             :     LogicalErrorCallbackState state;
     796             :     ErrorContextCallback errcallback;
     797             : 
     798             :     Assert(!ctx->fast_forward);
     799             : 
     800             :     /* Push callback + info on the error context stack */
     801        1734 :     state.ctx = ctx;
     802        1734 :     state.callback_name = "shutdown";
     803        1734 :     state.report_location = InvalidXLogRecPtr;
     804        1734 :     errcallback.callback = output_plugin_error_callback;
     805        1734 :     errcallback.arg = &state;
     806        1734 :     errcallback.previous = error_context_stack;
     807        1734 :     error_context_stack = &errcallback;
     808             : 
     809             :     /* set output state */
     810        1734 :     ctx->accept_writes = false;
     811        1734 :     ctx->end_xact = false;
     812             : 
     813             :     /* do the actual work: call callback */
     814        1734 :     ctx->callbacks.shutdown_cb(ctx);
     815             : 
     816             :     /* Pop the error context stack */
     817        1734 :     error_context_stack = errcallback.previous;
     818        1734 : }
     819             : 
     820             : 
     821             : /*
     822             :  * Callbacks for ReorderBuffer which add in some more information and then call
     823             :  * output_plugin.h plugins.
     824             :  */
     825             : static void
     826        2710 : begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
     827             : {
     828        2710 :     LogicalDecodingContext *ctx = cache->private_data;
     829             :     LogicalErrorCallbackState state;
     830             :     ErrorContextCallback errcallback;
     831             : 
     832             :     Assert(!ctx->fast_forward);
     833             : 
     834             :     /* Push callback + info on the error context stack */
     835        2710 :     state.ctx = ctx;
     836        2710 :     state.callback_name = "begin";
     837        2710 :     state.report_location = txn->first_lsn;
     838        2710 :     errcallback.callback = output_plugin_error_callback;
     839        2710 :     errcallback.arg = &state;
     840        2710 :     errcallback.previous = error_context_stack;
     841        2710 :     error_context_stack = &errcallback;
     842             : 
     843             :     /* set output state */
     844        2710 :     ctx->accept_writes = true;
     845        2710 :     ctx->write_xid = txn->xid;
     846        2710 :     ctx->write_location = txn->first_lsn;
     847        2710 :     ctx->end_xact = false;
     848             : 
     849             :     /* do the actual work: call callback */
     850        2710 :     ctx->callbacks.begin_cb(ctx, txn);
     851             : 
     852             :     /* Pop the error context stack */
     853        2710 :     error_context_stack = errcallback.previous;
     854        2710 : }
     855             : 
     856             : static void
     857        2704 : commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     858             :                   XLogRecPtr commit_lsn)
     859             : {
     860        2704 :     LogicalDecodingContext *ctx = cache->private_data;
     861             :     LogicalErrorCallbackState state;
     862             :     ErrorContextCallback errcallback;
     863             : 
     864             :     Assert(!ctx->fast_forward);
     865             : 
     866             :     /* Push callback + info on the error context stack */
     867        2704 :     state.ctx = ctx;
     868        2704 :     state.callback_name = "commit";
     869        2704 :     state.report_location = txn->final_lsn; /* beginning of commit record */
     870        2704 :     errcallback.callback = output_plugin_error_callback;
     871        2704 :     errcallback.arg = &state;
     872        2704 :     errcallback.previous = error_context_stack;
     873        2704 :     error_context_stack = &errcallback;
     874             : 
     875             :     /* set output state */
     876        2704 :     ctx->accept_writes = true;
     877        2704 :     ctx->write_xid = txn->xid;
     878        2704 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
     879        2704 :     ctx->end_xact = true;
     880             : 
     881             :     /* do the actual work: call callback */
     882        2704 :     ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
     883             : 
     884             :     /* Pop the error context stack */
     885        2672 :     error_context_stack = errcallback.previous;
     886        2672 : }
     887             : 
     888             : /*
     889             :  * The functionality of begin_prepare is quite similar to begin with the
     890             :  * exception that this will have gid (global transaction id) information which
     891             :  * can be used by plugin. Now, we thought about extending the existing begin
     892             :  * but that would break the replication protocol and additionally this looks
     893             :  * cleaner.
     894             :  */
     895             : static void
     896          60 : begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
     897             : {
     898          60 :     LogicalDecodingContext *ctx = cache->private_data;
     899             :     LogicalErrorCallbackState state;
     900             :     ErrorContextCallback errcallback;
     901             : 
     902             :     Assert(!ctx->fast_forward);
     903             : 
     904             :     /* We're only supposed to call this when two-phase commits are supported */
     905             :     Assert(ctx->twophase);
     906             : 
     907             :     /* Push callback + info on the error context stack */
     908          60 :     state.ctx = ctx;
     909          60 :     state.callback_name = "begin_prepare";
     910          60 :     state.report_location = txn->first_lsn;
     911          60 :     errcallback.callback = output_plugin_error_callback;
     912          60 :     errcallback.arg = &state;
     913          60 :     errcallback.previous = error_context_stack;
     914          60 :     error_context_stack = &errcallback;
     915             : 
     916             :     /* set output state */
     917          60 :     ctx->accept_writes = true;
     918          60 :     ctx->write_xid = txn->xid;
     919          60 :     ctx->write_location = txn->first_lsn;
     920          60 :     ctx->end_xact = false;
     921             : 
     922             :     /*
     923             :      * If the plugin supports two-phase commits then begin prepare callback is
     924             :      * mandatory
     925             :      */
     926          60 :     if (ctx->callbacks.begin_prepare_cb == NULL)
     927           0 :         ereport(ERROR,
     928             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     929             :                  errmsg("logical replication at prepare time requires a %s callback",
     930             :                         "begin_prepare_cb")));
     931             : 
     932             :     /* do the actual work: call callback */
     933          60 :     ctx->callbacks.begin_prepare_cb(ctx, txn);
     934             : 
     935             :     /* Pop the error context stack */
     936          60 :     error_context_stack = errcallback.previous;
     937          60 : }
     938             : 
     939             : static void
     940          60 : prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     941             :                    XLogRecPtr prepare_lsn)
     942             : {
     943          60 :     LogicalDecodingContext *ctx = cache->private_data;
     944             :     LogicalErrorCallbackState state;
     945             :     ErrorContextCallback errcallback;
     946             : 
     947             :     Assert(!ctx->fast_forward);
     948             : 
     949             :     /* We're only supposed to call this when two-phase commits are supported */
     950             :     Assert(ctx->twophase);
     951             : 
     952             :     /* Push callback + info on the error context stack */
     953          60 :     state.ctx = ctx;
     954          60 :     state.callback_name = "prepare";
     955          60 :     state.report_location = txn->final_lsn; /* beginning of prepare record */
     956          60 :     errcallback.callback = output_plugin_error_callback;
     957          60 :     errcallback.arg = &state;
     958          60 :     errcallback.previous = error_context_stack;
     959          60 :     error_context_stack = &errcallback;
     960             : 
     961             :     /* set output state */
     962          60 :     ctx->accept_writes = true;
     963          60 :     ctx->write_xid = txn->xid;
     964          60 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
     965          60 :     ctx->end_xact = true;
     966             : 
     967             :     /*
     968             :      * If the plugin supports two-phase commits then prepare callback is
     969             :      * mandatory
     970             :      */
     971          60 :     if (ctx->callbacks.prepare_cb == NULL)
     972           0 :         ereport(ERROR,
     973             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     974             :                  errmsg("logical replication at prepare time requires a %s callback",
     975             :                         "prepare_cb")));
     976             : 
     977             :     /* do the actual work: call callback */
     978          60 :     ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
     979             : 
     980             :     /* Pop the error context stack */
     981          60 :     error_context_stack = errcallback.previous;
     982          60 : }
     983             : 
     984             : static void
     985          66 : commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     986             :                            XLogRecPtr commit_lsn)
     987             : {
     988          66 :     LogicalDecodingContext *ctx = cache->private_data;
     989             :     LogicalErrorCallbackState state;
     990             :     ErrorContextCallback errcallback;
     991             : 
     992             :     Assert(!ctx->fast_forward);
     993             : 
     994             :     /* We're only supposed to call this when two-phase commits are supported */
     995             :     Assert(ctx->twophase);
     996             : 
     997             :     /* Push callback + info on the error context stack */
     998          66 :     state.ctx = ctx;
     999          66 :     state.callback_name = "commit_prepared";
    1000          66 :     state.report_location = txn->final_lsn; /* beginning of commit record */
    1001          66 :     errcallback.callback = output_plugin_error_callback;
    1002          66 :     errcallback.arg = &state;
    1003          66 :     errcallback.previous = error_context_stack;
    1004          66 :     error_context_stack = &errcallback;
    1005             : 
    1006             :     /* set output state */
    1007          66 :     ctx->accept_writes = true;
    1008          66 :     ctx->write_xid = txn->xid;
    1009          66 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
    1010          66 :     ctx->end_xact = true;
    1011             : 
    1012             :     /*
    1013             :      * If the plugin support two-phase commits then commit prepared callback
    1014             :      * is mandatory
    1015             :      */
    1016          66 :     if (ctx->callbacks.commit_prepared_cb == NULL)
    1017           0 :         ereport(ERROR,
    1018             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1019             :                  errmsg("logical replication at prepare time requires a %s callback",
    1020             :                         "commit_prepared_cb")));
    1021             : 
    1022             :     /* do the actual work: call callback */
    1023          66 :     ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
    1024             : 
    1025             :     /* Pop the error context stack */
    1026          66 :     error_context_stack = errcallback.previous;
    1027          66 : }
    1028             : 
    1029             : static void
    1030          20 : rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1031             :                              XLogRecPtr prepare_end_lsn,
    1032             :                              TimestampTz prepare_time)
    1033             : {
    1034          20 :     LogicalDecodingContext *ctx = cache->private_data;
    1035             :     LogicalErrorCallbackState state;
    1036             :     ErrorContextCallback errcallback;
    1037             : 
    1038             :     Assert(!ctx->fast_forward);
    1039             : 
    1040             :     /* We're only supposed to call this when two-phase commits are supported */
    1041             :     Assert(ctx->twophase);
    1042             : 
    1043             :     /* Push callback + info on the error context stack */
    1044          20 :     state.ctx = ctx;
    1045          20 :     state.callback_name = "rollback_prepared";
    1046          20 :     state.report_location = txn->final_lsn; /* beginning of commit record */
    1047          20 :     errcallback.callback = output_plugin_error_callback;
    1048          20 :     errcallback.arg = &state;
    1049          20 :     errcallback.previous = error_context_stack;
    1050          20 :     error_context_stack = &errcallback;
    1051             : 
    1052             :     /* set output state */
    1053          20 :     ctx->accept_writes = true;
    1054          20 :     ctx->write_xid = txn->xid;
    1055          20 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
    1056          20 :     ctx->end_xact = true;
    1057             : 
    1058             :     /*
    1059             :      * If the plugin support two-phase commits then rollback prepared callback
    1060             :      * is mandatory
    1061             :      */
    1062          20 :     if (ctx->callbacks.rollback_prepared_cb == NULL)
    1063           0 :         ereport(ERROR,
    1064             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1065             :                  errmsg("logical replication at prepare time requires a %s callback",
    1066             :                         "rollback_prepared_cb")));
    1067             : 
    1068             :     /* do the actual work: call callback */
    1069          20 :     ctx->callbacks.rollback_prepared_cb(ctx, txn, prepare_end_lsn,
    1070             :                                         prepare_time);
    1071             : 
    1072             :     /* Pop the error context stack */
    1073          20 :     error_context_stack = errcallback.previous;
    1074          20 : }
    1075             : 
    1076             : static void
    1077      322142 : change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1078             :                   Relation relation, ReorderBufferChange *change)
    1079             : {
    1080      322142 :     LogicalDecodingContext *ctx = cache->private_data;
    1081             :     LogicalErrorCallbackState state;
    1082             :     ErrorContextCallback errcallback;
    1083             : 
    1084             :     Assert(!ctx->fast_forward);
    1085             : 
    1086             :     /* Push callback + info on the error context stack */
    1087      322142 :     state.ctx = ctx;
    1088      322142 :     state.callback_name = "change";
    1089      322142 :     state.report_location = change->lsn;
    1090      322142 :     errcallback.callback = output_plugin_error_callback;
    1091      322142 :     errcallback.arg = &state;
    1092      322142 :     errcallback.previous = error_context_stack;
    1093      322142 :     error_context_stack = &errcallback;
    1094             : 
    1095             :     /* set output state */
    1096      322142 :     ctx->accept_writes = true;
    1097      322142 :     ctx->write_xid = txn->xid;
    1098             : 
    1099             :     /*
    1100             :      * Report this change's lsn so replies from clients can give an up-to-date
    1101             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1102             :      * receipt of this transaction, but it might allow another transaction's
    1103             :      * commit to be confirmed with one message.
    1104             :      */
    1105      322142 :     ctx->write_location = change->lsn;
    1106             : 
    1107      322142 :     ctx->end_xact = false;
    1108             : 
    1109      322142 :     ctx->callbacks.change_cb(ctx, txn, relation, change);
    1110             : 
    1111             :     /* Pop the error context stack */
    1112      322136 :     error_context_stack = errcallback.previous;
    1113      322136 : }
    1114             : 
    1115             : static void
    1116          52 : truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1117             :                     int nrelations, Relation relations[], ReorderBufferChange *change)
    1118             : {
    1119          52 :     LogicalDecodingContext *ctx = cache->private_data;
    1120             :     LogicalErrorCallbackState state;
    1121             :     ErrorContextCallback errcallback;
    1122             : 
    1123             :     Assert(!ctx->fast_forward);
    1124             : 
    1125          52 :     if (!ctx->callbacks.truncate_cb)
    1126           0 :         return;
    1127             : 
    1128             :     /* Push callback + info on the error context stack */
    1129          52 :     state.ctx = ctx;
    1130          52 :     state.callback_name = "truncate";
    1131          52 :     state.report_location = change->lsn;
    1132          52 :     errcallback.callback = output_plugin_error_callback;
    1133          52 :     errcallback.arg = &state;
    1134          52 :     errcallback.previous = error_context_stack;
    1135          52 :     error_context_stack = &errcallback;
    1136             : 
    1137             :     /* set output state */
    1138          52 :     ctx->accept_writes = true;
    1139          52 :     ctx->write_xid = txn->xid;
    1140             : 
    1141             :     /*
    1142             :      * Report this change's lsn so replies from clients can give an up-to-date
    1143             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1144             :      * receipt of this transaction, but it might allow another transaction's
    1145             :      * commit to be confirmed with one message.
    1146             :      */
    1147          52 :     ctx->write_location = change->lsn;
    1148             : 
    1149          52 :     ctx->end_xact = false;
    1150             : 
    1151          52 :     ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
    1152             : 
    1153             :     /* Pop the error context stack */
    1154          52 :     error_context_stack = errcallback.previous;
    1155             : }
    1156             : 
    1157             : bool
    1158         296 : filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
    1159             :                           const char *gid)
    1160             : {
    1161             :     LogicalErrorCallbackState state;
    1162             :     ErrorContextCallback errcallback;
    1163             :     bool        ret;
    1164             : 
    1165             :     Assert(!ctx->fast_forward);
    1166             : 
    1167             :     /* Push callback + info on the error context stack */
    1168         296 :     state.ctx = ctx;
    1169         296 :     state.callback_name = "filter_prepare";
    1170         296 :     state.report_location = InvalidXLogRecPtr;
    1171         296 :     errcallback.callback = output_plugin_error_callback;
    1172         296 :     errcallback.arg = &state;
    1173         296 :     errcallback.previous = error_context_stack;
    1174         296 :     error_context_stack = &errcallback;
    1175             : 
    1176             :     /* set output state */
    1177         296 :     ctx->accept_writes = false;
    1178         296 :     ctx->end_xact = false;
    1179             : 
    1180             :     /* do the actual work: call callback */
    1181         296 :     ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
    1182             : 
    1183             :     /* Pop the error context stack */
    1184         296 :     error_context_stack = errcallback.previous;
    1185             : 
    1186         296 :     return ret;
    1187             : }
    1188             : 
    1189             : bool
    1190     3207182 : filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
    1191             : {
    1192             :     LogicalErrorCallbackState state;
    1193             :     ErrorContextCallback errcallback;
    1194             :     bool        ret;
    1195             : 
    1196             :     Assert(!ctx->fast_forward);
    1197             : 
    1198             :     /* Push callback + info on the error context stack */
    1199     3207182 :     state.ctx = ctx;
    1200     3207182 :     state.callback_name = "filter_by_origin";
    1201     3207182 :     state.report_location = InvalidXLogRecPtr;
    1202     3207182 :     errcallback.callback = output_plugin_error_callback;
    1203     3207182 :     errcallback.arg = &state;
    1204     3207182 :     errcallback.previous = error_context_stack;
    1205     3207182 :     error_context_stack = &errcallback;
    1206             : 
    1207             :     /* set output state */
    1208     3207182 :     ctx->accept_writes = false;
    1209     3207182 :     ctx->end_xact = false;
    1210             : 
    1211             :     /* do the actual work: call callback */
    1212     3207182 :     ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
    1213             : 
    1214             :     /* Pop the error context stack */
    1215     3207182 :     error_context_stack = errcallback.previous;
    1216             : 
    1217     3207182 :     return ret;
    1218             : }
    1219             : 
    1220             : static void
    1221          32 : message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1222             :                    XLogRecPtr message_lsn, bool transactional,
    1223             :                    const char *prefix, Size message_size, const char *message)
    1224             : {
    1225          32 :     LogicalDecodingContext *ctx = cache->private_data;
    1226             :     LogicalErrorCallbackState state;
    1227             :     ErrorContextCallback errcallback;
    1228             : 
    1229             :     Assert(!ctx->fast_forward);
    1230             : 
    1231          32 :     if (ctx->callbacks.message_cb == NULL)
    1232           0 :         return;
    1233             : 
    1234             :     /* Push callback + info on the error context stack */
    1235          32 :     state.ctx = ctx;
    1236          32 :     state.callback_name = "message";
    1237          32 :     state.report_location = message_lsn;
    1238          32 :     errcallback.callback = output_plugin_error_callback;
    1239          32 :     errcallback.arg = &state;
    1240          32 :     errcallback.previous = error_context_stack;
    1241          32 :     error_context_stack = &errcallback;
    1242             : 
    1243             :     /* set output state */
    1244          32 :     ctx->accept_writes = true;
    1245          32 :     ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
    1246          32 :     ctx->write_location = message_lsn;
    1247          32 :     ctx->end_xact = false;
    1248             : 
    1249             :     /* do the actual work: call callback */
    1250          32 :     ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
    1251             :                               message_size, message);
    1252             : 
    1253             :     /* Pop the error context stack */
    1254          32 :     error_context_stack = errcallback.previous;
    1255             : }
    1256             : 
    1257             : static void
    1258        1336 : stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1259             :                         XLogRecPtr first_lsn)
    1260             : {
    1261        1336 :     LogicalDecodingContext *ctx = cache->private_data;
    1262             :     LogicalErrorCallbackState state;
    1263             :     ErrorContextCallback errcallback;
    1264             : 
    1265             :     Assert(!ctx->fast_forward);
    1266             : 
    1267             :     /* We're only supposed to call this when streaming is supported. */
    1268             :     Assert(ctx->streaming);
    1269             : 
    1270             :     /* Push callback + info on the error context stack */
    1271        1336 :     state.ctx = ctx;
    1272        1336 :     state.callback_name = "stream_start";
    1273        1336 :     state.report_location = first_lsn;
    1274        1336 :     errcallback.callback = output_plugin_error_callback;
    1275        1336 :     errcallback.arg = &state;
    1276        1336 :     errcallback.previous = error_context_stack;
    1277        1336 :     error_context_stack = &errcallback;
    1278             : 
    1279             :     /* set output state */
    1280        1336 :     ctx->accept_writes = true;
    1281        1336 :     ctx->write_xid = txn->xid;
    1282             : 
    1283             :     /*
    1284             :      * Report this message's lsn so replies from clients can give an
    1285             :      * up-to-date answer. This won't ever be enough (and shouldn't be!) to
    1286             :      * confirm receipt of this transaction, but it might allow another
    1287             :      * transaction's commit to be confirmed with one message.
    1288             :      */
    1289        1336 :     ctx->write_location = first_lsn;
    1290             : 
    1291        1336 :     ctx->end_xact = false;
    1292             : 
    1293             :     /* in streaming mode, stream_start_cb is required */
    1294        1336 :     if (ctx->callbacks.stream_start_cb == NULL)
    1295           0 :         ereport(ERROR,
    1296             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1297             :                  errmsg("logical streaming requires a %s callback",
    1298             :                         "stream_start_cb")));
    1299             : 
    1300        1336 :     ctx->callbacks.stream_start_cb(ctx, txn);
    1301             : 
    1302             :     /* Pop the error context stack */
    1303        1336 :     error_context_stack = errcallback.previous;
    1304        1336 : }
    1305             : 
    1306             : static void
    1307        1336 : stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1308             :                        XLogRecPtr last_lsn)
    1309             : {
    1310        1336 :     LogicalDecodingContext *ctx = cache->private_data;
    1311             :     LogicalErrorCallbackState state;
    1312             :     ErrorContextCallback errcallback;
    1313             : 
    1314             :     Assert(!ctx->fast_forward);
    1315             : 
    1316             :     /* We're only supposed to call this when streaming is supported. */
    1317             :     Assert(ctx->streaming);
    1318             : 
    1319             :     /* Push callback + info on the error context stack */
    1320        1336 :     state.ctx = ctx;
    1321        1336 :     state.callback_name = "stream_stop";
    1322        1336 :     state.report_location = last_lsn;
    1323        1336 :     errcallback.callback = output_plugin_error_callback;
    1324        1336 :     errcallback.arg = &state;
    1325        1336 :     errcallback.previous = error_context_stack;
    1326        1336 :     error_context_stack = &errcallback;
    1327             : 
    1328             :     /* set output state */
    1329        1336 :     ctx->accept_writes = true;
    1330        1336 :     ctx->write_xid = txn->xid;
    1331             : 
    1332             :     /*
    1333             :      * Report this message's lsn so replies from clients can give an
    1334             :      * up-to-date answer. This won't ever be enough (and shouldn't be!) to
    1335             :      * confirm receipt of this transaction, but it might allow another
    1336             :      * transaction's commit to be confirmed with one message.
    1337             :      */
    1338        1336 :     ctx->write_location = last_lsn;
    1339             : 
    1340        1336 :     ctx->end_xact = false;
    1341             : 
    1342             :     /* in streaming mode, stream_stop_cb is required */
    1343        1336 :     if (ctx->callbacks.stream_stop_cb == NULL)
    1344           0 :         ereport(ERROR,
    1345             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1346             :                  errmsg("logical streaming requires a %s callback",
    1347             :                         "stream_stop_cb")));
    1348             : 
    1349        1336 :     ctx->callbacks.stream_stop_cb(ctx, txn);
    1350             : 
    1351             :     /* Pop the error context stack */
    1352        1336 :     error_context_stack = errcallback.previous;
    1353        1336 : }
    1354             : 
    1355             : static void
    1356          60 : stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1357             :                         XLogRecPtr abort_lsn)
    1358             : {
    1359          60 :     LogicalDecodingContext *ctx = cache->private_data;
    1360             :     LogicalErrorCallbackState state;
    1361             :     ErrorContextCallback errcallback;
    1362             : 
    1363             :     Assert(!ctx->fast_forward);
    1364             : 
    1365             :     /* We're only supposed to call this when streaming is supported. */
    1366             :     Assert(ctx->streaming);
    1367             : 
    1368             :     /* Push callback + info on the error context stack */
    1369          60 :     state.ctx = ctx;
    1370          60 :     state.callback_name = "stream_abort";
    1371          60 :     state.report_location = abort_lsn;
    1372          60 :     errcallback.callback = output_plugin_error_callback;
    1373          60 :     errcallback.arg = &state;
    1374          60 :     errcallback.previous = error_context_stack;
    1375          60 :     error_context_stack = &errcallback;
    1376             : 
    1377             :     /* set output state */
    1378          60 :     ctx->accept_writes = true;
    1379          60 :     ctx->write_xid = txn->xid;
    1380          60 :     ctx->write_location = abort_lsn;
    1381          60 :     ctx->end_xact = true;
    1382             : 
    1383             :     /* in streaming mode, stream_abort_cb is required */
    1384          60 :     if (ctx->callbacks.stream_abort_cb == NULL)
    1385           0 :         ereport(ERROR,
    1386             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1387             :                  errmsg("logical streaming requires a %s callback",
    1388             :                         "stream_abort_cb")));
    1389             : 
    1390          60 :     ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn);
    1391             : 
    1392             :     /* Pop the error context stack */
    1393          60 :     error_context_stack = errcallback.previous;
    1394          60 : }
    1395             : 
    1396             : static void
    1397          28 : stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1398             :                           XLogRecPtr prepare_lsn)
    1399             : {
    1400          28 :     LogicalDecodingContext *ctx = cache->private_data;
    1401             :     LogicalErrorCallbackState state;
    1402             :     ErrorContextCallback errcallback;
    1403             : 
    1404             :     Assert(!ctx->fast_forward);
    1405             : 
    1406             :     /*
    1407             :      * We're only supposed to call this when streaming and two-phase commits
    1408             :      * are supported.
    1409             :      */
    1410             :     Assert(ctx->streaming);
    1411             :     Assert(ctx->twophase);
    1412             : 
    1413             :     /* Push callback + info on the error context stack */
    1414          28 :     state.ctx = ctx;
    1415          28 :     state.callback_name = "stream_prepare";
    1416          28 :     state.report_location = txn->final_lsn;
    1417          28 :     errcallback.callback = output_plugin_error_callback;
    1418          28 :     errcallback.arg = &state;
    1419          28 :     errcallback.previous = error_context_stack;
    1420          28 :     error_context_stack = &errcallback;
    1421             : 
    1422             :     /* set output state */
    1423          28 :     ctx->accept_writes = true;
    1424          28 :     ctx->write_xid = txn->xid;
    1425          28 :     ctx->write_location = txn->end_lsn;
    1426          28 :     ctx->end_xact = true;
    1427             : 
    1428             :     /* in streaming mode with two-phase commits, stream_prepare_cb is required */
    1429          28 :     if (ctx->callbacks.stream_prepare_cb == NULL)
    1430           0 :         ereport(ERROR,
    1431             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1432             :                  errmsg("logical streaming at prepare time requires a %s callback",
    1433             :                         "stream_prepare_cb")));
    1434             : 
    1435          28 :     ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
    1436             : 
    1437             :     /* Pop the error context stack */
    1438          28 :     error_context_stack = errcallback.previous;
    1439          28 : }
    1440             : 
    1441             : static void
    1442         100 : stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1443             :                          XLogRecPtr commit_lsn)
    1444             : {
    1445         100 :     LogicalDecodingContext *ctx = cache->private_data;
    1446             :     LogicalErrorCallbackState state;
    1447             :     ErrorContextCallback errcallback;
    1448             : 
    1449             :     Assert(!ctx->fast_forward);
    1450             : 
    1451             :     /* We're only supposed to call this when streaming is supported. */
    1452             :     Assert(ctx->streaming);
    1453             : 
    1454             :     /* Push callback + info on the error context stack */
    1455         100 :     state.ctx = ctx;
    1456         100 :     state.callback_name = "stream_commit";
    1457         100 :     state.report_location = txn->final_lsn;
    1458         100 :     errcallback.callback = output_plugin_error_callback;
    1459         100 :     errcallback.arg = &state;
    1460         100 :     errcallback.previous = error_context_stack;
    1461         100 :     error_context_stack = &errcallback;
    1462             : 
    1463             :     /* set output state */
    1464         100 :     ctx->accept_writes = true;
    1465         100 :     ctx->write_xid = txn->xid;
    1466         100 :     ctx->write_location = txn->end_lsn;
    1467         100 :     ctx->end_xact = true;
    1468             : 
    1469             :     /* in streaming mode, stream_commit_cb is required */
    1470         100 :     if (ctx->callbacks.stream_commit_cb == NULL)
    1471           0 :         ereport(ERROR,
    1472             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1473             :                  errmsg("logical streaming requires a %s callback",
    1474             :                         "stream_commit_cb")));
    1475             : 
    1476         100 :     ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn);
    1477             : 
    1478             :     /* Pop the error context stack */
    1479         100 :     error_context_stack = errcallback.previous;
    1480         100 : }
    1481             : 
    1482             : static void
    1483      351976 : stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1484             :                          Relation relation, ReorderBufferChange *change)
    1485             : {
    1486      351976 :     LogicalDecodingContext *ctx = cache->private_data;
    1487             :     LogicalErrorCallbackState state;
    1488             :     ErrorContextCallback errcallback;
    1489             : 
    1490             :     Assert(!ctx->fast_forward);
    1491             : 
    1492             :     /* We're only supposed to call this when streaming is supported. */
    1493             :     Assert(ctx->streaming);
    1494             : 
    1495             :     /* Push callback + info on the error context stack */
    1496      351976 :     state.ctx = ctx;
    1497      351976 :     state.callback_name = "stream_change";
    1498      351976 :     state.report_location = change->lsn;
    1499      351976 :     errcallback.callback = output_plugin_error_callback;
    1500      351976 :     errcallback.arg = &state;
    1501      351976 :     errcallback.previous = error_context_stack;
    1502      351976 :     error_context_stack = &errcallback;
    1503             : 
    1504             :     /* set output state */
    1505      351976 :     ctx->accept_writes = true;
    1506      351976 :     ctx->write_xid = txn->xid;
    1507             : 
    1508             :     /*
    1509             :      * Report this change's lsn so replies from clients can give an up-to-date
    1510             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1511             :      * receipt of this transaction, but it might allow another transaction's
    1512             :      * commit to be confirmed with one message.
    1513             :      */
    1514      351976 :     ctx->write_location = change->lsn;
    1515             : 
    1516      351976 :     ctx->end_xact = false;
    1517             : 
    1518             :     /* in streaming mode, stream_change_cb is required */
    1519      351976 :     if (ctx->callbacks.stream_change_cb == NULL)
    1520           0 :         ereport(ERROR,
    1521             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1522             :                  errmsg("logical streaming requires a %s callback",
    1523             :                         "stream_change_cb")));
    1524             : 
    1525      351976 :     ctx->callbacks.stream_change_cb(ctx, txn, relation, change);
    1526             : 
    1527             :     /* Pop the error context stack */
    1528      351976 :     error_context_stack = errcallback.previous;
    1529      351976 : }
    1530             : 
    1531             : static void
    1532           6 : stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1533             :                           XLogRecPtr message_lsn, bool transactional,
    1534             :                           const char *prefix, Size message_size, const char *message)
    1535             : {
    1536           6 :     LogicalDecodingContext *ctx = cache->private_data;
    1537             :     LogicalErrorCallbackState state;
    1538             :     ErrorContextCallback errcallback;
    1539             : 
    1540             :     Assert(!ctx->fast_forward);
    1541             : 
    1542             :     /* We're only supposed to call this when streaming is supported. */
    1543             :     Assert(ctx->streaming);
    1544             : 
    1545             :     /* this callback is optional */
    1546           6 :     if (ctx->callbacks.stream_message_cb == NULL)
    1547           0 :         return;
    1548             : 
    1549             :     /* Push callback + info on the error context stack */
    1550           6 :     state.ctx = ctx;
    1551           6 :     state.callback_name = "stream_message";
    1552           6 :     state.report_location = message_lsn;
    1553           6 :     errcallback.callback = output_plugin_error_callback;
    1554           6 :     errcallback.arg = &state;
    1555           6 :     errcallback.previous = error_context_stack;
    1556           6 :     error_context_stack = &errcallback;
    1557             : 
    1558             :     /* set output state */
    1559           6 :     ctx->accept_writes = true;
    1560           6 :     ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
    1561           6 :     ctx->write_location = message_lsn;
    1562           6 :     ctx->end_xact = false;
    1563             : 
    1564             :     /* do the actual work: call callback */
    1565           6 :     ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
    1566             :                                      message_size, message);
    1567             : 
    1568             :     /* Pop the error context stack */
    1569           6 :     error_context_stack = errcallback.previous;
    1570             : }
    1571             : 
    1572             : static void
    1573           0 : stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1574             :                            int nrelations, Relation relations[],
    1575             :                            ReorderBufferChange *change)
    1576             : {
    1577           0 :     LogicalDecodingContext *ctx = cache->private_data;
    1578             :     LogicalErrorCallbackState state;
    1579             :     ErrorContextCallback errcallback;
    1580             : 
    1581             :     Assert(!ctx->fast_forward);
    1582             : 
    1583             :     /* We're only supposed to call this when streaming is supported. */
    1584             :     Assert(ctx->streaming);
    1585             : 
    1586             :     /* this callback is optional */
    1587           0 :     if (!ctx->callbacks.stream_truncate_cb)
    1588           0 :         return;
    1589             : 
    1590             :     /* Push callback + info on the error context stack */
    1591           0 :     state.ctx = ctx;
    1592           0 :     state.callback_name = "stream_truncate";
    1593           0 :     state.report_location = change->lsn;
    1594           0 :     errcallback.callback = output_plugin_error_callback;
    1595           0 :     errcallback.arg = &state;
    1596           0 :     errcallback.previous = error_context_stack;
    1597           0 :     error_context_stack = &errcallback;
    1598             : 
    1599             :     /* set output state */
    1600           0 :     ctx->accept_writes = true;
    1601           0 :     ctx->write_xid = txn->xid;
    1602             : 
    1603             :     /*
    1604             :      * Report this change's lsn so replies from clients can give an up-to-date
    1605             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1606             :      * receipt of this transaction, but it might allow another transaction's
    1607             :      * commit to be confirmed with one message.
    1608             :      */
    1609           0 :     ctx->write_location = change->lsn;
    1610             : 
    1611           0 :     ctx->end_xact = false;
    1612             : 
    1613           0 :     ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
    1614             : 
    1615             :     /* Pop the error context stack */
    1616           0 :     error_context_stack = errcallback.previous;
    1617             : }
    1618             : 
    1619             : static void
    1620        6258 : update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1621             :                                XLogRecPtr lsn)
    1622             : {
    1623        6258 :     LogicalDecodingContext *ctx = cache->private_data;
    1624             :     LogicalErrorCallbackState state;
    1625             :     ErrorContextCallback errcallback;
    1626             : 
    1627             :     Assert(!ctx->fast_forward);
    1628             : 
    1629             :     /* Push callback + info on the error context stack */
    1630        6258 :     state.ctx = ctx;
    1631        6258 :     state.callback_name = "update_progress_txn";
    1632        6258 :     state.report_location = lsn;
    1633        6258 :     errcallback.callback = output_plugin_error_callback;
    1634        6258 :     errcallback.arg = &state;
    1635        6258 :     errcallback.previous = error_context_stack;
    1636        6258 :     error_context_stack = &errcallback;
    1637             : 
    1638             :     /* set output state */
    1639        6258 :     ctx->accept_writes = false;
    1640        6258 :     ctx->write_xid = txn->xid;
    1641             : 
    1642             :     /*
    1643             :      * Report this change's lsn so replies from clients can give an up-to-date
    1644             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1645             :      * receipt of this transaction, but it might allow another transaction's
    1646             :      * commit to be confirmed with one message.
    1647             :      */
    1648        6258 :     ctx->write_location = lsn;
    1649             : 
    1650        6258 :     ctx->end_xact = false;
    1651             : 
    1652        6258 :     OutputPluginUpdateProgress(ctx, false);
    1653             : 
    1654             :     /* Pop the error context stack */
    1655        6258 :     error_context_stack = errcallback.previous;
    1656        6258 : }
    1657             : 
    1658             : /*
    1659             :  * Set the required catalog xmin horizon for historic snapshots in the current
    1660             :  * replication slot.
    1661             :  *
    1662             :  * Note that in the most cases, we won't be able to immediately use the xmin
    1663             :  * to increase the xmin horizon: we need to wait till the client has confirmed
    1664             :  * receiving current_lsn with LogicalConfirmReceivedLocation().
    1665             :  */
    1666             : void
    1667         848 : LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
    1668             : {
    1669         848 :     bool        updated_xmin = false;
    1670             :     ReplicationSlot *slot;
    1671         848 :     bool        got_new_xmin = false;
    1672             : 
    1673         848 :     slot = MyReplicationSlot;
    1674             : 
    1675             :     Assert(slot != NULL);
    1676             : 
    1677         848 :     SpinLockAcquire(&slot->mutex);
    1678             : 
    1679             :     /*
    1680             :      * don't overwrite if we already have a newer xmin. This can happen if we
    1681             :      * restart decoding in a slot.
    1682             :      */
    1683         848 :     if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
    1684             :     {
    1685             :     }
    1686             : 
    1687             :     /*
    1688             :      * If the client has already confirmed up to this lsn, we directly can
    1689             :      * mark this as accepted. This can happen if we restart decoding in a
    1690             :      * slot.
    1691             :      */
    1692         214 :     else if (current_lsn <= slot->data.confirmed_flush)
    1693             :     {
    1694         108 :         slot->candidate_catalog_xmin = xmin;
    1695         108 :         slot->candidate_xmin_lsn = current_lsn;
    1696             : 
    1697             :         /* our candidate can directly be used */
    1698         108 :         updated_xmin = true;
    1699             :     }
    1700             : 
    1701             :     /*
    1702             :      * Only increase if the previous values have been applied, otherwise we
    1703             :      * might never end up updating if the receiver acks too slowly.
    1704             :      */
    1705         106 :     else if (!XLogRecPtrIsValid(slot->candidate_xmin_lsn))
    1706             :     {
    1707          44 :         slot->candidate_catalog_xmin = xmin;
    1708          44 :         slot->candidate_xmin_lsn = current_lsn;
    1709             : 
    1710             :         /*
    1711             :          * Log new xmin at an appropriate log level after releasing the
    1712             :          * spinlock.
    1713             :          */
    1714          44 :         got_new_xmin = true;
    1715             :     }
    1716         848 :     SpinLockRelease(&slot->mutex);
    1717             : 
    1718         848 :     if (got_new_xmin)
    1719          44 :         elog(DEBUG1, "got new catalog xmin %u at %X/%08X", xmin,
    1720             :              LSN_FORMAT_ARGS(current_lsn));
    1721             : 
    1722             :     /* candidate already valid with the current flush position, apply */
    1723         848 :     if (updated_xmin)
    1724         108 :         LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
    1725         848 : }
    1726             : 
    1727             : /*
    1728             :  * Mark the minimal LSN (restart_lsn) we need to read to replay all
    1729             :  * transactions that have not yet committed at current_lsn.
    1730             :  *
    1731             :  * Just like LogicalIncreaseXminForSlot this only takes effect when the
    1732             :  * client has confirmed to have received current_lsn.
    1733             :  */
    1734             : void
    1735         744 : LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
    1736             : {
    1737         744 :     bool        updated_lsn = false;
    1738             :     ReplicationSlot *slot;
    1739             : 
    1740         744 :     slot = MyReplicationSlot;
    1741             : 
    1742             :     Assert(slot != NULL);
    1743             :     Assert(XLogRecPtrIsValid(restart_lsn));
    1744             :     Assert(XLogRecPtrIsValid(current_lsn));
    1745             : 
    1746         744 :     SpinLockAcquire(&slot->mutex);
    1747             : 
    1748             :     /* don't overwrite if have a newer restart lsn */
    1749         744 :     if (restart_lsn <= slot->data.restart_lsn)
    1750             :     {
    1751          20 :         SpinLockRelease(&slot->mutex);
    1752             :     }
    1753             : 
    1754             :     /*
    1755             :      * We might have already flushed far enough to directly accept this lsn,
    1756             :      * in this case there is no need to check for existing candidate LSNs
    1757             :      */
    1758         724 :     else if (current_lsn <= slot->data.confirmed_flush)
    1759             :     {
    1760         562 :         slot->candidate_restart_valid = current_lsn;
    1761         562 :         slot->candidate_restart_lsn = restart_lsn;
    1762         562 :         SpinLockRelease(&slot->mutex);
    1763             : 
    1764             :         /* our candidate can directly be used */
    1765         562 :         updated_lsn = true;
    1766             :     }
    1767             : 
    1768             :     /*
    1769             :      * Only increase if the previous values have been applied, otherwise we
    1770             :      * might never end up updating if the receiver acks too slowly. A missed
    1771             :      * value here will just cause some extra effort after reconnecting.
    1772             :      */
    1773         162 :     else if (!XLogRecPtrIsValid(slot->candidate_restart_valid))
    1774             :     {
    1775          80 :         slot->candidate_restart_valid = current_lsn;
    1776          80 :         slot->candidate_restart_lsn = restart_lsn;
    1777          80 :         SpinLockRelease(&slot->mutex);
    1778             : 
    1779          80 :         elog(DEBUG1, "got new restart lsn %X/%08X at %X/%08X",
    1780             :              LSN_FORMAT_ARGS(restart_lsn),
    1781             :              LSN_FORMAT_ARGS(current_lsn));
    1782             :     }
    1783             :     else
    1784             :     {
    1785             :         XLogRecPtr  candidate_restart_lsn;
    1786             :         XLogRecPtr  candidate_restart_valid;
    1787             :         XLogRecPtr  confirmed_flush;
    1788             : 
    1789          82 :         candidate_restart_lsn = slot->candidate_restart_lsn;
    1790          82 :         candidate_restart_valid = slot->candidate_restart_valid;
    1791          82 :         confirmed_flush = slot->data.confirmed_flush;
    1792          82 :         SpinLockRelease(&slot->mutex);
    1793             : 
    1794          82 :         elog(DEBUG1, "failed to increase restart lsn: proposed %X/%08X, after %X/%08X, current candidate %X/%08X, current after %X/%08X, flushed up to %X/%08X",
    1795             :              LSN_FORMAT_ARGS(restart_lsn),
    1796             :              LSN_FORMAT_ARGS(current_lsn),
    1797             :              LSN_FORMAT_ARGS(candidate_restart_lsn),
    1798             :              LSN_FORMAT_ARGS(candidate_restart_valid),
    1799             :              LSN_FORMAT_ARGS(confirmed_flush));
    1800             :     }
    1801             : 
    1802             :     /* candidates are already valid with the current flush position, apply */
    1803         744 :     if (updated_lsn)
    1804         562 :         LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
    1805         744 : }
    1806             : 
    1807             : /*
    1808             :  * Handle a consumer's confirmation having received all changes up to lsn.
    1809             :  */
    1810             : void
    1811      103900 : LogicalConfirmReceivedLocation(XLogRecPtr lsn)
    1812             : {
    1813             :     Assert(XLogRecPtrIsValid(lsn));
    1814             : 
    1815             :     /* Do an unlocked check for candidate_lsn first. */
    1816      103900 :     if (XLogRecPtrIsValid(MyReplicationSlot->candidate_xmin_lsn) ||
    1817      103702 :         XLogRecPtrIsValid(MyReplicationSlot->candidate_restart_valid))
    1818         798 :     {
    1819         798 :         bool        updated_xmin = false;
    1820         798 :         bool        updated_restart = false;
    1821             :         XLogRecPtr  restart_lsn pg_attribute_unused();
    1822             : 
    1823         798 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1824             : 
    1825             :         /* remember the old restart lsn */
    1826         798 :         restart_lsn = MyReplicationSlot->data.restart_lsn;
    1827             : 
    1828             :         /*
    1829             :          * Prevent moving the confirmed_flush backwards, as this could lead to
    1830             :          * data duplication issues caused by replicating already replicated
    1831             :          * changes.
    1832             :          *
    1833             :          * This can happen when a client acknowledges an LSN it doesn't have
    1834             :          * to do anything for, and thus didn't store persistently. After a
    1835             :          * restart, the client can send the prior LSN that it stored
    1836             :          * persistently as an acknowledgement, but we need to ignore such an
    1837             :          * LSN. See similar case handling in CreateDecodingContext.
    1838             :          */
    1839         798 :         if (lsn > MyReplicationSlot->data.confirmed_flush)
    1840          72 :             MyReplicationSlot->data.confirmed_flush = lsn;
    1841             : 
    1842             :         /* if we're past the location required for bumping xmin, do so */
    1843         798 :         if (XLogRecPtrIsValid(MyReplicationSlot->candidate_xmin_lsn) &&
    1844         198 :             MyReplicationSlot->candidate_xmin_lsn <= lsn)
    1845             :         {
    1846             :             /*
    1847             :              * We have to write the changed xmin to disk *before* we change
    1848             :              * the in-memory value, otherwise after a crash we wouldn't know
    1849             :              * that some catalog tuples might have been removed already.
    1850             :              *
    1851             :              * Ensure that by first writing to ->xmin and only update
    1852             :              * ->effective_xmin once the new state is synced to disk. After a
    1853             :              * crash ->effective_xmin is set to ->xmin.
    1854             :              */
    1855         140 :             if (TransactionIdIsValid(MyReplicationSlot->candidate_catalog_xmin) &&
    1856         140 :                 MyReplicationSlot->data.catalog_xmin != MyReplicationSlot->candidate_catalog_xmin)
    1857             :             {
    1858         140 :                 MyReplicationSlot->data.catalog_xmin = MyReplicationSlot->candidate_catalog_xmin;
    1859         140 :                 MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
    1860         140 :                 MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
    1861         140 :                 updated_xmin = true;
    1862             :             }
    1863             :         }
    1864             : 
    1865         798 :         if (XLogRecPtrIsValid(MyReplicationSlot->candidate_restart_valid) &&
    1866         690 :             MyReplicationSlot->candidate_restart_valid <= lsn)
    1867             :         {
    1868             :             Assert(XLogRecPtrIsValid(MyReplicationSlot->candidate_restart_lsn));
    1869             : 
    1870         632 :             MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
    1871         632 :             MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
    1872         632 :             MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
    1873         632 :             updated_restart = true;
    1874             :         }
    1875             : 
    1876         798 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1877             : 
    1878             :         /* first write new xmin to disk, so we know what's up after a crash */
    1879         798 :         if (updated_xmin || updated_restart)
    1880             :         {
    1881             : #ifdef USE_INJECTION_POINTS
    1882             :             XLogSegNo   seg1,
    1883             :                         seg2;
    1884             : 
    1885         740 :             XLByteToSeg(restart_lsn, seg1, wal_segment_size);
    1886         740 :             XLByteToSeg(MyReplicationSlot->data.restart_lsn, seg2, wal_segment_size);
    1887             : 
    1888             :             /* trigger injection point, but only if segment changes */
    1889         740 :             if (seg1 != seg2)
    1890          12 :                 INJECTION_POINT("logical-replication-slot-advance-segment", NULL);
    1891             : #endif
    1892             : 
    1893         740 :             ReplicationSlotMarkDirty();
    1894         740 :             ReplicationSlotSave();
    1895         740 :             elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
    1896             :         }
    1897             : 
    1898             :         /*
    1899             :          * Now the new xmin is safely on disk, we can let the global value
    1900             :          * advance. We do not take ProcArrayLock or similar since we only
    1901             :          * advance xmin here and there's not much harm done by a concurrent
    1902             :          * computation missing that.
    1903             :          */
    1904         798 :         if (updated_xmin)
    1905             :         {
    1906         140 :             SpinLockAcquire(&MyReplicationSlot->mutex);
    1907         140 :             MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
    1908         140 :             SpinLockRelease(&MyReplicationSlot->mutex);
    1909             : 
    1910         140 :             ReplicationSlotsComputeRequiredXmin(false);
    1911         140 :             ReplicationSlotsComputeRequiredLSN();
    1912             :         }
    1913             :     }
    1914             :     else
    1915             :     {
    1916      103102 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1917             : 
    1918             :         /*
    1919             :          * Prevent moving the confirmed_flush backwards. See comments above
    1920             :          * for the details.
    1921             :          */
    1922      103102 :         if (lsn > MyReplicationSlot->data.confirmed_flush)
    1923      101568 :             MyReplicationSlot->data.confirmed_flush = lsn;
    1924             : 
    1925      103102 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1926             :     }
    1927      103900 : }
    1928             : 
    1929             : /*
    1930             :  * Clear logical streaming state during (sub)transaction abort.
    1931             :  */
    1932             : void
    1933       61554 : ResetLogicalStreamingState(void)
    1934             : {
    1935       61554 :     CheckXidAlive = InvalidTransactionId;
    1936       61554 :     bsysscan = false;
    1937       61554 : }
    1938             : 
    1939             : /*
    1940             :  * Report stats for a slot.
    1941             :  */
    1942             : void
    1943       13014 : UpdateDecodingStats(LogicalDecodingContext *ctx)
    1944             : {
    1945       13014 :     ReorderBuffer *rb = ctx->reorder;
    1946             :     PgStat_StatReplSlotEntry repSlotStat;
    1947             : 
    1948             :     /* Nothing to do if we don't have any replication stats to be sent. */
    1949       13014 :     if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0 &&
    1950         692 :         rb->memExceededCount <= 0)
    1951         674 :         return;
    1952             : 
    1953       12340 :     elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
    1954             :          rb,
    1955             :          rb->spillTxns,
    1956             :          rb->spillCount,
    1957             :          rb->spillBytes,
    1958             :          rb->streamTxns,
    1959             :          rb->streamCount,
    1960             :          rb->streamBytes,
    1961             :          rb->memExceededCount,
    1962             :          rb->totalTxns,
    1963             :          rb->totalBytes);
    1964             : 
    1965       12340 :     repSlotStat.spill_txns = rb->spillTxns;
    1966       12340 :     repSlotStat.spill_count = rb->spillCount;
    1967       12340 :     repSlotStat.spill_bytes = rb->spillBytes;
    1968       12340 :     repSlotStat.stream_txns = rb->streamTxns;
    1969       12340 :     repSlotStat.stream_count = rb->streamCount;
    1970       12340 :     repSlotStat.stream_bytes = rb->streamBytes;
    1971       12340 :     repSlotStat.mem_exceeded_count = rb->memExceededCount;
    1972       12340 :     repSlotStat.total_txns = rb->totalTxns;
    1973       12340 :     repSlotStat.total_bytes = rb->totalBytes;
    1974             : 
    1975       12340 :     pgstat_report_replslot(ctx->slot, &repSlotStat);
    1976             : 
    1977       12340 :     rb->spillTxns = 0;
    1978       12340 :     rb->spillCount = 0;
    1979       12340 :     rb->spillBytes = 0;
    1980       12340 :     rb->streamTxns = 0;
    1981       12340 :     rb->streamCount = 0;
    1982       12340 :     rb->streamBytes = 0;
    1983       12340 :     rb->memExceededCount = 0;
    1984       12340 :     rb->totalTxns = 0;
    1985       12340 :     rb->totalBytes = 0;
    1986             : }
    1987             : 
    1988             : /*
    1989             :  * Read up to the end of WAL starting from the decoding slot's restart_lsn.
    1990             :  * Return true if any meaningful/decodable WAL records are encountered,
    1991             :  * otherwise false.
    1992             :  */
    1993             : bool
    1994          10 : LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
    1995             : {
    1996          10 :     bool        has_pending_wal = false;
    1997             : 
    1998             :     Assert(MyReplicationSlot);
    1999             : 
    2000          10 :     PG_TRY();
    2001             :     {
    2002             :         LogicalDecodingContext *ctx;
    2003             : 
    2004             :         /*
    2005             :          * Create our decoding context in fast_forward mode, passing start_lsn
    2006             :          * as InvalidXLogRecPtr, so that we start processing from the slot's
    2007             :          * confirmed_flush.
    2008             :          */
    2009          20 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
    2010             :                                     NIL,
    2011             :                                     true,   /* fast_forward */
    2012          10 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
    2013             :                                                .segment_open = wal_segment_open,
    2014             :                                                .segment_close = wal_segment_close),
    2015             :                                     NULL, NULL, NULL);
    2016             : 
    2017             :         /*
    2018             :          * Start reading at the slot's restart_lsn, which we know points to a
    2019             :          * valid record.
    2020             :          */
    2021          10 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
    2022             : 
    2023             :         /* Invalidate non-timetravel entries */
    2024          10 :         InvalidateSystemCaches();
    2025             : 
    2026             :         /* Loop until the end of WAL or some changes are processed */
    2027         296 :         while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal)
    2028             :         {
    2029             :             XLogRecord *record;
    2030         286 :             char       *errm = NULL;
    2031             : 
    2032         286 :             record = XLogReadRecord(ctx->reader, &errm);
    2033             : 
    2034         286 :             if (errm)
    2035           0 :                 elog(ERROR, "could not find record for logical decoding: %s", errm);
    2036             : 
    2037         286 :             if (record != NULL)
    2038         286 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
    2039             : 
    2040         286 :             has_pending_wal = ctx->processing_required;
    2041             : 
    2042         286 :             CHECK_FOR_INTERRUPTS();
    2043             :         }
    2044             : 
    2045             :         /* Clean up */
    2046          10 :         FreeDecodingContext(ctx);
    2047          10 :         InvalidateSystemCaches();
    2048             :     }
    2049           0 :     PG_CATCH();
    2050             :     {
    2051             :         /* clear all timetravel entries */
    2052           0 :         InvalidateSystemCaches();
    2053             : 
    2054           0 :         PG_RE_THROW();
    2055             :     }
    2056          10 :     PG_END_TRY();
    2057             : 
    2058          10 :     return has_pending_wal;
    2059             : }
    2060             : 
    2061             : /*
    2062             :  * Helper function for advancing our logical replication slot forward.
    2063             :  *
    2064             :  * The slot's restart_lsn is used as start point for reading records, while
    2065             :  * confirmed_flush is used as base point for the decoding context.
    2066             :  *
    2067             :  * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
    2068             :  * because we need to digest WAL to advance restart_lsn allowing to recycle
    2069             :  * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
    2070             :  * mode, no changes are generated anyway.
    2071             :  *
    2072             :  * *found_consistent_snapshot will be true if the initial decoding snapshot has
    2073             :  * been built; Otherwise, it will be false.
    2074             :  */
    2075             : XLogRecPtr
    2076          36 : LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
    2077             :                                     bool *found_consistent_snapshot)
    2078             : {
    2079             :     LogicalDecodingContext *ctx;
    2080          36 :     ResourceOwner old_resowner PG_USED_FOR_ASSERTS_ONLY = CurrentResourceOwner;
    2081             :     XLogRecPtr  retlsn;
    2082             : 
    2083             :     Assert(XLogRecPtrIsValid(moveto));
    2084             : 
    2085          36 :     if (found_consistent_snapshot)
    2086          12 :         *found_consistent_snapshot = false;
    2087             : 
    2088          36 :     PG_TRY();
    2089             :     {
    2090             :         /*
    2091             :          * Create our decoding context in fast_forward mode, passing start_lsn
    2092             :          * as InvalidXLogRecPtr, so that we start processing from my slot's
    2093             :          * confirmed_flush.
    2094             :          */
    2095          72 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
    2096             :                                     NIL,
    2097             :                                     true,   /* fast_forward */
    2098          36 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
    2099             :                                                .segment_open = wal_segment_open,
    2100             :                                                .segment_close = wal_segment_close),
    2101             :                                     NULL, NULL, NULL);
    2102             : 
    2103             :         /*
    2104             :          * Wait for specified streaming replication standby servers (if any)
    2105             :          * to confirm receipt of WAL up to moveto lsn.
    2106             :          */
    2107          36 :         WaitForStandbyConfirmation(moveto);
    2108             : 
    2109             :         /*
    2110             :          * Start reading at the slot's restart_lsn, which we know to point to
    2111             :          * a valid record.
    2112             :          */
    2113          36 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
    2114             : 
    2115             :         /* invalidate non-timetravel entries */
    2116          36 :         InvalidateSystemCaches();
    2117             : 
    2118             :         /* Decode records until we reach the requested target */
    2119        4274 :         while (ctx->reader->EndRecPtr < moveto)
    2120             :         {
    2121        4238 :             char       *errm = NULL;
    2122             :             XLogRecord *record;
    2123             : 
    2124             :             /*
    2125             :              * Read records.  No changes are generated in fast_forward mode,
    2126             :              * but snapbuilder/slot statuses are updated properly.
    2127             :              */
    2128        4238 :             record = XLogReadRecord(ctx->reader, &errm);
    2129        4238 :             if (errm)
    2130           0 :                 elog(ERROR, "could not find record while advancing replication slot: %s",
    2131             :                      errm);
    2132             : 
    2133             :             /*
    2134             :              * Process the record.  Storage-level changes are ignored in
    2135             :              * fast_forward mode, but other modules (such as snapbuilder)
    2136             :              * might still have critical updates to do.
    2137             :              */
    2138        4238 :             if (record)
    2139             :             {
    2140        4238 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
    2141             : 
    2142             :                 /*
    2143             :                  * We used to have bugs where logical decoding would fail to
    2144             :                  * preserve the resource owner.  That's important here, so
    2145             :                  * verify that that doesn't happen anymore.  XXX this could be
    2146             :                  * removed once it's been battle-tested.
    2147             :                  */
    2148             :                 Assert(CurrentResourceOwner == old_resowner);
    2149             :             }
    2150             : 
    2151        4238 :             CHECK_FOR_INTERRUPTS();
    2152             :         }
    2153             : 
    2154          36 :         if (found_consistent_snapshot && DecodingContextReady(ctx))
    2155          12 :             *found_consistent_snapshot = true;
    2156             : 
    2157          36 :         if (XLogRecPtrIsValid(ctx->reader->EndRecPtr))
    2158             :         {
    2159          36 :             LogicalConfirmReceivedLocation(moveto);
    2160             : 
    2161             :             /*
    2162             :              * If only the confirmed_flush LSN has changed the slot won't get
    2163             :              * marked as dirty by the above. Callers on the walsender
    2164             :              * interface are expected to keep track of their own progress and
    2165             :              * don't need it written out. But SQL-interface users cannot
    2166             :              * specify their own start positions and it's harder for them to
    2167             :              * keep track of their progress, so we should make more of an
    2168             :              * effort to save it for them.
    2169             :              *
    2170             :              * Dirty the slot so it is written out at the next checkpoint. The
    2171             :              * LSN position advanced to may still be lost on a crash but this
    2172             :              * makes the data consistent after a clean shutdown.
    2173             :              */
    2174          36 :             ReplicationSlotMarkDirty();
    2175             :         }
    2176             : 
    2177          36 :         retlsn = MyReplicationSlot->data.confirmed_flush;
    2178             : 
    2179             :         /* free context, call shutdown callback */
    2180          36 :         FreeDecodingContext(ctx);
    2181             : 
    2182          36 :         InvalidateSystemCaches();
    2183             :     }
    2184           0 :     PG_CATCH();
    2185             :     {
    2186             :         /* clear all timetravel entries */
    2187           0 :         InvalidateSystemCaches();
    2188             : 
    2189           0 :         PG_RE_THROW();
    2190             :     }
    2191          36 :     PG_END_TRY();
    2192             : 
    2193          36 :     return retlsn;
    2194             : }

Generated by: LCOV version 1.16