LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logical.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 690 743 92.9 %
Date: 2025-04-19 19:15:24 Functions: 40 41 97.6 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * logical.c
       3             :  *     PostgreSQL logical decoding coordination
       4             :  *
       5             :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/logical.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file coordinates interaction between the various modules that
      12             :  *    together provide logical decoding, primarily by providing so
      13             :  *    called LogicalDecodingContexts. The goal is to encapsulate most of the
      14             :  *    internal complexity for consumers of logical decoding, so they can
      15             :  *    create and consume a changestream with a low amount of code. Builtin
      16             :  *    consumers are the walsender and SQL SRF interface, but it's possible to
      17             :  *    add further ones without changing core code, e.g. to consume changes in
      18             :  *    a bgworker.
      19             :  *
      20             :  *    The idea is that a consumer provides three callbacks: one to read WAL,
      21             :  *    one to prepare a data write, and a final one to actually write, since
      22             :  *    their implementation depends on the type of consumer.  Check
      23             :  *    logicalfuncs.c for an example implementation of a fairly simple consumer
      24             :  *    and an implementation of a WAL reading callback that's suitable for
      25             :  *    simple consumers.
      26             :  *-------------------------------------------------------------------------
      27             :  */
      28             : 
      29             : #include "postgres.h"
      30             : 
      31             : #include "access/xact.h"
      32             : #include "access/xlogutils.h"
      33             : #include "fmgr.h"
      34             : #include "miscadmin.h"
      35             : #include "pgstat.h"
      36             : #include "replication/decode.h"
      37             : #include "replication/logical.h"
      38             : #include "replication/reorderbuffer.h"
      39             : #include "replication/slotsync.h"
      40             : #include "replication/snapbuild.h"
      41             : #include "storage/proc.h"
      42             : #include "storage/procarray.h"
      43             : #include "utils/builtins.h"
      44             : #include "utils/inval.h"
      45             : #include "utils/memutils.h"
      46             : 
      47             : /* data for errcontext callback */
      48             : typedef struct LogicalErrorCallbackState
      49             : {
      50             :     LogicalDecodingContext *ctx;
      51             :     const char *callback_name;
      52             :     XLogRecPtr  report_location;
      53             : } LogicalErrorCallbackState;
      54             : 
      55             : /* wrappers around output plugin callbacks */
      56             : static void output_plugin_error_callback(void *arg);
      57             : static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      58             :                                bool is_init);
      59             : static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
      60             : static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
      61             : static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      62             :                               XLogRecPtr commit_lsn);
      63             : static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
      64             : static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      65             :                                XLogRecPtr prepare_lsn);
      66             : static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      67             :                                        XLogRecPtr commit_lsn);
      68             : static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      69             :                                          XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
      70             : static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      71             :                               Relation relation, ReorderBufferChange *change);
      72             : static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      73             :                                 int nrelations, Relation relations[], ReorderBufferChange *change);
      74             : static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      75             :                                XLogRecPtr message_lsn, bool transactional,
      76             :                                const char *prefix, Size message_size, const char *message);
      77             : 
      78             : /* streaming callbacks */
      79             : static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      80             :                                     XLogRecPtr first_lsn);
      81             : static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      82             :                                    XLogRecPtr last_lsn);
      83             : static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      84             :                                     XLogRecPtr abort_lsn);
      85             : static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      86             :                                       XLogRecPtr prepare_lsn);
      87             : static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      88             :                                      XLogRecPtr commit_lsn);
      89             : static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      90             :                                      Relation relation, ReorderBufferChange *change);
      91             : static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      92             :                                       XLogRecPtr message_lsn, bool transactional,
      93             :                                       const char *prefix, Size message_size, const char *message);
      94             : static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      95             :                                        int nrelations, Relation relations[], ReorderBufferChange *change);
      96             : 
      97             : /* callback to update txn's progress */
      98             : static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
      99             :                                            ReorderBufferTXN *txn,
     100             :                                            XLogRecPtr lsn);
     101             : 
     102             : static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
     103             : 
     104             : /*
     105             :  * Make sure the current settings & environment are capable of doing logical
     106             :  * decoding.
     107             :  */
     108             : void
     109        2970 : CheckLogicalDecodingRequirements(void)
     110             : {
     111        2970 :     CheckSlotRequirements();
     112             : 
     113             :     /*
     114             :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
     115             :      * needs the same check.
     116             :      */
     117             : 
     118        2970 :     if (wal_level < WAL_LEVEL_LOGICAL)
     119           0 :         ereport(ERROR,
     120             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     121             :                  errmsg("logical decoding requires \"wal_level\" >= \"logical\"")));
     122             : 
     123        2970 :     if (MyDatabaseId == InvalidOid)
     124           2 :         ereport(ERROR,
     125             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     126             :                  errmsg("logical decoding requires a database connection")));
     127             : 
     128        2968 :     if (RecoveryInProgress())
     129             :     {
     130             :         /*
     131             :          * This check may have race conditions, but whenever
     132             :          * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
     133             :          * verify that there are no existing logical replication slots. And to
     134             :          * avoid races around creating a new slot,
     135             :          * CheckLogicalDecodingRequirements() is called once before creating
     136             :          * the slot, and once when logical decoding is initially starting up.
     137             :          */
     138         138 :         if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
     139           2 :             ereport(ERROR,
     140             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     141             :                      errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary")));
     142             :     }
     143        2966 : }
     144             : 
     145             : /*
     146             :  * Helper function for CreateInitDecodingContext() and
     147             :  * CreateDecodingContext() performing common tasks.
     148             :  */
     149             : static LogicalDecodingContext *
     150        2074 : StartupDecodingContext(List *output_plugin_options,
     151             :                        XLogRecPtr start_lsn,
     152             :                        TransactionId xmin_horizon,
     153             :                        bool need_full_snapshot,
     154             :                        bool fast_forward,
     155             :                        bool in_create,
     156             :                        XLogReaderRoutine *xl_routine,
     157             :                        LogicalOutputPluginWriterPrepareWrite prepare_write,
     158             :                        LogicalOutputPluginWriterWrite do_write,
     159             :                        LogicalOutputPluginWriterUpdateProgress update_progress)
     160             : {
     161             :     ReplicationSlot *slot;
     162             :     MemoryContext context,
     163             :                 old_context;
     164             :     LogicalDecodingContext *ctx;
     165             : 
     166             :     /* shorter lines... */
     167        2074 :     slot = MyReplicationSlot;
     168             : 
     169        2074 :     context = AllocSetContextCreate(CurrentMemoryContext,
     170             :                                     "Logical decoding context",
     171             :                                     ALLOCSET_DEFAULT_SIZES);
     172        2074 :     old_context = MemoryContextSwitchTo(context);
     173        2074 :     ctx = palloc0(sizeof(LogicalDecodingContext));
     174             : 
     175        2074 :     ctx->context = context;
     176             : 
     177             :     /*
     178             :      * (re-)load output plugins, so we detect a bad (removed) output plugin
     179             :      * now.
     180             :      */
     181        2074 :     if (!fast_forward)
     182        2036 :         LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
     183             : 
     184             :     /*
     185             :      * Now that the slot's xmin has been set, we can announce ourselves as a
     186             :      * logical decoding backend which doesn't need to be checked individually
     187             :      * when computing the xmin horizon because the xmin is enforced via
     188             :      * replication slots.
     189             :      *
     190             :      * We can only do so if we're outside of a transaction (i.e. the case when
     191             :      * streaming changes via walsender), otherwise an already set up
     192             :      * snapshot/xid would end up being ignored. That's not a particularly
     193             :      * bothersome restriction since the SQL interface can't be used for
     194             :      * streaming anyway.
     195             :      */
     196        2072 :     if (!IsTransactionOrTransactionBlock())
     197             :     {
     198        1004 :         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     199        1004 :         MyProc->statusFlags |= PROC_IN_LOGICAL_DECODING;
     200        1004 :         ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
     201        1004 :         LWLockRelease(ProcArrayLock);
     202             :     }
     203             : 
     204        2072 :     ctx->slot = slot;
     205             : 
     206        2072 :     ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
     207        2072 :     if (!ctx->reader)
     208           0 :         ereport(ERROR,
     209             :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     210             :                  errmsg("out of memory"),
     211             :                  errdetail("Failed while allocating a WAL reading processor.")));
     212             : 
     213        2072 :     ctx->reorder = ReorderBufferAllocate();
     214        2072 :     ctx->snapshot_builder =
     215        2072 :         AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
     216             :                                 need_full_snapshot, in_create, slot->data.two_phase_at);
     217             : 
     218        2072 :     ctx->reorder->private_data = ctx;
     219             : 
     220             :     /* wrap output plugin callbacks, so we can add error context information */
     221        2072 :     ctx->reorder->begin = begin_cb_wrapper;
     222        2072 :     ctx->reorder->apply_change = change_cb_wrapper;
     223        2072 :     ctx->reorder->apply_truncate = truncate_cb_wrapper;
     224        2072 :     ctx->reorder->commit = commit_cb_wrapper;
     225        2072 :     ctx->reorder->message = message_cb_wrapper;
     226             : 
     227             :     /*
     228             :      * To support streaming, we require start/stop/abort/commit/change
     229             :      * callbacks. The message and truncate callbacks are optional, similar to
     230             :      * regular output plugins. We however enable streaming when at least one
     231             :      * of the methods is enabled so that we can easily identify missing
     232             :      * methods.
     233             :      *
     234             :      * We decide it here, but only check it later in the wrappers.
     235             :      */
     236        4182 :     ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) ||
     237          38 :         (ctx->callbacks.stream_stop_cb != NULL) ||
     238          38 :         (ctx->callbacks.stream_abort_cb != NULL) ||
     239          38 :         (ctx->callbacks.stream_commit_cb != NULL) ||
     240          38 :         (ctx->callbacks.stream_change_cb != NULL) ||
     241        2148 :         (ctx->callbacks.stream_message_cb != NULL) ||
     242          38 :         (ctx->callbacks.stream_truncate_cb != NULL);
     243             : 
     244             :     /*
     245             :      * streaming callbacks
     246             :      *
     247             :      * stream_message and stream_truncate callbacks are optional, so we do not
     248             :      * fail with ERROR when missing, but the wrappers simply do nothing. We
     249             :      * must set the ReorderBuffer callbacks to something, otherwise the calls
     250             :      * from there will crash (we don't want to move the checks there).
     251             :      */
     252        2072 :     ctx->reorder->stream_start = stream_start_cb_wrapper;
     253        2072 :     ctx->reorder->stream_stop = stream_stop_cb_wrapper;
     254        2072 :     ctx->reorder->stream_abort = stream_abort_cb_wrapper;
     255        2072 :     ctx->reorder->stream_prepare = stream_prepare_cb_wrapper;
     256        2072 :     ctx->reorder->stream_commit = stream_commit_cb_wrapper;
     257        2072 :     ctx->reorder->stream_change = stream_change_cb_wrapper;
     258        2072 :     ctx->reorder->stream_message = stream_message_cb_wrapper;
     259        2072 :     ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
     260             : 
     261             : 
     262             :     /*
     263             :      * To support two-phase logical decoding, we require
     264             :      * begin_prepare/prepare/commit_prepared/rollback_prepared callbacks. The
     265             :      * filter_prepare callback is optional. We however enable two-phase
     266             :      * logical decoding when at least one of the methods is enabled so that we
     267             :      * can easily identify missing methods.
     268             :      *
     269             :      * We decide it here, but only check it later in the wrappers.
     270             :      */
     271        4182 :     ctx->twophase = (ctx->callbacks.begin_prepare_cb != NULL) ||
     272          38 :         (ctx->callbacks.prepare_cb != NULL) ||
     273          38 :         (ctx->callbacks.commit_prepared_cb != NULL) ||
     274          38 :         (ctx->callbacks.rollback_prepared_cb != NULL) ||
     275        2148 :         (ctx->callbacks.stream_prepare_cb != NULL) ||
     276          38 :         (ctx->callbacks.filter_prepare_cb != NULL);
     277             : 
     278             :     /*
     279             :      * Callbacks to support decoding at prepare time.
     280             :      */
     281        2072 :     ctx->reorder->begin_prepare = begin_prepare_cb_wrapper;
     282        2072 :     ctx->reorder->prepare = prepare_cb_wrapper;
     283        2072 :     ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
     284        2072 :     ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
     285             : 
     286             :     /*
     287             :      * Callback to support updating progress during sending data of a
     288             :      * transaction (and its subtransactions) to the output plugin.
     289             :      */
     290        2072 :     ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
     291             : 
     292        2072 :     ctx->out = makeStringInfo();
     293        2072 :     ctx->prepare_write = prepare_write;
     294        2072 :     ctx->write = do_write;
     295        2072 :     ctx->update_progress = update_progress;
     296             : 
     297        2072 :     ctx->output_plugin_options = output_plugin_options;
     298             : 
     299        2072 :     ctx->fast_forward = fast_forward;
     300             : 
     301        2072 :     MemoryContextSwitchTo(old_context);
     302             : 
     303        2072 :     return ctx;
     304             : }
     305             : 
     306             : /*
     307             :  * Create a new decoding context, for a new logical slot.
     308             :  *
     309             :  * plugin -- contains the name of the output plugin
     310             :  * output_plugin_options -- contains options passed to the output plugin
     311             :  * need_full_snapshot -- if true, must obtain a snapshot able to read all
     312             :  *      tables; if false, one that can read only catalogs is acceptable.
     313             :  * restart_lsn -- if given as invalid, it's this routine's responsibility to
     314             :  *      mark WAL as reserved by setting a convenient restart_lsn for the slot.
     315             :  *      Otherwise, we set up decoding to start from the given LSN without
     316             :  *      marking WAL reserved beforehand.  In that scenario, it's up to the
     317             :  *      caller to guarantee that WAL remains available.
     318             :  * xl_routine -- XLogReaderRoutine for underlying XLogReader
     319             :  * prepare_write, do_write, update_progress --
     320             :  *      callbacks that perform the use-case dependent, actual, work.
     321             :  *
     322             :  * Needs to be called while in a memory context that's at least as long lived
     323             :  * as the decoding context because further memory contexts will be created
     324             :  * inside it.
     325             :  *
     326             :  * Returns an initialized decoding context after calling the output plugin's
     327             :  * startup function.
     328             :  */
     329             : LogicalDecodingContext *
     330         890 : CreateInitDecodingContext(const char *plugin,
     331             :                           List *output_plugin_options,
     332             :                           bool need_full_snapshot,
     333             :                           XLogRecPtr restart_lsn,
     334             :                           XLogReaderRoutine *xl_routine,
     335             :                           LogicalOutputPluginWriterPrepareWrite prepare_write,
     336             :                           LogicalOutputPluginWriterWrite do_write,
     337             :                           LogicalOutputPluginWriterUpdateProgress update_progress)
     338             : {
     339         890 :     TransactionId xmin_horizon = InvalidTransactionId;
     340             :     ReplicationSlot *slot;
     341             :     NameData    plugin_name;
     342             :     LogicalDecodingContext *ctx;
     343             :     MemoryContext old_context;
     344             : 
     345             :     /*
     346             :      * On a standby, this check is also required while creating the slot.
     347             :      * Check the comments in the function.
     348             :      */
     349         890 :     CheckLogicalDecodingRequirements();
     350             : 
     351             :     /* shorter lines... */
     352         890 :     slot = MyReplicationSlot;
     353             : 
     354             :     /* first some sanity checks that are unlikely to be violated */
     355         890 :     if (slot == NULL)
     356           0 :         elog(ERROR, "cannot perform logical decoding without an acquired slot");
     357             : 
     358         890 :     if (plugin == NULL)
     359           0 :         elog(ERROR, "cannot initialize logical decoding without a specified plugin");
     360             : 
     361             :     /* Make sure the passed slot is suitable. These are user facing errors. */
     362         890 :     if (SlotIsPhysical(slot))
     363           0 :         ereport(ERROR,
     364             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     365             :                  errmsg("cannot use physical replication slot for logical decoding")));
     366             : 
     367         890 :     if (slot->data.database != MyDatabaseId)
     368           0 :         ereport(ERROR,
     369             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     370             :                  errmsg("replication slot \"%s\" was not created in this database",
     371             :                         NameStr(slot->data.name))));
     372             : 
     373        1514 :     if (IsTransactionState() &&
     374         624 :         GetTopTransactionIdIfAny() != InvalidTransactionId)
     375           4 :         ereport(ERROR,
     376             :                 (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
     377             :                  errmsg("cannot create logical replication slot in transaction that has performed writes")));
     378             : 
     379             :     /*
     380             :      * Register output plugin name with slot.  We need the mutex to avoid
     381             :      * concurrent reading of a partially copied string.  But we don't want any
     382             :      * complicated code while holding a spinlock, so do namestrcpy() outside.
     383             :      */
     384         886 :     namestrcpy(&plugin_name, plugin);
     385         886 :     SpinLockAcquire(&slot->mutex);
     386         886 :     slot->data.plugin = plugin_name;
     387         886 :     SpinLockRelease(&slot->mutex);
     388             : 
     389         886 :     if (XLogRecPtrIsInvalid(restart_lsn))
     390         874 :         ReplicationSlotReserveWal();
     391             :     else
     392             :     {
     393          12 :         SpinLockAcquire(&slot->mutex);
     394          12 :         slot->data.restart_lsn = restart_lsn;
     395          12 :         SpinLockRelease(&slot->mutex);
     396             :     }
     397             : 
     398             :     /* ----
     399             :      * This is a bit tricky: We need to determine a safe xmin horizon to start
     400             :      * decoding from, to avoid starting from a running xacts record referring
     401             :      * to xids whose rows have been vacuumed or pruned
     402             :      * already. GetOldestSafeDecodingTransactionId() returns such a value, but
     403             :      * without further interlock its return value might immediately be out of
     404             :      * date.
     405             :      *
     406             :      * So we have to acquire the ProcArrayLock to prevent computation of new
     407             :      * xmin horizons by other backends, get the safe decoding xid, and inform
     408             :      * the slot machinery about the new limit. Once that's done the
     409             :      * ProcArrayLock can be released as the slot machinery now is
     410             :      * protecting against vacuum.
     411             :      *
     412             :      * Note that, temporarily, the data, not just the catalog, xmin has to be
     413             :      * reserved if a data snapshot is to be exported.  Otherwise the initial
     414             :      * data snapshot created here is not guaranteed to be valid. After that
     415             :      * the data xmin doesn't need to be managed anymore and the global xmin
     416             :      * should be recomputed. As we are fine with losing the pegged data xmin
     417             :      * after crash - no chance a snapshot would get exported anymore - we can
     418             :      * get away with just setting the slot's
     419             :      * effective_xmin. ReplicationSlotRelease will reset it again.
     420             :      *
     421             :      * ----
     422             :      */
     423         886 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     424             : 
     425         886 :     xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
     426             : 
     427         886 :     SpinLockAcquire(&slot->mutex);
     428         886 :     slot->effective_catalog_xmin = xmin_horizon;
     429         886 :     slot->data.catalog_xmin = xmin_horizon;
     430         886 :     if (need_full_snapshot)
     431         382 :         slot->effective_xmin = xmin_horizon;
     432         886 :     SpinLockRelease(&slot->mutex);
     433             : 
     434         886 :     ReplicationSlotsComputeRequiredXmin(true);
     435             : 
     436         886 :     LWLockRelease(ProcArrayLock);
     437             : 
     438         886 :     ReplicationSlotMarkDirty();
     439         886 :     ReplicationSlotSave();
     440             : 
     441         886 :     ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
     442             :                                  need_full_snapshot, false, true,
     443             :                                  xl_routine, prepare_write, do_write,
     444             :                                  update_progress);
     445             : 
     446             :     /* call output plugin initialization callback */
     447         884 :     old_context = MemoryContextSwitchTo(ctx->context);
     448         884 :     if (ctx->callbacks.startup_cb != NULL)
     449         884 :         startup_cb_wrapper(ctx, &ctx->options, true);
     450         884 :     MemoryContextSwitchTo(old_context);
     451             : 
     452             :     /*
     453             :      * We allow decoding of prepared transactions when two_phase is
     454             :      * enabled at the time of slot creation, or when the two_phase option is
     455             :      * given at the streaming start, provided the plugin supports all the
     456             :      * callbacks for two-phase.
     457             :      */
     458         884 :     ctx->twophase &= slot->data.two_phase;
     459             : 
     460         884 :     ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
     461             : 
     462         884 :     return ctx;
     463             : }
     464             : 
     465             : /*
     466             :  * Create a new decoding context, for a logical slot that has previously been
     467             :  * used already.
     468             :  *
     469             :  * start_lsn
     470             :  *      The LSN at which to start decoding.  If InvalidXLogRecPtr, restart
     471             :  *      from the slot's confirmed_flush; otherwise, start from the specified
     472             :  *      location (but move it forwards to confirmed_flush if it's older than
     473             :  *      that, see below).
     474             :  *
     475             :  * output_plugin_options
     476             :  *      options passed to the output plugin.
     477             :  *
     478             :  * fast_forward
     479             :  *      bypass the generation of logical changes.
     480             :  *
     481             :  * xl_routine
     482             :  *      XLogReaderRoutine used by underlying xlogreader
     483             :  *
     484             :  * prepare_write, do_write, update_progress
     485             :  *      callbacks that have to be filled to perform the use-case dependent,
     486             :  *      actual work.
     487             :  *
     488             :  * Needs to be called while in a memory context that's at least as long lived
     489             :  * as the decoding context because further memory contexts will be created
     490             :  * inside it.
     491             :  *
     492             :  * Returns an initialized decoding context after calling the output plugin's
     493             :  * startup function.
     494             :  */
     495             : LogicalDecodingContext *
     496        1198 : CreateDecodingContext(XLogRecPtr start_lsn,
     497             :                       List *output_plugin_options,
     498             :                       bool fast_forward,
     499             :                       XLogReaderRoutine *xl_routine,
     500             :                       LogicalOutputPluginWriterPrepareWrite prepare_write,
     501             :                       LogicalOutputPluginWriterWrite do_write,
     502             :                       LogicalOutputPluginWriterUpdateProgress update_progress)
     503             : {
     504             :     LogicalDecodingContext *ctx;
     505             :     ReplicationSlot *slot;
     506             :     MemoryContext old_context;
     507             : 
     508             :     /* local alias for MyReplicationSlot, to keep the lines below shorter */
     509        1198 :     slot = MyReplicationSlot;
     510             : 
     511             :     /* first some sanity checks that are unlikely to be violated */
     512        1198 :     if (slot == NULL)
     513           0 :         elog(ERROR, "cannot perform logical decoding without an acquired slot");
     514             : 
     515             :     /* make sure the passed slot is suitable, these are user facing errors */
     516        1198 :     if (SlotIsPhysical(slot))
     517           2 :         ereport(ERROR,
     518             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     519             :                  errmsg("cannot use physical replication slot for logical decoding")));
     520             : 
     521             :     /*
     522             :      * We need to access the system tables during decoding to build the
     523             :      * logical changes unless we are in fast_forward mode where no changes are
     524             :      * generated.
     525             :      */
     526        1196 :     if (slot->data.database != MyDatabaseId && !fast_forward)
     527           6 :         ereport(ERROR,
     528             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     529             :                  errmsg("replication slot \"%s\" was not created in this database",
     530             :                         NameStr(slot->data.name))));
     531             : 
     532             :     /*
     533             :      * The slots being synced from the primary can't be used for decoding as
     534             :      * they are used after failover. However, we do allow advancing the LSNs
     535             :      * during the synchronization of slots. See update_local_synced_slot.
     536             :      */
     537        1190 :     if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
     538           2 :         ereport(ERROR,
     539             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     540             :                 errmsg("cannot use replication slot \"%s\" for logical decoding",
     541             :                        NameStr(slot->data.name)),
     542             :                 errdetail("This replication slot is being synchronized from the primary server."),
     543             :                 errhint("Specify another replication slot."));
     544             : 
     545             :     /* slot must be valid to allow decoding */
     546             :     Assert(slot->data.invalidated == RS_INVAL_NONE);
     547             :     Assert(slot->data.restart_lsn != InvalidXLogRecPtr);
     548             : 
     549        1188 :     if (start_lsn == InvalidXLogRecPtr)
     550             :     {
     551             :         /* continue from last position */
     552         710 :         start_lsn = slot->data.confirmed_flush;
     553             :     }
     554         478 :     else if (start_lsn < slot->data.confirmed_flush)
     555             :     {
     556             :         /*
     557             :          * It might seem like we should error out in this case, but it's
     558             :          * pretty common for a client to acknowledge an LSN it doesn't have to
     559             :          * do anything for, and thus didn't store persistently, because the
     560             :          * xlog records didn't result in anything relevant for logical
     561             :          * decoding. Clients have to be able to do that to support synchronous
     562             :          * replication.
     563             :          *
     564             :          * Starting at a different LSN than requested might not catch certain
     565             :          * kinds of client errors; so the client may wish to check that
     566             :          * confirmed_flush_lsn matches its expectations.
     567             :          */
     568          36 :         elog(LOG, "%X/%X has been already streamed, forwarding to %X/%X",
     569             :              LSN_FORMAT_ARGS(start_lsn),
     570             :              LSN_FORMAT_ARGS(slot->data.confirmed_flush));
     571             : 
     572          36 :         start_lsn = slot->data.confirmed_flush;
     573             :     }
     574             : 
     575        1188 :     ctx = StartupDecodingContext(output_plugin_options,
     576             :                                  start_lsn, InvalidTransactionId, false,
     577             :                                  fast_forward, false, xl_routine, prepare_write,
     578             :                                  do_write, update_progress);
     579             : 
     580             :     /* call output plugin initialization callback */
     581        1188 :     old_context = MemoryContextSwitchTo(ctx->context);
     582        1188 :     if (ctx->callbacks.startup_cb != NULL)
     583        1150 :         startup_cb_wrapper(ctx, &ctx->options, false);
     584        1182 :     MemoryContextSwitchTo(old_context);
     585             : 
     586             :     /*
     587             :      * We allow decoding of prepared transactions when two_phase is
     588             :      * enabled at the time of slot creation, or when the two_phase option is
     589             :      * given at the streaming start, provided the plugin supports all the
     590             :      * callbacks for two-phase.
     591             :      */
     592        1182 :     ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given);
     593             : 
     594             :     /* Mark slot to allow two_phase decoding if not already marked */
     595        1182 :     if (ctx->twophase && !slot->data.two_phase)
     596             :     {
     597          16 :         SpinLockAcquire(&slot->mutex);
     598          16 :         slot->data.two_phase = true;
     599          16 :         slot->data.two_phase_at = start_lsn;
     600          16 :         SpinLockRelease(&slot->mutex);
     601          16 :         ReplicationSlotMarkDirty();
     602          16 :         ReplicationSlotSave();
     603          16 :         SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
     604             :     }
     605             : 
     606        1182 :     ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
     607             : 
     608        1182 :     ereport(LOG,
     609             :             (errmsg("starting logical decoding for slot \"%s\"",
     610             :                     NameStr(slot->data.name)),
     611             :              errdetail("Streaming transactions committing after %X/%X, reading WAL from %X/%X.",
     612             :                        LSN_FORMAT_ARGS(slot->data.confirmed_flush),
     613             :                        LSN_FORMAT_ARGS(slot->data.restart_lsn))));
     614             : 
     615        1182 :     return ctx;
     616             : }
     617             : 
     618             : /*
     619             :  * Returns true if a consistent initial decoding snapshot has been built.
     620             :  */
     621             : bool
     622         956 : DecodingContextReady(LogicalDecodingContext *ctx)
     623             : {
     624         956 :     return SnapBuildCurrentState(ctx->snapshot_builder) == SNAPBUILD_CONSISTENT;
     625             : }
     626             : 
     627             : /*
     628             :  * Read from the decoding slot, until it is ready to start extracting changes.
     629             :  */
     630             : void
     631         872 : DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
     632             : {
     633         872 :     ReplicationSlot *slot = ctx->slot;
     634             : 
     635             :     /* Initialize from where to start reading WAL. */
     636         872 :     XLogBeginRead(ctx->reader, slot->data.restart_lsn);
     637             : 
     638         872 :     elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
     639             :          LSN_FORMAT_ARGS(slot->data.restart_lsn));
     640             : 
     641             :     /* Wait for a consistent starting point */
     642             :     for (;;)
     643          78 :     {
     644             :         XLogRecord *record;
     645         950 :         char       *err = NULL;
     646             : 
     647             :         /* the read_page callback waits for new WAL */
     648         950 :         record = XLogReadRecord(ctx->reader, &err);
     649         950 :         if (err)
     650           0 :             elog(ERROR, "could not find logical decoding starting point: %s", err);
     651         950 :         if (!record)
     652           0 :             elog(ERROR, "could not find logical decoding starting point");
     653             : 
     654         950 :         LogicalDecodingProcessRecord(ctx, ctx->reader);
     655             : 
     656             :         /* only continue till we found a consistent spot */
     657         946 :         if (DecodingContextReady(ctx))
     658         868 :             break;
     659             : 
     660          78 :         CHECK_FOR_INTERRUPTS();
     661             :     }
     662             : 
     663         868 :     SpinLockAcquire(&slot->mutex);
     664         868 :     slot->data.confirmed_flush = ctx->reader->EndRecPtr;
     665         868 :     if (slot->data.two_phase)
     666          18 :         slot->data.two_phase_at = ctx->reader->EndRecPtr;
     667         868 :     SpinLockRelease(&slot->mutex);
     668         868 : }
     669             : 
     670             : /*
     671             :  * Free a previously allocated decoding context, invoking the shutdown
     672             :  * callback if necessary.
     673             :  */
     674             : void
     675        1698 : FreeDecodingContext(LogicalDecodingContext *ctx)
     676             : {
     677        1698 :     if (ctx->callbacks.shutdown_cb != NULL)
     678        1660 :         shutdown_cb_wrapper(ctx);
     679             : 
     680        1698 :     ReorderBufferFree(ctx->reorder);
     681        1698 :     FreeSnapshotBuilder(ctx->snapshot_builder);
     682        1698 :     XLogReaderFree(ctx->reader);
     683        1698 :     MemoryContextDelete(ctx->context);
     684        1698 : }
     685             : 
     686             : /*
     687             :  * Prepare a write using the context's output routine.
     688             :  */
     689             : void
     690      672244 : OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
     691             : {
     692      672244 :     if (!ctx->accept_writes)
     693           0 :         elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
     694             : 
     695      672244 :     ctx->prepare_write(ctx, ctx->write_location, ctx->write_xid, last_write);
     696      672244 :     ctx->prepared_write = true;
     697      672244 : }
     698             : 
     699             : /*
     700             :  * Perform a write using the context's output routine.
     701             :  */
     702             : void
     703      672244 : OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
     704             : {
     705      672244 :     if (!ctx->prepared_write)
     706           0 :         elog(ERROR, "OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
     707             : 
     708      672244 :     ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write);
     709      672238 :     ctx->prepared_write = false;
     710      672238 : }
     711             : 
     712             : /*
     713             :  * Update progress tracking (if supported).
     714             :  */
     715             : void
     716        7920 : OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx,
     717             :                            bool skipped_xact)
     718             : {
     719        7920 :     if (!ctx->update_progress)
     720        3172 :         return;
     721             : 
     722        4748 :     ctx->update_progress(ctx, ctx->write_location, ctx->write_xid,
     723             :                          skipped_xact);
     724             : }
     725             : 
     726             : /*
     727             :  * Load the output plugin, lookup its output plugin init function, and check
     728             :  * that it provides the required callbacks.
     729             :  */
     730             : static void
     731        2036 : LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin)
     732             : {
     733             :     LogicalOutputPluginInit plugin_init;
     734             : 
     735        2034 :     plugin_init = (LogicalOutputPluginInit)
     736        2036 :         load_external_function(plugin, "_PG_output_plugin_init", false, NULL);
     737             : 
     738        2034 :     if (plugin_init == NULL)
     739           0 :         elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");
     740             : 
     741             :     /* ask the output plugin to fill the callback struct */
     742        2034 :     plugin_init(callbacks);
     743             : 
     744        2034 :     if (callbacks->begin_cb == NULL)
     745           0 :         elog(ERROR, "output plugins have to register a begin callback");
     746        2034 :     if (callbacks->change_cb == NULL)
     747           0 :         elog(ERROR, "output plugins have to register a change callback");
     748        2034 :     if (callbacks->commit_cb == NULL)
     749           0 :         elog(ERROR, "output plugins have to register a commit callback");
     750        2034 : }
     751             : 
     752             : static void
     753          20 : output_plugin_error_callback(void *arg)
     754             : {
     755          20 :     LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;
     756             : 
     757             :     /* not all callbacks have an associated LSN  */
     758          20 :     if (state->report_location != InvalidXLogRecPtr)
     759          14 :         errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
     760          14 :                    NameStr(state->ctx->slot->data.name),
     761          14 :                    NameStr(state->ctx->slot->data.plugin),
     762             :                    state->callback_name,
     763          14 :                    LSN_FORMAT_ARGS(state->report_location));
     764             :     else
     765           6 :         errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
     766           6 :                    NameStr(state->ctx->slot->data.name),
     767           6 :                    NameStr(state->ctx->slot->data.plugin),
     768             :                    state->callback_name);
     769          20 : }
     770             : 
     771             : static void
     772        2034 : startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
     773             : {
     774             :     LogicalErrorCallbackState state;
     775             :     ErrorContextCallback errcallback;
     776             : 
     777             :     Assert(!ctx->fast_forward);
     778             : 
     779             :     /* Push callback + info on the error context stack */
     780        2034 :     state.ctx = ctx;
     781        2034 :     state.callback_name = "startup";
     782        2034 :     state.report_location = InvalidXLogRecPtr;
     783        2034 :     errcallback.callback = output_plugin_error_callback;
     784        2034 :     errcallback.arg = &state;
     785        2034 :     errcallback.previous = error_context_stack;
     786        2034 :     error_context_stack = &errcallback;
     787             : 
     788             :     /* set output state */
     789        2034 :     ctx->accept_writes = false;
     790        2034 :     ctx->end_xact = false;
     791             : 
     792             :     /* do the actual work: call callback */
     793        2034 :     ctx->callbacks.startup_cb(ctx, opt, is_init);
     794             : 
     795             :     /* Pop the error context stack */
     796        2028 :     error_context_stack = errcallback.previous;
     797        2028 : }
     798             : 
     799             : static void
     800        1660 : shutdown_cb_wrapper(LogicalDecodingContext *ctx)
     801             : {
     802             :     LogicalErrorCallbackState state;
     803             :     ErrorContextCallback errcallback;
     804             : 
     805             :     Assert(!ctx->fast_forward);
     806             : 
     807             :     /* Push callback + info on the error context stack */
     808        1660 :     state.ctx = ctx;
     809        1660 :     state.callback_name = "shutdown";
     810        1660 :     state.report_location = InvalidXLogRecPtr;
     811        1660 :     errcallback.callback = output_plugin_error_callback;
     812        1660 :     errcallback.arg = &state;
     813        1660 :     errcallback.previous = error_context_stack;
     814        1660 :     error_context_stack = &errcallback;
     815             : 
     816             :     /* set output state */
     817        1660 :     ctx->accept_writes = false;
     818        1660 :     ctx->end_xact = false;
     819             : 
     820             :     /* do the actual work: call callback */
     821        1660 :     ctx->callbacks.shutdown_cb(ctx);
     822             : 
     823             :     /* Pop the error context stack */
     824        1660 :     error_context_stack = errcallback.previous;
     825        1660 : }
     826             : 
     827             : 
     828             : /*
     829             :  * Callbacks for ReorderBuffer which add in some more information and then call
     830             :  * output_plugin.h plugins.
     831             :  */
     832             : static void
     833        2378 : begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
     834             : {
     835        2378 :     LogicalDecodingContext *ctx = cache->private_data;
     836             :     LogicalErrorCallbackState state;
     837             :     ErrorContextCallback errcallback;
     838             : 
     839             :     Assert(!ctx->fast_forward);
     840             : 
     841             :     /* Push callback + info on the error context stack */
     842        2378 :     state.ctx = ctx;
     843        2378 :     state.callback_name = "begin";
     844        2378 :     state.report_location = txn->first_lsn;
     845        2378 :     errcallback.callback = output_plugin_error_callback;
     846        2378 :     errcallback.arg = &state;
     847        2378 :     errcallback.previous = error_context_stack;
     848        2378 :     error_context_stack = &errcallback;
     849             : 
     850             :     /* set output state */
     851        2378 :     ctx->accept_writes = true;
     852        2378 :     ctx->write_xid = txn->xid;
     853        2378 :     ctx->write_location = txn->first_lsn;
     854        2378 :     ctx->end_xact = false;
     855             : 
     856             :     /* do the actual work: call callback */
     857        2378 :     ctx->callbacks.begin_cb(ctx, txn);
     858             : 
     859             :     /* Pop the error context stack */
     860        2378 :     error_context_stack = errcallback.previous;
     861        2378 : }
     862             : 
     863             : static void
     864        2372 : commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     865             :                   XLogRecPtr commit_lsn)
     866             : {
     867        2372 :     LogicalDecodingContext *ctx = cache->private_data;
     868             :     LogicalErrorCallbackState state;
     869             :     ErrorContextCallback errcallback;
     870             : 
     871             :     Assert(!ctx->fast_forward);
     872             : 
     873             :     /* Push callback + info on the error context stack */
     874        2372 :     state.ctx = ctx;
     875        2372 :     state.callback_name = "commit";
     876        2372 :     state.report_location = txn->final_lsn; /* beginning of commit record */
     877        2372 :     errcallback.callback = output_plugin_error_callback;
     878        2372 :     errcallback.arg = &state;
     879        2372 :     errcallback.previous = error_context_stack;
     880        2372 :     error_context_stack = &errcallback;
     881             : 
     882             :     /* set output state */
     883        2372 :     ctx->accept_writes = true;
     884        2372 :     ctx->write_xid = txn->xid;
     885        2372 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
     886        2372 :     ctx->end_xact = true;
     887             : 
     888             :     /* do the actual work: call callback */
     889        2372 :     ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
     890             : 
     891             :     /* Pop the error context stack */
     892        2370 :     error_context_stack = errcallback.previous;
     893        2370 : }
     894             : 
     895             : /*
     896             :  * The functionality of begin_prepare is quite similar to begin, with the
     897             :  * exception that this will have gid (global transaction id) information which
     898             :  * can be used by the plugin. We considered extending the existing begin
     899             :  * callback, but that would break the replication protocol; additionally, a
     900             :  * separate callback looks cleaner.
     901             :  */
     902             : static void
     903          60 : begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
     904             : {
     905          60 :     LogicalDecodingContext *ctx = cache->private_data;
     906             :     LogicalErrorCallbackState state;
     907             :     ErrorContextCallback errcallback;
     908             : 
     909             :     Assert(!ctx->fast_forward);
     910             : 
     911             :     /* We're only supposed to call this when two-phase commits are supported */
     912             :     Assert(ctx->twophase);
     913             : 
     914             :     /* Push callback + info on the error context stack */
     915          60 :     state.ctx = ctx;
     916          60 :     state.callback_name = "begin_prepare";
     917          60 :     state.report_location = txn->first_lsn;
     918          60 :     errcallback.callback = output_plugin_error_callback;
     919          60 :     errcallback.arg = &state;
     920          60 :     errcallback.previous = error_context_stack;
     921          60 :     error_context_stack = &errcallback;
     922             : 
     923             :     /* set output state */
     924          60 :     ctx->accept_writes = true;
     925          60 :     ctx->write_xid = txn->xid;
     926          60 :     ctx->write_location = txn->first_lsn;
     927          60 :     ctx->end_xact = false;
     928             : 
     929             :     /*
     930             :      * If the plugin supports two-phase commits then begin prepare callback is
     931             :      * mandatory
     932             :      */
     933          60 :     if (ctx->callbacks.begin_prepare_cb == NULL)
     934           0 :         ereport(ERROR,
     935             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     936             :                  errmsg("logical replication at prepare time requires a %s callback",
     937             :                         "begin_prepare_cb")));
     938             : 
     939             :     /* do the actual work: call callback */
     940          60 :     ctx->callbacks.begin_prepare_cb(ctx, txn);
     941             : 
     942             :     /* Pop the error context stack */
     943          60 :     error_context_stack = errcallback.previous;
     944          60 : }
     945             : 
     946             : static void
     947          60 : prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     948             :                    XLogRecPtr prepare_lsn)
     949             : {
     950          60 :     LogicalDecodingContext *ctx = cache->private_data;
     951             :     LogicalErrorCallbackState state;
     952             :     ErrorContextCallback errcallback;
     953             : 
     954             :     Assert(!ctx->fast_forward);
     955             : 
     956             :     /* We're only supposed to call this when two-phase commits are supported */
     957             :     Assert(ctx->twophase);
     958             : 
     959             :     /* Push callback + info on the error context stack */
     960          60 :     state.ctx = ctx;
     961          60 :     state.callback_name = "prepare";
     962          60 :     state.report_location = txn->final_lsn; /* beginning of prepare record */
     963          60 :     errcallback.callback = output_plugin_error_callback;
     964          60 :     errcallback.arg = &state;
     965          60 :     errcallback.previous = error_context_stack;
     966          60 :     error_context_stack = &errcallback;
     967             : 
     968             :     /* set output state */
     969          60 :     ctx->accept_writes = true;
     970          60 :     ctx->write_xid = txn->xid;
     971          60 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
     972          60 :     ctx->end_xact = true;
     973             : 
     974             :     /*
     975             :      * If the plugin supports two-phase commits then prepare callback is
     976             :      * mandatory
     977             :      */
     978          60 :     if (ctx->callbacks.prepare_cb == NULL)
     979           0 :         ereport(ERROR,
     980             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     981             :                  errmsg("logical replication at prepare time requires a %s callback",
     982             :                         "prepare_cb")));
     983             : 
     984             :     /* do the actual work: call callback */
     985          60 :     ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
     986             : 
     987             :     /* Pop the error context stack */
     988          60 :     error_context_stack = errcallback.previous;
     989          60 : }
     990             : 
     991             : static void
     992          64 : commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     993             :                            XLogRecPtr commit_lsn)
     994             : {
     995          64 :     LogicalDecodingContext *ctx = cache->private_data;
     996             :     LogicalErrorCallbackState state;
     997             :     ErrorContextCallback errcallback;
     998             : 
     999             :     Assert(!ctx->fast_forward);
    1000             : 
    1001             :     /* We're only supposed to call this when two-phase commits are supported */
    1002             :     Assert(ctx->twophase);
    1003             : 
    1004             :     /* Push callback + info on the error context stack */
    1005          64 :     state.ctx = ctx;
    1006          64 :     state.callback_name = "commit_prepared";
    1007          64 :     state.report_location = txn->final_lsn; /* beginning of commit record */
    1008          64 :     errcallback.callback = output_plugin_error_callback;
    1009          64 :     errcallback.arg = &state;
    1010          64 :     errcallback.previous = error_context_stack;
    1011          64 :     error_context_stack = &errcallback;
    1012             : 
    1013             :     /* set output state */
    1014          64 :     ctx->accept_writes = true;
    1015          64 :     ctx->write_xid = txn->xid;
    1016          64 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
    1017          64 :     ctx->end_xact = true;
    1018             : 
    1019             :     /*
    1020             :      * If the plugin supports two-phase commits then commit prepared callback
    1021             :      * is mandatory
    1022             :      */
    1023          64 :     if (ctx->callbacks.commit_prepared_cb == NULL)
    1024           0 :         ereport(ERROR,
    1025             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1026             :                  errmsg("logical replication at prepare time requires a %s callback",
    1027             :                         "commit_prepared_cb")));
    1028             : 
    1029             :     /* do the actual work: call callback */
    1030          64 :     ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
    1031             : 
    1032             :     /* Pop the error context stack */
    1033          64 :     error_context_stack = errcallback.previous;
    1034          64 : }
    1035             : 
    1036             : static void
    1037          22 : rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1038             :                              XLogRecPtr prepare_end_lsn,
    1039             :                              TimestampTz prepare_time)
    1040             : {
    1041          22 :     LogicalDecodingContext *ctx = cache->private_data;
    1042             :     LogicalErrorCallbackState state;
    1043             :     ErrorContextCallback errcallback;
    1044             : 
    1045             :     Assert(!ctx->fast_forward);
    1046             : 
    1047             :     /* We're only supposed to call this when two-phase commits are supported */
    1048             :     Assert(ctx->twophase);
    1049             : 
    1050             :     /* Push callback + info on the error context stack */
    1051          22 :     state.ctx = ctx;
    1052          22 :     state.callback_name = "rollback_prepared";
    1053          22 :     state.report_location = txn->final_lsn; /* beginning of commit record */
    1054          22 :     errcallback.callback = output_plugin_error_callback;
    1055          22 :     errcallback.arg = &state;
    1056          22 :     errcallback.previous = error_context_stack;
    1057          22 :     error_context_stack = &errcallback;
    1058             : 
    1059             :     /* set output state */
    1060          22 :     ctx->accept_writes = true;
    1061          22 :     ctx->write_xid = txn->xid;
    1062          22 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
    1063          22 :     ctx->end_xact = true;
    1064             : 
    1065             :     /*
    1066             :      * If the plugin supports two-phase commits then the rollback prepared
    1067             :      * callback is mandatory
    1068             :      */
    1069          22 :     if (ctx->callbacks.rollback_prepared_cb == NULL)
    1070           0 :         ereport(ERROR,
    1071             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1072             :                  errmsg("logical replication at prepare time requires a %s callback",
    1073             :                         "rollback_prepared_cb")));
    1074             : 
    1075             :     /* do the actual work: call callback */
    1076          22 :     ctx->callbacks.rollback_prepared_cb(ctx, txn, prepare_end_lsn,
    1077             :                                         prepare_time);
    1078             : 
    1079             :     /* Pop the error context stack */
    1080          22 :     error_context_stack = errcallback.previous;
    1081          22 : }
    1082             : 
    1083             : static void
    1084      315994 : change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1085             :                   Relation relation, ReorderBufferChange *change)
    1086             : {
    1087      315994 :     LogicalDecodingContext *ctx = cache->private_data;
    1088             :     LogicalErrorCallbackState state;
    1089             :     ErrorContextCallback errcallback;
    1090             : 
    1091             :     Assert(!ctx->fast_forward);
    1092             : 
    1093             :     /* Push callback + info on the error context stack */
    1094      315994 :     state.ctx = ctx;
    1095      315994 :     state.callback_name = "change";
    1096      315994 :     state.report_location = change->lsn;
    1097      315994 :     errcallback.callback = output_plugin_error_callback;
    1098      315994 :     errcallback.arg = &state;
    1099      315994 :     errcallback.previous = error_context_stack;
    1100      315994 :     error_context_stack = &errcallback;
    1101             : 
    1102             :     /* set output state */
    1103      315994 :     ctx->accept_writes = true;
    1104      315994 :     ctx->write_xid = txn->xid;
    1105             : 
    1106             :     /*
    1107             :      * Report this change's lsn so replies from clients can give an up-to-date
    1108             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1109             :      * receipt of this transaction, but it might allow another transaction's
    1110             :      * commit to be confirmed with one message.
    1111             :      */
    1112      315994 :     ctx->write_location = change->lsn;
    1113             : 
    1114      315994 :     ctx->end_xact = false;
    1115             : 
    1116      315994 :     ctx->callbacks.change_cb(ctx, txn, relation, change);
    1117             : 
    1118             :     /* Pop the error context stack */
    1119      315988 :     error_context_stack = errcallback.previous;
    1120      315988 : }
    1121             : 
    1122             : static void
    1123          44 : truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1124             :                     int nrelations, Relation relations[], ReorderBufferChange *change)
    1125             : {
    1126          44 :     LogicalDecodingContext *ctx = cache->private_data;
    1127             :     LogicalErrorCallbackState state;
    1128             :     ErrorContextCallback errcallback;
    1129             : 
    1130             :     Assert(!ctx->fast_forward);
    1131             : 
    1132          44 :     if (!ctx->callbacks.truncate_cb)
    1133           0 :         return;
    1134             : 
    1135             :     /* Push callback + info on the error context stack */
    1136          44 :     state.ctx = ctx;
    1137          44 :     state.callback_name = "truncate";
    1138          44 :     state.report_location = change->lsn;
    1139          44 :     errcallback.callback = output_plugin_error_callback;
    1140          44 :     errcallback.arg = &state;
    1141          44 :     errcallback.previous = error_context_stack;
    1142          44 :     error_context_stack = &errcallback;
    1143             : 
    1144             :     /* set output state */
    1145          44 :     ctx->accept_writes = true;
    1146          44 :     ctx->write_xid = txn->xid;
    1147             : 
    1148             :     /*
    1149             :      * Report this change's lsn so replies from clients can give an up-to-date
    1150             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1151             :      * receipt of this transaction, but it might allow another transaction's
    1152             :      * commit to be confirmed with one message.
    1153             :      */
    1154          44 :     ctx->write_location = change->lsn;
    1155             : 
    1156          44 :     ctx->end_xact = false;
    1157             : 
    1158          44 :     ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
    1159             : 
    1160             :     /* Pop the error context stack */
    1161          44 :     error_context_stack = errcallback.previous;
    1162             : }
    1163             : 
    1164             : bool
    1165         296 : filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
    1166             :                           const char *gid)
    1167             : {
    1168             :     LogicalErrorCallbackState state;
    1169             :     ErrorContextCallback errcallback;
    1170             :     bool        ret;
    1171             : 
    1172             :     Assert(!ctx->fast_forward);
    1173             : 
    1174             :     /* Push callback + info on the error context stack */
    1175         296 :     state.ctx = ctx;
    1176         296 :     state.callback_name = "filter_prepare";
    1177         296 :     state.report_location = InvalidXLogRecPtr;
    1178         296 :     errcallback.callback = output_plugin_error_callback;
    1179         296 :     errcallback.arg = &state;
    1180         296 :     errcallback.previous = error_context_stack;
    1181         296 :     error_context_stack = &errcallback;
    1182             : 
    1183             :     /* set output state */
    1184         296 :     ctx->accept_writes = false;
    1185         296 :     ctx->end_xact = false;
    1186             : 
    1187             :     /* do the actual work: call callback */
    1188         296 :     ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
    1189             : 
    1190             :     /* Pop the error context stack */
    1191         296 :     error_context_stack = errcallback.previous;
    1192             : 
    1193         296 :     return ret;
    1194             : }
    1195             : 
    1196             : bool
    1197     3383146 : filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
    1198             : {
    1199             :     LogicalErrorCallbackState state;
    1200             :     ErrorContextCallback errcallback;
    1201             :     bool        ret;
    1202             : 
    1203             :     Assert(!ctx->fast_forward);
    1204             : 
    1205             :     /* Push callback + info on the error context stack */
    1206     3383146 :     state.ctx = ctx;
    1207     3383146 :     state.callback_name = "filter_by_origin";
    1208     3383146 :     state.report_location = InvalidXLogRecPtr;
    1209     3383146 :     errcallback.callback = output_plugin_error_callback;
    1210     3383146 :     errcallback.arg = &state;
    1211     3383146 :     errcallback.previous = error_context_stack;
    1212     3383146 :     error_context_stack = &errcallback;
    1213             : 
    1214             :     /* set output state */
    1215     3383146 :     ctx->accept_writes = false;
    1216     3383146 :     ctx->end_xact = false;
    1217             : 
    1218             :     /* do the actual work: call callback */
    1219     3383146 :     ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
    1220             : 
    1221             :     /* Pop the error context stack */
    1222     3383146 :     error_context_stack = errcallback.previous;
    1223             : 
    1224     3383146 :     return ret;
    1225             : }
    1226             : 
    1227             : static void
    1228          32 : message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1229             :                    XLogRecPtr message_lsn, bool transactional,
    1230             :                    const char *prefix, Size message_size, const char *message)
    1231             : {
    1232          32 :     LogicalDecodingContext *ctx = cache->private_data;
    1233             :     LogicalErrorCallbackState state;
    1234             :     ErrorContextCallback errcallback;
    1235             : 
    1236             :     Assert(!ctx->fast_forward);
    1237             : 
    1238          32 :     if (ctx->callbacks.message_cb == NULL)
    1239           0 :         return;
    1240             : 
    1241             :     /* Push callback + info on the error context stack */
    1242          32 :     state.ctx = ctx;
    1243          32 :     state.callback_name = "message";
    1244          32 :     state.report_location = message_lsn;
    1245          32 :     errcallback.callback = output_plugin_error_callback;
    1246          32 :     errcallback.arg = &state;
    1247          32 :     errcallback.previous = error_context_stack;
    1248          32 :     error_context_stack = &errcallback;
    1249             : 
    1250             :     /* set output state */
    1251          32 :     ctx->accept_writes = true;
    1252          32 :     ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
    1253          32 :     ctx->write_location = message_lsn;
    1254          32 :     ctx->end_xact = false;
    1255             : 
    1256             :     /* do the actual work: call callback */
    1257          32 :     ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
    1258             :                               message_size, message);
    1259             : 
    1260             :     /* Pop the error context stack */
    1261          32 :     error_context_stack = errcallback.previous;
    1262             : }
    1263             : 
    1264             : static void
    1265        1372 : stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1266             :                         XLogRecPtr first_lsn)
    1267             : {
    1268        1372 :     LogicalDecodingContext *ctx = cache->private_data;
    1269             :     LogicalErrorCallbackState state;
    1270             :     ErrorContextCallback errcallback;
    1271             : 
    1272             :     Assert(!ctx->fast_forward);
    1273             : 
    1274             :     /* We're only supposed to call this when streaming is supported. */
    1275             :     Assert(ctx->streaming);
    1276             : 
    1277             :     /* Push callback + info on the error context stack */
    1278        1372 :     state.ctx = ctx;
    1279        1372 :     state.callback_name = "stream_start";
    1280        1372 :     state.report_location = first_lsn;
    1281        1372 :     errcallback.callback = output_plugin_error_callback;
    1282        1372 :     errcallback.arg = &state;
    1283        1372 :     errcallback.previous = error_context_stack;
    1284        1372 :     error_context_stack = &errcallback;
    1285             : 
    1286             :     /* set output state */
    1287        1372 :     ctx->accept_writes = true;
    1288        1372 :     ctx->write_xid = txn->xid;
    1289             : 
    1290             :     /*
    1291             :      * Report this message's lsn so replies from clients can give an
    1292             :      * up-to-date answer. This won't ever be enough (and shouldn't be!) to
    1293             :      * confirm receipt of this transaction, but it might allow another
    1294             :      * transaction's commit to be confirmed with one message.
    1295             :      */
    1296        1372 :     ctx->write_location = first_lsn;
    1297             : 
    1298        1372 :     ctx->end_xact = false;
    1299             : 
    1300             :     /* in streaming mode, stream_start_cb is required */
    1301        1372 :     if (ctx->callbacks.stream_start_cb == NULL)
    1302           0 :         ereport(ERROR,
    1303             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1304             :                  errmsg("logical streaming requires a %s callback",
    1305             :                         "stream_start_cb")));
    1306             : 
    1307        1372 :     ctx->callbacks.stream_start_cb(ctx, txn);
    1308             : 
    1309             :     /* Pop the error context stack */
    1310        1372 :     error_context_stack = errcallback.previous;
    1311        1372 : }
    1312             : 
    1313             : static void
    1314        1372 : stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1315             :                        XLogRecPtr last_lsn)
    1316             : {
    1317        1372 :     LogicalDecodingContext *ctx = cache->private_data;
    1318             :     LogicalErrorCallbackState state;
    1319             :     ErrorContextCallback errcallback;
    1320             : 
    1321             :     Assert(!ctx->fast_forward);
    1322             : 
    1323             :     /* We're only supposed to call this when streaming is supported. */
    1324             :     Assert(ctx->streaming);
    1325             : 
    1326             :     /* Push callback + info on the error context stack */
    1327        1372 :     state.ctx = ctx;
    1328        1372 :     state.callback_name = "stream_stop";
    1329        1372 :     state.report_location = last_lsn;
    1330        1372 :     errcallback.callback = output_plugin_error_callback;
    1331        1372 :     errcallback.arg = &state;
    1332        1372 :     errcallback.previous = error_context_stack;
    1333        1372 :     error_context_stack = &errcallback;
    1334             : 
    1335             :     /* set output state */
    1336        1372 :     ctx->accept_writes = true;
    1337        1372 :     ctx->write_xid = txn->xid;
    1338             : 
    1339             :     /*
    1340             :      * Report this message's lsn so replies from clients can give an
    1341             :      * up-to-date answer. This won't ever be enough (and shouldn't be!) to
    1342             :      * confirm receipt of this transaction, but it might allow another
    1343             :      * transaction's commit to be confirmed with one message.
    1344             :      */
    1345        1372 :     ctx->write_location = last_lsn;
    1346             : 
    1347        1372 :     ctx->end_xact = false;
    1348             : 
    1349             :     /* in streaming mode, stream_stop_cb is required */
    1350        1372 :     if (ctx->callbacks.stream_stop_cb == NULL)
    1351           0 :         ereport(ERROR,
    1352             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1353             :                  errmsg("logical streaming requires a %s callback",
    1354             :                         "stream_stop_cb")));
    1355             : 
    1356        1372 :     ctx->callbacks.stream_stop_cb(ctx, txn);
    1357             : 
    1358             :     /* Pop the error context stack */
    1359        1372 :     error_context_stack = errcallback.previous;
    1360        1372 : }
    1361             : 
    1362             : static void
    1363          60 : stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1364             :                         XLogRecPtr abort_lsn)
    1365             : {
    1366          60 :     LogicalDecodingContext *ctx = cache->private_data;
    1367             :     LogicalErrorCallbackState state;
    1368             :     ErrorContextCallback errcallback;
    1369             : 
    1370             :     Assert(!ctx->fast_forward);
    1371             : 
    1372             :     /* We're only supposed to call this when streaming is supported. */
    1373             :     Assert(ctx->streaming);
    1374             : 
    1375             :     /* Push callback + info on the error context stack */
    1376          60 :     state.ctx = ctx;
    1377          60 :     state.callback_name = "stream_abort";
    1378          60 :     state.report_location = abort_lsn;
    1379          60 :     errcallback.callback = output_plugin_error_callback;
    1380          60 :     errcallback.arg = &state;
    1381          60 :     errcallback.previous = error_context_stack;
    1382          60 :     error_context_stack = &errcallback;
    1383             : 
    1384             :     /* set output state */
    1385          60 :     ctx->accept_writes = true;
    1386          60 :     ctx->write_xid = txn->xid;
    1387          60 :     ctx->write_location = abort_lsn;
    1388          60 :     ctx->end_xact = true;
    1389             : 
    1390             :     /* in streaming mode, stream_abort_cb is required */
    1391          60 :     if (ctx->callbacks.stream_abort_cb == NULL)
    1392           0 :         ereport(ERROR,
    1393             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1394             :                  errmsg("logical streaming requires a %s callback",
    1395             :                         "stream_abort_cb")));
    1396             : 
    1397          60 :     ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn);
    1398             : 
    1399             :     /* Pop the error context stack */
    1400          60 :     error_context_stack = errcallback.previous;
    1401          60 : }
    1402             : 
    1403             : static void
    1404          30 : stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1405             :                           XLogRecPtr prepare_lsn)
    1406             : {
    1407          30 :     LogicalDecodingContext *ctx = cache->private_data;
    1408             :     LogicalErrorCallbackState state;
    1409             :     ErrorContextCallback errcallback;
    1410             : 
    1411             :     Assert(!ctx->fast_forward);
    1412             : 
    1413             :     /*
    1414             :      * We're only supposed to call this when streaming and two-phase commits
    1415             :      * are supported.
    1416             :      */
    1417             :     Assert(ctx->streaming);
    1418             :     Assert(ctx->twophase);
    1419             : 
    1420             :     /* Push callback + info on the error context stack */
    1421          30 :     state.ctx = ctx;
    1422          30 :     state.callback_name = "stream_prepare";
    1423          30 :     state.report_location = txn->final_lsn;
    1424          30 :     errcallback.callback = output_plugin_error_callback;
    1425          30 :     errcallback.arg = &state;
    1426          30 :     errcallback.previous = error_context_stack;
    1427          30 :     error_context_stack = &errcallback;
    1428             : 
    1429             :     /* set output state */
    1430          30 :     ctx->accept_writes = true;
    1431          30 :     ctx->write_xid = txn->xid;
    1432          30 :     ctx->write_location = txn->end_lsn;
    1433          30 :     ctx->end_xact = true;
    1434             : 
    1435             :     /* in streaming mode with two-phase commits, stream_prepare_cb is required */
    1436          30 :     if (ctx->callbacks.stream_prepare_cb == NULL)
    1437           0 :         ereport(ERROR,
    1438             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1439             :                  errmsg("logical streaming at prepare time requires a %s callback",
    1440             :                         "stream_prepare_cb")));
    1441             : 
    1442          30 :     ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
    1443             : 
    1444             :     /* Pop the error context stack */
    1445          30 :     error_context_stack = errcallback.previous;
    1446          30 : }
    1447             : 
    1448             : static void
    1449         102 : stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1450             :                          XLogRecPtr commit_lsn)
    1451             : {
    1452         102 :     LogicalDecodingContext *ctx = cache->private_data;
    1453             :     LogicalErrorCallbackState state;
    1454             :     ErrorContextCallback errcallback;
    1455             : 
    1456             :     Assert(!ctx->fast_forward);
    1457             : 
    1458             :     /* We're only supposed to call this when streaming is supported. */
    1459             :     Assert(ctx->streaming);
    1460             : 
    1461             :     /* Push callback + info on the error context stack */
    1462         102 :     state.ctx = ctx;
    1463         102 :     state.callback_name = "stream_commit";
    1464         102 :     state.report_location = txn->final_lsn;
    1465         102 :     errcallback.callback = output_plugin_error_callback;
    1466         102 :     errcallback.arg = &state;
    1467         102 :     errcallback.previous = error_context_stack;
    1468         102 :     error_context_stack = &errcallback;
    1469             : 
    1470             :     /* set output state */
    1471         102 :     ctx->accept_writes = true;
    1472         102 :     ctx->write_xid = txn->xid;
    1473         102 :     ctx->write_location = txn->end_lsn;
    1474         102 :     ctx->end_xact = true;
    1475             : 
    1476             :     /* in streaming mode, stream_commit_cb is required */
    1477         102 :     if (ctx->callbacks.stream_commit_cb == NULL)
    1478           0 :         ereport(ERROR,
    1479             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1480             :                  errmsg("logical streaming requires a %s callback",
    1481             :                         "stream_commit_cb")));
    1482             : 
    1483         102 :     ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn);
    1484             : 
    1485             :     /* Pop the error context stack */
    1486         102 :     error_context_stack = errcallback.previous;
    1487         102 : }
    1488             : 
    1489             : static void
    1490      352012 : stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1491             :                          Relation relation, ReorderBufferChange *change)
    1492             : {
    1493      352012 :     LogicalDecodingContext *ctx = cache->private_data;
    1494             :     LogicalErrorCallbackState state;
    1495             :     ErrorContextCallback errcallback;
    1496             : 
    1497             :     Assert(!ctx->fast_forward);
    1498             : 
    1499             :     /* We're only supposed to call this when streaming is supported. */
    1500             :     Assert(ctx->streaming);
    1501             : 
    1502             :     /* Push callback + info on the error context stack */
    1503      352012 :     state.ctx = ctx;
    1504      352012 :     state.callback_name = "stream_change";
    1505      352012 :     state.report_location = change->lsn;
    1506      352012 :     errcallback.callback = output_plugin_error_callback;
    1507      352012 :     errcallback.arg = &state;
    1508      352012 :     errcallback.previous = error_context_stack;
    1509      352012 :     error_context_stack = &errcallback;
    1510             : 
    1511             :     /* set output state */
    1512      352012 :     ctx->accept_writes = true;
    1513      352012 :     ctx->write_xid = txn->xid;
    1514             : 
    1515             :     /*
    1516             :      * Report this change's lsn so replies from clients can give an up-to-date
    1517             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1518             :      * receipt of this transaction, but it might allow another transaction's
    1519             :      * commit to be confirmed with one message.
    1520             :      */
    1521      352012 :     ctx->write_location = change->lsn;
    1522             : 
    1523      352012 :     ctx->end_xact = false;
    1524             : 
    1525             :     /* in streaming mode, stream_change_cb is required */
    1526      352012 :     if (ctx->callbacks.stream_change_cb == NULL)
    1527           0 :         ereport(ERROR,
    1528             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1529             :                  errmsg("logical streaming requires a %s callback",
    1530             :                         "stream_change_cb")));
    1531             : 
    1532      352012 :     ctx->callbacks.stream_change_cb(ctx, txn, relation, change);
    1533             : 
    1534             :     /* Pop the error context stack */
    1535      352012 :     error_context_stack = errcallback.previous;
    1536      352012 : }
    1537             : 
    1538             : static void
    1539           6 : stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1540             :                           XLogRecPtr message_lsn, bool transactional,
    1541             :                           const char *prefix, Size message_size, const char *message)
    1542             : {
    1543           6 :     LogicalDecodingContext *ctx = cache->private_data;
    1544             :     LogicalErrorCallbackState state;
    1545             :     ErrorContextCallback errcallback;
    1546             : 
    1547             :     Assert(!ctx->fast_forward);
    1548             : 
    1549             :     /* We're only supposed to call this when streaming is supported. */
    1550             :     Assert(ctx->streaming);
    1551             : 
    1552             :     /* this callback is optional */
    1553           6 :     if (ctx->callbacks.stream_message_cb == NULL)
    1554           0 :         return;
    1555             : 
    1556             :     /* Push callback + info on the error context stack */
    1557           6 :     state.ctx = ctx;
    1558           6 :     state.callback_name = "stream_message";
    1559           6 :     state.report_location = message_lsn;
    1560           6 :     errcallback.callback = output_plugin_error_callback;
    1561           6 :     errcallback.arg = &state;
    1562           6 :     errcallback.previous = error_context_stack;
    1563           6 :     error_context_stack = &errcallback;
    1564             : 
    1565             :     /* set output state */
    1566           6 :     ctx->accept_writes = true;
    1567           6 :     ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
    1568           6 :     ctx->write_location = message_lsn;
    1569           6 :     ctx->end_xact = false;
    1570             : 
    1571             :     /* do the actual work: call callback */
    1572           6 :     ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
    1573             :                                      message_size, message);
    1574             : 
    1575             :     /* Pop the error context stack */
    1576           6 :     error_context_stack = errcallback.previous;
    1577             : }
    1578             : 
    1579             : static void
    1580           0 : stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1581             :                            int nrelations, Relation relations[],
    1582             :                            ReorderBufferChange *change)
    1583             : {
    1584           0 :     LogicalDecodingContext *ctx = cache->private_data;
    1585             :     LogicalErrorCallbackState state;
    1586             :     ErrorContextCallback errcallback;
    1587             : 
    1588             :     Assert(!ctx->fast_forward);
    1589             : 
    1590             :     /* We're only supposed to call this when streaming is supported. */
    1591             :     Assert(ctx->streaming);
    1592             : 
    1593             :     /* this callback is optional */
    1594           0 :     if (!ctx->callbacks.stream_truncate_cb)
    1595           0 :         return;
    1596             : 
    1597             :     /* Push callback + info on the error context stack */
    1598           0 :     state.ctx = ctx;
    1599           0 :     state.callback_name = "stream_truncate";
    1600           0 :     state.report_location = change->lsn;
    1601           0 :     errcallback.callback = output_plugin_error_callback;
    1602           0 :     errcallback.arg = &state;
    1603           0 :     errcallback.previous = error_context_stack;
    1604           0 :     error_context_stack = &errcallback;
    1605             : 
    1606             :     /* set output state */
    1607           0 :     ctx->accept_writes = true;
    1608           0 :     ctx->write_xid = txn->xid;
    1609             : 
    1610             :     /*
    1611             :      * Report this change's lsn so replies from clients can give an up-to-date
    1612             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1613             :      * receipt of this transaction, but it might allow another transaction's
    1614             :      * commit to be confirmed with one message.
    1615             :      */
    1616           0 :     ctx->write_location = change->lsn;
    1617             : 
    1618           0 :     ctx->end_xact = false;
    1619             : 
    1620           0 :     ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
    1621             : 
    1622             :     /* Pop the error context stack */
    1623           0 :     error_context_stack = errcallback.previous;
    1624             : }
    1625             : /*
                     :  * Wrapper used to report decoding progress for a transaction without
                     :  * emitting any data.
                     :  *
                     :  * Fetches the LogicalDecodingContext from cache->private_data, installs an
                     :  * error context callback labelled "update_progress_txn" with lsn as the
                     :  * report location, sets up the output state with writes disallowed, and
                     :  * then calls OutputPluginUpdateProgress().  Must not be reached in
                     :  * fast_forward mode (asserted).
                     :  */
    1626             : static void
    1627        6198 : update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1628             :                                XLogRecPtr lsn)
    1629             : {
    1630        6198 :     LogicalDecodingContext *ctx = cache->private_data;
    1631             :     LogicalErrorCallbackState state;
    1632             :     ErrorContextCallback errcallback;
    1633             : 
    1634             :     Assert(!ctx->fast_forward);
    1635             : 
    1636             :     /* Push callback + info on the error context stack */
    1637        6198 :     state.ctx = ctx;
    1638        6198 :     state.callback_name = "update_progress_txn";
    1639        6198 :     state.report_location = lsn;
    1640        6198 :     errcallback.callback = output_plugin_error_callback;
    1641        6198 :     errcallback.arg = &state;
    1642        6198 :     errcallback.previous = error_context_stack;
    1643        6198 :     error_context_stack = &errcallback;
    1644             : 
    1645             :     /* set output state; writes are not accepted, only progress is reported */
    1646        6198 :     ctx->accept_writes = false;
    1647        6198 :     ctx->write_xid = txn->xid;
    1648             : 
    1649             :     /*
    1650             :      * Report this change's lsn so replies from clients can give an up-to-date
    1651             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1652             :      * receipt of this transaction, but it might allow another transaction's
    1653             :      * commit to be confirmed with one message.
    1654             :      */
    1655        6198 :     ctx->write_location = lsn;
    1656             : 
    1657        6198 :     ctx->end_xact = false;
    1658             : 
    1659        6198 :     OutputPluginUpdateProgress(ctx, false);
    1660             : 
    1661             :     /* Pop the error context stack */
    1662        6198 :     error_context_stack = errcallback.previous;
    1663        6198 : }
    1664             : 
    1665             : /*
    1666             :  * Set the required catalog xmin horizon for historic snapshots in the current
    1667             :  * replication slot.
    1668             :  *
    1669             :  * Note that in most cases, we won't be able to immediately use the xmin
    1670             :  * to increase the xmin horizon: we need to wait till the client has confirmed
    1671             :  * receiving current_lsn with LogicalConfirmReceivedLocation().
                     :  *
                     :  * current_lsn is recorded as the slot's candidate_xmin_lsn and xmin as its
                     :  * candidate_catalog_xmin.  The slot operated on is MyReplicationSlot, which
                     :  * must be set (asserted).  All candidate fields are examined and modified
                     :  * while holding the slot's spinlock; logging and the call to
                     :  * LogicalConfirmReceivedLocation() happen only after it has been released.
    1672             :  */
    1673             : void
    1674         722 : LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
    1675             : {
    1676         722 :     bool        updated_xmin = false;
    1677             :     ReplicationSlot *slot;
    1678         722 :     bool        got_new_xmin = false;
    1679             : 
    1680         722 :     slot = MyReplicationSlot;
    1681             : 
    1682             :     Assert(slot != NULL);
    1683             : 
    1684         722 :     SpinLockAcquire(&slot->mutex);
    1685             : 
    1686             :     /*
    1687             :      * don't overwrite if we already have a newer xmin. This can happen if we
    1688             :      * restart decoding in a slot.
                     :      *
                     :      * The branch body is intentionally empty: nothing is changed in that
                     :      * case.
    1689             :      */
    1690         722 :     if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
    1691             :     {
    1692             :     }
    1693             : 
    1694             :     /*
    1695             :      * If the client has already confirmed up to this lsn, we directly can
    1696             :      * mark this as accepted. This can happen if we restart decoding in a
    1697             :      * slot.
    1698             :      */
    1699         180 :     else if (current_lsn <= slot->data.confirmed_flush)
    1700             :     {
    1701          88 :         slot->candidate_catalog_xmin = xmin;
    1702          88 :         slot->candidate_xmin_lsn = current_lsn;
    1703             : 
    1704             :         /* our candidate can directly be used */
    1705          88 :         updated_xmin = true;
    1706             :     }
    1707             : 
    1708             :     /*
    1709             :      * Only increase if the previous values have been applied, otherwise we
    1710             :      * might never end up updating if the receiver acks too slowly.
    1711             :      */
    1712          92 :     else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
    1713             :     {
    1714          44 :         slot->candidate_catalog_xmin = xmin;
    1715          44 :         slot->candidate_xmin_lsn = current_lsn;
    1716             : 
    1717             :         /*
    1718             :          * Log new xmin at an appropriate log level after releasing the
    1719             :          * spinlock.
    1720             :          */
    1721          44 :         got_new_xmin = true;
    1722             :     }
    1723         722 :     SpinLockRelease(&slot->mutex);
    1724             : 
    1725         722 :     if (got_new_xmin)
    1726          44 :         elog(DEBUG1, "got new catalog xmin %u at %X/%X", xmin,
    1727             :              LSN_FORMAT_ARGS(current_lsn));
    1728             : 
    1729             :     /* candidate already valid with the current flush position, apply */
    1730         722 :     if (updated_xmin)
    1731          88 :         LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
    1732         722 : }
    1733             : 
    1734             : /*
    1735             :  * Mark the minimal LSN (restart_lsn) we need to read to replay all
    1736             :  * transactions that have not yet committed at current_lsn.
    1737             :  *
    1738             :  * Just like LogicalIncreaseXminForSlot this only takes effect when the
    1739             :  * client has confirmed to have received current_lsn.
                     :  *
                     :  * restart_lsn is recorded as the slot's candidate_restart_lsn and
                     :  * current_lsn as its candidate_restart_valid; both must be valid LSNs
                     :  * (asserted).  The slot operated on is MyReplicationSlot.  The slot's
                     :  * spinlock is taken once up front and released separately inside each
                     :  * branch below, always before any elog() call.
    1740             :  */
    1741             : void
    1742         628 : LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
    1743             : {
    1744         628 :     bool        updated_lsn = false;
    1745             :     ReplicationSlot *slot;
    1746             : 
    1747         628 :     slot = MyReplicationSlot;
    1748             : 
    1749             :     Assert(slot != NULL);
    1750             :     Assert(restart_lsn != InvalidXLogRecPtr);
    1751             :     Assert(current_lsn != InvalidXLogRecPtr);
    1752             : 
    1753         628 :     SpinLockAcquire(&slot->mutex);
    1754             : 
    1755             :     /* don't overwrite if we already have a newer restart lsn */
    1756         628 :     if (restart_lsn <= slot->data.restart_lsn)
    1757             :     {
    1758          18 :         SpinLockRelease(&slot->mutex);
    1759             :     }
    1760             : 
    1761             :     /*
    1762             :      * We might have already flushed far enough to directly accept this lsn,
    1763             :      * in this case there is no need to check for existing candidate LSNs
    1764             :      */
    1765         610 :     else if (current_lsn <= slot->data.confirmed_flush)
    1766             :     {
    1767         476 :         slot->candidate_restart_valid = current_lsn;
    1768         476 :         slot->candidate_restart_lsn = restart_lsn;
    1769         476 :         SpinLockRelease(&slot->mutex);
    1770             : 
    1771             :         /* our candidate can directly be used */
    1772         476 :         updated_lsn = true;
    1773             :     }
    1774             : 
    1775             :     /*
    1776             :      * Only increase if the previous values have been applied, otherwise we
    1777             :      * might never end up updating if the receiver acks too slowly. A missed
    1778             :      * value here will just cause some extra effort after reconnecting.
                     :      *
                     :      * If a candidate is already pending (the final else branch), nothing is
                     :      * changed; the current values are copied while the spinlock is held so
                     :      * they can be logged after it has been released.
    1779             :      */
    1780         134 :     else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
    1781             :     {
    1782          70 :         slot->candidate_restart_valid = current_lsn;
    1783          70 :         slot->candidate_restart_lsn = restart_lsn;
    1784          70 :         SpinLockRelease(&slot->mutex);
    1785             : 
    1786          70 :         elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
    1787             :              LSN_FORMAT_ARGS(restart_lsn),
    1788             :              LSN_FORMAT_ARGS(current_lsn));
    1789             :     }
    1790             :     else
    1791             :     {
    1792             :         XLogRecPtr  candidate_restart_lsn;
    1793             :         XLogRecPtr  candidate_restart_valid;
    1794             :         XLogRecPtr  confirmed_flush;
    1795             : 
    1796          64 :         candidate_restart_lsn = slot->candidate_restart_lsn;
    1797          64 :         candidate_restart_valid = slot->candidate_restart_valid;
    1798          64 :         confirmed_flush = slot->data.confirmed_flush;
    1799          64 :         SpinLockRelease(&slot->mutex);
    1800             : 
    1801          64 :         elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
    1802             :              LSN_FORMAT_ARGS(restart_lsn),
    1803             :              LSN_FORMAT_ARGS(current_lsn),
    1804             :              LSN_FORMAT_ARGS(candidate_restart_lsn),
    1805             :              LSN_FORMAT_ARGS(candidate_restart_valid),
    1806             :              LSN_FORMAT_ARGS(confirmed_flush));
    1807             :     }
    1808             : 
    1809             :     /* candidates are already valid with the current flush position, apply */
    1810         628 :     if (updated_lsn)
    1811         476 :         LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
    1812         628 : }
    1813             : 
    1814             : /*
    1815             :  * Handle a consumer's confirmation having received all changes up to lsn.
                     :  *
                     :  * Always stores lsn as MyReplicationSlot's confirmed_flush.  In addition,
                     :  * any pending candidate catalog xmin and/or candidate restart_lsn whose
                     :  * validity position is at or before lsn is applied to the slot's data; if
                     :  * either was applied the slot is marked dirty and saved.  lsn must be a
                     :  * valid LSN (asserted).
    1816             :  */
    1817             : void
    1818       48298 : LogicalConfirmReceivedLocation(XLogRecPtr lsn)
    1819             : {
    1820             :     Assert(lsn != InvalidXLogRecPtr);
    1821             : 
    1822             :     /*
                     :      * Do an unlocked check for pending candidate values first.  If neither
                     :      * candidate is set, only confirmed_flush needs updating, which the else
                     :      * branch at the bottom does.
                     :      */
    1823       48298 :     if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
    1824       48172 :         MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
    1825         632 :     {
    1826         632 :         bool        updated_xmin = false;
    1827         632 :         bool        updated_restart = false;
    1828             : 
    1829         632 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1830             : 
    1831         632 :         MyReplicationSlot->data.confirmed_flush = lsn;
    1832             : 
    1833             :         /* if we're past the location required for bumping xmin, do so */
    1834         632 :         if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
    1835         126 :             MyReplicationSlot->candidate_xmin_lsn <= lsn)
    1836             :         {
    1837             :             /*
    1838             :              * We have to write the changed xmin to disk *before* we change
    1839             :              * the in-memory value, otherwise after a crash we wouldn't know
    1840             :              * that some catalog tuples might have been removed already.
    1841             :              *
    1842             :              * Ensure that by first writing to ->xmin and only update
    1843             :              * ->effective_xmin once the new state is synced to disk. After a
    1844             :              * crash ->effective_xmin is set to ->xmin.
    1845             :              */
    1846         122 :             if (TransactionIdIsValid(MyReplicationSlot->candidate_catalog_xmin) &&
    1847         122 :                 MyReplicationSlot->data.catalog_xmin != MyReplicationSlot->candidate_catalog_xmin)
    1848             :             {
    1849         122 :                 MyReplicationSlot->data.catalog_xmin = MyReplicationSlot->candidate_catalog_xmin;
    1850         122 :                 MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
    1851         122 :                 MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
    1852         122 :                 updated_xmin = true;
    1853             :             }
    1854             :         }
    1855             : 
                     :         /* likewise, apply a pending restart_lsn candidate once lsn covers it */
    1856         632 :         if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr &&
    1857         544 :             MyReplicationSlot->candidate_restart_valid <= lsn)
    1858             :         {
    1859             :             Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
    1860             : 
    1861         538 :             MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
    1862         538 :             MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
    1863         538 :             MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
    1864         538 :             updated_restart = true;
    1865             :         }
    1866             : 
    1867         632 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1868             : 
    1869             :         /*
                     :          * first write the new xmin and/or restart_lsn to disk, so we know
                     :          * what's up after a crash
                     :          */
    1870         632 :         if (updated_xmin || updated_restart)
    1871             :         {
    1872         626 :             ReplicationSlotMarkDirty();
    1873         626 :             ReplicationSlotSave();
    1874         626 :             elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
    1875             :         }
    1876             : 
    1877             :         /*
    1878             :          * Now the new xmin is safely on disk, we can let the global value
    1879             :          * advance. We do not take ProcArrayLock or similar since we only
    1880             :          * advance xmin here and there's not much harm done by a concurrent
    1881             :          * computation missing that.
    1882             :          */
    1883         632 :         if (updated_xmin)
    1884             :         {
    1885         122 :             SpinLockAcquire(&MyReplicationSlot->mutex);
    1886         122 :             MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
    1887         122 :             SpinLockRelease(&MyReplicationSlot->mutex);
    1888             : 
    1889         122 :             ReplicationSlotsComputeRequiredXmin(false);
    1890         122 :             ReplicationSlotsComputeRequiredLSN();
    1891             :         }
    1892             :     }
    1893             :     else
    1894             :     {
    1895       47666 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1896       47666 :         MyReplicationSlot->data.confirmed_flush = lsn;
    1897       47666 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1898             :     }
    1899       48298 : }
    1900             : 
    1901             : /*
    1902             :  * Clear logical streaming state during (sub)transaction abort.
                     :  *
                     :  * Resets CheckXidAlive to InvalidTransactionId and clears the bsysscan
                     :  * flag.
    1903             :  */
    1904             : void
    1905       58198 : ResetLogicalStreamingState(void)
    1906             : {
    1907       58198 :     CheckXidAlive = InvalidTransactionId;
    1908       58198 :     bsysscan = false;
    1909       58198 : }
    1910             : 
    1911             : /*
    1912             :  * Report stats for a slot.
                     :  *
                     :  * Copies the spill, stream and total counters accumulated in ctx->reorder
                     :  * into a PgStat_StatReplSlotEntry, hands them to pgstat_report_replslot()
                     :  * for ctx->slot, and then resets the counters in the reorder buffer to zero
                     :  * so that they are not reported again on the next call.
    1913             :  */
    1914             : void
    1915       12760 : UpdateDecodingStats(LogicalDecodingContext *ctx)
    1916             : {
    1917       12760 :     ReorderBuffer *rb = ctx->reorder;
    1918             :     PgStat_StatReplSlotEntry repSlotStat;
    1919             : 
    1920             :     /*
                     :      * Nothing to do if we don't have any replication stats to be sent.  Only
                     :      * the three byte counters are consulted for this decision.
                     :      */
    1921       12760 :     if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
    1922         508 :         return;
    1923             : 
    1924       12252 :     elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
    1925             :          rb,
    1926             :          rb->spillTxns,
    1927             :          rb->spillCount,
    1928             :          rb->spillBytes,
    1929             :          rb->streamTxns,
    1930             :          rb->streamCount,
    1931             :          rb->streamBytes,
    1932             :          rb->totalTxns,
    1933             :          rb->totalBytes);
    1934             : 
    1935       12252 :     repSlotStat.spill_txns = rb->spillTxns;
    1936       12252 :     repSlotStat.spill_count = rb->spillCount;
    1937       12252 :     repSlotStat.spill_bytes = rb->spillBytes;
    1938       12252 :     repSlotStat.stream_txns = rb->streamTxns;
    1939       12252 :     repSlotStat.stream_count = rb->streamCount;
    1940       12252 :     repSlotStat.stream_bytes = rb->streamBytes;
    1941       12252 :     repSlotStat.total_txns = rb->totalTxns;
    1942       12252 :     repSlotStat.total_bytes = rb->totalBytes;
    1943             : 
    1944       12252 :     pgstat_report_replslot(ctx->slot, &repSlotStat);
    1945             : 
    1946       12252 :     rb->spillTxns = 0;
    1947       12252 :     rb->spillCount = 0;
    1948       12252 :     rb->spillBytes = 0;
    1949       12252 :     rb->streamTxns = 0;
    1950       12252 :     rb->streamCount = 0;
    1951       12252 :     rb->streamBytes = 0;
    1952       12252 :     rb->totalTxns = 0;
    1953       12252 :     rb->totalBytes = 0;
    1954             : }
    1955             : 
    1956             : /*
    1957             :  * Read up to the end of WAL starting from the decoding slot's restart_lsn.
    1958             :  * Return true if any meaningful/decodable WAL records are encountered,
    1959             :  * otherwise false.
                     :  *
                     :  * end_of_wal is the position at which reading stops.  The slot used is
                     :  * MyReplicationSlot, which must be set (asserted).  Decoding is done with a
                     :  * fast_forward context; the result is taken from the context's
                     :  * processing_required flag after each record.  On error the system caches
                     :  * are invalidated and the error is re-thrown.
    1960             :  */
    1961             : bool
    1962          10 : LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
    1963             : {
    1964          10 :     bool        has_pending_wal = false;
    1965             : 
    1966             :     Assert(MyReplicationSlot);
    1967             : 
    1968          10 :     PG_TRY();
    1969             :     {
    1970             :         LogicalDecodingContext *ctx;
    1971             : 
    1972             :         /*
    1973             :          * Create our decoding context in fast_forward mode, passing start_lsn
    1974             :          * as InvalidXLogRecPtr, so that we start processing from the slot's
    1975             :          * confirmed_flush.
    1976             :          */
    1977          20 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
    1978             :                                     NIL,
    1979             :                                     true,   /* fast_forward */
    1980          10 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
    1981             :                                                .segment_open = wal_segment_open,
    1982             :                                                .segment_close = wal_segment_close),
    1983             :                                     NULL, NULL, NULL);
    1984             : 
    1985             :         /*
    1986             :          * Start reading at the slot's restart_lsn, which we know points to a
    1987             :          * valid record.
    1988             :          */
    1989          10 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
    1990             : 
    1991             :         /* Invalidate non-timetravel entries */
    1992          10 :         InvalidateSystemCaches();
    1993             : 
    1994             :         /* Loop until the end of WAL or some changes are processed */
    1995         300 :         while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal)
    1996             :         {
    1997             :             XLogRecord *record;
    1998         290 :             char       *errm = NULL;
    1999             : 
    2000         290 :             record = XLogReadRecord(ctx->reader, &errm);
    2001             : 
    2002         290 :             if (errm)
    2003           0 :                 elog(ERROR, "could not find record for logical decoding: %s", errm);
    2004             : 
    2005         290 :             if (record != NULL)
    2006         290 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
    2007             : 
    2008         290 :             has_pending_wal = ctx->processing_required;
    2009             : 
    2010         290 :             CHECK_FOR_INTERRUPTS();
    2011             :         }
    2012             : 
    2013             :         /* Clean up */
    2014          10 :         FreeDecodingContext(ctx);
    2015          10 :         InvalidateSystemCaches();
    2016             :     }
    2017           0 :     PG_CATCH();
    2018             :     {
    2019             :         /* clear all timetravel entries */
    2020           0 :         InvalidateSystemCaches();
    2021             : 
    2022           0 :         PG_RE_THROW();
    2023             :     }
    2024          10 :     PG_END_TRY();
    2025             : 
    2026          10 :     return has_pending_wal;
    2027             : }
    2028             : 
    2029             : /*
    2030             :  * Helper function for advancing our logical replication slot forward.
    2031             :  *
    2032             :  * The slot's restart_lsn is used as start point for reading records, while
    2033             :  * confirmed_flush is used as base point for the decoding context.
    2034             :  *
    2035             :  * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
    2036             :  * because we need to digest WAL to advance restart_lsn allowing to recycle
    2037             :  * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
    2038             :  * mode, no changes are generated anyway.
    2039             :  *
    2040             :  * *found_consistent_snapshot will be true if the initial decoding snapshot has
    2041             :  * been built; otherwise, it will be false.
                     :  * found_consistent_snapshot may be NULL, in which case that information is
                     :  * not reported.
                     :  *
                     :  * moveto is the target position and must be a valid LSN (asserted).  The
                     :  * return value is the slot's confirmed_flush as read after the advance.  On
                     :  * error the system caches are invalidated and the error is re-thrown.
    2042             :  */
    2043             : XLogRecPtr
    2044          28 : LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
    2045             :                                     bool *found_consistent_snapshot)
    2046             : {
    2047             :     LogicalDecodingContext *ctx;
    2048          28 :     ResourceOwner old_resowner = CurrentResourceOwner;
    2049             :     XLogRecPtr  retlsn;
    2050             : 
    2051             :     Assert(moveto != InvalidXLogRecPtr);
    2052             : 
    2053          28 :     if (found_consistent_snapshot)
    2054          10 :         *found_consistent_snapshot = false;
    2055             : 
    2056          28 :     PG_TRY();
    2057             :     {
    2058             :         /*
    2059             :          * Create our decoding context in fast_forward mode, passing start_lsn
    2060             :          * as InvalidXLogRecPtr, so that we start processing from my slot's
    2061             :          * confirmed_flush.
    2062             :          */
    2063          56 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
    2064             :                                     NIL,
    2065             :                                     true,   /* fast_forward */
    2066          28 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
    2067             :                                                .segment_open = wal_segment_open,
    2068             :                                                .segment_close = wal_segment_close),
    2069             :                                     NULL, NULL, NULL);
    2070             : 
    2071             :         /*
    2072             :          * Wait for specified streaming replication standby servers (if any)
    2073             :          * to confirm receipt of WAL up to moveto lsn.
    2074             :          */
    2075          28 :         WaitForStandbyConfirmation(moveto);
    2076             : 
    2077             :         /*
    2078             :          * Start reading at the slot's restart_lsn, which we know to point to
    2079             :          * a valid record.
    2080             :          */
    2081          28 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
    2082             : 
    2083             :         /* invalidate non-timetravel entries */
    2084          28 :         InvalidateSystemCaches();
    2085             : 
    2086             :         /* Decode records until we reach the requested target */
    2087        3424 :         while (ctx->reader->EndRecPtr < moveto)
    2088             :         {
    2089        3396 :             char       *errm = NULL;
    2090             :             XLogRecord *record;
    2091             : 
    2092             :             /*
    2093             :              * Read records.  No changes are generated in fast_forward mode,
    2094             :              * but snapbuilder/slot statuses are updated properly.
    2095             :              */
    2096        3396 :             record = XLogReadRecord(ctx->reader, &errm);
    2097        3396 :             if (errm)
    2098           0 :                 elog(ERROR, "could not find record while advancing replication slot: %s",
    2099             :                      errm);
    2100             : 
    2101             :             /*
    2102             :              * Process the record.  Storage-level changes are ignored in
    2103             :              * fast_forward mode, but other modules (such as snapbuilder)
    2104             :              * might still have critical updates to do.
    2105             :              */
    2106        3396 :             if (record)
    2107        3396 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
    2108             : 
    2109        3396 :             CHECK_FOR_INTERRUPTS();
    2110             :         }
    2111             : 
    2112          28 :         if (found_consistent_snapshot && DecodingContextReady(ctx))
    2113          10 :             *found_consistent_snapshot = true;
    2114             : 
    2115             :         /*
    2116             :          * Logical decoding could have clobbered CurrentResourceOwner during
    2117             :          * transaction management, so restore the executor's value.  (This is
    2118             :          * a kluge, but it's not worth cleaning up right now.)
    2119             :          */
    2120          28 :         CurrentResourceOwner = old_resowner;
    2121             : 
    2122          28 :         if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
    2123             :         {
    2124          28 :             LogicalConfirmReceivedLocation(moveto);
    2125             : 
    2126             :             /*
    2127             :              * If only the confirmed_flush LSN has changed the slot won't get
    2128             :              * marked as dirty by the above. Callers on the walsender
    2129             :              * interface are expected to keep track of their own progress and
    2130             :              * don't need it written out. But SQL-interface users cannot
    2131             :              * specify their own start positions and it's harder for them to
    2132             :              * keep track of their progress, so we should make more of an
    2133             :              * effort to save it for them.
    2134             :              *
    2135             :              * Dirty the slot so it is written out at the next checkpoint. The
    2136             :              * LSN position advanced to may still be lost on a crash but this
    2137             :              * makes the data consistent after a clean shutdown.
    2138             :              */
    2139          28 :             ReplicationSlotMarkDirty();
    2140             :         }
    2141             : 
    2142          28 :         retlsn = MyReplicationSlot->data.confirmed_flush;
    2143             : 
    2144             :         /* free context, call shutdown callback */
    2145          28 :         FreeDecodingContext(ctx);
    2146             : 
    2147          28 :         InvalidateSystemCaches();
    2148             :     }
    2149           0 :     PG_CATCH();
    2150             :     {
    2151             :         /* clear all timetravel entries */
    2152           0 :         InvalidateSystemCaches();
    2153             : 
    2154           0 :         PG_RE_THROW();
    2155             :     }
    2156          28 :     PG_END_TRY();
    2157             : 
    2158          28 :     return retlsn;
    2159             : }

Generated by: LCOV version 1.14