LCOV - code coverage report
Current view: top level - src/backend/replication/logical - slotsync.c (source / functions)
Test: PostgreSQL 19devel
Date: 2025-12-07 10:17:27

              Hit    Total    Coverage
Lines:        426      480      88.8 %
Functions:     28       28     100.0 %

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * slotsync.c
       3             :  *     Functionality for synchronizing slots to a standby server from the
       4             :  *     primary server.
       5             :  *
       6             :  * Copyright (c) 2024-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/logical/slotsync.c
      10             :  *
      11             :  * This file contains the code for slot synchronization on a physical standby
      12             :  * to fetch logical failover slots information from the primary server, create
      13             :  * the slots on the standby and synchronize them periodically.
      14             :  *
      15             :  * Slot synchronization can be performed either automatically by enabling slot
      16             :  * sync worker or manually by calling SQL function pg_sync_replication_slots().
      17             :  *
      18             :  * If the WAL corresponding to the remote's restart_lsn is not available on the
      19             :  * physical standby or the remote's catalog_xmin precedes the oldest xid for
      20             :  * which it is guaranteed that rows wouldn't have been removed, then we cannot
      21             :  * create the local standby slot because that would mean moving the local slot
      22             :  * backward and decoding won't be possible via such a slot. In this case, the
      23             :  * slot will be marked as RS_TEMPORARY. Once the primary server catches up,
      24             :  * the slot will be marked as RS_PERSISTENT (which means sync-ready) after
      25             :  * which slot sync worker can perform the sync periodically or user can call
      26             :  * pg_sync_replication_slots() periodically to perform the syncs.
      27             :  *
      28             :  * If synchronized slots fail to build a consistent snapshot from the
      29             :  * restart_lsn before reaching confirmed_flush_lsn, they would become
      30             :  * unreliable after promotion due to potential data loss from changes
      31             :  * before reaching a consistent point. This can happen because the slots can
      32             :  * be synced at an arbitrary time, and we may not reach the consistent point
      33             :  * at the same WAL location as the primary. So, we mark such slots as
      34             :  * RS_TEMPORARY. Once the decoding from corresponding LSNs can reach a
      35             :  * consistent point, they will be marked as RS_PERSISTENT.
      36             :  *
      37             :  * The slot sync worker waits for some time before the next synchronization,
      38             :  * with the duration varying based on whether any slots were updated during
      39             :  * the last cycle. Refer to the comments above wait_for_slot_activity() for
      40             :  * more details.
      41             :  *
      42             :  * Any standby synchronized slots will be dropped if they no longer need
      43             :  * to be synchronized. See comment atop drop_local_obsolete_slots() for more
      44             :  * details.
      45             :  *---------------------------------------------------------------------------
      46             :  */
      47             : 
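The header comment above describes a fetch-then-nap cycle. As a minimal
sketch only (the real worker loop, elsewhere in this file, also handles
configuration reloads, interrupts, and connection management), its shape is
roughly:

    /* Hypothetical condensation of the worker loop; not part of this file. */
    static void
    sketch_slotsync_worker_loop(WalReceiverConn *wrconn)
    {
        for (;;)
        {
            /* Fetch failover slots from the primary and sync them locally. */
            bool        some_slot_updated = synchronize_slots(wrconn);

            /* Nap for a duration that adapts to recent slot activity. */
            wait_for_slot_activity(some_slot_updated);
        }
    }
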
      48             : #include "postgres.h"
      49             : 
      50             : #include <time.h>
      51             : 
      52             : #include "access/xlog_internal.h"
      53             : #include "access/xlogrecovery.h"
      54             : #include "catalog/pg_database.h"
      55             : #include "libpq/pqsignal.h"
      56             : #include "pgstat.h"
      57             : #include "postmaster/interrupt.h"
      58             : #include "replication/logical.h"
      59             : #include "replication/slotsync.h"
      60             : #include "replication/snapbuild.h"
      61             : #include "storage/ipc.h"
      62             : #include "storage/lmgr.h"
      63             : #include "storage/proc.h"
      64             : #include "storage/procarray.h"
      65             : #include "tcop/tcopprot.h"
      66             : #include "utils/builtins.h"
      67             : #include "utils/pg_lsn.h"
      68             : #include "utils/ps_status.h"
      69             : #include "utils/timeout.h"
      70             : 
      71             : /*
      72             :  * Struct for sharing information to control slot synchronization.
      73             :  *
      74             :  * The slot sync worker's pid is needed by the startup process to shut it
      75             :  * down during promotion. The startup process shuts down the slot sync worker
      76             :  * and also sets stopSignaled=true to handle the race condition when the
      77             :  * postmaster has not noticed the promotion yet and thus may end up restarting
      78             :  * the slot sync worker. If stopSignaled is set, the worker will exit in such a
      79             :  * case. The SQL function pg_sync_replication_slots() will also error out if
      80             :  * this flag is set. Note that we don't need to reset this variable as after
      81             :  * promotion the slot sync worker won't be restarted because the pmState
      82             :  * changes to PM_RUN from PM_HOT_STANDBY and we don't support demoting
      83             :  * the primary without restarting the server. See LaunchMissingBackgroundProcesses.
      84             :  *
      85             :  * The 'syncing' flag is needed to prevent concurrent slot syncs, which could
      86             :  * otherwise overwrite each other's slot updates.
      87             :  *
      88             :  * The 'last_start_time' is needed by postmaster to start the slot sync worker
      89             :  * once per SLOTSYNC_RESTART_INTERVAL_SEC. In cases where an immediate restart
      90             :  * is expected (e.g., slot sync GUCs change), slot sync worker will reset
      91             :  * last_start_time before exiting, so that postmaster can start the worker
      92             :  * without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
      93             :  */
      94             : typedef struct SlotSyncCtxStruct
      95             : {
      96             :     pid_t       pid;
      97             :     bool        stopSignaled;
      98             :     bool        syncing;
      99             :     time_t      last_start_time;
     100             :     slock_t     mutex;
     101             : } SlotSyncCtxStruct;
     102             : 
     103             : static SlotSyncCtxStruct *SlotSyncCtx = NULL;
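
To illustrate the promotion-time handshake described above, the startup
process conceptually does something like the following sketch. The helper
name is hypothetical; SlotSyncCtx and its fields are the real ones from this
file, and the actual shutdown routine additionally waits for the worker to
exit.

    static void
    sketch_signal_slotsync_shutdown(void)
    {
        pid_t       worker_pid;

        SpinLockAcquire(&SlotSyncCtx->mutex);
        /* Set stopSignaled first, closing the restart race window. */
        SlotSyncCtx->stopSignaled = true;
        worker_pid = SlotSyncCtx->pid;
        SpinLockRelease(&SlotSyncCtx->mutex);

        /* Ask a running worker to exit; it also re-checks stopSignaled. */
        if (worker_pid > 0)
            kill(worker_pid, SIGINT);
    }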
     104             : 
     105             : /* GUC variable */
     106             : bool        sync_replication_slots = false;
     107             : 
     108             : /*
     109             :  * The sleep time (ms) between slot-sync cycles varies dynamically
     110             :  * (within a MIN/MAX range) according to slot activity. See
     111             :  * wait_for_slot_activity() for details.
     112             :  */
     113             : #define MIN_SLOTSYNC_WORKER_NAPTIME_MS  200
     114             : #define MAX_SLOTSYNC_WORKER_NAPTIME_MS  30000   /* 30s */
     115             : 
     116             : static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
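
A minimal sketch of the adaptive naptime policy that these constants bound
(the helper name is hypothetical; wait_for_slot_activity() in this file
implements the real logic, including the actual wait):

    static void
    sketch_adjust_naptime(bool some_slot_updated)
    {
        if (some_slot_updated)
        {
            /* Recent activity: go back to polling quickly. */
            sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
        }
        else
        {
            /* Idle cycle: back off exponentially, up to the cap. */
            sleep_ms = Min(sleep_ms * 2, MAX_SLOTSYNC_WORKER_NAPTIME_MS);
        }
    }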
     117             : 
     118             : /* The restart interval for the slot sync worker, used by the postmaster */
     119             : #define SLOTSYNC_RESTART_INTERVAL_SEC 10
     120             : 
     121             : /*
     122             :  * Flag to tell if we are syncing replication slots. Unlike the 'syncing' flag
     123             :  * in SlotSyncCtxStruct, this flag is true only if the current process is
     124             :  * performing slot synchronization.
     125             :  */
     126             : static bool syncing_slots = false;
     127             : 
     128             : /*
     129             :  * Structure to hold information fetched from the primary server about a logical
     130             :  * replication slot.
     131             :  */
     132             : typedef struct RemoteSlot
     133             : {
     134             :     char       *name;
     135             :     char       *plugin;
     136             :     char       *database;
     137             :     bool        two_phase;
     138             :     bool        failover;
     139             :     XLogRecPtr  restart_lsn;
     140             :     XLogRecPtr  confirmed_lsn;
     141             :     XLogRecPtr  two_phase_at;
     142             :     TransactionId catalog_xmin;
     143             : 
     144             :     /* RS_INVAL_NONE if valid, or the reason for invalidation */
     145             :     ReplicationSlotInvalidationCause invalidated;
     146             : } RemoteSlot;
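
For orientation: a fetched RemoteSlot that reports itself valid but has
unset positions was captured in its RS_EPHEMERAL state on the primary and is
skipped (see the comment near the end of synchronize_slots() below). A
hypothetical predicate for that check could look like:

    static bool
    sketch_fetched_while_ephemeral(const RemoteSlot *rs)
    {
        return rs->invalidated == RS_INVAL_NONE &&
            (!XLogRecPtrIsValid(rs->restart_lsn) ||
             !XLogRecPtrIsValid(rs->confirmed_lsn) ||
             !TransactionIdIsValid(rs->catalog_xmin));
    }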
     147             : 
     148             : static void slotsync_failure_callback(int code, Datum arg);
     149             : static void update_synced_slots_inactive_since(void);
     150             : 
     151             : /*
     152             :  * Update slot sync skip stats. The caller must have already acquired
     153             :  * the slot.
     154             :  */
     155             : static void
     156          76 : update_slotsync_skip_stats(SlotSyncSkipReason skip_reason)
     157             : {
     158             :     ReplicationSlot *slot;
     159             : 
     160             :     Assert(MyReplicationSlot);
     161             : 
     162          76 :     slot = MyReplicationSlot;
     163             : 
     164             :     /*
     165             :      * Update the slot sync related stats in pg_stat_replication_slot when a
     166             :      * Update the slot sync related stats in pg_stat_replication_slots when
     167             :      * a slot sync is skipped.
     168          76 :     if (skip_reason != SS_SKIP_NONE)
     169           2 :         pgstat_report_replslotsync(slot);
     170             : 
     171             :     /* Update the slot sync skip reason */
     172          76 :     if (slot->slotsync_skip_reason != skip_reason)
     173             :     {
     174           2 :         SpinLockAcquire(&slot->mutex);
     175           2 :         slot->slotsync_skip_reason = skip_reason;
     176           2 :         SpinLockRelease(&slot->mutex);
     177             :     }
     178          76 : }
     179             : 
     180             : /*
     181             :  * If necessary, update the local synced slot's metadata based on the data
     182             :  * from the remote slot.
     183             :  *
     184             :  * If no update was needed (the data of the remote slot is the same as the
     185             :  * local slot's), return false; otherwise, return true.
     186             :  *
     187             :  * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
     188             :  * modified, and decoding from the corresponding LSNs can reach a
     189             :  * consistent snapshot.
     190             :  *
     191             :  * *remote_slot_precedes will be true if the remote slot's LSN or xmin
     192             :  * precedes the locally reserved position.
     193             :  */
     194             : static bool
     195          76 : update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
     196             :                          bool *found_consistent_snapshot,
     197             :                          bool *remote_slot_precedes)
     198             : {
     199          76 :     ReplicationSlot *slot = MyReplicationSlot;
     200          76 :     bool        updated_xmin_or_lsn = false;
     201          76 :     bool        updated_config = false;
     202          76 :     SlotSyncSkipReason skip_reason = SS_SKIP_NONE;
     203             : 
     204             :     Assert(slot->data.invalidated == RS_INVAL_NONE);
     205             : 
     206          76 :     if (found_consistent_snapshot)
     207          12 :         *found_consistent_snapshot = false;
     208             : 
     209          76 :     if (remote_slot_precedes)
     210          12 :         *remote_slot_precedes = false;
     211             : 
     212             :     /*
     213             :      * Don't overwrite if we already have a newer catalog_xmin and
     214             :      * restart_lsn.
     215             :      */
     216         152 :     if (remote_slot->restart_lsn < slot->data.restart_lsn ||
     217          76 :         TransactionIdPrecedes(remote_slot->catalog_xmin,
     218             :                               slot->data.catalog_xmin))
     219             :     {
     220             :         /* Update slot sync skip stats */
     221           2 :         update_slotsync_skip_stats(SS_SKIP_WAL_OR_ROWS_REMOVED);
     222             : 
     223             :         /*
     224             :          * This can happen in the following situations:
     225             :          *
     226             :          * If the slot is temporary, it means either the initial WAL location
     227             :          * reserved for the local slot is ahead of the remote slot's
     228             :          * restart_lsn or the initial xmin_horizon computed for the local slot
     229             :          * is ahead of the remote slot.
     230             :          *
     231             :          * If the slot is persistent, both restart_lsn and catalog_xmin of the
     232             :          * synced slot could still be ahead of the remote slot. Since we use
     233             :          * slot advance functionality to keep snapbuild/slot updated, it is
     234             :          * possible that the restart_lsn and catalog_xmin are advanced to a
     235             :          * later position than they have on the primary. This can happen when
     236             :          * slot advancing machinery finds running xacts record after reaching
     237             :          * the consistent state at a later point than the primary where it
     238             :          * serializes the snapshot and updates the restart_lsn.
     239             :          *
     240             :          * We LOG the message if the slot is temporary as it can help the user
     241             :          * to understand why the slot is not sync-ready. In the case of a
     242             :          * persistent slot, this is the more common case and won't directly
     243             :          * impact users, so we log the message at DEBUG1 level.
     244             :          */
     245           2 :         ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
     246             :                 errmsg("could not synchronize replication slot \"%s\"",
     247             :                        remote_slot->name),
     248             :                 errdetail("Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
     249             :                           LSN_FORMAT_ARGS(remote_slot->restart_lsn),
     250             :                           remote_slot->catalog_xmin,
     251             :                           LSN_FORMAT_ARGS(slot->data.restart_lsn),
     252             :                           slot->data.catalog_xmin));
     253             : 
     254           2 :         if (remote_slot_precedes)
     255           2 :             *remote_slot_precedes = true;
     256             : 
     257             :         /*
     258             :          * Skip updating the configuration. This is required to avoid syncing
     259             :          * two_phase_at without syncing confirmed_lsn. Otherwise, the prepared
     260             :          * transaction between old confirmed_lsn and two_phase_at will
     261             :          * unexpectedly get decoded and sent to the downstream after
     262             :          * promotion. See comments in ReorderBufferFinishPrepared.
     263             :          */
     264           2 :         return false;
     265             :     }
     266             : 
     267             :     /*
     268             :      * Attempt to sync LSNs and xmins only if remote slot is ahead of local
     269             :      * slot.
     270             :      */
     271          74 :     if (remote_slot->confirmed_lsn > slot->data.confirmed_flush ||
     272         102 :         remote_slot->restart_lsn > slot->data.restart_lsn ||
     273          50 :         TransactionIdFollows(remote_slot->catalog_xmin,
     274             :                              slot->data.catalog_xmin))
     275             :     {
     276             :         /*
     277             :          * We can't directly copy the remote slot's LSN or xmin unless there
     278             :          * exists a consistent snapshot at that point. Otherwise, after
     279             :          * promotion, the slots may not reach a consistent point before the
     280             :          * confirmed_flush_lsn, which can lead to data loss. To avoid data
     281             :          * loss, we let slot machinery advance the slot which ensures that
     282             :          * snapbuilder/slot statuses are updated properly.
     283             :          */
     284          24 :         if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
     285             :         {
     286             :             /*
     287             :              * Update the slot info directly if there is a serialized snapshot
     288             :              * at the restart_lsn, as the slot can quickly reach consistency
     289             :              * at restart_lsn by restoring the snapshot.
     290             :              */
     291           6 :             SpinLockAcquire(&slot->mutex);
     292           6 :             slot->data.restart_lsn = remote_slot->restart_lsn;
     293           6 :             slot->data.confirmed_flush = remote_slot->confirmed_lsn;
     294           6 :             slot->data.catalog_xmin = remote_slot->catalog_xmin;
     295           6 :             SpinLockRelease(&slot->mutex);
     296             : 
     297           6 :             if (found_consistent_snapshot)
     298           0 :                 *found_consistent_snapshot = true;
     299             :         }
     300             :         else
     301             :         {
     302          18 :             LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
     303             :                                                 found_consistent_snapshot);
     304             : 
     305             :             /* Sanity check */
     306          18 :             if (slot->data.confirmed_flush != remote_slot->confirmed_lsn)
     307           0 :                 ereport(ERROR,
     308             :                         errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot",
     309             :                                         remote_slot->name),
     310             :                         errdetail_internal("Remote slot has LSN %X/%08X but local slot has LSN %X/%08X.",
     311             :                                            LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
     312             :                                            LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
     313             : 
     314             :             /*
     315             :              * If we can't reach a consistent snapshot, the slot won't be
     316             :              * persisted. See update_and_persist_local_synced_slot().
     317             :              */
     318          18 :             if (found_consistent_snapshot && !(*found_consistent_snapshot))
     319           0 :                 skip_reason = SS_SKIP_NO_CONSISTENT_SNAPSHOT;
     320             :         }
     321             : 
     322          24 :         updated_xmin_or_lsn = true;
     323             :     }
     324             : 
     325             :     /* Update slot sync skip stats */
     326          74 :     update_slotsync_skip_stats(skip_reason);
     327             : 
     328          74 :     if (remote_dbid != slot->data.database ||
     329          74 :         remote_slot->two_phase != slot->data.two_phase ||
     330          72 :         remote_slot->failover != slot->data.failover ||
     331          72 :         strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0 ||
     332          72 :         remote_slot->two_phase_at != slot->data.two_phase_at)
     333             :     {
     334             :         NameData    plugin_name;
     335             : 
     336             :         /* Avoid expensive operations while holding a spinlock. */
     337           2 :         namestrcpy(&plugin_name, remote_slot->plugin);
     338             : 
     339           2 :         SpinLockAcquire(&slot->mutex);
     340           2 :         slot->data.plugin = plugin_name;
     341           2 :         slot->data.database = remote_dbid;
     342           2 :         slot->data.two_phase = remote_slot->two_phase;
     343           2 :         slot->data.two_phase_at = remote_slot->two_phase_at;
     344           2 :         slot->data.failover = remote_slot->failover;
     345           2 :         SpinLockRelease(&slot->mutex);
     346             : 
     347           2 :         updated_config = true;
     348             : 
     349             :         /*
     350             :          * Ensure that there is no risk of sending prepared transactions
     351             :          * unexpectedly after the promotion.
     352             :          */
     353             :         Assert(slot->data.two_phase_at <= slot->data.confirmed_flush);
     354             :     }
     355             : 
     356             :     /*
     357             :      * We have to write the changed xmin to disk *before* we change the
     358             :      * in-memory value, otherwise after a crash we wouldn't know that some
     359             :      * catalog tuples might have been removed already.
     360             :      */
     361          74 :     if (updated_config || updated_xmin_or_lsn)
     362             :     {
     363          26 :         ReplicationSlotMarkDirty();
     364          26 :         ReplicationSlotSave();
     365             :     }
     366             : 
     367             :     /*
     368             :      * Now the new xmin is safely on disk, we can let the global value
     369             :      * advance. We do not take ProcArrayLock or similar since we only advance
     370             :      * xmin here and there's not much harm done by a concurrent computation
     371             :      * missing that.
     372             :      */
     373          74 :     if (updated_xmin_or_lsn)
     374             :     {
     375          24 :         SpinLockAcquire(&slot->mutex);
     376          24 :         slot->effective_catalog_xmin = remote_slot->catalog_xmin;
     377          24 :         SpinLockRelease(&slot->mutex);
     378             : 
     379          24 :         ReplicationSlotsComputeRequiredXmin(false);
     380          24 :         ReplicationSlotsComputeRequiredLSN();
     381             :     }
     382             : 
     383          74 :     return updated_config || updated_xmin_or_lsn;
     384             : }
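
The guard at the top of update_local_synced_slot() boils down to a simple
"never move the local slot backward" test; as a sketch with a hypothetical
helper name:

    static bool
    sketch_would_move_slot_backward(const RemoteSlot *remote,
                                    const ReplicationSlot *local)
    {
        return remote->restart_lsn < local->data.restart_lsn ||
            TransactionIdPrecedes(remote->catalog_xmin,
                                  local->data.catalog_xmin);
    }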
     385             : 
     386             : /*
     387             :  * Get the list of local logical slots that are synchronized from the
     388             :  * primary server.
     389             :  */
     390             : static List *
     391          42 : get_local_synced_slots(void)
     392             : {
     393          42 :     List       *local_slots = NIL;
     394             : 
     395          42 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     396             : 
     397         462 :     for (int i = 0; i < max_replication_slots; i++)
     398             :     {
     399         420 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     400             : 
     401             :         /* Check if it is a synchronized slot */
     402         420 :         if (s->in_use && s->data.synced)
     403             :         {
     404             :             Assert(SlotIsLogical(s));
     405          68 :             local_slots = lappend(local_slots, s);
     406             :         }
     407             :     }
     408             : 
     409          42 :     LWLockRelease(ReplicationSlotControlLock);
     410             : 
     411          42 :     return local_slots;
     412             : }
     413             : 
     414             : /*
     415             :  * Helper function to check if local_slot is required to be retained.
     416             :  *
     417             :  * Return false if local_slot does not exist in the remote_slots list, or if
     418             :  * it has been invalidated locally while the corresponding remote slot is
     419             :  * still valid; otherwise, return true.
     420             :  */
     421             : static bool
     422          68 : local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots)
     423             : {
     424          68 :     bool        remote_exists = false;
     425          68 :     bool        locally_invalidated = false;
     426             : 
     427         168 :     foreach_ptr(RemoteSlot, remote_slot, remote_slots)
     428             :     {
     429          98 :         if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0)
     430             :         {
     431          66 :             remote_exists = true;
     432             : 
     433             :             /*
     434             :              * If remote slot is not invalidated but local slot is marked as
     435             :              * invalidated, then set locally_invalidated flag.
     436             :              */
     437          66 :             SpinLockAcquire(&local_slot->mutex);
     438          66 :             locally_invalidated =
     439         132 :                 (remote_slot->invalidated == RS_INVAL_NONE) &&
     440          66 :                 (local_slot->data.invalidated != RS_INVAL_NONE);
     441          66 :             SpinLockRelease(&local_slot->mutex);
     442             : 
     443          66 :             break;
     444             :         }
     445             :     }
     446             : 
     447          68 :     return (remote_exists && !locally_invalidated);
     448             : }
     449             : 
     450             : /*
     451             :  * Drop local obsolete slots.
     452             :  *
     453             :  * Drop the local slots that no longer need to be synced, i.e., those that
     454             :  * either do not exist on the primary or are no longer enabled for failover.
     455             :  *
     456             :  * Additionally, drop any slots that are valid on the primary but got
     457             :  * invalidated on the standby. This situation may occur due to the following
     458             :  * reasons:
     459             :  * - The 'max_slot_wal_keep_size' on the standby is insufficient to retain WAL
     460             :  *   records from the restart_lsn of the slot.
     461             :  * - 'primary_slot_name' is temporarily reset to null and the physical slot is
     462             :  *   removed.
     463             :  * These dropped slots will get recreated in the next sync-cycle and it is okay to
     464             :  * drop and recreate such slots as long as these are not consumable on the
     465             :  * standby (which is the case currently).
     466             :  *
     467             :  * Note: Change of 'wal_level' on the primary server to a level lower than
     468             :  * logical may also result in slot invalidation and removal on the standby.
     469             :  * This is because such 'wal_level' change is only possible if the logical
     470             :  * slots are removed on the primary server, so it's expected to see the
     471             :  * slots being invalidated and removed on the standby too (and re-created
     472             :  * if they are re-created on the primary server).
     473             :  */
     474             : static void
     475          42 : drop_local_obsolete_slots(List *remote_slot_list)
     476             : {
     477          42 :     List       *local_slots = get_local_synced_slots();
     478             : 
     479         152 :     foreach_ptr(ReplicationSlot, local_slot, local_slots)
     480             :     {
     481             :         /* Drop the local slot if it is not required to be retained. */
     482          68 :         if (!local_sync_slot_required(local_slot, remote_slot_list))
     483             :         {
     484             :             bool        synced_slot;
     485             : 
     486             :             /*
     487             :              * Use shared lock to prevent a conflict with
     488             :              * ReplicationSlotsDropDBSlots(), trying to drop the same slot
     489             :              * during a drop-database operation.
     490             :              */
     491           4 :             LockSharedObject(DatabaseRelationId, local_slot->data.database,
     492             :                              0, AccessShareLock);
     493             : 
     494             :             /*
     495             :              * In the small window between getting the slot to drop and
     496             :              * locking the database, there is a possibility of a parallel
     497             :              * database drop by the startup process and the creation of a new
     498             :              * slot by the user. This new user-created slot may end up using
     499             :              * the same shared memory as that of 'local_slot'. Thus check if
     500             :              * local_slot is still the synced one before performing actual
     501             :              * drop.
     502             :              */
     503           4 :             SpinLockAcquire(&local_slot->mutex);
     504           4 :             synced_slot = local_slot->in_use && local_slot->data.synced;
     505           4 :             SpinLockRelease(&local_slot->mutex);
     506             : 
     507           4 :             if (synced_slot)
     508             :             {
     509           4 :                 ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
     510           4 :                 ReplicationSlotDropAcquired();
     511             :             }
     512             : 
     513           4 :             UnlockSharedObject(DatabaseRelationId, local_slot->data.database,
     514             :                                0, AccessShareLock);
     515             : 
     516           4 :             ereport(LOG,
     517             :                     errmsg("dropped replication slot \"%s\" of database with OID %u",
     518             :                            NameStr(local_slot->data.name),
     519             :                            local_slot->data.database));
     520             :         }
     521             :     }
     522          42 : }
     523             : 
     524             : /*
     525             :  * Reserve WAL for the currently active local slot using the specified WAL
     526             :  * location (restart_lsn).
     527             :  *
     528             :  * If the given WAL location has been removed, reserve WAL using the oldest
     529             :  * existing WAL segment.
     530             :  */
     531             : static void
     532          12 : reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
     533             : {
     534             :     XLogSegNo   oldest_segno;
     535             :     XLogSegNo   segno;
     536          12 :     ReplicationSlot *slot = MyReplicationSlot;
     537             : 
     538             :     Assert(slot != NULL);
     539             :     Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
     540             : 
     541             :     while (true)
     542             :     {
     543          12 :         SpinLockAcquire(&slot->mutex);
     544          12 :         slot->data.restart_lsn = restart_lsn;
     545          12 :         SpinLockRelease(&slot->mutex);
     546             : 
     547             :         /* Prevent WAL removal as fast as possible */
     548          12 :         ReplicationSlotsComputeRequiredLSN();
     549             : 
     550          12 :         XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
     551             : 
     552             :         /*
     553             :          * Find the oldest existing WAL segment file.
     554             :          *
     555             :          * Normally, we can determine it by using the last removed segment
     556             :          * number. However, if no WAL segment files have been removed by a
     557             :          * checkpoint since startup, we need to search for the oldest segment
     558             :          * file from the current timeline existing in XLOGDIR.
     559             :          *
     560             :          * XXX: Currently, we are searching for the oldest segment in the
     561             :          * current timeline, as there is little chance of the slot's
     562             :          * restart_lsn being on some prior timeline; even if it is, in the
     563             :          * worst case we will wait to sync until the slot's restart_lsn has
     564             :          * moved to the current timeline.
     565             :          */
     566          12 :         oldest_segno = XLogGetLastRemovedSegno() + 1;
     567             : 
     568          12 :         if (oldest_segno == 1)
     569             :         {
     570             :             TimeLineID  cur_timeline;
     571             : 
     572           8 :             GetWalRcvFlushRecPtr(NULL, &cur_timeline);
     573           8 :             oldest_segno = XLogGetOldestSegno(cur_timeline);
     574             :         }
     575             : 
     576          12 :         elog(DEBUG1, "segno: " UINT64_FORMAT " of proposed restart_lsn for the synced slot, oldest_segno: " UINT64_FORMAT " available",
     577             :              segno, oldest_segno);
     578             : 
     579             :         /*
     580             :          * If all required WAL is still there, great, otherwise retry. The
     581             :          * slot should prevent further removal of WAL, unless there's a
     582             :          * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
     583             :          * the new restart_lsn above, so normally we should never need to loop
     584             :          * more than twice.
     585             :          */
     586          12 :         if (segno >= oldest_segno)
     587          12 :             break;
     588             : 
     589             :         /* Retry using the location of the oldest wal segment */
     590           0 :         XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn);
     591             :     }
     592          12 : }
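
The segment arithmetic above is plain integer division: XLByteToSeg() maps
an LSN to segno = lsn / wal_segment_size. For example, with the default 16MB
(0x1000000-byte) segments, a restart_lsn of 0/5000028 falls in segno 5, so
the reservation succeeds as long as that segment has not yet been removed
(segno >= oldest_segno). A hypothetical one-liner equivalent:

    static XLogSegNo
    sketch_lsn_to_segno(XLogRecPtr lsn)
    {
        /* Equivalent to XLByteToSeg(lsn, segno, wal_segment_size). */
        return lsn / wal_segment_size;
    }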
     593             : 
     594             : /*
     595             :  * If the remote restart_lsn and catalog_xmin have caught up with the
     596             :  * local ones, then update the LSNs and persist the local synced slot for
     597             :  * future synchronization; otherwise, do nothing.
     598             :  *
     599             :  * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise
     600             :  * false.
     601             :  */
     602             : static bool
     603          12 : update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
     604             : {
     605          12 :     ReplicationSlot *slot = MyReplicationSlot;
     606          12 :     bool        found_consistent_snapshot = false;
     607          12 :     bool        remote_slot_precedes = false;
     608             : 
     609             :     /* Slotsync skip stats are handled in function update_local_synced_slot() */
     610          12 :     (void) update_local_synced_slot(remote_slot, remote_dbid,
     611             :                                     &found_consistent_snapshot,
     612             :                                     &remote_slot_precedes);
     613             : 
     614             :     /*
     615             :      * Check if the primary server has caught up. Refer to the comment atop
     616             :      * the file for details on this check.
     617             :      */
     618          12 :     if (remote_slot_precedes)
     619             :     {
     620             :         /*
     621             :          * The remote slot didn't catch up to the locally reserved position.
     622             :          *
     623             :          * We do not drop the slot because the restart_lsn can be ahead of the
     624             :          * current location when recreating the slot in the next cycle. It may
     625             :          * take more time to create such a slot. Therefore, we keep this slot
     626             :          * and attempt the synchronization in the next cycle.
     627             :          */
     628           2 :         return false;
     629             :     }
     630             : 
     631             :     /*
     632             :      * Don't persist the slot if it cannot reach the consistent point from the
     633             :      * restart_lsn. See comments atop this file.
     634             :      */
     635          10 :     if (!found_consistent_snapshot)
     636             :     {
     637           0 :         ereport(LOG,
     638             :                 errmsg("could not synchronize replication slot \"%s\"", remote_slot->name),
     639             :                 errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
     640             :                           LSN_FORMAT_ARGS(slot->data.restart_lsn)));
     641             : 
     642           0 :         return false;
     643             :     }
     644             : 
     645          10 :     ReplicationSlotPersist();
     646             : 
     647          10 :     ereport(LOG,
     648             :             errmsg("newly created replication slot \"%s\" is sync-ready now",
     649             :                    remote_slot->name));
     650             : 
     651          10 :     return true;
     652             : }
     653             : 
     654             : /*
     655             :  * Synchronize a single slot to the given position.
     656             :  *
     657             :  * This creates a new slot if there is no existing one and updates the
     658             :  * metadata of the slot as per the data received from the primary server.
     659             :  *
     660             :  * The slot is created as a temporary slot and stays in the same state until the
     661             :  * remote_slot catches up with the locally reserved position and the local
     662             :  * slot is updated. The slot is then persisted and is considered sync-ready
     663             :  * periodic syncs.
     664             :  *
     665             :  * Returns TRUE if the local slot is updated.
     666             :  */
     667             : static bool
     668          76 : synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
     669             : {
     670             :     ReplicationSlot *slot;
     671          76 :     XLogRecPtr  latestFlushPtr = GetStandbyFlushRecPtr(NULL);
     672          76 :     bool        slot_updated = false;
     673             : 
     674             :     /* Search for the named slot */
     675          76 :     if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
     676             :     {
     677             :         bool        synced;
     678             : 
     679          64 :         SpinLockAcquire(&slot->mutex);
     680          64 :         synced = slot->data.synced;
     681          64 :         SpinLockRelease(&slot->mutex);
     682             : 
     683             :         /* User-created slot with the same name exists, raise ERROR. */
     684          64 :         if (!synced)
     685           0 :             ereport(ERROR,
     686             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     687             :                     errmsg("exiting from slot synchronization because same"
     688             :                            " name slot \"%s\" already exists on the standby",
     689             :                            remote_slot->name));
     690             : 
     691             :         /*
     692             :          * The slot has been synchronized before.
     693             :          *
     694             :          * It is important to acquire the slot here before checking
     695             :          * invalidation. If we don't acquire the slot first, there could be a
     696             :          * race condition that the local slot could be invalidated just after
     697             :          * checking the 'invalidated' flag here and we could end up
     698             :          * overwriting 'invalidated' flag to remote_slot's value. See
     699             :          * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
     700             :          * if the slot is not acquired by other processes.
     701             :          *
     702             :          * XXX: If it ever turns out that slot acquire/release is costly for
     703             :          * cases when none of the slot properties is changed then we can do a
     704             :          * pre-check to ensure that at least one of the slot properties is
     705             :          * changed before acquiring the slot.
     706             :          */
     707          64 :         ReplicationSlotAcquire(remote_slot->name, true, false);
     708             : 
     709             :         Assert(slot == MyReplicationSlot);
     710             : 
     711             :         /*
     712             :          * Copy the invalidation cause from remote only if local slot is not
     713             :          * invalidated locally, we don't want to overwrite existing one.
     714             :          */
     715          64 :         if (slot->data.invalidated == RS_INVAL_NONE &&
     716          64 :             remote_slot->invalidated != RS_INVAL_NONE)
     717             :         {
     718           0 :             SpinLockAcquire(&slot->mutex);
     719           0 :             slot->data.invalidated = remote_slot->invalidated;
     720           0 :             SpinLockRelease(&slot->mutex);
     721             : 
     722             :             /* Make sure the invalidated state persists across server restart */
     723           0 :             ReplicationSlotMarkDirty();
     724           0 :             ReplicationSlotSave();
     725             : 
     726           0 :             slot_updated = true;
     727             :         }
     728             : 
     729             :         /* Skip the sync of an invalidated slot */
     730          64 :         if (slot->data.invalidated != RS_INVAL_NONE)
     731             :         {
     732           0 :             update_slotsync_skip_stats(SS_SKIP_INVALID);
     733             : 
     734           0 :             ReplicationSlotRelease();
     735           0 :             return slot_updated;
     736             :         }
     737             : 
     738             :         /*
     739             :          * Make sure that concerned WAL is received and flushed before syncing
     740             :          * slot to target lsn received from the primary server.
     741             :          *
     742             :          * Report statistics only after the slot has been acquired, ensuring
     743             :          * it cannot be dropped during the reporting process.
     744             :          */
     745          64 :         if (remote_slot->confirmed_lsn > latestFlushPtr)
     746             :         {
     747           0 :             update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED);
     748             : 
     749             :             /*
     750             :              * Can get here only if GUC 'synchronized_standby_slots' on the
     751             :              * primary server was not configured correctly.
     752             :              */
     753           0 :             ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
     754             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     755             :                     errmsg("skipping slot synchronization because the received slot sync"
     756             :                            " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
     757             :                            LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
     758             :                            remote_slot->name,
     759             :                            LSN_FORMAT_ARGS(latestFlushPtr)));
     760             : 
     761           0 :             ReplicationSlotRelease();
     762             : 
     763           0 :             return slot_updated;
     764             :         }
     765             : 
     766             :         /* Slot not ready yet, let's attempt to make it sync-ready now. */
     767          64 :         if (slot->data.persistency == RS_TEMPORARY)
     768             :         {
     769           0 :             slot_updated = update_and_persist_local_synced_slot(remote_slot,
     770             :                                                                 remote_dbid);
     771             :         }
     772             : 
     773             :         /* Slot ready for sync, so sync it. */
     774             :         else
     775             :         {
     776             :             /*
     777             :              * Sanity check: As long as the invalidations are handled
     778             :              * appropriately as above, this should never happen.
     779             :              *
     780             :              * We don't need to check restart_lsn here. See the comments in
     781             :              * update_local_synced_slot() for details.
     782             :              */
     783          64 :             if (remote_slot->confirmed_lsn < slot->data.confirmed_flush)
     784           0 :                 ereport(ERROR,
     785             :                         errmsg_internal("cannot synchronize local slot \"%s\"",
     786             :                                         remote_slot->name),
     787             :                         errdetail_internal("Local slot's start streaming location LSN(%X/%08X) is ahead of remote slot's LSN(%X/%08X).",
     788             :                                            LSN_FORMAT_ARGS(slot->data.confirmed_flush),
     789             :                                            LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
     790             : 
     791          64 :             slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
     792             :                                                     NULL, NULL);
     793             :         }
     794             :     }
     795             :     /* Otherwise create the slot first. */
     796             :     else
     797             :     {
     798             :         NameData    plugin_name;
     799          12 :         TransactionId xmin_horizon = InvalidTransactionId;
     800             : 
     801             :         /* Skip creating the local slot if remote_slot is invalidated already */
     802          12 :         if (remote_slot->invalidated != RS_INVAL_NONE)
     803           0 :             return false;
     804             : 
     805             :         /*
     806             :          * We create temporary slots instead of ephemeral slots here because
     807             :          * we want the slots to survive after releasing them. This is done to
     808             :          * avoid dropping and re-creating the slots in each synchronization
     809             :          * cycle if the restart_lsn or catalog_xmin of the remote slot has not
     810             :          * caught up.
     811             :          */
     812          12 :         ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
     813          12 :                               remote_slot->two_phase,
     814          12 :                               remote_slot->failover,
     815             :                               true);
     816             : 
     817             :         /* For shorter lines. */
     818          12 :         slot = MyReplicationSlot;
     819             : 
     820             :         /* Avoid expensive operations while holding a spinlock. */
     821          12 :         namestrcpy(&plugin_name, remote_slot->plugin);
     822             : 
     823          12 :         SpinLockAcquire(&slot->mutex);
     824          12 :         slot->data.database = remote_dbid;
     825          12 :         slot->data.plugin = plugin_name;
     826          12 :         SpinLockRelease(&slot->mutex);
     827             : 
     828          12 :         reserve_wal_for_local_slot(remote_slot->restart_lsn);
     829             : 
     830          12 :         LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     831          12 :         xmin_horizon = GetOldestSafeDecodingTransactionId(true);
     832          12 :         SpinLockAcquire(&slot->mutex);
     833          12 :         slot->effective_catalog_xmin = xmin_horizon;
     834          12 :         slot->data.catalog_xmin = xmin_horizon;
     835          12 :         SpinLockRelease(&slot->mutex);
     836          12 :         ReplicationSlotsComputeRequiredXmin(true);
     837          12 :         LWLockRelease(ProcArrayLock);
     838             : 
     839             :         /*
     840             :          * Make sure that concerned WAL is received and flushed before syncing
     841             :          * slot to target lsn received from the primary server.
     842             :          *
     843             :          * Report statistics only after the slot has been acquired, ensuring
     844             :          * it cannot be dropped during the reporting process.
     845             :          */
     846          12 :         if (remote_slot->confirmed_lsn > latestFlushPtr)
     847             :         {
     848           0 :             update_slotsync_skip_stats(SS_SKIP_WAL_NOT_FLUSHED);
     849             : 
     850             :             /*
     851             :              * Can get here only if GUC 'synchronized_standby_slots' on the
     852             :              * primary server was not configured correctly.
     853             :              */
     854           0 :             ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
     855             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     856             :                     errmsg("skipping slot synchronization because the received slot sync"
     857             :                            " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
     858             :                            LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
     859             :                            remote_slot->name,
     860             :                            LSN_FORMAT_ARGS(latestFlushPtr)));
     861             : 
     862           0 :             ReplicationSlotRelease();
     863             : 
     864           0 :             return false;
     865             :         }
     866             : 
     867          12 :         update_and_persist_local_synced_slot(remote_slot, remote_dbid);
     868             : 
     869          12 :         slot_updated = true;
     870             :     }
     871             : 
     872          76 :     ReplicationSlotRelease();
     873             : 
     874          76 :     return slot_updated;
     875             : }
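
A sketch of the expected calling pattern (the real caller,
synchronize_slots() below, takes a shared lock on the slot's database so
that slot creation cannot race with a concurrent drop-database, mirroring
drop_local_obsolete_slots() above; the helper name is hypothetical):

    static bool
    sketch_sync_slot_under_db_lock(RemoteSlot *remote_slot, Oid remote_dbid)
    {
        bool        updated;

        LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
        updated = synchronize_one_slot(remote_slot, remote_dbid);
        UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);

        return updated;
    }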
     876             : 
     877             : /*
     878             :  * Synchronize slots.
     879             :  *
     880             :  * Gets the failover logical slots info from the primary server and updates
     881             :  * the slots locally. Creates the slots if not present on the standby.
     882             :  *
     883             :  * Returns TRUE if any of the slots gets updated in this sync-cycle.
     884             :  */
     885             : static bool
     886          44 : synchronize_slots(WalReceiverConn *wrconn)
     887             : {
     888             : #define SLOTSYNC_COLUMN_COUNT 10
     889          44 :     Oid         slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
     890             :     LSNOID, XIDOID, BOOLOID, LSNOID, BOOLOID, TEXTOID, TEXTOID};
     891             : 
     892             :     WalRcvExecResult *res;
     893             :     TupleTableSlot *tupslot;
     894          44 :     List       *remote_slot_list = NIL;
     895          44 :     bool        some_slot_updated = false;
     896          44 :     bool        started_tx = false;
     897          44 :     const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
     898             :         " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
     899             :         " database, invalidation_reason"
     900             :         " FROM pg_catalog.pg_replication_slots"
     901             :         " WHERE failover and NOT temporary";
     902             : 
     903             :     /* The syscache access in walrcv_exec() needs a transaction env. */
     904          44 :     if (!IsTransactionState())
     905             :     {
     906          30 :         StartTransactionCommand();
     907          30 :         started_tx = true;
     908             :     }
     909             : 
     910             :     /* Execute the query */
     911          44 :     res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
     912          44 :     if (res->status != WALRCV_OK_TUPLES)
     913           2 :         ereport(ERROR,
     914             :                 errmsg("could not fetch failover logical slots info from the primary server: %s",
     915             :                        res->err));
     916             : 
     917             :     /* Construct the remote_slot tuple and synchronize each slot locally */
     918          42 :     tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     919         118 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
     920             :     {
     921             :         bool        isnull;
     922          76 :         RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot));
     923             :         Datum       d;
     924          76 :         int         col = 0;
     925             : 
     926          76 :         remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col,
     927             :                                                              &isnull));
     928             :         Assert(!isnull);
     929             : 
     930          76 :         remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, ++col,
     931             :                                                                &isnull));
     932             :         Assert(!isnull);
     933             : 
     934             :         /*
     935             :          * It is possible to get null values for the LSNs and Xmin if the
     936             :          * slot is invalidated on the primary server, so handle accordingly.
     937             :          */
     938          76 :         d = slot_getattr(tupslot, ++col, &isnull);
     939          76 :         remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr :
     940          76 :             DatumGetLSN(d);
     941             : 
     942          76 :         d = slot_getattr(tupslot, ++col, &isnull);
     943          76 :         remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
     944             : 
     945          76 :         d = slot_getattr(tupslot, ++col, &isnull);
     946          76 :         remote_slot->catalog_xmin = isnull ? InvalidTransactionId :
     947          76 :             DatumGetTransactionId(d);
     948             : 
     949          76 :         remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col,
     950             :                                                            &isnull));
     951             :         Assert(!isnull);
     952             : 
     953          76 :         d = slot_getattr(tupslot, ++col, &isnull);
     954          76 :         remote_slot->two_phase_at = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
     955             : 
     956          76 :         remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
     957             :                                                           &isnull));
     958             :         Assert(!isnull);
     959             : 
     960          76 :         remote_slot->database = TextDatumGetCString(slot_getattr(tupslot,
     961             :                                                                  ++col, &isnull));
     962             :         Assert(!isnull);
     963             : 
     964          76 :         d = slot_getattr(tupslot, ++col, &isnull);
     965          76 :         remote_slot->invalidated = isnull ? RS_INVAL_NONE :
     966           0 :             GetSlotInvalidationCause(TextDatumGetCString(d));
     967             : 
     968             :         /* Sanity check */
     969             :         Assert(col == SLOTSYNC_COLUMN_COUNT);
     970             : 
     971             :         /*
     972             :          * If restart_lsn, confirmed_lsn or catalog_xmin is invalid but the
     973             :          * slot is valid, that means we have fetched the remote_slot in its
     974             :          * RS_EPHEMERAL state. In such a case, don't sync it; we can always
     975             :          * sync it in the next sync cycle when the remote_slot is persisted
     976             :          * and has valid lsn(s) and xmin values.
     977             :          *
     978             :          * XXX: In future, if we plan to expose 'slot->data.persistency' in
     979             :          * pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL
     980             :          * slots in the first place.
     981             :          */
     982          76 :         if ((!XLogRecPtrIsValid(remote_slot->restart_lsn) ||
     983          76 :              !XLogRecPtrIsValid(remote_slot->confirmed_lsn) ||
     984          76 :              !TransactionIdIsValid(remote_slot->catalog_xmin)) &&
     985           0 :             remote_slot->invalidated == RS_INVAL_NONE)
     986           0 :             pfree(remote_slot);
     987             :         else
     988             :             /* Create list of remote slots */
     989          76 :             remote_slot_list = lappend(remote_slot_list, remote_slot);
     990             : 
     991          76 :         ExecClearTuple(tupslot);
     992             :     }
     993             : 
     994             :     /* Drop local slots that no longer need to be synced. */
     995          42 :     drop_local_obsolete_slots(remote_slot_list);
     996             : 
     997             :     /* Now sync the slots locally */
     998         160 :     foreach_ptr(RemoteSlot, remote_slot, remote_slot_list)
     999             :     {
    1000          76 :         Oid         remote_dbid = get_database_oid(remote_slot->database, false);
    1001             : 
    1002             :         /*
    1003             :          * Use a shared lock to prevent a conflict with
    1004             :          * ReplicationSlotsDropDBSlots(), which could try to drop the same
    1005             :          * slot during a drop-database operation.
    1006             :          */
    1007          76 :         LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
    1008             : 
    1009          76 :         some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
    1010             : 
    1011          76 :         UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
    1012             :     }
    1013             : 
    1014             :     /* We are done; free the remote_slot_list elements. */
    1015          42 :     list_free_deep(remote_slot_list);
    1016             : 
    1017          42 :     walrcv_clear_result(res);
    1018             : 
    1019          42 :     if (started_tx)
    1020          28 :         CommitTransactionCommand();
    1021             : 
    1022          42 :     return some_slot_updated;
    1023             : }
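                     : 
                     : /*
                     :  * For illustration only: each sync-cycle above is driven by the result of
                     :  * this query on the primary (the same text as the 'query' variable), so a
                     :  * user can run it there to see which slots are candidates for syncing:
                     :  *
                     :  *     SELECT slot_name, plugin, confirmed_flush_lsn,
                     :  *            restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,
                     :  *            database, invalidation_reason
                     :  *     FROM pg_catalog.pg_replication_slots
                     :  *     WHERE failover AND NOT temporary;
                     :  */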
    1024             : 
    1025             : /*
    1026             :  * Checks the remote server info.
    1027             :  *
    1028             :  * We ensure that the slot specified in 'primary_slot_name' exists on the
    1029             :  * remote server and that the remote server is not a standby node.
    1030             :  */
    1031             : static void
    1032          26 : validate_remote_info(WalReceiverConn *wrconn)
    1033             : {
    1034             : #define PRIMARY_INFO_OUTPUT_COL_COUNT 2
    1035             :     WalRcvExecResult *res;
    1036          26 :     Oid         slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID};
    1037             :     StringInfoData cmd;
    1038             :     bool        isnull;
    1039             :     TupleTableSlot *tupslot;
    1040             :     bool        remote_in_recovery;
    1041             :     bool        primary_slot_valid;
    1042          26 :     bool        started_tx = false;
    1043             : 
    1044          26 :     initStringInfo(&cmd);
    1045          26 :     appendStringInfo(&cmd,
    1046             :                      "SELECT pg_is_in_recovery(), count(*) = 1"
    1047             :                      " FROM pg_catalog.pg_replication_slots"
    1048             :                      " WHERE slot_type='physical' AND slot_name=%s",
    1049             :                      quote_literal_cstr(PrimarySlotName));
    1050             : 
    1051             :     /* The syscache access in walrcv_exec() needs a transaction env. */
    1052          26 :     if (!IsTransactionState())
    1053             :     {
    1054          10 :         StartTransactionCommand();
    1055          10 :         started_tx = true;
    1056             :     }
    1057             : 
    1058          26 :     res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow);
    1059          26 :     pfree(cmd.data);
    1060             : 
    1061          26 :     if (res->status != WALRCV_OK_TUPLES)
    1062           0 :         ereport(ERROR,
    1063             :                 errmsg("could not fetch primary slot name \"%s\" info from the primary server: %s",
    1064             :                        PrimarySlotName, res->err),
    1065             :                 errhint("Check if \"primary_slot_name\" is configured correctly."));
    1066             : 
    1067          26 :     tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    1068          26 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
    1069           0 :         elog(ERROR,
    1070             :              "failed to fetch tuple for the primary server slot specified by \"primary_slot_name\"");
    1071             : 
    1072          26 :     remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull));
    1073             :     Assert(!isnull);
    1074             : 
    1075             :     /*
    1076             :      * Slot sync is currently not supported on a cascading standby. This is
    1077             :      * because if we allow it, the primary server needs to wait for all the
    1078             :      * cascading standbys, otherwise, logical subscribers can still be ahead
    1079             :      * of one of the cascading standbys which we plan to promote. Thus, to
    1080             :      * avoid this additional complexity, we restrict it for the time being.
    1081             :      */
    1082          26 :     if (remote_in_recovery)
    1083           2 :         ereport(ERROR,
    1084             :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1085             :                 errmsg("cannot synchronize replication slots from a standby server"));
    1086             : 
    1087          24 :     primary_slot_valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull));
    1088             :     Assert(!isnull);
    1089             : 
    1090          24 :     if (!primary_slot_valid)
    1091           0 :         ereport(ERROR,
    1092             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1093             :         /* translator: second %s is a GUC variable name */
    1094             :                 errmsg("replication slot \"%s\" specified by \"%s\" does not exist on primary server",
    1095             :                        PrimarySlotName, "primary_slot_name"));
    1096             : 
    1097          24 :     ExecClearTuple(tupslot);
    1098          24 :     walrcv_clear_result(res);
    1099             : 
    1100          24 :     if (started_tx)
    1101          10 :         CommitTransactionCommand();
    1102          24 : }
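                     : 
                     : /*
                     :  * Illustration only: with primary_slot_name set to, say, 'sb1_slot' (a
                     :  * hypothetical name), the check above amounts to running this on the
                     :  * remote server and erroring out unless it returns (false, true):
                     :  *
                     :  *     SELECT pg_is_in_recovery(), count(*) = 1
                     :  *     FROM pg_catalog.pg_replication_slots
                     :  *     WHERE slot_type = 'physical' AND slot_name = 'sb1_slot';
                     :  */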
    1103             : 
    1104             : /*
    1105             :  * Checks if dbname is specified in 'primary_conninfo'.
    1106             :  *
    1107             :  * Errors out if it is not specified; otherwise returns it.
    1108             :  */
    1109             : char *
    1110          28 : CheckAndGetDbnameFromConninfo(void)
    1111             : {
    1112             :     char       *dbname;
    1113             : 
    1114             :     /*
    1115             :      * The slot synchronization needs a database connection for walrcv_exec to
    1116             :      * work.
    1117             :      */
    1118          28 :     dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
    1119          28 :     if (dbname == NULL)
    1120           2 :         ereport(ERROR,
    1121             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1122             : 
    1123             :         /*
    1124             :          * translator: first %s is a connection option; second %s is a GUC
    1125             :          * variable name
    1126             :          */
    1127             :                 errmsg("replication slot synchronization requires \"%s\" to be specified in \"%s\"",
    1128             :                        "dbname", "primary_conninfo"));
    1129          26 :     return dbname;
    1130             : }
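                     : 
                     : /*
                     :  * Illustration only: a primary_conninfo value that passes this check must
                     :  * carry an explicit dbname, e.g. in postgresql.conf on the standby (host,
                     :  * user, and database names are examples):
                     :  *
                     :  *     primary_conninfo = 'host=primary.example.com user=repluser dbname=postgres'
                     :  */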
    1131             : 
    1132             : /*
    1133             :  * Return true if all necessary GUCs for slot synchronization are set
    1134             :  * appropriately; otherwise, return false.
    1135             :  */
    1136             : bool
    1137          42 : ValidateSlotSyncParams(int elevel)
    1138             : {
    1139             :     /*
    1140             :      * Logical slot sync/creation requires wal_level >= logical.
    1141             :      */
    1142          42 :     if (wal_level < WAL_LEVEL_LOGICAL)
    1143             :     {
    1144           0 :         ereport(elevel,
    1145             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1146             :                 errmsg("replication slot synchronization requires \"wal_level\" >= \"logical\""));
    1147           0 :         return false;
    1148             :     }
    1149             : 
    1150             :     /*
    1151             :      * A physical replication slot (primary_slot_name) is required on the
    1152             :      * primary to ensure that the rows needed by the standby are not removed
    1153             :      * after a restart, so that the synchronized slot on the standby will not
    1154             :      * be invalidated.
    1155             :      */
    1156          42 :     if (PrimarySlotName == NULL || *PrimarySlotName == '\0')
    1157             :     {
    1158           0 :         ereport(elevel,
    1159             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1160             :         /* translator: %s is a GUC variable name */
    1161             :                 errmsg("replication slot synchronization requires \"%s\" to be set", "primary_slot_name"));
    1162           0 :         return false;
    1163             :     }
    1164             : 
    1165             :     /*
    1166             :      * hot_standby_feedback must be enabled to cooperate with the physical
    1167             :      * replication slot, which allows informing the primary about the xmin and
    1168             :      * catalog_xmin values on the standby.
    1169             :      */
    1170          42 :     if (!hot_standby_feedback)
    1171             :     {
    1172           2 :         ereport(elevel,
    1173             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1174             :         /* translator: %s is a GUC variable name */
    1175             :                 errmsg("replication slot synchronization requires \"%s\" to be enabled",
    1176             :                        "hot_standby_feedback"));
    1177           2 :         return false;
    1178             :     }
    1179             : 
    1180             :     /*
    1181             :      * The primary_conninfo is required to connect to the primary server to
    1182             :      * fetch slot information.
    1183             :      */
    1184          40 :     if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0')
    1185             :     {
    1186           0 :         ereport(elevel,
    1187             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1188             :         /* translator: %s is a GUC variable name */
    1189             :                 errmsg("replication slot synchronization requires \"%s\" to be set",
    1190             :                        "primary_conninfo"));
    1191           0 :         return false;
    1192             :     }
    1193             : 
    1194          40 :     return true;
    1195             : }
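                     : 
                     : /*
                     :  * Illustration only: a minimal standby configuration that satisfies all of
                     :  * the checks above could look like this (slot and host names are examples):
                     :  *
                     :  *     wal_level = logical
                     :  *     primary_slot_name = 'sb1_slot'
                     :  *     hot_standby_feedback = on
                     :  *     primary_conninfo = 'host=primary.example.com dbname=postgres'
                     :  */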
    1196             : 
    1197             : /*
    1198             :  * Re-read the config file.
    1199             :  *
    1200             :  * Exit if any of the slot sync GUCs have changed; the postmaster will then
    1201             :  * restart the worker.
    1202             :  */
    1203             : static void
    1204           2 : slotsync_reread_config(void)
    1205             : {
    1206           2 :     char       *old_primary_conninfo = pstrdup(PrimaryConnInfo);
    1207           2 :     char       *old_primary_slotname = pstrdup(PrimarySlotName);
    1208           2 :     bool        old_sync_replication_slots = sync_replication_slots;
    1209           2 :     bool        old_hot_standby_feedback = hot_standby_feedback;
    1210             :     bool        conninfo_changed;
    1211             :     bool        primary_slotname_changed;
    1212             : 
    1213             :     Assert(sync_replication_slots);
    1214             : 
    1215           2 :     ConfigReloadPending = false;
    1216           2 :     ProcessConfigFile(PGC_SIGHUP);
    1217             : 
    1218           2 :     conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0;
    1219           2 :     primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0;
    1220           2 :     pfree(old_primary_conninfo);
    1221           2 :     pfree(old_primary_slotname);
    1222             : 
    1223           2 :     if (old_sync_replication_slots != sync_replication_slots)
    1224             :     {
    1225           0 :         ereport(LOG,
    1226             :         /* translator: %s is a GUC variable name */
    1227             :                 errmsg("replication slot synchronization worker will shut down because \"%s\" is disabled", "sync_replication_slots"));
    1228           0 :         proc_exit(0);
    1229             :     }
    1230             : 
    1231           2 :     if (conninfo_changed ||
    1232           2 :         primary_slotname_changed ||
    1233           2 :         (old_hot_standby_feedback != hot_standby_feedback))
    1234             :     {
    1235           2 :         ereport(LOG,
    1236             :                 errmsg("replication slot synchronization worker will restart because of a parameter change"));
    1237             : 
    1238             :         /*
    1239             :          * Reset the last-start time for this worker so that the postmaster
    1240             :          * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
    1241             :          */
    1242           2 :         SlotSyncCtx->last_start_time = 0;
    1243             : 
    1244           2 :         proc_exit(0);
    1245             :     }
    1246             : 
    1247           0 : }
    1248             : 
    1249             : /*
    1250             :  * Interrupt handler for main loop of slot sync worker.
    1251             :  */
    1252             : static void
    1253          38 : ProcessSlotSyncInterrupts(void)
    1254             : {
    1255          38 :     CHECK_FOR_INTERRUPTS();
    1256             : 
    1257          34 :     if (ShutdownRequestPending)
    1258             :     {
    1259           2 :         ereport(LOG,
    1260             :                 errmsg("replication slot synchronization worker is shutting down on receiving SIGINT"));
    1261             : 
    1262           2 :         proc_exit(0);
    1263             :     }
    1264             : 
    1265          32 :     if (ConfigReloadPending)
    1266           2 :         slotsync_reread_config();
    1267          30 : }
    1268             : 
    1269             : /*
    1270             :  * Connection cleanup function for slotsync worker.
    1271             :  *
    1272             :  * Called on slotsync worker exit.
    1273             :  */
    1274             : static void
    1275          10 : slotsync_worker_disconnect(int code, Datum arg)
    1276             : {
    1277          10 :     WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
    1278             : 
    1279          10 :     walrcv_disconnect(wrconn);
    1280          10 : }
    1281             : 
    1282             : /*
    1283             :  * Cleanup function for slotsync worker.
    1284             :  *
    1285             :  * Called on slotsync worker exit.
    1286             :  */
    1287             : static void
    1288          10 : slotsync_worker_onexit(int code, Datum arg)
    1289             : {
    1290             :     /*
    1291             :      * We need to do slots cleanup here just like WalSndErrorCleanup() does.
    1292             :      *
    1293             :      * The startup process during promotion invokes ShutDownSlotSync(), which
    1294             :      * waits for slot sync to finish by checking the
    1295             :      * 'syncing' flag. Thus the slot sync worker must be done with slots'
    1296             :      * release and cleanup to avoid any dangling temporary slots or active
    1297             :      * slots before it marks itself as finished syncing.
    1298             :      */
    1299             : 
    1300             :     /* Make sure active replication slots are released */
    1301          10 :     if (MyReplicationSlot != NULL)
    1302           0 :         ReplicationSlotRelease();
    1303             : 
    1304             :     /* Also cleanup the temporary slots. */
    1305          10 :     ReplicationSlotCleanup(false);
    1306             : 
    1307          10 :     SpinLockAcquire(&SlotSyncCtx->mutex);
    1308             : 
    1309          10 :     SlotSyncCtx->pid = InvalidPid;
    1310             : 
    1311             :     /*
    1312             :      * If syncing_slots is true, it indicates that the process errored out
    1313             :      * without resetting the flag. So, we need to clean up shared memory and
    1314             :      * reset the flag here.
    1315             :      */
    1316          10 :     if (syncing_slots)
    1317             :     {
    1318          10 :         SlotSyncCtx->syncing = false;
    1319          10 :         syncing_slots = false;
    1320             :     }
    1321             : 
    1322          10 :     SpinLockRelease(&SlotSyncCtx->mutex);
    1323          10 : }
    1324             : 
    1325             : /*
    1326             :  * Sleep for long enough that we believe it's likely that the slots on the
    1327             :  * primary get updated.
    1328             :  *
    1329             :  * If there is no slot activity the wait time between sync-cycles will double
    1330             :  * (to a maximum of 30s). If there is some slot activity the wait time between
    1331             :  * sync-cycles is reset to the minimum (200ms).
    1332             :  */
    1333             : static void
    1334          28 : wait_for_slot_activity(bool some_slot_updated)
    1335             : {
    1336             :     int         rc;
    1337             : 
    1338          28 :     if (!some_slot_updated)
    1339             :     {
    1340             :         /*
    1341             :          * No slots were updated, so double the sleep time, but not beyond the
    1342             :          * maximum allowable value.
    1343             :          */
    1344          12 :         sleep_ms = Min(sleep_ms * 2, MAX_SLOTSYNC_WORKER_NAPTIME_MS);
    1345             :     }
    1346             :     else
    1347             :     {
    1348             :         /*
    1349             :          * Some slots were updated since the last sleep, so reset the sleep
    1350             :          * time.
    1351             :          */
    1352          16 :         sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
    1353             :     }
    1354             : 
    1355          28 :     rc = WaitLatch(MyLatch,
    1356             :                    WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1357             :                    sleep_ms,
    1358             :                    WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
    1359             : 
    1360          28 :     if (rc & WL_LATCH_SET)
    1361           8 :         ResetLatch(MyLatch);
    1362          28 : }
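                     : 
                     : /*
                     :  * Worked example of the backoff above, assuming the 200ms minimum and 30s
                     :  * maximum naptime mentioned in the function comment: consecutive idle
                     :  * cycles sleep 200ms, 400ms, 800ms, ..., 25.6s, and then clamp at 30s; a
                     :  * single cycle that updates any slot resets the sleep to 200ms.
                     :  */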
    1363             : 
    1364             : /*
    1365             :  * Emit an error if a promotion or a concurrent sync call is in progress.
    1366             :  * Otherwise, advertise that a sync is in progress.
    1367             :  */
    1368             : static void
    1369          26 : check_and_set_sync_info(pid_t worker_pid)
    1370             : {
    1371          26 :     SpinLockAcquire(&SlotSyncCtx->mutex);
    1372             : 
    1373             :     /* The worker pid must not already be assigned in SlotSyncCtx */
    1374             :     Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
    1375             : 
    1376             :     /*
    1377             :      * Emit an error if the startup process signaled the slot sync machinery
    1378             :      * stop. See comments atop SlotSyncCtxStruct.
    1379             :      */
    1380          26 :     if (SlotSyncCtx->stopSignaled)
    1381             :     {
    1382           0 :         SpinLockRelease(&SlotSyncCtx->mutex);
    1383           0 :         ereport(ERROR,
    1384             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1385             :                 errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
    1386             :     }
    1387             : 
    1388          26 :     if (SlotSyncCtx->syncing)
    1389             :     {
    1390           0 :         SpinLockRelease(&SlotSyncCtx->mutex);
    1391           0 :         ereport(ERROR,
    1392             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1393             :                 errmsg("cannot synchronize replication slots concurrently"));
    1394             :     }
    1395             : 
    1396          26 :     SlotSyncCtx->syncing = true;
    1397             : 
    1398             :     /*
    1399             :      * Advertise the required PID so that the startup process can kill the
    1400             :      * slot sync worker on promotion.
    1401             :      */
    1402          26 :     SlotSyncCtx->pid = worker_pid;
    1403             : 
    1404          26 :     SpinLockRelease(&SlotSyncCtx->mutex);
    1405             : 
    1406          26 :     syncing_slots = true;
    1407          26 : }
    1408             : 
    1409             : /*
    1410             :  * Reset syncing flag.
    1411             :  */
    1412             : static void
    1413          16 : reset_syncing_flag(void)
    1414             : {
    1415          16 :     SpinLockAcquire(&SlotSyncCtx->mutex);
    1416          16 :     SlotSyncCtx->syncing = false;
    1417          16 :     SpinLockRelease(&SlotSyncCtx->mutex);
    1418             : 
    1419          16 :     syncing_slots = false;
    1420          16 : }
    1421             : 
    1422             : /*
    1423             :  * The main loop of our worker process.
    1424             :  *
    1425             :  * It connects to the primary server and periodically fetches the failover
    1426             :  * logical slots' information in order to create and sync the slots.
    1427             :  */
    1428             : void
    1429          10 : ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
    1430             : {
    1431          10 :     WalReceiverConn *wrconn = NULL;
    1432             :     char       *dbname;
    1433             :     char       *err;
    1434             :     sigjmp_buf  local_sigjmp_buf;
    1435             :     StringInfoData app_name;
    1436             : 
    1437             :     Assert(startup_data_len == 0);
    1438             : 
    1439          10 :     MyBackendType = B_SLOTSYNC_WORKER;
    1440             : 
    1441          10 :     init_ps_display(NULL);
    1442             : 
    1443             :     Assert(GetProcessingMode() == InitProcessing);
    1444             : 
    1445             :     /*
    1446             :      * Create a per-backend PGPROC struct in shared memory.  We must do this
    1447             :      * before we access any shared memory.
    1448             :      */
    1449          10 :     InitProcess();
    1450             : 
    1451             :     /*
    1452             :      * Early initialization.
    1453             :      */
    1454          10 :     BaseInit();
    1455             : 
    1456             :     Assert(SlotSyncCtx != NULL);
    1457             : 
    1458             :     /*
    1459             :      * If an exception is encountered, processing resumes here.
    1460             :      *
    1461             :      * We just need to clean up, report the error, and go away.
    1462             :      *
    1463             :      * If we do not have this handling here, then since this worker process
    1464             :      * operates at the bottom of the exception stack, ERRORs turn into FATALs.
    1465             :      * Therefore, we create our own exception handler to catch ERRORs.
    1466             :      */
    1467          10 :     if (sigsetjmp(local_sigjmp_buf, 1) != 0)
    1468             :     {
    1469             :         /* since not using PG_TRY, must reset error stack by hand */
    1470           2 :         error_context_stack = NULL;
    1471             : 
    1472             :         /* Prevents interrupts while cleaning up */
    1473           2 :         HOLD_INTERRUPTS();
    1474             : 
    1475             :         /* Report the error to the server log */
    1476           2 :         EmitErrorReport();
    1477             : 
    1478             :         /*
    1479             :          * We can now go away.  Note that because we called InitProcess, a
    1480             :          * callback was registered to do ProcKill, which will clean up
    1481             :          * necessary state.
    1482             :          */
    1483           2 :         proc_exit(0);
    1484             :     }
    1485             : 
    1486             :     /* We can now handle ereport(ERROR) */
    1487          10 :     PG_exception_stack = &local_sigjmp_buf;
    1488             : 
    1489             :     /* Setup signal handling */
    1490          10 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1491          10 :     pqsignal(SIGINT, SignalHandlerForShutdownRequest);
    1492          10 :     pqsignal(SIGTERM, die);
    1493          10 :     pqsignal(SIGFPE, FloatExceptionHandler);
    1494          10 :     pqsignal(SIGUSR1, procsignal_sigusr1_handler);
    1495          10 :     pqsignal(SIGUSR2, SIG_IGN);
    1496          10 :     pqsignal(SIGPIPE, SIG_IGN);
    1497          10 :     pqsignal(SIGCHLD, SIG_DFL);
    1498             : 
    1499          10 :     check_and_set_sync_info(MyProcPid);
    1500             : 
    1501          10 :     ereport(LOG, errmsg("slot sync worker started"));
    1502             : 
    1503             :     /* Register it as soon as SlotSyncCtx->pid is initialized. */
    1504          10 :     before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
    1505             : 
    1506             :     /*
    1507             :      * Establish the SIGALRM handler and initialize the timeout module; both
    1508             :      * are needed by InitPostgres to register different timeouts.
    1509             :      */
    1510          10 :     InitializeTimeouts();
    1511             : 
    1512             :     /* Load the libpq-specific functions */
    1513          10 :     load_file("libpqwalreceiver", false);
    1514             : 
    1515             :     /*
    1516             :      * Unblock signals (they were blocked when the postmaster forked us)
    1517             :      */
    1518          10 :     sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
    1519             : 
    1520             :     /*
    1521             :      * Set always-secure search path, so malicious users can't redirect user
    1522             :      * code (e.g. operators).
    1523             :      *
    1524             :      * It's not strictly necessary since we won't be scanning or writing to
    1525             :      * any user table locally, but it's good to retain it here as an added
    1526             :      * precaution.
    1527             :      */
    1528          10 :     SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
    1529             : 
    1530          10 :     dbname = CheckAndGetDbnameFromConninfo();
    1531             : 
    1532             :     /*
    1533             :      * Connect to the database specified by the user in primary_conninfo. We
    1534             :      * need a database connection for walrcv_exec to work which we use to
    1535             :      * fetch slot information from the remote node. See comments atop
    1536             :      * libpqrcv_exec.
    1537             :      *
    1538             :      * We do not specify a particular user here since the slot sync worker will
    1539             :      * operate as a superuser. This is safe because the slot sync worker does
    1540             :      * not interact with user tables, eliminating the risk of executing
    1541             :      * arbitrary code within triggers.
    1542             :      */
    1543          10 :     InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL);
    1544             : 
    1545          10 :     SetProcessingMode(NormalProcessing);
    1546             : 
    1547          10 :     initStringInfo(&app_name);
    1548          10 :     if (cluster_name[0])
    1549          10 :         appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsync worker");
    1550             :     else
    1551           0 :         appendStringInfoString(&app_name, "slotsync worker");
    1552             : 
    1553             :     /*
    1554             :      * Establish the connection to the primary server for slot
    1555             :      * synchronization.
    1556             :      */
    1557          10 :     wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
    1558             :                             app_name.data, &err);
    1559             : 
    1560          10 :     if (!wrconn)
    1561           0 :         ereport(ERROR,
    1562             :                 errcode(ERRCODE_CONNECTION_FAILURE),
    1563             :                 errmsg("synchronization worker \"%s\" could not connect to the primary server: %s",
    1564             :                        app_name.data, err));
    1565             : 
    1566          10 :     pfree(app_name.data);
    1567             : 
    1568             :     /*
    1569             :      * Register the disconnection callback.
    1570             :      *
    1571             :      * XXX: This can be combined with previous cleanup registration of
    1572             :      * slotsync_worker_onexit() but that will need the connection to be made
    1573             :      * global and we want to avoid introducing global for this purpose.
    1574             :      */
    1575          10 :     before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
    1576             : 
    1577             :     /*
    1578             :      * Using the specified primary server connection, check that we are not a
    1579             :      * cascading standby and slot configured in 'primary_slot_name' exists on
    1580             :      * the primary server.
    1581             :      */
    1582          10 :     validate_remote_info(wrconn);
    1583             : 
    1584             :     /* Main loop to synchronize slots */
    1585             :     for (;;)
    1586          28 :     {
    1587          38 :         bool        some_slot_updated = false;
    1588             : 
    1589          38 :         ProcessSlotSyncInterrupts();
    1590             : 
    1591          30 :         some_slot_updated = synchronize_slots(wrconn);
    1592             : 
    1593          28 :         wait_for_slot_activity(some_slot_updated);
    1594             :     }
    1595             : 
    1596             :     /*
    1597             :      * The slot sync worker can't get here because it will only stop when it
    1598             :      * receives a SIGINT from the startup process, or when there is an error.
    1599             :      */
    1600             :     Assert(false);
    1601             : }
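                     : 
                     : /*
                     :  * Illustration only: beyond the GUCs validated in ValidateSlotSyncParams(),
                     :  * having the postmaster launch this worker just needs, on the standby:
                     :  *
                     :  *     sync_replication_slots = on
                     :  */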
    1602             : 
    1603             : /*
    1604             :  * Update the inactive_since property for synced slots.
    1605             :  *
    1606             :  * Note that this function is currently called when we shut down the slot
    1607             :  * sync machinery.
    1608             :  */
    1609             : static void
    1610        1806 : update_synced_slots_inactive_since(void)
    1611             : {
    1612        1806 :     TimestampTz now = 0;
    1613             : 
    1614             :     /*
    1615             :      * We need to update inactive_since only when promoting the standby, so
    1616             :      * that it is interpreted correctly if the standby is promoted without a
    1617             :      * restart. We don't want the slots to appear inactive for a long time
    1618             :      * after promotion if they haven't been synchronized recently.
    1619             :      * Whoever acquires the slot, i.e., makes the slot active, will reset it.
    1620             :      */
    1621        1806 :     if (!StandbyMode)
    1622        1710 :         return;
    1623             : 
    1624             :     /* The slot sync worker or SQL function mustn't be running by now */
    1625             :     Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
    1626             : 
    1627          96 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1628             : 
    1629        1032 :     for (int i = 0; i < max_replication_slots; i++)
    1630             :     {
    1631         936 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1632             : 
    1633             :         /* Check if it is a synchronized slot */
    1634         936 :         if (s->in_use && s->data.synced)
    1635             :         {
    1636             :             Assert(SlotIsLogical(s));
    1637             : 
    1638             :             /* The slot must not be acquired by any process */
    1639             :             Assert(s->active_pid == 0);
    1640             : 
    1641             :             /* Use the same inactive_since time for all the slots. */
    1642           6 :             if (now == 0)
    1643           4 :                 now = GetCurrentTimestamp();
    1644             : 
    1645           6 :             ReplicationSlotSetInactiveSince(s, now, true);
    1646             :         }
    1647             :     }
    1648             : 
    1649          96 :     LWLockRelease(ReplicationSlotControlLock);
    1650             : }
    1651             : 
    1652             : /*
    1653             :  * Shut down the slot sync worker.
    1654             :  *
    1655             :  * This function signals the slot sync worker to shut down, if required. It
    1656             :  * also waits until the slot sync worker has exited or
    1657             :  * pg_sync_replication_slots() has finished.
    1658             :  */
    1659             : void
    1660        1806 : ShutDownSlotSync(void)
    1661             : {
    1662             :     pid_t       worker_pid;
    1663             : 
    1664        1806 :     SpinLockAcquire(&SlotSyncCtx->mutex);
    1665             : 
    1666        1806 :     SlotSyncCtx->stopSignaled = true;
    1667             : 
    1668             :     /*
    1669             :      * Return if neither the slot sync worker is running nor the function
    1670             :      * pg_sync_replication_slots() is executing.
    1671             :      */
    1672        1806 :     if (!SlotSyncCtx->syncing)
    1673             :     {
    1674        1804 :         SpinLockRelease(&SlotSyncCtx->mutex);
    1675        1804 :         update_synced_slots_inactive_since();
    1676        1804 :         return;
    1677             :     }
    1678             : 
    1679           2 :     worker_pid = SlotSyncCtx->pid;
    1680             : 
    1681           2 :     SpinLockRelease(&SlotSyncCtx->mutex);
    1682             : 
    1683           2 :     if (worker_pid != InvalidPid)
    1684           2 :         kill(worker_pid, SIGINT);
    1685             : 
    1686             :     /* Wait for slot sync to end */
    1687             :     for (;;)
    1688           0 :     {
    1689             :         int         rc;
    1690             : 
    1691             :         /* Wait a bit, we don't expect to have to wait long */
    1692             :         /* Wait a bit; we don't expect to have to wait long. */
    1693             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1694             :                        10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
    1695             : 
    1696           2 :         if (rc & WL_LATCH_SET)
    1697             :         {
    1698           0 :             ResetLatch(MyLatch);
    1699           0 :             CHECK_FOR_INTERRUPTS();
    1700             :         }
    1701             : 
    1702           2 :         SpinLockAcquire(&SlotSyncCtx->mutex);
    1703             : 
    1704             :         /* Ensure that no process is syncing the slots. */
    1705           2 :         if (!SlotSyncCtx->syncing)
    1706           2 :             break;
    1707             : 
    1708           0 :         SpinLockRelease(&SlotSyncCtx->mutex);
    1709             :     }
    1710             : 
    1711           2 :     SpinLockRelease(&SlotSyncCtx->mutex);
    1712             : 
    1713           2 :     update_synced_slots_inactive_since();
    1714             : }
    1715             : 
    1716             : /*
    1717             :  * SlotSyncWorkerCanRestart
    1718             :  *
    1719             :  * Return true, indicating the worker is allowed to restart, if at least
    1720             :  * SLOTSYNC_RESTART_INTERVAL_SEC seconds have passed since it was last
    1721             :  * launched. Otherwise return false.
    1722             :  *
    1723             :  * This is a safety valve to protect against continuous respawn attempts if the
    1724             :  * worker is dying immediately at launch. Note that since we will retry
    1725             :  * launching the worker from the postmaster main loop, we will get another
    1726             :  * chance later.
    1727             :  */
    1728             : bool
    1729          22 : SlotSyncWorkerCanRestart(void)
    1730             : {
    1731          22 :     time_t      curtime = time(NULL);
    1732             : 
    1733             :     /*
    1734             :      * If first time through, or time somehow went backwards, always update
    1735             :      * last_start_time to match the current clock and allow worker start.
    1736             :      * Otherwise allow it only once enough time has elapsed.
    1737             :      */
    1738          22 :     if (SlotSyncCtx->last_start_time == 0 ||
    1739          12 :         curtime < SlotSyncCtx->last_start_time ||
    1740          12 :         curtime - SlotSyncCtx->last_start_time >= SLOTSYNC_RESTART_INTERVAL_SEC)
    1741             :     {
    1742          10 :         SlotSyncCtx->last_start_time = curtime;
    1743          10 :         return true;
    1744             :     }
    1745          12 :     return false;
    1746             : }
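                     : 
                     : /*
                     :  * Worked example of the rate limit above: if the worker was launched at
                     :  * t=100 and dies immediately, any restart attempt before
                     :  * t=100+SLOTSYNC_RESTART_INTERVAL_SEC returns false and the postmaster
                     :  * simply retries from its main loop; the first attempt at or after that
                     :  * point returns true and resets last_start_time to the current time.
                     :  */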
    1747             : 
    1748             : /*
    1749             :  * Is the current process syncing replication slots?
    1750             :  *
    1751             :  * Could be either a backend executing the SQL function or the slot sync worker.
    1752             :  */
    1753             : bool
    1754          46 : IsSyncingReplicationSlots(void)
    1755             : {
    1756          46 :     return syncing_slots;
    1757             : }
    1758             : 
    1759             : /*
    1760             :  * Amount of shared memory required for slot synchronization.
    1761             :  */
    1762             : Size
    1763        6324 : SlotSyncShmemSize(void)
    1764             : {
    1765        6324 :     return sizeof(SlotSyncCtxStruct);
    1766             : }
    1767             : 
    1768             : /*
    1769             :  * Allocate and initialize the shared memory of slot synchronization.
    1770             :  */
    1771             : void
    1772        2208 : SlotSyncShmemInit(void)
    1773             : {
    1774        2208 :     Size        size = SlotSyncShmemSize();
    1775             :     bool        found;
    1776             : 
    1777        2208 :     SlotSyncCtx = (SlotSyncCtxStruct *)
    1778        2208 :         ShmemInitStruct("Slot Sync Data", size, &found);
    1779             : 
    1780        2208 :     if (!found)
    1781             :     {
    1782        2208 :         memset(SlotSyncCtx, 0, size);
    1783        2208 :         SlotSyncCtx->pid = InvalidPid;
    1784        2208 :         SpinLockInit(&SlotSyncCtx->mutex);
    1785             :     }
    1786        2208 : }
    1787             : 
    1788             : /*
    1789             :  * Error cleanup callback for slot sync SQL function.
    1790             :  */
    1791             : static void
    1792           2 : slotsync_failure_callback(int code, Datum arg)
    1793             : {
    1794           2 :     WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
    1795             : 
    1796             :     /*
    1797             :      * We need to do slots cleanup here just like WalSndErrorCleanup() does.
    1798             :      *
    1799             :      * The startup process during promotion invokes ShutDownSlotSync() which
    1800             :      * The startup process during promotion invokes ShutDownSlotSync(), which
    1801             :      * waits for slot sync to finish by checking the
    1802             :      * and cleanup to avoid any dangling temporary slots or active slots
    1803             :      * before it marks itself as finished syncing.
    1804             :      */
    1805             : 
    1806             :     /* Make sure active replication slots are released */
    1807           2 :     if (MyReplicationSlot != NULL)
    1808           0 :         ReplicationSlotRelease();
    1809             : 
    1810             :     /* Also cleanup the synced temporary slots. */
    1811           2 :     ReplicationSlotCleanup(true);
    1812             : 
    1813             :     /*
    1814             :      * The set syncing_slots indicates that the process errored out without
    1815             :      * resetting the flag. So, we need to clean up shared memory and reset the
    1816             :      * flag here.
    1817             :      */
    1818           2 :     if (syncing_slots)
    1819           2 :         reset_syncing_flag();
    1820             : 
    1821           2 :     walrcv_disconnect(wrconn);
    1822           2 : }
    1823             : 
    1824             : /*
    1825             :  * Synchronize the failover enabled replication slots using the specified
    1826             :  * primary server connection.
    1827             :  */
    1828             : void
    1829          16 : SyncReplicationSlots(WalReceiverConn *wrconn)
    1830             : {
    1831          16 :     PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
    1832             :     {
    1833          16 :         check_and_set_sync_info(InvalidPid);
    1834             : 
    1835          16 :         validate_remote_info(wrconn);
    1836             : 
    1837          14 :         synchronize_slots(wrconn);
    1838             : 
    1839             :         /* Cleanup the synced temporary slots */
    1840          14 :         ReplicationSlotCleanup(true);
    1841             : 
    1842             :         /* We are done with sync, so reset sync flag */
    1843          14 :         reset_syncing_flag();
    1844             :     }
    1845          16 :     PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
    1846          14 : }
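                     : 
                     : /*
                     :  * Illustration only (compiled out): a minimal libpq client sketch that
                     :  * triggers the same one-shot sync as SyncReplicationSlots() above via the
                     :  * SQL interface. The conninfo host is a hypothetical example.
                     :  */
                     : #if 0
                     : #include <stdio.h>
                     : #include <libpq-fe.h>
                     : 
                     : int
                     : main(void)
                     : {
                     :     PGconn     *conn;
                     :     PGresult   *res;
                     : 
                     :     /* Connect to the standby; the conninfo is a hypothetical example. */
                     :     conn = PQconnectdb("host=standby.example.com dbname=postgres");
                     :     if (PQstatus(conn) != CONNECTION_OK)
                     :     {
                     :         fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
                     :         PQfinish(conn);
                     :         return 1;
                     :     }
                     : 
                     :     /* Same entry point the worker loop automates, run once on demand. */
                     :     res = PQexec(conn, "SELECT pg_sync_replication_slots()");
                     :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
                     :         fprintf(stderr, "sync failed: %s", PQerrorMessage(conn));
                     : 
                     :     PQclear(res);
                     :     PQfinish(conn);
                     :     return 0;
                     : }
                     : #endif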

Generated by: LCOV version 1.16