LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logical.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 691 745 92.8 %
Date: 2024-04-25 21:11:15 Functions: 40 41 97.6 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * logical.c
       3             :  *     PostgreSQL logical decoding coordination
       4             :  *
       5             :  * Copyright (c) 2012-2024, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/logical.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file coordinates interaction between the various modules that
      12             :  *    together provide logical decoding, primarily by providing so
      13             :  *    called LogicalDecodingContexts. The goal is to encapsulate most of the
      14             :  *    internal complexity for consumers of logical decoding, so they can
      15             :  *    create and consume a changestream with a low amount of code. Builtin
      16             :  *    consumers are the walsender and SQL SRF interface, but it's possible to
      17             :  *    add further ones without changing core code, e.g. to consume changes in
      18             :  *    a bgworker.
      19             :  *
      20             :  *    The idea is that a consumer provides three callbacks, one to read WAL,
      21             :  *    one to prepare a data write, and a final one for actually writing since
      22             :  *    their implementation depends on the type of consumer.  Check
      23             :  *    logicalfuncs.c for an example implementation of a fairly simple consumer
      24             :  *    and an implementation of a WAL reading callback that's suitable for
      25             :  *    simple consumers.
      26             :  *-------------------------------------------------------------------------
      27             :  */
      28             : 
      29             : #include "postgres.h"
      30             : 
      31             : #include "access/xact.h"
      32             : #include "access/xlogutils.h"
      33             : #include "fmgr.h"
      34             : #include "miscadmin.h"
      35             : #include "pgstat.h"
      36             : #include "replication/decode.h"
      37             : #include "replication/logical.h"
      38             : #include "replication/reorderbuffer.h"
      39             : #include "replication/slotsync.h"
      40             : #include "replication/snapbuild.h"
      41             : #include "storage/proc.h"
      42             : #include "storage/procarray.h"
      43             : #include "utils/builtins.h"
      44             : #include "utils/inval.h"
      45             : #include "utils/memutils.h"
      46             : 
      47             : /* data for errcontext callback */
      48             : typedef struct LogicalErrorCallbackState
      49             : {
      50             :     LogicalDecodingContext *ctx;
      51             :     const char *callback_name;
      52             :     XLogRecPtr  report_location;
      53             : } LogicalErrorCallbackState;
      54             : 
      55             : /* wrappers around output plugin callbacks */
      56             : static void output_plugin_error_callback(void *arg);
      57             : static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      58             :                                bool is_init);
      59             : static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
      60             : static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
      61             : static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      62             :                               XLogRecPtr commit_lsn);
      63             : static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
      64             : static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      65             :                                XLogRecPtr prepare_lsn);
      66             : static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      67             :                                        XLogRecPtr commit_lsn);
      68             : static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      69             :                                          XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
      70             : static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      71             :                               Relation relation, ReorderBufferChange *change);
      72             : static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      73             :                                 int nrelations, Relation relations[], ReorderBufferChange *change);
      74             : static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      75             :                                XLogRecPtr message_lsn, bool transactional,
      76             :                                const char *prefix, Size message_size, const char *message);
      77             : 
      78             : /* streaming callbacks */
      79             : static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      80             :                                     XLogRecPtr first_lsn);
      81             : static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      82             :                                    XLogRecPtr last_lsn);
      83             : static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      84             :                                     XLogRecPtr abort_lsn);
      85             : static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      86             :                                       XLogRecPtr prepare_lsn);
      87             : static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      88             :                                      XLogRecPtr commit_lsn);
      89             : static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      90             :                                      Relation relation, ReorderBufferChange *change);
      91             : static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      92             :                                       XLogRecPtr message_lsn, bool transactional,
      93             :                                       const char *prefix, Size message_size, const char *message);
      94             : static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      95             :                                        int nrelations, Relation relations[], ReorderBufferChange *change);
      96             : 
      97             : /* callback to update txn's progress */
      98             : static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
      99             :                                            ReorderBufferTXN *txn,
     100             :                                            XLogRecPtr lsn);
     101             : 
     102             : static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
     103             : 
     104             : /*
     105             :  * Make sure the current settings & environment are capable of doing logical
     106             :  * decoding.
     107             :  */
     108             : void
     109        2666 : CheckLogicalDecodingRequirements(void)
     110             : {
     111        2666 :     CheckSlotRequirements();
     112             : 
     113             :     /*
     114             :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
     115             :      * needs the same check.
     116             :      */
     117             : 
     118        2666 :     if (wal_level < WAL_LEVEL_LOGICAL)
     119           0 :         ereport(ERROR,
     120             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     121             :                  errmsg("logical decoding requires wal_level >= logical")));
     122             : 
     123        2666 :     if (MyDatabaseId == InvalidOid)
     124           2 :         ereport(ERROR,
     125             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     126             :                  errmsg("logical decoding requires a database connection")));
     127             : 
     128        2664 :     if (RecoveryInProgress())
     129             :     {
     130             :         /*
     131             :          * This check may have race conditions, but whenever
     132             :          * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
     133             :          * verify that there are no existing logical replication slots. And to
     134             :          * avoid races around creating a new slot,
     135             :          * CheckLogicalDecodingRequirements() is called once before creating
     136             :          * the slot, and once when logical decoding is initially starting up.
     137             :          */
     138         132 :         if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
     139           2 :             ereport(ERROR,
     140             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     141             :                      errmsg("logical decoding on standby requires wal_level >= logical on the primary")));
     142             :     }
     143        2662 : }
     144             : 
     145             : /*
     146             :  * Helper function for CreateInitDecodingContext() and
     147             :  * CreateDecodingContext() performing common tasks.
     148             :  */
     149             : static LogicalDecodingContext *
     150        1860 : StartupDecodingContext(List *output_plugin_options,
     151             :                        XLogRecPtr start_lsn,
     152             :                        TransactionId xmin_horizon,
     153             :                        bool need_full_snapshot,
     154             :                        bool fast_forward,
     155             :                        XLogReaderRoutine *xl_routine,
     156             :                        LogicalOutputPluginWriterPrepareWrite prepare_write,
     157             :                        LogicalOutputPluginWriterWrite do_write,
     158             :                        LogicalOutputPluginWriterUpdateProgress update_progress)
     159             : {
     160             :     ReplicationSlot *slot;
     161             :     MemoryContext context,
     162             :                 old_context;
     163             :     LogicalDecodingContext *ctx;
     164             : 
     165             :     /* shorter lines... */
     166        1860 :     slot = MyReplicationSlot;
     167             : 
     168        1860 :     context = AllocSetContextCreate(CurrentMemoryContext,
     169             :                                     "Logical decoding context",
     170             :                                     ALLOCSET_DEFAULT_SIZES);
     171        1860 :     old_context = MemoryContextSwitchTo(context);
     172        1860 :     ctx = palloc0(sizeof(LogicalDecodingContext));
     173             : 
     174        1860 :     ctx->context = context;
     175             : 
     176             :     /*
     177             :      * (re-)load output plugins, so we detect a bad (removed) output plugin
     178             :      * now.
     179             :      */
     180        1860 :     if (!fast_forward)
     181        1824 :         LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
     182             : 
     183             :     /*
     184             :      * Now that the slot's xmin has been set, we can announce ourselves as a
     185             :      * logical decoding backend which doesn't need to be checked individually
     186             :      * when computing the xmin horizon because the xmin is enforced via
     187             :      * replication slots.
     188             :      *
     189             :      * We can only do so if we're outside of a transaction (i.e. the case when
     190             :      * streaming changes via walsender), otherwise an already setup
     191             :      * snapshot/xid would end up being ignored. That's not a particularly
     192             :      * bothersome restriction since the SQL interface can't be used for
     193             :      * streaming anyway.
     194             :      */
     195        1858 :     if (!IsTransactionOrTransactionBlock())
     196             :     {
     197         880 :         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     198         880 :         MyProc->statusFlags |= PROC_IN_LOGICAL_DECODING;
     199         880 :         ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
     200         880 :         LWLockRelease(ProcArrayLock);
     201             :     }
     202             : 
     203        1858 :     ctx->slot = slot;
     204             : 
     205        1858 :     ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
     206        1858 :     if (!ctx->reader)
     207           0 :         ereport(ERROR,
     208             :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     209             :                  errmsg("out of memory"),
     210             :                  errdetail("Failed while allocating a WAL reading processor.")));
     211             : 
     212        1858 :     ctx->reorder = ReorderBufferAllocate();
     213        1858 :     ctx->snapshot_builder =
     214        1858 :         AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
     215             :                                 need_full_snapshot, slot->data.two_phase_at);
     216             : 
     217        1858 :     ctx->reorder->private_data = ctx;
     218             : 
     219             :     /* wrap output plugin callbacks, so we can add error context information */
     220        1858 :     ctx->reorder->begin = begin_cb_wrapper;
     221        1858 :     ctx->reorder->apply_change = change_cb_wrapper;
     222        1858 :     ctx->reorder->apply_truncate = truncate_cb_wrapper;
     223        1858 :     ctx->reorder->commit = commit_cb_wrapper;
     224        1858 :     ctx->reorder->message = message_cb_wrapper;
     225             : 
     226             :     /*
     227             :      * To support streaming, we require start/stop/abort/commit/change
     228             :      * callbacks. The message and truncate callbacks are optional, similar to
     229             :      * regular output plugins. We however enable streaming when at least one
     230             :      * of the methods is enabled so that we can easily identify missing
     231             :      * methods.
     232             :      *
     233             :      * We decide it here, but only check it later in the wrappers.
     234             :      */
     235        3752 :     ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) ||
     236          36 :         (ctx->callbacks.stream_stop_cb != NULL) ||
     237          36 :         (ctx->callbacks.stream_abort_cb != NULL) ||
     238          36 :         (ctx->callbacks.stream_commit_cb != NULL) ||
     239          36 :         (ctx->callbacks.stream_change_cb != NULL) ||
     240        1930 :         (ctx->callbacks.stream_message_cb != NULL) ||
     241          36 :         (ctx->callbacks.stream_truncate_cb != NULL);
     242             : 
     243             :     /*
     244             :      * streaming callbacks
     245             :      *
     246             :      * stream_message and stream_truncate callbacks are optional, so we do not
     247             :      * fail with ERROR when missing, but the wrappers simply do nothing. We
     248             :      * must set the ReorderBuffer callbacks to something, otherwise the calls
     249             :      * from there will crash (we don't want to move the checks there).
     250             :      */
     251        1858 :     ctx->reorder->stream_start = stream_start_cb_wrapper;
     252        1858 :     ctx->reorder->stream_stop = stream_stop_cb_wrapper;
     253        1858 :     ctx->reorder->stream_abort = stream_abort_cb_wrapper;
     254        1858 :     ctx->reorder->stream_prepare = stream_prepare_cb_wrapper;
     255        1858 :     ctx->reorder->stream_commit = stream_commit_cb_wrapper;
     256        1858 :     ctx->reorder->stream_change = stream_change_cb_wrapper;
     257        1858 :     ctx->reorder->stream_message = stream_message_cb_wrapper;
     258        1858 :     ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
     259             : 
     260             : 
     261             :     /*
     262             :      * To support two-phase logical decoding, we require
     263             :      * begin_prepare/prepare/commit-prepare/abort-prepare callbacks. The
     264             :      * filter_prepare callback is optional. We however enable two-phase
     265             :      * logical decoding when at least one of the methods is enabled so that we
     266             :      * can easily identify missing methods.
     267             :      *
     268             :      * We decide it here, but only check it later in the wrappers.
     269             :      */
     270        3752 :     ctx->twophase = (ctx->callbacks.begin_prepare_cb != NULL) ||
     271          36 :         (ctx->callbacks.prepare_cb != NULL) ||
     272          36 :         (ctx->callbacks.commit_prepared_cb != NULL) ||
     273          36 :         (ctx->callbacks.rollback_prepared_cb != NULL) ||
     274        1930 :         (ctx->callbacks.stream_prepare_cb != NULL) ||
     275          36 :         (ctx->callbacks.filter_prepare_cb != NULL);
     276             : 
     277             :     /*
     278             :      * Callback to support decoding at prepare time.
     279             :      */
     280        1858 :     ctx->reorder->begin_prepare = begin_prepare_cb_wrapper;
     281        1858 :     ctx->reorder->prepare = prepare_cb_wrapper;
     282        1858 :     ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
     283        1858 :     ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
     284             : 
     285             :     /*
     286             :      * Callback to support updating progress during sending data of a
     287             :      * transaction (and its subtransactions) to the output plugin.
     288             :      */
     289        1858 :     ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
     290             : 
     291        1858 :     ctx->out = makeStringInfo();
     292        1858 :     ctx->prepare_write = prepare_write;
     293        1858 :     ctx->write = do_write;
     294        1858 :     ctx->update_progress = update_progress;
     295             : 
     296        1858 :     ctx->output_plugin_options = output_plugin_options;
     297             : 
     298        1858 :     ctx->fast_forward = fast_forward;
     299             : 
     300        1858 :     MemoryContextSwitchTo(old_context);
     301             : 
     302        1858 :     return ctx;
     303             : }
     304             : 
     305             : /*
     306             :  * Create a new decoding context, for a new logical slot.
     307             :  *
     308             :  * plugin -- contains the name of the output plugin
     309             :  * output_plugin_options -- contains options passed to the output plugin
     310             :  * need_full_snapshot -- if true, must obtain a snapshot able to read all
     311             :  *      tables; if false, one that can read only catalogs is acceptable.
     312             :  * restart_lsn -- if given as invalid, it's this routine's responsibility to
     313             :  *      mark WAL as reserved by setting a convenient restart_lsn for the slot.
     314             :  *      Otherwise, we set for decoding to start from the given LSN without
     315             :  *      marking WAL reserved beforehand.  In that scenario, it's up to the
     316             :  *      caller to guarantee that WAL remains available.
     317             :  * xl_routine -- XLogReaderRoutine for underlying XLogReader
     318             :  * prepare_write, do_write, update_progress --
     319             :  *      callbacks that perform the use-case dependent, actual, work.
     320             :  *
     321             :  * Needs to be called while in a memory context that's at least as long lived
     322             :  * as the decoding context because further memory contexts will be created
     323             :  * inside it.
     324             :  *
     325             :  * Returns an initialized decoding context after calling the output plugin's
     326             :  * startup function.
     327             :  */
     328             : LogicalDecodingContext *
     329         800 : CreateInitDecodingContext(const char *plugin,
     330             :                           List *output_plugin_options,
     331             :                           bool need_full_snapshot,
     332             :                           XLogRecPtr restart_lsn,
     333             :                           XLogReaderRoutine *xl_routine,
     334             :                           LogicalOutputPluginWriterPrepareWrite prepare_write,
     335             :                           LogicalOutputPluginWriterWrite do_write,
     336             :                           LogicalOutputPluginWriterUpdateProgress update_progress)
     337             : {
     338         800 :     TransactionId xmin_horizon = InvalidTransactionId;
     339             :     ReplicationSlot *slot;
     340             :     NameData    plugin_name;
     341             :     LogicalDecodingContext *ctx;
     342             :     MemoryContext old_context;
     343             : 
     344             :     /*
     345             :      * On a standby, this check is also required while creating the slot.
     346             :      * Check the comments in the function.
     347             :      */
     348         800 :     CheckLogicalDecodingRequirements();
     349             : 
     350             :     /* shorter lines... */
     351         800 :     slot = MyReplicationSlot;
     352             : 
     353             :     /* first some sanity checks that are unlikely to be violated */
     354         800 :     if (slot == NULL)
     355           0 :         elog(ERROR, "cannot perform logical decoding without an acquired slot");
     356             : 
     357         800 :     if (plugin == NULL)
     358           0 :         elog(ERROR, "cannot initialize logical decoding without a specified plugin");
     359             : 
     360             :     /* Make sure the passed slot is suitable. These are user facing errors. */
     361         800 :     if (SlotIsPhysical(slot))
     362           0 :         ereport(ERROR,
     363             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     364             :                  errmsg("cannot use physical replication slot for logical decoding")));
     365             : 
     366         800 :     if (slot->data.database != MyDatabaseId)
     367           0 :         ereport(ERROR,
     368             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     369             :                  errmsg("replication slot \"%s\" was not created in this database",
     370             :                         NameStr(slot->data.name))));
     371             : 
     372        1366 :     if (IsTransactionState() &&
     373         566 :         GetTopTransactionIdIfAny() != InvalidTransactionId)
     374           4 :         ereport(ERROR,
     375             :                 (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
     376             :                  errmsg("cannot create logical replication slot in transaction that has performed writes")));
     377             : 
     378             :     /*
     379             :      * Register output plugin name with slot.  We need the mutex to avoid
     380             :      * concurrent reading of a partially copied string.  But we don't want any
     381             :      * complicated code while holding a spinlock, so do namestrcpy() outside.
     382             :      */
     383         796 :     namestrcpy(&plugin_name, plugin);
     384         796 :     SpinLockAcquire(&slot->mutex);
     385         796 :     slot->data.plugin = plugin_name;
     386         796 :     SpinLockRelease(&slot->mutex);
     387             : 
     388         796 :     if (XLogRecPtrIsInvalid(restart_lsn))
     389         784 :         ReplicationSlotReserveWal();
     390             :     else
     391             :     {
     392          12 :         SpinLockAcquire(&slot->mutex);
     393          12 :         slot->data.restart_lsn = restart_lsn;
     394          12 :         SpinLockRelease(&slot->mutex);
     395             :     }
     396             : 
     397             :     /* ----
     398             :      * This is a bit tricky: We need to determine a safe xmin horizon to start
     399             :      * decoding from, to avoid starting from a running xacts record referring
     400             :      * to xids whose rows have been vacuumed or pruned
     401             :      * already. GetOldestSafeDecodingTransactionId() returns such a value, but
     402             :      * without further interlock its return value might immediately be out of
     403             :      * date.
     404             :      *
     405             :      * So we have to acquire the ProcArrayLock to prevent computation of new
     406             :      * xmin horizons by other backends, get the safe decoding xid, and inform
     407             :      * the slot machinery about the new limit. Once that's done the
     408             :      * ProcArrayLock can be released as the slot machinery now is
     409             :      * protecting against vacuum.
     410             :      *
     411             :      * Note that, temporarily, the data, not just the catalog, xmin has to be
     412             :      * reserved if a data snapshot is to be exported.  Otherwise the initial
     413             :      * data snapshot created here is not guaranteed to be valid. After that
     414             :      * the data xmin doesn't need to be managed anymore and the global xmin
     415             :      * should be recomputed. As we are fine with losing the pegged data xmin
     416             :      * after crash - no chance a snapshot would get exported anymore - we can
     417             :      * get away with just setting the slot's
     418             :      * effective_xmin. ReplicationSlotRelease will reset it again.
     419             :      *
     420             :      * ----
     421             :      */
     422         796 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     423             : 
     424         796 :     xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
     425             : 
     426         796 :     SpinLockAcquire(&slot->mutex);
     427         796 :     slot->effective_catalog_xmin = xmin_horizon;
     428         796 :     slot->data.catalog_xmin = xmin_horizon;
     429         796 :     if (need_full_snapshot)
     430         342 :         slot->effective_xmin = xmin_horizon;
     431         796 :     SpinLockRelease(&slot->mutex);
     432             : 
     433         796 :     ReplicationSlotsComputeRequiredXmin(true);
     434             : 
     435         796 :     LWLockRelease(ProcArrayLock);
     436             : 
     437         796 :     ReplicationSlotMarkDirty();
     438         796 :     ReplicationSlotSave();
     439             : 
     440         796 :     ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
     441             :                                  need_full_snapshot, false,
     442             :                                  xl_routine, prepare_write, do_write,
     443             :                                  update_progress);
     444             : 
     445             :     /* call output plugin initialization callback */
     446         794 :     old_context = MemoryContextSwitchTo(ctx->context);
     447         794 :     if (ctx->callbacks.startup_cb != NULL)
     448         794 :         startup_cb_wrapper(ctx, &ctx->options, true);
     449         794 :     MemoryContextSwitchTo(old_context);
     450             : 
     451             :     /*
     452             :      * We allow decoding of prepared transactions when the two_phase is
     453             :      * enabled at the time of slot creation, or when the two_phase option is
     454             :      * given at the streaming start, provided the plugin supports all the
     455             :      * callbacks for two-phase.
     456             :      */
     457         794 :     ctx->twophase &= slot->data.two_phase;
     458             : 
     459         794 :     ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
     460             : 
     461         794 :     return ctx;
     462             : }
     463             : 
     464             : /*
     465             :  * Create a new decoding context, for a logical slot that has previously been
     466             :  * used already.
     467             :  *
     468             :  * start_lsn
     469             :  *      The LSN at which to start decoding.  If InvalidXLogRecPtr, restart
     470             :  *      from the slot's confirmed_flush; otherwise, start from the specified
     471             :  *      location (but move it forwards to confirmed_flush if it's older than
     472             :  *      that, see below).
     473             :  *
     474             :  * output_plugin_options
     475             :  *      options passed to the output plugin.
     476             :  *
     477             :  * fast_forward
     478             :  *      bypass the generation of logical changes.
     479             :  *
     480             :  * xl_routine
     481             :  *      XLogReaderRoutine used by underlying xlogreader
     482             :  *
     483             :  * prepare_write, do_write, update_progress
     484             :  *      callbacks that have to be filled to perform the use-case dependent,
     485             :  *      actual work.
     486             :  *
     487             :  * Needs to be called while in a memory context that's at least as long lived
     488             :  * as the decoding context because further memory contexts will be created
     489             :  * inside it.
     490             :  *
     491             :  * Returns an initialized decoding context after calling the output plugin's
     492             :  * startup function.
     493             :  */
     494             : LogicalDecodingContext *
     495        1084 : CreateDecodingContext(XLogRecPtr start_lsn,
     496             :                       List *output_plugin_options,
     497             :                       bool fast_forward,
     498             :                       XLogReaderRoutine *xl_routine,
     499             :                       LogicalOutputPluginWriterPrepareWrite prepare_write,
     500             :                       LogicalOutputPluginWriterWrite do_write,
     501             :                       LogicalOutputPluginWriterUpdateProgress update_progress)
     502             : {
     503             :     LogicalDecodingContext *ctx;
     504             :     ReplicationSlot *slot;
     505             :     MemoryContext old_context;
     506             : 
     507             :     /* shorter lines... */
     508        1084 :     slot = MyReplicationSlot;
     509             : 
     510             :     /* first some sanity checks that are unlikely to be violated */
     511        1084 :     if (slot == NULL)
     512           0 :         elog(ERROR, "cannot perform logical decoding without an acquired slot");
     513             : 
     514             :     /* make sure the passed slot is suitable, these are user facing errors */
     515        1084 :     if (SlotIsPhysical(slot))
     516           2 :         ereport(ERROR,
     517             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     518             :                  errmsg("cannot use physical replication slot for logical decoding")));
     519             : 
     520             :     /*
     521             :      * We need to access the system tables during decoding to build the
     522             :      * logical changes unless we are in fast_forward mode where no changes are
     523             :      * generated.
     524             :      */
     525        1082 :     if (slot->data.database != MyDatabaseId && !fast_forward)
     526           6 :         ereport(ERROR,
     527             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     528             :                  errmsg("replication slot \"%s\" was not created in this database",
     529             :                         NameStr(slot->data.name))));
     530             : 
     531             :     /*
     532             :      * The slots being synced from the primary can't be used for decoding as
     533             :      * they are used after failover. However, we do allow advancing the LSNs
     534             :      * during the synchronization of slots. See update_local_synced_slot.
     535             :      */
     536        1076 :     if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
     537           2 :         ereport(ERROR,
     538             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     539             :                 errmsg("cannot use replication slot \"%s\" for logical decoding",
     540             :                        NameStr(slot->data.name)),
     541             :                 errdetail("This slot is being synchronized from the primary server."),
     542             :                 errhint("Specify another replication slot."));
     543             : 
     544             :     /*
     545             :      * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
     546             :      * "cannot get changes" wording in this errmsg because that'd be
     547             :      * confusingly ambiguous about no changes being available when called from
     548             :      * pg_logical_slot_get_changes_guts().
     549             :      */
     550        1074 :     if (MyReplicationSlot->data.invalidated == RS_INVAL_WAL_REMOVED)
     551           0 :         ereport(ERROR,
     552             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     553             :                  errmsg("can no longer get changes from replication slot \"%s\"",
     554             :                         NameStr(MyReplicationSlot->data.name)),
     555             :                  errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
     556             : 
     557        1074 :     if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
     558          10 :         ereport(ERROR,
     559             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     560             :                  errmsg("can no longer get changes from replication slot \"%s\"",
     561             :                         NameStr(MyReplicationSlot->data.name)),
     562             :                  errdetail("This slot has been invalidated because it was conflicting with recovery.")));
     563             : 
     564             :     Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
     565             :     Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
     566             : 
     567        1064 :     if (start_lsn == InvalidXLogRecPtr)
     568             :     {
     569             :         /* continue from last position */
     570         646 :         start_lsn = slot->data.confirmed_flush;
     571             :     }
     572         418 :     else if (start_lsn < slot->data.confirmed_flush)
     573             :     {
     574             :         /*
     575             :          * It might seem like we should error out in this case, but it's
     576             :          * pretty common for a client to acknowledge a LSN it doesn't have to
     577             :          * do anything for, and thus didn't store persistently, because the
     578             :          * xlog records didn't result in anything relevant for logical
     579             :          * decoding. Clients have to be able to do that to support synchronous
     580             :          * replication.
     581             :          *
     582             :          * Starting at a different LSN than requested might not catch certain
     583             :          * kinds of client errors; so the client may wish to check that
     584             :          * confirmed_flush_lsn matches its expectations.
     585             :          */
     586          26 :         elog(LOG, "%X/%X has been already streamed, forwarding to %X/%X",
     587             :              LSN_FORMAT_ARGS(start_lsn),
     588             :              LSN_FORMAT_ARGS(slot->data.confirmed_flush));
     589             : 
     590          26 :         start_lsn = slot->data.confirmed_flush;
     591             :     }
     592             : 
     593        1064 :     ctx = StartupDecodingContext(output_plugin_options,
     594             :                                  start_lsn, InvalidTransactionId, false,
     595             :                                  fast_forward, xl_routine, prepare_write,
     596             :                                  do_write, update_progress);
     597             : 
     598             :     /* call output plugin initialization callback */
     599        1064 :     old_context = MemoryContextSwitchTo(ctx->context);
     600        1064 :     if (ctx->callbacks.startup_cb != NULL)
     601        1028 :         startup_cb_wrapper(ctx, &ctx->options, false);
     602        1058 :     MemoryContextSwitchTo(old_context);
     603             : 
     604             :     /*
     605             :      * We allow decoding of prepared transactions when the two_phase is
     606             :      * enabled at the time of slot creation, or when the two_phase option is
     607             :      * given at the streaming start, provided the plugin supports all the
     608             :      * callbacks for two-phase.
     609             :      */
     610        1058 :     ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given);
     611             : 
     612             :     /* Mark slot to allow two_phase decoding if not already marked */
     613        1058 :     if (ctx->twophase && !slot->data.two_phase)
     614             :     {
     615          12 :         SpinLockAcquire(&slot->mutex);
     616          12 :         slot->data.two_phase = true;
     617          12 :         slot->data.two_phase_at = start_lsn;
     618          12 :         SpinLockRelease(&slot->mutex);
     619          12 :         ReplicationSlotMarkDirty();
     620          12 :         ReplicationSlotSave();
     621          12 :         SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
     622             :     }
     623             : 
     624        1058 :     ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
     625             : 
     626        1058 :     ereport(LOG,
     627             :             (errmsg("starting logical decoding for slot \"%s\"",
     628             :                     NameStr(slot->data.name)),
     629             :              errdetail("Streaming transactions committing after %X/%X, reading WAL from %X/%X.",
     630             :                        LSN_FORMAT_ARGS(slot->data.confirmed_flush),
     631             :                        LSN_FORMAT_ARGS(slot->data.restart_lsn))));
     632             : 
     633        1058 :     return ctx;
     634             : }
     635             : 
     636             : /*
     637             :  * Returns true if a consistent initial decoding snapshot has been built.
     638             :  */
     639             : bool
     640         872 : DecodingContextReady(LogicalDecodingContext *ctx)
     641             : {
     642         872 :     return SnapBuildCurrentState(ctx->snapshot_builder) == SNAPBUILD_CONSISTENT;
     643             : }
     644             : 
     645             : /*
     646             :  * Read from the decoding slot, until it is ready to start extracting changes.
     647             :  */
     648             : void
     649         782 : DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
     650             : {
     651         782 :     ReplicationSlot *slot = ctx->slot;
     652             : 
     653             :     /* Initialize from where to start reading WAL. */
     654         782 :     XLogBeginRead(ctx->reader, slot->data.restart_lsn);
     655             : 
     656         782 :     elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
     657             :          LSN_FORMAT_ARGS(slot->data.restart_lsn));
     658             : 
     659             :     /* Wait for a consistent starting point */
     660             :     for (;;)
     661          86 :     {
     662             :         XLogRecord *record;
     663         868 :         char       *err = NULL;
     664             : 
     665             :         /* the read_page callback waits for new WAL */
     666         868 :         record = XLogReadRecord(ctx->reader, &err);
     667         868 :         if (err)
     668           0 :             elog(ERROR, "could not find logical decoding starting point: %s", err);
     669         868 :         if (!record)
     670           0 :             elog(ERROR, "could not find logical decoding starting point");
     671             : 
     672         868 :         LogicalDecodingProcessRecord(ctx, ctx->reader);
     673             : 
     674             :         /* only continue till we found a consistent spot */
     675         864 :         if (DecodingContextReady(ctx))
     676         778 :             break;
     677             : 
     678          86 :         CHECK_FOR_INTERRUPTS();
     679             :     }
     680             : 
     681         778 :     SpinLockAcquire(&slot->mutex);
     682         778 :     slot->data.confirmed_flush = ctx->reader->EndRecPtr;
     683         778 :     if (slot->data.two_phase)
     684          12 :         slot->data.two_phase_at = ctx->reader->EndRecPtr;
     685         778 :     SpinLockRelease(&slot->mutex);
     686         778 : }
     687             : 
     688             : /*
     689             :  * Free a previously allocated decoding context, invoking the shutdown
     690             :  * callback if necessary.
     691             :  */
     692             : void
     693        1532 : FreeDecodingContext(LogicalDecodingContext *ctx)
     694             : {
     695        1532 :     if (ctx->callbacks.shutdown_cb != NULL)
     696        1496 :         shutdown_cb_wrapper(ctx);
     697             : 
     698        1532 :     ReorderBufferFree(ctx->reorder);
     699        1532 :     FreeSnapshotBuilder(ctx->snapshot_builder);
     700        1532 :     XLogReaderFree(ctx->reader);
     701        1532 :     MemoryContextDelete(ctx->context);
     702        1532 : }
     703             : 
     704             : /*
     705             :  * Prepare a write using the context's output routine.
     706             :  */
     707             : void
     708      671536 : OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
     709             : {
     710      671536 :     if (!ctx->accept_writes)
     711           0 :         elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
     712             : 
     713      671536 :     ctx->prepare_write(ctx, ctx->write_location, ctx->write_xid, last_write);
     714      671536 :     ctx->prepared_write = true;
     715      671536 : }
     716             : 
     717             : /*
     718             :  * Perform a write using the context's output routine.
     719             :  */
     720             : void
     721      671536 : OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
     722             : {
     723      671536 :     if (!ctx->prepared_write)
     724           0 :         elog(ERROR, "OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
     725             : 
     726      671536 :     ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write);
     727      671532 :     ctx->prepared_write = false;
     728      671532 : }
     729             : 
     730             : /*
     731             :  * Update progress tracking (if supported).
     732             :  */
     733             : void
     734        7672 : OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx,
     735             :                            bool skipped_xact)
     736             : {
     737        7672 :     if (!ctx->update_progress)
     738        3162 :         return;
     739             : 
     740        4510 :     ctx->update_progress(ctx, ctx->write_location, ctx->write_xid,
     741             :                          skipped_xact);
     742             : }
     743             : 
     744             : /*
     745             :  * Load the output plugin, lookup its output plugin init function, and check
     746             :  * that it provides the required callbacks.
     747             :  */
     748             : static void
     749        1824 : LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin)
     750             : {
     751             :     LogicalOutputPluginInit plugin_init;
     752             : 
     753        1822 :     plugin_init = (LogicalOutputPluginInit)
     754        1824 :         load_external_function(plugin, "_PG_output_plugin_init", false, NULL);
     755             : 
     756        1822 :     if (plugin_init == NULL)
     757           0 :         elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");
     758             : 
     759             :     /* ask the output plugin to fill the callback struct */
     760        1822 :     plugin_init(callbacks);
     761             : 
     762        1822 :     if (callbacks->begin_cb == NULL)
     763           0 :         elog(ERROR, "output plugins have to register a begin callback");
     764        1822 :     if (callbacks->change_cb == NULL)
     765           0 :         elog(ERROR, "output plugins have to register a change callback");
     766        1822 :     if (callbacks->commit_cb == NULL)
     767           0 :         elog(ERROR, "output plugins have to register a commit callback");
     768        1822 : }
     769             : 
     770             : static void
     771          18 : output_plugin_error_callback(void *arg)
     772             : {
     773          18 :     LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;
     774             : 
     775             :     /* not all callbacks have an associated LSN  */
     776          18 :     if (state->report_location != InvalidXLogRecPtr)
     777          12 :         errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
     778          12 :                    NameStr(state->ctx->slot->data.name),
     779          12 :                    NameStr(state->ctx->slot->data.plugin),
     780             :                    state->callback_name,
     781          12 :                    LSN_FORMAT_ARGS(state->report_location));
     782             :     else
     783           6 :         errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
     784           6 :                    NameStr(state->ctx->slot->data.name),
     785           6 :                    NameStr(state->ctx->slot->data.plugin),
     786             :                    state->callback_name);
     787          18 : }
     788             : 
     789             : static void
     790        1822 : startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
     791             : {
     792             :     LogicalErrorCallbackState state;
     793             :     ErrorContextCallback errcallback;
     794             : 
     795             :     Assert(!ctx->fast_forward);
     796             : 
     797             :     /* Push callback + info on the error context stack */
     798        1822 :     state.ctx = ctx;
     799        1822 :     state.callback_name = "startup";
     800        1822 :     state.report_location = InvalidXLogRecPtr;
     801        1822 :     errcallback.callback = output_plugin_error_callback;
     802        1822 :     errcallback.arg = (void *) &state;
     803        1822 :     errcallback.previous = error_context_stack;
     804        1822 :     error_context_stack = &errcallback;
     805             : 
     806             :     /* set output state */
     807        1822 :     ctx->accept_writes = false;
     808        1822 :     ctx->end_xact = false;
     809             : 
     810             :     /* do the actual work: call callback */
     811        1822 :     ctx->callbacks.startup_cb(ctx, opt, is_init);
     812             : 
     813             :     /* Pop the error context stack */
     814        1816 :     error_context_stack = errcallback.previous;
     815        1816 : }
     816             : 
     817             : static void
     818        1496 : shutdown_cb_wrapper(LogicalDecodingContext *ctx)
     819             : {
     820             :     LogicalErrorCallbackState state;
     821             :     ErrorContextCallback errcallback;
     822             : 
     823             :     Assert(!ctx->fast_forward);
     824             : 
     825             :     /* Push callback + info on the error context stack */
     826        1496 :     state.ctx = ctx;
     827        1496 :     state.callback_name = "shutdown";
     828        1496 :     state.report_location = InvalidXLogRecPtr;
     829        1496 :     errcallback.callback = output_plugin_error_callback;
     830        1496 :     errcallback.arg = (void *) &state;
     831        1496 :     errcallback.previous = error_context_stack;
     832        1496 :     error_context_stack = &errcallback;
     833             : 
     834             :     /* set output state */
     835        1496 :     ctx->accept_writes = false;
     836        1496 :     ctx->end_xact = false;
     837             : 
     838             :     /* do the actual work: call callback */
     839        1496 :     ctx->callbacks.shutdown_cb(ctx);
     840             : 
     841             :     /* Pop the error context stack */
     842        1496 :     error_context_stack = errcallback.previous;
     843        1496 : }
     844             : 
     845             : 
     846             : /*
     847             :  * Callbacks for ReorderBuffer which add in some more information and then call
     848             :  * output_plugin.h plugins.
     849             :  */
     850             : static void
     851        2116 : begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
     852             : {
     853        2116 :     LogicalDecodingContext *ctx = cache->private_data;
     854             :     LogicalErrorCallbackState state;
     855             :     ErrorContextCallback errcallback;
     856             : 
     857             :     Assert(!ctx->fast_forward);
     858             : 
     859             :     /* Push callback + info on the error context stack */
     860        2116 :     state.ctx = ctx;
     861        2116 :     state.callback_name = "begin";
     862        2116 :     state.report_location = txn->first_lsn;
     863        2116 :     errcallback.callback = output_plugin_error_callback;
     864        2116 :     errcallback.arg = (void *) &state;
     865        2116 :     errcallback.previous = error_context_stack;
     866        2116 :     error_context_stack = &errcallback;
     867             : 
     868             :     /* set output state */
     869        2116 :     ctx->accept_writes = true;
     870        2116 :     ctx->write_xid = txn->xid;
     871        2116 :     ctx->write_location = txn->first_lsn;
     872        2116 :     ctx->end_xact = false;
     873             : 
     874             :     /* do the actual work: call callback */
     875        2116 :     ctx->callbacks.begin_cb(ctx, txn);
     876             : 
     877             :     /* Pop the error context stack */
     878        2116 :     error_context_stack = errcallback.previous;
     879        2116 : }
     880             : 
     881             : static void
     882        2106 : commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     883             :                   XLogRecPtr commit_lsn)
     884             : {
     885        2106 :     LogicalDecodingContext *ctx = cache->private_data;
     886             :     LogicalErrorCallbackState state;
     887             :     ErrorContextCallback errcallback;
     888             : 
     889             :     Assert(!ctx->fast_forward);
     890             : 
     891             :     /* Push callback + info on the error context stack */
     892        2106 :     state.ctx = ctx;
     893        2106 :     state.callback_name = "commit";
     894        2106 :     state.report_location = txn->final_lsn; /* beginning of commit record */
     895        2106 :     errcallback.callback = output_plugin_error_callback;
     896        2106 :     errcallback.arg = (void *) &state;
     897        2106 :     errcallback.previous = error_context_stack;
     898        2106 :     error_context_stack = &errcallback;
     899             : 
     900             :     /* set output state */
     901        2106 :     ctx->accept_writes = true;
     902        2106 :     ctx->write_xid = txn->xid;
     903        2106 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
     904        2106 :     ctx->end_xact = true;
     905             : 
     906             :     /* do the actual work: call callback */
     907        2106 :     ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
     908             : 
     909             :     /* Pop the error context stack */
     910        2106 :     error_context_stack = errcallback.previous;
     911        2106 : }
     912             : 
     913             : /*
     914             :  * The functionality of begin_prepare is quite similar to begin with the
     915             :  * exception that this will have gid (global transaction id) information which
     916             :  * can be used by plugin. Now, we thought about extending the existing begin
     917             :  * but that would break the replication protocol and additionally this looks
     918             :  * cleaner.
     919             :  */
     920             : static void
     921          48 : begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
     922             : {
     923          48 :     LogicalDecodingContext *ctx = cache->private_data;
     924             :     LogicalErrorCallbackState state;
     925             :     ErrorContextCallback errcallback;
     926             : 
     927             :     Assert(!ctx->fast_forward);
     928             : 
     929             :     /* We're only supposed to call this when two-phase commits are supported */
     930             :     Assert(ctx->twophase);
     931             : 
     932             :     /* Push callback + info on the error context stack */
     933          48 :     state.ctx = ctx;
     934          48 :     state.callback_name = "begin_prepare";
     935          48 :     state.report_location = txn->first_lsn;
     936          48 :     errcallback.callback = output_plugin_error_callback;
     937          48 :     errcallback.arg = (void *) &state;
     938          48 :     errcallback.previous = error_context_stack;
     939          48 :     error_context_stack = &errcallback;
     940             : 
     941             :     /* set output state */
     942          48 :     ctx->accept_writes = true;
     943          48 :     ctx->write_xid = txn->xid;
     944          48 :     ctx->write_location = txn->first_lsn;
     945          48 :     ctx->end_xact = false;
     946             : 
     947             :     /*
     948             :      * If the plugin supports two-phase commits then begin prepare callback is
     949             :      * mandatory
     950             :      */
     951          48 :     if (ctx->callbacks.begin_prepare_cb == NULL)
     952           0 :         ereport(ERROR,
     953             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     954             :                  errmsg("logical replication at prepare time requires a %s callback",
     955             :                         "begin_prepare_cb")));
     956             : 
     957             :     /* do the actual work: call callback */
     958          48 :     ctx->callbacks.begin_prepare_cb(ctx, txn);
     959             : 
     960             :     /* Pop the error context stack */
     961          48 :     error_context_stack = errcallback.previous;
     962          48 : }
     963             : 
     964             : static void
     965          48 : prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     966             :                    XLogRecPtr prepare_lsn)
     967             : {
     968          48 :     LogicalDecodingContext *ctx = cache->private_data;
     969             :     LogicalErrorCallbackState state;
     970             :     ErrorContextCallback errcallback;
     971             : 
     972             :     Assert(!ctx->fast_forward);
     973             : 
     974             :     /* We're only supposed to call this when two-phase commits are supported */
     975             :     Assert(ctx->twophase);
     976             : 
     977             :     /* Push callback + info on the error context stack */
     978          48 :     state.ctx = ctx;
     979          48 :     state.callback_name = "prepare";
     980          48 :     state.report_location = txn->final_lsn; /* beginning of prepare record */
     981          48 :     errcallback.callback = output_plugin_error_callback;
     982          48 :     errcallback.arg = (void *) &state;
     983          48 :     errcallback.previous = error_context_stack;
     984          48 :     error_context_stack = &errcallback;
     985             : 
     986             :     /* set output state */
     987          48 :     ctx->accept_writes = true;
     988          48 :     ctx->write_xid = txn->xid;
     989          48 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
     990          48 :     ctx->end_xact = true;
     991             : 
     992             :     /*
     993             :      * If the plugin supports two-phase commits then prepare callback is
     994             :      * mandatory
     995             :      */
     996          48 :     if (ctx->callbacks.prepare_cb == NULL)
     997           0 :         ereport(ERROR,
     998             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     999             :                  errmsg("logical replication at prepare time requires a %s callback",
    1000             :                         "prepare_cb")));
    1001             : 
    1002             :     /* do the actual work: call callback */
    1003          48 :     ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
    1004             : 
    1005             :     /* Pop the error context stack */
    1006          48 :     error_context_stack = errcallback.previous;
    1007          48 : }
    1008             : 
    1009             : static void
    1010          58 : commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1011             :                            XLogRecPtr commit_lsn)
    1012             : {
    1013          58 :     LogicalDecodingContext *ctx = cache->private_data;
    1014             :     LogicalErrorCallbackState state;
    1015             :     ErrorContextCallback errcallback;
    1016             : 
    1017             :     Assert(!ctx->fast_forward);
    1018             : 
    1019             :     /* We're only supposed to call this when two-phase commits are supported */
    1020             :     Assert(ctx->twophase);
    1021             : 
    1022             :     /* Push callback + info on the error context stack */
    1023          58 :     state.ctx = ctx;
    1024          58 :     state.callback_name = "commit_prepared";
    1025          58 :     state.report_location = txn->final_lsn; /* beginning of commit record */
    1026          58 :     errcallback.callback = output_plugin_error_callback;
    1027          58 :     errcallback.arg = (void *) &state;
    1028          58 :     errcallback.previous = error_context_stack;
    1029          58 :     error_context_stack = &errcallback;
    1030             : 
    1031             :     /* set output state */
    1032          58 :     ctx->accept_writes = true;
    1033          58 :     ctx->write_xid = txn->xid;
    1034          58 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
    1035          58 :     ctx->end_xact = true;
    1036             : 
    1037             :     /*
    1038             :      * If the plugin support two-phase commits then commit prepared callback
    1039             :      * is mandatory
    1040             :      */
    1041          58 :     if (ctx->callbacks.commit_prepared_cb == NULL)
    1042           0 :         ereport(ERROR,
    1043             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1044             :                  errmsg("logical replication at prepare time requires a %s callback",
    1045             :                         "commit_prepared_cb")));
    1046             : 
    1047             :     /* do the actual work: call callback */
    1048          58 :     ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
    1049             : 
    1050             :     /* Pop the error context stack */
    1051          58 :     error_context_stack = errcallback.previous;
    1052          58 : }
    1053             : 
    1054             : static void
    1055          20 : rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1056             :                              XLogRecPtr prepare_end_lsn,
    1057             :                              TimestampTz prepare_time)
    1058             : {
    1059          20 :     LogicalDecodingContext *ctx = cache->private_data;
    1060             :     LogicalErrorCallbackState state;
    1061             :     ErrorContextCallback errcallback;
    1062             : 
    1063             :     Assert(!ctx->fast_forward);
    1064             : 
    1065             :     /* We're only supposed to call this when two-phase commits are supported */
    1066             :     Assert(ctx->twophase);
    1067             : 
    1068             :     /* Push callback + info on the error context stack */
    1069          20 :     state.ctx = ctx;
    1070          20 :     state.callback_name = "rollback_prepared";
    1071          20 :     state.report_location = txn->final_lsn; /* beginning of commit record */
    1072          20 :     errcallback.callback = output_plugin_error_callback;
    1073          20 :     errcallback.arg = (void *) &state;
    1074          20 :     errcallback.previous = error_context_stack;
    1075          20 :     error_context_stack = &errcallback;
    1076             : 
    1077             :     /* set output state */
    1078          20 :     ctx->accept_writes = true;
    1079          20 :     ctx->write_xid = txn->xid;
    1080          20 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
    1081          20 :     ctx->end_xact = true;
    1082             : 
    1083             :     /*
    1084             :      * If the plugin support two-phase commits then rollback prepared callback
    1085             :      * is mandatory
    1086             :      */
    1087          20 :     if (ctx->callbacks.rollback_prepared_cb == NULL)
    1088           0 :         ereport(ERROR,
    1089             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1090             :                  errmsg("logical replication at prepare time requires a %s callback",
    1091             :                         "rollback_prepared_cb")));
    1092             : 
    1093             :     /* do the actual work: call callback */
    1094          20 :     ctx->callbacks.rollback_prepared_cb(ctx, txn, prepare_end_lsn,
    1095             :                                         prepare_time);
    1096             : 
    1097             :     /* Pop the error context stack */
    1098          20 :     error_context_stack = errcallback.previous;
    1099          20 : }
    1100             : 
    1101             : static void
    1102      315694 : change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1103             :                   Relation relation, ReorderBufferChange *change)
    1104             : {
    1105      315694 :     LogicalDecodingContext *ctx = cache->private_data;
    1106             :     LogicalErrorCallbackState state;
    1107             :     ErrorContextCallback errcallback;
    1108             : 
    1109             :     Assert(!ctx->fast_forward);
    1110             : 
    1111             :     /* Push callback + info on the error context stack */
    1112      315694 :     state.ctx = ctx;
    1113      315694 :     state.callback_name = "change";
    1114      315694 :     state.report_location = change->lsn;
    1115      315694 :     errcallback.callback = output_plugin_error_callback;
    1116      315694 :     errcallback.arg = (void *) &state;
    1117      315694 :     errcallback.previous = error_context_stack;
    1118      315694 :     error_context_stack = &errcallback;
    1119             : 
    1120             :     /* set output state */
    1121      315694 :     ctx->accept_writes = true;
    1122      315694 :     ctx->write_xid = txn->xid;
    1123             : 
    1124             :     /*
    1125             :      * Report this change's lsn so replies from clients can give an up-to-date
    1126             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1127             :      * receipt of this transaction, but it might allow another transaction's
    1128             :      * commit to be confirmed with one message.
    1129             :      */
    1130      315694 :     ctx->write_location = change->lsn;
    1131             : 
    1132      315694 :     ctx->end_xact = false;
    1133             : 
    1134      315694 :     ctx->callbacks.change_cb(ctx, txn, relation, change);
    1135             : 
    1136             :     /* Pop the error context stack */
    1137      315684 :     error_context_stack = errcallback.previous;
    1138      315684 : }
    1139             : 
    1140             : static void
    1141          38 : truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1142             :                     int nrelations, Relation relations[], ReorderBufferChange *change)
    1143             : {
    1144          38 :     LogicalDecodingContext *ctx = cache->private_data;
    1145             :     LogicalErrorCallbackState state;
    1146             :     ErrorContextCallback errcallback;
    1147             : 
    1148             :     Assert(!ctx->fast_forward);
    1149             : 
    1150          38 :     if (!ctx->callbacks.truncate_cb)
    1151           0 :         return;
    1152             : 
    1153             :     /* Push callback + info on the error context stack */
    1154          38 :     state.ctx = ctx;
    1155          38 :     state.callback_name = "truncate";
    1156          38 :     state.report_location = change->lsn;
    1157          38 :     errcallback.callback = output_plugin_error_callback;
    1158          38 :     errcallback.arg = (void *) &state;
    1159          38 :     errcallback.previous = error_context_stack;
    1160          38 :     error_context_stack = &errcallback;
    1161             : 
    1162             :     /* set output state */
    1163          38 :     ctx->accept_writes = true;
    1164          38 :     ctx->write_xid = txn->xid;
    1165             : 
    1166             :     /*
    1167             :      * Report this change's lsn so replies from clients can give an up-to-date
    1168             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1169             :      * receipt of this transaction, but it might allow another transaction's
    1170             :      * commit to be confirmed with one message.
    1171             :      */
    1172          38 :     ctx->write_location = change->lsn;
    1173             : 
    1174          38 :     ctx->end_xact = false;
    1175             : 
    1176          38 :     ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
    1177             : 
    1178             :     /* Pop the error context stack */
    1179          38 :     error_context_stack = errcallback.previous;
    1180             : }
    1181             : 
    1182             : bool
    1183         230 : filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
    1184             :                           const char *gid)
    1185             : {
    1186             :     LogicalErrorCallbackState state;
    1187             :     ErrorContextCallback errcallback;
    1188             :     bool        ret;
    1189             : 
    1190             :     Assert(!ctx->fast_forward);
    1191             : 
    1192             :     /* Push callback + info on the error context stack */
    1193         230 :     state.ctx = ctx;
    1194         230 :     state.callback_name = "filter_prepare";
    1195         230 :     state.report_location = InvalidXLogRecPtr;
    1196         230 :     errcallback.callback = output_plugin_error_callback;
    1197         230 :     errcallback.arg = (void *) &state;
    1198         230 :     errcallback.previous = error_context_stack;
    1199         230 :     error_context_stack = &errcallback;
    1200             : 
    1201             :     /* set output state */
    1202         230 :     ctx->accept_writes = false;
    1203         230 :     ctx->end_xact = false;
    1204             : 
    1205             :     /* do the actual work: call callback */
    1206         230 :     ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
    1207             : 
    1208             :     /* Pop the error context stack */
    1209         230 :     error_context_stack = errcallback.previous;
    1210             : 
    1211         230 :     return ret;
    1212             : }
    1213             : 
    1214             : bool
    1215     3370284 : filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
    1216             : {
    1217             :     LogicalErrorCallbackState state;
    1218             :     ErrorContextCallback errcallback;
    1219             :     bool        ret;
    1220             : 
    1221             :     Assert(!ctx->fast_forward);
    1222             : 
    1223             :     /* Push callback + info on the error context stack */
    1224     3370284 :     state.ctx = ctx;
    1225     3370284 :     state.callback_name = "filter_by_origin";
    1226     3370284 :     state.report_location = InvalidXLogRecPtr;
    1227     3370284 :     errcallback.callback = output_plugin_error_callback;
    1228     3370284 :     errcallback.arg = (void *) &state;
    1229     3370284 :     errcallback.previous = error_context_stack;
    1230     3370284 :     error_context_stack = &errcallback;
    1231             : 
    1232             :     /* set output state */
    1233     3370284 :     ctx->accept_writes = false;
    1234     3370284 :     ctx->end_xact = false;
    1235             : 
    1236             :     /* do the actual work: call callback */
    1237     3370284 :     ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
    1238             : 
    1239             :     /* Pop the error context stack */
    1240     3370284 :     error_context_stack = errcallback.previous;
    1241             : 
    1242     3370284 :     return ret;
    1243             : }
    1244             : 
    1245             : static void
    1246          32 : message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1247             :                    XLogRecPtr message_lsn, bool transactional,
    1248             :                    const char *prefix, Size message_size, const char *message)
    1249             : {
    1250          32 :     LogicalDecodingContext *ctx = cache->private_data;
    1251             :     LogicalErrorCallbackState state;
    1252             :     ErrorContextCallback errcallback;
    1253             : 
    1254             :     Assert(!ctx->fast_forward);
    1255             : 
    1256          32 :     if (ctx->callbacks.message_cb == NULL)
    1257           0 :         return;
    1258             : 
    1259             :     /* Push callback + info on the error context stack */
    1260          32 :     state.ctx = ctx;
    1261          32 :     state.callback_name = "message";
    1262          32 :     state.report_location = message_lsn;
    1263          32 :     errcallback.callback = output_plugin_error_callback;
    1264          32 :     errcallback.arg = (void *) &state;
    1265          32 :     errcallback.previous = error_context_stack;
    1266          32 :     error_context_stack = &errcallback;
    1267             : 
    1268             :     /* set output state */
    1269          32 :     ctx->accept_writes = true;
    1270          32 :     ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
    1271          32 :     ctx->write_location = message_lsn;
    1272          32 :     ctx->end_xact = false;
    1273             : 
    1274             :     /* do the actual work: call callback */
    1275          32 :     ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
    1276             :                               message_size, message);
    1277             : 
    1278             :     /* Pop the error context stack */
    1279          32 :     error_context_stack = errcallback.previous;
    1280             : }
    1281             : 
    1282             : static void
    1283        1298 : stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1284             :                         XLogRecPtr first_lsn)
    1285             : {
    1286        1298 :     LogicalDecodingContext *ctx = cache->private_data;
    1287             :     LogicalErrorCallbackState state;
    1288             :     ErrorContextCallback errcallback;
    1289             : 
    1290             :     Assert(!ctx->fast_forward);
    1291             : 
    1292             :     /* We're only supposed to call this when streaming is supported. */
    1293             :     Assert(ctx->streaming);
    1294             : 
    1295             :     /* Push callback + info on the error context stack */
    1296        1298 :     state.ctx = ctx;
    1297        1298 :     state.callback_name = "stream_start";
    1298        1298 :     state.report_location = first_lsn;
    1299        1298 :     errcallback.callback = output_plugin_error_callback;
    1300        1298 :     errcallback.arg = (void *) &state;
    1301        1298 :     errcallback.previous = error_context_stack;
    1302        1298 :     error_context_stack = &errcallback;
    1303             : 
    1304             :     /* set output state */
    1305        1298 :     ctx->accept_writes = true;
    1306        1298 :     ctx->write_xid = txn->xid;
    1307             : 
    1308             :     /*
    1309             :      * Report this message's lsn so replies from clients can give an
    1310             :      * up-to-date answer. This won't ever be enough (and shouldn't be!) to
    1311             :      * confirm receipt of this transaction, but it might allow another
    1312             :      * transaction's commit to be confirmed with one message.
    1313             :      */
    1314        1298 :     ctx->write_location = first_lsn;
    1315             : 
    1316        1298 :     ctx->end_xact = false;
    1317             : 
    1318             :     /* in streaming mode, stream_start_cb is required */
    1319        1298 :     if (ctx->callbacks.stream_start_cb == NULL)
    1320           0 :         ereport(ERROR,
    1321             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1322             :                  errmsg("logical streaming requires a %s callback",
    1323             :                         "stream_start_cb")));
    1324             : 
    1325        1298 :     ctx->callbacks.stream_start_cb(ctx, txn);
    1326             : 
    1327             :     /* Pop the error context stack */
    1328        1298 :     error_context_stack = errcallback.previous;
    1329        1298 : }
    1330             : 
    1331             : static void
    1332        1298 : stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1333             :                        XLogRecPtr last_lsn)
    1334             : {
    1335        1298 :     LogicalDecodingContext *ctx = cache->private_data;
    1336             :     LogicalErrorCallbackState state;
    1337             :     ErrorContextCallback errcallback;
    1338             : 
    1339             :     Assert(!ctx->fast_forward);
    1340             : 
    1341             :     /* We're only supposed to call this when streaming is supported. */
    1342             :     Assert(ctx->streaming);
    1343             : 
    1344             :     /* Push callback + info on the error context stack */
    1345        1298 :     state.ctx = ctx;
    1346        1298 :     state.callback_name = "stream_stop";
    1347        1298 :     state.report_location = last_lsn;
    1348        1298 :     errcallback.callback = output_plugin_error_callback;
    1349        1298 :     errcallback.arg = (void *) &state;
    1350        1298 :     errcallback.previous = error_context_stack;
    1351        1298 :     error_context_stack = &errcallback;
    1352             : 
    1353             :     /* set output state */
    1354        1298 :     ctx->accept_writes = true;
    1355        1298 :     ctx->write_xid = txn->xid;
    1356             : 
    1357             :     /*
    1358             :      * Report this message's lsn so replies from clients can give an
    1359             :      * up-to-date answer. This won't ever be enough (and shouldn't be!) to
    1360             :      * confirm receipt of this transaction, but it might allow another
    1361             :      * transaction's commit to be confirmed with one message.
    1362             :      */
    1363        1298 :     ctx->write_location = last_lsn;
    1364             : 
    1365        1298 :     ctx->end_xact = false;
    1366             : 
    1367             :     /* in streaming mode, stream_stop_cb is required */
    1368        1298 :     if (ctx->callbacks.stream_stop_cb == NULL)
    1369           0 :         ereport(ERROR,
    1370             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1371             :                  errmsg("logical streaming requires a %s callback",
    1372             :                         "stream_stop_cb")));
    1373             : 
    1374        1298 :     ctx->callbacks.stream_stop_cb(ctx, txn);
    1375             : 
    1376             :     /* Pop the error context stack */
    1377        1298 :     error_context_stack = errcallback.previous;
    1378        1298 : }
    1379             : 
    1380             : static void
    1381          58 : stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1382             :                         XLogRecPtr abort_lsn)
    1383             : {
    1384          58 :     LogicalDecodingContext *ctx = cache->private_data;
    1385             :     LogicalErrorCallbackState state;
    1386             :     ErrorContextCallback errcallback;
    1387             : 
    1388             :     Assert(!ctx->fast_forward);
    1389             : 
    1390             :     /* We're only supposed to call this when streaming is supported. */
    1391             :     Assert(ctx->streaming);
    1392             : 
    1393             :     /* Push callback + info on the error context stack */
    1394          58 :     state.ctx = ctx;
    1395          58 :     state.callback_name = "stream_abort";
    1396          58 :     state.report_location = abort_lsn;
    1397          58 :     errcallback.callback = output_plugin_error_callback;
    1398          58 :     errcallback.arg = (void *) &state;
    1399          58 :     errcallback.previous = error_context_stack;
    1400          58 :     error_context_stack = &errcallback;
    1401             : 
    1402             :     /* set output state */
    1403          58 :     ctx->accept_writes = true;
    1404          58 :     ctx->write_xid = txn->xid;
    1405          58 :     ctx->write_location = abort_lsn;
    1406          58 :     ctx->end_xact = true;
    1407             : 
    1408             :     /* in streaming mode, stream_abort_cb is required */
    1409          58 :     if (ctx->callbacks.stream_abort_cb == NULL)
    1410           0 :         ereport(ERROR,
    1411             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1412             :                  errmsg("logical streaming requires a %s callback",
    1413             :                         "stream_abort_cb")));
    1414             : 
    1415          58 :     ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn);
    1416             : 
    1417             :     /* Pop the error context stack */
    1418          58 :     error_context_stack = errcallback.previous;
    1419          58 : }
    1420             : 
    1421             : static void
    1422          30 : stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1423             :                           XLogRecPtr prepare_lsn)
    1424             : {
    1425          30 :     LogicalDecodingContext *ctx = cache->private_data;
    1426             :     LogicalErrorCallbackState state;
    1427             :     ErrorContextCallback errcallback;
    1428             : 
    1429             :     Assert(!ctx->fast_forward);
    1430             : 
    1431             :     /*
    1432             :      * We're only supposed to call this when streaming and two-phase commits
    1433             :      * are supported.
    1434             :      */
    1435             :     Assert(ctx->streaming);
    1436             :     Assert(ctx->twophase);
    1437             : 
    1438             :     /* Push callback + info on the error context stack */
    1439          30 :     state.ctx = ctx;
    1440          30 :     state.callback_name = "stream_prepare";
    1441          30 :     state.report_location = txn->final_lsn;
    1442          30 :     errcallback.callback = output_plugin_error_callback;
    1443          30 :     errcallback.arg = (void *) &state;
    1444          30 :     errcallback.previous = error_context_stack;
    1445          30 :     error_context_stack = &errcallback;
    1446             : 
    1447             :     /* set output state */
    1448          30 :     ctx->accept_writes = true;
    1449          30 :     ctx->write_xid = txn->xid;
    1450          30 :     ctx->write_location = txn->end_lsn;
    1451          30 :     ctx->end_xact = true;
    1452             : 
    1453             :     /* in streaming mode with two-phase commits, stream_prepare_cb is required */
    1454          30 :     if (ctx->callbacks.stream_prepare_cb == NULL)
    1455           0 :         ereport(ERROR,
    1456             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1457             :                  errmsg("logical streaming at prepare time requires a %s callback",
    1458             :                         "stream_prepare_cb")));
    1459             : 
    1460          30 :     ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
    1461             : 
    1462             :     /* Pop the error context stack */
    1463          30 :     error_context_stack = errcallback.previous;
    1464          30 : }
    1465             : 
    1466             : static void
    1467         100 : stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1468             :                          XLogRecPtr commit_lsn)
    1469             : {
    1470         100 :     LogicalDecodingContext *ctx = cache->private_data;
    1471             :     LogicalErrorCallbackState state;
    1472             :     ErrorContextCallback errcallback;
    1473             : 
    1474             :     Assert(!ctx->fast_forward);
    1475             : 
    1476             :     /* We're only supposed to call this when streaming is supported. */
    1477             :     Assert(ctx->streaming);
    1478             : 
    1479             :     /* Push callback + info on the error context stack */
    1480         100 :     state.ctx = ctx;
    1481         100 :     state.callback_name = "stream_commit";
    1482         100 :     state.report_location = txn->final_lsn;
    1483         100 :     errcallback.callback = output_plugin_error_callback;
    1484         100 :     errcallback.arg = (void *) &state;
    1485         100 :     errcallback.previous = error_context_stack;
    1486         100 :     error_context_stack = &errcallback;
    1487             : 
    1488             :     /* set output state */
    1489         100 :     ctx->accept_writes = true;
    1490         100 :     ctx->write_xid = txn->xid;
    1491         100 :     ctx->write_location = txn->end_lsn;
    1492         100 :     ctx->end_xact = true;
    1493             : 
    1494             :     /* in streaming mode, stream_commit_cb is required */
    1495         100 :     if (ctx->callbacks.stream_commit_cb == NULL)
    1496           0 :         ereport(ERROR,
    1497             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1498             :                  errmsg("logical streaming requires a %s callback",
    1499             :                         "stream_commit_cb")));
    1500             : 
    1501         100 :     ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn);
    1502             : 
    1503             :     /* Pop the error context stack */
    1504         100 :     error_context_stack = errcallback.previous;
    1505         100 : }
    1506             : 
    1507             : static void
    1508      352008 : stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1509             :                          Relation relation, ReorderBufferChange *change)
    1510             : {
    1511      352008 :     LogicalDecodingContext *ctx = cache->private_data;
    1512             :     LogicalErrorCallbackState state;
    1513             :     ErrorContextCallback errcallback;
    1514             : 
    1515             :     Assert(!ctx->fast_forward);
    1516             : 
    1517             :     /* We're only supposed to call this when streaming is supported. */
    1518             :     Assert(ctx->streaming);
    1519             : 
    1520             :     /* Push callback + info on the error context stack */
    1521      352008 :     state.ctx = ctx;
    1522      352008 :     state.callback_name = "stream_change";
    1523      352008 :     state.report_location = change->lsn;
    1524      352008 :     errcallback.callback = output_plugin_error_callback;
    1525      352008 :     errcallback.arg = (void *) &state;
    1526      352008 :     errcallback.previous = error_context_stack;
    1527      352008 :     error_context_stack = &errcallback;
    1528             : 
    1529             :     /* set output state */
    1530      352008 :     ctx->accept_writes = true;
    1531      352008 :     ctx->write_xid = txn->xid;
    1532             : 
    1533             :     /*
    1534             :      * Report this change's lsn so replies from clients can give an up-to-date
    1535             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1536             :      * receipt of this transaction, but it might allow another transaction's
    1537             :      * commit to be confirmed with one message.
    1538             :      */
    1539      352008 :     ctx->write_location = change->lsn;
    1540             : 
    1541      352008 :     ctx->end_xact = false;
    1542             : 
    1543             :     /* in streaming mode, stream_change_cb is required */
    1544      352008 :     if (ctx->callbacks.stream_change_cb == NULL)
    1545           0 :         ereport(ERROR,
    1546             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1547             :                  errmsg("logical streaming requires a %s callback",
    1548             :                         "stream_change_cb")));
    1549             : 
    1550      352008 :     ctx->callbacks.stream_change_cb(ctx, txn, relation, change);
    1551             : 
    1552             :     /* Pop the error context stack */
    1553      352008 :     error_context_stack = errcallback.previous;
    1554      352008 : }
    1555             : 
    1556             : static void
    1557           6 : stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1558             :                           XLogRecPtr message_lsn, bool transactional,
    1559             :                           const char *prefix, Size message_size, const char *message)
    1560             : {
    1561           6 :     LogicalDecodingContext *ctx = cache->private_data;
    1562             :     LogicalErrorCallbackState state;
    1563             :     ErrorContextCallback errcallback;
    1564             : 
    1565             :     Assert(!ctx->fast_forward);
    1566             : 
    1567             :     /* We're only supposed to call this when streaming is supported. */
    1568             :     Assert(ctx->streaming);
    1569             : 
    1570             :     /* this callback is optional */
    1571           6 :     if (ctx->callbacks.stream_message_cb == NULL)
    1572           0 :         return;
    1573             : 
    1574             :     /* Push callback + info on the error context stack */
    1575           6 :     state.ctx = ctx;
    1576           6 :     state.callback_name = "stream_message";
    1577           6 :     state.report_location = message_lsn;
    1578           6 :     errcallback.callback = output_plugin_error_callback;
    1579           6 :     errcallback.arg = (void *) &state;
    1580           6 :     errcallback.previous = error_context_stack;
    1581           6 :     error_context_stack = &errcallback;
    1582             : 
    1583             :     /* set output state */
    1584           6 :     ctx->accept_writes = true;
    1585           6 :     ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
    1586           6 :     ctx->write_location = message_lsn;
    1587           6 :     ctx->end_xact = false;
    1588             : 
    1589             :     /* do the actual work: call callback */
    1590           6 :     ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
    1591             :                                      message_size, message);
    1592             : 
    1593             :     /* Pop the error context stack */
    1594           6 :     error_context_stack = errcallback.previous;
    1595             : }
    1596             : 
    1597             : static void
    1598           0 : stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1599             :                            int nrelations, Relation relations[],
    1600             :                            ReorderBufferChange *change)
    1601             : {
    1602           0 :     LogicalDecodingContext *ctx = cache->private_data;
    1603             :     LogicalErrorCallbackState state;
    1604             :     ErrorContextCallback errcallback;
    1605             : 
    1606             :     Assert(!ctx->fast_forward);
    1607             : 
    1608             :     /* We're only supposed to call this when streaming is supported. */
    1609             :     Assert(ctx->streaming);
    1610             : 
    1611             :     /* this callback is optional */
    1612           0 :     if (!ctx->callbacks.stream_truncate_cb)
    1613           0 :         return;
    1614             : 
    1615             :     /* Push callback + info on the error context stack */
    1616           0 :     state.ctx = ctx;
    1617           0 :     state.callback_name = "stream_truncate";
    1618           0 :     state.report_location = change->lsn;
    1619           0 :     errcallback.callback = output_plugin_error_callback;
    1620           0 :     errcallback.arg = (void *) &state;
    1621           0 :     errcallback.previous = error_context_stack;
    1622           0 :     error_context_stack = &errcallback;
    1623             : 
    1624             :     /* set output state */
    1625           0 :     ctx->accept_writes = true;
    1626           0 :     ctx->write_xid = txn->xid;
    1627             : 
    1628             :     /*
    1629             :      * Report this change's lsn so replies from clients can give an up-to-date
    1630             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1631             :      * receipt of this transaction, but it might allow another transaction's
    1632             :      * commit to be confirmed with one message.
    1633             :      */
    1634           0 :     ctx->write_location = change->lsn;
    1635             : 
    1636           0 :     ctx->end_xact = false;
    1637             : 
    1638           0 :     ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
    1639             : 
    1640             :     /* Pop the error context stack */
    1641           0 :     error_context_stack = errcallback.previous;
    1642             : }
    1643             : 
    1644             : static void
    1645        6208 : update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1646             :                                XLogRecPtr lsn)
    1647             : {
    1648        6208 :     LogicalDecodingContext *ctx = cache->private_data;
    1649             :     LogicalErrorCallbackState state;
    1650             :     ErrorContextCallback errcallback;
    1651             : 
    1652             :     Assert(!ctx->fast_forward);
    1653             : 
    1654             :     /* Push callback + info on the error context stack */
    1655        6208 :     state.ctx = ctx;
    1656        6208 :     state.callback_name = "update_progress_txn";
    1657        6208 :     state.report_location = lsn;
    1658        6208 :     errcallback.callback = output_plugin_error_callback;
    1659        6208 :     errcallback.arg = (void *) &state;
    1660        6208 :     errcallback.previous = error_context_stack;
    1661        6208 :     error_context_stack = &errcallback;
    1662             : 
    1663             :     /* set output state */
    1664        6208 :     ctx->accept_writes = false;
    1665        6208 :     ctx->write_xid = txn->xid;
    1666             : 
    1667             :     /*
    1668             :      * Report this change's lsn so replies from clients can give an up-to-date
    1669             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1670             :      * receipt of this transaction, but it might allow another transaction's
    1671             :      * commit to be confirmed with one message.
    1672             :      */
    1673        6208 :     ctx->write_location = lsn;
    1674             : 
    1675        6208 :     ctx->end_xact = false;
    1676             : 
    1677        6208 :     OutputPluginUpdateProgress(ctx, false);
    1678             : 
    1679             :     /* Pop the error context stack */
    1680        6208 :     error_context_stack = errcallback.previous;
    1681        6208 : }
    1682             : 
    1683             : /*
    1684             :  * Set the required catalog xmin horizon for historic snapshots in the current
    1685             :  * replication slot.
    1686             :  *
    1687             :  * Note that in the most cases, we won't be able to immediately use the xmin
    1688             :  * to increase the xmin horizon: we need to wait till the client has confirmed
    1689             :  * receiving current_lsn with LogicalConfirmReceivedLocation().
    1690             :  */
    1691             : void
    1692         618 : LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
    1693             : {
    1694         618 :     bool        updated_xmin = false;
    1695             :     ReplicationSlot *slot;
    1696         618 :     bool        got_new_xmin = false;
    1697             : 
    1698         618 :     slot = MyReplicationSlot;
    1699             : 
    1700             :     Assert(slot != NULL);
    1701             : 
    1702         618 :     SpinLockAcquire(&slot->mutex);
    1703             : 
    1704             :     /*
    1705             :      * don't overwrite if we already have a newer xmin. This can happen if we
    1706             :      * restart decoding in a slot.
    1707             :      */
    1708         618 :     if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
    1709             :     {
    1710             :     }
    1711             : 
    1712             :     /*
    1713             :      * If the client has already confirmed up to this lsn, we directly can
    1714             :      * mark this as accepted. This can happen if we restart decoding in a
    1715             :      * slot.
    1716             :      */
    1717         154 :     else if (current_lsn <= slot->data.confirmed_flush)
    1718             :     {
    1719          78 :         slot->candidate_catalog_xmin = xmin;
    1720          78 :         slot->candidate_xmin_lsn = current_lsn;
    1721             : 
    1722             :         /* our candidate can directly be used */
    1723          78 :         updated_xmin = true;
    1724             :     }
    1725             : 
    1726             :     /*
    1727             :      * Only increase if the previous values have been applied, otherwise we
    1728             :      * might never end up updating if the receiver acks too slowly.
    1729             :      */
    1730          76 :     else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
    1731             :     {
    1732          30 :         slot->candidate_catalog_xmin = xmin;
    1733          30 :         slot->candidate_xmin_lsn = current_lsn;
    1734             : 
    1735             :         /*
    1736             :          * Log new xmin at an appropriate log level after releasing the
    1737             :          * spinlock.
    1738             :          */
    1739          30 :         got_new_xmin = true;
    1740             :     }
    1741         618 :     SpinLockRelease(&slot->mutex);
    1742             : 
    1743         618 :     if (got_new_xmin)
    1744          30 :         elog(DEBUG1, "got new catalog xmin %u at %X/%X", xmin,
    1745             :              LSN_FORMAT_ARGS(current_lsn));
    1746             : 
    1747             :     /* candidate already valid with the current flush position, apply */
    1748         618 :     if (updated_xmin)
    1749          78 :         LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
    1750         618 : }
    1751             : 
    1752             : /*
    1753             :  * Mark the minimal LSN (restart_lsn) we need to read to replay all
    1754             :  * transactions that have not yet committed at current_lsn.
    1755             :  *
    1756             :  * Just like LogicalIncreaseXminForSlot this only takes effect when the
    1757             :  * client has confirmed to have received current_lsn.
    1758             :  */
    1759             : void
    1760         532 : LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
    1761             : {
    1762         532 :     bool        updated_lsn = false;
    1763             :     ReplicationSlot *slot;
    1764             : 
    1765         532 :     slot = MyReplicationSlot;
    1766             : 
    1767             :     Assert(slot != NULL);
    1768             :     Assert(restart_lsn != InvalidXLogRecPtr);
    1769             :     Assert(current_lsn != InvalidXLogRecPtr);
    1770             : 
    1771         532 :     SpinLockAcquire(&slot->mutex);
    1772             : 
    1773             :     /* don't overwrite if have a newer restart lsn */
    1774         532 :     if (restart_lsn <= slot->data.restart_lsn)
    1775             :     {
    1776             :     }
    1777             : 
    1778             :     /*
    1779             :      * We might have already flushed far enough to directly accept this lsn,
    1780             :      * in this case there is no need to check for existing candidate LSNs
    1781             :      */
    1782         512 :     else if (current_lsn <= slot->data.confirmed_flush)
    1783             :     {
    1784         400 :         slot->candidate_restart_valid = current_lsn;
    1785         400 :         slot->candidate_restart_lsn = restart_lsn;
    1786             : 
    1787             :         /* our candidate can directly be used */
    1788         400 :         updated_lsn = true;
    1789             :     }
    1790             : 
    1791             :     /*
    1792             :      * Only increase if the previous values have been applied, otherwise we
    1793             :      * might never end up updating if the receiver acks too slowly. A missed
    1794             :      * value here will just cause some extra effort after reconnecting.
    1795             :      */
    1796         532 :     if (slot->candidate_restart_valid == InvalidXLogRecPtr)
    1797             :     {
    1798          62 :         slot->candidate_restart_valid = current_lsn;
    1799          62 :         slot->candidate_restart_lsn = restart_lsn;
    1800          62 :         SpinLockRelease(&slot->mutex);
    1801             : 
    1802          62 :         elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
    1803             :              LSN_FORMAT_ARGS(restart_lsn),
    1804             :              LSN_FORMAT_ARGS(current_lsn));
    1805             :     }
    1806             :     else
    1807             :     {
    1808             :         XLogRecPtr  candidate_restart_lsn;
    1809             :         XLogRecPtr  candidate_restart_valid;
    1810             :         XLogRecPtr  confirmed_flush;
    1811             : 
    1812         470 :         candidate_restart_lsn = slot->candidate_restart_lsn;
    1813         470 :         candidate_restart_valid = slot->candidate_restart_valid;
    1814         470 :         confirmed_flush = slot->data.confirmed_flush;
    1815         470 :         SpinLockRelease(&slot->mutex);
    1816             : 
    1817         470 :         elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
    1818             :              LSN_FORMAT_ARGS(restart_lsn),
    1819             :              LSN_FORMAT_ARGS(current_lsn),
    1820             :              LSN_FORMAT_ARGS(candidate_restart_lsn),
    1821             :              LSN_FORMAT_ARGS(candidate_restart_valid),
    1822             :              LSN_FORMAT_ARGS(confirmed_flush));
    1823             :     }
    1824             : 
    1825             :     /* candidates are already valid with the current flush position, apply */
    1826         532 :     if (updated_lsn)
    1827         400 :         LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
    1828         532 : }
    1829             : 
    1830             : /*
    1831             :  * Handle a consumer's confirmation having received all changes up to lsn.
    1832             :  */
    1833             : void
    1834       55470 : LogicalConfirmReceivedLocation(XLogRecPtr lsn)
    1835             : {
    1836             :     Assert(lsn != InvalidXLogRecPtr);
    1837             : 
    1838             :     /* Do an unlocked check for candidate_lsn first. */
    1839       55470 :     if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
    1840       55366 :         MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
    1841         540 :     {
    1842         540 :         bool        updated_xmin = false;
    1843         540 :         bool        updated_restart = false;
    1844             : 
    1845         540 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1846             : 
    1847         540 :         MyReplicationSlot->data.confirmed_flush = lsn;
    1848             : 
    1849             :         /* if we're past the location required for bumping xmin, do so */
    1850         540 :         if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
    1851         104 :             MyReplicationSlot->candidate_xmin_lsn <= lsn)
    1852             :         {
    1853             :             /*
    1854             :              * We have to write the changed xmin to disk *before* we change
    1855             :              * the in-memory value, otherwise after a crash we wouldn't know
    1856             :              * that some catalog tuples might have been removed already.
    1857             :              *
    1858             :              * Ensure that by first writing to ->xmin and only update
    1859             :              * ->effective_xmin once the new state is synced to disk. After a
    1860             :              * crash ->effective_xmin is set to ->xmin.
    1861             :              */
    1862          98 :             if (TransactionIdIsValid(MyReplicationSlot->candidate_catalog_xmin) &&
    1863          98 :                 MyReplicationSlot->data.catalog_xmin != MyReplicationSlot->candidate_catalog_xmin)
    1864             :             {
    1865          98 :                 MyReplicationSlot->data.catalog_xmin = MyReplicationSlot->candidate_catalog_xmin;
    1866          98 :                 MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
    1867          98 :                 MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
    1868          98 :                 updated_xmin = true;
    1869             :             }
    1870             :         }
    1871             : 
    1872         540 :         if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr &&
    1873         462 :             MyReplicationSlot->candidate_restart_valid <= lsn)
    1874             :         {
    1875             :             Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
    1876             : 
    1877         454 :             MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
    1878         454 :             MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
    1879         454 :             MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
    1880         454 :             updated_restart = true;
    1881             :         }
    1882             : 
    1883         540 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1884             : 
    1885             :         /* first write new xmin to disk, so we know what's up after a crash */
    1886         540 :         if (updated_xmin || updated_restart)
    1887             :         {
    1888         532 :             ReplicationSlotMarkDirty();
    1889         532 :             ReplicationSlotSave();
    1890         532 :             elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
    1891             :         }
    1892             : 
    1893             :         /*
    1894             :          * Now the new xmin is safely on disk, we can let the global value
    1895             :          * advance. We do not take ProcArrayLock or similar since we only
    1896             :          * advance xmin here and there's not much harm done by a concurrent
    1897             :          * computation missing that.
    1898             :          */
    1899         540 :         if (updated_xmin)
    1900             :         {
    1901          98 :             SpinLockAcquire(&MyReplicationSlot->mutex);
    1902          98 :             MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
    1903          98 :             SpinLockRelease(&MyReplicationSlot->mutex);
    1904             : 
    1905          98 :             ReplicationSlotsComputeRequiredXmin(false);
    1906          98 :             ReplicationSlotsComputeRequiredLSN();
    1907             :         }
    1908             :     }
    1909             :     else
    1910             :     {
    1911       54930 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1912       54930 :         MyReplicationSlot->data.confirmed_flush = lsn;
    1913       54930 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1914             :     }
    1915       55470 : }
    1916             : 
    1917             : /*
    1918             :  * Clear logical streaming state during (sub)transaction abort.
    1919             :  */
    1920             : void
    1921       54264 : ResetLogicalStreamingState(void)
    1922             : {
    1923       54264 :     CheckXidAlive = InvalidTransactionId;
    1924       54264 :     bsysscan = false;
    1925       54264 : }
    1926             : 
    1927             : /*
    1928             :  * Report stats for a slot.
    1929             :  */
    1930             : void
    1931       11438 : UpdateDecodingStats(LogicalDecodingContext *ctx)
    1932             : {
    1933       11438 :     ReorderBuffer *rb = ctx->reorder;
    1934             :     PgStat_StatReplSlotEntry repSlotStat;
    1935             : 
    1936             :     /* Nothing to do if we don't have any replication stats to be sent. */
    1937       11438 :     if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
    1938         484 :         return;
    1939             : 
    1940       10954 :     elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
    1941             :          rb,
    1942             :          (long long) rb->spillTxns,
    1943             :          (long long) rb->spillCount,
    1944             :          (long long) rb->spillBytes,
    1945             :          (long long) rb->streamTxns,
    1946             :          (long long) rb->streamCount,
    1947             :          (long long) rb->streamBytes,
    1948             :          (long long) rb->totalTxns,
    1949             :          (long long) rb->totalBytes);
    1950             : 
    1951       10954 :     repSlotStat.spill_txns = rb->spillTxns;
    1952       10954 :     repSlotStat.spill_count = rb->spillCount;
    1953       10954 :     repSlotStat.spill_bytes = rb->spillBytes;
    1954       10954 :     repSlotStat.stream_txns = rb->streamTxns;
    1955       10954 :     repSlotStat.stream_count = rb->streamCount;
    1956       10954 :     repSlotStat.stream_bytes = rb->streamBytes;
    1957       10954 :     repSlotStat.total_txns = rb->totalTxns;
    1958       10954 :     repSlotStat.total_bytes = rb->totalBytes;
    1959             : 
    1960       10954 :     pgstat_report_replslot(ctx->slot, &repSlotStat);
    1961             : 
    1962       10954 :     rb->spillTxns = 0;
    1963       10954 :     rb->spillCount = 0;
    1964       10954 :     rb->spillBytes = 0;
    1965       10954 :     rb->streamTxns = 0;
    1966       10954 :     rb->streamCount = 0;
    1967       10954 :     rb->streamBytes = 0;
    1968       10954 :     rb->totalTxns = 0;
    1969       10954 :     rb->totalBytes = 0;
    1970             : }
    1971             : 
    1972             : /*
    1973             :  * Read up to the end of WAL starting from the decoding slot's restart_lsn.
    1974             :  * Return true if any meaningful/decodable WAL records are encountered,
    1975             :  * otherwise false.
    1976             :  */
    1977             : bool
    1978          10 : LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
    1979             : {
    1980          10 :     bool        has_pending_wal = false;
    1981             : 
    1982             :     Assert(MyReplicationSlot);
    1983             : 
    1984          10 :     PG_TRY();
    1985             :     {
    1986             :         LogicalDecodingContext *ctx;
    1987             : 
    1988             :         /*
    1989             :          * Create our decoding context in fast_forward mode, passing start_lsn
    1990             :          * as InvalidXLogRecPtr, so that we start processing from the slot's
    1991             :          * confirmed_flush.
    1992             :          */
    1993          20 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
    1994             :                                     NIL,
    1995             :                                     true,   /* fast_forward */
    1996          10 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
    1997             :                                                .segment_open = wal_segment_open,
    1998             :                                                .segment_close = wal_segment_close),
    1999             :                                     NULL, NULL, NULL);
    2000             : 
    2001             :         /*
    2002             :          * Start reading at the slot's restart_lsn, which we know points to a
    2003             :          * valid record.
    2004             :          */
    2005          10 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
    2006             : 
    2007             :         /* Invalidate non-timetravel entries */
    2008          10 :         InvalidateSystemCaches();
    2009             : 
    2010             :         /* Loop until the end of WAL or some changes are processed */
    2011         300 :         while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal)
    2012             :         {
    2013             :             XLogRecord *record;
    2014         290 :             char       *errm = NULL;
    2015             : 
    2016         290 :             record = XLogReadRecord(ctx->reader, &errm);
    2017             : 
    2018         290 :             if (errm)
    2019           0 :                 elog(ERROR, "could not find record for logical decoding: %s", errm);
    2020             : 
    2021         290 :             if (record != NULL)
    2022         290 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
    2023             : 
    2024         290 :             has_pending_wal = ctx->processing_required;
    2025             : 
    2026         290 :             CHECK_FOR_INTERRUPTS();
    2027             :         }
    2028             : 
    2029             :         /* Clean up */
    2030          10 :         FreeDecodingContext(ctx);
    2031          10 :         InvalidateSystemCaches();
    2032             :     }
    2033           0 :     PG_CATCH();
    2034             :     {
    2035             :         /* clear all timetravel entries */
    2036           0 :         InvalidateSystemCaches();
    2037             : 
    2038           0 :         PG_RE_THROW();
    2039             :     }
    2040          10 :     PG_END_TRY();
    2041             : 
    2042          10 :     return has_pending_wal;
    2043             : }
    2044             : 
    2045             : /*
    2046             :  * Helper function for advancing our logical replication slot forward.
    2047             :  *
    2048             :  * The slot's restart_lsn is used as start point for reading records, while
    2049             :  * confirmed_flush is used as base point for the decoding context.
    2050             :  *
    2051             :  * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
    2052             :  * because we need to digest WAL to advance restart_lsn allowing to recycle
    2053             :  * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
    2054             :  * mode, no changes are generated anyway.
    2055             :  *
    2056             :  * *found_consistent_snapshot will be true if the initial decoding snapshot has
    2057             :  * been built; Otherwise, it will be false.
    2058             :  */
    2059             : XLogRecPtr
    2060          26 : LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
    2061             :                                     bool *found_consistent_snapshot)
    2062             : {
    2063             :     LogicalDecodingContext *ctx;
    2064          26 :     ResourceOwner old_resowner = CurrentResourceOwner;
    2065             :     XLogRecPtr  retlsn;
    2066             : 
    2067             :     Assert(moveto != InvalidXLogRecPtr);
    2068             : 
    2069          26 :     if (found_consistent_snapshot)
    2070           8 :         *found_consistent_snapshot = false;
    2071             : 
    2072          26 :     PG_TRY();
    2073             :     {
    2074             :         /*
    2075             :          * Create our decoding context in fast_forward mode, passing start_lsn
    2076             :          * as InvalidXLogRecPtr, so that we start processing from my slot's
    2077             :          * confirmed_flush.
    2078             :          */
    2079          52 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
    2080             :                                     NIL,
    2081             :                                     true,   /* fast_forward */
    2082          26 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
    2083             :                                                .segment_open = wal_segment_open,
    2084             :                                                .segment_close = wal_segment_close),
    2085             :                                     NULL, NULL, NULL);
    2086             : 
    2087             :         /*
    2088             :          * Wait for specified streaming replication standby servers (if any)
    2089             :          * to confirm receipt of WAL up to moveto lsn.
    2090             :          */
    2091          26 :         WaitForStandbyConfirmation(moveto);
    2092             : 
    2093             :         /*
    2094             :          * Start reading at the slot's restart_lsn, which we know to point to
    2095             :          * a valid record.
    2096             :          */
    2097          26 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
    2098             : 
    2099             :         /* invalidate non-timetravel entries */
    2100          26 :         InvalidateSystemCaches();
    2101             : 
    2102             :         /* Decode records until we reach the requested target */
    2103        3338 :         while (ctx->reader->EndRecPtr < moveto)
    2104             :         {
    2105        3312 :             char       *errm = NULL;
    2106             :             XLogRecord *record;
    2107             : 
    2108             :             /*
    2109             :              * Read records.  No changes are generated in fast_forward mode,
    2110             :              * but snapbuilder/slot statuses are updated properly.
    2111             :              */
    2112        3312 :             record = XLogReadRecord(ctx->reader, &errm);
    2113        3312 :             if (errm)
    2114           0 :                 elog(ERROR, "could not find record while advancing replication slot: %s",
    2115             :                      errm);
    2116             : 
    2117             :             /*
    2118             :              * Process the record.  Storage-level changes are ignored in
    2119             :              * fast_forward mode, but other modules (such as snapbuilder)
    2120             :              * might still have critical updates to do.
    2121             :              */
    2122        3312 :             if (record)
    2123        3312 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
    2124             : 
    2125        3312 :             CHECK_FOR_INTERRUPTS();
    2126             :         }
    2127             : 
    2128          26 :         if (found_consistent_snapshot && DecodingContextReady(ctx))
    2129           8 :             *found_consistent_snapshot = true;
    2130             : 
    2131             :         /*
    2132             :          * Logical decoding could have clobbered CurrentResourceOwner during
    2133             :          * transaction management, so restore the executor's value.  (This is
    2134             :          * a kluge, but it's not worth cleaning up right now.)
    2135             :          */
    2136          26 :         CurrentResourceOwner = old_resowner;
    2137             : 
    2138          26 :         if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
    2139             :         {
    2140          26 :             LogicalConfirmReceivedLocation(moveto);
    2141             : 
    2142             :             /*
    2143             :              * If only the confirmed_flush LSN has changed the slot won't get
    2144             :              * marked as dirty by the above. Callers on the walsender
    2145             :              * interface are expected to keep track of their own progress and
    2146             :              * don't need it written out. But SQL-interface users cannot
    2147             :              * specify their own start positions and it's harder for them to
    2148             :              * keep track of their progress, so we should make more of an
    2149             :              * effort to save it for them.
    2150             :              *
    2151             :              * Dirty the slot so it is written out at the next checkpoint. The
    2152             :              * LSN position advanced to may still be lost on a crash but this
    2153             :              * makes the data consistent after a clean shutdown.
    2154             :              */
    2155          26 :             ReplicationSlotMarkDirty();
    2156             :         }
    2157             : 
    2158          26 :         retlsn = MyReplicationSlot->data.confirmed_flush;
    2159             : 
    2160             :         /* free context, call shutdown callback */
    2161          26 :         FreeDecodingContext(ctx);
    2162             : 
    2163          26 :         InvalidateSystemCaches();
    2164             :     }
    2165           0 :     PG_CATCH();
    2166             :     {
    2167             :         /* clear all timetravel entries */
    2168           0 :         InvalidateSystemCaches();
    2169             : 
    2170           0 :         PG_RE_THROW();
    2171             :     }
    2172          26 :     PG_END_TRY();
    2173             : 
    2174          26 :     return retlsn;
    2175             : }

Generated by: LCOV version 1.14