LCOV - code coverage report
Current view: top level - src/backend/replication/logical - logical.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 330 357 92.4 %
Date: 2020-06-01 00:06:26 Functions: 23 23 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * logical.c
       3             :  *     PostgreSQL logical decoding coordination
       4             :  *
       5             :  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
       6             :  *
       7             :  * IDENTIFICATION
       8             :  *    src/backend/replication/logical/logical.c
       9             :  *
      10             :  * NOTES
      11             :  *    This file coordinates interaction between the various modules that
      12             :  *    together provide logical decoding, primarily by providing
      13             :  *    so-called LogicalDecodingContexts. The goal is to encapsulate most of the
      14             :  *    internal complexity for consumers of logical decoding, so they can
      15             :  *    create and consume a changestream with a low amount of code. Builtin
      16             :  *    consumers are the walsender and SQL SRF interface, but it's possible to
      17             :  *    add further ones without changing core code, e.g. to consume changes in
      18             :  *    a bgworker.
      19             :  *
      20             :  *    The idea is that a consumer provides three callbacks, one to read WAL,
      21             :  *    one to prepare a data write, and a final one for actually writing, since
      22             :  *    their implementation depends on the type of consumer.  Check
      23             :  *    logicalfuncs.c for an example implementation of a fairly simple consumer
      24             :  *    and an implementation of a WAL reading callback that's suitable for
      25             :  *    simple consumers.
      26             :  *-------------------------------------------------------------------------
      27             :  */
      28             : 
      29             : #include "postgres.h"
      30             : 
      31             : #include "access/xact.h"
      32             : #include "access/xlog_internal.h"
      33             : #include "fmgr.h"
      34             : #include "miscadmin.h"
      35             : #include "replication/decode.h"
      36             : #include "replication/logical.h"
      37             : #include "replication/origin.h"
      38             : #include "replication/reorderbuffer.h"
      39             : #include "replication/snapbuild.h"
      40             : #include "storage/proc.h"
      41             : #include "storage/procarray.h"
      42             : #include "utils/memutils.h"
      43             : 
      44             : /* data for the errcontext callback: decoding context, callback name and an LSN to report */
      45             : typedef struct LogicalErrorCallbackState
      46             : {
      47             :     LogicalDecodingContext *ctx;
      48             :     const char *callback_name;
      49             :     XLogRecPtr  report_location;
      50             : } LogicalErrorCallbackState;
      51             : 
      52             : /* forward declarations of the wrappers around output plugin callbacks (and of the error context callback) */
      53             : static void output_plugin_error_callback(void *arg);
      54             : static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
      55             :                                bool is_init);
      56             : static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
      57             : static void begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn);
      58             : static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      59             :                               XLogRecPtr commit_lsn);
      60             : static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      61             :                               Relation relation, ReorderBufferChange *change);
      62             : static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      63             :                                 int nrelations, Relation relations[], ReorderBufferChange *change);
      64             : static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
      65             :                                XLogRecPtr message_lsn, bool transactional,
      66             :                                const char *prefix, Size message_size, const char *message);
      67             : 
      68             : static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
      69             : 
      70             : /*
      71             :  * Make sure the current settings & environment are capable of doing logical
      72             :  * decoding.  Raises an ERROR if wal_level is below logical, if there is no database connection, or if recovery is in progress.
      73             :  */
      74             : void
      75         612 : CheckLogicalDecodingRequirements(void)
      76             : {
      77         612 :     CheckSlotRequirements();
      78             : 
      79             :     /*
      80             :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
      81             :      * needs the same check.
      82             :      */
      83             : 
      84         612 :     if (wal_level < WAL_LEVEL_LOGICAL)
      85           0 :         ereport(ERROR,
      86             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      87             :                  errmsg("logical decoding requires wal_level >= logical")));
      88             : 
      89         612 :     if (MyDatabaseId == InvalidOid)
      90           0 :         ereport(ERROR,
      91             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
      92             :                  errmsg("logical decoding requires a database connection")));
      93             : 
      94             :     /* ----
      95             :      * TODO: We have got to lift the recovery restriction below someday soon...
      96             :      *
      97             :      * There are basically three things missing to allow this:
      98             :      * 1) We need to be able to correctly and quickly identify the timeline an
      99             :      *    LSN belongs to.
     100             :      * 2) We need to force hot_standby_feedback to be enabled at all times so
     101             :      *    the primary cannot remove rows we need.
     102             :      * 3) We need to support dropping replication slots referring to a database, in
     103             :      *    dbase_redo. There can't be any active ones due to HS recovery
     104             :      *    conflicts, so that should be relatively easy.
     105             :      * ----
     106             :      */
     107         612 :     if (RecoveryInProgress())
     108           0 :         ereport(ERROR,
     109             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     110             :                  errmsg("logical decoding cannot be used while in recovery")));
     111         612 : }
     112             : 
     113             : /*
     114             :  * Helper function for CreateInitDecodingContext() and
     115             :  * CreateDecodingContext() performing common tasks: it allocates and fills in a LogicalDecodingContext for MyReplicationSlot and returns it.
     116             :  */
     117             : static LogicalDecodingContext *
     118         596 : StartupDecodingContext(List *output_plugin_options,
     119             :                        XLogRecPtr start_lsn,
     120             :                        TransactionId xmin_horizon,
     121             :                        bool need_full_snapshot,
     122             :                        bool fast_forward,
     123             :                        XLogReaderRoutine *xl_routine,
     124             :                        LogicalOutputPluginWriterPrepareWrite prepare_write,
     125             :                        LogicalOutputPluginWriterWrite do_write,
     126             :                        LogicalOutputPluginWriterUpdateProgress update_progress)
     127             : {
     128             :     ReplicationSlot *slot;
     129             :     MemoryContext context,
     130             :                 old_context;
     131             :     LogicalDecodingContext *ctx;
     132             : 
     133             :     /* local alias for MyReplicationSlot, for shorter lines */
     134         596 :     slot = MyReplicationSlot;
     135             : 
     136         596 :     context = AllocSetContextCreate(CurrentMemoryContext,
     137             :                                     "Logical decoding context",
     138             :                                     ALLOCSET_DEFAULT_SIZES);
     139         596 :     old_context = MemoryContextSwitchTo(context);
     140         596 :     ctx = palloc0(sizeof(LogicalDecodingContext));
     141             : 
     142         596 :     ctx->context = context;
     143             : 
     144             :     /*
     145             :      * (re-)load output plugins, so we detect a bad (removed) output plugin
     146             :      * now.  This is skipped when fast_forward is set, leaving ctx->callbacks zeroed by palloc0.
     147             :      */
     148         596 :     if (!fast_forward)
     149         590 :         LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
     150             : 
     151             :     /*
     152             :      * Now that the slot's xmin has been set, we can announce ourselves as a
     153             :      * logical decoding backend which doesn't need to be checked individually
     154             :      * when computing the xmin horizon because the xmin is enforced via
     155             :      * replication slots.
     156             :      *
     157             :      * We can only do so if we're outside of a transaction (i.e. the case when
     158             :      * streaming changes via walsender), otherwise an already set up
     159             :      * snapshot/xid would end up being ignored. That's not a particularly
     160             :      * bothersome restriction since the SQL interface can't be used for
     161             :      * streaming anyway.
     162             :      */
     163         594 :     if (!IsTransactionOrTransactionBlock())
     164             :     {
     165          90 :         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     166          90 :         MyPgXact->vacuumFlags |= PROC_IN_LOGICAL_DECODING;
     167          90 :         LWLockRelease(ProcArrayLock);
     168             :     }
     169             : 
     170         594 :     ctx->slot = slot;
     171             : 
     172         594 :     ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
     173         594 :     if (!ctx->reader)
     174           0 :         ereport(ERROR,
     175             :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     176             :                  errmsg("out of memory")));
     177             : 
     178         594 :     ctx->reorder = ReorderBufferAllocate();
     179         594 :     ctx->snapshot_builder =
     180         594 :         AllocateSnapshotBuilder(ctx->reorder, xmin_horizon, start_lsn,
     181             :                                 need_full_snapshot);
     182             : 
     183         594 :     ctx->reorder->private_data = ctx;
     184             : 
     185             :     /* wrap output plugin callbacks, so we can add error context information */
     186         594 :     ctx->reorder->begin = begin_cb_wrapper;
     187         594 :     ctx->reorder->apply_change = change_cb_wrapper;
     188         594 :     ctx->reorder->apply_truncate = truncate_cb_wrapper;
     189         594 :     ctx->reorder->commit = commit_cb_wrapper;
     190         594 :     ctx->reorder->message = message_cb_wrapper;
     191             : 
     192         594 :     ctx->out = makeStringInfo();
     193         594 :     ctx->prepare_write = prepare_write;
     194         594 :     ctx->write = do_write;
     195         594 :     ctx->update_progress = update_progress;
     196             : 
     197         594 :     ctx->output_plugin_options = output_plugin_options;
     198             : 
     199         594 :     ctx->fast_forward = fast_forward;
     200             : 
     201         594 :     MemoryContextSwitchTo(old_context);
     202             : 
     203         594 :     return ctx;
     204             : }
     205             : 
     206             : /*
     207             :  * Create a new decoding context, for a new logical slot.
     208             :  *
     209             :  * plugin -- contains the name of the output plugin
     210             :  * output_plugin_options -- options for the output plugin; NOTE(review): not forwarded here, NIL is passed to StartupDecodingContext()
     211             :  * need_full_snapshot -- if true, must obtain a snapshot able to read all
     212             :  *      tables; if false, one that can read only catalogs is acceptable.
     213             :  * restart_lsn -- if given as invalid, it's this routine's responsibility to
     214             :  *      mark WAL as reserved by setting a convenient restart_lsn for the slot.
     215             :  *      Otherwise, we set up decoding to start from the given LSN without
     216             :  *      marking WAL reserved beforehand.  In that scenario, it's up to the
     217             :  *      caller to guarantee that WAL remains available.
     218             :  * xl_routine -- XLogReaderRoutine for underlying XLogReader
     219             :  * prepare_write, do_write, update_progress --
     220             :  *      callbacks that perform the use-case dependent, actual, work.
     221             :  *
     222             :  * Needs to be called while in a memory context that's at least as long lived
     223             :  * as the decoding context because further memory contexts will be created
     224             :  * inside it.
     225             :  *
     226             :  * Returns an initialized decoding context after calling the output plugin's
     227             :  * startup function.
     228             :  */
     229             : LogicalDecodingContext *
     230         296 : CreateInitDecodingContext(char *plugin,
     231             :                           List *output_plugin_options,
     232             :                           bool need_full_snapshot,
     233             :                           XLogRecPtr restart_lsn,
     234             :                           XLogReaderRoutine *xl_routine,
     235             :                           LogicalOutputPluginWriterPrepareWrite prepare_write,
     236             :                           LogicalOutputPluginWriterWrite do_write,
     237             :                           LogicalOutputPluginWriterUpdateProgress update_progress)
     238             : {
     239         296 :     TransactionId xmin_horizon = InvalidTransactionId;
     240             :     ReplicationSlot *slot;
     241             :     LogicalDecodingContext *ctx;
     242             :     MemoryContext old_context;
     243             : 
     244             :     /* local alias for MyReplicationSlot, for shorter lines */
     245         296 :     slot = MyReplicationSlot;
     246             : 
     247             :     /* first some sanity checks that are unlikely to be violated */
     248         296 :     if (slot == NULL)
     249           0 :         elog(ERROR, "cannot perform logical decoding without an acquired slot");
     250             : 
     251         296 :     if (plugin == NULL)
     252           0 :         elog(ERROR, "cannot initialize logical decoding without a specified plugin");
     253             : 
     254             :     /* Make sure the passed slot is suitable. These are user facing errors. */
     255         296 :     if (SlotIsPhysical(slot))
     256           0 :         ereport(ERROR,
     257             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     258             :                  errmsg("cannot use physical replication slot for logical decoding")));
     259             : 
     260         296 :     if (slot->data.database != MyDatabaseId)
     261           0 :         ereport(ERROR,
     262             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     263             :                  errmsg("replication slot \"%s\" was not created in this database",
     264             :                         NameStr(slot->data.name))));
     265             : 
     266         550 :     if (IsTransactionState() &&
     267         254 :         GetTopTransactionIdIfAny() != InvalidTransactionId)
     268           4 :         ereport(ERROR,
     269             :                 (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
     270             :                  errmsg("cannot create logical replication slot in transaction that has performed writes")));
     271             : 
     272             :     /* register output plugin name with slot, under the slot's spinlock */
     273         292 :     SpinLockAcquire(&slot->mutex);
     274         292 :     StrNCpy(NameStr(slot->data.plugin), plugin, NAMEDATALEN);
     275         292 :     SpinLockRelease(&slot->mutex);
     276             : 
     277         292 :     if (XLogRecPtrIsInvalid(restart_lsn))
     278         280 :         ReplicationSlotReserveWal();
     279             :     else
     280             :     {
     281          12 :         SpinLockAcquire(&slot->mutex);
     282          12 :         slot->data.restart_lsn = restart_lsn;
     283          12 :         SpinLockRelease(&slot->mutex);
     284             :     }
     285             : 
     286             :     /* ----
     287             :      * This is a bit tricky: We need to determine a safe xmin horizon to start
     288             :      * decoding from, to avoid starting from a running xacts record referring
     289             :      * to xids whose rows have been vacuumed or pruned
     290             :      * already. GetOldestSafeDecodingTransactionId() returns such a value, but
     291             :      * without further interlock its return value might immediately be out of
     292             :      * date.
     293             :      *
     294             :      * So we have to acquire the ProcArrayLock to prevent computation of new
     295             :      * xmin horizons by other backends, get the safe decoding xid, and inform
     296             :      * the slot machinery about the new limit. Once that's done the
     297             :      * ProcArrayLock can be released as the slot machinery now is
     298             :      * protecting against vacuum.
     299             :      *
     300             :      * Note that, temporarily, the data, not just the catalog, xmin has to be
     301             :      * reserved if a data snapshot is to be exported.  Otherwise the initial
     302             :      * data snapshot created here is not guaranteed to be valid. After that
     303             :      * the data xmin doesn't need to be managed anymore and the global xmin
     304             :      * should be recomputed. As we are fine with losing the pegged data xmin
     305             :      * after crash - no chance a snapshot would get exported anymore - we can
     306             :      * get away with just setting the slot's
     307             :      * effective_xmin. ReplicationSlotRelease will reset it again.
     308             :      *
     309             :      * ----
     310             :      */
     311         292 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     312             : 
     313         292 :     xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
     314             : 
     315         292 :     SpinLockAcquire(&slot->mutex);
     316         292 :     slot->effective_catalog_xmin = xmin_horizon;
     317         292 :     slot->data.catalog_xmin = xmin_horizon;
     318         292 :     if (need_full_snapshot)
     319         102 :         slot->effective_xmin = xmin_horizon;
     320         292 :     SpinLockRelease(&slot->mutex);
     321             : 
     322         292 :     ReplicationSlotsComputeRequiredXmin(true);
     323             : 
     324         292 :     LWLockRelease(ProcArrayLock);
     325             : 
     326         292 :     ReplicationSlotMarkDirty();
     327         292 :     ReplicationSlotSave();
     328             : 
     329         292 :     ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
     330             :                                  need_full_snapshot, false,
     331             :                                  xl_routine, prepare_write, do_write,
     332             :                                  update_progress);
     333             : 
     334             :     /* call output plugin initialization callback, inside the decoding context's memory context */
     335         290 :     old_context = MemoryContextSwitchTo(ctx->context);
     336         290 :     if (ctx->callbacks.startup_cb != NULL)
     337         290 :         startup_cb_wrapper(ctx, &ctx->options, true);
     338         290 :     MemoryContextSwitchTo(old_context);
     339             : 
     340         290 :     ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
     341             : 
     342         290 :     return ctx;
     343             : }
     344             : 
     345             : /*
     346             :  * Create a new decoding context, for a logical slot that has previously been
     347             :  * used already.
     348             :  *
     349             :  * start_lsn
     350             :  *      The LSN at which to start decoding.  If InvalidXLogRecPtr, restart
     351             :  *      from the slot's confirmed_flush; otherwise, start from the specified
     352             :  *      location (but move it forwards to confirmed_flush if it's older than
     353             :  *      that, see below).
     354             :  *
     355             :  * output_plugin_options
     356             :  *      options passed to the output plugin.
     357             :  *
     358             :  * fast_forward
     359             :  *      bypass the generation of logical changes (StartupDecodingContext() then loads no output plugin).
     360             :  *
     361             :  * xl_routine
     362             :  *      XLogReaderRoutine used by underlying xlogreader
     363             :  *
     364             :  * prepare_write, do_write, update_progress
     365             :  *      callbacks that have to be filled to perform the use-case dependent,
     366             :  *      actual work.
     367             :  *
     368             :  * Needs to be called while in a memory context that's at least as long lived
     369             :  * as the decoding context because further memory contexts will be created
     370             :  * inside it.
     371             :  *
     372             :  * Returns an initialized decoding context after calling the output plugin's
     373             :  * startup function.
     374             :  */
     375             : LogicalDecodingContext *
     376         310 : CreateDecodingContext(XLogRecPtr start_lsn,
     377             :                       List *output_plugin_options,
     378             :                       bool fast_forward,
     379             :                       XLogReaderRoutine *xl_routine,
     380             :                       LogicalOutputPluginWriterPrepareWrite prepare_write,
     381             :                       LogicalOutputPluginWriterWrite do_write,
     382             :                       LogicalOutputPluginWriterUpdateProgress update_progress)
     383             : {
     384             :     LogicalDecodingContext *ctx;
     385             :     ReplicationSlot *slot;
     386             :     MemoryContext old_context;
     387             : 
     388             :     /* local alias for MyReplicationSlot, for shorter lines */
     389         310 :     slot = MyReplicationSlot;
     390             : 
     391             :     /* first some sanity checks that are unlikely to be violated */
     392         310 :     if (slot == NULL)
     393           0 :         elog(ERROR, "cannot perform logical decoding without an acquired slot");
     394             : 
     395             :     /* make sure the passed slot is suitable, these are user facing errors */
     396         310 :     if (SlotIsPhysical(slot))
     397           2 :         ereport(ERROR,
     398             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     399             :                  errmsg("cannot use physical replication slot for logical decoding")));
     400             : 
     401         308 :     if (slot->data.database != MyDatabaseId)
     402           4 :         ereport(ERROR,
     403             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     404             :                  errmsg("replication slot \"%s\" was not created in this database",
     405             :                         NameStr(slot->data.name))));
     406             : 
     407         304 :     if (start_lsn == InvalidXLogRecPtr)
     408             :     {
     409             :         /* continue from last position */
     410         296 :         start_lsn = slot->data.confirmed_flush;
     411             :     }
     412           8 :     else if (start_lsn < slot->data.confirmed_flush)
     413             :     {
     414             :         /*
     415             :          * It might seem like we should error out in this case, but it's
     416             :          * pretty common for a client to acknowledge a LSN it doesn't have to
     417             :          * do anything for, and thus didn't store persistently, because the
     418             :          * xlog records didn't result in anything relevant for logical
     419             :          * decoding. Clients have to be able to do that to support synchronous
     420             :          * replication.
     421             :          */
     422           0 :         elog(DEBUG1, "cannot stream from %X/%X, minimum is %X/%X, forwarding",
     423             :              (uint32) (start_lsn >> 32), (uint32) start_lsn,
     424             :              (uint32) (slot->data.confirmed_flush >> 32),
     425             :              (uint32) slot->data.confirmed_flush);
     426             : 
     427           0 :         start_lsn = slot->data.confirmed_flush;
     428             :     }
     429             : 
     430         304 :     ctx = StartupDecodingContext(output_plugin_options,
     431             :                                  start_lsn, InvalidTransactionId, false,
     432             :                                  fast_forward, xl_routine, prepare_write,
     433             :                                  do_write, update_progress);
     434             : 
     435             :     /* call output plugin initialization callback, if one is set */
     436         304 :     old_context = MemoryContextSwitchTo(ctx->context);
     437         304 :     if (ctx->callbacks.startup_cb != NULL)
     438         298 :         startup_cb_wrapper(ctx, &ctx->options, false);
     439         298 :     MemoryContextSwitchTo(old_context);
     440             : 
     441         298 :     ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
     442             : 
     443         298 :     ereport(LOG,
     444             :             (errmsg("starting logical decoding for slot \"%s\"",
     445             :                     NameStr(slot->data.name)),
     446             :              errdetail("Streaming transactions committing after %X/%X, reading WAL from %X/%X.",
     447             :                        (uint32) (slot->data.confirmed_flush >> 32),
     448             :                        (uint32) slot->data.confirmed_flush,
     449             :                        (uint32) (slot->data.restart_lsn >> 32),
     450             :                        (uint32) slot->data.restart_lsn)));
     451             : 
     452         298 :     return ctx;
     453             : }
     454             : 
     455             : /*
     456             :  * Returns true if a consistent initial decoding snapshot has been built, i.e. the snapshot builder's state is SNAPBUILD_CONSISTENT.
     457             :  */
     458             : bool
     459         306 : DecodingContextReady(LogicalDecodingContext *ctx)
     460             : {
     461         306 :     return SnapBuildCurrentState(ctx->snapshot_builder) == SNAPBUILD_CONSISTENT;
     462             : }
     463             : 
     464             : /*
     465             :  * Read from the decoding slot, starting at its restart_lsn, until it is ready to start extracting changes; the reader's EndRecPtr is then stored as the slot's confirmed_flush.
     466             :  */
     467             : void
     468         278 : DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
     469             : {
     470         278 :     ReplicationSlot *slot = ctx->slot;
     471             : 
     472             :     /* Initialize from where to start reading WAL. */
     473         278 :     XLogBeginRead(ctx->reader, slot->data.restart_lsn);
     474             : 
     475         278 :     elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
     476             :          (uint32) (slot->data.restart_lsn >> 32),
     477             :          (uint32) slot->data.restart_lsn);
     478             : 
     479             :     /* Wait for a consistent starting point, decoding one record per iteration */
     480             :     for (;;)
     481          28 :     {
     482             :         XLogRecord *record;
     483         306 :         char       *err = NULL;
     484             : 
     485             :         /* the read_page callback waits for new WAL */
     486         306 :         record = XLogReadRecord(ctx->reader, &err);
     487         306 :         if (err)
     488           0 :             elog(ERROR, "%s", err);
     489         306 :         if (!record)
     490           0 :             elog(ERROR, "no record found"); /* shouldn't happen */
     491             : 
     492         306 :         LogicalDecodingProcessRecord(ctx, ctx->reader);
     493             : 
     494             :         /* stop as soon as we have found a consistent spot */
     495         306 :         if (DecodingContextReady(ctx))
     496         278 :             break;
     497             : 
     498          28 :         CHECK_FOR_INTERRUPTS();
     499             :     }
     500             : 
     501         278 :     SpinLockAcquire(&slot->mutex);
     502         278 :     slot->data.confirmed_flush = ctx->reader->EndRecPtr;
     503         278 :     SpinLockRelease(&slot->mutex);
     504         278 : }
     505             : 
     506             : /*
     507             :  * Free a previously allocated decoding context, invoking the shutdown
     508             :  * callback if one is set, then freeing the reorder buffer, snapshot builder, xlog reader and the context's memory context.
     509             :  */
     510             : void
     511         544 : FreeDecodingContext(LogicalDecodingContext *ctx)
     512             : {
     513         544 :     if (ctx->callbacks.shutdown_cb != NULL)
     514         538 :         shutdown_cb_wrapper(ctx);
     515             : 
     516         544 :     ReorderBufferFree(ctx->reorder);
     517         544 :     FreeSnapshotBuilder(ctx->snapshot_builder);
     518         544 :     XLogReaderFree(ctx->reader);
     519         544 :     MemoryContextDelete(ctx->context);
     520         544 : }
     521             : 
     522             : /*
     523             :  * Prepare a write using the context's output routine.
     524             :  */
     525             : void
     526      293682 : OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write)
     527             : {
     528      293682 :     if (!ctx->accept_writes)
     529           0 :         elog(ERROR, "writes are only accepted in commit, begin and change callbacks");
     530             : 
     531      293682 :     ctx->prepare_write(ctx, ctx->write_location, ctx->write_xid, last_write);
     532      293682 :     ctx->prepared_write = true;
     533      293682 : }
     534             : 
     535             : /*
     536             :  * Perform a write using the context's output routine.
     537             :  */
     538             : void
     539      293682 : OutputPluginWrite(struct LogicalDecodingContext *ctx, bool last_write)
     540             : {
     541      293682 :     if (!ctx->prepared_write)
     542           0 :         elog(ERROR, "OutputPluginPrepareWrite needs to be called before OutputPluginWrite");
     543             : 
     544      293682 :     ctx->write(ctx, ctx->write_location, ctx->write_xid, last_write);
     545      293682 :     ctx->prepared_write = false;
     546      293682 : }
     547             : 
     548             : /*
     549             :  * Update progress tracking (if supported).
     550             :  */
     551             : void
     552         308 : OutputPluginUpdateProgress(struct LogicalDecodingContext *ctx)
     553             : {
     554         308 :     if (!ctx->update_progress)
     555           0 :         return;
     556             : 
     557         308 :     ctx->update_progress(ctx, ctx->write_location, ctx->write_xid);
     558             : }
     559             : 
     560             : /*
     561             :  * Load the output plugin, lookup its output plugin init function, and check
     562             :  * that it provides the required callbacks.
     563             :  */
     564             : static void
     565         590 : LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin)
     566             : {
     567             :     LogicalOutputPluginInit plugin_init;
     568             : 
     569             :     plugin_init = (LogicalOutputPluginInit)
     570         590 :         load_external_function(plugin, "_PG_output_plugin_init", false, NULL);
     571             : 
     572         588 :     if (plugin_init == NULL)
     573           0 :         elog(ERROR, "output plugins have to declare the _PG_output_plugin_init symbol");
     574             : 
     575             :     /* ask the output plugin to fill the callback struct */
     576         588 :     plugin_init(callbacks);
     577             : 
     578         588 :     if (callbacks->begin_cb == NULL)
     579           0 :         elog(ERROR, "output plugins have to register a begin callback");
     580         588 :     if (callbacks->change_cb == NULL)
     581           0 :         elog(ERROR, "output plugins have to register a change callback");
     582         588 :     if (callbacks->commit_cb == NULL)
     583           0 :         elog(ERROR, "output plugins have to register a commit callback");
     584         588 : }
     585             : 
     586             : static void
     587           6 : output_plugin_error_callback(void *arg)
     588             : {
     589           6 :     LogicalErrorCallbackState *state = (LogicalErrorCallbackState *) arg;
     590             : 
     591             :     /* not all callbacks have an associated LSN  */
     592           6 :     if (state->report_location != InvalidXLogRecPtr)
     593           0 :         errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
     594           0 :                    NameStr(state->ctx->slot->data.name),
     595           0 :                    NameStr(state->ctx->slot->data.plugin),
     596             :                    state->callback_name,
     597           0 :                    (uint32) (state->report_location >> 32),
     598           0 :                    (uint32) state->report_location);
     599             :     else
     600           6 :         errcontext("slot \"%s\", output plugin \"%s\", in the %s callback",
     601           6 :                    NameStr(state->ctx->slot->data.name),
     602           6 :                    NameStr(state->ctx->slot->data.plugin),
     603             :                    state->callback_name);
     604           6 : }
     605             : 
     606             : static void
     607         588 : startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init)
     608             : {
     609             :     LogicalErrorCallbackState state;
     610             :     ErrorContextCallback errcallback;
     611             : 
     612             :     Assert(!ctx->fast_forward);
     613             : 
     614             :     /* Push callback + info on the error context stack */
     615         588 :     state.ctx = ctx;
     616         588 :     state.callback_name = "startup";
     617         588 :     state.report_location = InvalidXLogRecPtr;
     618         588 :     errcallback.callback = output_plugin_error_callback;
     619         588 :     errcallback.arg = (void *) &state;
     620         588 :     errcallback.previous = error_context_stack;
     621         588 :     error_context_stack = &errcallback;
     622             : 
     623             :     /* set output state */
     624         588 :     ctx->accept_writes = false;
     625             : 
     626             :     /* do the actual work: call callback */
     627         588 :     ctx->callbacks.startup_cb(ctx, opt, is_init);
     628             : 
     629             :     /* Pop the error context stack */
     630         582 :     error_context_stack = errcallback.previous;
     631         582 : }
     632             : 
     633             : static void
     634         538 : shutdown_cb_wrapper(LogicalDecodingContext *ctx)
     635             : {
     636             :     LogicalErrorCallbackState state;
     637             :     ErrorContextCallback errcallback;
     638             : 
     639             :     Assert(!ctx->fast_forward);
     640             : 
     641             :     /* Push callback + info on the error context stack */
     642         538 :     state.ctx = ctx;
     643         538 :     state.callback_name = "shutdown";
     644         538 :     state.report_location = InvalidXLogRecPtr;
     645         538 :     errcallback.callback = output_plugin_error_callback;
     646         538 :     errcallback.arg = (void *) &state;
     647         538 :     errcallback.previous = error_context_stack;
     648         538 :     error_context_stack = &errcallback;
     649             : 
     650             :     /* set output state */
     651         538 :     ctx->accept_writes = false;
     652             : 
     653             :     /* do the actual work: call callback */
     654         538 :     ctx->callbacks.shutdown_cb(ctx);
     655             : 
     656             :     /* Pop the error context stack */
     657         538 :     error_context_stack = errcallback.previous;
     658         538 : }
     659             : 
     660             : 
     661             : /*
     662             :  * Callbacks for ReorderBuffer which add in some more information and then call
     663             :  * output_plugin.h plugins.
     664             :  */
     665             : static void
     666         974 : begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
     667             : {
     668         974 :     LogicalDecodingContext *ctx = cache->private_data;
     669             :     LogicalErrorCallbackState state;
     670             :     ErrorContextCallback errcallback;
     671             : 
     672             :     Assert(!ctx->fast_forward);
     673             : 
     674             :     /* Push callback + info on the error context stack */
     675         974 :     state.ctx = ctx;
     676         974 :     state.callback_name = "begin";
     677         974 :     state.report_location = txn->first_lsn;
     678         974 :     errcallback.callback = output_plugin_error_callback;
     679         974 :     errcallback.arg = (void *) &state;
     680         974 :     errcallback.previous = error_context_stack;
     681         974 :     error_context_stack = &errcallback;
     682             : 
     683             :     /* set output state */
     684         974 :     ctx->accept_writes = true;
     685         974 :     ctx->write_xid = txn->xid;
     686         974 :     ctx->write_location = txn->first_lsn;
     687             : 
     688             :     /* do the actual work: call callback */
     689         974 :     ctx->callbacks.begin_cb(ctx, txn);
     690             : 
     691             :     /* Pop the error context stack */
     692         974 :     error_context_stack = errcallback.previous;
     693         974 : }
     694             : 
     695             : static void
     696         974 : commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     697             :                   XLogRecPtr commit_lsn)
     698             : {
     699         974 :     LogicalDecodingContext *ctx = cache->private_data;
     700             :     LogicalErrorCallbackState state;
     701             :     ErrorContextCallback errcallback;
     702             : 
     703             :     Assert(!ctx->fast_forward);
     704             : 
     705             :     /* Push callback + info on the error context stack */
     706         974 :     state.ctx = ctx;
     707         974 :     state.callback_name = "commit";
     708         974 :     state.report_location = txn->final_lsn; /* beginning of commit record */
     709         974 :     errcallback.callback = output_plugin_error_callback;
     710         974 :     errcallback.arg = (void *) &state;
     711         974 :     errcallback.previous = error_context_stack;
     712         974 :     error_context_stack = &errcallback;
     713             : 
     714             :     /* set output state */
     715         974 :     ctx->accept_writes = true;
     716         974 :     ctx->write_xid = txn->xid;
     717         974 :     ctx->write_location = txn->end_lsn; /* points to the end of the record */
     718             : 
     719             :     /* do the actual work: call callback */
     720         974 :     ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
     721             : 
     722             :     /* Pop the error context stack */
     723         974 :     error_context_stack = errcallback.previous;
     724         974 : }
     725             : 
     726             : static void
     727      294316 : change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     728             :                   Relation relation, ReorderBufferChange *change)
     729             : {
     730      294316 :     LogicalDecodingContext *ctx = cache->private_data;
     731             :     LogicalErrorCallbackState state;
     732             :     ErrorContextCallback errcallback;
     733             : 
     734             :     Assert(!ctx->fast_forward);
     735             : 
     736             :     /* Push callback + info on the error context stack */
     737      294316 :     state.ctx = ctx;
     738      294316 :     state.callback_name = "change";
     739      294316 :     state.report_location = change->lsn;
     740      294316 :     errcallback.callback = output_plugin_error_callback;
     741      294316 :     errcallback.arg = (void *) &state;
     742      294316 :     errcallback.previous = error_context_stack;
     743      294316 :     error_context_stack = &errcallback;
     744             : 
     745             :     /* set output state */
     746      294316 :     ctx->accept_writes = true;
     747      294316 :     ctx->write_xid = txn->xid;
     748             : 
     749             :     /*
     750             :      * report this change's lsn so replies from clients can give an up2date
     751             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
     752             :      * receipt of this transaction, but it might allow another transaction's
     753             :      * commit to be confirmed with one message.
     754             :      */
     755      294316 :     ctx->write_location = change->lsn;
     756             : 
     757      294316 :     ctx->callbacks.change_cb(ctx, txn, relation, change);
     758             : 
     759             :     /* Pop the error context stack */
     760      294316 :     error_context_stack = errcallback.previous;
     761      294316 : }
     762             : 
     763             : static void
     764          20 : truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     765             :                     int nrelations, Relation relations[], ReorderBufferChange *change)
     766             : {
     767          20 :     LogicalDecodingContext *ctx = cache->private_data;
     768             :     LogicalErrorCallbackState state;
     769             :     ErrorContextCallback errcallback;
     770             : 
     771             :     Assert(!ctx->fast_forward);
     772             : 
     773          20 :     if (!ctx->callbacks.truncate_cb)
     774           0 :         return;
     775             : 
     776             :     /* Push callback + info on the error context stack */
     777          20 :     state.ctx = ctx;
     778          20 :     state.callback_name = "truncate";
     779          20 :     state.report_location = change->lsn;
     780          20 :     errcallback.callback = output_plugin_error_callback;
     781          20 :     errcallback.arg = (void *) &state;
     782          20 :     errcallback.previous = error_context_stack;
     783          20 :     error_context_stack = &errcallback;
     784             : 
     785             :     /* set output state */
     786          20 :     ctx->accept_writes = true;
     787          20 :     ctx->write_xid = txn->xid;
     788             : 
     789             :     /*
     790             :      * report this change's lsn so replies from clients can give an up2date
     791             :      * answer. This won't ever be enough (and shouldn't be!) to confirm
     792             :      * receipt of this transaction, but it might allow another transaction's
     793             :      * commit to be confirmed with one message.
     794             :      */
     795          20 :     ctx->write_location = change->lsn;
     796             : 
     797          20 :     ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
     798             : 
     799             :     /* Pop the error context stack */
     800          20 :     error_context_stack = errcallback.previous;
     801             : }
     802             : 
     803             : bool
     804     2046782 : filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
     805             : {
     806             :     LogicalErrorCallbackState state;
     807             :     ErrorContextCallback errcallback;
     808             :     bool        ret;
     809             : 
     810             :     Assert(!ctx->fast_forward);
     811             : 
     812             :     /* Push callback + info on the error context stack */
     813     2046782 :     state.ctx = ctx;
     814     2046782 :     state.callback_name = "filter_by_origin";
     815     2046782 :     state.report_location = InvalidXLogRecPtr;
     816     2046782 :     errcallback.callback = output_plugin_error_callback;
     817     2046782 :     errcallback.arg = (void *) &state;
     818     2046782 :     errcallback.previous = error_context_stack;
     819     2046782 :     error_context_stack = &errcallback;
     820             : 
     821             :     /* set output state */
     822     2046782 :     ctx->accept_writes = false;
     823             : 
     824             :     /* do the actual work: call callback */
     825     2046782 :     ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
     826             : 
     827             :     /* Pop the error context stack */
     828     2046782 :     error_context_stack = errcallback.previous;
     829             : 
     830     2046782 :     return ret;
     831             : }
     832             : 
     833             : static void
     834          16 : message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
     835             :                    XLogRecPtr message_lsn, bool transactional,
     836             :                    const char *prefix, Size message_size, const char *message)
     837             : {
     838          16 :     LogicalDecodingContext *ctx = cache->private_data;
     839             :     LogicalErrorCallbackState state;
     840             :     ErrorContextCallback errcallback;
     841             : 
     842             :     Assert(!ctx->fast_forward);
     843             : 
     844          16 :     if (ctx->callbacks.message_cb == NULL)
     845           0 :         return;
     846             : 
     847             :     /* Push callback + info on the error context stack */
     848          16 :     state.ctx = ctx;
     849          16 :     state.callback_name = "message";
     850          16 :     state.report_location = message_lsn;
     851          16 :     errcallback.callback = output_plugin_error_callback;
     852          16 :     errcallback.arg = (void *) &state;
     853          16 :     errcallback.previous = error_context_stack;
     854          16 :     error_context_stack = &errcallback;
     855             : 
     856             :     /* set output state */
     857          16 :     ctx->accept_writes = true;
     858          16 :     ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
     859          16 :     ctx->write_location = message_lsn;
     860             : 
     861             :     /* do the actual work: call callback */
     862          16 :     ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
     863             :                               message_size, message);
     864             : 
     865             :     /* Pop the error context stack */
     866          16 :     error_context_stack = errcallback.previous;
     867             : }
     868             : 
     869             : /*
     870             :  * Set the required catalog xmin horizon for historic snapshots in the current
     871             :  * replication slot.
     872             :  *
     873             :  * Note that in the most cases, we won't be able to immediately use the xmin
     874             :  * to increase the xmin horizon: we need to wait till the client has confirmed
     875             :  * receiving current_lsn with LogicalConfirmReceivedLocation().
     876             :  */
     877             : void
     878         184 : LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
     879             : {
     880         184 :     bool        updated_xmin = false;
     881             :     ReplicationSlot *slot;
     882             : 
     883         184 :     slot = MyReplicationSlot;
     884             : 
     885             :     Assert(slot != NULL);
     886             : 
     887         184 :     SpinLockAcquire(&slot->mutex);
     888             : 
     889             :     /*
     890             :      * don't overwrite if we already have a newer xmin. This can happen if we
     891             :      * restart decoding in a slot.
     892             :      */
     893         184 :     if (TransactionIdPrecedesOrEquals(xmin, slot->data.catalog_xmin))
     894             :     {
     895             :     }
     896             : 
     897             :     /*
     898             :      * If the client has already confirmed up to this lsn, we directly can
     899             :      * mark this as accepted. This can happen if we restart decoding in a
     900             :      * slot.
     901             :      */
     902          56 :     else if (current_lsn <= slot->data.confirmed_flush)
     903             :     {
     904          14 :         slot->candidate_catalog_xmin = xmin;
     905          14 :         slot->candidate_xmin_lsn = current_lsn;
     906             : 
     907             :         /* our candidate can directly be used */
     908          14 :         updated_xmin = true;
     909             :     }
     910             : 
     911             :     /*
     912             :      * Only increase if the previous values have been applied, otherwise we
     913             :      * might never end up updating if the receiver acks too slowly.
     914             :      */
     915          42 :     else if (slot->candidate_xmin_lsn == InvalidXLogRecPtr)
     916             :     {
     917          10 :         slot->candidate_catalog_xmin = xmin;
     918          10 :         slot->candidate_xmin_lsn = current_lsn;
     919             :     }
     920         184 :     SpinLockRelease(&slot->mutex);
     921             : 
     922             :     /* candidate already valid with the current flush position, apply */
     923         184 :     if (updated_xmin)
     924          14 :         LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
     925         184 : }
     926             : 
     927             : /*
     928             :  * Mark the minimal LSN (restart_lsn) we need to read to replay all
     929             :  * transactions that have not yet committed at current_lsn.
     930             :  *
     931             :  * Just like LogicalIncreaseXminForSlot this only takes effect when the
     932             :  * client has confirmed to have received current_lsn.
     933             :  */
     934             : void
     935         150 : LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn)
     936             : {
     937         150 :     bool        updated_lsn = false;
     938             :     ReplicationSlot *slot;
     939             : 
     940         150 :     slot = MyReplicationSlot;
     941             : 
     942             :     Assert(slot != NULL);
     943             :     Assert(restart_lsn != InvalidXLogRecPtr);
     944             :     Assert(current_lsn != InvalidXLogRecPtr);
     945             : 
     946         150 :     SpinLockAcquire(&slot->mutex);
     947             : 
     948             :     /* don't overwrite if have a newer restart lsn */
     949         150 :     if (restart_lsn <= slot->data.restart_lsn)
     950             :     {
     951             :     }
     952             : 
     953             :     /*
     954             :      * We might have already flushed far enough to directly accept this lsn,
     955             :      * in this case there is no need to check for existing candidate LSNs
     956             :      */
     957         146 :     else if (current_lsn <= slot->data.confirmed_flush)
     958             :     {
     959         100 :         slot->candidate_restart_valid = current_lsn;
     960         100 :         slot->candidate_restart_lsn = restart_lsn;
     961             : 
     962             :         /* our candidate can directly be used */
     963         100 :         updated_lsn = true;
     964             :     }
     965             : 
     966             :     /*
     967             :      * Only increase if the previous values have been applied, otherwise we
     968             :      * might never end up updating if the receiver acks too slowly. A missed
     969             :      * value here will just cause some extra effort after reconnecting.
     970             :      */
     971         150 :     if (slot->candidate_restart_valid == InvalidXLogRecPtr)
     972             :     {
     973          20 :         slot->candidate_restart_valid = current_lsn;
     974          20 :         slot->candidate_restart_lsn = restart_lsn;
     975             : 
     976          20 :         elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
     977             :              (uint32) (restart_lsn >> 32), (uint32) restart_lsn,
     978             :              (uint32) (current_lsn >> 32), (uint32) current_lsn);
     979             :     }
     980             :     else
     981             :     {
     982         130 :         elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
     983             :              (uint32) (restart_lsn >> 32), (uint32) restart_lsn,
     984             :              (uint32) (current_lsn >> 32), (uint32) current_lsn,
     985             :              (uint32) (slot->candidate_restart_lsn >> 32),
     986             :              (uint32) slot->candidate_restart_lsn,
     987             :              (uint32) (slot->candidate_restart_valid >> 32),
     988             :              (uint32) slot->candidate_restart_valid,
     989             :              (uint32) (slot->data.confirmed_flush >> 32),
     990             :              (uint32) slot->data.confirmed_flush
     991             :             );
     992             :     }
     993         150 :     SpinLockRelease(&slot->mutex);
     994             : 
     995             :     /* candidates are already valid with the current flush position, apply */
     996         150 :     if (updated_lsn)
     997         100 :         LogicalConfirmReceivedLocation(slot->data.confirmed_flush);
     998         150 : }
     999             : 
    1000             : /*
    1001             :  * Handle a consumer's confirmation having received all changes up to lsn.
    1002             :  */
    1003             : void
    1004        6466 : LogicalConfirmReceivedLocation(XLogRecPtr lsn)
    1005             : {
    1006             :     Assert(lsn != InvalidXLogRecPtr);
    1007             : 
    1008             :     /* Do an unlocked check for candidate_lsn first. */
    1009        6466 :     if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr ||
    1010        6446 :         MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr)
    1011         134 :     {
    1012         134 :         bool        updated_xmin = false;
    1013         134 :         bool        updated_restart = false;
    1014             : 
    1015         134 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1016             : 
    1017         134 :         MyReplicationSlot->data.confirmed_flush = lsn;
    1018             : 
    1019             :         /* if we're past the location required for bumping xmin, do so */
    1020         134 :         if (MyReplicationSlot->candidate_xmin_lsn != InvalidXLogRecPtr &&
    1021          20 :             MyReplicationSlot->candidate_xmin_lsn <= lsn)
    1022             :         {
    1023             :             /*
    1024             :              * We have to write the changed xmin to disk *before* we change
    1025             :              * the in-memory value, otherwise after a crash we wouldn't know
    1026             :              * that some catalog tuples might have been removed already.
    1027             :              *
    1028             :              * Ensure that by first writing to ->xmin and only update
    1029             :              * ->effective_xmin once the new state is synced to disk. After a
    1030             :              * crash ->effective_xmin is set to ->xmin.
    1031             :              */
    1032          20 :             if (TransactionIdIsValid(MyReplicationSlot->candidate_catalog_xmin) &&
    1033          20 :                 MyReplicationSlot->data.catalog_xmin != MyReplicationSlot->candidate_catalog_xmin)
    1034             :             {
    1035          20 :                 MyReplicationSlot->data.catalog_xmin = MyReplicationSlot->candidate_catalog_xmin;
    1036          20 :                 MyReplicationSlot->candidate_catalog_xmin = InvalidTransactionId;
    1037          20 :                 MyReplicationSlot->candidate_xmin_lsn = InvalidXLogRecPtr;
    1038          20 :                 updated_xmin = true;
    1039             :             }
    1040             :         }
    1041             : 
    1042         134 :         if (MyReplicationSlot->candidate_restart_valid != InvalidXLogRecPtr &&
    1043         120 :             MyReplicationSlot->candidate_restart_valid <= lsn)
    1044             :         {
    1045             :             Assert(MyReplicationSlot->candidate_restart_lsn != InvalidXLogRecPtr);
    1046             : 
    1047         118 :             MyReplicationSlot->data.restart_lsn = MyReplicationSlot->candidate_restart_lsn;
    1048         118 :             MyReplicationSlot->candidate_restart_lsn = InvalidXLogRecPtr;
    1049         118 :             MyReplicationSlot->candidate_restart_valid = InvalidXLogRecPtr;
    1050         118 :             updated_restart = true;
    1051             :         }
    1052             : 
    1053         134 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1054             : 
    1055             :         /* first write new xmin to disk, so we know what's up after a crash */
    1056         134 :         if (updated_xmin || updated_restart)
    1057             :         {
    1058         132 :             ReplicationSlotMarkDirty();
    1059         132 :             ReplicationSlotSave();
    1060         132 :             elog(DEBUG1, "updated xmin: %u restart: %u", updated_xmin, updated_restart);
    1061             :         }
    1062             : 
    1063             :         /*
    1064             :          * Now the new xmin is safely on disk, we can let the global value
    1065             :          * advance. We do not take ProcArrayLock or similar since we only
    1066             :          * advance xmin here and there's not much harm done by a concurrent
    1067             :          * computation missing that.
    1068             :          */
    1069         134 :         if (updated_xmin)
    1070             :         {
    1071          20 :             SpinLockAcquire(&MyReplicationSlot->mutex);
    1072          20 :             MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin;
    1073          20 :             SpinLockRelease(&MyReplicationSlot->mutex);
    1074             : 
    1075          20 :             ReplicationSlotsComputeRequiredXmin(false);
    1076          20 :             ReplicationSlotsComputeRequiredLSN();
    1077             :         }
    1078             :     }
    1079             :     else
    1080             :     {
    1081        6332 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1082        6332 :         MyReplicationSlot->data.confirmed_flush = lsn;
    1083        6332 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1084             :     }
    1085        6466 : }

Generated by: LCOV version 1.13