LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logical.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18beta1 Lines: 697 750 92.9 %
Date: 2025-06-28 00:18:14 Functions: 40 41 97.6 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * logical.c
       3             :  *     PostgreSQL logical decoding coordination
       4             :  *
       5             :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/logical.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file coordinates interaction between the various modules that
      12             :  *    together provide logical decoding, primarily by providing so
      13             :  *    called LogicalDecodingContexts. The goal is to encapsulate most of the
      14             :  *    internal complexity for consumers of logical decoding, so they can
      15             :  *    create and consume a changestream with a low amount of code. Builtin
      16             :  *    consumers are the walsender and SQL SRF interface, but it's possible to
      17             :  *    add further ones without changing core code, e.g. to consume changes in
      18             :  *    a bgworker.
      19             :  *
      20             :  *    The idea is that a consumer provides three callbacks, one to read WAL,
      21             :  *    one to prepare a data write, and a final one for actually writing since
      22             :  *    their implementation depends on the type of consumer.  Check
      23             :  *    logicalfuncs.c for an example implementation of a fairly simple consumer
      24             :  *    and an implementation of a WAL reading callback that's suitable for
      25             :  *    simple consumers.
      26             :  *-------------------------------------------------------------------------
      27             :  */
      28             : 
      29             : #include "postgres.h"
      30             : 
      31             : #include "access/xact.h"
      32             : #include "access/xlog_internal.h"
      33             : #include "access/xlogutils.h"
      34             : #include "fmgr.h"
      35             : #include "miscadmin.h"
      36             : #include "pgstat.h"
      37             : #include "replication/decode.h"
      38             : #include "replication/logical.h"
      39             : #include "replication/reorderbuffer.h"
      40             : #include "replication/slotsync.h"
      41             : #include "replication/snapbuild.h"
      42             : #include "storage/proc.h"
      43             : #include "storage/procarray.h"
      44             : #include "utils/builtins.h"
      45             : #include "utils/injection_point.h"
      46             : #include "utils/inval.h"
      47             : #include "utils/memutils.h"
      48             : 
      49             : /* data for errcontext callback */
      50             : typedef struct LogicalErrorCallbackState
      51             : {
      52             :     LogicalDecodingContext *ctx;
      53             :     const char *callback_name;
      54             :     XLogRecPtr  report_location;
      55             : } LogicalErrorCallbackState;
      56             : 
      57             : /* wrappers around output plugin callbacks */
      58             : static void output_plugin_error_callback(void *arg);
      59             : static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      60             :                                bool is_init);
      61             : static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
      62             : static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
      63             : static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      64             :                               XLogRecPtr commit_lsn);
      65             : static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
      66             : static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      67             :                                XLogRecPtr prepare_lsn);
      68             : static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      69             :                                        XLogRecPtr commit_lsn);
      70             : static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      71             :                                          XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
      72             : static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      73             :                               Relation relation, ReorderBufferChange *change);
      74             : static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      75             :                                 int nrelations, Relation relations[], ReorderBufferChange *change);
      76             : static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      77             :                                XLogRecPtr message_lsn, bool transactional,
      78             :                                const char *prefix, Size message_size, const char *message);
      79             : 
      80             : /* streaming callbacks */
      81             : static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      82             :                                     XLogRecPtr first_lsn);
      83             : static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      84             :                                    XLogRecPtr last_lsn);
      85             : static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      86             :                                     XLogRecPtr abort_lsn);
      87             : static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      88             :                                       XLogRecPtr prepare_lsn);
      89             : static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      90             :                                      XLogRecPtr commit_lsn);
      91             : static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      92             :                                      Relation relation, ReorderBufferChange *change);
      93             : static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      94             :                                       XLogRecPtr message_lsn, bool transactional,
      95             :                                       const char *prefix, Size message_size, const char *message);
      96             : static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      97             :                                        int nrelations, Relation relations[], ReorderBufferChange *change);
      98             : 
      99             : /* callback to update txn's progress */
     100             : static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
     101             :                                            ReorderBufferTXN *txn,
     102             :                                            XLogRecPtr lsn);
     103             : 
     104             : static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
     105             : 
     106             : /*
     107             :  * Make sure the current settings & environment are capable of doing logical
     108             :  * decoding.
     109             :  */
     110             : void
     111        3012 : CheckLogicalDecodingRequirements(void)
     112             : {
     113        3012 :     CheckSlotRequirements();
     114             : 
     115             :     /*
     116             :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
     117             :      * needs the same check.
     118             :      */
     119             : 
     120        3012 :     if (wal_level < WAL_LEVEL_LOGICAL)
     121           0 :         ereport(ERROR,
     122             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     123             :                  errmsg("logical decoding requires \"wal_level\" >= \"logical\"")));
     124             : 
     125        3012 :     if (MyDatabaseId == InvalidOid)
     126           2 :         ereport(ERROR,
     127             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     128             :                  errmsg("logical decoding requires a database connection")));
     129             : 
     130        3010 :     if (RecoveryInProgress())
     131             :     {
     132             :         /*
     133             :          * This check may have race conditions, but whenever
     134             :          * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
     135             :          * verify that there are no existing logical replication slots. And to
     136             :          * avoid races around creating a new slot,
     137             :          * CheckLogicalDecodingRequirements() is called once before creating
     138             :          * the slot, and once when logical decoding is initially starting up.
     139             :          */
     140         138 :         if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
     141           2 :             ereport(ERROR,
     142             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     143             :                      errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary")));
     144             :     }
     145        3008 : }
     146             : 
     147             : /*
     148             :  * Helper function for CreateInitDecodingContext() and
     149             :  * CreateDecodingContext() performing common tasks.
     150             :  */
     151             : static LogicalDecodingContext *
     152        2108 : StartupDecodingContext(List *output_plugin_options,
     153             :                        XLogRecPtr start_lsn,
     154             :                        TransactionId xmin_horizon,
     155             :                        bool need_full_snapshot,
     156             :                        bool fast_forward,
     157             :                        bool in_create,
     158             :                        XLogReaderRoutine *xl_routine,
     159             :                        LogicalOutputPluginWriterPrepareWrite prepare_write,
     160             :                        LogicalOutputPluginWriterWrite do_write,
     161             :                        LogicalOutputPluginWriterUpdateProgress update_progress)
     162             : {
     163             :     ReplicationSlot *slot;
     164             :     MemoryContext context,
     165             :                 old_context;
     166             :     LogicalDecodingContext *ctx;
     167             : 
     168             :     /* shorter lines... */
     169        2108 :     slot = MyReplicationSlot;
     170             : 
     171        2108 :     context = AllocSetContextCreate(CurrentMemoryContext,
     172             :                                     "Logical decoding context",
     173             :                                     ALLOCSET_DEFAULT_SIZES);
     174        2108 :     old_context = MemoryContextSwitchTo(context);
     175        2108 :     ctx = palloc0(sizeof(LogicalDecodingContext));
     176             : 
     177        2108 :     ctx->context = context;
     178             : 
     179             :     /*
     180             :      * (re-)load output plugins, so we detect a bad (removed) output plugin
     181             :      * now.
     182             :      */
     183        2108 :     if (!fast_forward)
     184        2066 :         LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
     185             : 
     186             :     /*
     187             :      * Now that the slot's xmin has been set, we can announce ourselves as a
     188             :      * logical decoding backend which doesn't need to be checked individually
     189             :      * when computing the xmin horizon because the xmin is enforced via
     190             :      * replication slots.
     191             :      *
     192             :      * We can only do so if we're outside of a transaction (i.e. the case when
     193             :      * streaming changes via walsender), otherwise an already setup
     194             :      * snapshot/xid would end up being ignored. That's not a particularly
     195             :      * bothersome restriction since the SQL interface can't be used for
     196             :      * streaming anyway.
     197             :      */
     198        2106 :     if (!IsTransactionOrTransactionBlock())
     199             :     {
     200        1024 :         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     201        1024 :         MyProc->statusFlags |= PROC_IN_LOGICAL_DECODING;
     202        1024 :         ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
     203        1024 :         LWLockRelease(ProcArrayLock);
     204             :     }
     205             : 
     206        2106 :     ctx->slot = slot;
     207             : 
     208        2106 :     ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
     209        2106 :     if (!ctx->reader)
     210           0 :         ereport(ERROR,
     211             :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     212             :                  errmsg("out of memory"),
     213             :                  errdetail("Failed while allocating a WAL reading processor.")));
     214             : 
     215        2106 :     ctx->reorder = ReorderBufferAllocate();
     216        2106 :     ctx->snapshot_builder =
     217        2106 :         AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
     218             :                                 need_full_snapshot, in_create, slot->data.two_phase_at);
     219             : 
     220        2106 :     ctx->reorder->private_data = ctx;
     221             : 
     222             :     /* wrap output plugin callbacks, so we can add error context information */
     223        2106 :     ctx->reorder->begin = begin_cb_wrapper;
     224        2106 :     ctx->reorder->apply_change = change_cb_wrapper;
     225        2106 :     ctx->reorder->apply_truncate = truncate_cb_wrapper;
     226        2106 :     ctx->reorder->commit = commit_cb_wrapper;
     227        2106 :     ctx->reorder->message = message_cb_wrapper;
     228             : 
     229             :     /*
     230             :      * To support streaming, we require start/stop/abort/commit/change
     231             :      * callbacks. The message and truncate callbacks are optional, similar to
     232             :      * regular output plugins. We however enable streaming when at least one
     233             :      * of the methods is enabled so that we can easily identify missing
     234             :      * methods.
     235             :      *
     236             :      * We decide it here, but only check it later in the wrappers.
     237             :      */
     238        4254 :     ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) ||
     239          42 :         (ctx->callbacks.stream_stop_cb != NULL) ||
     240          42 :         (ctx->callbacks.stream_abort_cb != NULL) ||
     241          42 :         (ctx->callbacks.stream_commit_cb != NULL) ||
     242          42 :         (ctx->callbacks.stream_change_cb != NULL) ||
     243        2190 :         (ctx->callbacks.stream_message_cb != NULL) ||
     244          42 :         (ctx->callbacks.stream_truncate_cb != NULL);
     245             : 
     246             :     /*
     247             :      * streaming callbacks
     248             :      *
     249             :      * stream_message and stream_truncate callbacks are optional, so we do not
     250             :      * fail with ERROR when missing, but the wrappers simply do nothing. We
     251             :      * must set the ReorderBuffer callbacks to something, otherwise the calls
     252             :      * from there will crash (we don't want to move the checks there).
     253             :      */
     254        2106 :     ctx->reorder->stream_start = stream_start_cb_wrapper;
     255        2106 :     ctx->reorder->stream_stop = stream_stop_cb_wrapper;
     256        2106 :     ctx->reorder->stream_abort = stream_abort_cb_wrapper;
     257        2106 :     ctx->reorder->stream_prepare = stream_prepare_cb_wrapper;
     258        2106 :     ctx->reorder->stream_commit = stream_commit_cb_wrapper;
     259        2106 :     ctx->reorder->stream_change = stream_change_cb_wrapper;
     260        2106 :     ctx->reorder->stream_message = stream_message_cb_wrapper;
     261        2106 :     ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
     262             : 
     263             : 
     264             :     /*
     265             :      * To support two-phase logical decoding, we require
     266             :      * begin_prepare/prepare/commit-prepare/abort-prepare callbacks. The
     267             :      * filter_prepare callback is optional. We however enable two-phase
     268             :      * logical decoding when at least one of the methods is enabled so that we
     269             :      * can easily identify missing methods.
     270             :      *
     271             :      * We decide it here, but only check it later in the wrappers.
     272             :      */
     273        4254 :     ctx->twophase = (ctx->callbacks.begin_prepare_cb != NULL) ||
     274          42 :         (ctx->callbacks.prepare_cb != NULL) ||
     275          42 :         (ctx->callbacks.commit_prepared_cb != NULL) ||
     276          42 :         (ctx->callbacks.rollback_prepared_cb != NULL) ||
     277        2190 :         (ctx->callbacks.stream_prepare_cb != NULL) ||
     278          42 :         (ctx->callbacks.filter_prepare_cb != NULL);
     279             : 
     280             :     /*
     281             :      * Callback to support decoding at prepare time.
     282             :      */
     283        2106 :     ctx->reorder->begin_prepare = begin_prepare_cb_wrapper;
     284        2106 :     ctx->reorder->prepare = prepare_cb_wrapper;
     285        2106 :     ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
     286        2106 :     ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
     287             : 
     288             :     /*
     289             :      * Callback to support updating progress during sending data of a
     290             :      * transaction (and its subtransactions) to the output plugin.
     291             :      */
     292        2106 :     ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
     293             : 
     294        2106 :     ctx->out = makeStringInfo();
     295        2106 :     ctx->prepare_write = prepare_write;
     296        2106 :     ctx->write = do_write;
     297        2106 :     ctx->update_progress = update_progress;
     298             : 
     299        2106 :     ctx->output_plugin_options = output_plugin_options;
     300             : 
     301        2106 :     ctx->fast_forward = fast_forward;
     302             : 
     303        2106 :     MemoryContextSwitchTo(old_context);
     304             : 
     305        2106 :     return ctx;
     306             : }
     307             : 
     308             : /*
     309             :  * Create a new decoding context, for a new logical slot.
     310             :  *
     311             :  * plugin -- contains the name of the output plugin
     312             :  * output_plugin_options -- contains options passed to the output plugin
     313             :  * need_full_snapshot -- if true, must obtain a snapshot able to read all
     314             :  *      tables; if false, one that can read only catalogs is acceptable.
     315             :  * restart_lsn -- if given as invalid, it's this routine's responsibility to
     316             :  *      mark WAL as reserved by setting a convenient restart_lsn for the slot.
     317             :  *      Otherwise, we set for decoding to start from the given LSN without
     318             :  *      marking WAL reserved beforehand.  In that scenario, it's up to the
     319             :  *      caller to guarantee that WAL remains available.
     320             :  * xl_routine -- XLogReaderRoutine for underlying XLogReader
     321             :  * prepare_write, do_write, update_progress --
     322             :  *      callbacks that perform the use-case dependent, actual, work.
     323             :  *
     324             :  * Needs to be called while in a memory context that's at least as long lived
     325             :  * as the decoding context because further memory contexts will be created
     326             :  * inside it.
     327             :  *
     328             :  * Returns an initialized decoding context after calling the output plugin's
     329             :  * startup function.
     330             :  */
     331             : LogicalDecodingContext *
     332         902 : CreateInitDecodingContext(const char *plugin,
     333             :                           List *output_plugin_options,
     334             :                           bool need_full_snapshot,
     335             :                           XLogRecPtr restart_lsn,
     336             :                           XLogReaderRoutine *xl_routine,
     337             :                           LogicalOutputPluginWriterPrepareWrite prepare_write,
     338             :                           LogicalOutputPluginWriterWrite do_write,
     339             :                           LogicalOutputPluginWriterUpdateProgress update_progress)
     340             : {
     341         902 :     TransactionId xmin_horizon = InvalidTransactionId;
     342             :     ReplicationSlot *slot;
     343             :     NameData    plugin_name;
     344             :     LogicalDecodingContext *ctx;
     345             :     MemoryContext old_context;
     346             : 
     347             :     /*
     348             :      * On a standby, this check is also required while creating the slot.
     349             :      * Check the comments in the function.
     350             :      */
     351         902 :     CheckLogicalDecodingRequirements();
     352             : 
     353             :     /* shorter lines... */
     354         902 :     slot = MyReplicationSlot;
     355             : 
     356             :     /* first some sanity checks that are unlikely to be violated */
     357         902 :     if (slot == NULL)
     358           0 :         elog(ERROR, "cannot perform logical decoding without an acquired slot");
     359             : 
     360         902 :     if (plugin == NULL)
     361           0 :         elog(ERROR, "cannot initialize logical decoding without a specified plugin");
     362             : 
     363             :     /* Make sure the passed slot is suitable. These are user facing errors. */
     364         902 :     if (SlotIsPhysical(slot))
     365           0 :         ereport(ERROR,
     366             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     367             :                  errmsg("cannot use physical replication slot for logical decoding")));
     368             : 
     369         902 :     if (slot->data.database != MyDatabaseId)
     370           0 :         ereport(ERROR,
     371             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     372             :                  errmsg("replication slot \"%s\" was not created in this database",
     373             :                         NameStr(slot->data.name))));
     374             : 
     375        1534 :     if (IsTransactionState() &&
     376         632 :         GetTopTransactionIdIfAny() != InvalidTransactionId)
     377           4 :         ereport(ERROR,
     378             :                 (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
     379             :                  errmsg("cannot create logical replication slot in transaction that has performed writes")));
     380             : 
     381             :     /*
     382             :      * Register output plugin name with slot.  We need the mutex to avoid
     383             :      * concurrent reading of a partially copied string.  But we don't want any
     384             :      * complicated code while holding a spinlock, so do namestrcpy() outside.
     385             :      */
     386         898 :     namestrcpy(&plugin_name, plugin);
     387         898 :     SpinLockAcquire(&slot->mutex);
     388         898 :     slot->data.plugin = plugin_name;
     389         898 :     SpinLockRelease(&slot->mutex);
     390             : 
     391         898 :     if (XLogRecPtrIsInvalid(restart_lsn))
     392         886 :         ReplicationSlotReserveWal();
     393             :     else
     394             :     {
     395          12 :         SpinLockAcquire(&slot->mutex);
     396          12 :         slot->data.restart_lsn = restart_lsn;
     397          12 :         SpinLockRelease(&slot->mutex);
     398             :     }
     399             : 
     400             :     /* ----
     401             :      * This is a bit tricky: We need to determine a safe xmin horizon to start
     402             :      * decoding from, to avoid starting from a running xacts record referring
     403             :      * to xids whose rows have been vacuumed or pruned
     404             :      * already. GetOldestSafeDecodingTransactionId() returns such a value, but
     405             :      * without further interlock its return value might immediately be out of
     406             :      * date.
     407             :      *
     408             :      * So we have to acquire the ProcArrayLock to prevent computation of new
     409             :      * xmin horizons by other backends, get the safe decoding xid, and inform
     410             :      * the slot machinery about the new limit. Once that's done the
     411             :      * ProcArrayLock can be released as the slot machinery now is
     412             :      * protecting against vacuum.
     413             :      *
     414             :      * Note that, temporarily, the data, not just the catalog, xmin has to be
     415             :      * reserved if a data snapshot is to be exported.  Otherwise the initial
     416             :      * data snapshot created here is not guaranteed to be valid. After that
     417             :      * the data xmin doesn't need to be managed anymore and the global xmin
     418             :      * should be recomputed. As we are fine with losing the pegged data xmin
     419             :      * after crash - no chance a snapshot would get exported anymore - we can
     420             :      * get away with just setting the slot's
     421             :      * effective_xmin. ReplicationSlotRelease will reset it again.
     422             :      *
     423             :      * ----
     424             :      */
     425         898 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     426             : 
     427         898 :     xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
     428             : 
     429         898 :     SpinLockAcquire(&slot->mutex);
     430         898 :     slot->effective_catalog_xmin = xmin_horizon;
     431         898 :     slot->data.catalog_xmin = xmin_horizon;
     432         898 :     if (need_full_snapshot)
     433         388 :         slot->effective_xmin = xmin_horizon;
     434         898 :     SpinLockRelease(&slot->mutex);
     435             : 
     436         898 :     ReplicationSlotsComputeRequiredXmin(true);
     437             : 
     438         898 :     LWLockRelease(ProcArrayLock);
     439             : 
     440         898 :     ReplicationSlotMarkDirty();
     441         898 :     ReplicationSlotSave();
     442             : 
     443         898 :     ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
     444             :                                  need_full_snapshot, false, true,
     445             :                                  xl_routine, prepare_write, do_write,
     446             :                                  update_progress);
     447             : 
     448             :     /* call output plugin initialization callback */
     449         896 :     old_context = MemoryContextSwitchTo(ctx->context);
     450         896 :     if (ctx->callbacks.startup_cb != NULL)
     451         896 :         startup_cb_wrapper(ctx, &ctx->options, true);
     452         896 :     MemoryContextSwitchTo(old_context);
     453             : 
     454             :     /*
     455             :      * We allow decoding of prepared transactions when the two_phase is
     456             :      * enabled at the time of slot creation, or when the two_phase option is
     457             :      * given at the streaming start, provided the plugin supports all the
     458             :      * callbacks for two-phase.
     459             :      */
     460         896 :     ctx->twophase &= slot->data.two_phase;
     461             : 
     462         896 :     ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
     463             : 
     464         896 :     return ctx;
     465             : }
     466             : 
     467             : /*
     468             :  * Create a new decoding context, for a logical slot that has previously been
     469             :  * used already.
     470             :  *
     471             :  * start_lsn
     472             :  *      The LSN at which to start decoding.  If InvalidXLogRecPtr, restart
     473             :  *      from the slot's confirmed_flush; otherwise, start from the specified
     474             :  *      location (but move it forwards to confirmed_flush if it's older than
     475             :  *      that, see below).
     476             :  *
     477             :  * output_plugin_options
     478             :  *      options passed to the output plugin.
     479             :  *
     480             :  * fast_forward
     481             :  *      bypass the generation of logical changes.
     482             :  *
     483             :  * xl_routine
     484             :  *      XLogReaderRoutine used by underlying xlogreader
     485             :  *
     486             :  * prepare_write, do_write, update_progress
     487             :  *      callbacks that have to be filled to perform the use-case dependent,
     488             :  *      actual work.
     489             :  *
     490             :  * Needs to be called while in a memory context that's at least as long lived
     491             :  * as the decoding context because further memory contexts will be created
     492             :  * inside it.
     493             :  *
     494             :  * Returns an initialized decoding context after calling the output plugin's
     495             :  * startup function.
     496             :  */
     497             : LogicalDecodingContext *
     498        1220 : CreateDecodingContext(XLogRecPtr start_lsn,
     499             :                       List *output_plugin_options,
     500             :                       bool fast_forward,
     501             :                       XLogReaderRoutine *xl_routine,
     502             :                       LogicalOutputPluginWriterPrepareWrite prepare_write,
     503             :                       LogicalOutputPluginWriterWrite do_write,
     504             :                       LogicalOutputPluginWriterUpdateProgress update_progress)
     505             : {
     506             :     LogicalDecodingContext *ctx;
     507             :     ReplicationSlot *slot;
     508             :     MemoryContext old_context;
     509             : 
     510             :     /* shorter lines... */
     511        1220 :     slot = MyReplicationSlot;
     512             : 
     513             :     /* first some sanity checks that are unlikely to be violated */
     514        1220 :     if (slot == NULL)
     515           0 :         elog(ERROR, "cannot perform logical decoding without an acquired slot");
     516             : 
     517             :     /* make sure the passed slot is suitable, these are user facing errors */
     518        1220 :     if (SlotIsPhysical(slot))
     519           2 :         ereport(ERROR,
     520             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     521             :                  errmsg("cannot use physical replication slot for logical decoding")));
     522             : 
     523             :     /*
     524             :      * We need to access the system tables during decoding to build the
     525             :      * logical changes unless we are in fast_forward mode where no changes are
     526             :      * generated.
     527             :      */
     528        1218 :     if (slot->data.database != MyDatabaseId && !fast_forward)
     529           6 :         ereport(ERROR,
     530             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     531             :                  errmsg("replication slot \"%s\" was not created in this database",
     532             :                         NameStr(slot->data.name))));
     533             : 
     534             :     /*
     535             :      * The slots being synced from the primary can't be used for decoding as
     536             :      * they are used after failover. However, we do allow advancing the LSNs
     537             :      * during the synchronization of slots. See update_local_synced_slot.
     538             :      */
     539        1212 :     if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
     540           2 :         ereport(ERROR,
     541             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     542             :                 errmsg("cannot use replication slot \"%s\" for logical decoding",
     543             :                        NameStr(slot->data.name)),
     544             :                 errdetail("This replication slot is being synchronized from the primary server."),
     545             :                 errhint("Specify another replication slot."));
     546             : 
     547             :     /* slot must be valid to allow decoding */
     548             :     Assert(slot->data.invalidated == RS_INVAL_NONE);
     549             :     Assert(slot->data.restart_lsn != InvalidXLogRecPtr);
     550             : 
     551        1210 :     if (start_lsn == InvalidXLogRecPtr)
     552             :     {
     553             :         /* continue from last position */
     554         726 :         start_lsn = slot->data.confirmed_flush;
     555             :     }
     556         484 :     else if (start_lsn < slot->data.confirmed_flush)
     557             :     {
     558             :         /*
     559             :          * It might seem like we should error out in this case, but it's
     560             :          * pretty common for a client to acknowledge a LSN it doesn't have to
     561             :          * do anything for, and thus didn't store persistently, because the
     562             :          * xlog records didn't result in anything relevant for logical
     563             :          * decoding. Clients have to be able to do that to support synchronous
     564             :          * replication.
     565             :          *
     566             :          * Starting at a different LSN than requested might not catch certain
     567             :          * kinds of client errors; so the client may wish to check that
     568             :          * confirmed_flush_lsn matches its expectations.
     569             :          */
     570          44 :         elog(LOG, "%X/%X has been already streamed, forwarding to %X/%X",
     571             :              LSN_FORMAT_ARGS(start_lsn),
     572             :              LSN_FORMAT_ARGS(slot->data.confirmed_flush));
     573             : 
     574          44 :         start_lsn = slot->data.confirmed_flush;
     575             :     }
     576             : 
     577        1210 :     ctx = StartupDecodingContext(output_plugin_options,
     578             :                                  start_lsn, InvalidTransactionId, false,
     579             :                                  fast_forward, false, xl_routine, prepare_write,
     580             :                                  do_write, update_progress);
     581             : 
     582             :     /* call output plugin initialization callback */
     583        1210 :     old_context = MemoryContextSwitchTo(ctx->context);
     584        1210 :     if (ctx->callbacks.startup_cb != NULL)
     585        1168 :         startup_cb_wrapper(ctx, &ctx->options, false);
     586        1204 :     MemoryContextSwitchTo(old_context);
     587             : 
     588             :     /*
     589             :      * We allow decoding of prepared transactions when the two_phase is
     590             :      * enabled at the time of slot creation, or when the two_phase option is
     591             :      * given at the streaming start, provided the plugin supports all the
     592             :      * callbacks for two-phase.
     593             :      */
     594        1204 :     ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given);
     595             : 
     596             :     /* Mark slot to allow two_phase decoding if not already marked */
     597        1204 :     if (ctx->twophase && !slot->data.two_phase)
     598             :     {
     599          16 :         SpinLockAcquire(&slot->mutex);
     600          16 :         slot->data.two_phase = true;
     601          16 :         slot->data.two_phase_at = start_lsn;
     602          16 :         SpinLockRelease(&slot->mutex);
     603          16 :         ReplicationSlotMarkDirty();
     604          16 :         ReplicationSlotSave();
     605          16 :         SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
     606             :     }
     607             : 
     608        1204 :     ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
     609             : 
     610        1204 :     ereport(LOG,
     611             :             (errmsg("starting logical decoding for slot \"%s\"",
     612             :                     NameStr(slot->data.name)),
     613             :              errdetail("Streaming transactions committing after %X/%X, reading WAL from %X/%X.",
     614             :                        LSN_FORMAT_ARGS(slot->data.confirmed_flush),
     615             :                        LSN_FORMAT_ARGS(slot->data.restart_lsn))));
     616             : 
     617        1204 :     return ctx;
     618             : }
     619             : 
     620             : /*
     621             :  * Returns true if a consistent initial decoding snapshot has been built.
     622             :  */
     623             : bool
     624        6974 : DecodingContextReady(LogicalDecodingContext *ctx)
     625             : {
     626        6974 :     return SnapBuildCurrentState(ctx->snapshot_builder) == SNAPBUILD_CONSISTENT;
     627             : }
     628             : 
     629             : /*
     630             :  * Read from the decoding slot, until it is ready to start extracting changes.
     631             :  */
     632             : void
     633         884 : DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
     634             : {
     635         884 :     ReplicationSlot *slot = ctx->slot;
     636             : 
     637             :     /* Initialize from where to start reading WAL. */
     638         884 :     XLogBeginRead(ctx->reader, slot->data.restart_lsn);
     639             : 
     640         884 :     elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
     641             :          LSN_FORMAT_ARGS(slot->data.restart_lsn));
     642             : 
     643             :     /* Wait for a consistent starting point */
     644             :     for (;;)
     645        6084 :     {
     646             :         XLogRecord *record;
     647        6968 :         char       *err = NULL;
     648             : 
     649             :         /* the read_page callback waits for new WAL */
     650        6968 :         record = XLogReadRecord(ctx->reader, &err);
     651        6968 :         if (err)
     652           0 :             elog(ERROR, "could not find logical decoding starting point: %s", err);
     653        6968 :         if (!record)
     654           0 :             elog(ERROR, "could not find logical decoding starting point");
     655             : 
     656        6968 :         LogicalDecodingProcessRecord(ctx, ctx->reader);
     657             : 
     658             :         /* only continue till we found a consistent spot */
     659        6964 :         if (DecodingContextReady(ctx))
     660         880 :             break;
     661             : 
     662        6084 :         CHECK_FOR_INTERRUPTS();
     663             :     }
     664             : 
     665         880 :     SpinLockAcquire(&slot->mutex);
     666         880 :     slot->data.confirmed_flush = ctx->reader->EndRecPtr;
     667         880 :     if (slot->data.two_phase)
     668          18 :         slot->data.two_phase_at = ctx->reader->EndRecPtr;
     669         880 :     SpinLockRelease(&slot->mutex);
     670         880 : }
     671             : 
     672             : /*
     673             :  * Free a previously allocated decoding context, invoking the shutdown
     674             :  * callback if necessary.
     675             :  */
     676             : void
     677        1718 : FreeDecodingContext(LogicalDecodingContext *ctx)
     678             : {
     679        1718 :     if (ctx->callbacks.shutdown_cb != NULL)
     680        1676 :         shutdown_cb_wrapper(ctx);
     681             : 
     682        1718 :     ReorderBufferFree(ctx->reorder);
     683        1718 :     FreeSnapshotBuilder(ctx->snapshot_builder);
     684        1718 :     XLogReaderFree(ctx->reader);
     685        1718 :     MemoryContextDelete(ctx->context);
     686        1718 : }
     687             : 
     688             : /*
     689             :  * Prepare a write using the context's output routine.
     690             :  */
     691             : void
     692      672062 : OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
     693             : {
     694      672062 :     if (!ctx->accept_writes)
     695           0 :         elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
     696             : 
     697      672062 :     ctx->prepare_write(ctx, ctx->write_location, ctx->write_xid, last_write);
     698      672062 :     ctx->prepared_write = true;
     699      672062 : }
     700             : 
     701             : /*
     702             :  * Perform a write using the context's output routine.
     703             :  */
     704             : void
     705      672062 : OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
     706             : {
     707      672062 :     if (!ctx->prepared_write)
     708           0 :         elog(ERROR, "OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
     709             : 
     710      672062 :     ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write);
     711      672040 :     ctx->prepared_write = false;
     712      672040 : }
     713             : 
     714             : /*
     715             :  * Update progress tracking (if supported).
     716             :  */
     717             : void
     718        7962 : OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx,
     719             :                            bool skipped_xact)
     720             : {
     721        7962 :     if (!ctx->update_progress)
     722        3180 :         return;
     723             : 
     724        4782 :     ctx->update_progress(ctx, ctx->write_location, ctx->write_xid,
     725             :                          skipped_xact);
     726             : }
     727             : 
     728             : /*
     729             :  * Load the output plugin, lookup its output plugin init function, and check
     730             :  * that it provides the required callbacks.
     731             :  */
     732             : static void
     733        2066 : LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin)
     734             : {
     735             :     LogicalOutputPluginInit plugin_init;
     736             : 
     737        2064 :     plugin_init = (LogicalOutputPluginInit)
     738        2066 :         load_external_function(plugin, "_PG_output_plugin_init", false, NULL);
     739             : 
     740        2064 :     if (plugin_init == NULL)
     741           0 :         elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");
     742             : 
     743             :     /* ask the output plugin to fill the callback struct */
     744        2064 :     plugin_init(callbacks);
     745             : 
     746        2064 :     if (callbacks->begin_cb == NULL)
     747           0 :         elog(ERROR, "output plugins have to register a begin callback");
     748        2064 :     if (callbacks->change_cb == NULL)
     749           0 :         elog(ERROR, "output plugins have to register a change callback");
     750        2064 :     if (callbacks->commit_cb == NULL)
     751           0 :         elog(ERROR, "output plugins have to register a commit callback");
     752        2064 : }
     753             : 
     754             : static void
     755          36 : output_plugin_error_callback(void *arg)
     756             : {
     757          36 :     LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;
     758             : 
     759             :     /* not all callbacks have an associated LSN  */
     760          36 :     if (state->report_location != InvalidXLogRecPtr)
     761          30 :         errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
     762          30 :                    NameStr(state->ctx->slot->data.name),
     763          30 :                    NameStr(state->ctx->slot->data.plugin),
     764             :                    state->callback_name,
     765          30 :                    LSN_FORMAT_ARGS(state->report_location));
     766             :     else
     767           6 :         errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
     768           6 :                    NameStr(state->ctx->slot->data.name),
     769           6 :                    NameStr(state->ctx->slot->data.plugin),
     770             :                    state->callback_name);
     771          36 : }
     772             : 
     773             : static void
     774        2064 : startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
     775             : {
     776             :     LogicalErrorCallbackState state;
     777             :     ErrorContextCallback errcallback;
     778             : 
     779             :     Assert(!ctx->fast_forward);
     780             : 
     781             :     /* Push callback + info on the error context stack */
     782        2064 :     state.ctx = ctx;
     783        2064 :     state.callback_name = "startup";
     784        2064 :     state.report_location = InvalidXLogRecPtr;
     785        2064 :     errcallback.callback = output_plugin_error_callback;
     786        2064 :     errcallback.arg = &state;
     787        2064 :     errcallback.previous = error_context_stack;
     788        2064 :     error_context_stack = &errcallback;
     789             : 
     790             :     /* set output state */
     791        2064 :     ctx->accept_writes = false;
     792        2064 :     ctx->end_xact = false;
     793             : 
     794             :     /* do the actual work: call callback */
     795        2064 :     ctx->callbacks.startup_cb(ctx, opt, is_init);
     796             : 
     797             :     /* Pop the error context stack */
     798        2058 :     error_context_stack = errcallback.previous;
     799        2058 : }
     800             : 
     801             : static void
     802        1676 : shutdown_cb_wrapper(LogicalDecodingContext *ctx)
     803             : {
     804             :     LogicalErrorCallbackState state;
     805             :     ErrorContextCallback errcallback;
     806             : 
     807             :     Assert(!ctx->fast_forward);
     808             : 
     809             :     /* Push callback + info on the error context stack */
     810        1676 :     state.ctx = ctx;
     811        1676 :     state.callback_name = "shutdown";
     812        1676 :     state.report_location = InvalidXLogRecPtr;
     813        1676 :     errcallback.callback = output_plugin_error_callback;
     814        1676 :     errcallback.arg = &state;
     815        1676 :     errcallback.previous = error_context_stack;
     816        1676 :     error_context_stack = &errcallback;
     817             : 
     818             :     /* set output state */
     819        1676 :     ctx->accept_writes = false;
     820        1676 :     ctx->end_xact = false;
     821             : 
     822             :     /* do the actual work: call callback */
     823        1676 :     ctx->callbacks.shutdown_cb(ctx);
     824             : 
     825             :     /* Pop the error context stack */
     826        1676 :     error_context_stack = errcallback.previous;
     827        1676 : }
     828             : 
     829             : 
     830             : /*
     831             :  * Callbacks for ReorderBuffer which add in some more information and then call
     832             :  * output_plugin.h plugins.
     833             :  */
     834             : static void
     835        2404 : begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
     836             : {
     837        2404 :     LogicalDecodingContext *ctx = cache->private_data;
     838             :     LogicalErrorCallbackState state;
     839             :     ErrorContextCallback errcallback;
     840             : 
     841             :     Assert(!ctx->fast_forward);
     842             : 
     843             :     /* Push callback + info on the error context stack */
     844        2404 :     state.ctx = ctx;
     845        2404 :     state.callback_name = "begin";
     846        2404 :     state.report_location = txn->first_lsn;
     847        2404 :     errcallback.callback = output_plugin_error_callback;
     848        2404 :     errcallback.arg = &state;
     849        2404 :     errcallback.previous = error_context_stack;
     850        2404 :     error_context_stack = &errcallback;
     851             : 
     852             :     /* set output state */
     853        2404 :     ctx->accept_writes = true;
     854        2404 :     ctx->write_xid = txn->xid;
     855        2404 :     ctx->write_location = txn->first_lsn;
     856        2404 :     ctx->end_xact = false;
     857             : 
     858             :     /* do the actual work: call callback */
     859        2404 :     ctx->callbacks.begin_cb(ctx, txn);
     860             : 
     861             :     /* Pop the error context stack */
     862        2404 :     error_context_stack = errcallback.previous;
     863        2404 : }
     864             : 
     865             : static void
     866        2400 : commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     867             :                   XLogRecPtr commit_lsn)
     868             : {
     869        2400 :     LogicalDecodingContext *ctx = cache->private_data;
     870             :     LogicalErrorCallbackState state;
     871             :     ErrorContextCallback errcallback;
     872             : 
     873             :     Assert(!ctx->fast_forward);
     874             : 
     875             :     /* Push callback + info on the error context stack */
     876        2400 :     state.ctx = ctx;
     877        2400 :     state.callback_name = "commit";
     878        2400 :     state.report_location = txn->final_lsn; /* beginning of commit record */
     879        2400 :     errcallback.callback = output_plugin_error_callback;
     880        2400 :     errcallback.arg = &state;
     881        2400 :     errcallback.previous = error_context_stack;
     882        2400 :     error_context_stack = &errcallback;
     883             : 
     884             :     /* set output state */
     885        2400 :     ctx->accept_writes = true;
     886        2400 :     ctx->write_xid = txn->xid;
     887        2400 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
     888        2400 :     ctx->end_xact = true;
     889             : 
     890             :     /* do the actual work: call callback */
     891        2400 :     ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
     892             : 
     893             :     /* Pop the error context stack */
     894        2380 :     error_context_stack = errcallback.previous;
     895        2380 : }
     896             : 
     897             : /*
     898             :  * The functionality of begin_prepare is quite similar to begin with the
     899             :  * exception that this will have gid (global transaction id) information which
     900             :  * can be used by plugin. Now, we thought about extending the existing begin
     901             :  * but that would break the replication protocol and additionally this looks
     902             :  * cleaner.
     903             :  */
     904             : static void
     905          60 : begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
     906             : {
     907          60 :     LogicalDecodingContext *ctx = cache->private_data;
     908             :     LogicalErrorCallbackState state;
     909             :     ErrorContextCallback errcallback;
     910             : 
     911             :     Assert(!ctx->fast_forward);
     912             : 
     913             :     /* We're only supposed to call this when two-phase commits are supported */
     914             :     Assert(ctx->twophase);
     915             : 
     916             :     /* Push callback + info on the error context stack */
     917          60 :     state.ctx = ctx;
     918          60 :     state.callback_name = "begin_prepare";
     919          60 :     state.report_location = txn->first_lsn;
     920          60 :     errcallback.callback = output_plugin_error_callback;
     921          60 :     errcallback.arg = &state;
     922          60 :     errcallback.previous = error_context_stack;
     923          60 :     error_context_stack = &errcallback;
     924             : 
     925             :     /* set output state */
     926          60 :     ctx->accept_writes = true;
     927          60 :     ctx->write_xid = txn->xid;
     928          60 :     ctx->write_location = txn->first_lsn;
     929          60 :     ctx->end_xact = false;
     930             : 
     931             :     /*
     932             :      * If the plugin supports two-phase commits then begin prepare callback is
     933             :      * mandatory
     934             :      */
     935          60 :     if (ctx->callbacks.begin_prepare_cb == NULL)
     936           0 :         ereport(ERROR,
     937             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     938             :                  errmsg("logical replication at prepare time requires a %s callback",
     939             :                         "begin_prepare_cb")));
     940             : 
     941             :     /* do the actual work: call callback */
     942          60 :     ctx->callbacks.begin_prepare_cb(ctx, txn);
     943             : 
     944             :     /* Pop the error context stack */
     945          60 :     error_context_stack = errcallback.previous;
     946          60 : }
     947             : 
     948             : static void
     949          60 : prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     950             :                    XLogRecPtr prepare_lsn)
     951             : {
     952          60 :     LogicalDecodingContext *ctx = cache->private_data;
     953             :     LogicalErrorCallbackState state;
     954             :     ErrorContextCallback errcallback;
     955             : 
     956             :     Assert(!ctx->fast_forward);
     957             : 
     958             :     /* We're only supposed to call this when two-phase commits are supported */
     959             :     Assert(ctx->twophase);
     960             : 
     961             :     /* Push callback + info on the error context stack */
     962          60 :     state.ctx = ctx;
     963          60 :     state.callback_name = "prepare";
     964          60 :     state.report_location = txn->final_lsn; /* beginning of prepare record */
     965          60 :     errcallback.callback = output_plugin_error_callback;
     966          60 :     errcallback.arg = &state;
     967          60 :     errcallback.previous = error_context_stack;
     968          60 :     error_context_stack = &errcallback;
     969             : 
     970             :     /* set output state */
     971          60 :     ctx->accept_writes = true;
     972          60 :     ctx->write_xid = txn->xid;
     973          60 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
     974          60 :     ctx->end_xact = true;
     975             : 
     976             :     /*
     977             :      * If the plugin supports two-phase commits then prepare callback is
     978             :      * mandatory
     979             :      */
     980          60 :     if (ctx->callbacks.prepare_cb == NULL)
     981           0 :         ereport(ERROR,
     982             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     983             :                  errmsg("logical replication at prepare time requires a %s callback",
     984             :                         "prepare_cb")));
     985             : 
     986             :     /* do the actual work: call callback */
     987          60 :     ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
     988             : 
     989             :     /* Pop the error context stack */
     990          60 :     error_context_stack = errcallback.previous;
     991          60 : }
     992             : 
     993             : static void
     994          64 : commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     995             :                            XLogRecPtr commit_lsn)
     996             : {
     997          64 :     LogicalDecodingContext *ctx = cache->private_data;
     998             :     LogicalErrorCallbackState state;
     999             :     ErrorContextCallback errcallback;
    1000             : 
    1001             :     Assert(!ctx->fast_forward);
    1002             : 
    1003             :     /* We're only supposed to call this when two-phase commits are supported */
    1004             :     Assert(ctx->twophase);
    1005             : 
    1006             :     /* Push callback + info on the error context stack */
    1007          64 :     state.ctx = ctx;
    1008          64 :     state.callback_name = "commit_prepared";
    1009          64 :     state.report_location = txn->final_lsn; /* beginning of commit record */
    1010          64 :     errcallback.callback = output_plugin_error_callback;
    1011          64 :     errcallback.arg = &state;
    1012          64 :     errcallback.previous = error_context_stack;
    1013          64 :     error_context_stack = &errcallback;
    1014             : 
    1015             :     /* set output state */
    1016          64 :     ctx->accept_writes = true;
    1017          64 :     ctx->write_xid = txn->xid;
    1018          64 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
    1019          64 :     ctx->end_xact = true;
    1020             : 
    1021             :     /*
    1022             :      * If the plugin support two-phase commits then commit prepared callback
    1023             :      * is mandatory
    1024             :      */
    1025          64 :     if (ctx->callbacks.commit_prepared_cb == NULL)
    1026           0 :         ereport(ERROR,
    1027             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1028             :                  errmsg("logical replication at prepare time requires a %s callback",
    1029             :                         "commit_prepared_cb")));
    1030             : 
    1031             :     /* do the actual work: call callback */
    1032          64 :     ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
    1033             : 
    1034             :     /* Pop the error context stack */
    1035          64 :     error_context_stack = errcallback.previous;
    1036          64 : }
    1037             : 
    1038             : static void
    1039          20 : rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1040             :                              XLogRecPtr prepare_end_lsn,
    1041             :                              TimestampTz prepare_time)
    1042             : {
    1043          20 :     LogicalDecodingContext *ctx = cache->private_data;
    1044             :     LogicalErrorCallbackState state;
    1045             :     ErrorContextCallback errcallback;
    1046             : 
    1047             :     Assert(!ctx->fast_forward);
    1048             : 
    1049             :     /* We're only supposed to call this when two-phase commits are supported */
    1050             :     Assert(ctx->twophase);
    1051             : 
    1052             :     /* Push callback + info on the error context stack */
    1053          20 :     state.ctx = ctx;
    1054          20 :     state.callback_name = "rollback_prepared";
    1055          20 :     state.report_location = txn->final_lsn; /* beginning of commit record */
    1056          20 :     errcallback.callback = output_plugin_error_callback;
    1057          20 :     errcallback.arg = &state;
    1058          20 :     errcallback.previous = error_context_stack;
    1059          20 :     error_context_stack = &errcallback;
    1060             : 
    1061             :     /* set output state */
    1062          20 :     ctx->accept_writes = true;
    1063          20 :     ctx->write_xid = txn->xid;
    1064          20 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
    1065          20 :     ctx->end_xact = true;
    1066             : 
    1067             :     /*
    1068             :      * If the plugin support two-phase commits then rollback prepared callback
    1069             :      * is mandatory
    1070             :      */
    1071          20 :     if (ctx->callbacks.rollback_prepared_cb == NULL)
    1072           0 :         ereport(ERROR,
    1073             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1074             :                  errmsg("logical replication at prepare time requires a %s callback",
    1075             :                         "rollback_prepared_cb")));
    1076             : 
    1077             :     /* do the actual work: call callback */
    1078          20 :     ctx->callbacks.rollback_prepared_cb(ctx, txn, prepare_end_lsn,
    1079             :                                         prepare_time);
    1080             : 
    1081             :     /* Pop the error context stack */
    1082          20 :     error_context_stack = errcallback.previous;
    1083          20 : }
    1084             : 
    1085             : static void
    1086      315998 : change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1087             :                   Relation relation, ReorderBufferChange *change)
    1088             : {
    1089      315998 :     LogicalDecodingContext *ctx = cache->private_data;
    1090             :     LogicalErrorCallbackState state;
    1091             :     ErrorContextCallback errcallback;
    1092             : 
    1093             :     Assert(!ctx->fast_forward);
    1094             : 
    1095             :     /* Push callback + info on the error context stack */
    1096      315998 :     state.ctx = ctx;
    1097      315998 :     state.callback_name = "change";
    1098      315998 :     state.report_location = change->lsn;
    1099      315998 :     errcallback.callback = output_plugin_error_callback;
    1100      315998 :     errcallback.arg = &state;
    1101      315998 :     errcallback.previous = error_context_stack;
    1102      315998 :     error_context_stack = &errcallback;
    1103             : 
    1104             :     /* set output state */
    1105      315998 :     ctx->accept_writes = true;
    1106      315998 :     ctx->write_xid = txn->xid;
    1107             : 
    1108             :     /*
    1109             :      * Report this change's lsn so replies from clients can give an up-to-date
    1110             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1111             :      * receipt of this transaction, but it might allow another transaction's
    1112             :      * commit to be confirmed with one message.
    1113             :      */
    1114      315998 :     ctx->write_location = change->lsn;
    1115             : 
    1116      315998 :     ctx->end_xact = false;
    1117             : 
    1118      315998 :     ctx->callbacks.change_cb(ctx, txn, relation, change);
    1119             : 
    1120             :     /* Pop the error context stack */
    1121      315994 :     error_context_stack = errcallback.previous;
    1122      315994 : }
    1123             : 
    1124             : static void
    1125          44 : truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1126             :                     int nrelations, Relation relations[], ReorderBufferChange *change)
    1127             : {
    1128          44 :     LogicalDecodingContext *ctx = cache->private_data;
    1129             :     LogicalErrorCallbackState state;
    1130             :     ErrorContextCallback errcallback;
    1131             : 
    1132             :     Assert(!ctx->fast_forward);
    1133             : 
    1134          44 :     if (!ctx->callbacks.truncate_cb)
    1135           0 :         return;
    1136             : 
    1137             :     /* Push callback + info on the error context stack */
    1138          44 :     state.ctx = ctx;
    1139          44 :     state.callback_name = "truncate";
    1140          44 :     state.report_location = change->lsn;
    1141          44 :     errcallback.callback = output_plugin_error_callback;
    1142          44 :     errcallback.arg = &state;
    1143          44 :     errcallback.previous = error_context_stack;
    1144          44 :     error_context_stack = &errcallback;
    1145             : 
    1146             :     /* set output state */
    1147          44 :     ctx->accept_writes = true;
    1148          44 :     ctx->write_xid = txn->xid;
    1149             : 
    1150             :     /*
    1151             :      * Report this change's lsn so replies from clients can give an up-to-date
    1152             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1153             :      * receipt of this transaction, but it might allow another transaction's
    1154             :      * commit to be confirmed with one message.
    1155             :      */
    1156          44 :     ctx->write_location = change->lsn;
    1157             : 
    1158          44 :     ctx->end_xact = false;
    1159             : 
    1160          44 :     ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
    1161             : 
    1162             :     /* Pop the error context stack */
    1163          44 :     error_context_stack = errcallback.previous;
    1164             : }
    1165             : 
    1166             : bool
    1167         296 : filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
    1168             :                           const char *gid)
    1169             : {
    1170             :     LogicalErrorCallbackState state;
    1171             :     ErrorContextCallback errcallback;
    1172             :     bool        ret;
    1173             : 
    1174             :     Assert(!ctx->fast_forward);
    1175             : 
    1176             :     /* Push callback + info on the error context stack */
    1177         296 :     state.ctx = ctx;
    1178         296 :     state.callback_name = "filter_prepare";
    1179         296 :     state.report_location = InvalidXLogRecPtr;
    1180         296 :     errcallback.callback = output_plugin_error_callback;
    1181         296 :     errcallback.arg = &state;
    1182         296 :     errcallback.previous = error_context_stack;
    1183         296 :     error_context_stack = &errcallback;
    1184             : 
    1185             :     /* set output state */
    1186         296 :     ctx->accept_writes = false;
    1187         296 :     ctx->end_xact = false;
    1188             : 
    1189             :     /* do the actual work: call callback */
    1190         296 :     ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
    1191             : 
    1192             :     /* Pop the error context stack */
    1193         296 :     error_context_stack = errcallback.previous;
    1194             : 
    1195         296 :     return ret;
    1196             : }
    1197             : 
    1198             : bool
    1199     2469842 : filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
    1200             : {
    1201             :     LogicalErrorCallbackState state;
    1202             :     ErrorContextCallback errcallback;
    1203             :     bool        ret;
    1204             : 
    1205             :     Assert(!ctx->fast_forward);
    1206             : 
    1207             :     /* Push callback + info on the error context stack */
    1208     2469842 :     state.ctx = ctx;
    1209     2469842 :     state.callback_name = "filter_by_origin";
    1210     2469842 :     state.report_location = InvalidXLogRecPtr;
    1211     2469842 :     errcallback.callback = output_plugin_error_callback;
    1212     2469842 :     errcallback.arg = &state;
    1213     2469842 :     errcallback.previous = error_context_stack;
    1214     2469842 :     error_context_stack = &errcallback;
    1215             : 
    1216             :     /* set output state */
    1217     2469842 :     ctx->accept_writes = false;
    1218     2469842 :     ctx->end_xact = false;
    1219             : 
    1220             :     /* do the actual work: call callback */
    1221     2469842 :     ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
    1222             : 
    1223             :     /* Pop the error context stack */
    1224     2469842 :     error_context_stack = errcallback.previous;
    1225             : 
    1226     2469842 :     return ret;
    1227             : }
    1228             : 
    1229             : static void
    1230          32 : message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1231             :                    XLogRecPtr message_lsn, bool transactional,
    1232             :                    const char *prefix, Size message_size, const char *message)
    1233             : {
    1234          32 :     LogicalDecodingContext *ctx = cache->private_data;
    1235             :     LogicalErrorCallbackState state;
    1236             :     ErrorContextCallback errcallback;
    1237             : 
    1238             :     Assert(!ctx->fast_forward);
    1239             : 
    1240          32 :     if (ctx->callbacks.message_cb == NULL)
    1241           0 :         return;
    1242             : 
    1243             :     /* Push callback + info on the error context stack */
    1244          32 :     state.ctx = ctx;
    1245          32 :     state.callback_name = "message";
    1246          32 :     state.report_location = message_lsn;
    1247          32 :     errcallback.callback = output_plugin_error_callback;
    1248          32 :     errcallback.arg = &state;
    1249          32 :     errcallback.previous = error_context_stack;
    1250          32 :     error_context_stack = &errcallback;
    1251             : 
    1252             :     /* set output state */
    1253          32 :     ctx->accept_writes = true;
    1254          32 :     ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
    1255          32 :     ctx->write_location = message_lsn;
    1256          32 :     ctx->end_xact = false;
    1257             : 
    1258             :     /* do the actual work: call callback */
    1259          32 :     ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
    1260             :                               message_size, message);
    1261             : 
    1262             :     /* Pop the error context stack */
    1263          32 :     error_context_stack = errcallback.previous;
    1264             : }
    1265             : 
    1266             : static void
    1267        1316 : stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1268             :                         XLogRecPtr first_lsn)
    1269             : {
    1270        1316 :     LogicalDecodingContext *ctx = cache->private_data;
    1271             :     LogicalErrorCallbackState state;
    1272             :     ErrorContextCallback errcallback;
    1273             : 
    1274             :     Assert(!ctx->fast_forward);
    1275             : 
    1276             :     /* We're only supposed to call this when streaming is supported. */
    1277             :     Assert(ctx->streaming);
    1278             : 
    1279             :     /* Push callback + info on the error context stack */
    1280        1316 :     state.ctx = ctx;
    1281        1316 :     state.callback_name = "stream_start";
    1282        1316 :     state.report_location = first_lsn;
    1283        1316 :     errcallback.callback = output_plugin_error_callback;
    1284        1316 :     errcallback.arg = &state;
    1285        1316 :     errcallback.previous = error_context_stack;
    1286        1316 :     error_context_stack = &errcallback;
    1287             : 
    1288             :     /* set output state */
    1289        1316 :     ctx->accept_writes = true;
    1290        1316 :     ctx->write_xid = txn->xid;
    1291             : 
    1292             :     /*
    1293             :      * Report this message's lsn so replies from clients can give an
    1294             :      * up-to-date answer. This won't ever be enough (and shouldn't be!) to
    1295             :      * confirm receipt of this transaction, but it might allow another
    1296             :      * transaction's commit to be confirmed with one message.
    1297             :      */
    1298        1316 :     ctx->write_location = first_lsn;
    1299             : 
    1300        1316 :     ctx->end_xact = false;
    1301             : 
    1302             :     /* in streaming mode, stream_start_cb is required */
    1303        1316 :     if (ctx->callbacks.stream_start_cb == NULL)
    1304           0 :         ereport(ERROR,
    1305             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1306             :                  errmsg("logical streaming requires a %s callback",
    1307             :                         "stream_start_cb")));
    1308             : 
    1309        1316 :     ctx->callbacks.stream_start_cb(ctx, txn);
    1310             : 
    1311             :     /* Pop the error context stack */
    1312        1316 :     error_context_stack = errcallback.previous;
    1313        1316 : }
    1314             : 
    1315             : static void
    1316        1316 : stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1317             :                        XLogRecPtr last_lsn)
    1318             : {
    1319        1316 :     LogicalDecodingContext *ctx = cache->private_data;
    1320             :     LogicalErrorCallbackState state;
    1321             :     ErrorContextCallback errcallback;
    1322             : 
    1323             :     Assert(!ctx->fast_forward);
    1324             : 
    1325             :     /* We're only supposed to call this when streaming is supported. */
    1326             :     Assert(ctx->streaming);
    1327             : 
    1328             :     /* Push callback + info on the error context stack */
    1329        1316 :     state.ctx = ctx;
    1330        1316 :     state.callback_name = "stream_stop";
    1331        1316 :     state.report_location = last_lsn;
    1332        1316 :     errcallback.callback = output_plugin_error_callback;
    1333        1316 :     errcallback.arg = &state;
    1334        1316 :     errcallback.previous = error_context_stack;
    1335        1316 :     error_context_stack = &errcallback;
    1336             : 
    1337             :     /* set output state */
    1338        1316 :     ctx->accept_writes = true;
    1339        1316 :     ctx->write_xid = txn->xid;
    1340             : 
    1341             :     /*
    1342             :      * Report this message's lsn so replies from clients can give an
    1343             :      * up-to-date answer. This won't ever be enough (and shouldn't be!) to
    1344             :      * confirm receipt of this transaction, but it might allow another
    1345             :      * transaction's commit to be confirmed with one message.
    1346             :      */
    1347        1316 :     ctx->write_location = last_lsn;
    1348             : 
    1349        1316 :     ctx->end_xact = false;
    1350             : 
    1351             :     /* in streaming mode, stream_stop_cb is required */
    1352        1316 :     if (ctx->callbacks.stream_stop_cb == NULL)
    1353           0 :         ereport(ERROR,
    1354             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1355             :                  errmsg("logical streaming requires a %s callback",
    1356             :                         "stream_stop_cb")));
    1357             : 
    1358        1316 :     ctx->callbacks.stream_stop_cb(ctx, txn);
    1359             : 
    1360             :     /* Pop the error context stack */
    1361        1316 :     error_context_stack = errcallback.previous;
    1362        1316 : }
    1363             : 
    1364             : static void
    1365          60 : stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1366             :                         XLogRecPtr abort_lsn)
    1367             : {
    1368          60 :     LogicalDecodingContext *ctx = cache->private_data;
    1369             :     LogicalErrorCallbackState state;
    1370             :     ErrorContextCallback errcallback;
    1371             : 
    1372             :     Assert(!ctx->fast_forward);
    1373             : 
    1374             :     /* We're only supposed to call this when streaming is supported. */
    1375             :     Assert(ctx->streaming);
    1376             : 
    1377             :     /* Push callback + info on the error context stack */
    1378          60 :     state.ctx = ctx;
    1379          60 :     state.callback_name = "stream_abort";
    1380          60 :     state.report_location = abort_lsn;
    1381          60 :     errcallback.callback = output_plugin_error_callback;
    1382          60 :     errcallback.arg = &state;
    1383          60 :     errcallback.previous = error_context_stack;
    1384          60 :     error_context_stack = &errcallback;
    1385             : 
    1386             :     /* set output state */
    1387          60 :     ctx->accept_writes = true;
    1388          60 :     ctx->write_xid = txn->xid;
    1389          60 :     ctx->write_location = abort_lsn;
    1390          60 :     ctx->end_xact = true;
    1391             : 
    1392             :     /* in streaming mode, stream_abort_cb is required */
    1393          60 :     if (ctx->callbacks.stream_abort_cb == NULL)
    1394           0 :         ereport(ERROR,
    1395             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1396             :                  errmsg("logical streaming requires a %s callback",
    1397             :                         "stream_abort_cb")));
    1398             : 
    1399          60 :     ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn);
    1400             : 
    1401             :     /* Pop the error context stack */
    1402          60 :     error_context_stack = errcallback.previous;
    1403          60 : }
    1404             : 
    1405             : static void
    1406          24 : stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1407             :                           XLogRecPtr prepare_lsn)
    1408             : {
    1409          24 :     LogicalDecodingContext *ctx = cache->private_data;
    1410             :     LogicalErrorCallbackState state;
    1411             :     ErrorContextCallback errcallback;
    1412             : 
    1413             :     Assert(!ctx->fast_forward);
    1414             : 
    1415             :     /*
    1416             :      * We're only supposed to call this when streaming and two-phase commits
    1417             :      * are supported.
    1418             :      */
    1419             :     Assert(ctx->streaming);
    1420             :     Assert(ctx->twophase);
    1421             : 
    1422             :     /* Push callback + info on the error context stack */
    1423          24 :     state.ctx = ctx;
    1424          24 :     state.callback_name = "stream_prepare";
    1425          24 :     state.report_location = txn->final_lsn;
    1426          24 :     errcallback.callback = output_plugin_error_callback;
    1427          24 :     errcallback.arg = &state;
    1428          24 :     errcallback.previous = error_context_stack;
    1429          24 :     error_context_stack = &errcallback;
    1430             : 
    1431             :     /* set output state */
    1432          24 :     ctx->accept_writes = true;
    1433          24 :     ctx->write_xid = txn->xid;
    1434          24 :     ctx->write_location = txn->end_lsn;
    1435          24 :     ctx->end_xact = true;
    1436             : 
    1437             :     /* in streaming mode with two-phase commits, stream_prepare_cb is required */
    1438          24 :     if (ctx->callbacks.stream_prepare_cb == NULL)
    1439           0 :         ereport(ERROR,
    1440             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1441             :                  errmsg("logical streaming at prepare time requires a %s callback",
    1442             :                         "stream_prepare_cb")));
    1443             : 
    1444          24 :     ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
    1445             : 
    1446             :     /* Pop the error context stack */
    1447          24 :     error_context_stack = errcallback.previous;
    1448          24 : }
    1449             : 
    1450             : static void
    1451         100 : stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1452             :                          XLogRecPtr commit_lsn)
    1453             : {
    1454         100 :     LogicalDecodingContext *ctx = cache->private_data;
    1455             :     LogicalErrorCallbackState state;
    1456             :     ErrorContextCallback errcallback;
    1457             : 
    1458             :     Assert(!ctx->fast_forward);
    1459             : 
    1460             :     /* We're only supposed to call this when streaming is supported. */
    1461             :     Assert(ctx->streaming);
    1462             : 
    1463             :     /* Push callback + info on the error context stack */
    1464         100 :     state.ctx = ctx;
    1465         100 :     state.callback_name = "stream_commit";
    1466         100 :     state.report_location = txn->final_lsn;
    1467         100 :     errcallback.callback = output_plugin_error_callback;
    1468         100 :     errcallback.arg = &state;
    1469         100 :     errcallback.previous = error_context_stack;
    1470         100 :     error_context_stack = &errcallback;
    1471             : 
    1472             :     /* set output state */
    1473         100 :     ctx->accept_writes = true;
    1474         100 :     ctx->write_xid = txn->xid;
    1475         100 :     ctx->write_location = txn->end_lsn;
    1476         100 :     ctx->end_xact = true;
    1477             : 
    1478             :     /* in streaming mode, stream_commit_cb is required */
    1479         100 :     if (ctx->callbacks.stream_commit_cb == NULL)
    1480           0 :         ereport(ERROR,
    1481             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1482             :                  errmsg("logical streaming requires a %s callback",
    1483             :                         "stream_commit_cb")));
    1484             : 
    1485         100 :     ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn);
    1486             : 
    1487             :     /* Pop the error context stack */
    1488         100 :     error_context_stack = errcallback.previous;
    1489         100 : }
    1490             : 
    1491             : static void
    1492      351972 : stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1493             :                          Relation relation, ReorderBufferChange *change)
    1494             : {
    1495      351972 :     LogicalDecodingContext *ctx = cache->private_data;
    1496             :     LogicalErrorCallbackState state;
    1497             :     ErrorContextCallback errcallback;
    1498             : 
    1499             :     Assert(!ctx->fast_forward);
    1500             : 
    1501             :     /* We're only supposed to call this when streaming is supported. */
    1502             :     Assert(ctx->streaming);
    1503             : 
    1504             :     /* Push callback + info on the error context stack */
    1505      351972 :     state.ctx = ctx;
    1506      351972 :     state.callback_name = "stream_change";
    1507      351972 :     state.report_location = change->lsn;
    1508      351972 :     errcallback.callback = output_plugin_error_callback;
    1509      351972 :     errcallback.arg = &state;
    1510      351972 :     errcallback.previous = error_context_stack;
    1511      351972 :     error_context_stack = &errcallback;
    1512             : 
    1513             :     /* set output state */
    1514      351972 :     ctx->accept_writes = true;
    1515      351972 :     ctx->write_xid = txn->xid;
    1516             : 
    1517             :     /*
    1518             :      * Report this change's lsn so replies from clients can give an up-to-date
    1519             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1520             :      * receipt of this transaction, but it might allow another transaction's
    1521             :      * commit to be confirmed with one message.
    1522             :      */
    1523      351972 :     ctx->write_location = change->lsn;
    1524             : 
    1525      351972 :     ctx->end_xact = false;
    1526             : 
    1527             :     /* in streaming mode, stream_change_cb is required */
    1528      351972 :     if (ctx->callbacks.stream_change_cb == NULL)
    1529           0 :         ereport(ERROR,
    1530             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1531             :                  errmsg("logical streaming requires a %s callback",
    1532             :                         "stream_change_cb")));
    1533             : 
    1534      351972 :     ctx->callbacks.stream_change_cb(ctx, txn, relation, change);
    1535             : 
    1536             :     /* Pop the error context stack */
    1537      351972 :     error_context_stack = errcallback.previous;
    1538      351972 : }
    1539             : 
    1540             : static void
    1541           6 : stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1542             :                           XLogRecPtr message_lsn, bool transactional,
    1543             :                           const char *prefix, Size message_size, const char *message)
    1544             : {
    1545           6 :     LogicalDecodingContext *ctx = cache->private_data;
    1546             :     LogicalErrorCallbackState state;
    1547             :     ErrorContextCallback errcallback;
    1548             : 
    1549             :     Assert(!ctx->fast_forward);
    1550             : 
    1551             :     /* We're only supposed to call this when streaming is supported. */
    1552             :     Assert(ctx->streaming);
    1553             : 
    1554             :     /* this callback is optional */
    1555           6 :     if (ctx->callbacks.stream_message_cb == NULL)
    1556           0 :         return;
    1557             : 
    1558             :     /* Push callback + info on the error context stack */
    1559           6 :     state.ctx = ctx;
    1560           6 :     state.callback_name = "stream_message";
    1561           6 :     state.report_location = message_lsn;
    1562           6 :     errcallback.callback = output_plugin_error_callback;
    1563           6 :     errcallback.arg = &state;
    1564           6 :     errcallback.previous = error_context_stack;
    1565           6 :     error_context_stack = &errcallback;
    1566             : 
    1567             :     /* set output state */
    1568           6 :     ctx->accept_writes = true;
    1569           6 :     ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
    1570           6 :     ctx->write_location = message_lsn;
    1571           6 :     ctx->end_xact = false;
    1572             : 
    1573             :     /* do the actual work: call callback */
    1574           6 :     ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
    1575             :                                      message_size, message);
    1576             : 
    1577             :     /* Pop the error context stack */
    1578           6 :     error_context_stack = errcallback.previous;
    1579             : }
    1580             : 
    1581             : static void
    1582           0 : stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1583             :                            int nrelations, Relation relations[],
    1584             :                            ReorderBufferChange *change)
    1585             : {
    1586           0 :     LogicalDecodingContext *ctx = cache->private_data;
    1587             :     LogicalErrorCallbackState state;
    1588             :     ErrorContextCallback errcallback;
    1589             : 
    1590             :     Assert(!ctx->fast_forward);
    1591             : 
    1592             :     /* We're only supposed to call this when streaming is supported. */
    1593             :     Assert(ctx->streaming);
    1594             : 
    1595             :     /* this callback is optional */
    1596           0 :     if (!ctx->callbacks.stream_truncate_cb)
    1597           0 :         return;
    1598             : 
    1599             :     /* Push callback + info on the error context stack */
    1600           0 :     state.ctx = ctx;
    1601           0 :     state.callback_name = "stream_truncate";
    1602           0 :     state.report_location = change->lsn;
    1603           0 :     errcallback.callback = output_plugin_error_callback;
    1604           0 :     errcallback.arg = &state;
    1605           0 :     errcallback.previous = error_context_stack;
    1606           0 :     error_context_stack = &errcallback;
    1607             : 
    1608             :     /* set output state */
    1609           0 :     ctx->accept_writes = true;
    1610           0 :     ctx->write_xid = txn->xid;
    1611             : 
    1612             :     /*
    1613             :      * Report this change's lsn so replies from clients can give an up-to-date
    1614             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1615             :      * receipt of this transaction, but it might allow another transaction's
    1616             :      * commit to be confirmed with one message.
    1617             :      */
    1618           0 :     ctx->write_location = change->lsn;
    1619             : 
    1620           0 :     ctx->end_xact = false;
    1621             : 
    1622           0 :     ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
    1623             : 
    1624             :     /* Pop the error context stack */
    1625           0 :     error_context_stack = errcallback.previous;
    1626             : }
    1627             : 
    1628             : static void
    1629        6214 : update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1630             :                                XLogRecPtr lsn)
    1631             : {
    1632        6214 :     LogicalDecodingContext *ctx = cache->private_data;
    1633             :     LogicalErrorCallbackState state;
    1634             :     ErrorContextCallback errcallback;
    1635             : 
    1636             :     Assert(!ctx->fast_forward);
    1637             : 
    1638             :     /* Push callback + info on the error context stack */
    1639        6214 :     state.ctx = ctx;
    1640        6214 :     state.callback_name = "update_progress_txn";
    1641        6214 :     state.report_location = lsn;
    1642        6214 :     errcallback.callback = output_plugin_error_callback;
    1643        6214 :     errcallback.arg = &state;
    1644        6214 :     errcallback.previous = error_context_stack;
    1645        6214 :     error_context_stack = &errcallback;
    1646             : 
    1647             :     /* set output state */
    1648        6214 :     ctx->accept_writes = false;
    1649        6214 :     ctx->write_xid = txn->xid;
    1650             : 
    1651             :     /*
    1652             :      * Report this change's lsn so replies from clients can give an up-to-date
    1653             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1654             :      * receipt of this transaction, but it might allow another transaction's
    1655             :      * commit to be confirmed with one message.
    1656             :      */
    1657        6214 :     ctx->write_location = lsn;
    1658             : 
    1659        6214 :     ctx->end_xact = false;
    1660             : 
    1661        6214 :     OutputPluginUpdateProgress(ctx, false);
    1662             : 
    1663             :     /* Pop the error context stack */
    1664        6214 :     error_context_stack = errcallback.previous;
    1665        6214 : }
    1666             : 
    1667             : /*
    1668             :  * Set the required catalog xmin horizon for historic snapshots in the current
    1669             :  * replication slot.
    1670             :  *
    1671             :  * Note that in the most cases, we won't be able to immediately use the xmin
    1672             :  * to increase the xmin horizon: we need to wait till the client has confirmed
    1673             :  * receiving current_lsn with LogicalConfirmReceivedLocation().
    1674             :  */
    1675             : void
    1676         794 : LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
    1677             : {
    1678         794 :     bool        updated_xmin = false;
    1679             :     ReplicationSlot *slot;
    1680         794 :     bool        got_new_xmin = false;
    1681             : 
    1682         794 :     slot = MyReplicationSlot;
    1683             : 
    1684             :     Assert(slot != NULL);
    1685             : 
    1686         794 :     SpinLockAcquire(&slot->mutex);
    1687             : 
    1688             :     /*
    1689             :      * don't overwrite if we already have a newer xmin. This can happen if we
    1690             :      * restart decoding in a slot.
    1691             :      */
    1692         794 :     if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
    1693             :     {
    1694             :     }
    1695             : 
    1696             :     /*
    1697             :      * If the client has already confirmed up to this lsn, we directly can
    1698             :      * mark this as accepted. This can happen if we restart decoding in a
    1699             :      * slot.
    1700             :      */
    1701         198 :     else if (current_lsn <= slot->data.confirmed_flush)
    1702             :     {
    1703         102 :         slot->candidate_catalog_xmin = xmin;
    1704         102 :         slot->candidate_xmin_lsn = current_lsn;
    1705             : 
    1706             :         /* our candidate can directly be used */
    1707         102 :         updated_xmin = true;
    1708             :     }
    1709             : 
    1710             :     /*
    1711             :      * Only increase if the previous values have been applied, otherwise we
    1712             :      * might never end up updating if the receiver acks too slowly.
    1713             :      */
    1714          96 :     else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
    1715             :     {
    1716          46 :         slot->candidate_catalog_xmin = xmin;
    1717          46 :         slot->candidate_xmin_lsn = current_lsn;
    1718             : 
    1719             :         /*
    1720             :          * Log new xmin at an appropriate log level after releasing the
    1721             :          * spinlock.
    1722             :          */
    1723          46 :         got_new_xmin = true;
    1724             :     }
    1725         794 :     SpinLockRelease(&slot->mutex);
    1726             : 
    1727         794 :     if (got_new_xmin)
    1728          46 :         elog(DEBUG1, "got new catalog xmin %u at %X/%X", xmin,
    1729             :              LSN_FORMAT_ARGS(current_lsn));
    1730             : 
    1731             :     /* candidate already valid with the current flush position, apply */
    1732         794 :     if (updated_xmin)
    1733         102 :         LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
    1734         794 : }
    1735             : 
    1736             : /*
    1737             :  * Mark the minimal LSN (restart_lsn) we need to read to replay all
    1738             :  * transactions that have not yet committed at current_lsn.
    1739             :  *
    1740             :  * Just like LogicalIncreaseXminForSlot this only takes effect when the
    1741             :  * client has confirmed to have received current_lsn.
    1742             :  */
    1743             : void
    1744         692 : LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
    1745             : {
    1746         692 :     bool        updated_lsn = false;
    1747             :     ReplicationSlot *slot;
    1748             : 
    1749         692 :     slot = MyReplicationSlot;
    1750             : 
    1751             :     Assert(slot != NULL);
    1752             :     Assert(restart_lsn != InvalidXLogRecPtr);
    1753             :     Assert(current_lsn != InvalidXLogRecPtr);
    1754             : 
    1755         692 :     SpinLockAcquire(&slot->mutex);
    1756             : 
    1757             :     /* don't overwrite if have a newer restart lsn */
    1758         692 :     if (restart_lsn <= slot->data.restart_lsn)
    1759             :     {
    1760          20 :         SpinLockRelease(&slot->mutex);
    1761             :     }
    1762             : 
    1763             :     /*
    1764             :      * We might have already flushed far enough to directly accept this lsn,
    1765             :      * in this case there is no need to check for existing candidate LSNs
    1766             :      */
    1767         672 :     else if (current_lsn <= slot->data.confirmed_flush)
    1768             :     {
    1769         534 :         slot->candidate_restart_valid = current_lsn;
    1770         534 :         slot->candidate_restart_lsn = restart_lsn;
    1771         534 :         SpinLockRelease(&slot->mutex);
    1772             : 
    1773             :         /* our candidate can directly be used */
    1774         534 :         updated_lsn = true;
    1775             :     }
    1776             : 
    1777             :     /*
    1778             :      * Only increase if the previous values have been applied, otherwise we
    1779             :      * might never end up updating if the receiver acks too slowly. A missed
    1780             :      * value here will just cause some extra effort after reconnecting.
    1781             :      */
    1782         138 :     else if (slot->candidate_restart_valid == InvalidXLogRecPtr)
    1783             :     {
    1784          76 :         slot->candidate_restart_valid = current_lsn;
    1785          76 :         slot->candidate_restart_lsn = restart_lsn;
    1786          76 :         SpinLockRelease(&slot->mutex);
    1787             : 
    1788          76 :         elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
    1789             :              LSN_FORMAT_ARGS(restart_lsn),
    1790             :              LSN_FORMAT_ARGS(current_lsn));
    1791             :     }
    1792             :     else
    1793             :     {
    1794             :         XLogRecPtr  candidate_restart_lsn;
    1795             :         XLogRecPtr  candidate_restart_valid;
    1796             :         XLogRecPtr  confirmed_flush;
    1797             : 
    1798          62 :         candidate_restart_lsn = slot->candidate_restart_lsn;
    1799          62 :         candidate_restart_valid = slot->candidate_restart_valid;
    1800          62 :         confirmed_flush = slot->data.confirmed_flush;
    1801          62 :         SpinLockRelease(&slot->mutex);
    1802             : 
    1803          62 :         elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
    1804             :              LSN_FORMAT_ARGS(restart_lsn),
    1805             :              LSN_FORMAT_ARGS(current_lsn),
    1806             :              LSN_FORMAT_ARGS(candidate_restart_lsn),
    1807             :              LSN_FORMAT_ARGS(candidate_restart_valid),
    1808             :              LSN_FORMAT_ARGS(confirmed_flush));
    1809             :     }
    1810             : 
    1811             :     /* candidates are already valid with the current flush position, apply */
    1812         692 :     if (updated_lsn)
    1813         534 :         LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
    1814         692 : }
    1815             : 
    1816             : /*
    1817             :  * Handle a consumer's confirmation having received all changes up to lsn.
    1818             :  */
    1819             : void
    1820       93232 : LogicalConfirmReceivedLocation(XLogRecPtr lsn)
    1821             : {
    1822             :     Assert(lsn != InvalidXLogRecPtr);
    1823             : 
    1824             :     /* Do an unlocked check for candidate_lsn first. */
    1825       93232 :     if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
    1826       93092 :         MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
    1827         708 :     {
    1828         708 :         bool        updated_xmin = false;
    1829         708 :         bool        updated_restart = false;
    1830             :         XLogRecPtr  restart_lsn pg_attribute_unused();
    1831             : 
    1832         708 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1833             : 
    1834             :         /* remember the old restart lsn */
    1835         708 :         restart_lsn = MyReplicationSlot->data.restart_lsn;
    1836             : 
    1837             :         /*
    1838             :          * Prevent moving the confirmed_flush backwards, as this could lead to
    1839             :          * data duplication issues caused by replicating already replicated
    1840             :          * changes.
    1841             :          *
    1842             :          * This can happen when a client acknowledges an LSN it doesn't have
    1843             :          * to do anything for, and thus didn't store persistently. After a
    1844             :          * restart, the client can send the prior LSN that it stored
    1845             :          * persistently as an acknowledgement, but we need to ignore such an
    1846             :          * LSN. See similar case handling in CreateDecodingContext.
    1847             :          */
    1848         708 :         if (lsn > MyReplicationSlot->data.confirmed_flush)
    1849          68 :             MyReplicationSlot->data.confirmed_flush = lsn;
    1850             : 
    1851             :         /* if we're past the location required for bumping xmin, do so */
    1852         708 :         if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
    1853         140 :             MyReplicationSlot->candidate_xmin_lsn <= lsn)
    1854             :         {
    1855             :             /*
    1856             :              * We have to write the changed xmin to disk *before* we change
    1857             :              * the in-memory value, otherwise after a crash we wouldn't know
    1858             :              * that some catalog tuples might have been removed already.
    1859             :              *
    1860             :              * Ensure that by first writing to ->xmin and only update
    1861             :              * ->effective_xmin once the new state is synced to disk. After a
    1862             :              * crash ->effective_xmin is set to ->xmin.
    1863             :              */
    1864         136 :             if (TransactionIdIsValid(MyReplicationSlot->candidate_catalog_xmin) &&
    1865         136 :                 MyReplicationSlot->data.catalog_xmin != MyReplicationSlot->candidate_catalog_xmin)
    1866             :             {
    1867         136 :                 MyReplicationSlot->data.catalog_xmin = MyReplicationSlot->candidate_catalog_xmin;
    1868         136 :                 MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
    1869         136 :                 MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
    1870         136 :                 updated_xmin = true;
    1871             :             }
    1872             :         }
    1873             : 
    1874         708 :         if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr &&
    1875         606 :             MyReplicationSlot->candidate_restart_valid <= lsn)
    1876             :         {
    1877             :             Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
    1878             : 
    1879         602 :             MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
    1880         602 :             MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
    1881         602 :             MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
    1882         602 :             updated_restart = true;
    1883             :         }
    1884             : 
    1885         708 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1886             : 
    1887             :         /* first write new xmin to disk, so we know what's up after a crash */
    1888         708 :         if (updated_xmin || updated_restart)
    1889             :         {
    1890             : #ifdef USE_INJECTION_POINTS
    1891             :             XLogSegNo   seg1,
    1892             :                         seg2;
    1893             : 
    1894         704 :             XLByteToSeg(restart_lsn, seg1, wal_segment_size);
    1895         704 :             XLByteToSeg(MyReplicationSlot->data.restart_lsn, seg2, wal_segment_size);
    1896             : 
    1897             :             /* trigger injection point, but only if segment changes */
    1898         704 :             if (seg1 != seg2)
    1899          12 :                 INJECTION_POINT("logical-replication-slot-advance-segment", NULL);
    1900             : #endif
    1901             : 
    1902         704 :             ReplicationSlotMarkDirty();
    1903         704 :             ReplicationSlotSave();
    1904         704 :             elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
    1905             :         }
    1906             : 
    1907             :         /*
    1908             :          * Now the new xmin is safely on disk, we can let the global value
    1909             :          * advance. We do not take ProcArrayLock or similar since we only
    1910             :          * advance xmin here and there's not much harm done by a concurrent
    1911             :          * computation missing that.
    1912             :          */
    1913         708 :         if (updated_xmin)
    1914             :         {
    1915         136 :             SpinLockAcquire(&MyReplicationSlot->mutex);
    1916         136 :             MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
    1917         136 :             SpinLockRelease(&MyReplicationSlot->mutex);
    1918             : 
    1919         136 :             ReplicationSlotsComputeRequiredXmin(false);
    1920         136 :             ReplicationSlotsComputeRequiredLSN();
    1921             :         }
    1922             :     }
    1923             :     else
    1924             :     {
    1925       92524 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1926             : 
    1927             :         /*
    1928             :          * Prevent moving the confirmed_flush backwards. See comments above
    1929             :          * for the details.
    1930             :          */
    1931       92524 :         if (lsn > MyReplicationSlot->data.confirmed_flush)
    1932       91056 :             MyReplicationSlot->data.confirmed_flush = lsn;
    1933             : 
    1934       92524 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1935             :     }
    1936       93232 : }
    1937             : 
    1938             : /*
    1939             :  * Clear logical streaming state during (sub)transaction abort.
    1940             :  */
    1941             : void
    1942       58412 : ResetLogicalStreamingState(void)
    1943             : {
    1944       58412 :     CheckXidAlive = InvalidTransactionId;
    1945       58412 :     bsysscan = false;
    1946       58412 : }
    1947             : 
    1948             : /*
    1949             :  * Report stats for a slot.
    1950             :  */
    1951             : void
    1952       10362 : UpdateDecodingStats(LogicalDecodingContext *ctx)
    1953             : {
    1954       10362 :     ReorderBuffer *rb = ctx->reorder;
    1955             :     PgStat_StatReplSlotEntry repSlotStat;
    1956             : 
    1957             :     /* Nothing to do if we don't have any replication stats to be sent. */
    1958       10362 :     if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
    1959         484 :         return;
    1960             : 
    1961        9878 :     elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
    1962             :          rb,
    1963             :          rb->spillTxns,
    1964             :          rb->spillCount,
    1965             :          rb->spillBytes,
    1966             :          rb->streamTxns,
    1967             :          rb->streamCount,
    1968             :          rb->streamBytes,
    1969             :          rb->totalTxns,
    1970             :          rb->totalBytes);
    1971             : 
    1972        9878 :     repSlotStat.spill_txns = rb->spillTxns;
    1973        9878 :     repSlotStat.spill_count = rb->spillCount;
    1974        9878 :     repSlotStat.spill_bytes = rb->spillBytes;
    1975        9878 :     repSlotStat.stream_txns = rb->streamTxns;
    1976        9878 :     repSlotStat.stream_count = rb->streamCount;
    1977        9878 :     repSlotStat.stream_bytes = rb->streamBytes;
    1978        9878 :     repSlotStat.total_txns = rb->totalTxns;
    1979        9878 :     repSlotStat.total_bytes = rb->totalBytes;
    1980             : 
    1981        9878 :     pgstat_report_replslot(ctx->slot, &repSlotStat);
    1982             : 
    1983        9878 :     rb->spillTxns = 0;
    1984        9878 :     rb->spillCount = 0;
    1985        9878 :     rb->spillBytes = 0;
    1986        9878 :     rb->streamTxns = 0;
    1987        9878 :     rb->streamCount = 0;
    1988        9878 :     rb->streamBytes = 0;
    1989        9878 :     rb->totalTxns = 0;
    1990        9878 :     rb->totalBytes = 0;
    1991             : }
    1992             : 
    1993             : /*
    1994             :  * Read up to the end of WAL starting from the decoding slot's restart_lsn.
    1995             :  * Return true if any meaningful/decodable WAL records are encountered,
    1996             :  * otherwise false.
    1997             :  */
    1998             : bool
    1999          10 : LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
    2000             : {
    2001          10 :     bool        has_pending_wal = false;
    2002             : 
    2003             :     Assert(MyReplicationSlot);
    2004             : 
    2005          10 :     PG_TRY();
    2006             :     {
    2007             :         LogicalDecodingContext *ctx;
    2008             : 
    2009             :         /*
    2010             :          * Create our decoding context in fast_forward mode, passing start_lsn
    2011             :          * as InvalidXLogRecPtr, so that we start processing from the slot's
    2012             :          * confirmed_flush.
    2013             :          */
    2014          20 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
    2015             :                                     NIL,
    2016             :                                     true,   /* fast_forward */
    2017          10 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
    2018             :                                                .segment_open = wal_segment_open,
    2019             :                                                .segment_close = wal_segment_close),
    2020             :                                     NULL, NULL, NULL);
    2021             : 
    2022             :         /*
    2023             :          * Start reading at the slot's restart_lsn, which we know points to a
    2024             :          * valid record.
    2025             :          */
    2026          10 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
    2027             : 
    2028             :         /* Invalidate non-timetravel entries */
    2029          10 :         InvalidateSystemCaches();
    2030             : 
    2031             :         /* Loop until the end of WAL or some changes are processed */
    2032         300 :         while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal)
    2033             :         {
    2034             :             XLogRecord *record;
    2035         290 :             char       *errm = NULL;
    2036             : 
    2037         290 :             record = XLogReadRecord(ctx->reader, &errm);
    2038             : 
    2039         290 :             if (errm)
    2040           0 :                 elog(ERROR, "could not find record for logical decoding: %s", errm);
    2041             : 
    2042         290 :             if (record != NULL)
    2043         290 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
    2044             : 
    2045         290 :             has_pending_wal = ctx->processing_required;
    2046             : 
    2047         290 :             CHECK_FOR_INTERRUPTS();
    2048             :         }
    2049             : 
    2050             :         /* Clean up */
    2051          10 :         FreeDecodingContext(ctx);
    2052          10 :         InvalidateSystemCaches();
    2053             :     }
    2054           0 :     PG_CATCH();
    2055             :     {
    2056             :         /* clear all timetravel entries */
    2057           0 :         InvalidateSystemCaches();
    2058             : 
    2059           0 :         PG_RE_THROW();
    2060             :     }
    2061          10 :     PG_END_TRY();
    2062             : 
    2063          10 :     return has_pending_wal;
    2064             : }
    2065             : 
    2066             : /*
    2067             :  * Helper function for advancing our logical replication slot forward.
    2068             :  *
    2069             :  * The slot's restart_lsn is used as start point for reading records, while
    2070             :  * confirmed_flush is used as base point for the decoding context.
    2071             :  *
    2072             :  * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
    2073             :  * because we need to digest WAL to advance restart_lsn allowing to recycle
    2074             :  * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
    2075             :  * mode, no changes are generated anyway.
    2076             :  *
    2077             :  * *found_consistent_snapshot will be true if the initial decoding snapshot has
    2078             :  * been built; Otherwise, it will be false.
    2079             :  */
    2080             : XLogRecPtr
    2081          32 : LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
    2082             :                                     bool *found_consistent_snapshot)
    2083             : {
    2084             :     LogicalDecodingContext *ctx;
    2085          32 :     ResourceOwner old_resowner = CurrentResourceOwner;
    2086             :     XLogRecPtr  retlsn;
    2087             : 
    2088             :     Assert(moveto != InvalidXLogRecPtr);
    2089             : 
    2090          32 :     if (found_consistent_snapshot)
    2091          10 :         *found_consistent_snapshot = false;
    2092             : 
    2093          32 :     PG_TRY();
    2094             :     {
    2095             :         /*
    2096             :          * Create our decoding context in fast_forward mode, passing start_lsn
    2097             :          * as InvalidXLogRecPtr, so that we start processing from my slot's
    2098             :          * confirmed_flush.
    2099             :          */
    2100          64 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
    2101             :                                     NIL,
    2102             :                                     true,   /* fast_forward */
    2103          32 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
    2104             :                                                .segment_open = wal_segment_open,
    2105             :                                                .segment_close = wal_segment_close),
    2106             :                                     NULL, NULL, NULL);
    2107             : 
    2108             :         /*
    2109             :          * Wait for specified streaming replication standby servers (if any)
    2110             :          * to confirm receipt of WAL up to moveto lsn.
    2111             :          */
    2112          32 :         WaitForStandbyConfirmation(moveto);
    2113             : 
    2114             :         /*
    2115             :          * Start reading at the slot's restart_lsn, which we know to point to
    2116             :          * a valid record.
    2117             :          */
    2118          32 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
    2119             : 
    2120             :         /* invalidate non-timetravel entries */
    2121          32 :         InvalidateSystemCaches();
    2122             : 
    2123             :         /* Decode records until we reach the requested target */
    2124        4184 :         while (ctx->reader->EndRecPtr < moveto)
    2125             :         {
    2126        4152 :             char       *errm = NULL;
    2127             :             XLogRecord *record;
    2128             : 
    2129             :             /*
    2130             :              * Read records.  No changes are generated in fast_forward mode,
    2131             :              * but snapbuilder/slot statuses are updated properly.
    2132             :              */
    2133        4152 :             record = XLogReadRecord(ctx->reader, &errm);
    2134        4152 :             if (errm)
    2135           0 :                 elog(ERROR, "could not find record while advancing replication slot: %s",
    2136             :                      errm);
    2137             : 
    2138             :             /*
    2139             :              * Process the record.  Storage-level changes are ignored in
    2140             :              * fast_forward mode, but other modules (such as snapbuilder)
    2141             :              * might still have critical updates to do.
    2142             :              */
    2143        4152 :             if (record)
    2144        4152 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
    2145             : 
    2146        4152 :             CHECK_FOR_INTERRUPTS();
    2147             :         }
    2148             : 
    2149          32 :         if (found_consistent_snapshot && DecodingContextReady(ctx))
    2150          10 :             *found_consistent_snapshot = true;
    2151             : 
    2152             :         /*
    2153             :          * Logical decoding could have clobbered CurrentResourceOwner during
    2154             :          * transaction management, so restore the executor's value.  (This is
    2155             :          * a kluge, but it's not worth cleaning up right now.)
    2156             :          */
    2157          32 :         CurrentResourceOwner = old_resowner;
    2158             : 
    2159          32 :         if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
    2160             :         {
    2161          32 :             LogicalConfirmReceivedLocation(moveto);
    2162             : 
    2163             :             /*
    2164             :              * If only the confirmed_flush LSN has changed the slot won't get
    2165             :              * marked as dirty by the above. Callers on the walsender
    2166             :              * interface are expected to keep track of their own progress and
    2167             :              * don't need it written out. But SQL-interface users cannot
    2168             :              * specify their own start positions and it's harder for them to
    2169             :              * keep track of their progress, so we should make more of an
    2170             :              * effort to save it for them.
    2171             :              *
    2172             :              * Dirty the slot so it is written out at the next checkpoint. The
    2173             :              * LSN position advanced to may still be lost on a crash but this
    2174             :              * makes the data consistent after a clean shutdown.
    2175             :              */
    2176          32 :             ReplicationSlotMarkDirty();
    2177             :         }
    2178             : 
    2179          32 :         retlsn = MyReplicationSlot->data.confirmed_flush;
    2180             : 
    2181             :         /* free context, call shutdown callback */
    2182          32 :         FreeDecodingContext(ctx);
    2183             : 
    2184          32 :         InvalidateSystemCaches();
    2185             :     }
    2186           0 :     PG_CATCH();
    2187             :     {
    2188             :         /* clear all timetravel entries */
    2189           0 :         InvalidateSystemCaches();
    2190             : 
    2191           0 :         PG_RE_THROW();
    2192             :     }
    2193          32 :     PG_END_TRY();
    2194             : 
    2195          32 :     return retlsn;
    2196             : }

Generated by: LCOV version 1.16