LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logical.c (source / functions) Hit Total Coverage
Test: PostgreSQL 15devel Lines: 598 643 93.0 %
Date: 2021-12-09 04:09:06 Functions: 37 38 97.4 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * logical.c
       3             :  *     PostgreSQL logical decoding coordination
       4             :  *
       5             :  * Copyright (c) 2012-2021, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/logical.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file coordinates interaction between the various modules that
      12             :  *    together provide logical decoding, primarily by providing so
      13             :  *    called LogicalDecodingContexts. The goal is to encapsulate most of the
      14             :  *    internal complexity for consumers of logical decoding, so they can
      15             :  *    create and consume a changestream with a low amount of code. Builtin
      16             :  *    consumers are the walsender and SQL SRF interface, but it's possible to
      17             :  *    add further ones without changing core code, e.g. to consume changes in
      18             :  *    a bgworker.
      19             :  *
      20             :  *    The idea is that a consumer provides three callbacks, one to read WAL,
      21             :  *    one to prepare a data write, and a final one for actually writing since
      22             :  *    their implementation depends on the type of consumer.  Check
      23             :  *    logicalfuncs.c for an example implementation of a fairly simple consumer
      24             :  *    and an implementation of a WAL reading callback that's suitable for
      25             :  *    simple consumers.
      26             :  *-------------------------------------------------------------------------
      27             :  */
      28             : 
      29             : #include "postgres.h"
      30             : 
      31             : #include "access/xact.h"
      32             : #include "access/xlog_internal.h"
      33             : #include "fmgr.h"
      34             : #include "miscadmin.h"
      35             : #include "pgstat.h"
      36             : #include "replication/decode.h"
      37             : #include "replication/logical.h"
      38             : #include "replication/origin.h"
      39             : #include "replication/reorderbuffer.h"
      40             : #include "replication/snapbuild.h"
      41             : #include "storage/proc.h"
      42             : #include "storage/procarray.h"
      43             : #include "utils/builtins.h"
      44             : #include "utils/memutils.h"
      45             : 
      46             : /* data for errcontext callback */
      47             : typedef struct LogicalErrorCallbackState
      48             : {
      49             :     LogicalDecodingContext *ctx;
      50             :     const char *callback_name;
      51             :     XLogRecPtr  report_location;
      52             : } LogicalErrorCallbackState;
      53             : 
      54             : /* wrappers around output plugin callbacks */
      55             : static void output_plugin_error_callback(void *arg);
      56             : static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      57             :                                bool is_init);
      58             : static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
      59             : static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
      60             : static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      61             :                               XLogRecPtr commit_lsn);
      62             : static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
      63             : static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      64             :                                XLogRecPtr prepare_lsn);
      65             : static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      66             :                                        XLogRecPtr commit_lsn);
      67             : static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      68             :                                          XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
      69             : static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      70             :                               Relation relation, ReorderBufferChange *change);
      71             : static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      72             :                                 int nrelations, Relation relations[], ReorderBufferChange *change);
      73             : static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      74             :                                XLogRecPtr message_lsn, bool transactional,
      75             :                                const char *prefix, Size message_size, const char *message);
      76             : 
      77             : /* streaming callbacks */
      78             : static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      79             :                                     XLogRecPtr first_lsn);
      80             : static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      81             :                                    XLogRecPtr last_lsn);
      82             : static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      83             :                                     XLogRecPtr abort_lsn);
      84             : static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      85             :                                       XLogRecPtr prepare_lsn);
      86             : static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      87             :                                      XLogRecPtr commit_lsn);
      88             : static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      89             :                                      Relation relation, ReorderBufferChange *change);
      90             : static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      91             :                                       XLogRecPtr message_lsn, bool transactional,
      92             :                                       const char *prefix, Size message_size, const char *message);
      93             : static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      94             :                                        int nrelations, Relation relations[], ReorderBufferChange *change);
      95             : 
      96             : static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin);
      97             : 
      98             : /*
      99             :  * Make sure the current settings & environment are capable of doing logical
     100             :  * decoding.
     101             :  */
     102             : void
     103        1090 : CheckLogicalDecodingRequirements(void)
     104             : {
     105        1090 :     CheckSlotRequirements();
     106             : 
     107             :     /*
     108             :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
     109             :      * needs the same check.
     110             :      */
     111             : 
     112        1090 :     if (wal_level < WAL_LEVEL_LOGICAL)
     113           0 :         ereport(ERROR,
     114             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     115             :                  errmsg("logical decoding requires wal_level >= logical")));
     116             : 
     117        1090 :     if (MyDatabaseId == InvalidOid)
     118           2 :         ereport(ERROR,
     119             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     120             :                  errmsg("logical decoding requires a database connection")));
     121             : 
     122             :     /* ----
     123             :      * TODO: We got to change that someday soon...
     124             :      *
     125             :      * There's basically three things missing to allow this:
     126             :      * 1) We need to be able to correctly and quickly identify the timeline a
     127             :      *    LSN belongs to
     128             :      * 2) We need to force hot_standby_feedback to be enabled at all times so
     129             :      *    the primary cannot remove rows we need.
     130             :      * 3) support dropping replication slots referring to a database, in
     131             :      *    dbase_redo. There can't be any active ones due to HS recovery
     132             :      *    conflicts, so that should be relatively easy.
     133             :      * ----
     134             :      */
     135        1088 :     if (RecoveryInProgress())
     136           0 :         ereport(ERROR,
     137             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     138             :                  errmsg("logical decoding cannot be used while in recovery")));
     139        1088 : }
     140             : 
     141             : /*
     142             :  * Helper function for CreateInitDecodingContext() and
     143             :  * CreateDecodingContext() performing common tasks.
     144             :  */
     145             : static LogicalDecodingContext *
     146        1072 : StartupDecodingContext(List *output_plugin_options,
     147             :                        XLogRecPtr start_lsn,
     148             :                        TransactionId xmin_horizon,
     149             :                        bool need_full_snapshot,
     150             :                        bool fast_forward,
     151             :                        XLogReaderRoutine *xl_routine,
     152             :                        LogicalOutputPluginWriterPrepareWrite prepare_write,
     153             :                        LogicalOutputPluginWriterWrite do_write,
     154             :                        LogicalOutputPluginWriterUpdateProgress update_progress)
     155             : {
     156             :     ReplicationSlot *slot;
     157             :     MemoryContext context,
     158             :                 old_context;
     159             :     LogicalDecodingContext *ctx;
     160             : 
     161             :     /* shorter lines... */
     162        1072 :     slot = MyReplicationSlot;
     163             : 
     164        1072 :     context = AllocSetContextCreate(CurrentMemoryContext,
     165             :                                     "Logical decoding context",
     166             :                                     ALLOCSET_DEFAULT_SIZES);
     167        1072 :     old_context = MemoryContextSwitchTo(context);
     168        1072 :     ctx = palloc0(sizeof(LogicalDecodingContext));
     169             : 
     170        1072 :     ctx->context = context;
     171             : 
     172             :     /*
     173             :      * (re-)load output plugins, so we detect a bad (removed) output plugin
     174             :      * now.
     175             :      */
     176        1072 :     if (!fast_forward)
     177        1066 :         LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
     178             : 
     179             :     /*
     180             :      * Now that the slot's xmin has been set, we can announce ourselves as a
     181             :      * logical decoding backend which doesn't need to be checked individually
     182             :      * when computing the xmin horizon because the xmin is enforced via
     183             :      * replication slots.
     184             :      *
     185             :      * We can only do so if we're outside of a transaction (i.e. the case when
     186             :      * streaming changes via walsender), otherwise an already setup
     187             :      * snapshot/xid would end up being ignored. That's not a particularly
     188             :      * bothersome restriction since the SQL interface can't be used for
     189             :      * streaming anyway.
     190             :      */
     191        1070 :     if (!IsTransactionOrTransactionBlock())
     192             :     {
     193         386 :         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     194         386 :         MyProc->statusFlags |= PROC_IN_LOGICAL_DECODING;
     195         386 :         ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
     196         386 :         LWLockRelease(ProcArrayLock);
     197             :     }
     198             : 
     199        1070 :     ctx->slot = slot;
     200             : 
     201        1070 :     ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
     202        1070 :     if (!ctx->reader)
     203           0 :         ereport(ERROR,
     204             :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     205             :                  errmsg("out of memory"),
     206             :                  errdetail("Failed while allocating a WAL reading processor.")));
     207             : 
     208        1070 :     ctx->reorder = ReorderBufferAllocate();
     209        1070 :     ctx->snapshot_builder =
     210        1070 :         AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
     211             :                                 need_full_snapshot, slot->data.two_phase_at);
     212             : 
     213        1070 :     ctx->reorder->private_data = ctx;
     214             : 
     215             :     /* wrap output plugin callbacks, so we can add error context information */
     216        1070 :     ctx->reorder->begin = begin_cb_wrapper;
     217        1070 :     ctx->reorder->apply_change = change_cb_wrapper;
     218        1070 :     ctx->reorder->apply_truncate = truncate_cb_wrapper;
     219        1070 :     ctx->reorder->commit = commit_cb_wrapper;
     220        1070 :     ctx->reorder->message = message_cb_wrapper;
     221             : 
     222             :     /*
     223             :      * To support streaming, we require start/stop/abort/commit/change
     224             :      * callbacks. The message and truncate callbacks are optional, similar to
     225             :      * regular output plugins. We however enable streaming when at least one
     226             :      * of the methods is enabled so that we can easily identify missing
     227             :      * methods.
     228             :      *
     229             :      * We decide it here, but only check it later in the wrappers.
     230             :      */
     231        2146 :     ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) ||
     232           6 :         (ctx->callbacks.stream_stop_cb != NULL) ||
     233           6 :         (ctx->callbacks.stream_abort_cb != NULL) ||
     234           6 :         (ctx->callbacks.stream_commit_cb != NULL) ||
     235           6 :         (ctx->callbacks.stream_change_cb != NULL) ||
     236        1082 :         (ctx->callbacks.stream_message_cb != NULL) ||
     237           6 :         (ctx->callbacks.stream_truncate_cb != NULL);
     238             : 
     239             :     /*
     240             :      * streaming callbacks
     241             :      *
     242             :      * stream_message and stream_truncate callbacks are optional, so we do not
     243             :      * fail with ERROR when missing, but the wrappers simply do nothing. We
     244             :      * must set the ReorderBuffer callbacks to something, otherwise the calls
     245             :      * from there will crash (we don't want to move the checks there).
     246             :      */
     247        1070 :     ctx->reorder->stream_start = stream_start_cb_wrapper;
     248        1070 :     ctx->reorder->stream_stop = stream_stop_cb_wrapper;
     249        1070 :     ctx->reorder->stream_abort = stream_abort_cb_wrapper;
     250        1070 :     ctx->reorder->stream_prepare = stream_prepare_cb_wrapper;
     251        1070 :     ctx->reorder->stream_commit = stream_commit_cb_wrapper;
     252        1070 :     ctx->reorder->stream_change = stream_change_cb_wrapper;
     253        1070 :     ctx->reorder->stream_message = stream_message_cb_wrapper;
     254        1070 :     ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
     255             : 
     256             : 
     257             :     /*
     258             :      * To support two-phase logical decoding, we require
     259             :      * begin_prepare/prepare/commit-prepare/abort-prepare callbacks. The
     260             :      * filter_prepare callback is optional. We however enable two-phase
     261             :      * logical decoding when at least one of the methods is enabled so that we
     262             :      * can easily identify missing methods.
     263             :      *
     264             :      * We decide it here, but only check it later in the wrappers.
     265             :      */
     266        2146 :     ctx->twophase = (ctx->callbacks.begin_prepare_cb != NULL) ||
     267           6 :         (ctx->callbacks.prepare_cb != NULL) ||
     268           6 :         (ctx->callbacks.commit_prepared_cb != NULL) ||
     269           6 :         (ctx->callbacks.rollback_prepared_cb != NULL) ||
     270        1082 :         (ctx->callbacks.stream_prepare_cb != NULL) ||
     271           6 :         (ctx->callbacks.filter_prepare_cb != NULL);
     272             : 
     273             :     /*
     274             :      * Callback to support decoding at prepare time.
     275             :      */
     276        1070 :     ctx->reorder->begin_prepare = begin_prepare_cb_wrapper;
     277        1070 :     ctx->reorder->prepare = prepare_cb_wrapper;
     278        1070 :     ctx->reorder->commit_prepared = commit_prepared_cb_wrapper;
     279        1070 :     ctx->reorder->rollback_prepared = rollback_prepared_cb_wrapper;
     280             : 
     281        1070 :     ctx->out = makeStringInfo();
     282        1070 :     ctx->prepare_write = prepare_write;
     283        1070 :     ctx->write = do_write;
     284        1070 :     ctx->update_progress = update_progress;
     285             : 
     286        1070 :     ctx->output_plugin_options = output_plugin_options;
     287             : 
     288        1070 :     ctx->fast_forward = fast_forward;
     289             : 
     290        1070 :     MemoryContextSwitchTo(old_context);
     291             : 
     292        1070 :     return ctx;
     293             : }
     294             : 
     295             : /*
     296             :  * Create a new decoding context, for a new logical slot.
     297             :  *
     298             :  * plugin -- contains the name of the output plugin
     299             :  * output_plugin_options -- contains options passed to the output plugin
     300             :  * need_full_snapshot -- if true, must obtain a snapshot able to read all
     301             :  *      tables; if false, one that can read only catalogs is acceptable.
     302             :  * restart_lsn -- if given as invalid, it's this routine's responsibility to
     303             :  *      mark WAL as reserved by setting a convenient restart_lsn for the slot.
     304             :  *      Otherwise, we set for decoding to start from the given LSN without
     305             :  *      marking WAL reserved beforehand.  In that scenario, it's up to the
     306             :  *      caller to guarantee that WAL remains available.
     307             :  * xl_routine -- XLogReaderRoutine for underlying XLogReader
     308             :  * prepare_write, do_write, update_progress --
     309             :  *      callbacks that perform the use-case dependent, actual, work.
     310             :  *
     311             :  * Needs to be called while in a memory context that's at least as long lived
     312             :  * as the decoding context because further memory contexts will be created
     313             :  * inside it.
     314             :  *
     315             :  * Returns an initialized decoding context after calling the output plugin's
     316             :  * startup function.
     317             :  */
     318             : LogicalDecodingContext *
     319         438 : CreateInitDecodingContext(const char *plugin,
     320             :                           List *output_plugin_options,
     321             :                           bool need_full_snapshot,
     322             :                           XLogRecPtr restart_lsn,
     323             :                           XLogReaderRoutine *xl_routine,
     324             :                           LogicalOutputPluginWriterPrepareWrite prepare_write,
     325             :                           LogicalOutputPluginWriterWrite do_write,
     326             :                           LogicalOutputPluginWriterUpdateProgress update_progress)
     327             : {
     328         438 :     TransactionId xmin_horizon = InvalidTransactionId;
     329             :     ReplicationSlot *slot;
     330             :     NameData    plugin_name;
     331             :     LogicalDecodingContext *ctx;
     332             :     MemoryContext old_context;
     333             : 
     334             :     /* shorter lines... */
     335         438 :     slot = MyReplicationSlot;
     336             : 
     337             :     /* first some sanity checks that are unlikely to be violated */
     338         438 :     if (slot == NULL)
     339           0 :         elog(ERROR, "cannot perform logical decoding without an acquired slot");
     340             : 
     341         438 :     if (plugin == NULL)
     342           0 :         elog(ERROR, "cannot initialize logical decoding without a specified plugin");
     343             : 
     344             :     /* Make sure the passed slot is suitable. These are user facing errors. */
     345         438 :     if (SlotIsPhysical(slot))
     346           0 :         ereport(ERROR,
     347             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     348             :                  errmsg("cannot use physical replication slot for logical decoding")));
     349             : 
     350         438 :     if (slot->data.database != MyDatabaseId)
     351           0 :         ereport(ERROR,
     352             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     353             :                  errmsg("replication slot \"%s\" was not created in this database",
     354             :                         NameStr(slot->data.name))));
     355             : 
     356         788 :     if (IsTransactionState() &&
     357         350 :         GetTopTransactionIdIfAny() != InvalidTransactionId)
     358           4 :         ereport(ERROR,
     359             :                 (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
     360             :                  errmsg("cannot create logical replication slot in transaction that has performed writes")));
     361             : 
     362             :     /*
     363             :      * Register output plugin name with slot.  We need the mutex to avoid
     364             :      * concurrent reading of a partially copied string.  But we don't want any
     365             :      * complicated code while holding a spinlock, so do namestrcpy() outside.
     366             :      */
     367         434 :     namestrcpy(&plugin_name, plugin);
     368         434 :     SpinLockAcquire(&slot->mutex);
     369         434 :     slot->data.plugin = plugin_name;
     370         434 :     SpinLockRelease(&slot->mutex);
     371             : 
     372         434 :     if (XLogRecPtrIsInvalid(restart_lsn))
     373         422 :         ReplicationSlotReserveWal();
     374             :     else
     375             :     {
     376          12 :         SpinLockAcquire(&slot->mutex);
     377          12 :         slot->data.restart_lsn = restart_lsn;
     378          12 :         SpinLockRelease(&slot->mutex);
     379             :     }
     380             : 
     381             :     /* ----
     382             :      * This is a bit tricky: We need to determine a safe xmin horizon to start
     383             :      * decoding from, to avoid starting from a running xacts record referring
     384             :      * to xids whose rows have been vacuumed or pruned
     385             :      * already. GetOldestSafeDecodingTransactionId() returns such a value, but
     386             :      * without further interlock its return value might immediately be out of
     387             :      * date.
     388             :      *
     389             :      * So we have to acquire the ProcArrayLock to prevent computation of new
     390             :      * xmin horizons by other backends, get the safe decoding xid, and inform
     391             :      * the slot machinery about the new limit. Once that's done the
     392             :      * ProcArrayLock can be released as the slot machinery now is
     393             :      * protecting against vacuum.
     394             :      *
     395             :      * Note that, temporarily, the data, not just the catalog, xmin has to be
     396             :      * reserved if a data snapshot is to be exported.  Otherwise the initial
     397             :      * data snapshot created here is not guaranteed to be valid. After that
     398             :      * the data xmin doesn't need to be managed anymore and the global xmin
     399             :      * should be recomputed. As we are fine with losing the pegged data xmin
     400             :      * after crash - no chance a snapshot would get exported anymore - we can
     401             :      * get away with just setting the slot's
     402             :      * effective_xmin. ReplicationSlotRelease will reset it again.
     403             :      *
     404             :      * ----
     405             :      */
     406         434 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     407             : 
     408         434 :     xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
     409             : 
     410         434 :     SpinLockAcquire(&slot->mutex);
     411         434 :     slot->effective_catalog_xmin = xmin_horizon;
     412         434 :     slot->data.catalog_xmin = xmin_horizon;
     413         434 :     if (need_full_snapshot)
     414         176 :         slot->effective_xmin = xmin_horizon;
     415         434 :     SpinLockRelease(&slot->mutex);
     416             : 
     417         434 :     ReplicationSlotsComputeRequiredXmin(true);
     418             : 
     419         434 :     LWLockRelease(ProcArrayLock);
     420             : 
     421         434 :     ReplicationSlotMarkDirty();
     422         434 :     ReplicationSlotSave();
     423             : 
     424         434 :     ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
     425             :                                  need_full_snapshot, false,
     426             :                                  xl_routine, prepare_write, do_write,
     427             :                                  update_progress);
     428             : 
     429             :     /* call output plugin initialization callback */
     430         432 :     old_context = MemoryContextSwitchTo(ctx->context);
     431         432 :     if (ctx->callbacks.startup_cb != NULL)
     432         432 :         startup_cb_wrapper(ctx, &ctx->options, true);
     433         432 :     MemoryContextSwitchTo(old_context);
     434             : 
     435             :     /*
     436             :      * We allow decoding of prepared transactions when the two_phase is
     437             :      * enabled at the time of slot creation, or when the two_phase option is
     438             :      * given at the streaming start, provided the plugin supports all the
     439             :      * callbacks for two-phase.
     440             :      */
     441         432 :     ctx->twophase &= slot->data.two_phase;
     442             : 
     443         432 :     ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
     444             : 
     445         432 :     return ctx;
     446             : }
     447             : 
     448             : /*
     449             :  * Create a new decoding context, for a logical slot that has previously been
     450             :  * used already.
     451             :  *
     452             :  * start_lsn
     453             :  *      The LSN at which to start decoding.  If InvalidXLogRecPtr, restart
     454             :  *      from the slot's confirmed_flush; otherwise, start from the specified
     455             :  *      location (but move it forwards to confirmed_flush if it's older than
     456             :  *      that, see below).
     457             :  *
     458             :  * output_plugin_options
     459             :  *      options passed to the output plugin.
     460             :  *
     461             :  * fast_forward
     462             :  *      bypass the generation of logical changes.
     463             :  *
     464             :  * xl_routine
     465             :  *      XLogReaderRoutine used by underlying xlogreader
     466             :  *
     467             :  * prepare_write, do_write, update_progress
     468             :  *      callbacks that have to be filled to perform the use-case dependent,
     469             :  *      actual work.
     470             :  *
     471             :  * Needs to be called while in a memory context that's at least as long lived
     472             :  * as the decoding context because further memory contexts will be created
     473             :  * inside it.
     474             :  *
     475             :  * Returns an initialized decoding context after calling the output plugin's
     476             :  * startup function.
     477             :  */
     478             : LogicalDecodingContext *
     479         644 : CreateDecodingContext(XLogRecPtr start_lsn,
     480             :                       List *output_plugin_options,
     481             :                       bool fast_forward,
     482             :                       XLogReaderRoutine *xl_routine,
     483             :                       LogicalOutputPluginWriterPrepareWrite prepare_write,
     484             :                       LogicalOutputPluginWriterWrite do_write,
     485             :                       LogicalOutputPluginWriterUpdateProgress update_progress)
     486             : {
     487             :     LogicalDecodingContext *ctx;
     488             :     ReplicationSlot *slot;
     489             :     MemoryContext old_context;
     490             : 
     491             :     /* shorter lines... */
     492         644 :     slot = MyReplicationSlot;
     493             : 
     494             :     /* first some sanity checks that are unlikely to be violated */
     495         644 :     if (slot == NULL)
     496           0 :         elog(ERROR, "cannot perform logical decoding without an acquired slot");
     497             : 
     498             :     /* make sure the passed slot is suitable, these are user facing errors */
     499         644 :     if (SlotIsPhysical(slot))
     500           2 :         ereport(ERROR,
     501             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     502             :                  errmsg("cannot use physical replication slot for logical decoding")));
     503             : 
     504         642 :     if (slot->data.database != MyDatabaseId)
     505           4 :         ereport(ERROR,
     506             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     507             :                  errmsg("replication slot \"%s\" was not created in this database",
     508             :                         NameStr(slot->data.name))));
     509             : 
     510         638 :     if (start_lsn == InvalidXLogRecPtr)
     511             :     {
     512             :         /* continue from last position */
     513         438 :         start_lsn = slot->data.confirmed_flush;
     514             :     }
     515         200 :     else if (start_lsn < slot->data.confirmed_flush)
     516             :     {
     517             :         /*
     518             :          * It might seem like we should error out in this case, but it's
     519             :          * pretty common for a client to acknowledge a LSN it doesn't have to
     520             :          * do anything for, and thus didn't store persistently, because the
     521             :          * xlog records didn't result in anything relevant for logical
     522             :          * decoding. Clients have to be able to do that to support synchronous
     523             :          * replication.
     524             :          *
     525             :          * Starting at a different LSN than requested might not catch certain
     526             :          * kinds of client errors; so the client may wish to check that
     527             :          * confirmed_flush_lsn matches its expectations.
     528             :          */
     529          12 :         elog(LOG, "%X/%X has been already streamed, forwarding to %X/%X",
     530             :              LSN_FORMAT_ARGS(start_lsn),
     531             :              LSN_FORMAT_ARGS(slot->data.confirmed_flush));
     532             : 
     533          12 :         start_lsn = slot->data.confirmed_flush;
     534             :     }
     535             : 
     536         638 :     ctx = StartupDecodingContext(output_plugin_options,
     537             :                                  start_lsn, InvalidTransactionId, false,
     538             :                                  fast_forward, xl_routine, prepare_write,
     539             :                                  do_write, update_progress);
     540             : 
     541             :     /* call output plugin initialization callback */
     542         638 :     old_context = MemoryContextSwitchTo(ctx->context);
     543         638 :     if (ctx->callbacks.startup_cb != NULL)
     544         632 :         startup_cb_wrapper(ctx, &ctx->options, false);
     545         632 :     MemoryContextSwitchTo(old_context);
     546             : 
     547             :     /*
     548             :      * We allow decoding of prepared transactions when the two_phase is
     549             :      * enabled at the time of slot creation, or when the two_phase option is
     550             :      * given at the streaming start, provided the plugin supports all the
     551             :      * callbacks for two-phase.
     552             :      */
     553         632 :     ctx->twophase &= (slot->data.two_phase || ctx->twophase_opt_given);
     554             : 
     555             :     /* Mark slot to allow two_phase decoding if not already marked */
     556         632 :     if (ctx->twophase && !slot->data.two_phase)
     557             :     {
     558           8 :         slot->data.two_phase = true;
     559           8 :         slot->data.two_phase_at = start_lsn;
     560           8 :         ReplicationSlotMarkDirty();
     561           8 :         ReplicationSlotSave();
     562           8 :         SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn);
     563             :     }
     564             : 
     565         632 :     ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
     566             : 
     567         632 :     ereport(LOG,
     568             :             (errmsg("starting logical decoding for slot \"%s\"",
     569             :                     NameStr(slot->data.name)),
     570             :              errdetail("Streaming transactions committing after %X/%X, reading WAL from %X/%X.",
     571             :                        LSN_FORMAT_ARGS(slot->data.confirmed_flush),
     572             :                        LSN_FORMAT_ARGS(slot->data.restart_lsn))));
     573             : 
     574         632 :     return ctx;
     575             : }
     576             : 
     577             : /*
     578             :  * Returns true if a consistent initial decoding snapshot has been built.
     579             :  */
     580             : bool
     581         488 : DecodingContextReady(LogicalDecodingContext *ctx)
     582             : {
     583         488 :     return SnapBuildCurrentState(ctx->snapshot_builder) == SNAPBUILD_CONSISTENT;
     584             : }
     585             : 
     586             : /*
     587             :  * Read from the decoding slot, until it is ready to start extracting changes.
     588             :  */
     589             : void
     590         420 : DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
     591             : {
     592         420 :     ReplicationSlot *slot = ctx->slot;
     593             : 
     594             :     /* Initialize from where to start reading WAL. */
     595         420 :     XLogBeginRead(ctx->reader, slot->data.restart_lsn);
     596             : 
     597         420 :     elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
     598             :          LSN_FORMAT_ARGS(slot->data.restart_lsn));
     599             : 
     600             :     /* Wait for a consistent starting point */
     601             :     for (;;)
     602          68 :     {
     603             :         XLogRecord *record;
     604         488 :         char       *err = NULL;
     605             : 
     606             :         /* the read_page callback waits for new WAL */
     607         488 :         record = XLogReadRecord(ctx->reader, &err);
     608         488 :         if (err)
     609           0 :             elog(ERROR, "could not find logical decoding starting point: %s", err);
     610         488 :         if (!record)
     611           0 :             elog(ERROR, "could not find logical decoding starting point");
     612             : 
     613         488 :         LogicalDecodingProcessRecord(ctx, ctx->reader);
     614             : 
     615             :         /* only continue till we found a consistent spot */
     616         488 :         if (DecodingContextReady(ctx))
     617         420 :             break;
     618             : 
     619          68 :         CHECK_FOR_INTERRUPTS();
     620             :     }
     621             : 
     622         420 :     SpinLockAcquire(&slot->mutex);
     623         420 :     slot->data.confirmed_flush = ctx->reader->EndRecPtr;
     624         420 :     if (slot->data.two_phase)
     625          10 :         slot->data.two_phase_at = ctx->reader->EndRecPtr;
     626         420 :     SpinLockRelease(&slot->mutex);
     627         420 : }
     628             : 
     629             : /*
     630             :  * Free a previously allocated decoding context, invoking the shutdown
     631             :  * callback if necessary.
     632             :  */
     633             : void
     634         938 : FreeDecodingContext(LogicalDecodingContext *ctx)
     635             : {
     636         938 :     if (ctx->callbacks.shutdown_cb != NULL)
     637         932 :         shutdown_cb_wrapper(ctx);
     638             : 
     639         938 :     ReorderBufferFree(ctx->reorder);
     640         938 :     FreeSnapshotBuilder(ctx->snapshot_builder);
     641         938 :     XLogReaderFree(ctx->reader);
     642         938 :     MemoryContextDelete(ctx->context);
     643         938 : }
     644             : 
     645             : /*
     646             :  * Prepare a write using the context's output routine.
     647             :  */
     648             : void
     649      609480 : OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
     650             : {
     651      609480 :     if (!ctx->accept_writes)
     652           0 :         elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
     653             : 
     654      609480 :     ctx->prepare_write(ctx, ctx->write_location, ctx->write_xid, last_write);
     655      609480 :     ctx->prepared_write = true;
     656      609480 : }
     657             : 
     658             : /*
     659             :  * Perform a write using the context's output routine.
     660             :  */
     661             : void
     662      609480 : OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
     663             : {
     664      609480 :     if (!ctx->prepared_write)
     665           0 :         elog(ERROR, "OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
     666             : 
     667      609480 :     ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write);
     668      609478 :     ctx->prepared_write = false;
     669      609478 : }
     670             : 
     671             : /*
     672             :  * Update progress tracking (if supported).
     673             :  */
     674             : void
     675         638 : OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
     676             : {
     677         638 :     if (!ctx->update_progress)
     678          10 :         return;
     679             : 
     680         628 :     ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
     681             : }
     682             : 
     683             : /*
     684             :  * Load the output plugin, lookup its output plugin init function, and check
     685             :  * that it provides the required callbacks.
     686             :  */
     687             : static void
     688        1066 : LoadOutputPlugin(OutputPluginCallbacks *callbacks, const char *plugin)
     689             : {
     690             :     LogicalOutputPluginInit plugin_init;
     691             : 
     692        1064 :     plugin_init = (LogicalOutputPluginInit)
     693        1066 :         load_external_function(plugin, "_PG_output_plugin_init", false, NULL);
     694             : 
     695        1064 :     if (plugin_init == NULL)
     696           0 :         elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");
     697             : 
     698             :     /* ask the output plugin to fill the callback struct */
     699        1064 :     plugin_init(callbacks);
     700             : 
     701        1064 :     if (callbacks->begin_cb == NULL)
     702           0 :         elog(ERROR, "output plugins have to register a begin callback");
     703        1064 :     if (callbacks->change_cb == NULL)
     704           0 :         elog(ERROR, "output plugins have to register a change callback");
     705        1064 :     if (callbacks->commit_cb == NULL)
     706           0 :         elog(ERROR, "output plugins have to register a commit callback");
     707        1064 : }
     708             : 
     709             : static void
     710          12 : output_plugin_error_callback(void *arg)
     711             : {
     712          12 :     LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;
     713             : 
     714             :     /* not all callbacks have an associated LSN  */
     715          12 :     if (state->report_location != InvalidXLogRecPtr)
     716           6 :         errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
     717           6 :                    NameStr(state->ctx->slot->data.name),
     718           6 :                    NameStr(state->ctx->slot->data.plugin),
     719             :                    state->callback_name,
     720           6 :                    LSN_FORMAT_ARGS(state->report_location));
     721             :     else
     722           6 :         errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
     723           6 :                    NameStr(state->ctx->slot->data.name),
     724           6 :                    NameStr(state->ctx->slot->data.plugin),
     725             :                    state->callback_name);
     726          12 : }
     727             : 
     728             : static void
     729        1064 : startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
     730             : {
     731             :     LogicalErrorCallbackState state;
     732             :     ErrorContextCallback errcallback;
     733             : 
     734             :     Assert(!ctx->fast_forward);
     735             : 
     736             :     /* Push callback + info on the error context stack */
     737        1064 :     state.ctx = ctx;
     738        1064 :     state.callback_name = "startup";
     739        1064 :     state.report_location = InvalidXLogRecPtr;
     740        1064 :     errcallback.callback = output_plugin_error_callback;
     741        1064 :     errcallback.arg = (void *) &state;
     742        1064 :     errcallback.previous = error_context_stack;
     743        1064 :     error_context_stack = &errcallback;
     744             : 
     745             :     /* set output state */
     746        1064 :     ctx->accept_writes = false;
     747             : 
     748             :     /* do the actual work: call callback */
     749        1064 :     ctx->callbacks.startup_cb(ctx, opt, is_init);
     750             : 
     751             :     /* Pop the error context stack */
     752        1058 :     error_context_stack = errcallback.previous;
     753        1058 : }
     754             : 
     755             : static void
     756         932 : shutdown_cb_wrapper(LogicalDecodingContext *ctx)
     757             : {
     758             :     LogicalErrorCallbackState state;
     759             :     ErrorContextCallback errcallback;
     760             : 
     761             :     Assert(!ctx->fast_forward);
     762             : 
     763             :     /* Push callback + info on the error context stack */
     764         932 :     state.ctx = ctx;
     765         932 :     state.callback_name = "shutdown";
     766         932 :     state.report_location = InvalidXLogRecPtr;
     767         932 :     errcallback.callback = output_plugin_error_callback;
     768         932 :     errcallback.arg = (void *) &state;
     769         932 :     errcallback.previous = error_context_stack;
     770         932 :     error_context_stack = &errcallback;
     771             : 
     772             :     /* set output state */
     773         932 :     ctx->accept_writes = false;
     774             : 
     775             :     /* do the actual work: call callback */
     776         932 :     ctx->callbacks.shutdown_cb(ctx);
     777             : 
     778             :     /* Pop the error context stack */
     779         932 :     error_context_stack = errcallback.previous;
     780         932 : }
     781             : 
     782             : 
     783             : /*
     784             :  * Callbacks for ReorderBuffer which add in some more information and then call
     785             :  * output_plugin.h plugins.
     786             :  */
     787             : static void
     788        1284 : begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
     789             : {
     790        1284 :     LogicalDecodingContext *ctx = cache->private_data;
     791             :     LogicalErrorCallbackState state;
     792             :     ErrorContextCallback errcallback;
     793             : 
     794             :     Assert(!ctx->fast_forward);
     795             : 
     796             :     /* Push callback + info on the error context stack */
     797        1284 :     state.ctx = ctx;
     798        1284 :     state.callback_name = "begin";
     799        1284 :     state.report_location = txn->first_lsn;
     800        1284 :     errcallback.callback = output_plugin_error_callback;
     801        1284 :     errcallback.arg = (void *) &state;
     802        1284 :     errcallback.previous = error_context_stack;
     803        1284 :     error_context_stack = &errcallback;
     804             : 
     805             :     /* set output state */
     806        1284 :     ctx->accept_writes = true;
     807        1284 :     ctx->write_xid = txn->xid;
     808        1284 :     ctx->write_location = txn->first_lsn;
     809             : 
     810             :     /* do the actual work: call callback */
     811        1284 :     ctx->callbacks.begin_cb(ctx, txn);
     812             : 
     813             :     /* Pop the error context stack */
     814        1284 :     error_context_stack = errcallback.previous;
     815        1284 : }
     816             : 
     817             : static void
     818        1280 : commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     819             :                   XLogRecPtr commit_lsn)
     820             : {
     821        1280 :     LogicalDecodingContext *ctx = cache->private_data;
     822             :     LogicalErrorCallbackState state;
     823             :     ErrorContextCallback errcallback;
     824             : 
     825             :     Assert(!ctx->fast_forward);
     826             : 
     827             :     /* Push callback + info on the error context stack */
     828        1280 :     state.ctx = ctx;
     829        1280 :     state.callback_name = "commit";
     830        1280 :     state.report_location = txn->final_lsn; /* beginning of commit record */
     831        1280 :     errcallback.callback = output_plugin_error_callback;
     832        1280 :     errcallback.arg = (void *) &state;
     833        1280 :     errcallback.previous = error_context_stack;
     834        1280 :     error_context_stack = &errcallback;
     835             : 
     836             :     /* set output state */
     837        1280 :     ctx->accept_writes = true;
     838        1280 :     ctx->write_xid = txn->xid;
     839        1280 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
     840             : 
     841             :     /* do the actual work: call callback */
     842        1280 :     ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
     843             : 
     844             :     /* Pop the error context stack */
     845        1280 :     error_context_stack = errcallback.previous;
     846        1280 : }
     847             : 
     848             : /*
     849             :  * The functionality of begin_prepare is quite similar to begin with the
     850             :  * exception that this will have gid (global transaction id) information which
     851             :  * can be used by plugin. Now, we thought about extending the existing begin
     852             :  * but that would break the replication protocol and additionally this looks
     853             :  * cleaner.
     854             :  */
     855             : static void
     856          46 : begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
     857             : {
     858          46 :     LogicalDecodingContext *ctx = cache->private_data;
     859             :     LogicalErrorCallbackState state;
     860             :     ErrorContextCallback errcallback;
     861             : 
     862             :     Assert(!ctx->fast_forward);
     863             : 
     864             :     /* We're only supposed to call this when two-phase commits are supported */
     865             :     Assert(ctx->twophase);
     866             : 
     867             :     /* Push callback + info on the error context stack */
     868          46 :     state.ctx = ctx;
     869          46 :     state.callback_name = "begin_prepare";
     870          46 :     state.report_location = txn->first_lsn;
     871          46 :     errcallback.callback = output_plugin_error_callback;
     872          46 :     errcallback.arg = (void *) &state;
     873          46 :     errcallback.previous = error_context_stack;
     874          46 :     error_context_stack = &errcallback;
     875             : 
     876             :     /* set output state */
     877          46 :     ctx->accept_writes = true;
     878          46 :     ctx->write_xid = txn->xid;
     879          46 :     ctx->write_location = txn->first_lsn;
     880             : 
     881             :     /*
     882             :      * If the plugin supports two-phase commits then begin prepare callback is
     883             :      * mandatory
     884             :      */
     885          46 :     if (ctx->callbacks.begin_prepare_cb == NULL)
     886           0 :         ereport(ERROR,
     887             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     888             :                  errmsg("logical replication at prepare time requires a %s callback",
     889             :                         "begin_prepare_cb")));
     890             : 
     891             :     /* do the actual work: call callback */
     892          46 :     ctx->callbacks.begin_prepare_cb(ctx, txn);
     893             : 
     894             :     /* Pop the error context stack */
     895          46 :     error_context_stack = errcallback.previous;
     896          46 : }
     897             : 
     898             : static void
     899          46 : prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     900             :                    XLogRecPtr prepare_lsn)
     901             : {
     902          46 :     LogicalDecodingContext *ctx = cache->private_data;
     903             :     LogicalErrorCallbackState state;
     904             :     ErrorContextCallback errcallback;
     905             : 
     906             :     Assert(!ctx->fast_forward);
     907             : 
     908             :     /* We're only supposed to call this when two-phase commits are supported */
     909             :     Assert(ctx->twophase);
     910             : 
     911             :     /* Push callback + info on the error context stack */
     912          46 :     state.ctx = ctx;
     913          46 :     state.callback_name = "prepare";
     914          46 :     state.report_location = txn->final_lsn; /* beginning of prepare record */
     915          46 :     errcallback.callback = output_plugin_error_callback;
     916          46 :     errcallback.arg = (void *) &state;
     917          46 :     errcallback.previous = error_context_stack;
     918          46 :     error_context_stack = &errcallback;
     919             : 
     920             :     /* set output state */
     921          46 :     ctx->accept_writes = true;
     922          46 :     ctx->write_xid = txn->xid;
     923          46 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
     924             : 
     925             :     /*
     926             :      * If the plugin supports two-phase commits then prepare callback is
     927             :      * mandatory
     928             :      */
     929          46 :     if (ctx->callbacks.prepare_cb == NULL)
     930           0 :         ereport(ERROR,
     931             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     932             :                  errmsg("logical replication at prepare time requires a %s callback",
     933             :                         "prepare_cb")));
     934             : 
     935             :     /* do the actual work: call callback */
     936          46 :     ctx->callbacks.prepare_cb(ctx, txn, prepare_lsn);
     937             : 
     938             :     /* Pop the error context stack */
     939          46 :     error_context_stack = errcallback.previous;
     940          46 : }
     941             : 
     942             : static void
     943          48 : commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     944             :                            XLogRecPtr commit_lsn)
     945             : {
     946          48 :     LogicalDecodingContext *ctx = cache->private_data;
     947             :     LogicalErrorCallbackState state;
     948             :     ErrorContextCallback errcallback;
     949             : 
     950             :     Assert(!ctx->fast_forward);
     951             : 
     952             :     /* We're only supposed to call this when two-phase commits are supported */
     953             :     Assert(ctx->twophase);
     954             : 
     955             :     /* Push callback + info on the error context stack */
     956          48 :     state.ctx = ctx;
     957          48 :     state.callback_name = "commit_prepared";
     958          48 :     state.report_location = txn->final_lsn; /* beginning of commit record */
     959          48 :     errcallback.callback = output_plugin_error_callback;
     960          48 :     errcallback.arg = (void *) &state;
     961          48 :     errcallback.previous = error_context_stack;
     962          48 :     error_context_stack = &errcallback;
     963             : 
     964             :     /* set output state */
     965          48 :     ctx->accept_writes = true;
     966          48 :     ctx->write_xid = txn->xid;
     967          48 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
     968             : 
     969             :     /*
     970             :      * If the plugin support two-phase commits then commit prepared callback
     971             :      * is mandatory
     972             :      */
     973          48 :     if (ctx->callbacks.commit_prepared_cb == NULL)
     974           0 :         ereport(ERROR,
     975             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     976             :                  errmsg("logical replication at prepare time requires a %s callback",
     977             :                         "commit_prepared_cb")));
     978             : 
     979             :     /* do the actual work: call callback */
     980          48 :     ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn);
     981             : 
     982             :     /* Pop the error context stack */
     983          48 :     error_context_stack = errcallback.previous;
     984          48 : }
     985             : 
     986             : static void
     987          16 : rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     988             :                              XLogRecPtr prepare_end_lsn,
     989             :                              TimestampTz prepare_time)
     990             : {
     991          16 :     LogicalDecodingContext *ctx = cache->private_data;
     992             :     LogicalErrorCallbackState state;
     993             :     ErrorContextCallback errcallback;
     994             : 
     995             :     Assert(!ctx->fast_forward);
     996             : 
     997             :     /* We're only supposed to call this when two-phase commits are supported */
     998             :     Assert(ctx->twophase);
     999             : 
    1000             :     /* Push callback + info on the error context stack */
    1001          16 :     state.ctx = ctx;
    1002          16 :     state.callback_name = "rollback_prepared";
    1003          16 :     state.report_location = txn->final_lsn; /* beginning of commit record */
    1004          16 :     errcallback.callback = output_plugin_error_callback;
    1005          16 :     errcallback.arg = (void *) &state;
    1006          16 :     errcallback.previous = error_context_stack;
    1007          16 :     error_context_stack = &errcallback;
    1008             : 
    1009             :     /* set output state */
    1010          16 :     ctx->accept_writes = true;
    1011          16 :     ctx->write_xid = txn->xid;
    1012          16 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
    1013             : 
    1014             :     /*
    1015             :      * If the plugin support two-phase commits then rollback prepared callback
    1016             :      * is mandatory
    1017             :      */
    1018          16 :     if (ctx->callbacks.rollback_prepared_cb == NULL)
    1019           0 :         ereport(ERROR,
    1020             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1021             :                  errmsg("logical replication at prepare time requires a %s callback",
    1022             :                         "rollback_prepared_cb")));
    1023             : 
    1024             :     /* do the actual work: call callback */
    1025          16 :     ctx->callbacks.rollback_prepared_cb(ctx, txn, prepare_end_lsn,
    1026             :                                         prepare_time);
    1027             : 
    1028             :     /* Pop the error context stack */
    1029          16 :     error_context_stack = errcallback.previous;
    1030          16 : }
    1031             : 
    1032             : static void
    1033      304792 : change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1034             :                   Relation relation, ReorderBufferChange *change)
    1035             : {
    1036      304792 :     LogicalDecodingContext *ctx = cache->private_data;
    1037             :     LogicalErrorCallbackState state;
    1038             :     ErrorContextCallback errcallback;
    1039             : 
    1040             :     Assert(!ctx->fast_forward);
    1041             : 
    1042             :     /* Push callback + info on the error context stack */
    1043      304792 :     state.ctx = ctx;
    1044      304792 :     state.callback_name = "change";
    1045      304792 :     state.report_location = change->lsn;
    1046      304792 :     errcallback.callback = output_plugin_error_callback;
    1047      304792 :     errcallback.arg = (void *) &state;
    1048      304792 :     errcallback.previous = error_context_stack;
    1049      304792 :     error_context_stack = &errcallback;
    1050             : 
    1051             :     /* set output state */
    1052      304792 :     ctx->accept_writes = true;
    1053      304792 :     ctx->write_xid = txn->xid;
    1054             : 
    1055             :     /*
    1056             :      * Report this change's lsn so replies from clients can give an up-to-date
    1057             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1058             :      * receipt of this transaction, but it might allow another transaction's
    1059             :      * commit to be confirmed with one message.
    1060             :      */
    1061      304792 :     ctx->write_location = change->lsn;
    1062             : 
    1063      304792 :     ctx->callbacks.change_cb(ctx, txn, relation, change);
    1064             : 
    1065             :     /* Pop the error context stack */
    1066      304788 :     error_context_stack = errcallback.previous;
    1067      304788 : }
    1068             : 
    1069             : static void
    1070          24 : truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1071             :                     int nrelations, Relation relations[], ReorderBufferChange *change)
    1072             : {
    1073          24 :     LogicalDecodingContext *ctx = cache->private_data;
    1074             :     LogicalErrorCallbackState state;
    1075             :     ErrorContextCallback errcallback;
    1076             : 
    1077             :     Assert(!ctx->fast_forward);
    1078             : 
    1079          24 :     if (!ctx->callbacks.truncate_cb)
    1080           0 :         return;
    1081             : 
    1082             :     /* Push callback + info on the error context stack */
    1083          24 :     state.ctx = ctx;
    1084          24 :     state.callback_name = "truncate";
    1085          24 :     state.report_location = change->lsn;
    1086          24 :     errcallback.callback = output_plugin_error_callback;
    1087          24 :     errcallback.arg = (void *) &state;
    1088          24 :     errcallback.previous = error_context_stack;
    1089          24 :     error_context_stack = &errcallback;
    1090             : 
    1091             :     /* set output state */
    1092          24 :     ctx->accept_writes = true;
    1093          24 :     ctx->write_xid = txn->xid;
    1094             : 
    1095             :     /*
    1096             :      * Report this change's lsn so replies from clients can give an up-to-date
    1097             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1098             :      * receipt of this transaction, but it might allow another transaction's
    1099             :      * commit to be confirmed with one message.
    1100             :      */
    1101          24 :     ctx->write_location = change->lsn;
    1102             : 
    1103          24 :     ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
    1104             : 
    1105             :     /* Pop the error context stack */
    1106          24 :     error_context_stack = errcallback.previous;
    1107             : }
    1108             : 
    1109             : bool
    1110         234 : filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
    1111             :                           const char *gid)
    1112             : {
    1113             :     LogicalErrorCallbackState state;
    1114             :     ErrorContextCallback errcallback;
    1115             :     bool        ret;
    1116             : 
    1117             :     Assert(!ctx->fast_forward);
    1118             : 
    1119             :     /* Push callback + info on the error context stack */
    1120         234 :     state.ctx = ctx;
    1121         234 :     state.callback_name = "filter_prepare";
    1122         234 :     state.report_location = InvalidXLogRecPtr;
    1123         234 :     errcallback.callback = output_plugin_error_callback;
    1124         234 :     errcallback.arg = (void *) &state;
    1125         234 :     errcallback.previous = error_context_stack;
    1126         234 :     error_context_stack = &errcallback;
    1127             : 
    1128             :     /* set output state */
    1129         234 :     ctx->accept_writes = false;
    1130             : 
    1131             :     /* do the actual work: call callback */
    1132         234 :     ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
    1133             : 
    1134             :     /* Pop the error context stack */
    1135         234 :     error_context_stack = errcallback.previous;
    1136             : 
    1137         234 :     return ret;
    1138             : }
    1139             : 
    1140             : bool
    1141     2784148 : filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
    1142             : {
    1143             :     LogicalErrorCallbackState state;
    1144             :     ErrorContextCallback errcallback;
    1145             :     bool        ret;
    1146             : 
    1147             :     Assert(!ctx->fast_forward);
    1148             : 
    1149             :     /* Push callback + info on the error context stack */
    1150     2784148 :     state.ctx = ctx;
    1151     2784148 :     state.callback_name = "filter_by_origin";
    1152     2784148 :     state.report_location = InvalidXLogRecPtr;
    1153     2784148 :     errcallback.callback = output_plugin_error_callback;
    1154     2784148 :     errcallback.arg = (void *) &state;
    1155     2784148 :     errcallback.previous = error_context_stack;
    1156     2784148 :     error_context_stack = &errcallback;
    1157             : 
    1158             :     /* set output state */
    1159     2784148 :     ctx->accept_writes = false;
    1160             : 
    1161             :     /* do the actual work: call callback */
    1162     2784148 :     ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
    1163             : 
    1164             :     /* Pop the error context stack */
    1165     2784148 :     error_context_stack = errcallback.previous;
    1166             : 
    1167     2784148 :     return ret;
    1168             : }
    1169             : 
    1170             : static void
    1171          28 : message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1172             :                    XLogRecPtr message_lsn, bool transactional,
    1173             :                    const char *prefix, Size message_size, const char *message)
    1174             : {
    1175          28 :     LogicalDecodingContext *ctx = cache->private_data;
    1176             :     LogicalErrorCallbackState state;
    1177             :     ErrorContextCallback errcallback;
    1178             : 
    1179             :     Assert(!ctx->fast_forward);
    1180             : 
    1181          28 :     if (ctx->callbacks.message_cb == NULL)
    1182           0 :         return;
    1183             : 
    1184             :     /* Push callback + info on the error context stack */
    1185          28 :     state.ctx = ctx;
    1186          28 :     state.callback_name = "message";
    1187          28 :     state.report_location = message_lsn;
    1188          28 :     errcallback.callback = output_plugin_error_callback;
    1189          28 :     errcallback.arg = (void *) &state;
    1190          28 :     errcallback.previous = error_context_stack;
    1191          28 :     error_context_stack = &errcallback;
    1192             : 
    1193             :     /* set output state */
    1194          28 :     ctx->accept_writes = true;
    1195          28 :     ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
    1196          28 :     ctx->write_location = message_lsn;
    1197             : 
    1198             :     /* do the actual work: call callback */
    1199          28 :     ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
    1200             :                               message_size, message);
    1201             : 
    1202             :     /* Pop the error context stack */
    1203          28 :     error_context_stack = errcallback.previous;
    1204             : }
    1205             : 
    1206             : static void
    1207         822 : stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1208             :                         XLogRecPtr first_lsn)
    1209             : {
    1210         822 :     LogicalDecodingContext *ctx = cache->private_data;
    1211             :     LogicalErrorCallbackState state;
    1212             :     ErrorContextCallback errcallback;
    1213             : 
    1214             :     Assert(!ctx->fast_forward);
    1215             : 
    1216             :     /* We're only supposed to call this when streaming is supported. */
    1217             :     Assert(ctx->streaming);
    1218             : 
    1219             :     /* Push callback + info on the error context stack */
    1220         822 :     state.ctx = ctx;
    1221         822 :     state.callback_name = "stream_start";
    1222         822 :     state.report_location = first_lsn;
    1223         822 :     errcallback.callback = output_plugin_error_callback;
    1224         822 :     errcallback.arg = (void *) &state;
    1225         822 :     errcallback.previous = error_context_stack;
    1226         822 :     error_context_stack = &errcallback;
    1227             : 
    1228             :     /* set output state */
    1229         822 :     ctx->accept_writes = true;
    1230         822 :     ctx->write_xid = txn->xid;
    1231             : 
    1232             :     /*
    1233             :      * Report this message's lsn so replies from clients can give an
    1234             :      * up-to-date answer. This won't ever be enough (and shouldn't be!) to
    1235             :      * confirm receipt of this transaction, but it might allow another
    1236             :      * transaction's commit to be confirmed with one message.
    1237             :      */
    1238         822 :     ctx->write_location = first_lsn;
    1239             : 
    1240             :     /* in streaming mode, stream_start_cb is required */
    1241         822 :     if (ctx->callbacks.stream_start_cb == NULL)
    1242           0 :         ereport(ERROR,
    1243             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1244             :                  errmsg("logical streaming requires a %s callback",
    1245             :                         "stream_start_cb")));
    1246             : 
    1247         822 :     ctx->callbacks.stream_start_cb(ctx, txn);
    1248             : 
    1249             :     /* Pop the error context stack */
    1250         822 :     error_context_stack = errcallback.previous;
    1251         822 : }
    1252             : 
    1253             : static void
    1254         820 : stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1255             :                        XLogRecPtr last_lsn)
    1256             : {
    1257         820 :     LogicalDecodingContext *ctx = cache->private_data;
    1258             :     LogicalErrorCallbackState state;
    1259             :     ErrorContextCallback errcallback;
    1260             : 
    1261             :     Assert(!ctx->fast_forward);
    1262             : 
    1263             :     /* We're only supposed to call this when streaming is supported. */
    1264             :     Assert(ctx->streaming);
    1265             : 
    1266             :     /* Push callback + info on the error context stack */
    1267         820 :     state.ctx = ctx;
    1268         820 :     state.callback_name = "stream_stop";
    1269         820 :     state.report_location = last_lsn;
    1270         820 :     errcallback.callback = output_plugin_error_callback;
    1271         820 :     errcallback.arg = (void *) &state;
    1272         820 :     errcallback.previous = error_context_stack;
    1273         820 :     error_context_stack = &errcallback;
    1274             : 
    1275             :     /* set output state */
    1276         820 :     ctx->accept_writes = true;
    1277         820 :     ctx->write_xid = txn->xid;
    1278             : 
    1279             :     /*
    1280             :      * Report this message's lsn so replies from clients can give an
    1281             :      * up-to-date answer. This won't ever be enough (and shouldn't be!) to
    1282             :      * confirm receipt of this transaction, but it might allow another
    1283             :      * transaction's commit to be confirmed with one message.
    1284             :      */
    1285         820 :     ctx->write_location = last_lsn;
    1286             : 
    1287             :     /* in streaming mode, stream_stop_cb is required */
    1288         820 :     if (ctx->callbacks.stream_stop_cb == NULL)
    1289           0 :         ereport(ERROR,
    1290             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1291             :                  errmsg("logical streaming requires a %s callback",
    1292             :                         "stream_stop_cb")));
    1293             : 
    1294         820 :     ctx->callbacks.stream_stop_cb(ctx, txn);
    1295             : 
    1296             :     /* Pop the error context stack */
    1297         820 :     error_context_stack = errcallback.previous;
    1298         820 : }
    1299             : 
    1300             : static void
    1301          28 : stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1302             :                         XLogRecPtr abort_lsn)
    1303             : {
    1304          28 :     LogicalDecodingContext *ctx = cache->private_data;
    1305             :     LogicalErrorCallbackState state;
    1306             :     ErrorContextCallback errcallback;
    1307             : 
    1308             :     Assert(!ctx->fast_forward);
    1309             : 
    1310             :     /* We're only supposed to call this when streaming is supported. */
    1311             :     Assert(ctx->streaming);
    1312             : 
    1313             :     /* Push callback + info on the error context stack */
    1314          28 :     state.ctx = ctx;
    1315          28 :     state.callback_name = "stream_abort";
    1316          28 :     state.report_location = abort_lsn;
    1317          28 :     errcallback.callback = output_plugin_error_callback;
    1318          28 :     errcallback.arg = (void *) &state;
    1319          28 :     errcallback.previous = error_context_stack;
    1320          28 :     error_context_stack = &errcallback;
    1321             : 
    1322             :     /* set output state */
    1323          28 :     ctx->accept_writes = true;
    1324          28 :     ctx->write_xid = txn->xid;
    1325          28 :     ctx->write_location = abort_lsn;
    1326             : 
    1327             :     /* in streaming mode, stream_abort_cb is required */
    1328          28 :     if (ctx->callbacks.stream_abort_cb == NULL)
    1329           0 :         ereport(ERROR,
    1330             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1331             :                  errmsg("logical streaming requires a %s callback",
    1332             :                         "stream_abort_cb")));
    1333             : 
    1334          28 :     ctx->callbacks.stream_abort_cb(ctx, txn, abort_lsn);
    1335             : 
    1336             :     /* Pop the error context stack */
    1337          28 :     error_context_stack = errcallback.previous;
    1338          28 : }
    1339             : 
    1340             : static void
    1341          18 : stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1342             :                           XLogRecPtr prepare_lsn)
    1343             : {
    1344          18 :     LogicalDecodingContext *ctx = cache->private_data;
    1345             :     LogicalErrorCallbackState state;
    1346             :     ErrorContextCallback errcallback;
    1347             : 
    1348             :     Assert(!ctx->fast_forward);
    1349             : 
    1350             :     /*
    1351             :      * We're only supposed to call this when streaming and two-phase commits
    1352             :      * are supported.
    1353             :      */
    1354             :     Assert(ctx->streaming);
    1355             :     Assert(ctx->twophase);
    1356             : 
    1357             :     /* Push callback + info on the error context stack */
    1358          18 :     state.ctx = ctx;
    1359          18 :     state.callback_name = "stream_prepare";
    1360          18 :     state.report_location = txn->final_lsn;
    1361          18 :     errcallback.callback = output_plugin_error_callback;
    1362          18 :     errcallback.arg = (void *) &state;
    1363          18 :     errcallback.previous = error_context_stack;
    1364          18 :     error_context_stack = &errcallback;
    1365             : 
    1366             :     /* set output state */
    1367          18 :     ctx->accept_writes = true;
    1368          18 :     ctx->write_xid = txn->xid;
    1369          18 :     ctx->write_location = txn->end_lsn;
    1370             : 
    1371             :     /* in streaming mode with two-phase commits, stream_prepare_cb is required */
    1372          18 :     if (ctx->callbacks.stream_prepare_cb == NULL)
    1373           0 :         ereport(ERROR,
    1374             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1375             :                  errmsg("logical streaming at prepare time requires a %s callback",
    1376             :                         "stream_prepare_cb")));
    1377             : 
    1378          18 :     ctx->callbacks.stream_prepare_cb(ctx, txn, prepare_lsn);
    1379             : 
    1380             :     /* Pop the error context stack */
    1381          18 :     error_context_stack = errcallback.previous;
    1382          18 : }
    1383             : 
    1384             : static void
    1385          36 : stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1386             :                          XLogRecPtr commit_lsn)
    1387             : {
    1388          36 :     LogicalDecodingContext *ctx = cache->private_data;
    1389             :     LogicalErrorCallbackState state;
    1390             :     ErrorContextCallback errcallback;
    1391             : 
    1392             :     Assert(!ctx->fast_forward);
    1393             : 
    1394             :     /* We're only supposed to call this when streaming is supported. */
    1395             :     Assert(ctx->streaming);
    1396             : 
    1397             :     /* Push callback + info on the error context stack */
    1398          36 :     state.ctx = ctx;
    1399          36 :     state.callback_name = "stream_commit";
    1400          36 :     state.report_location = txn->final_lsn;
    1401          36 :     errcallback.callback = output_plugin_error_callback;
    1402          36 :     errcallback.arg = (void *) &state;
    1403          36 :     errcallback.previous = error_context_stack;
    1404          36 :     error_context_stack = &errcallback;
    1405             : 
    1406             :     /* set output state */
    1407          36 :     ctx->accept_writes = true;
    1408          36 :     ctx->write_xid = txn->xid;
    1409          36 :     ctx->write_location = txn->end_lsn;
    1410             : 
    1411             :     /* in streaming mode, stream_commit_cb is required */
    1412          36 :     if (ctx->callbacks.stream_commit_cb == NULL)
    1413           0 :         ereport(ERROR,
    1414             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1415             :                  errmsg("logical streaming requires a %s callback",
    1416             :                         "stream_commit_cb")));
    1417             : 
    1418          36 :     ctx->callbacks.stream_commit_cb(ctx, txn, commit_lsn);
    1419             : 
    1420             :     /* Pop the error context stack */
    1421          36 :     error_context_stack = errcallback.previous;
    1422          36 : }
    1423             : 
    1424             : static void
    1425      302792 : stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1426             :                          Relation relation, ReorderBufferChange *change)
    1427             : {
    1428      302792 :     LogicalDecodingContext *ctx = cache->private_data;
    1429             :     LogicalErrorCallbackState state;
    1430             :     ErrorContextCallback errcallback;
    1431             : 
    1432             :     Assert(!ctx->fast_forward);
    1433             : 
    1434             :     /* We're only supposed to call this when streaming is supported. */
    1435             :     Assert(ctx->streaming);
    1436             : 
    1437             :     /* Push callback + info on the error context stack */
    1438      302792 :     state.ctx = ctx;
    1439      302792 :     state.callback_name = "stream_change";
    1440      302792 :     state.report_location = change->lsn;
    1441      302792 :     errcallback.callback = output_plugin_error_callback;
    1442      302792 :     errcallback.arg = (void *) &state;
    1443      302792 :     errcallback.previous = error_context_stack;
    1444      302792 :     error_context_stack = &errcallback;
    1445             : 
    1446             :     /* set output state */
    1447      302792 :     ctx->accept_writes = true;
    1448      302792 :     ctx->write_xid = txn->xid;
    1449             : 
    1450             :     /*
    1451             :      * Report this change's lsn so replies from clients can give an up-to-date
    1452             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1453             :      * receipt of this transaction, but it might allow another transaction's
    1454             :      * commit to be confirmed with one message.
    1455             :      */
    1456      302792 :     ctx->write_location = change->lsn;
    1457             : 
    1458             :     /* in streaming mode, stream_change_cb is required */
    1459      302792 :     if (ctx->callbacks.stream_change_cb == NULL)
    1460           0 :         ereport(ERROR,
    1461             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1462             :                  errmsg("logical streaming requires a %s callback",
    1463             :                         "stream_change_cb")));
    1464             : 
    1465      302792 :     ctx->callbacks.stream_change_cb(ctx, txn, relation, change);
    1466             : 
    1467             :     /* Pop the error context stack */
    1468      302790 :     error_context_stack = errcallback.previous;
    1469      302790 : }
    1470             : 
    1471             : static void
    1472           6 : stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1473             :                           XLogRecPtr message_lsn, bool transactional,
    1474             :                           const char *prefix, Size message_size, const char *message)
    1475             : {
    1476           6 :     LogicalDecodingContext *ctx = cache->private_data;
    1477             :     LogicalErrorCallbackState state;
    1478             :     ErrorContextCallback errcallback;
    1479             : 
    1480             :     Assert(!ctx->fast_forward);
    1481             : 
    1482             :     /* We're only supposed to call this when streaming is supported. */
    1483             :     Assert(ctx->streaming);
    1484             : 
    1485             :     /* this callback is optional */
    1486           6 :     if (ctx->callbacks.stream_message_cb == NULL)
    1487           0 :         return;
    1488             : 
    1489             :     /* Push callback + info on the error context stack */
    1490           6 :     state.ctx = ctx;
    1491           6 :     state.callback_name = "stream_message";
    1492           6 :     state.report_location = message_lsn;
    1493           6 :     errcallback.callback = output_plugin_error_callback;
    1494           6 :     errcallback.arg = (void *) &state;
    1495           6 :     errcallback.previous = error_context_stack;
    1496           6 :     error_context_stack = &errcallback;
    1497             : 
    1498             :     /* set output state */
    1499           6 :     ctx->accept_writes = true;
    1500           6 :     ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
    1501           6 :     ctx->write_location = message_lsn;
    1502             : 
    1503             :     /* do the actual work: call callback */
    1504           6 :     ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
    1505             :                                      message_size, message);
    1506             : 
    1507             :     /* Pop the error context stack */
    1508           6 :     error_context_stack = errcallback.previous;
    1509             : }
    1510             : 
    1511             : static void
    1512           0 : stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
    1513             :                            int nrelations, Relation relations[],
    1514             :                            ReorderBufferChange *change)
    1515             : {
    1516           0 :     LogicalDecodingContext *ctx = cache->private_data;
    1517             :     LogicalErrorCallbackState state;
    1518             :     ErrorContextCallback errcallback;
    1519             : 
    1520             :     Assert(!ctx->fast_forward);
    1521             : 
    1522             :     /* We're only supposed to call this when streaming is supported. */
    1523             :     Assert(ctx->streaming);
    1524             : 
    1525             :     /* this callback is optional */
    1526           0 :     if (!ctx->callbacks.stream_truncate_cb)
    1527           0 :         return;
    1528             : 
    1529             :     /* Push callback + info on the error context stack */
    1530           0 :     state.ctx = ctx;
    1531           0 :     state.callback_name = "stream_truncate";
    1532           0 :     state.report_location = change->lsn;
    1533           0 :     errcallback.callback = output_plugin_error_callback;
    1534           0 :     errcallback.arg = (void *) &state;
    1535           0 :     errcallback.previous = error_context_stack;
    1536           0 :     error_context_stack = &errcallback;
    1537             : 
    1538             :     /* set output state */
    1539           0 :     ctx->accept_writes = true;
    1540           0 :     ctx->write_xid = txn->xid;
    1541             : 
    1542             :     /*
    1543             :      * Report this change's lsn so replies from clients can give an up-to-date
    1544             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
    1545             :      * receipt of this transaction, but it might allow another transaction's
    1546             :      * commit to be confirmed with one message.
    1547             :      */
    1548           0 :     ctx->write_location = change->lsn;
    1549             : 
    1550           0 :     ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
    1551             : 
    1552             :     /* Pop the error context stack */
    1553           0 :     error_context_stack = errcallback.previous;
    1554             : }
    1555             : 
    1556             : /*
    1557             :  * Set the required catalog xmin horizon for historic snapshots in the current
    1558             :  * replication slot.
    1559             :  *
    1560             :  * Note that in the most cases, we won't be able to immediately use the xmin
    1561             :  * to increase the xmin horizon: we need to wait till the client has confirmed
    1562             :  * receiving current_lsn with LogicalConfirmReceivedLocation().
    1563             :  */
    1564             : void
    1565         306 : LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
    1566             : {
    1567         306 :     bool        updated_xmin = false;
    1568             :     ReplicationSlot *slot;
    1569         306 :     bool        got_new_xmin = false;
    1570             : 
    1571         306 :     slot = MyReplicationSlot;
    1572             : 
    1573             :     Assert(slot != NULL);
    1574             : 
    1575         306 :     SpinLockAcquire(&slot->mutex);
    1576             : 
    1577             :     /*
    1578             :      * don't overwrite if we already have a newer xmin. This can happen if we
    1579             :      * restart decoding in a slot.
    1580             :      */
    1581         306 :     if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
    1582             :     {
    1583             :     }
    1584             : 
    1585             :     /*
    1586             :      * If the client has already confirmed up to this lsn, we directly can
    1587             :      * mark this as accepted. This can happen if we restart decoding in a
    1588             :      * slot.
    1589             :      */
    1590          84 :     else if (current_lsn <= slot->data.confirmed_flush)
    1591             :     {
    1592          30 :         slot->candidate_catalog_xmin = xmin;
    1593          30 :         slot->candidate_xmin_lsn = current_lsn;
    1594             : 
    1595             :         /* our candidate can directly be used */
    1596          30 :         updated_xmin = true;
    1597             :     }
    1598             : 
    1599             :     /*
    1600             :      * Only increase if the previous values have been applied, otherwise we
    1601             :      * might never end up updating if the receiver acks too slowly.
    1602             :      */
    1603          54 :     else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
    1604             :     {
    1605          18 :         slot->candidate_catalog_xmin = xmin;
    1606          18 :         slot->candidate_xmin_lsn = current_lsn;
    1607             : 
    1608             :         /*
    1609             :          * Log new xmin at an appropriate log level after releasing the
    1610             :          * spinlock.
    1611             :          */
    1612          18 :         got_new_xmin = true;
    1613             :     }
    1614         306 :     SpinLockRelease(&slot->mutex);
    1615             : 
    1616         306 :     if (got_new_xmin)
    1617          18 :         elog(DEBUG1, "got new catalog xmin %u at %X/%X", xmin,
    1618             :              LSN_FORMAT_ARGS(current_lsn));
    1619             : 
    1620             :     /* candidate already valid with the current flush position, apply */
    1621         306 :     if (updated_xmin)
    1622          30 :         LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
    1623         306 : }
    1624             : 
    1625             : /*
    1626             :  * Mark the minimal LSN (restart_lsn) we need to read to replay all
    1627             :  * transactions that have not yet committed at current_lsn.
    1628             :  *
    1629             :  * Just like LogicalIncreaseXminForSlot this only takes effect when the
    1630             :  * client has confirmed to have received current_lsn.
    1631             :  */
    1632             : void
    1633         246 : LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
    1634             : {
    1635         246 :     bool        updated_lsn = false;
    1636             :     ReplicationSlot *slot;
    1637             : 
    1638         246 :     slot = MyReplicationSlot;
    1639             : 
    1640             :     Assert(slot != NULL);
    1641             :     Assert(restart_lsn != InvalidXLogRecPtr);
    1642             :     Assert(current_lsn != InvalidXLogRecPtr);
    1643             : 
    1644         246 :     SpinLockAcquire(&slot->mutex);
    1645             : 
    1646             :     /* don't overwrite if have a newer restart lsn */
    1647         246 :     if (restart_lsn <= slot->data.restart_lsn)
    1648             :     {
    1649             :     }
    1650             : 
    1651             :     /*
    1652             :      * We might have already flushed far enough to directly accept this lsn,
    1653             :      * in this case there is no need to check for existing candidate LSNs
    1654             :      */
    1655         242 :     else if (current_lsn <= slot->data.confirmed_flush)
    1656             :     {
    1657         180 :         slot->candidate_restart_valid = current_lsn;
    1658         180 :         slot->candidate_restart_lsn = restart_lsn;
    1659             : 
    1660             :         /* our candidate can directly be used */
    1661         180 :         updated_lsn = true;
    1662             :     }
    1663             : 
    1664             :     /*
    1665             :      * Only increase if the previous values have been applied, otherwise we
    1666             :      * might never end up updating if the receiver acks too slowly. A missed
    1667             :      * value here will just cause some extra effort after reconnecting.
    1668             :      */
    1669         246 :     if (slot->candidate_restart_valid == InvalidXLogRecPtr)
    1670             :     {
    1671          30 :         slot->candidate_restart_valid = current_lsn;
    1672          30 :         slot->candidate_restart_lsn = restart_lsn;
    1673          30 :         SpinLockRelease(&slot->mutex);
    1674             : 
    1675          30 :         elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
    1676             :              LSN_FORMAT_ARGS(restart_lsn),
    1677             :              LSN_FORMAT_ARGS(current_lsn));
    1678             :     }
    1679             :     else
    1680             :     {
    1681             :         XLogRecPtr  candidate_restart_lsn;
    1682             :         XLogRecPtr  candidate_restart_valid;
    1683             :         XLogRecPtr  confirmed_flush;
    1684             : 
    1685         216 :         candidate_restart_lsn = slot->candidate_restart_lsn;
    1686         216 :         candidate_restart_valid = slot->candidate_restart_valid;
    1687         216 :         confirmed_flush = slot->data.confirmed_flush;
    1688         216 :         SpinLockRelease(&slot->mutex);
    1689             : 
    1690         216 :         elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
    1691             :              LSN_FORMAT_ARGS(restart_lsn),
    1692             :              LSN_FORMAT_ARGS(current_lsn),
    1693             :              LSN_FORMAT_ARGS(candidate_restart_lsn),
    1694             :              LSN_FORMAT_ARGS(candidate_restart_valid),
    1695             :              LSN_FORMAT_ARGS(confirmed_flush));
    1696             :     }
    1697             : 
    1698             :     /* candidates are already valid with the current flush position, apply */
    1699         246 :     if (updated_lsn)
    1700         180 :         LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
    1701         246 : }
    1702             : 
    1703             : /*
    1704             :  * Handle a consumer's confirmation having received all changes up to lsn.
    1705             :  */
    1706             : void
    1707       44046 : LogicalConfirmReceivedLocation(XLogRecPtr lsn)
    1708             : {
    1709             :     Assert(lsn != InvalidXLogRecPtr);
    1710             : 
    1711             :     /* Do an unlocked check for candidate_lsn first. */
    1712       44046 :     if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
    1713       44004 :         MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
    1714         240 :     {
    1715         240 :         bool        updated_xmin = false;
    1716         240 :         bool        updated_restart = false;
    1717             : 
    1718         240 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1719             : 
    1720         240 :         MyReplicationSlot->data.confirmed_flush = lsn;
    1721             : 
    1722             :         /* if we're past the location required for bumping xmin, do so */
    1723         240 :         if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
    1724          42 :             MyReplicationSlot->candidate_xmin_lsn <= lsn)
    1725             :         {
    1726             :             /*
    1727             :              * We have to write the changed xmin to disk *before* we change
    1728             :              * the in-memory value, otherwise after a crash we wouldn't know
    1729             :              * that some catalog tuples might have been removed already.
    1730             :              *
    1731             :              * Ensure that by first writing to ->xmin and only update
    1732             :              * ->effective_xmin once the new state is synced to disk. After a
    1733             :              * crash ->effective_xmin is set to ->xmin.
    1734             :              */
    1735          40 :             if (TransactionIdIsValid(MyReplicationSlot->candidate_catalog_xmin) &&
    1736          40 :                 MyReplicationSlot->data.catalog_xmin != MyReplicationSlot->candidate_catalog_xmin)
    1737             :             {
    1738          40 :                 MyReplicationSlot->data.catalog_xmin = MyReplicationSlot->candidate_catalog_xmin;
    1739          40 :                 MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
    1740          40 :                 MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
    1741          40 :                 updated_xmin = true;
    1742             :             }
    1743             :         }
    1744             : 
    1745         240 :         if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr &&
    1746         210 :             MyReplicationSlot->candidate_restart_valid <= lsn)
    1747             :         {
    1748             :             Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
    1749             : 
    1750         206 :             MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
    1751         206 :             MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
    1752         206 :             MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
    1753         206 :             updated_restart = true;
    1754             :         }
    1755             : 
    1756         240 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1757             : 
    1758             :         /* first write new xmin to disk, so we know what's up after a crash */
    1759         240 :         if (updated_xmin || updated_restart)
    1760             :         {
    1761         236 :             ReplicationSlotMarkDirty();
    1762         236 :             ReplicationSlotSave();
    1763         236 :             elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
    1764             :         }
    1765             : 
    1766             :         /*
    1767             :          * Now the new xmin is safely on disk, we can let the global value
    1768             :          * advance. We do not take ProcArrayLock or similar since we only
    1769             :          * advance xmin here and there's not much harm done by a concurrent
    1770             :          * computation missing that.
    1771             :          */
    1772         240 :         if (updated_xmin)
    1773             :         {
    1774          40 :             SpinLockAcquire(&MyReplicationSlot->mutex);
    1775          40 :             MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
    1776          40 :             SpinLockRelease(&MyReplicationSlot->mutex);
    1777             : 
    1778          40 :             ReplicationSlotsComputeRequiredXmin(false);
    1779          40 :             ReplicationSlotsComputeRequiredLSN();
    1780             :         }
    1781             :     }
    1782             :     else
    1783             :     {
    1784       43806 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1785       43806 :         MyReplicationSlot->data.confirmed_flush = lsn;
    1786       43806 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1787             :     }
    1788       44046 : }
    1789             : 
    1790             : /*
    1791             :  * Clear logical streaming state during (sub)transaction abort.
    1792             :  */
    1793             : void
    1794       30048 : ResetLogicalStreamingState(void)
    1795             : {
    1796       30048 :     CheckXidAlive = InvalidTransactionId;
    1797       30048 :     bsysscan = false;
    1798       30048 : }
    1799             : 
    1800             : /*
    1801             :  * Report stats for a slot.
    1802             :  */
    1803             : void
    1804        8162 : UpdateDecodingStats(LogicalDecodingContext *ctx)
    1805             : {
    1806        8162 :     ReorderBuffer *rb = ctx->reorder;
    1807             :     PgStat_StatReplSlotEntry repSlotStat;
    1808             : 
    1809             :     /* Nothing to do if we don't have any replication stats to be sent. */
    1810        8162 :     if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0)
    1811         266 :         return;
    1812             : 
    1813        7896 :     elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld",
    1814             :          rb,
    1815             :          (long long) rb->spillTxns,
    1816             :          (long long) rb->spillCount,
    1817             :          (long long) rb->spillBytes,
    1818             :          (long long) rb->streamTxns,
    1819             :          (long long) rb->streamCount,
    1820             :          (long long) rb->streamBytes,
    1821             :          (long long) rb->totalTxns,
    1822             :          (long long) rb->totalBytes);
    1823             : 
    1824        7896 :     namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name));
    1825        7896 :     repSlotStat.spill_txns = rb->spillTxns;
    1826        7896 :     repSlotStat.spill_count = rb->spillCount;
    1827        7896 :     repSlotStat.spill_bytes = rb->spillBytes;
    1828        7896 :     repSlotStat.stream_txns = rb->streamTxns;
    1829        7896 :     repSlotStat.stream_count = rb->streamCount;
    1830        7896 :     repSlotStat.stream_bytes = rb->streamBytes;
    1831        7896 :     repSlotStat.total_txns = rb->totalTxns;
    1832        7896 :     repSlotStat.total_bytes = rb->totalBytes;
    1833             : 
    1834        7896 :     pgstat_report_replslot(&repSlotStat);
    1835             : 
    1836        7896 :     rb->spillTxns = 0;
    1837        7896 :     rb->spillCount = 0;
    1838        7896 :     rb->spillBytes = 0;
    1839        7896 :     rb->streamTxns = 0;
    1840        7896 :     rb->streamCount = 0;
    1841        7896 :     rb->streamBytes = 0;
    1842        7896 :     rb->totalTxns = 0;
    1843        7896 :     rb->totalBytes = 0;
    1844             : }

Generated by: LCOV version 1.14