LCOV - code coverage report
Current view: top level - src/backend/replication/logical - slotsync.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 469 530 88.5 %
Date: 2026-01-31 02:18:36 Functions: 30 30 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * slotsync.c
       3             :  *     Functionality for synchronizing slots to a standby server from the
       4             :  *         primary server.
       5             :  *
       6             :  * Copyright (c) 2024-2026, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/logical/slotsync.c
      10             :  *
      11             :  * This file contains the code for slot synchronization on a physical standby
      12             :  * to fetch logical failover slots information from the primary server, create
      13             :  * the slots on the standby and synchronize them periodically.
      14             :  *
      15             :  * Slot synchronization can be performed either automatically by enabling slot
      16             :  * sync worker or manually by calling SQL function pg_sync_replication_slots().
      17             :  *
      18             :  * If the WAL corresponding to the remote's restart_lsn is not available on the
      19             :  * physical standby or the remote's catalog_xmin precedes the oldest xid for
      20             :  * which it is guaranteed that rows wouldn't have been removed then we cannot
      21             :  * create the local standby slot because that would mean moving the local slot
      22             :  * backward and decoding won't be possible via such a slot. In this case, the
      23             :  * slot will be marked as RS_TEMPORARY. Once the primary server catches up,
      24             :  * the slot will be marked as RS_PERSISTENT (which means sync-ready) after
      25             :  * which slot sync worker can perform the sync periodically or user can call
      26             :  * pg_sync_replication_slots() periodically to perform the syncs.
      27             :  *
      28             :  * If synchronized slots fail to build a consistent snapshot from the
      29             :  * restart_lsn before reaching confirmed_flush_lsn, they would become
      30             :  * unreliable after promotion due to potential data loss from changes
      31             :  * before reaching a consistent point. This can happen because the slots can
      32             :  * be synced at some random time and we may not reach the consistent point
      33             :  * at the same WAL location as the primary. So, we mark such slots as
      34             :  * RS_TEMPORARY. Once the decoding from corresponding LSNs can reach a
      35             :  * consistent point, they will be marked as RS_PERSISTENT.
      36             :  *
      37             :  * The slot sync worker waits for some time before the next synchronization,
      38             :  * with the duration varying based on whether any slots were updated during
      39             :  * the last cycle. Refer to the comments above wait_for_slot_activity() for
      40             :  * more details.
      41             :  *
      42             :  * If the SQL function pg_sync_replication_slots() is used to sync the slots,
      43             :  * and if the slots are not ready to be synced and are marked as RS_TEMPORARY
      44             :  * because of any of the reasons mentioned above, then the SQL function also
      45             :  * waits and retries until the slots are marked as RS_PERSISTENT (which means
      46             :  * sync-ready). Refer to the comments in SyncReplicationSlots() for more
      47             :  * details.
      48             :  *
      49             :  * Any standby synchronized slots will be dropped if they no longer need
      50             :  * to be synchronized. See comment atop drop_local_obsolete_slots() for more
      51             :  * details.
      52             :  *---------------------------------------------------------------------------
      53             :  */
      54             : 
      55             : #include "postgres.h"
      56             : 
      57             : #include <time.h>
      58             : 
      59             : #include "access/xlog_internal.h"
      60             : #include "access/xlogrecovery.h"
      61             : #include "catalog/pg_database.h"
      62             : #include "libpq/pqsignal.h"
      63             : #include "pgstat.h"
      64             : #include "postmaster/interrupt.h"
      65             : #include "replication/logical.h"
      66             : #include "replication/slotsync.h"
      67             : #include "replication/snapbuild.h"
      68             : #include "storage/ipc.h"
      69             : #include "storage/lmgr.h"
      70             : #include "storage/proc.h"
      71             : #include "storage/procarray.h"
      72             : #include "tcop/tcopprot.h"
      73             : #include "utils/builtins.h"
      74             : #include "utils/memutils.h"
      75             : #include "utils/pg_lsn.h"
      76             : #include "utils/ps_status.h"
      77             : #include "utils/timeout.h"
      78             : 
      79             : /*
      80             :  * Struct for sharing information to control slot synchronization.
      81             :  *
      82             :  * The 'pid' is either the slot sync worker's pid or the backend's pid running
      83             :  * the SQL function pg_sync_replication_slots(). When the startup process sets
      84             :  * 'stopSignaled' during promotion, it uses this 'pid' to wake up the currently
      85             :  * synchronizing process so that the process can immediately stop its
      86             :  * synchronizing work on seeing 'stopSignaled' set.
      87             :  * Setting 'stopSignaled' is also used to handle the race condition when the
      88             :  * postmaster has not noticed the promotion yet and thus may end up restarting
      89             :  * the slot sync worker. If 'stopSignaled' is set, the worker will exit in such a
      90             :  * case. The SQL function pg_sync_replication_slots() will also error out if
      91             :  * this flag is set. Note that we don't need to reset this variable as after
      92             :  * promotion the slot sync worker won't be restarted because the pmState
      93             :  * changes to PM_RUN from PM_HOT_STANDBY and we don't support demoting
      94             :  * primary without restarting the server. See LaunchMissingBackgroundProcesses.
      95             :  *
      96             :  * The 'syncing' flag is needed to prevent concurrent slot syncs to avoid slot
      97             :  * overwrites.
      98             :  *
      99             :  * The 'last_start_time' is needed by postmaster to start the slot sync worker
     100             :  * once per SLOTSYNC_RESTART_INTERVAL_SEC. In cases where an immediate restart
     101             :  * is expected (e.g., slot sync GUCs change), slot sync worker will reset
     102             :  * last_start_time before exiting, so that postmaster can start the worker
     103             :  * without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
     104             :  */
     105             : typedef struct SlotSyncCtxStruct
     106             : {
     107             :     pid_t       pid;
     108             :     bool        stopSignaled;
     109             :     bool        syncing;
     110             :     time_t      last_start_time;
     111             :     slock_t     mutex;
     112             : } SlotSyncCtxStruct;
     113             : 
     114             : static SlotSyncCtxStruct *SlotSyncCtx = NULL;
     115             : 
     116             : /* GUC variable */
     117             : bool        sync_replication_slots = false;
     118             : 
     119             : /*
     120             :  * The sleep time (ms) between slot-sync cycles varies dynamically
     121             :  * (within a MIN/MAX range) according to slot activity. See
     122             :  * wait_for_slot_activity() for details.
     123             :  */
     124             : #define MIN_SLOTSYNC_WORKER_NAPTIME_MS  200
     125             : #define MAX_SLOTSYNC_WORKER_NAPTIME_MS  30000   /* 30s */
     126             : 
     127             : static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
     128             : 
     129             : /* The restart interval for slot sync worker used by postmaster */
     130             : #define SLOTSYNC_RESTART_INTERVAL_SEC 10
     131             : 
     132             : /*
     133             :  * Flag to tell if we are syncing replication slots. Unlike the 'syncing' flag
     134             :  * in SlotSyncCtxStruct, this flag is true only if the current process is
     135             :  * performing slot synchronization.
     136             :  */
     137             : static bool syncing_slots = false;
     138             : 
     139             : /*
     140             :  * Structure to hold information fetched from the primary server about a logical
     141             :  * replication slot.
     142             :  */
     143             : typedef struct RemoteSlot
     144             : {
     145             :     char       *name;
     146             :     char       *plugin;
     147             :     char       *database;
     148             :     bool        two_phase;
     149             :     bool        failover;
     150             :     XLogRecPtr  restart_lsn;
     151             :     XLogRecPtr  confirmed_lsn;
     152             :     XLogRecPtr  two_phase_at;
     153             :     TransactionId catalog_xmin;
     154             : 
     155             :     /* RS_INVAL_NONE if valid, or the reason of invalidation */
     156             :     ReplicationSlotInvalidationCause invalidated;
     157             : } RemoteSlot;
     158             : 
     159             : static void slotsync_failure_callback(int code, Datum arg);
     160             : static void update_synced_slots_inactive_since(void);
     161             : 
     162             : /*
     163             :  * Update slot sync skip stats. This function requires the caller to acquire
     164             :  * the slot.
     165             :  */
     166             : static void
     167          94 : update_slotsync_skip_stats(SlotSyncSkipReason skip_reason)
     168             : {
     169             :     ReplicationSlot *slot;
     170             : 
     171             :     Assert(MyReplicationSlot);
     172             : 
     173          94 :     slot = MyReplicationSlot;
     174             : 
     175             :     /*
     176             :      * Update the slot sync related stats in pg_stat_replication_slots when a
     177             :      * slot sync is skipped
     178             :      */
     179          94 :     if (skip_reason != SS_SKIP_NONE)
     180           6 :         pgstat_report_replslotsync(slot);
     181             : 
     182             :     /* Update the slot sync skip reason */
     183          94 :     if (slot->slotsync_skip_reason != skip_reason)
     184             :     {
     185           6 :         SpinLockAcquire(&slot->mutex);
     186           6 :         slot->slotsync_skip_reason = skip_reason;
     187           6 :         SpinLockRelease(&slot->mutex);
     188             :     }
     189          94 : }
     190             : 
     191             : /*
     192             :  * If necessary, update the local synced slot's metadata based on the data
     193             :  * from the remote slot.
     194             :  *
     195             :  * If no update was needed (the data of the remote slot is the same as the
     196             :  * local slot) return false, otherwise true.
     197             :  *
     198             :  * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
     199             :  * modified, and decoding from the corresponding LSNs can reach a
     200             :  * consistent snapshot.
     201             :  *
     202             :  * *remote_slot_precedes will be true if the remote slot's LSN or xmin
     203             :  * precedes the locally reserved position.
     204             :  */
     205             : static bool
     206          94 : update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
     207             :                          bool *found_consistent_snapshot,
     208             :                          bool *remote_slot_precedes)
     209             : {
     210          94 :     ReplicationSlot *slot = MyReplicationSlot;
     211          94 :     bool        updated_xmin_or_lsn = false;
     212          94 :     bool        updated_config = false;
     213          94 :     SlotSyncSkipReason skip_reason = SS_SKIP_NONE;
     214             : 
     215             :     Assert(slot->data.invalidated == RS_INVAL_NONE);
     216             : 
     217          94 :     if (found_consistent_snapshot)
     218          18 :         *found_consistent_snapshot = false;
     219             : 
     220          94 :     if (remote_slot_precedes)
     221          18 :         *remote_slot_precedes = false;
     222             : 
     223             :     /*
     224             :      * Don't overwrite if we already have a newer catalog_xmin and
     225             :      * restart_lsn.
     226             :      */
     227         186 :     if (remote_slot->restart_lsn < slot->data.restart_lsn ||
     228          92 :         TransactionIdPrecedes(remote_slot->catalog_xmin,
     229             :                               slot->data.catalog_xmin))
     230             :     {
     231             :         /* Update slot sync skip stats */
     232           6 :         update_slotsync_skip_stats(SS_SKIP_WAL_OR_ROWS_REMOVED);
     233             : 
     234             :         /*
     235             :          * This can happen in the following situations:
     236             :          *
     237             :          * If the slot is temporary, it means either the initial WAL location
     238             :          * reserved for the local slot is ahead of the remote slot's
     239             :          * restart_lsn or the initial xmin_horizon computed for the local slot
     240             :          * is ahead of the remote slot.
     241             :          *
     242             :          * If the slot is persistent, both restart_lsn and catalog_xmin of the
     243             :          * synced slot could still be ahead of the remote slot. Since we use
     244             :          * slot advance functionality to keep snapbuild/slot updated, it is
     245             :          * possible that the restart_lsn and catalog_xmin are advanced to a
     246             :          * later position than the slot has on the primary. This can happen when
     247             :          * slot advancing machinery finds running xacts record after reaching
     248             :          * the consistent state at a later point than the primary where it
     249             :          * serializes the snapshot and updates the restart_lsn.
     250             :          *
     251             :          * We LOG the message if the slot is temporary as it can help the user
     252             :          * to understand why the slot is not sync-ready. In the case of a
     253             :          * persistent slot, it would be a more common case and won't directly
     254             :          * impact the users, so we used DEBUG1 level to log the message.
     255             :          */
     256           6 :         ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
     257             :                 errmsg("could not synchronize replication slot \"%s\"",
     258             :                        remote_slot->name),
     259             :                 errdetail("Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
     260             :                           LSN_FORMAT_ARGS(remote_slot->restart_lsn),
     261             :                           remote_slot->catalog_xmin,
     262             :                           LSN_FORMAT_ARGS(slot->data.restart_lsn),
     263             :                           slot->data.catalog_xmin));
     264             : 
     265           6 :         if (remote_slot_precedes)
     266           6 :             *remote_slot_precedes = true;
     267             : 
     268             :         /*
     269             :          * Skip updating the configuration. This is required to avoid syncing
     270             :          * two_phase_at without syncing confirmed_lsn. Otherwise, the prepared
     271             :          * transaction between old confirmed_lsn and two_phase_at will
     272             :          * unexpectedly get decoded and sent to the downstream after
     273             :          * promotion. See comments in ReorderBufferFinishPrepared.
     274             :          */
     275           6 :         return false;
     276             :     }
     277             : 
     278             :     /*
     279             :      * Attempt to sync LSNs and xmins only if remote slot is ahead of local
     280             :      * slot.
     281             :      */
     282          88 :     if (remote_slot->confirmed_lsn > slot->data.confirmed_flush ||
     283         122 :         remote_slot->restart_lsn > slot->data.restart_lsn ||
     284          60 :         TransactionIdFollows(remote_slot->catalog_xmin,
     285             :                              slot->data.catalog_xmin))
     286             :     {
     287             :         /*
     288             :          * We can't directly copy the remote slot's LSN or xmin unless there
     289             :          * exists a consistent snapshot at that point. Otherwise, after
     290             :          * promotion, the slots may not reach a consistent point before the
     291             :          * confirmed_flush_lsn which can lead to data loss. To avoid data
     292             :          * loss, we let slot machinery advance the slot which ensures that
     293             :          * snapbuilder/slot statuses are updated properly.
     294             :          */
     295          28 :         if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
     296             :         {
     297             :             /*
     298             :              * Update the slot info directly if there is a serialized snapshot
     299             :              * at the restart_lsn, as the slot can quickly reach consistency
     300             :              * at restart_lsn by restoring the snapshot.
     301             :              */
     302           8 :             SpinLockAcquire(&slot->mutex);
     303           8 :             slot->data.restart_lsn = remote_slot->restart_lsn;
     304           8 :             slot->data.confirmed_flush = remote_slot->confirmed_lsn;
     305           8 :             slot->data.catalog_xmin = remote_slot->catalog_xmin;
     306           8 :             SpinLockRelease(&slot->mutex);
     307             : 
     308           8 :             if (found_consistent_snapshot)
     309           0 :                 *found_consistent_snapshot = true;
     310             :         }
     311             :         else
     312             :         {
     313          20 :             LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
     314             :                                                 found_consistent_snapshot);
     315             : 
     316             :             /* Sanity check */
     317          20 :             if (slot->data.confirmed_flush != remote_slot->confirmed_lsn)
     318           0 :                 ereport(ERROR,
     319             :                         errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot",
     320             :                                         remote_slot->name),
     321             :                         errdetail_internal("Remote slot has LSN %X/%08X but local slot has LSN %X/%08X.",
     322             :                                            LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
     323             :                                            LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
     324             : 
     325             :             /*
     326             :              * If we can't reach a consistent snapshot, the slot won't be
     327             :              * persisted. See update_and_persist_local_synced_slot().
     328             :              */
     329          20 :             if (found_consistent_snapshot && !(*found_consistent_snapshot))
     330           0 :                 skip_reason = SS_SKIP_NO_CONSISTENT_SNAPSHOT;
     331             :         }
     332             : 
     333          28 :         updated_xmin_or_lsn = true;
     334             :     }
     335             : 
     336             :     /* Update slot sync skip stats */
     337          88 :     update_slotsync_skip_stats(skip_reason);
     338             : 
     339          88 :     if (remote_dbid != slot->data.database ||
     340          88 :         remote_slot->two_phase != slot->data.two_phase ||
     341          86 :         remote_slot->failover != slot->data.failover ||
     342          86 :         strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0 ||
     343          86 :         remote_slot->two_phase_at != slot->data.two_phase_at)
     344             :     {
     345             :         NameData    plugin_name;
     346             : 
     347             :         /* Avoid expensive operations while holding a spinlock. */
     348           2 :         namestrcpy(&plugin_name, remote_slot->plugin);
     349             : 
     350           2 :         SpinLockAcquire(&slot->mutex);
     351           2 :         slot->data.plugin = plugin_name;
     352           2 :         slot->data.database = remote_dbid;
     353           2 :         slot->data.two_phase = remote_slot->two_phase;
     354           2 :         slot->data.two_phase_at = remote_slot->two_phase_at;
     355           2 :         slot->data.failover = remote_slot->failover;
     356           2 :         SpinLockRelease(&slot->mutex);
     357             : 
     358           2 :         updated_config = true;
     359             : 
     360             :         /*
     361             :          * Ensure that there is no risk of sending prepared transactions
     362             :          * unexpectedly after the promotion.
     363             :          */
     364             :         Assert(slot->data.two_phase_at <= slot->data.confirmed_flush);
     365             :     }
     366             : 
     367             :     /*
     368             :      * We have to write the changed xmin to disk *before* we change the
     369             :      * in-memory value, otherwise after a crash we wouldn't know that some
     370             :      * catalog tuples might have been removed already.
     371             :      */
     372          88 :     if (updated_config || updated_xmin_or_lsn)
     373             :     {
     374          30 :         ReplicationSlotMarkDirty();
     375          30 :         ReplicationSlotSave();
     376             :     }
     377             : 
     378             :     /*
     379             :      * Now the new xmin is safely on disk, we can let the global value
     380             :      * advance. We do not take ProcArrayLock or similar since we only advance
     381             :      * xmin here and there's not much harm done by a concurrent computation
     382             :      * missing that.
     383             :      */
     384          88 :     if (updated_xmin_or_lsn)
     385             :     {
     386          28 :         SpinLockAcquire(&slot->mutex);
     387          28 :         slot->effective_catalog_xmin = remote_slot->catalog_xmin;
     388          28 :         SpinLockRelease(&slot->mutex);
     389             : 
     390          28 :         ReplicationSlotsComputeRequiredXmin(false);
     391          28 :         ReplicationSlotsComputeRequiredLSN();
     392             :     }
     393             : 
     394          88 :     return updated_config || updated_xmin_or_lsn;
     395             : }
     396             : 
     397             : /*
     398             :  * Get the list of local logical slots that are synchronized from the
     399             :  * primary server.
     400             :  */
     401             : static List *
     402          54 : get_local_synced_slots(void)
     403             : {
     404          54 :     List       *local_slots = NIL;
     405             : 
     406          54 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     407             : 
     408         594 :     for (int i = 0; i < max_replication_slots; i++)
     409             :     {
     410         540 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     411             : 
     412             :         /* Check if it is a synchronized slot */
     413         540 :         if (s->in_use && s->data.synced)
     414             :         {
     415             :             Assert(SlotIsLogical(s));
     416          84 :             local_slots = lappend(local_slots, s);
     417             :         }
     418             :     }
     419             : 
     420          54 :     LWLockRelease(ReplicationSlotControlLock);
     421             : 
     422          54 :     return local_slots;
     423             : }
     424             : 
     425             : /*
     426             :  * Helper function to check if local_slot is required to be retained.
     427             :  *
     428             :  * Return false either if local_slot does not exist in the remote_slots list
     429             :  * or is invalidated while the corresponding remote slot is still valid,
     430             :  * otherwise true.
     431             :  */
     432             : static bool
     433          84 : local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots)
     434             : {
     435          84 :     bool        remote_exists = false;
     436          84 :     bool        locally_invalidated = false;
     437             : 
     438         206 :     foreach_ptr(RemoteSlot, remote_slot, remote_slots)
     439             :     {
     440         120 :         if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0)
     441             :         {
     442          82 :             remote_exists = true;
     443             : 
     444             :             /*
     445             :              * If remote slot is not invalidated but local slot is marked as
     446             :              * invalidated, then set locally_invalidated flag.
     447             :              */
     448          82 :             SpinLockAcquire(&local_slot->mutex);
     449          82 :             locally_invalidated =
     450         164 :                 (remote_slot->invalidated == RS_INVAL_NONE) &&
     451          82 :                 (local_slot->data.invalidated != RS_INVAL_NONE);
     452          82 :             SpinLockRelease(&local_slot->mutex);
     453             : 
     454          82 :             break;
     455             :         }
     456             :     }
     457             : 
     458          84 :     return (remote_exists && !locally_invalidated);
     459             : }
     460             : 
     461             : /*
     462             :  * Drop local obsolete slots.
     463             :  *
     464             :  * Drop the local slots that no longer need to be synced i.e. these either do
     465             :  * not exist on the primary or are no longer enabled for failover.
     466             :  *
     467             :  * Additionally, drop any slots that are valid on the primary but got
     468             :  * invalidated on the standby. This situation may occur due to the following
     469             :  * reasons:
     470             :  * - The 'max_slot_wal_keep_size' on the standby is insufficient to retain WAL
     471             :  *   records from the restart_lsn of the slot.
     472             :  * - 'primary_slot_name' is temporarily reset to null and the physical slot is
     473             :  *   removed.
     474             :  * These dropped slots will get recreated in the next sync-cycle and it is okay to
     475             :  * drop and recreate such slots as long as these are not consumable on the
     476             :  * standby (which is the case currently).
     477             :  *
     478             :  * Note: Change of 'wal_level' on the primary server to a level lower than
     479             :  * logical may also result in slot invalidation and removal on the standby.
     480             :  * This is because such 'wal_level' change is only possible if the logical
     481             :  * slots are removed on the primary server, so it's expected to see the
     482             :  * slots being invalidated and removed on the standby too (and re-created
     483             :  * if they are re-created on the primary server).
     484             :  */
     485             : static void
     486          54 : drop_local_obsolete_slots(List *remote_slot_list)
     487             : {
     488          54 :     List       *local_slots = get_local_synced_slots();
     489             : 
     490         192 :     foreach_ptr(ReplicationSlot, local_slot, local_slots)
     491             :     {
     492             :         /* Drop the local slot if it is not required to be retained. */
     493          84 :         if (!local_sync_slot_required(local_slot, remote_slot_list))
     494             :         {
     495             :             bool        synced_slot;
     496             : 
     497             :             /*
     498             :              * Use shared lock to prevent a conflict with
     499             :              * ReplicationSlotsDropDBSlots(), trying to drop the same slot
     500             :              * during a drop-database operation.
     501             :              */
     502           4 :             LockSharedObject(DatabaseRelationId, local_slot->data.database,
     503             :                              0, AccessShareLock);
     504             : 
     505             :             /*
     506             :              * In the small window between getting the slot to drop and
     507             :              * locking the database, there is a possibility of a parallel
     508             :              * database drop by the startup process and the creation of a new
     509             :              * slot by the user. This new user-created slot may end up using
     510             :              * the same shared memory as that of 'local_slot'. Thus check if
     511             :              * local_slot is still the synced one before performing actual
     512             :              * drop.
     513             :              */
     514           4 :             SpinLockAcquire(&local_slot->mutex);
     515           4 :             synced_slot = local_slot->in_use && local_slot->data.synced;
     516           4 :             SpinLockRelease(&local_slot->mutex);
     517             : 
     518           4 :             if (synced_slot)
     519             :             {
     520           4 :                 ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
     521           4 :                 ReplicationSlotDropAcquired();
     522             :             }
     523             : 
     524           4 :             UnlockSharedObject(DatabaseRelationId, local_slot->data.database,
     525             :                                0, AccessShareLock);
     526             : 
     527           4 :             ereport(LOG,
     528             :                     errmsg("dropped replication slot \"%s\" of database with OID %u",
     529             :                            NameStr(local_slot->data.name),
     530             :                            local_slot->data.database));
     531             :         }
     532             :     }
     533          54 : }
     534             : 
     535             : /*
     536             :  * Reserve WAL for the currently active local slot using the specified WAL
     537             :  * location (restart_lsn).
     538             :  *
     539             :  * If the given WAL location has been removed or is at risk of removal,
     540             :  * reserve WAL using the oldest segment that is non-removable.
     541             :  */
     542             : static void
     543          14 : reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
     544             : {
     545             :     XLogRecPtr  slot_min_lsn;
     546             :     XLogRecPtr  min_safe_lsn;
     547             :     XLogSegNo   segno;
     548          14 :     ReplicationSlot *slot = MyReplicationSlot;
     549             : 
     550             :     Assert(slot != NULL);
     551             :     Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
     552             : 
     553             :     /*
     554             :      * Acquire an exclusive lock to prevent the checkpoint process from
     555             :      * concurrently calculating the minimum slot LSN (see
     556             :      * CheckPointReplicationSlots), ensuring that if WAL reservation occurs
     557             :      * first, the checkpoint must wait for the restart_lsn update before
     558             :      * calculating the minimum LSN.
     559             :      *
     560             :      * Note: Unlike ReplicationSlotReserveWal(), this lock does not protect a
     561             :      * newly synced slot from being invalidated if a concurrent checkpoint has
     562             :      * invoked CheckPointReplicationSlots() before the WAL reservation here.
     563             :      * This can happen because the initial restart_lsn received from the
     564             :      * remote server can precede the redo pointer. Therefore, when selecting
     565             :      * the initial restart_lsn, we consider using the redo pointer or the
     566             :      * minimum slot LSN (if those values are greater than the remote
     567             :      * restart_lsn) instead of relying solely on the remote value.
     568             :      */
     569          14 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     570             : 
     571             :     /*
     572             :      * Determine the minimum non-removable LSN by comparing the redo pointer
     573             :      * with the minimum slot LSN.
     574             :      *
     575             :      * The minimum slot LSN is considered because the redo pointer advances at
     576             :      * every checkpoint, even when replication slots are present on the
     577             :      * standby. In such scenarios, the redo pointer can exceed the remote
     578             :      * restart_lsn, while WALs preceding the remote restart_lsn remain
     579             :      * protected by a local replication slot.
     580             :      */
     581          14 :     min_safe_lsn = GetRedoRecPtr();
     582          14 :     slot_min_lsn = XLogGetReplicationSlotMinimumLSN();
     583             : 
     584          14 :     if (XLogRecPtrIsValid(slot_min_lsn) && min_safe_lsn > slot_min_lsn)
     585           0 :         min_safe_lsn = slot_min_lsn;
     586             : 
     587             :     /*
     588             :      * If the minimum safe LSN is greater than the given restart_lsn, use it
     589             :      * as the initial restart_lsn for the newly synced slot. Otherwise, use
     590             :      * the given remote restart_lsn.
     591             :      */
     592          14 :     SpinLockAcquire(&slot->mutex);
     593          14 :     slot->data.restart_lsn = Max(restart_lsn, min_safe_lsn);
     594          14 :     SpinLockRelease(&slot->mutex);
     595             : 
     596          14 :     ReplicationSlotsComputeRequiredLSN();
     597             : 
     598          14 :     XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
     599          14 :     if (XLogGetLastRemovedSegno() >= segno)
     600           0 :         elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
     601             :              NameStr(slot->data.name));
     602             : 
     603          14 :     LWLockRelease(ReplicationSlotAllocationLock);
     604          14 : }
     605             : 
     606             : /*
     607             :  * If the remote restart_lsn and catalog_xmin have caught up with the
     608             :  * local ones, then update the LSNs and persist the local synced slot for
     609             :  * future synchronization; otherwise, do nothing.
     610             :  *
     611             :  * *slot_persistence_pending is set to true if any of the slots fail to
     612             :  * persist.
     613             :  *
     614             :  * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise
     615             :  * false.
     616             :  */
     617             : static bool
     618          18 : update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
     619             :                                      bool *slot_persistence_pending)
     620             : {
     621          18 :     ReplicationSlot *slot = MyReplicationSlot;
     622          18 :     bool        found_consistent_snapshot = false;
     623          18 :     bool        remote_slot_precedes = false;
     624             : 
     625             :     /* Slotsync skip stats are handled in function update_local_synced_slot() */
     626          18 :     (void) update_local_synced_slot(remote_slot, remote_dbid,
     627             :                                     &found_consistent_snapshot,
     628             :                                     &remote_slot_precedes);
     629             : 
     630             :     /*
     631             :      * Check if the primary server has caught up. Refer to the comment atop
     632             :      * the file for details on this check.
     633             :      */
     634          18 :     if (remote_slot_precedes)
     635             :     {
     636             :         /*
     637             :          * The remote slot didn't catch up to locally reserved position.
     638             :          *
     639             :          * We do not drop the slot because the restart_lsn can be ahead of the
     640             :          * current location when recreating the slot in the next cycle. It may
     641             :          * take more time to create such a slot. Therefore, we keep this slot
     642             :          * and attempt the synchronization in the next cycle.
     643             :          *
     644             :          * We also update the slot_persistence_pending parameter, so the SQL
     645             :          * function can retry.
     646             :          */
     647           6 :         if (slot_persistence_pending)
     648           4 :             *slot_persistence_pending = true;
     649             : 
     650           6 :         return false;
     651             :     }
     652             : 
     653             :     /*
     654             :      * Don't persist the slot if it cannot reach the consistent point from the
     655             :      * restart_lsn. See comments atop this file.
     656             :      */
     657          12 :     if (!found_consistent_snapshot)
     658             :     {
     659           0 :         ereport(LOG,
     660             :                 errmsg("could not synchronize replication slot \"%s\"", remote_slot->name),
     661             :                 errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
     662             :                           LSN_FORMAT_ARGS(slot->data.restart_lsn)));
     663             : 
     664             :         /* Set this, so that SQL function can retry */
     665           0 :         if (slot_persistence_pending)
     666           0 :             *slot_persistence_pending = true;
     667             : 
     668           0 :         return false;
     669             :     }
     670             : 
     671          12 :     ReplicationSlotPersist();
     672             : 
     673          12 :     ereport(LOG,
     674             :             errmsg("newly created replication slot \"%s\" is sync-ready now",
     675             :                    remote_slot->name));
     676             : 
     677          12 :     return true;
     678             : }
     679             : 
     680             : /*
     681             :  * Synchronize a single slot to the given position.
     682             :  *
     683             :  * This creates a new slot if there is no existing one and updates the
     684             :  * metadata of the slot as per the data received from the primary server.
     685             :  *
     686             :  * The slot is created as a temporary slot and stays in the same state until the
     687             :  * remote_slot catches up with locally reserved position and local slot is
     688             :  * updated. The slot is then persisted and is considered as sync-ready for
     689             :  * periodic syncs.
     690             :  *
     691             :  * *slot_persistence_pending is set to true if any of the slots fail to
     692             :  * persist.
     693             :  *
     694             :  * Returns TRUE if the local slot is updated.
     695             :  */
     696             : static bool
     697          94 : synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
     698             :                      bool *slot_persistence_pending)
     699             : {
     700             :     ReplicationSlot *slot;
     701          94 :     XLogRecPtr  latestFlushPtr = GetStandbyFlushRecPtr(NULL);
     702          94 :     bool        slot_updated = false;
     703             : 
     704             :     /* Search for the named slot */
     705          94 :     if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
     706             :     {
     707             :         bool        synced;
     708             : 
     709          80 :         SpinLockAcquire(&slot->mutex);
     710          80 :         synced = slot->data.synced;
     711          80 :         SpinLockRelease(&slot->mutex);
     712             : 
     713             :         /* User-created slot with the same name exists, raise ERROR. */
     714          80 :         if (!synced)
     715           0 :             ereport(ERROR,
     716             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     717             :                     errmsg("exiting from slot synchronization because same"
     718             :                            " name slot \"%s\" already exists on the standby",
     719             :                            remote_slot->name));
     720             : 
     721             :         /*
     722             :          * The slot has been synchronized before.
     723             :          *
     724             :          * It is important to acquire the slot here before checking
     725             :          * invalidation. If we don't acquire the slot first, there could be a
     726             :          * race condition that the local slot could be invalidated just after
     727             :          * checking the 'invalidated' flag here and we could end up
     728             :          * overwriting 'invalidated' flag to remote_slot's value. See
     729             :          * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
     730             :          * if the slot is not acquired by other processes.
     731             :          *
     732             :          * XXX: If it ever turns out that slot acquire/release is costly for
     733             :          * cases when none of the slot properties is changed then we can do a
     734             :          * pre-check to ensure that at least one of the slot properties is
     735             :          * changed before acquiring the slot.
     736             :          */
     737          80 :         ReplicationSlotAcquire(remote_slot->name, true, false);
     738             : 
     739             :         Assert(slot == MyReplicationSlot);
     740             : 
     741             :         /*
     742             :          * Copy the invalidation cause from remote only if local slot is not
     743             :          * invalidated locally, we don't want to overwrite existing one.
     744             :          */
     745          80 :         if (slot->data.invalidated == RS_INVAL_NONE &&
     746          80 :             remote_slot->invalidated != RS_INVAL_NONE)
     747             :         {
     748           0 :             SpinLockAcquire(&slot->mutex);
     749           0 :             slot->data.invalidated = remote_slot->invalidated;
     750           0 :             SpinLockRelease(&slot->mutex);
     751             : 
     752             :             /* Make sure the invalidated state persists across server restart */
     753           0 :             ReplicationSlotMarkDirty();
     754           0 :             ReplicationSlotSave();
     755             : 
     756           0 :             slot_updated = true;
     757             :         }
     758             : 
     759             :         /* Skip the sync of an invalidated slot */
     760          80 :         if (slot->data.invalidated != RS_INVAL_NONE)
     761             :         {
     762           0 :             update_slotsync_skip_stats(SS_SKIP_INVALID);
     763             : 
     764           0 :             ReplicationSlotRelease();
     765           0 :             return slot_updated;
     766             :         }
     767             : 
     768             :         /*
     769             :          * Make sure that concerned WAL is received and flushed before syncing
     770             :          * slot to target lsn received from the primary server.
     771             :          *
     772             :          * Report statistics only after the slot has been acquired, ensuring
     773             :          * it cannot be dropped during the reporting process.
     774             :          */
     775          80 :         if (remote_slot->confirmed_lsn > latestFlushPtr)
     776             :         {
     777           0 :             update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED);
     778             : 
     779             :             /*
     780             :              * Can get here only if GUC 'synchronized_standby_slots' on the
     781             :              * primary server was not configured correctly.
     782             :              */
     783           0 :             ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
     784             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     785             :                     errmsg("skipping slot synchronization because the received slot sync"
     786             :                            " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
     787             :                            LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
     788             :                            remote_slot->name,
     789             :                            LSN_FORMAT_ARGS(latestFlushPtr)));
     790             : 
     791           0 :             ReplicationSlotRelease();
     792             : 
     793           0 :             return slot_updated;
     794             :         }
     795             : 
     796             :         /* Slot not ready yet, let's attempt to make it sync-ready now. */
     797          80 :         if (slot->data.persistency == RS_TEMPORARY)
     798             :         {
     799           4 :             slot_updated = update_and_persist_local_synced_slot(remote_slot,
     800             :                                                                 remote_dbid,
     801             :                                                                 slot_persistence_pending);
     802             :         }
     803             : 
     804             :         /* Slot ready for sync, so sync it. */
     805             :         else
     806             :         {
     807             :             /*
     808             :              * Sanity check: As long as the invalidations are handled
     809             :              * appropriately as above, this should never happen.
     810             :              *
     811             :              * We don't need to check restart_lsn here. See the comments in
     812             :              * update_local_synced_slot() for details.
     813             :              */
     814          76 :             if (remote_slot->confirmed_lsn < slot->data.confirmed_flush)
     815           0 :                 ereport(ERROR,
     816             :                         errmsg_internal("cannot synchronize local slot \"%s\"",
     817             :                                         remote_slot->name),
     818             :                         errdetail_internal("Local slot's start streaming location LSN(%X/%08X) is ahead of remote slot's LSN(%X/%08X).",
     819             :                                            LSN_FORMAT_ARGS(slot->data.confirmed_flush),
     820             :                                            LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
     821             : 
     822          76 :             slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
     823             :                                                     NULL, NULL);
     824             :         }
     825             :     }
     826             :     /* Otherwise create the slot first. */
     827             :     else
     828             :     {
     829             :         NameData    plugin_name;
     830          14 :         TransactionId xmin_horizon = InvalidTransactionId;
     831             : 
     832             :         /* Skip creating the local slot if remote_slot is invalidated already */
     833          14 :         if (remote_slot->invalidated != RS_INVAL_NONE)
     834           0 :             return false;
     835             : 
     836             :         /*
     837             :          * We create temporary slots instead of ephemeral slots here because
     838             :          * we want the slots to survive after releasing them. This is done to
     839             :          * avoid dropping and re-creating the slots in each synchronization
     840             :          * cycle if the restart_lsn or catalog_xmin of the remote slot has not
     841             :          * caught up.
     842             :          */
     843          14 :         ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
     844          14 :                               remote_slot->two_phase,
     845          14 :                               remote_slot->failover,
     846             :                               true);
     847             : 
     848             :         /* For shorter lines. */
     849          14 :         slot = MyReplicationSlot;
     850             : 
     851             :         /* Avoid expensive operations while holding a spinlock. */
     852          14 :         namestrcpy(&plugin_name, remote_slot->plugin);
     853             : 
     854          14 :         SpinLockAcquire(&slot->mutex);
     855          14 :         slot->data.database = remote_dbid;
     856          14 :         slot->data.plugin = plugin_name;
     857          14 :         SpinLockRelease(&slot->mutex);
     858             : 
     859          14 :         reserve_wal_for_local_slot(remote_slot->restart_lsn);
     860             : 
     861          14 :         LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     862          14 :         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     863          14 :         xmin_horizon = GetOldestSafeDecodingTransactionId(true);
     864          14 :         SpinLockAcquire(&slot->mutex);
     865          14 :         slot->effective_catalog_xmin = xmin_horizon;
     866          14 :         slot->data.catalog_xmin = xmin_horizon;
     867          14 :         SpinLockRelease(&slot->mutex);
     868          14 :         ReplicationSlotsComputeRequiredXmin(true);
     869          14 :         LWLockRelease(ProcArrayLock);
     870          14 :         LWLockRelease(ReplicationSlotControlLock);
     871             : 
     872             :         /*
     873             :          * Make sure that concerned WAL is received and flushed before syncing
     874             :          * slot to target lsn received from the primary server.
     875             :          *
     876             :          * Report statistics only after the slot has been acquired, ensuring
     877             :          * it cannot be dropped during the reporting process.
     878             :          */
     879          14 :         if (remote_slot->confirmed_lsn > latestFlushPtr)
     880             :         {
     881           0 :             update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED);
     882             : 
     883             :             /*
     884             :              * Can get here only if GUC 'synchronized_standby_slots' on the
     885             :              * primary server was not configured correctly.
     886             :              */
     887           0 :             ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
     888             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     889             :                     errmsg("skipping slot synchronization because the received slot sync"
     890             :                            " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
     891             :                            LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
     892             :                            remote_slot->name,
     893             :                            LSN_FORMAT_ARGS(latestFlushPtr)));
     894             : 
     895           0 :             ReplicationSlotRelease();
     896             : 
     897           0 :             return false;
     898             :         }
     899             : 
     900          14 :         update_and_persist_local_synced_slot(remote_slot, remote_dbid,
     901             :                                              slot_persistence_pending);
     902             : 
     903          14 :         slot_updated = true;
     904             :     }
     905             : 
     906          94 :     ReplicationSlotRelease();
     907             : 
     908          94 :     return slot_updated;
     909             : }
     910             : 
     911             : /*
     912             :  * Fetch remote slots.
     913             :  *
     914             :  * If slot_names is NIL, fetches all failover logical slots from the
     915             :  * primary server, otherwise fetches only the ones with names in slot_names.
     916             :  *
     917             :  * Returns a list of remote slot information structures, or NIL if none
     918             :  * are found.
     919             :  */
     920             : static List *
     921          56 : fetch_remote_slots(WalReceiverConn *wrconn, List *slot_names)
     922             : {
     923             : #define SLOTSYNC_COLUMN_COUNT 10
     924          56 :     Oid         slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
     925             :     LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID};
     926             : 
     927             :     WalRcvExecResult *res;
     928             :     TupleTableSlot *tupslot;
     929          56 :     List       *remote_slot_list = NIL;
     930             :     StringInfoData query;
     931             : 
     932          56 :     initStringInfo(&query);
     933          56 :     appendStringInfoString(&query,
     934             :                            "SELECT slot_name, plugin, confirmed_flush_lsn,"
     935             :                            " restart_lsn, catalog_xmin, two_phase,"
     936             :                            " two_phase_at, failover,"
     937             :                            " database, invalidation_reason"
     938             :                            " FROM pg_catalog.pg_replication_slots"
     939             :                            " WHERE failover and NOT temporary");
     940             : 
     941          56 :     if (slot_names != NIL)
     942             :     {
     943           4 :         bool        first_slot = true;
     944             : 
     945             :         /*
     946             :          * Construct the query to fetch only the specified slots
     947             :          */
     948           4 :         appendStringInfoString(&query, " AND slot_name IN (");
     949             : 
     950          12 :         foreach_ptr(char, slot_name, slot_names)
     951             :         {
     952           4 :             if (!first_slot)
     953           0 :                 appendStringInfoString(&query, ", ");
     954             : 
     955           4 :             appendStringInfo(&query, "%s", quote_literal_cstr(slot_name));
     956           4 :             first_slot = false;
     957             :         }
     958           4 :         appendStringInfoChar(&query, ')');
     959             :     }
     960             : 
     961             :     /* Execute the query */
     962          56 :     res = walrcv_exec(wrconn, query.data, SLOTSYNC_COLUMN_COUNT, slotRow);
     963          56 :     pfree(query.data);
     964          56 :     if (res->status != WALRCV_OK_TUPLES)
     965           2 :         ereport(ERROR,
     966             :                 errmsg("could not fetch failover logical slots info from the primary server: %s",
     967             :                        res->err));
     968             : 
     969          54 :     tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     970         148 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
     971             :     {
     972             :         bool        isnull;
     973          94 :         RemoteSlot *remote_slot = palloc0_object(RemoteSlot);
     974             :         Datum       d;
     975          94 :         int         col = 0;
     976             : 
     977          94 :         remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col,
     978             :                                                              &isnull));
     979             :         Assert(!isnull);
     980             : 
     981          94 :         remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, ++col,
     982             :                                                                &isnull));
     983             :         Assert(!isnull);
     984             : 
     985             :         /*
     986             :          * It is possible to get null values for LSN and Xmin if slot is
     987             :          * invalidated on the primary server, so handle accordingly.
     988             :          */
     989          94 :         d = slot_getattr(tupslot, ++col, &isnull);
     990          94 :         remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr :
     991          94 :             DatumGetLSN(d);
     992             : 
     993          94 :         d = slot_getattr(tupslot, ++col, &isnull);
     994          94 :         remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
     995             : 
     996          94 :         d = slot_getattr(tupslot, ++col, &isnull);
     997          94 :         remote_slot->catalog_xmin = isnull ? InvalidTransactionId :
     998          94 :             DatumGetTransactionId(d);
     999             : 
    1000          94 :         remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col,
    1001             :                                                            &isnull));
    1002             :         Assert(!isnull);
    1003             : 
    1004          94 :         d = slot_getattr(tupslot, ++col, &isnull);
    1005          94 :         remote_slot->two_phase_at = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
    1006             : 
    1007          94 :         remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
    1008             :                                                           &isnull));
    1009             :         Assert(!isnull);
    1010             : 
    1011          94 :         remote_slot->database = TextDatumGetCString(slot_getattr(tupslot,
    1012             :                                                                  ++col, &isnull));
    1013             :         Assert(!isnull);
    1014             : 
    1015          94 :         d = slot_getattr(tupslot, ++col, &isnull);
    1016          94 :         remote_slot->invalidated = isnull ? RS_INVAL_NONE :
    1017           0 :             GetSlotInvalidationCause(TextDatumGetCString(d));
    1018             : 
    1019             :         /* Sanity check */
    1020             :         Assert(col == SLOTSYNC_COLUMN_COUNT);
    1021             : 
    1022             :         /*
    1023             :          * If restart_lsn, confirmed_lsn or catalog_xmin is invalid but the
    1024             :          * slot is valid, that means we have fetched the remote_slot in its
    1025             :          * RS_EPHEMERAL state. In such a case, don't sync it; we can always
    1026             :          * sync it in the next sync cycle when the remote_slot is persisted
    1027             :          * and has valid lsn(s) and xmin values.
    1028             :          *
    1029             :          * XXX: In future, if we plan to expose 'slot->data.persistency' in
    1030             :          * pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL
    1031             :          * slots in the first place.
    1032             :          */
    1033          94 :         if ((!XLogRecPtrIsValid(remote_slot->restart_lsn) ||
    1034          94 :              !XLogRecPtrIsValid(remote_slot->confirmed_lsn) ||
    1035          94 :              !TransactionIdIsValid(remote_slot->catalog_xmin)) &&
    1036           0 :             remote_slot->invalidated == RS_INVAL_NONE)
    1037           0 :             pfree(remote_slot);
    1038             :         else
    1039             :             /* Create list of remote slots */
    1040          94 :             remote_slot_list = lappend(remote_slot_list, remote_slot);
    1041             : 
    1042          94 :         ExecClearTuple(tupslot);
    1043             :     }
    1044             : 
    1045          54 :     walrcv_clear_result(res);
    1046             : 
    1047          54 :     return remote_slot_list;
    1048             : }
    1049             : 
    1050             : /*
    1051             :  * Synchronize slots.
    1052             :  *
    1053             :  * This function takes a list of remote slots and synchronizes them locally. It
    1054             :  * creates the slots if not present on the standby and updates existing ones.
    1055             :  *
    1056             :  * If slot_persistence_pending is not NULL, it will be set to true if one or
    1057             :  * more slots could not be persisted. This allows callers such as
    1058             :  * SyncReplicationSlots() to retry those slots.
    1059             :  *
    1060             :  * Returns TRUE if any of the slots gets updated in this sync-cycle.
    1061             :  */
    1062             : static bool
    1063          54 : synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list,
    1064             :                   bool *slot_persistence_pending)
    1065             : {
    1066          54 :     bool        some_slot_updated = false;
    1067             : 
    1068             :     /* Drop local slots that no longer need to be synced. */
    1069          54 :     drop_local_obsolete_slots(remote_slot_list);
    1070             : 
    1071             :     /* Now sync the slots locally */
    1072         202 :     foreach_ptr(RemoteSlot, remote_slot, remote_slot_list)
    1073             :     {
    1074          94 :         Oid         remote_dbid = get_database_oid(remote_slot->database, false);
    1075             : 
    1076             :         /*
    1077             :          * Use shared lock to prevent a conflict with
    1078             :          * ReplicationSlotsDropDBSlots(), trying to drop the same slot during
    1079             :          * a drop-database operation.
    1080             :          */
    1081          94 :         LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
    1082             : 
    1083          94 :         some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid,
    1084             :                                                   slot_persistence_pending);
    1085             : 
    1086          94 :         UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
    1087             :     }
    1088             : 
    1089          54 :     return some_slot_updated;
    1090             : }
    1091             : 
    1092             : /*
    1093             :  * Checks the remote server info.
    1094             :  *
    1095             :  * We ensure that the 'primary_slot_name' exists on the remote server and the
    1096             :  * remote server is not a standby node.
    1097             :  */
    1098             : static void
    1099          28 : validate_remote_info(WalReceiverConn *wrconn)
    1100             : {
    1101             : #define PRIMARY_INFO_OUTPUT_COL_COUNT 2
    1102             :     WalRcvExecResult *res;
    1103          28 :     Oid         slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID};
    1104             :     StringInfoData cmd;
    1105             :     bool        isnull;
    1106             :     TupleTableSlot *tupslot;
    1107             :     bool        remote_in_recovery;
    1108             :     bool        primary_slot_valid;
    1109          28 :     bool        started_tx = false;
    1110             : 
    1111          28 :     initStringInfo(&cmd);
    1112          28 :     appendStringInfo(&cmd,
    1113             :                      "SELECT pg_is_in_recovery(), count(*) = 1"
    1114             :                      " FROM pg_catalog.pg_replication_slots"
    1115             :                      " WHERE slot_type='physical' AND slot_name=%s",
    1116             :                      quote_literal_cstr(PrimarySlotName));
    1117             : 
    1118             :     /* The syscache access in walrcv_exec() needs a transaction env. */
    1119          28 :     if (!IsTransactionState())
    1120             :     {
    1121          10 :         StartTransactionCommand();
    1122          10 :         started_tx = true;
    1123             :     }
    1124             : 
    1125          28 :     res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow);
    1126          28 :     pfree(cmd.data);
    1127             : 
    1128          28 :     if (res->status != WALRCV_OK_TUPLES)
    1129           0 :         ereport(ERROR,
    1130             :                 errmsg("could not fetch primary slot name \"%s\" info from the primary server: %s",
    1131             :                        PrimarySlotName, res->err),
    1132             :                 errhint("Check if \"primary_slot_name\" is configured correctly."));
    1133             : 
    1134          28 :     tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    1135          28 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
    1136           0 :         elog(ERROR,
    1137             :              "failed to fetch tuple for the primary server slot specified by \"primary_slot_name\"");
    1138             : 
    1139          28 :     remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull));
    1140             :     Assert(!isnull);
    1141             : 
    1142             :     /*
    1143             :      * Slot sync is currently not supported on a cascading standby. This is
    1144             :      * because if we allow it, the primary server needs to wait for all the
    1145             :      * cascading standbys, otherwise, logical subscribers can still be ahead
    1146             :      * of one of the cascading standbys which we plan to promote. Thus, to
    1147             :      * avoid this additional complexity, we restrict it for the time being.
    1148             :      */
    1149          28 :     if (remote_in_recovery)
    1150           2 :         ereport(ERROR,
    1151             :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1152             :                 errmsg("cannot synchronize replication slots from a standby server"));
    1153             : 
    1154          26 :     primary_slot_valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull));
    1155             :     Assert(!isnull);
    1156             : 
    1157          26 :     if (!primary_slot_valid)
    1158           0 :         ereport(ERROR,
    1159             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1160             :         /* translator: second %s is a GUC variable name */
    1161             :                 errmsg("replication slot \"%s\" specified by \"%s\" does not exist on primary server",
    1162             :                        PrimarySlotName, "primary_slot_name"));
    1163             : 
    1164          26 :     ExecClearTuple(tupslot);
    1165          26 :     walrcv_clear_result(res);
    1166             : 
    1167          26 :     if (started_tx)
    1168          10 :         CommitTransactionCommand();
    1169          26 : }
    1170             : 
    1171             : /*
    1172             :  * Checks if dbname is specified in 'primary_conninfo'.
    1173             :  *
    1174             :  * Error out if not specified otherwise return it.
    1175             :  */
    1176             : char *
    1177          30 : CheckAndGetDbnameFromConninfo(void)
    1178             : {
    1179             :     char       *dbname;
    1180             : 
    1181             :     /*
    1182             :      * The slot synchronization needs a database connection for walrcv_exec to
    1183             :      * work.
    1184             :      */
    1185          30 :     dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
    1186          30 :     if (dbname == NULL)
    1187           2 :         ereport(ERROR,
    1188             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1189             : 
    1190             :         /*
    1191             :          * translator: first %s is a connection option; second %s is a GUC
    1192             :          * variable name
    1193             :          */
    1194             :                 errmsg("replication slot synchronization requires \"%s\" to be specified in \"%s\"",
    1195             :                        "dbname", "primary_conninfo"));
    1196          28 :     return dbname;
    1197             : }
    1198             : 
    1199             : /*
    1200             :  * Return true if all necessary GUCs for slot synchronization are set
    1201             :  * appropriately, otherwise, return false.
    1202             :  */
    1203             : bool
    1204          38 : ValidateSlotSyncParams(int elevel)
    1205             : {
    1206             :     /*
    1207             :      * Logical slot sync/creation requires logical decoding to be enabled.
    1208             :      */
    1209          38 :     if (!IsLogicalDecodingEnabled())
    1210             :     {
    1211           0 :         ereport(elevel,
    1212             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1213             :                 errmsg("replication slot synchronization requires \"effective_wal_level\" >= \"logical\" on the primary"),
    1214             :                 errhint("To enable logical decoding on primary, set \"wal_level\" >= \"logical\" or create at least one logical slot when \"wal_level\" = \"replica\"."));
    1215             : 
    1216           0 :         return false;
    1217             :     }
    1218             : 
    1219             :     /*
    1220             :      * A physical replication slot(primary_slot_name) is required on the
    1221             :      * primary to ensure that the rows needed by the standby are not removed
    1222             :      * after restarting, so that the synchronized slot on the standby will not
    1223             :      * be invalidated.
    1224             :      */
    1225          38 :     if (PrimarySlotName == NULL || *PrimarySlotName == '\0')
    1226             :     {
    1227           0 :         ereport(elevel,
    1228             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1229             :         /* translator: %s is a GUC variable name */
    1230             :                 errmsg("replication slot synchronization requires \"%s\" to be set", "primary_slot_name"));
    1231           0 :         return false;
    1232             :     }
    1233             : 
    1234             :     /*
    1235             :      * hot_standby_feedback must be enabled to cooperate with the physical
    1236             :      * replication slot, which allows informing the primary about the xmin and
    1237             :      * catalog_xmin values on the standby.
    1238             :      */
    1239          38 :     if (!hot_standby_feedback)
    1240             :     {
    1241           2 :         ereport(elevel,
    1242             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1243             :         /* translator: %s is a GUC variable name */
    1244             :                 errmsg("replication slot synchronization requires \"%s\" to be enabled",
    1245             :                        "hot_standby_feedback"));
    1246           2 :         return false;
    1247             :     }
    1248             : 
    1249             :     /*
    1250             :      * The primary_conninfo is required to make connection to primary for
    1251             :      * getting slots information.
    1252             :      */
    1253          36 :     if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0')
    1254             :     {
    1255           0 :         ereport(elevel,
    1256             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1257             :         /* translator: %s is a GUC variable name */
    1258             :                 errmsg("replication slot synchronization requires \"%s\" to be set",
    1259             :                        "primary_conninfo"));
    1260           0 :         return false;
    1261             :     }
    1262             : 
    1263          36 :     return true;
    1264             : }
    1265             : 
    1266             : /*
    1267             :  * Re-read the config file for slot synchronization.
    1268             :  *
    1269             :  * Exit or throw error if relevant GUCs have changed depending on whether
    1270             :  * called from slot sync worker or from the SQL function pg_sync_replication_slots()
    1271             :  */
    1272             : static void
    1273           2 : slotsync_reread_config(void)
    1274             : {
    1275           2 :     char       *old_primary_conninfo = pstrdup(PrimaryConnInfo);
    1276           2 :     char       *old_primary_slotname = pstrdup(PrimarySlotName);
    1277           2 :     bool        old_sync_replication_slots = sync_replication_slots;
    1278           2 :     bool        old_hot_standby_feedback = hot_standby_feedback;
    1279             :     bool        conninfo_changed;
    1280             :     bool        primary_slotname_changed;
    1281           2 :     bool        is_slotsync_worker = AmLogicalSlotSyncWorkerProcess();
    1282           2 :     bool        parameter_changed = false;
    1283             : 
    1284             :     if (is_slotsync_worker)
    1285             :         Assert(sync_replication_slots);
    1286             : 
    1287           2 :     ConfigReloadPending = false;
    1288           2 :     ProcessConfigFile(PGC_SIGHUP);
    1289             : 
    1290           2 :     conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0;
    1291           2 :     primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0;
    1292           2 :     pfree(old_primary_conninfo);
    1293           2 :     pfree(old_primary_slotname);
    1294             : 
    1295           2 :     if (old_sync_replication_slots != sync_replication_slots)
    1296             :     {
    1297           0 :         if (is_slotsync_worker)
    1298             :         {
    1299           0 :             ereport(LOG,
    1300             :             /* translator: %s is a GUC variable name */
    1301             :                     errmsg("replication slot synchronization worker will stop because \"%s\" is disabled",
    1302             :                            "sync_replication_slots"));
    1303             : 
    1304           0 :             proc_exit(0);
    1305             :         }
    1306             : 
    1307           0 :         parameter_changed = true;
    1308             :     }
    1309             :     else
    1310             :     {
    1311           2 :         if (conninfo_changed ||
    1312           2 :             primary_slotname_changed ||
    1313           2 :             (old_hot_standby_feedback != hot_standby_feedback))
    1314             :         {
    1315             : 
    1316           2 :             if (is_slotsync_worker)
    1317             :             {
    1318           2 :                 ereport(LOG,
    1319             :                         errmsg("replication slot synchronization worker will restart because of a parameter change"));
    1320             : 
    1321             :                 /*
    1322             :                  * Reset the last-start time for this worker so that the
    1323             :                  * postmaster can restart it without waiting for
    1324             :                  * SLOTSYNC_RESTART_INTERVAL_SEC.
    1325             :                  */
    1326           2 :                 SlotSyncCtx->last_start_time = 0;
    1327             : 
    1328           2 :                 proc_exit(0);
    1329             :             }
    1330             : 
    1331           0 :             parameter_changed = true;
    1332             :         }
    1333             :     }
    1334             : 
    1335             :     /*
    1336             :      * If we have reached here with a parameter change, we must be running in
    1337             :      * SQL function, emit error in such a case.
    1338             :      */
    1339           0 :     if (parameter_changed)
    1340             :     {
    1341             :         Assert(!is_slotsync_worker);
    1342           0 :         ereport(ERROR,
    1343             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1344             :                 errmsg("replication slot synchronization will stop because of a parameter change"));
    1345             :     }
    1346             : 
    1347           0 : }
    1348             : 
    1349             : /*
    1350             :  * Interrupt handler for process performing slot synchronization.
    1351             :  */
    1352             : static void
    1353          82 : ProcessSlotSyncInterrupts(void)
    1354             : {
    1355          82 :     CHECK_FOR_INTERRUPTS();
    1356             : 
    1357          78 :     if (SlotSyncCtx->stopSignaled)
    1358             :     {
    1359           2 :         if (AmLogicalSlotSyncWorkerProcess())
    1360             :         {
    1361           2 :             ereport(LOG,
    1362             :                     errmsg("replication slot synchronization worker will stop because promotion is triggered"));
    1363             : 
    1364           2 :             proc_exit(0);
    1365             :         }
    1366             :         else
    1367             :         {
    1368             :             /*
    1369             :              * For the backend executing SQL function
    1370             :              * pg_sync_replication_slots().
    1371             :              */
    1372           0 :             ereport(ERROR,
    1373             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1374             :                     errmsg("replication slot synchronization will stop because promotion is triggered"));
    1375             :         }
    1376             :     }
    1377             : 
    1378          76 :     if (ConfigReloadPending)
    1379           2 :         slotsync_reread_config();
    1380          74 : }
    1381             : 
    1382             : /*
    1383             :  * Connection cleanup function for slotsync worker.
    1384             :  *
    1385             :  * Called on slotsync worker exit.
    1386             :  */
    1387             : static void
    1388          10 : slotsync_worker_disconnect(int code, Datum arg)
    1389             : {
    1390          10 :     WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
    1391             : 
    1392          10 :     walrcv_disconnect(wrconn);
    1393          10 : }
    1394             : 
    1395             : /*
    1396             :  * Cleanup function for slotsync worker.
    1397             :  *
    1398             :  * Called on slotsync worker exit.
    1399             :  */
    1400             : static void
    1401          10 : slotsync_worker_onexit(int code, Datum arg)
    1402             : {
    1403             :     /*
    1404             :      * We need to do slots cleanup here just like WalSndErrorCleanup() does.
    1405             :      *
    1406             :      * The startup process during promotion invokes ShutDownSlotSync() which
    1407             :      * waits for slot sync to finish and it does that by checking the
    1408             :      * 'syncing' flag. Thus the slot sync worker must be done with slots'
    1409             :      * release and cleanup to avoid any dangling temporary slots or active
    1410             :      * slots before it marks itself as finished syncing.
    1411             :      */
    1412             : 
    1413             :     /* Make sure active replication slots are released */
    1414          10 :     if (MyReplicationSlot != NULL)
    1415           0 :         ReplicationSlotRelease();
    1416             : 
    1417             :     /* Also cleanup the temporary slots. */
    1418          10 :     ReplicationSlotCleanup(false);
    1419             : 
    1420          10 :     SpinLockAcquire(&SlotSyncCtx->mutex);
    1421             : 
    1422          10 :     SlotSyncCtx->pid = InvalidPid;
    1423             : 
    1424             :     /*
    1425             :      * If syncing_slots is true, it indicates that the process errored out
    1426             :      * without resetting the flag. So, we need to clean up shared memory and
    1427             :      * reset the flag here.
    1428             :      */
    1429          10 :     if (syncing_slots)
    1430             :     {
    1431          10 :         SlotSyncCtx->syncing = false;
    1432          10 :         syncing_slots = false;
    1433             :     }
    1434             : 
    1435          10 :     SpinLockRelease(&SlotSyncCtx->mutex);
    1436          10 : }
    1437             : 
    1438             : /*
    1439             :  * Sleep for long enough that we believe it's likely that the slots on primary
    1440             :  * get updated.
    1441             :  *
    1442             :  * If there is no slot activity the wait time between sync-cycles will double
    1443             :  * (to a maximum of 30s). If there is some slot activity the wait time between
    1444             :  * sync-cycles is reset to the minimum (200ms).
    1445             :  */
    1446             : static void
    1447          38 : wait_for_slot_activity(bool some_slot_updated)
    1448             : {
    1449             :     int         rc;
    1450             : 
    1451          38 :     if (!some_slot_updated)
    1452             :     {
    1453             :         /*
    1454             :          * No slots were updated, so double the sleep time, but not beyond the
    1455             :          * maximum allowable value.
    1456             :          */
    1457          18 :         sleep_ms = Min(sleep_ms * 2, MAX_SLOTSYNC_WORKER_NAPTIME_MS);
    1458             :     }
    1459             :     else
    1460             :     {
    1461             :         /*
    1462             :          * Some slots were updated since the last sleep, so reset the sleep
    1463             :          * time.
    1464             :          */
    1465          20 :         sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
    1466             :     }
    1467             : 
    1468          38 :     rc = WaitLatch(MyLatch,
    1469             :                    WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1470             :                    sleep_ms,
    1471             :                    WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
    1472             : 
    1473          38 :     if (rc & WL_LATCH_SET)
    1474           8 :         ResetLatch(MyLatch);
    1475          38 : }
    1476             : 
    1477             : /*
    1478             :  * Emit an error if a concurrent sync call is in progress.
    1479             :  * Otherwise, advertise that a sync is in progress.
    1480             :  */
    1481             : static void
    1482          28 : check_and_set_sync_info(pid_t sync_process_pid)
    1483             : {
    1484          28 :     SpinLockAcquire(&SlotSyncCtx->mutex);
    1485             : 
    1486          28 :     if (SlotSyncCtx->syncing)
    1487             :     {
    1488           0 :         SpinLockRelease(&SlotSyncCtx->mutex);
    1489           0 :         ereport(ERROR,
    1490             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1491             :                 errmsg("cannot synchronize replication slots concurrently"));
    1492             :     }
    1493             : 
    1494             :     /* The pid must not be already assigned in SlotSyncCtx */
    1495             :     Assert(SlotSyncCtx->pid == InvalidPid);
    1496             : 
    1497          28 :     SlotSyncCtx->syncing = true;
    1498             : 
    1499             :     /*
    1500             :      * Advertise the required PID so that the startup process can kill the
    1501             :      * slot sync process on promotion.
    1502             :      */
    1503          28 :     SlotSyncCtx->pid = sync_process_pid;
    1504             : 
    1505          28 :     SpinLockRelease(&SlotSyncCtx->mutex);
    1506             : 
    1507          28 :     syncing_slots = true;
    1508          28 : }
    1509             : 
    1510             : /*
    1511             :  * Reset syncing flag.
    1512             :  */
    1513             : static void
    1514          18 : reset_syncing_flag(void)
    1515             : {
    1516          18 :     SpinLockAcquire(&SlotSyncCtx->mutex);
    1517          18 :     SlotSyncCtx->syncing = false;
    1518          18 :     SlotSyncCtx->pid = InvalidPid;
    1519          18 :     SpinLockRelease(&SlotSyncCtx->mutex);
    1520             : 
    1521          18 :     syncing_slots = false;
    1522          18 : }
    1523             : 
    1524             : /*
    1525             :  * The main loop of our worker process.
    1526             :  *
    1527             :  * It connects to the primary server, fetches logical failover slots
    1528             :  * information periodically in order to create and sync the slots.
    1529             :  *
    1530             :  * Note: If any changes are made here, check if the corresponding SQL
    1531             :  * function logic in SyncReplicationSlots() also needs to be changed.
    1532             :  */
    1533             : void
    1534          10 : ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
    1535             : {
    1536          10 :     WalReceiverConn *wrconn = NULL;
    1537             :     char       *dbname;
    1538             :     char       *err;
    1539             :     sigjmp_buf  local_sigjmp_buf;
    1540             :     StringInfoData app_name;
    1541             : 
    1542             :     Assert(startup_data_len == 0);
    1543             : 
    1544          10 :     MyBackendType = B_SLOTSYNC_WORKER;
    1545             : 
    1546          10 :     init_ps_display(NULL);
    1547             : 
    1548             :     Assert(GetProcessingMode() == InitProcessing);
    1549             : 
    1550             :     /*
    1551             :      * Create a per-backend PGPROC struct in shared memory.  We must do this
    1552             :      * before we access any shared memory.
    1553             :      */
    1554          10 :     InitProcess();
    1555             : 
    1556             :     /*
    1557             :      * Early initialization.
    1558             :      */
    1559          10 :     BaseInit();
    1560             : 
    1561             :     Assert(SlotSyncCtx != NULL);
    1562             : 
    1563             :     /*
    1564             :      * If an exception is encountered, processing resumes here.
    1565             :      *
    1566             :      * We just need to clean up, report the error, and go away.
    1567             :      *
    1568             :      * If we do not have this handling here, then since this worker process
    1569             :      * operates at the bottom of the exception stack, ERRORs turn into FATALs.
    1570             :      * Therefore, we create our own exception handler to catch ERRORs.
    1571             :      */
    1572          10 :     if (sigsetjmp(local_sigjmp_buf, 1) != 0)
    1573             :     {
    1574             :         /* since not using PG_TRY, must reset error stack by hand */
    1575           2 :         error_context_stack = NULL;
    1576             : 
    1577             :         /* Prevents interrupts while cleaning up */
    1578           2 :         HOLD_INTERRUPTS();
    1579             : 
    1580             :         /* Report the error to the server log */
    1581           2 :         EmitErrorReport();
    1582             : 
    1583             :         /*
    1584             :          * We can now go away.  Note that because we called InitProcess, a
    1585             :          * callback was registered to do ProcKill, which will clean up
    1586             :          * necessary state.
    1587             :          */
    1588           2 :         proc_exit(0);
    1589             :     }
    1590             : 
    1591             :     /* We can now handle ereport(ERROR) */
    1592          10 :     PG_exception_stack = &local_sigjmp_buf;
    1593             : 
    1594             :     /* Setup signal handling */
    1595          10 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1596          10 :     pqsignal(SIGINT, StatementCancelHandler);
    1597          10 :     pqsignal(SIGTERM, die);
    1598          10 :     pqsignal(SIGFPE, FloatExceptionHandler);
    1599          10 :     pqsignal(SIGUSR1, procsignal_sigusr1_handler);
    1600          10 :     pqsignal(SIGUSR2, SIG_IGN);
    1601          10 :     pqsignal(SIGPIPE, SIG_IGN);
    1602          10 :     pqsignal(SIGCHLD, SIG_DFL);
    1603             : 
    1604          10 :     check_and_set_sync_info(MyProcPid);
    1605             : 
    1606          10 :     ereport(LOG, errmsg("slot sync worker started"));
    1607             : 
    1608             :     /* Register it as soon as SlotSyncCtx->pid is initialized. */
    1609          10 :     before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
    1610             : 
    1611             :     /*
    1612             :      * Establishes SIGALRM handler and initialize timeout module. It is needed
    1613             :      * by InitPostgres to register different timeouts.
    1614             :      */
    1615          10 :     InitializeTimeouts();
    1616             : 
    1617             :     /* Load the libpq-specific functions */
    1618          10 :     load_file("libpqwalreceiver", false);
    1619             : 
    1620             :     /*
    1621             :      * Unblock signals (they were blocked when the postmaster forked us)
    1622             :      */
    1623          10 :     sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
    1624             : 
    1625             :     /*
    1626             :      * Set always-secure search path, so malicious users can't redirect user
    1627             :      * code (e.g. operators).
    1628             :      *
    1629             :      * It's not strictly necessary since we won't be scanning or writing to
    1630             :      * any user table locally, but it's good to retain it here for added
    1631             :      * precaution.
    1632             :      */
    1633          10 :     SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
    1634             : 
    1635          10 :     dbname = CheckAndGetDbnameFromConninfo();
    1636             : 
    1637             :     /*
    1638             :      * Connect to the database specified by the user in primary_conninfo. We
    1639             :      * need a database connection for walrcv_exec to work which we use to
    1640             :      * fetch slot information from the remote node. See comments atop
    1641             :      * libpqrcv_exec.
    1642             :      *
    1643             :      * We do not specify a specific user here since the slot sync worker will
    1644             :      * operate as a superuser. This is safe because the slot sync worker does
    1645             :      * not interact with user tables, eliminating the risk of executing
    1646             :      * arbitrary code within triggers.
    1647             :      */
    1648          10 :     InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL);
    1649             : 
    1650          10 :     SetProcessingMode(NormalProcessing);
    1651             : 
    1652          10 :     initStringInfo(&app_name);
    1653          10 :     if (cluster_name[0])
    1654          10 :         appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsync worker");
    1655             :     else
    1656           0 :         appendStringInfoString(&app_name, "slotsync worker");
    1657             : 
    1658             :     /*
    1659             :      * Establish the connection to the primary server for slot
    1660             :      * synchronization.
    1661             :      */
    1662          10 :     wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
    1663             :                             app_name.data, &err);
    1664             : 
    1665          10 :     if (!wrconn)
    1666           0 :         ereport(ERROR,
    1667             :                 errcode(ERRCODE_CONNECTION_FAILURE),
    1668             :                 errmsg("synchronization worker \"%s\" could not connect to the primary server: %s",
    1669             :                        app_name.data, err));
    1670             : 
    1671          10 :     pfree(app_name.data);
    1672             : 
    1673             :     /*
    1674             :      * Register the disconnection callback.
    1675             :      *
    1676             :      * XXX: This can be combined with previous cleanup registration of
    1677             :      * slotsync_worker_onexit() but that will need the connection to be made
    1678             :      * global and we want to avoid introducing global for this purpose.
    1679             :      */
    1680          10 :     before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
    1681             : 
    1682             :     /*
    1683             :      * Using the specified primary server connection, check that we are not a
    1684             :      * cascading standby and slot configured in 'primary_slot_name' exists on
    1685             :      * the primary server.
    1686             :      */
    1687          10 :     validate_remote_info(wrconn);
    1688             : 
    1689             :     /* Main loop to synchronize slots */
    1690             :     for (;;)
    1691          34 :     {
    1692          44 :         bool        some_slot_updated = false;
    1693          44 :         bool        started_tx = false;
    1694             :         List       *remote_slots;
    1695             : 
    1696          44 :         ProcessSlotSyncInterrupts();
    1697             : 
    1698             :         /*
    1699             :          * The syscache access in fetch_remote_slots() needs a transaction
    1700             :          * env.
    1701             :          */
    1702          36 :         if (!IsTransactionState())
    1703             :         {
    1704          36 :             StartTransactionCommand();
    1705          36 :             started_tx = true;
    1706             :         }
    1707             : 
    1708          36 :         remote_slots = fetch_remote_slots(wrconn, NIL);
    1709          34 :         some_slot_updated = synchronize_slots(wrconn, remote_slots, NULL);
    1710          34 :         list_free_deep(remote_slots);
    1711             : 
    1712          34 :         if (started_tx)
    1713          34 :             CommitTransactionCommand();
    1714             : 
    1715          34 :         wait_for_slot_activity(some_slot_updated);
    1716             :     }
    1717             : 
    1718             :     /*
    1719             :      * The slot sync worker can't get here because it will only stop when it
    1720             :      * receives a stop request from the startup process, or when there is an
    1721             :      * error.
    1722             :      */
    1723             :     Assert(false);
    1724             : }
    1725             : 
    1726             : /*
    1727             :  * Update the inactive_since property for synced slots.
    1728             :  *
    1729             :  * Note that this function is currently called when we shut down the slot
    1730             :  * sync machinery.
    1731             :  */
    1732             : static void
    1733        1862 : update_synced_slots_inactive_since(void)
    1734             : {
    1735        1862 :     TimestampTz now = 0;
    1736             : 
    1737             :     /*
    1738             :      * Update inactive_since only when promoting the standby, so that it is
    1739             :      * interpreted correctly if the standby gets promoted without a restart.
    1740             :      * We don't want the slots to appear inactive for a long time after
    1741             :      * promotion if they haven't been synchronized recently. Whoever acquires
    1742             :      * the slot, i.e., makes the slot active, will reset it.
    1743             :      */
    1744        1862 :     if (!StandbyMode)
    1745        1758 :         return;
    1746             : 
    1747             :     /* The slot sync worker or the SQL function mustn't be running by now */
    1748             :     Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
    1749             : 
    1750         104 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1751             : 
    1752        1120 :     for (int i = 0; i < max_replication_slots; i++)
    1753             :     {
    1754        1016 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1755             : 
    1756             :         /* Check if it is a synchronized slot */
    1757        1016 :         if (s->in_use && s->data.synced)
    1758             :         {
    1759             :             Assert(SlotIsLogical(s));
    1760             : 
    1761             :             /* The slot must not be acquired by any process */
    1762             :             Assert(s->active_pid == 0);
    1763             : 
    1764             :             /* Take the timestamp once so all slots share one inactive_since. */
    1765           6 :             if (now == 0)
    1766           4 :                 now = GetCurrentTimestamp();
    1767             : 
    1768           6 :             ReplicationSlotSetInactiveSince(s, now, true);
    1769             :         }
    1770             :     }
    1771             : 
    1772         104 :     LWLockRelease(ReplicationSlotControlLock);
    1773             : }
    1774             : 
    1775             : /*
    1776             :  * Shut down slot synchronization.
    1777             :  *
    1778             :  * This function sets stopSignaled=true and wakes up the slot sync process
    1779             :  * (either the worker or a backend running the SQL function
    1780             :  * pg_sync_replication_slots()) so that the worker can exit or the SQL
    1781             :  * function can finish. It also waits until the slot sync worker has exited
    1782             :  * or pg_sync_replication_slots() has finished.
    1783             :  */
    1784             : void
    1785        1862 : ShutDownSlotSync(void)
    1786             : {
    1787             :     pid_t       sync_process_pid;
    1788             : 
    1789        1862 :     SpinLockAcquire(&SlotSyncCtx->mutex);
    1790             : 
    1791        1862 :     SlotSyncCtx->stopSignaled = true;
    1792             : 
    1793             :     /*
    1794             :      * Return if neither the slot sync worker is running nor the function
    1795             :      * pg_sync_replication_slots() is executing.
    1796             :      */
    1797        1862 :     if (!SlotSyncCtx->syncing)
    1798             :     {
    1799        1860 :         SpinLockRelease(&SlotSyncCtx->mutex);
    1800        1860 :         update_synced_slots_inactive_since();
    1801        1860 :         return;
    1802             :     }
    1803             : 
    1804           2 :     sync_process_pid = SlotSyncCtx->pid;
    1805             : 
    1806           2 :     SpinLockRelease(&SlotSyncCtx->mutex);
    1807             : 
    1808             :     /*
    1809             :      * Signal the process doing slot sync, if any. The process will stop upon
    1810             :      * detecting that the stopSignaled flag is set to true.
    1811             :      */
    1812           2 :     if (sync_process_pid != InvalidPid)
    1813           2 :         kill(sync_process_pid, SIGUSR1);
    1814             : 
    1815             :     /* Wait for slot sync to end */
    1816             :     for (;;)
    1817           0 :     {
    1818             :         int         rc;
    1819             : 
    1820             :         /* Wait a bit; we don't expect to have to wait long */
    1821           2 :         rc = WaitLatch(MyLatch,
    1822             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1823             :                        10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
    1824             : 
    1825           2 :         if (rc & WL_LATCH_SET)
    1826             :         {
    1827           0 :             ResetLatch(MyLatch);
    1828           0 :             CHECK_FOR_INTERRUPTS();
    1829             :         }
    1830             : 
    1831           2 :         SpinLockAcquire(&SlotSyncCtx->mutex);
    1832             : 
    1833             :         /* Stop once no process is syncing; the mutex is released below. */
    1834           2 :         if (!SlotSyncCtx->syncing)
    1835           2 :             break;
    1836             : 
    1837           0 :         SpinLockRelease(&SlotSyncCtx->mutex);
    1838             :     }
    1839             : 
    1840           2 :     SpinLockRelease(&SlotSyncCtx->mutex);
    1841             : 
    1842           2 :     update_synced_slots_inactive_since();
    1843             : }
    1844             : 
    1845             : /*
    1846             :  * SlotSyncWorkerCanRestart
    1847             :  *
    1848             :  * Return true, indicating worker is allowed to restart, if enough time has
    1849             :  * passed since it was last launched to reach SLOTSYNC_RESTART_INTERVAL_SEC.
    1850             :  * Otherwise return false.
    1851             :  *
    1852             :  * This is a safety valve to protect against continuous respawn attempts if the
    1853             :  * worker is dying immediately at launch. Note that since we will retry
    1854             :  * launching the worker from the postmaster main loop, we will get another
    1855             :  * chance later.
    1856             :  */
    1857             : bool
    1858          16 : SlotSyncWorkerCanRestart(void)
    1859             : {
    1860          16 :     time_t      curtime = time(NULL);
    1861             : 
    1862             :     /*
    1863             :      * If first time through, or time somehow went backwards, always update
    1864             :      * last_start_time to match the current clock and allow worker start.
    1865             :      * Otherwise allow it only once enough time has elapsed.
    1866             :      */
    1867          16 :     if (SlotSyncCtx->last_start_time == 0 ||
    1868           6 :         curtime < SlotSyncCtx->last_start_time ||
    1869           6 :         curtime - SlotSyncCtx->last_start_time >= SLOTSYNC_RESTART_INTERVAL_SEC)
    1870             :     {
    1871          10 :         SlotSyncCtx->last_start_time = curtime;
    1872          10 :         return true;
    1873             :     }
    1874           6 :     return false;
    1875             : }
    1876             : 
    1877             : /*
    1878             :  * Is the current process syncing replication slots?
    1879             :  *
    1880             :  * It could be either a backend executing the SQL function or the slot sync worker.
    1881             :  */
    1882             : bool
    1883          52 : IsSyncingReplicationSlots(void)
    1884             : {
    1885          52 :     return syncing_slots;
    1886             : }
    1887             : 
    1888             : /*
    1889             :  * Return the amount of shared memory required for slot synchronization.
    1890             :  */
    1891             : Size
    1892        6534 : SlotSyncShmemSize(void)
    1893             : {
    1894        6534 :     return sizeof(SlotSyncCtxStruct);
    1895             : }
    1896             : 
    1897             : /*
    1898             :  * Allocate and initialize the shared memory for slot synchronization.
    1899             :  */
    1900             : void
    1901        2280 : SlotSyncShmemInit(void)
    1902             : {
    1903        2280 :     Size        size = SlotSyncShmemSize();
    1904             :     bool        found;
    1905             : 
    1906        2280 :     SlotSyncCtx = (SlotSyncCtxStruct *)
    1907        2280 :         ShmemInitStruct("Slot Sync Data", size, &found);
    1908             : 
    1909        2280 :     if (!found)
    1910             :     {
    1911        2280 :         memset(SlotSyncCtx, 0, size);
    1912        2280 :         SlotSyncCtx->pid = InvalidPid;
    1913        2280 :         SpinLockInit(&SlotSyncCtx->mutex);
    1914             :     }
    1915        2280 : }
    1916             : 
    1917             : /*
    1918             :  * Error cleanup callback for the slot sync SQL function.
    1919             :  */
    1920             : static void
    1921           2 : slotsync_failure_callback(int code, Datum arg)
    1922             : {
    1923           2 :     WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
    1924             : 
    1925             :     /*
    1926             :      * We need to do slot cleanup here just like WalSndErrorCleanup() does.
    1927             :      *
    1928             :      * The startup process during promotion invokes ShutDownSlotSync() which
    1929             :      * waits for slot sync to finish and it does that by checking the
    1930             :      * 'syncing' flag. Thus the SQL function must be done with slots' release
    1931             :      * and cleanup to avoid any dangling temporary slots or active slots
    1932             :      * before it marks itself as finished syncing.
    1933             :      */
    1934             : 
    1935             :     /* Make sure active replication slots are released */
    1936           2 :     if (MyReplicationSlot != NULL)
    1937           0 :         ReplicationSlotRelease();
    1938             : 
    1939             :     /* Also clean up the synced temporary slots. */
    1940           2 :     ReplicationSlotCleanup(true);
    1941             : 
    1942             :     /*
    1943             :      * If syncing_slots is still set, the process errored out without
    1944             :      * resetting the flag. So, we need to clean up shared memory and reset the
    1945             :      * flag here.
    1946             :      */
    1947           2 :     if (syncing_slots)
    1948           2 :         reset_syncing_flag();
    1949             : 
    1950           2 :     walrcv_disconnect(wrconn);
    1951           2 : }
    1952             : 
    1953             : /*
    1954             :  * Helper function to copy the slot names from a list of remote slots into a new list
    1955             :  */
    1956             : static List *
    1957           2 : extract_slot_names(List *remote_slots)
    1958             : {
    1959           2 :     List       *slot_names = NIL;
    1960             : 
    1961           6 :     foreach_ptr(RemoteSlot, remote_slot, remote_slots)
    1962             :     {
    1963             :         char       *slot_name;
    1964             : 
    1965           2 :         slot_name = pstrdup(remote_slot->name);
    1966           2 :         slot_names = lappend(slot_names, slot_name);
    1967             :     }
    1968             : 
    1969           2 :     return slot_names;
    1970             : }
    1971             : 
    1972             : /*
    1973             :  * Synchronize the failover-enabled replication slots using the specified
    1974             :  * primary server connection.
    1975             :  *
    1976             :  * Repeatedly fetches and updates replication slot information from the
    1977             :  * primary until all slots are at least "sync ready".
    1978             :  *
    1979             :  * Exits early if promotion is triggered or certain critical
    1980             :  * configuration parameters have changed.
    1981             :  */
    1982             : void
    1983          18 : SyncReplicationSlots(WalReceiverConn *wrconn)
    1984             : {
    1985          18 :     PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
    1986             :     {
    1987          18 :         List       *remote_slots = NIL;
    1988          18 :         List       *slot_names = NIL;   /* List of slot names to track */
    1989             : 
    1990          18 :         check_and_set_sync_info(MyProcPid);
    1991             : 
    1992             :         /* Check for interrupts and config changes */
    1993          18 :         ProcessSlotSyncInterrupts();
    1994             : 
    1995          18 :         validate_remote_info(wrconn);
    1996             : 
    1997             :         /* Retry until all the slots are sync-ready */
    1998             :         for (;;)
    1999           4 :         {
    2000          20 :             bool        slot_persistence_pending = false;
    2001          20 :             bool        some_slot_updated = false;
    2002             : 
    2003             :             /* Check for interrupts and config changes */
    2004          20 :             ProcessSlotSyncInterrupts();
    2005             : 
    2006             :             /* We must be in a valid transaction state */
    2007             :             Assert(IsTransactionState());
    2008             : 
    2009             :             /*
    2010             :              * Fetch remote slot info for the given slot_names. If slot_names
    2011             :              * is NIL, fetch all failover-enabled slots. Note that we reuse
    2012             :              * slot_names from the first iteration; re-fetching all failover
    2013             :              * slots each time could cause an endless loop. Instead of
    2014             :              * reprocessing only the pending slots in each iteration, it's
    2015             :              * better to process all the slots received in the first
    2016             :              * iteration. This ensures that by the time we're done, all slots
    2017             :              * reflect the latest values.
    2018             :              */
    2019          20 :             remote_slots = fetch_remote_slots(wrconn, slot_names);
    2020             : 
    2021             :             /* Attempt to synchronize slots */
    2022          20 :             some_slot_updated = synchronize_slots(wrconn, remote_slots,
    2023             :                                                   &slot_persistence_pending);
    2024             : 
    2025             :             /*
    2026             :              * If slot_persistence_pending is true, extract slot names for
    2027             :              * future iterations (only needed if we haven't done it yet).
    2028             :              */
    2029          20 :             if (slot_names == NIL && slot_persistence_pending)
    2030           2 :                 slot_names = extract_slot_names(remote_slots);
    2031             : 
    2032             :             /* Free the current remote_slots list */
    2033          20 :             list_free_deep(remote_slots);
    2034             : 
    2035             :             /* Done if all slots are persisted, i.e., are sync-ready */
    2036          20 :             if (!slot_persistence_pending)
    2037          16 :                 break;
    2038             : 
    2039             :             /* Wait before retrying */
    2040           4 :             wait_for_slot_activity(some_slot_updated);
    2041             :         }
    2042             : 
    2043          16 :         if (slot_names)
    2044           2 :             list_free_deep(slot_names);
    2045             : 
    2046             :         /* Clean up the synced temporary slots */
    2047          16 :         ReplicationSlotCleanup(true);
    2048             : 
    2049             :         /* We are done with sync, so reset sync flag */
    2050          16 :         reset_syncing_flag();
    2051             :     }
    2052          18 :     PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
    2053          16 : }

Generated by: LCOV version 1.16