LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logical.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 93.0 % 756 703
Test Date: 2026-04-17 02:16:33 Functions: 97.6 % 41 40
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  * logical.c
       3              :  *     PostgreSQL logical decoding coordination
       4              :  *
       5              :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
       6              :  *
       7              :  * IDENTIFICATION
       8              :  *    src/backend/replication/logical/logical.c
       9              :  *
      10              :  * NOTES
      11              :  *    This file coordinates interaction between the various modules that
      12              :  *    together provide logical decoding, primarily by providing
      13              :  *    so-called LogicalDecodingContexts. The goal is to encapsulate most of the
      14              :  *    internal complexity for consumers of logical decoding, so they can
      15              :  *    create and consume a changestream with a low amount of code. Builtin
      16              :  *    consumers are the walsender and SQL SRF interface, but it's possible to
      17              :  *    add further ones without changing core code, e.g. to consume changes in
      18              :  *    a bgworker.
      19              :  *
      20              :  *    The idea is that a consumer provides three callbacks, one to read WAL,
      21              :  *    one to prepare a data write, and a final one for actually writing since
      22              :  *    their implementation depends on the type of consumer.  Check
      23              :  *    logicalfuncs.c for an example implementation of a fairly simple consumer
      24              :  *    and an implementation of a WAL reading callback that's suitable for
      25              :  *    simple consumers.
      26              :  *-------------------------------------------------------------------------
      27              :  */
      28              : 
      29              : #include "postgres.h"
      30              : 
      31              : #include "access/xact.h"
      32              : #include "access/xlog_internal.h"
      33              : #include "access/xlogutils.h"
      34              : #include "fmgr.h"
      35              : #include "miscadmin.h"
      36              : #include "pgstat.h"
      37              : #include "replication/decode.h"
      38              : #include "replication/logical.h"
      39              : #include "replication/reorderbuffer.h"
      40              : #include "replication/slotsync.h"
      41              : #include "replication/snapbuild.h"
      42              : #include "storage/proc.h"
      43              : #include "storage/procarray.h"
      44              : #include "utils/builtins.h"
      45              : #include "utils/injection_point.h"
      46              : #include "utils/inval.h"
      47              : #include "utils/memutils.h"
      48              : 
      49              : /* data for errcontext callback: a context, a callback name and a WAL location */
      50              : typedef struct LogicalErrorCallbackState
      51              : {
      52              :     LogicalDecodingContext *ctx;    /* decoding context */
      53              :     const char *callback_name;  /* callback's name (a C string) */
      54              :     XLogRecPtr  report_location;    /* WAL location to report */
      55              : } LogicalErrorCallbackState;
      56              : 
      57              : /* error callback, plus wrappers around the output plugin callbacks */
      58              : static void output_plugin_error_callback(void *arg);
      59              : static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      60              :                                bool is_init);
      61              : static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
      62              : static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
      63              : static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      64              :                               XLogRecPtr commit_lsn);
      65              : static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
      66              : static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      67              :                                XLogRecPtr prepare_lsn);
      68              : static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      69              :                                        XLogRecPtr commit_lsn);
      70              : static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      71              :                                          XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
      72              : static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      73              :                               Relation relation, ReorderBufferChange *change);
      74              : static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      75              :                                 int nrelations, Relation relations[], ReorderBufferChange *change);
      76              : static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      77              :                                XLogRecPtr message_lsn, bool transactional,
      78              :                                const char *prefix, Size message_size, const char *message);
      79              : 
      80              : /* wrappers around the streaming (stream_*) output plugin callbacks */
      81              : static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      82              :                                     XLogRecPtr first_lsn);
      83              : static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      84              :                                    XLogRecPtr last_lsn);
      85              : static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      86              :                                     XLogRecPtr abort_lsn);
      87              : static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      88              :                                       XLogRecPtr prepare_lsn);
      89              : static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      90              :                                      XLogRecPtr commit_lsn);
      91              : static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      92              :                                      Relation relation, ReorderBufferChange *change);
      93              : static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      94              :                                       XLogRecPtr message_lsn, bool transactional,
      95              :                                       const char *prefix, Size message_size, const char *message);
      96              : static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      97              :                                        int nrelations, Relation relations[], ReorderBufferChange *change);
      98              : 
      99              : /* callback to update txn's progress (installed as reorder->update_progress_txn) */
     100              : static void update_progress_txn_cb_wrapper(ReorderBuffer *cache,
     101              :                                            ReorderBufferTXN *txn,
     102              :                                            XLogRecPtr lsn);
     103              : 
     104              : static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
     105              : 
     106              : /*
     107              :  * Make sure the current settings & environment are capable of doing logical
     108              :  * decoding, else ERROR.  "repack" is just handed on to CheckSlotRequirements().
     109              :  */
     110              : void
     111         1712 : CheckLogicalDecodingRequirements(bool repack)
     112              : {
     113         1712 :     CheckSlotRequirements(repack);
     114              : 
     115              :     /*
     116              :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
     117              :      * needs the same check.
     118              :      */
     119              : 
     120         1712 :     if (MyDatabaseId == InvalidOid)
     121            1 :         ereport(ERROR,
     122              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     123              :                  errmsg("logical decoding requires a database connection")));
     124              : 
     125              :     /* CheckSlotRequirements() has already checked if wal_level >= 'replica' */
     126              :     Assert(wal_level >= WAL_LEVEL_REPLICA);
     127              : 
     128              :     /* While in recovery (standby), logical decoding must have been enabled */
     129         1711 :     if (RecoveryInProgress() && !IsLogicalDecodingEnabled())
     130            2 :         ereport(ERROR,
     131              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     132              :                  errmsg("logical decoding on standby requires \"effective_wal_level\" >= \"logical\" on the primary"),
     133              :                  errhint("Set \"wal_level\" >= \"logical\" or create at least one logical slot when \"wal_level\" = \"replica\".")));
     134         1709 : }
     135              : 
     136              : /*
     137              :  * Common helper for CreateInitDecodingContext() and CreateDecodingContext():
     138              :  * allocates and wires up a context for MyReplicationSlot in its own memcxt.
     139              :  */
     140              : static LogicalDecodingContext *
     141         1211 : StartupDecodingContext(List *output_plugin_options,
     142              :                        XLogRecPtr start_lsn,
     143              :                        TransactionId xmin_horizon,
     144              :                        bool need_full_snapshot,
     145              :                        bool fast_forward,
     146              :                        bool in_create,
     147              :                        XLogReaderRoutine *xl_routine,
     148              :                        LogicalOutputPluginWriterPrepareWrite prepare_write,
     149              :                        LogicalOutputPluginWriterWrite do_write,
     150              :                        LogicalOutputPluginWriterUpdateProgress update_progress)
     151              : {
     152              :     ReplicationSlot *slot;
     153              :     MemoryContext context,
     154              :                 old_context;
     155              :     LogicalDecodingContext *ctx;
     156              : 
     157              :     /* shorter lines... */
     158         1211 :     slot = MyReplicationSlot;
     159              : 
     160         1211 :     context = AllocSetContextCreate(CurrentMemoryContext,
     161              :                                     "Logical decoding context",
     162              :                                     ALLOCSET_DEFAULT_SIZES);
     163         1211 :     old_context = MemoryContextSwitchTo(context);
     164         1211 :     ctx = palloc0_object(LogicalDecodingContext);
     165              : 
     166         1211 :     ctx->context = context;
     167              : 
     168              :     /*
     169              :      * (re-)load output plugins, so we detect a bad (removed) output plugin
     170              :      * now.  Skipped when fast_forward is set.
     171              :      */
     172         1211 :     if (!fast_forward)
     173         1186 :         LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
     174              : 
     175              :     /*
     176              :      * Now that the slot's xmin has been set, we can announce ourselves as a
     177              :      * logical decoding backend which doesn't need to be checked individually
     178              :      * when computing the xmin horizon because the xmin is enforced via
     179              :      * replication slots.
     180              :      *
     181              :      * We can only do so if we're outside of a transaction (i.e. the case when
     182              :      * streaming changes via walsender), otherwise an already setup
     183              :      * snapshot/xid would end up being ignored. That's not a particularly
     184              :      * bothersome restriction since the SQL interface can't be used for
     185              :      * streaming anyway.
     186              :      */
     187         1210 :     if (!IsTransactionOrTransactionBlock())
     188              :     {
     189          619 :         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     190          619 :         MyProc->statusFlags |= PROC_IN_LOGICAL_DECODING;
     191          619 :         ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
     192          619 :         LWLockRelease(ProcArrayLock);
     193              :     }
     194              : 
     195         1210 :     ctx->slot = slot;
     196              : 
     197         1210 :     ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
     198         1210 :     if (!ctx->reader)
     199            0 :         ereport(ERROR,
     200              :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     201              :                  errmsg("out of memory"),
     202              :                  errdetail("Failed while allocating a WAL reading processor.")));
     203              : 
     204         1210 :     ctx->reorder = ReorderBufferAllocate();
     205         1210 :     ctx->snapshot_builder =
     206         1210 :         AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
     207              :                                 need_full_snapshot, in_create, slot->data.two_phase_at);
     208              : 
     209         1210 :     ctx->reorder->private_data = ctx;
     210              : 
     211              :     /* wrap output plugin callbacks, so we can add error context information */
     212         1210 :     ctx->reorder->begin = begin_cb_wrapper;
     213         1210 :     ctx->reorder->apply_change = change_cb_wrapper;
     214         1210 :     ctx->reorder->apply_truncate = truncate_cb_wrapper;
     215         1210 :     ctx->reorder->commit = commit_cb_wrapper;
     216         1210 :     ctx->reorder->message = message_cb_wrapper;
     217              : 
     218              :     /*
     219              :      * To support streaming, we require start/stop/abort/commit/change
     220              :      * callbacks. The message and truncate callbacks are optional, similar to
     221              :      * regular output plugins. We however enable streaming when at least one
     222              :      * of the methods is enabled so that we can easily identify missing
     223              :      * methods.
     224              :      *
     225              :      * We decide it here, but only check it later in the wrappers.
     226              :      */
     227         2447 :     ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) ||
     228           27 :         (ctx->callbacks.stream_stop_cb != NULL) ||
     229           27 :         (ctx->callbacks.stream_abort_cb != NULL) ||
     230           27 :         (ctx->callbacks.stream_commit_cb != NULL) ||
     231           27 :         (ctx->callbacks.stream_change_cb != NULL) ||
     232         1264 :         (ctx->callbacks.stream_message_cb != NULL) ||
     233           27 :         (ctx->callbacks.stream_truncate_cb != NULL);
     234              : 
     235              :     /*
     236              :      * streaming callbacks
     237              :      *
     238              :      * stream_message and stream_truncate callbacks are optional, so we do not
     239              :      * fail with ERROR when missing, but the wrappers simply do nothing. We
     240              :      * must set the ReorderBuffer callbacks to something, otherwise the calls
     241              :      * from there will crash (we don't want to move the checks there).
     242              :      */
     243         1210 :     ctx->reorder->stream_start = stream_start_cb_wrapper;
     244         1210 :     ctx->reorder->stream_stop = stream_stop_cb_wrapper;
     245         1210 :     ctx->reorder->stream_abort = stream_abort_cb_wrapper;
     246         1210 :     ctx->reorder->stream_prepare = stream_prepare_cb_wrapper;
     247         1210 :     ctx->reorder->stream_commit = stream_commit_cb_wrapper;
     248         1210 :     ctx->reorder->stream_change = stream_change_cb_wrapper;
     249         1210 :     ctx->reorder->stream_message = stream_message_cb_wrapper;
     250         1210 :     ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
     251              : 
     252              : 
     253              :     /*
     254              :      * To support two-phase logical decoding, we require
     255              :      * begin_prepare/prepare/commit_prepared/rollback_prepared callbacks. The
     256              :      * filter_prepare callback is optional. We however enable two-phase
     257              :      * logical decoding when at least one of the methods is enabled so that we
     258              :      * can easily identify missing methods.
     259              :      *
     260              :      * We decide it here, but only check it later in the wrappers.
     261              :      */
     262         2447 :     ctx->twophase = (ctx->callbacks.begin_prepare_cb != NULL) ||
     263           27 :         (ctx->callbacks.prepare_cb != NULL) ||
     264           27 :         (ctx->callbacks.commit_prepared_cb != NULL) ||
     265           27 :         (ctx->callbacks.rollback_prepared_cb != NULL) ||
     266         1264 :         (ctx->callbacks.stream_prepare_cb != NULL) ||
     267           27 :         (ctx->callbacks.filter_prepare_cb != NULL);
     268              : 
     269              :     /*
     270              :      * Callbacks to support decoding at prepare time.
     271              :      */
     272         1210 :     ctx->reorder->begin_prepare = begin_prepare_cb_wrapper;
     273         1210 :     ctx->reorder->prepare = prepare_cb_wrapper;
     274         1210 :     ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
     275         1210 :     ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
     276              : 
     277              :     /*
     278              :      * Callback to support updating progress during sending data of a
     279              :      * transaction (and its subtransactions) to the output plugin.
     280              :      */
     281         1210 :     ctx->reorder->update_progress_txn = update_progress_txn_cb_wrapper;
     282              : 
     283         1210 :     ctx->out = makeStringInfo();
     284         1210 :     ctx->prepare_write = prepare_write;
     285         1210 :     ctx->write = do_write;
     286         1210 :     ctx->update_progress = update_progress;
     287              : 
     288              :     /* Assume shared catalog access. The startup callback can change it. */
     289         1210 :     ctx->options.need_shared_catalogs = true;
     290              : 
     291         1210 :     ctx->output_plugin_options = output_plugin_options;
     292              : 
     293         1210 :     ctx->fast_forward = fast_forward;
     294              : 
     295         1210 :     MemoryContextSwitchTo(old_context);
     296              : 
     297         1210 :     return ctx;
     298              : }
     299              : 
     300              : /*
     301              :  * Create a new decoding context, for a new logical slot.
     302              :  *
     303              :  * plugin -- contains the name of the output plugin
     304              :  * output_plugin_options -- output plugin options; NB: unused, NIL is passed on
     305              :  * need_full_snapshot -- if true, must obtain a snapshot able to read all
     306              :  *      tables; if false, one that can read only catalogs is acceptable.
     307              :  * for_repack -- if true, we're going to be decoding for REPACK.
     308              :  * restart_lsn -- if given as invalid, it's this routine's responsibility to
     309              :  *      mark WAL as reserved by setting a convenient restart_lsn for the slot.
     310              :  *      Otherwise, we set for decoding to start from the given LSN without
     311              :  *      marking WAL reserved beforehand.  In that scenario, it's up to the
     312              :  *      caller to guarantee that WAL remains available.
     313              :  * xl_routine -- XLogReaderRoutine for underlying XLogReader
     314              :  * prepare_write, do_write, update_progress --
     315              :  *      callbacks that perform the use-case dependent, actual, work.
     316              :  *
     317              :  * Needs to be called while in a memory context that's at least as long lived
     318              :  * as the decoding context because further memory contexts will be created
     319              :  * inside it.
     320              :  *
     321              :  * Returns an initialized decoding context after calling the output plugin's
     322              :  * startup function.
     323              :  */
     324              : LogicalDecodingContext *
     325          502 : CreateInitDecodingContext(const char *plugin,
     326              :                           List *output_plugin_options,
     327              :                           bool need_full_snapshot,
     328              :                           bool for_repack,
     329              :                           XLogRecPtr restart_lsn,
     330              :                           XLogReaderRoutine *xl_routine,
     331              :                           LogicalOutputPluginWriterPrepareWrite prepare_write,
     332              :                           LogicalOutputPluginWriterWrite do_write,
     333              :                           LogicalOutputPluginWriterUpdateProgress update_progress)
     334              : {
     335          502 :     TransactionId xmin_horizon = InvalidTransactionId;
     336              :     ReplicationSlot *slot;
     337              :     NameData    plugin_name;
     338              :     LogicalDecodingContext *ctx;
     339              :     MemoryContext old_context;
     340              : 
     341              :     /*
     342              :      * On a standby, this check is also required while creating the slot.
     343              :      * See the comments in CheckLogicalDecodingRequirements().
     344              :      */
     345          502 :     CheckLogicalDecodingRequirements(for_repack);
     346              : 
     347              :     /* shorter lines... */
     348          502 :     slot = MyReplicationSlot;
     349              : 
     350              :     /* first some sanity checks that are unlikely to be violated */
     351          502 :     if (slot == NULL)
     352            0 :         elog(ERROR, "cannot perform logical decoding without an acquired slot");
     353              : 
     354          502 :     if (plugin == NULL)
     355            0 :         elog(ERROR, "cannot initialize logical decoding without a specified plugin");
     356              : 
     357              :     /* Make sure the passed slot is suitable. These are user facing errors. */
     358          502 :     if (SlotIsPhysical(slot))
     359            0 :         ereport(ERROR,
     360              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     361              :                  errmsg("cannot use physical replication slot for logical decoding")));
     362              : 
     363          502 :     if (slot->data.database != MyDatabaseId)
     364            0 :         ereport(ERROR,
     365              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     366              :                  errmsg("replication slot \"%s\" was not created in this database",
     367              :                         NameStr(slot->data.name))));
     368              : 
     369          854 :     if (IsTransactionState() &&
     370          352 :         GetTopTransactionIdIfAny() != InvalidTransactionId)
     371            2 :         ereport(ERROR,
     372              :                 (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
     373              :                  errmsg("cannot create logical replication slot in transaction that has performed writes")));
     374              : 
     375              :     /*
     376              :      * Register output plugin name with slot.  We need the mutex to avoid
     377              :      * concurrent reading of a partially copied string.  But we don't want any
     378              :      * complicated code while holding a spinlock, so do namestrcpy() outside.
     379              :      */
     380          500 :     namestrcpy(&plugin_name, plugin);
     381          500 :     SpinLockAcquire(&slot->mutex);
     382          500 :     slot->data.plugin = plugin_name;
     383          500 :     SpinLockRelease(&slot->mutex);
     384              : 
     385          500 :     if (!XLogRecPtrIsValid(restart_lsn))
     386          493 :         ReplicationSlotReserveWal();
     387              :     else
     388              :     {
     389            7 :         SpinLockAcquire(&slot->mutex);
     390            7 :         slot->data.restart_lsn = restart_lsn;
     391            7 :         SpinLockRelease(&slot->mutex);
     392              :     }
     393              : 
     394              :     /* ----
     395              :      * This is a bit tricky: We need to determine a safe xmin horizon to start
     396              :      * decoding from, to avoid starting from a running xacts record referring
     397              :      * to xids whose rows have been vacuumed or pruned
     398              :      * already. GetOldestSafeDecodingTransactionId() returns such a value, but
     399              :      * without further interlock its return value might immediately be out of
     400              :      * date.
     401              :      *
     402              :      * So we have to acquire both the ReplicationSlotControlLock and the
     403              :      * ProcArrayLock to prevent concurrent computation and update of new xmin
     404              :      * horizons by other backends, get the safe decoding xid, and inform the
     405              :      * slot machinery about the new limit. Once that's done both locks can be
     406              :      * released as the slot machinery now is protecting against vacuum.
     407              :      *
     408              :      * Note that, temporarily, the data, not just the catalog, xmin has to be
     409              :      * reserved if a data snapshot is to be exported.  Otherwise the initial
     410              :      * data snapshot created here is not guaranteed to be valid. After that
     411              :      * the data xmin doesn't need to be managed anymore and the global xmin
     412              :      * should be recomputed. As we are fine with losing the pegged data xmin
     413              :      * after crash - no chance a snapshot would get exported anymore - we can
     414              :      * get away with just setting the slot's
     415              :      * effective_xmin. ReplicationSlotRelease will reset it again.
     416              :      *
     417              :      * ----
     418              :      */
     419          500 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     420          500 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     421              : 
     422          500 :     xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
     423              : 
     424          500 :     SpinLockAcquire(&slot->mutex);
     425          500 :     slot->effective_catalog_xmin = xmin_horizon;
     426          500 :     slot->data.catalog_xmin = xmin_horizon;
     427          500 :     if (need_full_snapshot)
     428          217 :         slot->effective_xmin = xmin_horizon;
     429          500 :     SpinLockRelease(&slot->mutex);
     430              : 
     431          500 :     ReplicationSlotsComputeRequiredXmin(true);
     432              : 
     433          500 :     LWLockRelease(ProcArrayLock);
     434          500 :     LWLockRelease(ReplicationSlotControlLock);
     435              : 
     436          500 :     ReplicationSlotMarkDirty();
     437          500 :     ReplicationSlotSave();
     438              : 
     439          500 :     ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
     440              :                                  need_full_snapshot, false, true,   /* fast_forward = false, in_create = true */
     441              :                                  xl_routine, prepare_write, do_write,
     442              :                                  update_progress);
     443              : 
     444              :     /* call output plugin initialization callback */
     445          499 :     old_context = MemoryContextSwitchTo(ctx->context);
     446          499 :     if (ctx->callbacks.startup_cb != NULL)
     447          499 :         startup_cb_wrapper(ctx, &ctx->options, true);
     448          499 :     MemoryContextSwitchTo(old_context);
     449              : 
     450              :     /*
     451              :      * We allow decoding of prepared transactions when the two_phase is
     452              :      * enabled at the time of slot creation, or when the two_phase option is
     453              :      * given at the streaming start, provided the plugin supports all the
     454              :      * callbacks for two-phase.
     455              :      */
     456          499 :     ctx->twophase &= slot->data.two_phase;
     457              : 
     458          499 :     ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
     459              : 
     460          499 :     return ctx;
     461              : }
     462              : 
     463              : /*
     464              :  * Create a new decoding context, for a logical slot that has previously been
     465              :  * used already.
     466              :  *
     467              :  * start_lsn
     468              :  *      The LSN at which to start decoding.  If InvalidXLogRecPtr, restart
     469              :  *      from the slot's confirmed_flush; otherwise, start from the specified
     470              :  *      location (but move it forwards to confirmed_flush if it's older than
     471              :  *      that, see below).
     472              :  *
     473              :  * output_plugin_options
     474              :  *      options passed to the output plugin.
     475              :  *
     476              :  * fast_forward
     477              :  *      bypass the generation of logical changes.
     478              :  *
     479              :  * xl_routine
     480              :  *      XLogReaderRoutine used by underlying xlogreader
     481              :  *
     482              :  * prepare_write, do_write, update_progress
     483              :  *      callbacks that have to be filled to perform the use-case dependent,
     484              :  *      actual work.
     485              :  *
     486              :  * Needs to be called while in a memory context that's at least as long lived
     487              :  * as the decoding context because further memory contexts will be created
     488              :  * inside it.
     489              :  *
     490              :  * Returns an initialized decoding context after calling the output plugin's
     491              :  * startup function.
     492              :  */
     493              : LogicalDecodingContext *
     494          716 : CreateDecodingContext(XLogRecPtr start_lsn,
     495              :                       List *output_plugin_options,
     496              :                       bool fast_forward,
     497              :                       XLogReaderRoutine *xl_routine,
     498              :                       LogicalOutputPluginWriterPrepareWrite prepare_write,
     499              :                       LogicalOutputPluginWriterWrite do_write,
     500              :                       LogicalOutputPluginWriterUpdateProgress update_progress)
     501              : {
     502              :     LogicalDecodingContext *ctx;
     503              :     ReplicationSlot *slot;
     504              :     MemoryContext old_context;
     505              : 
     506              :     /* shorter lines... */
     507          716 :     slot = MyReplicationSlot;
     508              : 
     509              :     /* first some sanity checks that are unlikely to be violated */
     510          716 :     if (slot == NULL)
     511            0 :         elog(ERROR, "cannot perform logical decoding without an acquired slot");
     512              : 
     513              :     /* make sure the passed slot is suitable, these are user facing errors */
     514          716 :     if (SlotIsPhysical(slot))
     515            1 :         ereport(ERROR,
     516              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     517              :                  errmsg("cannot use physical replication slot for logical decoding")));
     518              : 
     519              :     /*
     520              :      * We need to access the system tables during decoding to build the
     521              :      * logical changes unless we are in fast_forward mode where no changes are
     522              :      * generated.
     523              :      */
     524          715 :     if (slot->data.database != MyDatabaseId && !fast_forward)
     525            3 :         ereport(ERROR,
     526              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     527              :                  errmsg("replication slot \"%s\" was not created in this database",
     528              :                         NameStr(slot->data.name))));
     529              : 
     530              :     /*
     531              :      * The slots being synced from the primary can't be used for decoding as
     532              :      * they are used after failover. However, we do allow advancing the LSNs
     533              :      * during the synchronization of slots. See update_local_synced_slot.
     534              :      */
     535          712 :     if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
     536            1 :         ereport(ERROR,
     537              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     538              :                 errmsg("cannot use replication slot \"%s\" for logical decoding",
     539              :                        NameStr(slot->data.name)),
     540              :                 errdetail("This replication slot is being synchronized from the primary server."),
     541              :                 errhint("Specify another replication slot."));
     542              : 
     543              :     /* slot must be valid to allow decoding */
     544              :     Assert(slot->data.invalidated == RS_INVAL_NONE);
     545              :     Assert(XLogRecPtrIsValid(slot->data.restart_lsn));
     546              : 
     547          711 :     if (!XLogRecPtrIsValid(start_lsn))
     548              :     {
     549              :         /* continue from last position */
     550          393 :         start_lsn = slot->data.confirmed_flush;
     551              :     }
     552          318 :     else if (start_lsn < slot->data.confirmed_flush)
     553              :     {
     554              :         /*
     555              :          * It might seem like we should error out in this case, but it's
     556              :          * pretty common for a client to acknowledge a LSN it doesn't have to
     557              :          * do anything for, and thus didn't store persistently, because the
     558              :          * xlog records didn't result in anything relevant for logical
     559              :          * decoding. Clients have to be able to do that to support synchronous
     560              :          * replication.
     561              :          *
     562              :          * Starting at a different LSN than requested might not catch certain
     563              :          * kinds of client errors; so the client may wish to check that
     564              :          * confirmed_flush_lsn matches its expectations.
     565              :          */
     566           33 :         elog(LOG, "%X/%08X has been already streamed, forwarding to %X/%08X",
     567              :              LSN_FORMAT_ARGS(start_lsn),
     568              :              LSN_FORMAT_ARGS(slot->data.confirmed_flush));
     569              : 
     570           33 :         start_lsn = slot->data.confirmed_flush;
     571              :     }
     572              : 
     573          711 :     ctx = StartupDecodingContext(output_plugin_options,
     574              :                                  start_lsn, InvalidTransactionId, false,
     575              :                                  fast_forward, false, xl_routine, prepare_write,
     576              :                                  do_write, update_progress);
     577              : 
     578              :     /* call output plugin initialization callback */
     579          711 :     old_context = MemoryContextSwitchTo(ctx->context);
     580          711 :     if (ctx->callbacks.startup_cb != NULL)
     581          686 :         startup_cb_wrapper(ctx, &ctx->options, false);
     582          708 :     MemoryContextSwitchTo(old_context);
     583              : 
     584              :     /*
     585              :      * We allow decoding of prepared transactions when the two_phase is
     586              :      * enabled at the time of slot creation, or when the two_phase option is
     587              :      * given at the streaming start, provided the plugin supports all the
     588              :      * callbacks for two-phase.
     589              :      */
     590          708 :     ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given);
     591              : 
     592              :     /* Mark slot to allow two_phase decoding if not already marked */
     593          708 :     if (ctx->twophase && !slot->data.two_phase)
     594              :     {
     595            8 :         SpinLockAcquire(&slot->mutex);
     596            8 :         slot->data.two_phase = true;
     597            8 :         slot->data.two_phase_at = start_lsn;
     598            8 :         SpinLockRelease(&slot->mutex);
     599            8 :         ReplicationSlotMarkDirty();
     600            8 :         ReplicationSlotSave();
     601            8 :         SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
     602              :     }
     603              : 
     604          708 :     ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
     605              : 
     606          708 :     ereport(LogicalDecodingLogLevel(),
     607              :             (errmsg("starting logical decoding for slot \"%s\"",
     608              :                     NameStr(slot->data.name)),
     609              :              errdetail("Streaming transactions committing after %X/%08X, reading WAL from %X/%08X.",
     610              :                        LSN_FORMAT_ARGS(slot->data.confirmed_flush),
     611              :                        LSN_FORMAT_ARGS(slot->data.restart_lsn))));
     612              : 
     613          708 :     return ctx;
     614              : }
     615              : 
     616              : /*
     617              :  * Returns true if a consistent initial decoding snapshot has been built.
     618              :  */
     619              : bool
     620         1991 : DecodingContextReady(LogicalDecodingContext *ctx)
     621              : {
     622         1991 :     return SnapBuildCurrentState(ctx->snapshot_builder) == SNAPBUILD_CONSISTENT;
     623              : }
     624              : 
     625              : /*
     626              :  * Read from the decoding slot, until it is ready to start extracting changes.
     627              :  */
     628              : void
     629          492 : DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
     630              : {
     631          492 :     ReplicationSlot *slot = ctx->slot;
     632              : 
     633              :     /* Initialize from where to start reading WAL. */
     634          492 :     XLogBeginRead(ctx->reader, slot->data.restart_lsn);
     635              : 
     636          492 :     elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%08X",
     637              :          LSN_FORMAT_ARGS(slot->data.restart_lsn));
     638              : 
     639              :     /* Wait for a consistent starting point */
     640              :     for (;;)
     641         1490 :     {
     642              :         XLogRecord *record;
     643         1982 :         char       *err = NULL;
     644              : 
     645              :         /* the read_page callback waits for new WAL */
     646         1982 :         record = XLogReadRecord(ctx->reader, &err);
     647         1982 :         if (err)
     648            0 :             elog(ERROR, "could not find logical decoding starting point: %s", err);
     649         1982 :         if (!record)
     650            0 :             elog(ERROR, "could not find logical decoding starting point");
     651              : 
     652         1982 :         LogicalDecodingProcessRecord(ctx, ctx->reader);
     653              : 
     654              :         /* only continue till we found a consistent spot */
     655         1980 :         if (DecodingContextReady(ctx))
     656          490 :             break;
     657              : 
     658         1490 :         CHECK_FOR_INTERRUPTS();
     659              :     }
     660              : 
     661          490 :     SpinLockAcquire(&slot->mutex);
     662          490 :     slot->data.confirmed_flush = ctx->reader->EndRecPtr;
     663          490 :     if (slot->data.two_phase)
     664            9 :         slot->data.two_phase_at = ctx->reader->EndRecPtr;
     665          490 :     SpinLockRelease(&slot->mutex);
     666          490 : }
     667              : 
     668              : /*
     669              :  * Free a previously allocated decoding context, invoking the shutdown
     670              :  * callback if necessary.
     671              :  */
     672              : void
     673          943 : FreeDecodingContext(LogicalDecodingContext *ctx)
     674              : {
     675          943 :     if (ctx->callbacks.shutdown_cb != NULL)
     676          918 :         shutdown_cb_wrapper(ctx);
     677              : 
     678          943 :     ReorderBufferFree(ctx->reorder);
     679          943 :     FreeSnapshotBuilder(ctx->snapshot_builder);
     680          943 :     XLogReaderFree(ctx->reader);
     681          943 :     MemoryContextDelete(ctx->context);
     682          943 : }
     683              : 
     684              : /*
     685              :  * Prepare a write using the context's output routine.
     686              :  */
     687              : void
     688       362405 : OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
     689              : {
     690       362405 :     if (!ctx->accept_writes)
     691            0 :         elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
     692              : 
     693       362405 :     ctx->prepare_write(ctx, ctx->write_location, ctx->write_xid, last_write);
     694       362405 :     ctx->prepared_write = true;
     695       362405 : }
     696              : 
     697              : /*
     698              :  * Perform a write using the context's output routine.
     699              :  */
     700              : void
     701       362405 : OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
     702              : {
     703       362405 :     if (!ctx->prepared_write)
     704            0 :         elog(ERROR, "OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
     705              : 
     706       362405 :     ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write);
     707       362391 :     ctx->prepared_write = false;
     708       362391 : }
     709              : 
     710              : /*
     711              :  * Update progress tracking (if supported).
     712              :  */
     713              : void
     714         4838 : OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx,
     715              :                            bool skipped_xact)
     716              : {
     717         4838 :     if (!ctx->update_progress)
     718         1594 :         return;
     719              : 
     720         3244 :     ctx->update_progress(ctx, ctx->write_location, ctx->write_xid,
     721              :                          skipped_xact);
     722              : }
     723              : 
     724              : /*
     725              :  * Load the output plugin, lookup its output plugin init function, and check
     726              :  * that it provides the required callbacks.
     727              :  */
     728              : static void
     729         1186 : LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin)
     730              : {
     731              :     LogicalOutputPluginInit plugin_init;
     732              : 
     733         1185 :     plugin_init = (LogicalOutputPluginInit)
     734         1186 :         load_external_function(plugin, "_PG_output_plugin_init", false, NULL);
     735              : 
     736         1185 :     if (plugin_init == NULL)
     737            0 :         elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");
     738              : 
     739              :     /* ask the output plugin to fill the callback struct */
     740         1185 :     plugin_init(callbacks);
     741              : 
     742         1185 :     if (callbacks->begin_cb == NULL)
     743            0 :         elog(ERROR, "output plugins have to register a begin callback");
     744         1185 :     if (callbacks->change_cb == NULL)
     745            0 :         elog(ERROR, "output plugins have to register a change callback");
     746         1185 :     if (callbacks->commit_cb == NULL)
     747            0 :         elog(ERROR, "output plugins have to register a commit callback");
     748         1185 : }
     749              : 
     750              : static void
     751          481 : output_plugin_error_callback(void *arg)
     752              : {
     753          481 :     LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;
     754              : 
     755              :     /* not all callbacks have an associated LSN  */
     756          481 :     if (XLogRecPtrIsValid(state->report_location))
     757          478 :         errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%08X",
     758          478 :                    NameStr(state->ctx->slot->data.name),
     759          478 :                    NameStr(state->ctx->slot->data.plugin),
     760              :                    state->callback_name,
     761          478 :                    LSN_FORMAT_ARGS(state->report_location));
     762              :     else
     763            3 :         errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
     764            3 :                    NameStr(state->ctx->slot->data.name),
     765            3 :                    NameStr(state->ctx->slot->data.plugin),
     766              :                    state->callback_name);
     767          481 : }
     768              : 
     769              : static void
     770         1185 : startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
     771              : {
     772              :     LogicalErrorCallbackState state;
     773              :     ErrorContextCallback errcallback;
     774              : 
     775              :     Assert(!ctx->fast_forward);
     776              : 
     777              :     /* Push callback + info on the error context stack */
     778         1185 :     state.ctx = ctx;
     779         1185 :     state.callback_name = "startup";
     780         1185 :     state.report_location = InvalidXLogRecPtr;
     781         1185 :     errcallback.callback = output_plugin_error_callback;
     782         1185 :     errcallback.arg = &state;
     783         1185 :     errcallback.previous = error_context_stack;
     784         1185 :     error_context_stack = &errcallback;
     785              : 
     786              :     /* set output state */
     787         1185 :     ctx->accept_writes = false;
     788         1185 :     ctx->end_xact = false;
     789              : 
     790              :     /* do the actual work: call callback */
     791         1185 :     ctx->callbacks.startup_cb(ctx, opt, is_init);
     792              : 
     793              :     /* Pop the error context stack */
     794         1182 :     error_context_stack = errcallback.previous;
     795         1182 : }
     796              : 
     797              : static void
     798          918 : shutdown_cb_wrapper(LogicalDecodingContext *ctx)
     799              : {
     800              :     LogicalErrorCallbackState state;
     801              :     ErrorContextCallback errcallback;
     802              : 
     803              :     Assert(!ctx->fast_forward);
     804              : 
     805              :     /* Push callback + info on the error context stack */
     806          918 :     state.ctx = ctx;
     807          918 :     state.callback_name = "shutdown";
     808          918 :     state.report_location = InvalidXLogRecPtr;
     809          918 :     errcallback.callback = output_plugin_error_callback;
     810          918 :     errcallback.arg = &state;
     811          918 :     errcallback.previous = error_context_stack;
     812          918 :     error_context_stack = &errcallback;
     813              : 
     814              :     /* set output state */
     815          918 :     ctx->accept_writes = false;
     816          918 :     ctx->end_xact = false;
     817              : 
     818              :     /* do the actual work: call callback */
     819          918 :     ctx->callbacks.shutdown_cb(ctx);
     820              : 
     821              :     /* Pop the error context stack */
     822          918 :     error_context_stack = errcallback.previous;
     823          918 : }
     824              : 
     825              : 
     826              : /*
     827              :  * Callbacks for ReorderBuffer which add in some more information and then call
     828              :  * output_plugin.h plugins.
     829              :  */
     830              : static void
     831         1822 : begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
     832              : {
     833         1822 :     LogicalDecodingContext *ctx = cache->private_data;
     834              :     LogicalErrorCallbackState state;
     835              :     ErrorContextCallback errcallback;
     836              : 
     837              :     Assert(!ctx->fast_forward);
     838              : 
     839              :     /* Push callback + info on the error context stack */
     840         1822 :     state.ctx = ctx;
     841         1822 :     state.callback_name = "begin";
     842         1822 :     state.report_location = txn->first_lsn;
     843         1822 :     errcallback.callback = output_plugin_error_callback;
     844         1822 :     errcallback.arg = &state;
     845         1822 :     errcallback.previous = error_context_stack;
     846         1822 :     error_context_stack = &errcallback;
     847              : 
     848              :     /* set output state */
     849         1822 :     ctx->accept_writes = true;
     850         1822 :     ctx->write_xid = txn->xid;
     851         1822 :     ctx->write_location = txn->first_lsn;
     852         1822 :     ctx->end_xact = false;
     853              : 
     854              :     /* do the actual work: call callback */
     855         1822 :     ctx->callbacks.begin_cb(ctx, txn);
     856              : 
     857              :     /* Pop the error context stack */
     858         1822 :     error_context_stack = errcallback.previous;
     859         1822 : }
     860              : 
     861              : static void
     862         1817 : commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     863              :                   XLogRecPtr commit_lsn)
     864              : {
     865         1817 :     LogicalDecodingContext *ctx = cache->private_data;
     866              :     LogicalErrorCallbackState state;
     867              :     ErrorContextCallback errcallback;
     868              : 
     869              :     Assert(!ctx->fast_forward);
     870              : 
     871              :     /* Push callback + info on the error context stack */
     872         1817 :     state.ctx = ctx;
     873         1817 :     state.callback_name = "commit";
     874         1817 :     state.report_location = txn->final_lsn; /* beginning of commit record */
     875         1817 :     errcallback.callback = output_plugin_error_callback;
     876         1817 :     errcallback.arg = &state;
     877         1817 :     errcallback.previous = error_context_stack;
     878         1817 :     error_context_stack = &errcallback;
     879              : 
     880              :     /* set output state */
     881         1817 :     ctx->accept_writes = true;
     882         1817 :     ctx->write_xid = txn->xid;
     883         1817 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
     884         1817 :     ctx->end_xact = true;
     885              : 
     886              :     /* do the actual work: call callback */
     887         1817 :     ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
     888              : 
     889              :     /* Pop the error context stack */
     890         1807 :     error_context_stack = errcallback.previous;
     891         1807 : }
     892              : 
     893              : /*
     894              :  * The functionality of begin_prepare is quite similar to begin with the
     895              :  * exception that this will have gid (global transaction id) information which
     896              :  * can be used by plugin. Now, we thought about extending the existing begin
     897              :  * but that would break the replication protocol and additionally this looks
     898              :  * cleaner.
     899              :  */
     900              : static void
     901           30 : begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
     902              : {
     903           30 :     LogicalDecodingContext *ctx = cache->private_data;
     904              :     LogicalErrorCallbackState state;
     905              :     ErrorContextCallback errcallback;
     906              : 
     907              :     Assert(!ctx->fast_forward);
     908              : 
     909              :     /* We're only supposed to call this when two-phase commits are supported */
     910              :     Assert(ctx->twophase);
     911              : 
     912              :     /* Push callback + info on the error context stack */
     913           30 :     state.ctx = ctx;
     914           30 :     state.callback_name = "begin_prepare";
     915           30 :     state.report_location = txn->first_lsn;
     916           30 :     errcallback.callback = output_plugin_error_callback;
     917           30 :     errcallback.arg = &state;
     918           30 :     errcallback.previous = error_context_stack;
     919           30 :     error_context_stack = &errcallback;
     920              : 
     921              :     /* set output state */
     922           30 :     ctx->accept_writes = true;
     923           30 :     ctx->write_xid = txn->xid;
     924           30 :     ctx->write_location = txn->first_lsn;
     925           30 :     ctx->end_xact = false;
     926              : 
     927              :     /*
     928              :      * If the plugin supports two-phase commits then begin prepare callback is
     929              :      * mandatory
     930              :      */
     931           30 :     if (ctx->callbacks.begin_prepare_cb == NULL)
     932            0 :         ereport(ERROR,
     933              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     934              :                  errmsg("logical replication at prepare time requires a %s callback",
     935              :                         "begin_prepare_cb")));
     936              : 
     937              :     /* do the actual work: call callback */
     938           30 :     ctx->callbacks.begin_prepare_cb(ctx, txn);
     939              : 
     940              :     /* Pop the error context stack */
     941           30 :     error_context_stack = errcallback.previous;
     942           30 : }
     943              : 
     944              : static void
     945           30 : prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     946              :                    XLogRecPtr prepare_lsn)
     947              : {
     948           30 :     LogicalDecodingContext *ctx = cache->private_data;
     949              :     LogicalErrorCallbackState state;
     950              :     ErrorContextCallback errcallback;
     951              : 
     952              :     Assert(!ctx->fast_forward);
     953              : 
     954              :     /* We're only supposed to call this when two-phase commits are supported */
     955              :     Assert(ctx->twophase);
     956              : 
     957              :     /* Push callback + info on the error context stack */
     958           30 :     state.ctx = ctx;
     959           30 :     state.callback_name = "prepare";
     960           30 :     state.report_location = txn->final_lsn; /* beginning of prepare record */
     961           30 :     errcallback.callback = output_plugin_error_callback;
     962           30 :     errcallback.arg = &state;
     963           30 :     errcallback.previous = error_context_stack;
     964           30 :     error_context_stack = &errcallback;
     965              : 
     966              :     /* set output state */
     967           30 :     ctx->accept_writes = true;
     968           30 :     ctx->write_xid = txn->xid;
     969           30 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
     970           30 :     ctx->end_xact = true;
     971              : 
     972              :     /*
     973              :      * If the plugin supports two-phase commits then prepare callback is
     974              :      * mandatory
     975              :      */
     976           30 :     if (ctx->callbacks.prepare_cb == NULL)
     977            0 :         ereport(ERROR,
     978              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     979              :                  errmsg("logical replication at prepare time requires a %s callback",
     980              :                         "prepare_cb")));
     981              : 
     982              :     /* do the actual work: call callback */
     983           30 :     ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
     984              : 
     985              :     /* Pop the error context stack */
     986           30 :     error_context_stack = errcallback.previous;
     987           30 : }
     988              : 
     989              : static void
     990           34 : commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     991              :                            XLogRecPtr commit_lsn)
     992              : {
     993           34 :     LogicalDecodingContext *ctx = cache->private_data;
     994              :     LogicalErrorCallbackState state;
     995              :     ErrorContextCallback errcallback;
     996              : 
     997              :     Assert(!ctx->fast_forward);
     998              : 
     999              :     /* We're only supposed to call this when two-phase commits are supported */
    1000              :     Assert(ctx->twophase);
    1001              : 
    1002              :     /* Push callback + info on the error context stack */
    1003           34 :     state.ctx = ctx;
    1004           34 :     state.callback_name = "commit_prepared";
    1005           34 :     state.report_location = txn->final_lsn; /* beginning of commit record */
    1006           34 :     errcallback.callback = output_plugin_error_callback;
    1007           34 :     errcallback.arg = &state;
    1008           34 :     errcallback.previous = error_context_stack;
    1009           34 :     error_context_stack = &errcallback;
    1010              : 
    1011              :     /* set output state */
    1012           34 :     ctx->accept_writes = true;
    1013           34 :     ctx->write_xid = txn->xid;
    1014           34 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
    1015           34 :     ctx->end_xact = true;
    1016              : 
    1017              :     /*
    1018              :      * If the plugin supports two-phase commits then commit prepared callback
    1019              :      * is mandatory
    1020              :      */
    1021           34 :     if (ctx->callbacks.commit_prepared_cb == NULL)
    1022            0 :         ereport(ERROR,
    1023              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1024              :                  errmsg("logical replication at prepare time requires a %s callback",
    1025              :                         "commit_prepared_cb")));
    1026              : 
    1027              :     /* do the actual work: call callback */
    1028           34 :     ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
    1029              : 
    1030              :     /* Pop the error context stack */
    1031           34 :     error_context_stack = errcallback.previous;
    1032           34 : }
    1033              : 
    1034              : static void
    1035           11 : rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1036              :                              XLogRecPtr prepare_end_lsn,
    1037              :                              TimestampTz prepare_time)
    1038              : {
    1039           11 :     LogicalDecodingContext *ctx = cache->private_data;
    1040              :     LogicalErrorCallbackState state;
    1041              :     ErrorContextCallback errcallback;
    1042              : 
    1043              :     Assert(!ctx->fast_forward);
    1044              : 
    1045              :     /* We're only supposed to call this when two-phase commits are supported */
    1046              :     Assert(ctx->twophase);
    1047              : 
    1048              :     /* Push callback + info on the error context stack */
    1049           11 :     state.ctx = ctx;
    1050           11 :     state.callback_name = "rollback_prepared";
    1051           11 :     state.report_location = txn->final_lsn; /* beginning of rollback prepared record */
    1052           11 :     errcallback.callback = output_plugin_error_callback;
    1053           11 :     errcallback.arg = &state;
    1054           11 :     errcallback.previous = error_context_stack;
    1055           11 :     error_context_stack = &errcallback;
    1056              : 
    1057              :     /* set output state */
    1058           11 :     ctx->accept_writes = true;
    1059           11 :     ctx->write_xid = txn->xid;
    1060           11 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
    1061           11 :     ctx->end_xact = true;
    1062              : 
    1063              :     /*
    1064              :      * If the plugin supports two-phase commits, then the rollback prepared
    1065              :      * callback is mandatory.
    1066              :      */
    1067           11 :     if (ctx->callbacks.rollback_prepared_cb == NULL)
    1068            0 :         ereport(ERROR,
    1069              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1070              :                  errmsg("logical replication at prepare time requires a %s callback",
    1071              :                         "rollback_prepared_cb")));
    1072              : 
    1073              :     /* do the actual work: call callback */
    1074           11 :     ctx->callbacks.rollback_prepared_cb(ctx, txn, prepare_end_lsn,
    1075              :                                         prepare_time);
    1076              : 
    1077              :     /* Pop the error context stack */
    1078           11 :     error_context_stack = errcallback.previous;
    1079           11 : }
    1080              : 
    1081              : static void
    1082       179101 : change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1083              :                   Relation relation, ReorderBufferChange *change)
    1084              : {
    1085       179101 :     LogicalDecodingContext *ctx = cache->private_data;
    1086              :     LogicalErrorCallbackState state;
    1087              :     ErrorContextCallback errcallback;
    1088              : 
    1089              :     Assert(!ctx->fast_forward);
    1090              : 
    1091              :     /* Push callback + info on the error context stack */
    1092       179101 :     state.ctx = ctx;
    1093       179101 :     state.callback_name = "change";
    1094       179101 :     state.report_location = change->lsn;
    1095       179101 :     errcallback.callback = output_plugin_error_callback;
    1096       179101 :     errcallback.arg = &state;
    1097       179101 :     errcallback.previous = error_context_stack;
    1098       179101 :     error_context_stack = &errcallback;
    1099              : 
    1100              :     /* set output state */
    1101       179101 :     ctx->accept_writes = true;
    1102       179101 :     ctx->write_xid = txn->xid;
    1103              : 
    1104              :     /*
    1105              :      * Report this change's lsn so replies from clients can give an up-to-date
    1106              :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1107              :      * receipt of this transaction, but it might allow another transaction's
    1108              :      * commit to be confirmed with one message.
    1109              :      */
    1110       179101 :     ctx->write_location = change->lsn;
    1111              : 
    1112       179101 :     ctx->end_xact = false;
    1113              : 
    1114       179101 :     ctx->callbacks.change_cb(ctx, txn, relation, change);
    1115              : 
    1116              :     /* Pop the error context stack */
    1117       179096 :     error_context_stack = errcallback.previous;
    1118       179096 : }
    1119              : 
    1120              : static void
    1121           35 : truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1122              :                     int nrelations, Relation relations[], ReorderBufferChange *change)
    1123              : {
    1124           35 :     LogicalDecodingContext *ctx = cache->private_data;
    1125              :     LogicalErrorCallbackState state;
    1126              :     ErrorContextCallback errcallback;
    1127              : 
    1128              :     Assert(!ctx->fast_forward);
    1129              : 
    1130           35 :     if (!ctx->callbacks.truncate_cb)
    1131            0 :         return;
    1132              : 
    1133              :     /* Push callback + info on the error context stack */
    1134           35 :     state.ctx = ctx;
    1135           35 :     state.callback_name = "truncate";
    1136           35 :     state.report_location = change->lsn;
    1137           35 :     errcallback.callback = output_plugin_error_callback;
    1138           35 :     errcallback.arg = &state;
    1139           35 :     errcallback.previous = error_context_stack;
    1140           35 :     error_context_stack = &errcallback;
    1141              : 
    1142              :     /* set output state */
    1143           35 :     ctx->accept_writes = true;
    1144           35 :     ctx->write_xid = txn->xid;
    1145              : 
    1146              :     /*
    1147              :      * Report this change's lsn so replies from clients can give an up-to-date
    1148              :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1149              :      * receipt of this transaction, but it might allow another transaction's
    1150              :      * commit to be confirmed with one message.
    1151              :      */
    1152           35 :     ctx->write_location = change->lsn;
    1153              : 
    1154           35 :     ctx->end_xact = false;
    1155              : 
    1156           35 :     ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
    1157              : 
    1158              :     /* Pop the error context stack */
    1159           35 :     error_context_stack = errcallback.previous;
    1160              : }
    1161              : 
    1162              : bool
    1163          148 : filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
    1164              :                           const char *gid)
    1165              : {
    1166              :     LogicalErrorCallbackState state;
    1167              :     ErrorContextCallback errcallback;
    1168              :     bool        ret;
    1169              : 
    1170              :     Assert(!ctx->fast_forward);
    1171              : 
    1172              :     /* Push callback + info on the error context stack */
    1173          148 :     state.ctx = ctx;
    1174          148 :     state.callback_name = "filter_prepare";
    1175          148 :     state.report_location = InvalidXLogRecPtr;
    1176          148 :     errcallback.callback = output_plugin_error_callback;
    1177          148 :     errcallback.arg = &state;
    1178          148 :     errcallback.previous = error_context_stack;
    1179          148 :     error_context_stack = &errcallback;
    1180              : 
    1181              :     /* set output state */
    1182          148 :     ctx->accept_writes = false;
    1183          148 :     ctx->end_xact = false;
    1184              : 
    1185              :     /* do the actual work: call callback */
    1186          148 :     ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
    1187              : 
    1188              :     /* Pop the error context stack */
    1189          148 :     error_context_stack = errcallback.previous;
    1190              : 
    1191          148 :     return ret;
    1192              : }
    1193              : 
    1194              : bool
    1195      1307402 : filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, ReplOriginId origin_id)
    1196              : {
    1197              :     LogicalErrorCallbackState state;
    1198              :     ErrorContextCallback errcallback;
    1199              :     bool        ret;
    1200              : 
    1201              :     Assert(!ctx->fast_forward);
    1202              : 
    1203              :     /* Push callback + info on the error context stack */
    1204      1307402 :     state.ctx = ctx;
    1205      1307402 :     state.callback_name = "filter_by_origin";
    1206      1307402 :     state.report_location = InvalidXLogRecPtr;
    1207      1307402 :     errcallback.callback = output_plugin_error_callback;
    1208      1307402 :     errcallback.arg = &state;
    1209      1307402 :     errcallback.previous = error_context_stack;
    1210      1307402 :     error_context_stack = &errcallback;
    1211              : 
    1212              :     /* set output state */
    1213      1307402 :     ctx->accept_writes = false;
    1214      1307402 :     ctx->end_xact = false;
    1215              : 
    1216              :     /* do the actual work: call callback */
    1217      1307402 :     ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
    1218              : 
    1219              :     /* Pop the error context stack */
    1220      1307402 :     error_context_stack = errcallback.previous;
    1221              : 
    1222      1307402 :     return ret;
    1223              : }
    1224              : 
    1225              : static void
    1226           16 : message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1227              :                    XLogRecPtr message_lsn, bool transactional,
    1228              :                    const char *prefix, Size message_size, const char *message)
    1229              : {
    1230           16 :     LogicalDecodingContext *ctx = cache->private_data;
    1231              :     LogicalErrorCallbackState state;
    1232              :     ErrorContextCallback errcallback;
    1233              : 
    1234              :     Assert(!ctx->fast_forward);
    1235              : 
    1236           16 :     if (ctx->callbacks.message_cb == NULL)
    1237            0 :         return;
    1238              : 
    1239              :     /* Push callback + info on the error context stack */
    1240           16 :     state.ctx = ctx;
    1241           16 :     state.callback_name = "message";
    1242           16 :     state.report_location = message_lsn;
    1243           16 :     errcallback.callback = output_plugin_error_callback;
    1244           16 :     errcallback.arg = &state;
    1245           16 :     errcallback.previous = error_context_stack;
    1246           16 :     error_context_stack = &errcallback;
    1247              : 
    1248              :     /* set output state */
    1249           16 :     ctx->accept_writes = true;
    1250           16 :     ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
    1251           16 :     ctx->write_location = message_lsn;
    1252           16 :     ctx->end_xact = false;
    1253              : 
    1254              :     /* do the actual work: call callback */
    1255           16 :     ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
    1256              :                               message_size, message);
    1257              : 
    1258              :     /* Pop the error context stack */
    1259           16 :     error_context_stack = errcallback.previous;
    1260              : }
    1261              : 
    1262              : static void
    1263          680 : stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1264              :                         XLogRecPtr first_lsn)
    1265              : {
    1266          680 :     LogicalDecodingContext *ctx = cache->private_data;
    1267              :     LogicalErrorCallbackState state;
    1268              :     ErrorContextCallback errcallback;
    1269              : 
    1270              :     Assert(!ctx->fast_forward);
    1271              : 
    1272              :     /* We're only supposed to call this when streaming is supported. */
    1273              :     Assert(ctx->streaming);
    1274              : 
    1275              :     /* Push callback + info on the error context stack */
    1276          680 :     state.ctx = ctx;
    1277          680 :     state.callback_name = "stream_start";
    1278          680 :     state.report_location = first_lsn;
    1279          680 :     errcallback.callback = output_plugin_error_callback;
    1280          680 :     errcallback.arg = &state;
    1281          680 :     errcallback.previous = error_context_stack;
    1282          680 :     error_context_stack = &errcallback;
    1283              : 
    1284              :     /* set output state */
    1285          680 :     ctx->accept_writes = true;
    1286          680 :     ctx->write_xid = txn->xid;
    1287              : 
    1288              :     /*
    1289              :      * Report this message's lsn so replies from clients can give an
    1290              :      * up-to-date answer. This won't ever be enough (and shouldn't be!) to
    1291              :      * confirm receipt of this transaction, but it might allow another
    1292              :      * transaction's commit to be confirmed with one message.
    1293              :      */
    1294          680 :     ctx->write_location = first_lsn;
    1295              : 
    1296          680 :     ctx->end_xact = false;
    1297              : 
    1298              :     /* in streaming mode, stream_start_cb is required */
    1299          680 :     if (ctx->callbacks.stream_start_cb == NULL)
    1300            0 :         ereport(ERROR,
    1301              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1302              :                  errmsg("logical streaming requires a %s callback",
    1303              :                         "stream_start_cb")));
    1304              : 
    1305          680 :     ctx->callbacks.stream_start_cb(ctx, txn);
    1306              : 
    1307              :     /* Pop the error context stack */
    1308          680 :     error_context_stack = errcallback.previous;
    1309          680 : }
    1310              : 
    1311              : static void
    1312          680 : stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1313              :                        XLogRecPtr last_lsn)
    1314              : {
    1315          680 :     LogicalDecodingContext *ctx = cache->private_data;
    1316              :     LogicalErrorCallbackState state;
    1317              :     ErrorContextCallback errcallback;
    1318              : 
    1319              :     Assert(!ctx->fast_forward);
    1320              : 
    1321              :     /* We're only supposed to call this when streaming is supported. */
    1322              :     Assert(ctx->streaming);
    1323              : 
    1324              :     /* Push callback + info on the error context stack */
    1325          680 :     state.ctx = ctx;
    1326          680 :     state.callback_name = "stream_stop";
    1327          680 :     state.report_location = last_lsn;
    1328          680 :     errcallback.callback = output_plugin_error_callback;
    1329          680 :     errcallback.arg = &state;
    1330          680 :     errcallback.previous = error_context_stack;
    1331          680 :     error_context_stack = &errcallback;
    1332              : 
    1333              :     /* set output state */
    1334          680 :     ctx->accept_writes = true;
    1335          680 :     ctx->write_xid = txn->xid;
    1336              : 
    1337              :     /*
    1338              :      * Report this message's lsn so replies from clients can give an
    1339              :      * up-to-date answer. This won't ever be enough (and shouldn't be!) to
    1340              :      * confirm receipt of this transaction, but it might allow another
    1341              :      * transaction's commit to be confirmed with one message.
    1342              :      */
    1343          680 :     ctx->write_location = last_lsn;
    1344              : 
    1345          680 :     ctx->end_xact = false;
    1346              : 
    1347              :     /* in streaming mode, stream_stop_cb is required */
    1348          680 :     if (ctx->callbacks.stream_stop_cb == NULL)
    1349            0 :         ereport(ERROR,
    1350              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1351              :                  errmsg("logical streaming requires a %s callback",
    1352              :                         "stream_stop_cb")));
    1353              : 
    1354          680 :     ctx->callbacks.stream_stop_cb(ctx, txn);
    1355              : 
    1356              :     /* Pop the error context stack */
    1357          680 :     error_context_stack = errcallback.previous;
    1358          680 : }
    1359              : 
    1360              : static void
    1361           30 : stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1362              :                         XLogRecPtr abort_lsn)
    1363              : {
    1364           30 :     LogicalDecodingContext *ctx = cache->private_data;
    1365              :     LogicalErrorCallbackState state;
    1366              :     ErrorContextCallback errcallback;
    1367              : 
    1368              :     Assert(!ctx->fast_forward);
    1369              : 
    1370              :     /* We're only supposed to call this when streaming is supported. */
    1371              :     Assert(ctx->streaming);
    1372              : 
    1373              :     /* Push callback + info on the error context stack */
    1374           30 :     state.ctx = ctx;
    1375           30 :     state.callback_name = "stream_abort";
    1376           30 :     state.report_location = abort_lsn;
    1377           30 :     errcallback.callback = output_plugin_error_callback;
    1378           30 :     errcallback.arg = &state;
    1379           30 :     errcallback.previous = error_context_stack;
    1380           30 :     error_context_stack = &errcallback;
    1381              : 
    1382              :     /* set output state */
    1383           30 :     ctx->accept_writes = true;
    1384           30 :     ctx->write_xid = txn->xid;
    1385           30 :     ctx->write_location = abort_lsn;
    1386           30 :     ctx->end_xact = true;
    1387              : 
    1388              :     /* in streaming mode, stream_abort_cb is required */
    1389           30 :     if (ctx->callbacks.stream_abort_cb == NULL)
    1390            0 :         ereport(ERROR,
    1391              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1392              :                  errmsg("logical streaming requires a %s callback",
    1393              :                         "stream_abort_cb")));
    1394              : 
    1395           30 :     ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn);
    1396              : 
    1397              :     /* Pop the error context stack */
    1398           30 :     error_context_stack = errcallback.previous;
    1399           30 : }
    1400              : 
    1401              : static void
    1402           17 : stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1403              :                           XLogRecPtr prepare_lsn)
    1404              : {
    1405           17 :     LogicalDecodingContext *ctx = cache->private_data;
    1406              :     LogicalErrorCallbackState state;
    1407              :     ErrorContextCallback errcallback;
    1408              : 
    1409              :     Assert(!ctx->fast_forward);
    1410              : 
    1411              :     /*
    1412              :      * We're only supposed to call this when streaming and two-phase commits
    1413              :      * are supported.
    1414              :      */
    1415              :     Assert(ctx->streaming);
    1416              :     Assert(ctx->twophase);
    1417              : 
    1418              :     /* Push callback + info on the error context stack */
    1419           17 :     state.ctx = ctx;
    1420           17 :     state.callback_name = "stream_prepare";
    1421           17 :     state.report_location = txn->final_lsn;
    1422           17 :     errcallback.callback = output_plugin_error_callback;
    1423           17 :     errcallback.arg = &state;
    1424           17 :     errcallback.previous = error_context_stack;
    1425           17 :     error_context_stack = &errcallback;
    1426              : 
    1427              :     /* set output state */
    1428           17 :     ctx->accept_writes = true;
    1429           17 :     ctx->write_xid = txn->xid;
    1430           17 :     ctx->write_location = txn->end_lsn;
    1431           17 :     ctx->end_xact = true;
    1432              : 
    1433              :     /* in streaming mode with two-phase commits, stream_prepare_cb is required */
    1434           17 :     if (ctx->callbacks.stream_prepare_cb == NULL)
    1435            0 :         ereport(ERROR,
    1436              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1437              :                  errmsg("logical streaming at prepare time requires a %s callback",
    1438              :                         "stream_prepare_cb")));
    1439              : 
    1440           17 :     ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
    1441              : 
    1442              :     /* Pop the error context stack */
    1443           17 :     error_context_stack = errcallback.previous;
    1444           17 : }
    1445              : 
    1446              : static void
    1447           51 : stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1448              :                          XLogRecPtr commit_lsn)
    1449              : {
    1450           51 :     LogicalDecodingContext *ctx = cache->private_data;
    1451              :     LogicalErrorCallbackState state;
    1452              :     ErrorContextCallback errcallback;
    1453              : 
    1454              :     Assert(!ctx->fast_forward);
    1455              : 
    1456              :     /* We're only supposed to call this when streaming is supported. */
    1457              :     Assert(ctx->streaming);
    1458              : 
    1459              :     /* Push callback + info on the error context stack */
    1460           51 :     state.ctx = ctx;
    1461           51 :     state.callback_name = "stream_commit";
    1462           51 :     state.report_location = txn->final_lsn;
    1463           51 :     errcallback.callback = output_plugin_error_callback;
    1464           51 :     errcallback.arg = &state;
    1465           51 :     errcallback.previous = error_context_stack;
    1466           51 :     error_context_stack = &errcallback;
    1467              : 
    1468              :     /* set output state */
    1469           51 :     ctx->accept_writes = true;
    1470           51 :     ctx->write_xid = txn->xid;
    1471           51 :     ctx->write_location = txn->end_lsn;
    1472           51 :     ctx->end_xact = true;
    1473              : 
    1474              :     /* in streaming mode, stream_commit_cb is required */
    1475           51 :     if (ctx->callbacks.stream_commit_cb == NULL)
    1476            0 :         ereport(ERROR,
    1477              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1478              :                  errmsg("logical streaming requires a %s callback",
    1479              :                         "stream_commit_cb")));
    1480              : 
    1481           51 :     ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn);
    1482              : 
    1483              :     /* Pop the error context stack */
    1484           51 :     error_context_stack = errcallback.previous;
    1485           51 : }
    1486              : 
    1487              : static void
    1488       180978 : stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1489              :                          Relation relation, ReorderBufferChange *change)
    1490              : {
    1491       180978 :     LogicalDecodingContext *ctx = cache->private_data;
    1492              :     LogicalErrorCallbackState state;
    1493              :     ErrorContextCallback errcallback;
    1494              : 
    1495              :     Assert(!ctx->fast_forward);
    1496              : 
    1497              :     /* We're only supposed to call this when streaming is supported. */
    1498              :     Assert(ctx->streaming);
    1499              : 
    1500              :     /* Push callback + info on the error context stack */
    1501       180978 :     state.ctx = ctx;
    1502       180978 :     state.callback_name = "stream_change";
    1503       180978 :     state.report_location = change->lsn;
    1504       180978 :     errcallback.callback = output_plugin_error_callback;
    1505       180978 :     errcallback.arg = &state;
    1506       180978 :     errcallback.previous = error_context_stack;
    1507       180978 :     error_context_stack = &errcallback;
    1508              : 
    1509              :     /* set output state */
    1510       180978 :     ctx->accept_writes = true;
    1511       180978 :     ctx->write_xid = txn->xid;
    1512              : 
    1513              :     /*
    1514              :      * Report this change's lsn so replies from clients can give an up-to-date
    1515              :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1516              :      * receipt of this transaction, but it might allow another transaction's
    1517              :      * commit to be confirmed with one message.
    1518              :      */
    1519       180978 :     ctx->write_location = change->lsn;
    1520              : 
    1521       180978 :     ctx->end_xact = false;
    1522              : 
    1523              :     /* in streaming mode, stream_change_cb is required */
    1524       180978 :     if (ctx->callbacks.stream_change_cb == NULL)
    1525            0 :         ereport(ERROR,
    1526              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1527              :                  errmsg("logical streaming requires a %s callback",
    1528              :                         "stream_change_cb")));
    1529              : 
    1530       180978 :     ctx->callbacks.stream_change_cb(ctx, txn, relation, change);
    1531              : 
    1532              :     /* Pop the error context stack */
    1533       180978 :     error_context_stack = errcallback.previous;
    1534       180978 : }
    1535              : 
    1536              : static void
    1537            3 : stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1538              :                           XLogRecPtr message_lsn, bool transactional,
    1539              :                           const char *prefix, Size message_size, const char *message)
    1540              : {
    1541            3 :     LogicalDecodingContext *ctx = cache->private_data;
    1542              :     LogicalErrorCallbackState state;
    1543              :     ErrorContextCallback errcallback;
    1544              : 
    1545              :     Assert(!ctx->fast_forward);
    1546              : 
    1547              :     /* We're only supposed to call this when streaming is supported. */
    1548              :     Assert(ctx->streaming);
    1549              : 
    1550              :     /* this callback is optional */
    1551            3 :     if (ctx->callbacks.stream_message_cb == NULL)
    1552            0 :         return;
    1553              : 
    1554              :     /* Push callback + info on the error context stack */
    1555            3 :     state.ctx = ctx;
    1556            3 :     state.callback_name = "stream_message";
    1557            3 :     state.report_location = message_lsn;
    1558            3 :     errcallback.callback = output_plugin_error_callback;
    1559            3 :     errcallback.arg = &state;
    1560            3 :     errcallback.previous = error_context_stack;
    1561            3 :     error_context_stack = &errcallback;
    1562              : 
    1563              :     /* set output state */
    1564            3 :     ctx->accept_writes = true;
    1565            3 :     ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
    1566            3 :     ctx->write_location = message_lsn;
    1567            3 :     ctx->end_xact = false;
    1568              : 
    1569              :     /* do the actual work: call callback */
    1570            3 :     ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
    1571              :                                      message_size, message);
    1572              : 
    1573              :     /* Pop the error context stack */
    1574            3 :     error_context_stack = errcallback.previous;
    1575              : }
    1576              : /* Call the output plugin's stream_truncate_cb, if it has one, with error context and output state set up. */
    1577              : static void
    1578            0 : stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1579              :                            int nrelations, Relation relations[],
    1580              :                            ReorderBufferChange *change)
    1581              : {
    1582            0 :     LogicalDecodingContext *ctx = cache->private_data;
    1583              :     LogicalErrorCallbackState state;
    1584              :     ErrorContextCallback errcallback;
    1585              : 
    1586              :     Assert(!ctx->fast_forward);
    1587              : 
    1588              :     /* We're only supposed to call this when streaming is supported. */
    1589              :     Assert(ctx->streaming);
    1590              : 
    1591              :     /* this callback is optional; return quietly if the plugin lacks it */
    1592            0 :     if (!ctx->callbacks.stream_truncate_cb)
    1593            0 :         return;
    1594              : 
    1595              :     /* Push callback name + change lsn on the error context stack */
    1596            0 :     state.ctx = ctx;
    1597            0 :     state.callback_name = "stream_truncate";
    1598            0 :     state.report_location = change->lsn;
    1599            0 :     errcallback.callback = output_plugin_error_callback;
    1600            0 :     errcallback.arg = &state;
    1601            0 :     errcallback.previous = error_context_stack;
    1602            0 :     error_context_stack = &errcallback;
    1603              : 
    1604              :     /* set output state: accept writes, attributed to this transaction's xid */
    1605            0 :     ctx->accept_writes = true;
    1606            0 :     ctx->write_xid = txn->xid;
    1607              : 
    1608              :     /*
    1609              :      * Report this change's lsn so replies from clients can give an up-to-date
    1610              :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1611              :      * receipt of this transaction, but it might allow another transaction's
    1612              :      * commit to be confirmed with one message.
    1613              :      */
    1614            0 :     ctx->write_location = change->lsn;
    1615              : 
    1616            0 :     ctx->end_xact = false;
    1617              :     /* do the actual work: call callback */
    1618            0 :     ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
    1619              : 
    1620              :     /* Pop the error context stack */
    1621            0 :     error_context_stack = errcallback.previous;
    1622              : }
    1623              : /* Report progress at lsn via OutputPluginUpdateProgress(), with output writes disallowed. */
    1624              : static void
    1625         3362 : update_progress_txn_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1626              :                                XLogRecPtr lsn)
    1627              : {
    1628         3362 :     LogicalDecodingContext *ctx = cache->private_data;
    1629              :     LogicalErrorCallbackState state;
    1630              :     ErrorContextCallback errcallback;
    1631              : 
    1632              :     Assert(!ctx->fast_forward);
    1633              : 
    1634              :     /* Push callback name + lsn on the error context stack */
    1635         3362 :     state.ctx = ctx;
    1636         3362 :     state.callback_name = "update_progress_txn";
    1637         3362 :     state.report_location = lsn;
    1638         3362 :     errcallback.callback = output_plugin_error_callback;
    1639         3362 :     errcallback.arg = &state;
    1640         3362 :     errcallback.previous = error_context_stack;
    1641         3362 :     error_context_stack = &errcallback;
    1642              : 
    1643              :     /* set output state; writes are not accepted while reporting progress */
    1644         3362 :     ctx->accept_writes = false;
    1645         3362 :     ctx->write_xid = txn->xid;
    1646              : 
    1647              :     /*
    1648              :      * Report the given lsn so replies from clients can give an up-to-date
    1649              :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1650              :      * receipt of this transaction, but it might allow another transaction's
    1651              :      * commit to be confirmed with one message.
    1652              :      */
    1653         3362 :     ctx->write_location = lsn;
    1654              : 
    1655         3362 :     ctx->end_xact = false;
    1656              : 
    1657         3362 :     OutputPluginUpdateProgress(ctx, false);
    1658              : 
    1659              :     /* Pop the error context stack */
    1660         3362 :     error_context_stack = errcallback.previous;
    1661         3362 : }
    1662              : 
    1663              : /*
    1664              :  * Set the required catalog xmin horizon for historic snapshots in the current
    1665              :  * replication slot.
    1666              :  *
    1667              :  * Note that in most cases, we won't be able to immediately use the xmin
    1668              :  * to increase the xmin horizon: we need to wait till the client has confirmed
    1669              :  * receiving current_lsn with LogicalConfirmReceivedLocation().
    1670              :  */
    1671              : void
    1672          541 : LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
    1673              : {
    1674          541 :     bool        updated_xmin = false;
    1675              :     ReplicationSlot *slot;
    1676          541 :     bool        got_new_xmin = false;
    1677              : 
    1678          541 :     slot = MyReplicationSlot;
    1679              : 
    1680              :     Assert(slot != NULL);
    1681              : 
    1682          541 :     SpinLockAcquire(&slot->mutex);
    1683              : 
    1684              :     /*
    1685              :      * Don't overwrite if we already have a newer or equal xmin (hence the
    1686              :      * empty branch). This can happen if we restart decoding in a slot.
    1687              :      */
    1688          541 :     if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
    1689              :     {
    1690              :     }
    1691              : 
    1692              :     /*
    1693              :      * If the client has already confirmed up to this lsn, we can directly
    1694              :      * mark this as accepted. This can happen if we restart decoding in a
    1695              :      * slot.
    1696              :      */
    1697          193 :     else if (current_lsn <= slot->data.confirmed_flush)
    1698              :     {
    1699           67 :         slot->candidate_catalog_xmin = xmin;
    1700           67 :         slot->candidate_xmin_lsn = current_lsn;
    1701              : 
    1702              :         /* our candidate can directly be used */
    1703           67 :         updated_xmin = true;
    1704              :     }
    1705              : 
    1706              :     /*
    1707              :      * Only increase if the previous values have been applied, otherwise we
    1708              :      * might never end up updating if the receiver acks too slowly.
    1709              :      */
    1710          126 :     else if (!XLogRecPtrIsValid(slot->candidate_xmin_lsn))
    1711              :     {
    1712           23 :         slot->candidate_catalog_xmin = xmin;
    1713           23 :         slot->candidate_xmin_lsn = current_lsn;
    1714              : 
    1715              :         /*
    1716              :          * Log the new xmin at an appropriate log level after releasing the
    1717              :          * spinlock.
    1718              :          */
    1719           23 :         got_new_xmin = true;
    1720              :     }
    1721          541 :     SpinLockRelease(&slot->mutex);
    1722              : 
    1723          541 :     if (got_new_xmin)
    1724           23 :         elog(DEBUG1, "got new catalog xmin %u at %X/%08X", xmin,
    1725              :              LSN_FORMAT_ARGS(current_lsn));
    1726              : 
    1727              :     /* candidate is already valid with the current flush position, apply it */
    1728          541 :     if (updated_xmin)
    1729           67 :         LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
    1730          541 : }
    1731              : 
    1732              : /*
    1733              :  * Mark the minimal LSN (restart_lsn) we need to read to replay all
    1734              :  * transactions that have not yet committed at current_lsn.
    1735              :  *
    1736              :  * Just like LogicalIncreaseXminForSlot this only takes effect when the
    1737              :  * client has confirmed to have received current_lsn.
    1738              :  */
    1739              : void
    1740          490 : LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
    1741              : {
    1742          490 :     bool        updated_lsn = false;
    1743              :     ReplicationSlot *slot;
    1744              : 
    1745          490 :     slot = MyReplicationSlot;
    1746              : 
    1747              :     Assert(slot != NULL);
    1748              :     Assert(XLogRecPtrIsValid(restart_lsn));
    1749              :     Assert(XLogRecPtrIsValid(current_lsn));
    1750              : 
    1751          490 :     SpinLockAcquire(&slot->mutex);
    1752              : 
    1753              :     /* don't overwrite if we already have a newer or equal restart lsn */
    1754          490 :     if (restart_lsn <= slot->data.restart_lsn)
    1755              :     {
    1756           10 :         SpinLockRelease(&slot->mutex);
    1757              :     }
    1758              : 
    1759              :     /*
    1760              :      * We might have already flushed far enough to directly accept this lsn;
    1761              :      * in this case there is no need to check for existing candidate LSNs.
    1762              :      */
    1763          480 :     else if (current_lsn <= slot->data.confirmed_flush)
    1764              :     {
    1765          324 :         slot->candidate_restart_valid = current_lsn;
    1766          324 :         slot->candidate_restart_lsn = restart_lsn;
    1767          324 :         SpinLockRelease(&slot->mutex);
    1768              : 
    1769              :         /* our candidate can directly be used */
    1770          324 :         updated_lsn = true;
    1771              :     }
    1772              : 
    1773              :     /*
    1774              :      * Only increase if the previous values have been applied, otherwise we
    1775              :      * might never end up updating if the receiver acks too slowly. A missed
    1776              :      * value here will just cause some extra effort after reconnecting.
    1777              :      */
    1778          156 :     else if (!XLogRecPtrIsValid(slot->candidate_restart_valid))
    1779              :     {
    1780           44 :         slot->candidate_restart_valid = current_lsn;
    1781           44 :         slot->candidate_restart_lsn = restart_lsn;
    1782           44 :         SpinLockRelease(&slot->mutex);
    1783              : 
    1784           44 :         elog(DEBUG1, "got new restart lsn %X/%08X at %X/%08X",
    1785              :              LSN_FORMAT_ARGS(restart_lsn),
    1786              :              LSN_FORMAT_ARGS(current_lsn));
    1787              :     }
    1788              :     else
    1789              :     {
    1790              :         XLogRecPtr  candidate_restart_lsn;
    1791              :         XLogRecPtr  candidate_restart_valid;
    1792              :         XLogRecPtr  confirmed_flush;
    1793              :         /* copy the values under the spinlock so they can be logged after releasing it */
    1794          112 :         candidate_restart_lsn = slot->candidate_restart_lsn;
    1795          112 :         candidate_restart_valid = slot->candidate_restart_valid;
    1796          112 :         confirmed_flush = slot->data.confirmed_flush;
    1797          112 :         SpinLockRelease(&slot->mutex);
    1798              : 
    1799          112 :         elog(DEBUG1, "failed to increase restart lsn: proposed %X/%08X, after %X/%08X, current candidate %X/%08X, current after %X/%08X, flushed up to %X/%08X",
    1800              :              LSN_FORMAT_ARGS(restart_lsn),
    1801              :              LSN_FORMAT_ARGS(current_lsn),
    1802              :              LSN_FORMAT_ARGS(candidate_restart_lsn),
    1803              :              LSN_FORMAT_ARGS(candidate_restart_valid),
    1804              :              LSN_FORMAT_ARGS(confirmed_flush));
    1805              :     }
    1806              : 
    1807              :     /* candidates are already valid with the current flush position, apply */
    1808          490 :     if (updated_lsn)
    1809          324 :         LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
    1810          490 : }
    1811              : 
    1812              : /*
    1813              :  * Handle a consumer's confirmation of having received all changes up to lsn.
    1814              :  */
    1815              : void
    1816        19539 : LogicalConfirmReceivedLocation(XLogRecPtr lsn)
    1817              : {
    1818              :     Assert(XLogRecPtrIsValid(lsn));
    1819              : 
    1820              :     /* Do an unlocked check for pending candidate values first. */
    1821        19539 :     if (XLogRecPtrIsValid(MyReplicationSlot->candidate_xmin_lsn) ||
    1822        19418 :         XLogRecPtrIsValid(MyReplicationSlot->candidate_restart_valid))
    1823          467 :     {
    1824          467 :         bool        updated_xmin = false;
    1825          467 :         bool        updated_restart = false;
    1826              :         XLogRecPtr  restart_lsn pg_attribute_unused();
    1827              : 
    1828          467 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1829              : 
    1830              :         /* remember the old restart lsn (only read by the injection point code below) */
    1831          467 :         restart_lsn = MyReplicationSlot->data.restart_lsn;
    1832              : 
    1833              :         /*
    1834              :          * Prevent moving the confirmed_flush backwards, as this could lead to
    1835              :          * data duplication issues caused by replicating already replicated
    1836              :          * changes.
    1837              :          *
    1838              :          * This can happen when a client acknowledges an LSN it doesn't have
    1839              :          * to do anything for, and thus didn't store persistently. After a
    1840              :          * restart, the client can send the prior LSN that it stored
    1841              :          * persistently as an acknowledgement, but we need to ignore such an
    1842              :          * LSN. See similar case handling in CreateDecodingContext.
    1843              :          */
    1844          467 :         if (lsn > MyReplicationSlot->data.confirmed_flush)
    1845           39 :             MyReplicationSlot->data.confirmed_flush = lsn;
    1846              : 
    1847              :         /* if we're past the location required for bumping xmin, do so */
    1848          467 :         if (XLogRecPtrIsValid(MyReplicationSlot->candidate_xmin_lsn) &&
    1849          121 :             MyReplicationSlot->candidate_xmin_lsn <= lsn)
    1850              :         {
    1851              :             /*
    1852              :              * We have to write the changed xmin to disk *before* we change
    1853              :              * the in-memory value, otherwise after a crash we wouldn't know
    1854              :              * that some catalog tuples might have been removed already.
    1855              :              *
    1856              :              * Ensure that by first writing to ->data.catalog_xmin and only update
    1857              :              * ->effective_catalog_xmin once the new state is synced to disk. After a
    1858              :              * crash the effective value is set to the saved one.
    1859              :              */
    1860           84 :             if (TransactionIdIsValid(MyReplicationSlot->candidate_catalog_xmin) &&
    1861           84 :                 MyReplicationSlot->data.catalog_xmin != MyReplicationSlot->candidate_catalog_xmin)
    1862              :             {
    1863           84 :                 MyReplicationSlot->data.catalog_xmin = MyReplicationSlot->candidate_catalog_xmin;
    1864           84 :                 MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
    1865           84 :                 MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
    1866           84 :                 updated_xmin = true;
    1867              :             }
    1868              :         }
    1869              :         /* if we're past the location required for advancing restart_lsn, do so */
    1870          467 :         if (XLogRecPtrIsValid(MyReplicationSlot->candidate_restart_valid) &&
    1871          400 :             MyReplicationSlot->candidate_restart_valid <= lsn)
    1872              :         {
    1873              :             Assert(XLogRecPtrIsValid(MyReplicationSlot->candidate_restart_lsn));
    1874              : 
    1875          362 :             MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
    1876          362 :             MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
    1877          362 :             MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
    1878          362 :             updated_restart = true;
    1879              :         }
    1880              : 
    1881          467 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1882              : 
    1883              :         /* first write the new xmin/restart_lsn to disk, so we know what's up after a crash */
    1884          467 :         if (updated_xmin || updated_restart)
    1885              :         {
    1886              : #ifdef USE_INJECTION_POINTS
    1887              :             XLogSegNo   seg1,
    1888              :                         seg2;
    1889              : 
    1890          429 :             XLByteToSeg(restart_lsn, seg1, wal_segment_size);
    1891          429 :             XLByteToSeg(MyReplicationSlot->data.restart_lsn, seg2, wal_segment_size);
    1892              : 
    1893              :             /* trigger injection point, but only if the restart_lsn segment changes */
    1894          429 :             if (seg1 != seg2)
    1895           18 :                 INJECTION_POINT("logical-replication-slot-advance-segment", NULL);
    1896              : #endif
    1897              : 
    1898          429 :             ReplicationSlotMarkDirty();
    1899          429 :             ReplicationSlotSave();
    1900          429 :             elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
    1901              :         }
    1902              : 
    1903              :         /*
    1904              :          * Now the new xmin is safely on disk, we can let the global value
    1905              :          * advance. We do not take ProcArrayLock or similar since we only
    1906              :          * advance xmin here and there's not much harm done by a concurrent
    1907              :          * computation missing that.
    1908              :          */
    1909          467 :         if (updated_xmin)
    1910              :         {
    1911           84 :             SpinLockAcquire(&MyReplicationSlot->mutex);
    1912           84 :             MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
    1913           84 :             SpinLockRelease(&MyReplicationSlot->mutex);
    1914              : 
    1915           84 :             ReplicationSlotsComputeRequiredXmin(false);
    1916           84 :             ReplicationSlotsComputeRequiredLSN();
    1917              :         }
    1918              :     }
    1919              :     else
    1920              :     {
    1921        19072 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1922              : 
    1923              :         /*
    1924              :          * Prevent moving the confirmed_flush backwards. See comments above
    1925              :          * for the details.
    1926              :          */
    1927        19072 :         if (lsn > MyReplicationSlot->data.confirmed_flush)
    1928        17270 :             MyReplicationSlot->data.confirmed_flush = lsn;
    1929              : 
    1930        19072 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1931              :     }
    1932        19539 : }
    1933              : 
    1934              : /*
    1935              :  * Clear logical streaming state (CheckXidAlive and bsysscan) during (sub)transaction abort.
    1936              :  */
    1937              : void
    1938        40836 : ResetLogicalStreamingState(void)
    1939              : {
    1940        40836 :     CheckXidAlive = InvalidTransactionId;
    1941        40836 :     bsysscan = false;
    1942        40836 : }
    1943              : 
    1944              : /*
    1945              :  * Report the reorder buffer's accumulated stats for ctx's slot, then reset the counters.
    1946              :  */
    1947              : void
    1948         6327 : UpdateDecodingStats(LogicalDecodingContext *ctx)
    1949              : {
    1950         6327 :     ReorderBuffer *rb = ctx->reorder;
    1951              :     PgStat_StatReplSlotEntry repSlotStat;
    1952              : 
    1953              :     /* Nothing to do if we don't have any replication stats to be sent. */
    1954         6327 :     if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0 &&
    1955          465 :         rb->memExceededCount <= 0)
    1956          456 :         return;
    1957              : 
    1958         5871 :     elog(DEBUG2, "UpdateDecodingStats: updating stats %p %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64,
    1959              :          rb,
    1960              :          rb->spillTxns,
    1961              :          rb->spillCount,
    1962              :          rb->spillBytes,
    1963              :          rb->streamTxns,
    1964              :          rb->streamCount,
    1965              :          rb->streamBytes,
    1966              :          rb->memExceededCount,
    1967              :          rb->totalTxns,
    1968              :          rb->totalBytes);
    1969              :     /* copy the reorder buffer counters into the stats entry to report */
    1970         5871 :     repSlotStat.spill_txns = rb->spillTxns;
    1971         5871 :     repSlotStat.spill_count = rb->spillCount;
    1972         5871 :     repSlotStat.spill_bytes = rb->spillBytes;
    1973         5871 :     repSlotStat.stream_txns = rb->streamTxns;
    1974         5871 :     repSlotStat.stream_count = rb->streamCount;
    1975         5871 :     repSlotStat.stream_bytes = rb->streamBytes;
    1976         5871 :     repSlotStat.mem_exceeded_count = rb->memExceededCount;
    1977         5871 :     repSlotStat.total_txns = rb->totalTxns;
    1978         5871 :     repSlotStat.total_bytes = rb->totalBytes;
    1979              : 
    1980         5871 :     pgstat_report_replslot(ctx->slot, &repSlotStat);
    1981              :     /* reset the counters now that they have been reported */
    1982         5871 :     rb->spillTxns = 0;
    1983         5871 :     rb->spillCount = 0;
    1984         5871 :     rb->spillBytes = 0;
    1985         5871 :     rb->streamTxns = 0;
    1986         5871 :     rb->streamCount = 0;
    1987         5871 :     rb->streamBytes = 0;
    1988         5871 :     rb->memExceededCount = 0;
    1989         5871 :     rb->totalTxns = 0;
    1990         5871 :     rb->totalBytes = 0;
    1991              : }
    1992              : 
    1993              : /*
    1994              :  * Read WAL from the decoding slot's restart_lsn up to end_of_wal in order
    1995              :  * to check if any meaningful/decodable WAL records are encountered.
    1996              :  * scan_cutoff_lsn is the LSN at or after which finding a decodable WAL
    1997              :  * record lets us terminate the WAL scan early.
    1998              :  *
    1999              :  * Returns the LSN of the last decodable WAL record seen, if any; otherwise
    2000              :  * returns InvalidXLogRecPtr.
    2001              :  */
    2002              : XLogRecPtr
    2003            3 : LogicalReplicationSlotCheckPendingWal(XLogRecPtr end_of_wal,
    2004              :                                       XLogRecPtr scan_cutoff_lsn)
    2005              : {
    2006            3 :     XLogRecPtr  last_pending_wal = InvalidXLogRecPtr;
    2007              : 
    2008              :     Assert(MyReplicationSlot);
    2009              :     Assert(end_of_wal >= scan_cutoff_lsn);
    2010              : 
    2011            3 :     PG_TRY();
    2012              :     {
    2013              :         LogicalDecodingContext *ctx;
    2014              : 
    2015              :         /*
    2016              :          * Create our decoding context in fast_forward mode, passing start_lsn
    2017              :          * as InvalidXLogRecPtr, so that we start processing from the slot's
    2018              :          * confirmed_flush.
    2019              :          */
    2020            6 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
    2021              :                                     NIL,
    2022              :                                     true,   /* fast_forward */
    2023            3 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
    2024              :                                                .segment_open = wal_segment_open,
    2025              :                                                .segment_close = wal_segment_close),
    2026              :                                     NULL, NULL, NULL);
    2027              : 
    2028              :         /*
    2029              :          * Start reading at the slot's restart_lsn, which we know points to a
    2030              :          * valid record.
    2031              :          */
    2032            3 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
    2033              : 
    2034              :         /* Invalidate non-timetravel entries */
    2035            3 :         InvalidateSystemCaches();
    2036              :         /* Process records until the reader has reached end_of_wal */
    2037           81 :         while (ctx->reader->EndRecPtr < end_of_wal)
    2038              :         {
    2039              :             XLogRecord *record;
    2040           78 :             char       *errm = NULL;
    2041              : 
    2042           78 :             record = XLogReadRecord(ctx->reader, &errm);
    2043              : 
    2044           78 :             if (errm)
    2045            0 :                 elog(ERROR, "could not find record for logical decoding: %s", errm);
    2046              : 
    2047           78 :             if (record != NULL)
    2048           78 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
    2049              :             /* remember the start LSN of any record flagged as requiring processing */
    2050           78 :             if (ctx->processing_required)
    2051              :             {
    2052            2 :                 last_pending_wal = ctx->reader->ReadRecPtr;
    2053              : 
    2054              :                 /*
    2055              :                  * If we find a decodable WAL record at or after the
    2056              :                  * scan_cutoff_lsn point, we can terminate the scan early.
    2057              :                  */
    2058            2 :                 if (last_pending_wal >= scan_cutoff_lsn)
    2059            0 :                     break;
    2060              : 
    2061              :                 /* Reset the flag and continue checking */
    2062            2 :                 ctx->processing_required = false;
    2063              :             }
    2064              : 
    2065           78 :             CHECK_FOR_INTERRUPTS();
    2066              :         }
    2067              : 
    2068              :         /* Clean up */
    2069            3 :         FreeDecodingContext(ctx);
    2070            3 :         InvalidateSystemCaches();
    2071              :     }
    2072            0 :     PG_CATCH();
    2073              :     {
    2074              :         /* clear all timetravel entries */
    2075            0 :         InvalidateSystemCaches();
    2076              : 
    2077            0 :         PG_RE_THROW();
    2078              :     }
    2079            3 :     PG_END_TRY();
    2080              : 
    2081            3 :     return last_pending_wal;
    2082              : }
    2083              : 
    2084              : /*
    2085              :  * Helper function for advancing our logical replication slot forward.
    2086              :  *
    2087              :  * The slot's restart_lsn is used as start point for reading records, while
    2088              :  * confirmed_flush is used as base point for the decoding context.
    2089              :  *
    2090              :  * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
    2091              :  * because we need to digest WAL to advance restart_lsn, which allows WAL to
    2092              :  * be recycled and old catalog tuples to be removed.  As decoding is done in
    2093              :  * fast_forward mode, no changes are generated anyway.
    2094              :  *
    2095              :  * *found_consistent_snapshot will be true if the initial decoding snapshot has
    2096              :  * been built; otherwise, it will be false.
    2097              :  */
    2098              : XLogRecPtr
    2099           22 : LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
    2100              :                                     bool *found_consistent_snapshot)
    2101              : {
    2102              :     LogicalDecodingContext *ctx;
    2103           22 :     ResourceOwner old_resowner PG_USED_FOR_ASSERTS_ONLY = CurrentResourceOwner;
    2104              :     XLogRecPtr  retlsn;
    2105              : 
    2106              :     Assert(XLogRecPtrIsValid(moveto));
    2107              : 
    2108           22 :     if (found_consistent_snapshot)
    2109           11 :         *found_consistent_snapshot = false;
    2110              : 
    2111           22 :     PG_TRY();
    2112              :     {
    2113              :         /*
    2114              :          * Create our decoding context in fast_forward mode, passing start_lsn
    2115              :          * as InvalidXLogRecPtr, so that we start processing from my slot's
    2116              :          * confirmed_flush.
    2117              :          */
    2118           44 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
    2119              :                                     NIL,
    2120              :                                     true,   /* fast_forward */
    2121           22 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
    2122              :                                                .segment_open = wal_segment_open,
    2123              :                                                .segment_close = wal_segment_close),
    2124              :                                     NULL, NULL, NULL);
    2125              : 
    2126              :         /*
    2127              :          * Wait for specified streaming replication standby servers (if any)
    2128              :          * to confirm receipt of WAL up to moveto lsn.
    2129              :          */
    2130           22 :         WaitForStandbyConfirmation(moveto);
    2131              : 
    2132              :         /*
    2133              :          * Start reading at the slot's restart_lsn, which we know to point to
    2134              :          * a valid record.
    2135              :          */
    2136           22 :         XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
    2137              : 
    2138              :         /* invalidate non-timetravel entries */
    2139           22 :         InvalidateSystemCaches();
    2140              : 
    2141              :         /* Decode records until we reach the requested target */
    2142         2961 :         while (ctx->reader->EndRecPtr < moveto)
    2143              :         {
    2144         2939 :             char       *errm = NULL;
    2145              :             XLogRecord *record;
    2146              : 
    2147              :             /*
    2148              :              * Read records.  No changes are generated in fast_forward mode,
    2149              :              * but snapbuilder/slot statuses are updated properly.
    2150              :              */
    2151         2939 :             record = XLogReadRecord(ctx->reader, &errm);
    2152         2939 :             if (errm)
    2153            0 :                 elog(ERROR, "could not find record while advancing replication slot: %s",
    2154              :                      errm);
    2155              : 
    2156              :             /*
    2157              :              * Process the record.  Storage-level changes are ignored in
    2158              :              * fast_forward mode, but other modules (such as snapbuilder)
    2159              :              * might still have critical updates to do.
    2160              :              */
    2161         2939 :             if (record)
    2162              :             {
    2163         2939 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
    2164              : 
    2165              :                 /*
    2166              :                  * We used to have bugs where logical decoding would fail to
    2167              :                  * preserve the resource owner.  That's important here, so
    2168              :                  * verify that that doesn't happen anymore.  XXX this could be
    2169              :                  * removed once it's been battle-tested.
    2170              :                  */
    2171              :                 Assert(CurrentResourceOwner == old_resowner);
    2172              :             }
    2173              : 
    2174         2939 :             CHECK_FOR_INTERRUPTS();
    2175              :         }
    2176              : 
    2177           22 :         if (found_consistent_snapshot && DecodingContextReady(ctx))
    2178           11 :             *found_consistent_snapshot = true;
    2179              : 
    2180           22 :         if (XLogRecPtrIsValid(ctx->reader->EndRecPtr))
    2181              :         {
    2182           22 :             LogicalConfirmReceivedLocation(moveto);
    2183              : 
    2184              :             /*
    2185              :              * If only the confirmed_flush LSN has changed the slot won't get
    2186              :              * marked as dirty by the above. Callers on the walsender
    2187              :              * interface are expected to keep track of their own progress and
    2188              :              * don't need it written out. But SQL-interface users cannot
    2189              :              * specify their own start positions and it's harder for them to
    2190              :              * keep track of their progress, so we should make more of an
    2191              :              * effort to save it for them.
    2192              :              *
    2193              :              * Dirty the slot so it is written out at the next checkpoint. The
    2194              :              * LSN position advanced to may still be lost on a crash but this
    2195              :              * makes the data consistent after a clean shutdown.
    2196              :              */
    2197           22 :             ReplicationSlotMarkDirty();
    2198              :         }
    2199              : 
    2200           22 :         retlsn = MyReplicationSlot->data.confirmed_flush;
    2201              : 
    2202              :         /* free context, call shutdown callback */
    2203           22 :         FreeDecodingContext(ctx);
    2204              : 
    2205           22 :         InvalidateSystemCaches();
    2206              :     }
    2207            0 :     PG_CATCH();
    2208              :     {
    2209              :         /* clear all timetravel entries */
    2210            0 :         InvalidateSystemCaches();
    2211              : 
    2212            0 :         PG_RE_THROW();
    2213              :     }
    2214           22 :     PG_END_TRY();
    2215              : 
    2216           22 :     return retlsn;
    2217              : }
        

Generated by: LCOV version 2.0-1