LCOV - code coverage report
Current view: top level - src/backend/replication/logical - slotsync.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 391 453 86.3 %
Date: 2024-04-28 23:11:55 Functions: 27 27 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * slotsync.c
       3             :  *     Functionality for synchronizing slots to a standby server from the
       4             :  *         primary server.
       5             :  *
       6             :  * Copyright (c) 2024, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/logical/slotsync.c
      10             :  *
      11             :  * This file contains the code for slot synchronization on a physical standby
      12             :  * to fetch logical failover slots information from the primary server, create
      13             :  * the slots on the standby and synchronize them periodically.
      14             :  *
      15             :  * Slot synchronization can be performed either automatically by enabling slot
      16             :  * sync worker or manually by calling SQL function pg_sync_replication_slots().
      17             :  *
      18             :  * If the WAL corresponding to the remote's restart_lsn is not available on the
      19             :  * physical standby or the remote's catalog_xmin precedes the oldest xid for
      20             :  * which it is guaranteed that rows wouldn't have been removed then we cannot
      21             :  * create the local standby slot because that would mean moving the local slot
      22             :  * backward and decoding won't be possible via such a slot. In this case, the
      23             :  * slot will be marked as RS_TEMPORARY. Once the primary server catches up,
      24             :  * the slot will be marked as RS_PERSISTENT (which means sync-ready) after
      25             :  * which slot sync worker can perform the sync periodically or user can call
      26             :  * pg_sync_replication_slots() periodically to perform the syncs.
      27             :  *
      28             :  * If synchronized slots fail to build a consistent snapshot from the
      29             :  * restart_lsn before reaching confirmed_flush_lsn, they would become
      30             :  * unreliable after promotion due to potential data loss from changes
      31             :  * before reaching a consistent point. This can happen because the slots can
      32             :  * be synced at some random time and we may not reach the consistent point
      33             :  * at the same WAL location as the primary. So, we mark such slots as
      34             :  * RS_TEMPORARY. Once the decoding from corresponding LSNs can reach a
      35             :  * consistent point, they will be marked as RS_PERSISTENT.
      36             :  *
      37             :  * The slot sync worker waits for some time before the next synchronization,
      38             :  * with the duration varying based on whether any slots were updated during
      39             :  * the last cycle. Refer to the comments above wait_for_slot_activity() for
      40             :  * more details.
      41             :  *
      42             :  * Any standby synchronized slots will be dropped if they no longer need
      43             :  * to be synchronized. See comment atop drop_local_obsolete_slots() for more
      44             :  * details.
      45             :  *---------------------------------------------------------------------------
      46             :  */
      47             : 
      48             : #include "postgres.h"
      49             : 
      50             : #include <time.h>
      51             : 
      52             : #include "access/xlog_internal.h"
      53             : #include "access/xlogrecovery.h"
      54             : #include "catalog/pg_database.h"
      55             : #include "commands/dbcommands.h"
      56             : #include "libpq/pqsignal.h"
      57             : #include "pgstat.h"
      58             : #include "postmaster/fork_process.h"
      59             : #include "postmaster/interrupt.h"
      60             : #include "postmaster/postmaster.h"
      61             : #include "replication/logical.h"
      62             : #include "replication/slotsync.h"
      63             : #include "replication/snapbuild.h"
      64             : #include "storage/ipc.h"
      65             : #include "storage/lmgr.h"
      66             : #include "storage/proc.h"
      67             : #include "storage/procarray.h"
      68             : #include "tcop/tcopprot.h"
      69             : #include "utils/builtins.h"
      70             : #include "utils/pg_lsn.h"
      71             : #include "utils/ps_status.h"
      72             : #include "utils/timeout.h"
      73             : 
      74             : /*
      75             :  * Struct for sharing information to control slot synchronization.
      76             :  *
      77             :  * The slot sync worker's pid is needed by the startup process to shut it
      78             :  * down during promotion. The startup process shuts down the slot sync worker
      79             :  * and also sets stopSignaled=true to handle the race condition when the
      80             :  * postmaster has not noticed the promotion yet and thus may end up restarting
      81             :  * the slot sync worker. If stopSignaled is set, the worker will exit in such a
      82             :  * case. The SQL function pg_sync_replication_slots() will also error out if
      83             :  * this flag is set. Note that we don't need to reset this variable as after
      84             :  * promotion the slot sync worker won't be restarted because the pmState
      85             :  * changes to PM_RUN from PM_HOT_STANDBY and we don't support demoting
      86             :  * primary without restarting the server. See MaybeStartSlotSyncWorker.
      87             :  *
      88             :  * The 'syncing' flag is needed to prevent concurrent slot syncs to avoid slot
      89             :  * overwrites.
      90             :  *
      91             :  * The 'last_start_time' is needed by postmaster to start the slot sync worker
      92             :  * once per SLOTSYNC_RESTART_INTERVAL_SEC. In cases where an immediate restart
      93             :  * is expected (e.g., slot sync GUCs change), slot sync worker will reset
      94             :  * last_start_time before exiting, so that postmaster can start the worker
      95             :  * without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
      96             :  */
      97             : typedef struct SlotSyncCtxStruct
      98             : {
      99             :     pid_t       pid;
     100             :     bool        stopSignaled;
     101             :     bool        syncing;
     102             :     time_t      last_start_time;
     103             :     slock_t     mutex;
     104             : } SlotSyncCtxStruct;
     105             : 
     106             : SlotSyncCtxStruct *SlotSyncCtx = NULL;
     107             : 
     108             : /* GUC variable */
     109             : bool        sync_replication_slots = false;
     110             : 
     111             : /*
     112             :  * The sleep time (ms) between slot-sync cycles varies dynamically
     113             :  * (within a MIN/MAX range) according to slot activity. See
     114             :  * wait_for_slot_activity() for details.
     115             :  */
     116             : #define MIN_SLOTSYNC_WORKER_NAPTIME_MS  200
     117             : #define MAX_SLOTSYNC_WORKER_NAPTIME_MS  30000   /* 30s */
     118             : 
     119             : static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
     120             : 
     121             : /* The restart interval for slot sync work used by postmaster */
     122             : #define SLOTSYNC_RESTART_INTERVAL_SEC 10
     123             : 
     124             : /*
     125             :  * Flag to tell if we are syncing replication slots. Unlike the 'syncing' flag
     126             :  * in SlotSyncCtxStruct, this flag is true only if the current process is
     127             :  * performing slot synchronization.
     128             :  */
     129             : static bool syncing_slots = false;
     130             : 
     131             : /*
     132             :  * Structure to hold information fetched from the primary server about a logical
     133             :  * replication slot.
     134             :  */
     135             : typedef struct RemoteSlot
     136             : {
     137             :     char       *name;
     138             :     char       *plugin;
     139             :     char       *database;
     140             :     bool        two_phase;
     141             :     bool        failover;
     142             :     XLogRecPtr  restart_lsn;
     143             :     XLogRecPtr  confirmed_lsn;
     144             :     TransactionId catalog_xmin;
     145             : 
     146             :     /* RS_INVAL_NONE if valid, or the reason of invalidation */
     147             :     ReplicationSlotInvalidationCause invalidated;
     148             : } RemoteSlot;
     149             : 
     150             : static void slotsync_failure_callback(int code, Datum arg);
     151             : static void update_synced_slots_inactive_since(void);
     152             : 
     153             : /*
     154             :  * If necessary, update the local synced slot's metadata based on the data
     155             :  * from the remote slot.
     156             :  *
     157             :  * If no update was needed (the data of the remote slot is the same as the
     158             :  * local slot) return false, otherwise true.
     159             :  *
     160             :  * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
     161             :  * modified, and decoding from the corresponding LSN's can reach a
     162             :  * consistent snapshot.
     163             :  *
     164             :  * *remote_slot_precedes will be true if the remote slot's LSN or xmin
     165             :  * precedes locally reserved position.
     166             :  */
     167             : static bool
     168          56 : update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
     169             :                          bool *found_consistent_snapshot,
     170             :                          bool *remote_slot_precedes)
     171             : {
     172          56 :     ReplicationSlot *slot = MyReplicationSlot;
     173          56 :     bool        updated_xmin_or_lsn = false;
     174          56 :     bool        updated_config = false;
     175             : 
     176             :     Assert(slot->data.invalidated == RS_INVAL_NONE);
     177             : 
     178          56 :     if (found_consistent_snapshot)
     179           8 :         *found_consistent_snapshot = false;
     180             : 
     181          56 :     if (remote_slot_precedes)
     182           8 :         *remote_slot_precedes = false;
     183             : 
     184             :     /*
     185             :      * Don't overwrite if we already have a newer catalog_xmin and
     186             :      * restart_lsn.
     187             :      */
     188         112 :     if (remote_slot->restart_lsn < slot->data.restart_lsn ||
     189          56 :         TransactionIdPrecedes(remote_slot->catalog_xmin,
     190             :                               slot->data.catalog_xmin))
     191             :     {
     192             :         /*
     193             :          * This can happen in following situations:
     194             :          *
     195             :          * If the slot is temporary, it means either the initial WAL location
     196             :          * reserved for the local slot is ahead of the remote slot's
     197             :          * restart_lsn or the initial xmin_horizon computed for the local slot
     198             :          * is ahead of the remote slot.
     199             :          *
     200             :          * If the slot is persistent, restart_lsn of the synced slot could
     201             :          * still be ahead of the remote slot. Since we use slot advance
     202             :          * functionality to keep snapbuild/slot updated, it is possible that
     203             :          * the restart_lsn is advanced to a later position than it has on the
     204             :          * primary. This can happen when slot advancing machinery finds
     205             :          * running xacts record after reaching the consistent state at a later
     206             :          * point than the primary where it serializes the snapshot and updates
     207             :          * the restart_lsn.
     208             :          *
     209             :          * We LOG the message if the slot is temporary as it can help the user
     210             :          * to understand why the slot is not sync-ready. In the case of a
     211             :          * persistent slot, it would be a more common case and won't directly
     212             :          * impact the users, so we used DEBUG1 level to log the message.
     213             :          */
     214           0 :         ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
     215             :                 errmsg("could not sync slot \"%s\" as remote slot precedes local slot",
     216             :                        remote_slot->name),
     217             :                 errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.",
     218             :                           LSN_FORMAT_ARGS(remote_slot->restart_lsn),
     219             :                           remote_slot->catalog_xmin,
     220             :                           LSN_FORMAT_ARGS(slot->data.restart_lsn),
     221             :                           slot->data.catalog_xmin));
     222             : 
     223           0 :         if (remote_slot_precedes)
     224           0 :             *remote_slot_precedes = true;
     225             :     }
     226             : 
     227             :     /*
     228             :      * Attempt to sync LSNs and xmins only if remote slot is ahead of local
     229             :      * slot.
     230             :      */
     231          56 :     else if (remote_slot->confirmed_lsn > slot->data.confirmed_flush ||
     232          74 :              remote_slot->restart_lsn > slot->data.restart_lsn ||
     233          36 :              TransactionIdFollows(remote_slot->catalog_xmin,
     234             :                                   slot->data.catalog_xmin))
     235             :     {
     236             :         /*
     237             :          * We can't directly copy the remote slot's LSN or xmin unless there
     238             :          * exists a consistent snapshot at that point. Otherwise, after
     239             :          * promotion, the slots may not reach a consistent point before the
     240             :          * confirmed_flush_lsn which can lead to a data loss. To avoid data
     241             :          * loss, we let slot machinery advance the slot which ensures that
     242             :          * snapbuilder/slot statuses are updated properly.
     243             :          */
     244          20 :         if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
     245             :         {
     246             :             /*
     247             :              * Update the slot info directly if there is a serialized snapshot
     248             :              * at the restart_lsn, as the slot can quickly reach consistency
     249             :              * at restart_lsn by restoring the snapshot.
     250             :              */
     251           4 :             SpinLockAcquire(&slot->mutex);
     252           4 :             slot->data.restart_lsn = remote_slot->restart_lsn;
     253           4 :             slot->data.confirmed_flush = remote_slot->confirmed_lsn;
     254           4 :             slot->data.catalog_xmin = remote_slot->catalog_xmin;
     255           4 :             SpinLockRelease(&slot->mutex);
     256             : 
     257           4 :             if (found_consistent_snapshot)
     258           0 :                 *found_consistent_snapshot = true;
     259             :         }
     260             :         else
     261             :         {
     262          16 :             LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
     263             :                                                 found_consistent_snapshot);
     264             : 
     265             :             /* Sanity check */
     266          16 :             if (slot->data.confirmed_flush != remote_slot->confirmed_lsn)
     267           0 :                 ereport(ERROR,
     268             :                         errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot",
     269             :                                         remote_slot->name),
     270             :                         errdetail_internal("Remote slot has LSN %X/%X but local slot has LSN %X/%X.",
     271             :                                            LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
     272             :                                            LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
     273             :         }
     274             : 
     275          20 :         updated_xmin_or_lsn = true;
     276             :     }
     277             : 
     278          56 :     if (remote_dbid != slot->data.database ||
     279          56 :         remote_slot->two_phase != slot->data.two_phase ||
     280          56 :         remote_slot->failover != slot->data.failover ||
     281          56 :         strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0)
     282             :     {
     283             :         NameData    plugin_name;
     284             : 
     285             :         /* Avoid expensive operations while holding a spinlock. */
     286           0 :         namestrcpy(&plugin_name, remote_slot->plugin);
     287             : 
     288           0 :         SpinLockAcquire(&slot->mutex);
     289           0 :         slot->data.plugin = plugin_name;
     290           0 :         slot->data.database = remote_dbid;
     291           0 :         slot->data.two_phase = remote_slot->two_phase;
     292           0 :         slot->data.failover = remote_slot->failover;
     293           0 :         SpinLockRelease(&slot->mutex);
     294             : 
     295           0 :         updated_config = true;
     296             :     }
     297             : 
     298             :     /*
     299             :      * We have to write the changed xmin to disk *before* we change the
     300             :      * in-memory value, otherwise after a crash we wouldn't know that some
     301             :      * catalog tuples might have been removed already.
     302             :      */
     303          56 :     if (updated_config || updated_xmin_or_lsn)
     304             :     {
     305          20 :         ReplicationSlotMarkDirty();
     306          20 :         ReplicationSlotSave();
     307             :     }
     308             : 
     309             :     /*
     310             :      * Now the new xmin is safely on disk, we can let the global value
     311             :      * advance. We do not take ProcArrayLock or similar since we only advance
     312             :      * xmin here and there's not much harm done by a concurrent computation
     313             :      * missing that.
     314             :      */
     315          56 :     if (updated_xmin_or_lsn)
     316             :     {
     317          20 :         SpinLockAcquire(&slot->mutex);
     318          20 :         slot->effective_catalog_xmin = remote_slot->catalog_xmin;
     319          20 :         SpinLockRelease(&slot->mutex);
     320             : 
     321          20 :         ReplicationSlotsComputeRequiredXmin(false);
     322          20 :         ReplicationSlotsComputeRequiredLSN();
     323             :     }
     324             : 
     325          56 :     return updated_config || updated_xmin_or_lsn;
     326             : }
     327             : 
     328             : /*
     329             :  * Get the list of local logical slots that are synchronized from the
     330             :  * primary server.
     331             :  */
     332             : static List *
     333          30 : get_local_synced_slots(void)
     334             : {
     335          30 :     List       *local_slots = NIL;
     336             : 
     337          30 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     338             : 
     339         330 :     for (int i = 0; i < max_replication_slots; i++)
     340             :     {
     341         300 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     342             : 
     343             :         /* Check if it is a synchronized slot */
     344         300 :         if (s->in_use && s->data.synced)
     345             :         {
     346             :             Assert(SlotIsLogical(s));
     347          52 :             local_slots = lappend(local_slots, s);
     348             :         }
     349             :     }
     350             : 
     351          30 :     LWLockRelease(ReplicationSlotControlLock);
     352             : 
     353          30 :     return local_slots;
     354             : }
     355             : 
     356             : /*
     357             :  * Helper function to check if local_slot is required to be retained.
     358             :  *
     359             :  * Return false either if local_slot does not exist in the remote_slots list
     360             :  * or is invalidated while the corresponding remote slot is still valid,
     361             :  * otherwise true.
     362             :  */
     363             : static bool
     364          52 : local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots)
     365             : {
     366          52 :     bool        remote_exists = false;
     367          52 :     bool        locally_invalidated = false;
     368             : 
     369         128 :     foreach_ptr(RemoteSlot, remote_slot, remote_slots)
     370             :     {
     371          74 :         if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0)
     372             :         {
     373          50 :             remote_exists = true;
     374             : 
     375             :             /*
     376             :              * If remote slot is not invalidated but local slot is marked as
     377             :              * invalidated, then set locally_invalidated flag.
     378             :              */
     379          50 :             SpinLockAcquire(&local_slot->mutex);
     380          50 :             locally_invalidated =
     381         100 :                 (remote_slot->invalidated == RS_INVAL_NONE) &&
     382          50 :                 (local_slot->data.invalidated != RS_INVAL_NONE);
     383          50 :             SpinLockRelease(&local_slot->mutex);
     384             : 
     385          50 :             break;
     386             :         }
     387             :     }
     388             : 
     389          52 :     return (remote_exists && !locally_invalidated);
     390             : }
     391             : 
     392             : /*
     393             :  * Drop local obsolete slots.
     394             :  *
     395             :  * Drop the local slots that no longer need to be synced i.e. these either do
     396             :  * not exist on the primary or are no longer enabled for failover.
     397             :  *
     398             :  * Additionally, drop any slots that are valid on the primary but got
     399             :  * invalidated on the standby. This situation may occur due to the following
     400             :  * reasons:
     401             :  * - The 'max_slot_wal_keep_size' on the standby is insufficient to retain WAL
     402             :  *   records from the restart_lsn of the slot.
     403             :  * - 'primary_slot_name' is temporarily reset to null and the physical slot is
     404             :  *   removed.
     405             :  * These dropped slots will get recreated in next sync-cycle and it is okay to
     406             :  * drop and recreate such slots as long as these are not consumable on the
     407             :  * standby (which is the case currently).
     408             :  *
     409             :  * Note: Change of 'wal_level' on the primary server to a level lower than
     410             :  * logical may also result in slot invalidation and removal on the standby.
     411             :  * This is because such 'wal_level' change is only possible if the logical
     412             :  * slots are removed on the primary server, so it's expected to see the
     413             :  * slots being invalidated and removed on the standby too (and re-created
     414             :  * if they are re-created on the primary server).
     415             :  */
     416             : static void
     417          30 : drop_local_obsolete_slots(List *remote_slot_list)
     418             : {
     419          30 :     List       *local_slots = get_local_synced_slots();
     420             : 
     421         112 :     foreach_ptr(ReplicationSlot, local_slot, local_slots)
     422             :     {
     423             :         /* Drop the local slot if it is not required to be retained. */
     424          52 :         if (!local_sync_slot_required(local_slot, remote_slot_list))
     425             :         {
     426             :             bool        synced_slot;
     427             : 
     428             :             /*
     429             :              * Use shared lock to prevent a conflict with
     430             :              * ReplicationSlotsDropDBSlots(), trying to drop the same slot
     431             :              * during a drop-database operation.
     432             :              */
     433           4 :             LockSharedObject(DatabaseRelationId, local_slot->data.database,
     434             :                              0, AccessShareLock);
     435             : 
     436             :             /*
     437             :              * In the small window between getting the slot to drop and
     438             :              * locking the database, there is a possibility of a parallel
     439             :              * database drop by the startup process and the creation of a new
     440             :              * slot by the user. This new user-created slot may end up using
     441             :              * the same shared memory as that of 'local_slot'. Thus check if
     442             :              * local_slot is still the synced one before performing actual
     443             :              * drop.
     444             :              */
     445           4 :             SpinLockAcquire(&local_slot->mutex);
     446           4 :             synced_slot = local_slot->in_use && local_slot->data.synced;
     447           4 :             SpinLockRelease(&local_slot->mutex);
     448             : 
     449           4 :             if (synced_slot)
     450             :             {
     451           4 :                 ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
     452           4 :                 ReplicationSlotDropAcquired();
     453             :             }
     454             : 
     455           4 :             UnlockSharedObject(DatabaseRelationId, local_slot->data.database,
     456             :                                0, AccessShareLock);
     457             : 
     458           4 :             ereport(LOG,
     459             :                     errmsg("dropped replication slot \"%s\" of dbid %d",
     460             :                            NameStr(local_slot->data.name),
     461             :                            local_slot->data.database));
     462             :         }
     463             :     }
     464          30 : }
     465             : 
     466             : /*
     467             :  * Reserve WAL for the currently active local slot using the specified WAL
     468             :  * location (restart_lsn).
     469             :  *
     470             :  * If the given WAL location has been removed, reserve WAL using the oldest
     471             :  * existing WAL segment.
     472             :  */
     473             : static void
     474           8 : reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
     475             : {
     476             :     XLogSegNo   oldest_segno;
     477             :     XLogSegNo   segno;
     478           8 :     ReplicationSlot *slot = MyReplicationSlot;
     479             : 
     480             :     Assert(slot != NULL);
     481             :     Assert(XLogRecPtrIsInvalid(slot->data.restart_lsn));
     482             : 
     483             :     while (true)
     484             :     {
     485           8 :         SpinLockAcquire(&slot->mutex);
     486           8 :         slot->data.restart_lsn = restart_lsn;
     487           8 :         SpinLockRelease(&slot->mutex);
     488             : 
     489             :         /* Prevent WAL removal as fast as possible */
     490           8 :         ReplicationSlotsComputeRequiredLSN();
     491             : 
     492           8 :         XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
     493             : 
     494             :         /*
     495             :          * Find the oldest existing WAL segment file.
     496             :          *
     497             :          * Normally, we can determine it by using the last removed segment
     498             :          * number. However, if no WAL segment files have been removed by a
     499             :          * checkpoint since startup, we need to search for the oldest segment
     500             :          * file from the current timeline existing in XLOGDIR.
     501             :          *
     502             :          * XXX: Currently, we are searching for the oldest segment in the
     503             :          * current timeline as there is less chance of the slot's restart_lsn
     504             :          * from being some prior timeline, and even if it happens, in the
     505             :          * worst case, we will wait to sync till the slot's restart_lsn moved
     506             :          * to the current timeline.
     507             :          */
     508           8 :         oldest_segno = XLogGetLastRemovedSegno() + 1;
     509             : 
     510           8 :         if (oldest_segno == 1)
     511             :         {
     512             :             TimeLineID  cur_timeline;
     513             : 
     514           4 :             GetWalRcvFlushRecPtr(NULL, &cur_timeline);
     515           4 :             oldest_segno = XLogGetOldestSegno(cur_timeline);
     516             :         }
     517             : 
     518           8 :         elog(DEBUG1, "segno: " UINT64_FORMAT " of purposed restart_lsn for the synced slot, oldest_segno: " UINT64_FORMAT " available",
     519             :              segno, oldest_segno);
     520             : 
     521             :         /*
     522             :          * If all required WAL is still there, great, otherwise retry. The
     523             :          * slot should prevent further removal of WAL, unless there's a
     524             :          * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
     525             :          * the new restart_lsn above, so normally we should never need to loop
     526             :          * more than twice.
     527             :          */
     528           8 :         if (segno >= oldest_segno)
     529           8 :             break;
     530             : 
     531             :         /* Retry using the location of the oldest wal segment */
     532           0 :         XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn);
     533             :     }
     534           8 : }
     535             : 
     536             : /*
     537             :  * If the remote restart_lsn and catalog_xmin have caught up with the
     538             :  * local ones, then update the LSNs and persist the local synced slot for
     539             :  * future synchronization; otherwise, do nothing.
     540             :  *
     541             :  * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise
     542             :  * false.
     543             :  */
     544             : static bool
     545           8 : update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
     546             : {
     547           8 :     ReplicationSlot *slot = MyReplicationSlot;
     548           8 :     bool        found_consistent_snapshot = false;
     549           8 :     bool        remote_slot_precedes = false;
     550             : 
     551           8 :     (void) update_local_synced_slot(remote_slot, remote_dbid,
     552             :                                     &found_consistent_snapshot,
     553             :                                     &remote_slot_precedes);
     554             : 
     555             :     /*
     556             :      * Check if the primary server has caught up. Refer to the comment atop
     557             :      * the file for details on this check.
     558             :      */
     559           8 :     if (remote_slot_precedes)
     560             :     {
     561             :         /*
     562             :          * The remote slot didn't catch up to locally reserved position.
     563             :          *
     564             :          * We do not drop the slot because the restart_lsn can be ahead of the
     565             :          * current location when recreating the slot in the next cycle. It may
     566             :          * take more time to create such a slot. Therefore, we keep this slot
     567             :          * and attempt the synchronization in the next cycle.
     568             :          */
     569           0 :         return false;
     570             :     }
     571             : 
     572             :     /*
     573             :      * Don't persist the slot if it cannot reach the consistent point from the
     574             :      * restart_lsn. See comments atop this file.
     575             :      */
     576           8 :     if (!found_consistent_snapshot)
     577             :     {
     578           0 :         ereport(LOG,
     579             :                 errmsg("could not sync slot \"%s\"", remote_slot->name),
     580             :                 errdetail("Logical decoding cannot find consistent point from local slot's LSN %X/%X.",
     581             :                           LSN_FORMAT_ARGS(slot->data.restart_lsn)));
     582             : 
     583           0 :         return false;
     584             :     }
     585             : 
     586           8 :     ReplicationSlotPersist();
     587             : 
     588           8 :     ereport(LOG,
     589             :             errmsg("newly created slot \"%s\" is sync-ready now",
     590             :                    remote_slot->name));
     591             : 
     592           8 :     return true;
     593             : }
     594             : 
/*
 * Synchronize a single slot to the given position.
 *
 * This creates a new slot if there is no existing one and updates the
 * metadata of the slot as per the data received from the primary server.
 *
 * The slot is created as a temporary slot and stays in the same state until the
 * remote_slot catches up with locally reserved position and local slot is
 * updated. The slot is then persisted and is considered as sync-ready for
 * periodic syncs.
 *
 * If a local slot with the same name already exists, it must be one that was
 * created by slot synchronization (data.synced); a user-created slot with
 * that name raises an ERROR.  An invalidation reported by the primary is
 * copied to the local slot (and saved) unless the local slot is already
 * invalidated, and invalidated local slots are not synchronized further.
 *
 * remote_slot: slot information fetched from the primary server.
 * remote_dbid: OID of the local database named by remote_slot->database.
 *
 * Any slot acquired or created here is released again before returning
 * (ERROR paths excepted).
 *
 * Returns TRUE if the local slot is updated, i.e. it was newly created, its
 * invalidation cause was copied from the remote slot, or the update helpers
 * reported a change.
 */
static bool
synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
	ReplicationSlot *slot;
	XLogRecPtr	latestFlushPtr;
	bool		slot_updated = false;

	/*
	 * Make sure that concerned WAL is received and flushed before syncing
	 * slot to target lsn received from the primary server.
	 */
	latestFlushPtr = GetStandbyFlushRecPtr(NULL);
	if (remote_slot->confirmed_lsn > latestFlushPtr)
	{
		/*
		 * Can get here only if GUC 'standby_slot_names' on the primary server
		 * was not configured correctly.
		 *
		 * The slot sync worker only logs this and skips the slot; any other
		 * caller (e.g. pg_sync_replication_slots()) gets an ERROR.
		 */
		ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg("skipping slot synchronization as the received slot sync"
					   " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
					   LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
					   remote_slot->name,
					   LSN_FORMAT_ARGS(latestFlushPtr)));

		return false;
	}

	/* Search for the named slot */
	if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
	{
		bool		synced;

		/* Read the 'synced' flag under the slot's spinlock. */
		SpinLockAcquire(&slot->mutex);
		synced = slot->data.synced;
		SpinLockRelease(&slot->mutex);

		/* User-created slot with the same name exists, raise ERROR. */
		if (!synced)
			ereport(ERROR,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("exiting from slot synchronization because same"
						   " name slot \"%s\" already exists on the standby",
						   remote_slot->name));

		/*
		 * The slot has been synchronized before.
		 *
		 * It is important to acquire the slot here before checking
		 * invalidation. If we don't acquire the slot first, there could be a
		 * race condition that the local slot could be invalidated just after
		 * checking the 'invalidated' flag here and we could end up
		 * overwriting 'invalidated' flag to remote_slot's value. See
		 * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
		 * if the slot is not acquired by other processes.
		 *
		 * XXX: If it ever turns out that slot acquire/release is costly for
		 * cases when none of the slot properties is changed then we can do a
		 * pre-check to ensure that at least one of the slot properties is
		 * changed before acquiring the slot.
		 */
		ReplicationSlotAcquire(remote_slot->name, true);

		Assert(slot == MyReplicationSlot);

		/*
		 * Copy the invalidation cause from remote only if local slot is not
		 * invalidated locally, we don't want to overwrite existing one.
		 */
		if (slot->data.invalidated == RS_INVAL_NONE &&
			remote_slot->invalidated != RS_INVAL_NONE)
		{
			SpinLockAcquire(&slot->mutex);
			slot->data.invalidated = remote_slot->invalidated;
			SpinLockRelease(&slot->mutex);

			/* Make sure the invalidated state persists across server restart */
			ReplicationSlotMarkDirty();
			ReplicationSlotSave();

			slot_updated = true;
		}

		/*
		 * Skip the sync of an invalidated slot; it must still be released
		 * since we acquired it above.
		 */
		if (slot->data.invalidated != RS_INVAL_NONE)
		{
			ReplicationSlotRelease();
			return slot_updated;
		}

		/* Slot not ready yet, let's attempt to make it sync-ready now. */
		if (slot->data.persistency == RS_TEMPORARY)
		{
			slot_updated = update_and_persist_local_synced_slot(remote_slot,
																remote_dbid);
		}

		/* Slot ready for sync, so sync it. */
		else
		{
			/*
			 * Sanity check: As long as the invalidations are handled
			 * appropriately as above, this should never happen.
			 *
			 * We don't need to check restart_lsn here. See the comments in
			 * update_local_synced_slot() for details.
			 */
			if (remote_slot->confirmed_lsn < slot->data.confirmed_flush)
				ereport(ERROR,
						errmsg_internal("cannot synchronize local slot \"%s\"",
										remote_slot->name),
						errdetail_internal("Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X).",
										   LSN_FORMAT_ARGS(slot->data.confirmed_flush),
										   LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));

			slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
													NULL, NULL);
		}
	}
	/* Otherwise create the slot first. */
	else
	{
		NameData	plugin_name;
		TransactionId xmin_horizon = InvalidTransactionId;

		/* Skip creating the local slot if remote_slot is invalidated already */
		if (remote_slot->invalidated != RS_INVAL_NONE)
			return false;

		/*
		 * We create temporary slots instead of ephemeral slots here because
		 * we want the slots to survive after releasing them. This is done to
		 * avoid dropping and re-creating the slots in each synchronization
		 * cycle if the restart_lsn or catalog_xmin of the remote slot has not
		 * caught up.
		 */
		ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
							  remote_slot->two_phase,
							  remote_slot->failover,
							  true);

		/* For shorter lines. */
		slot = MyReplicationSlot;

		/* Avoid expensive operations while holding a spinlock. */
		namestrcpy(&plugin_name, remote_slot->plugin);

		SpinLockAcquire(&slot->mutex);
		slot->data.database = remote_dbid;
		slot->data.plugin = plugin_name;
		SpinLockRelease(&slot->mutex);

		/* Reserve WAL starting from (at most) the remote restart_lsn. */
		reserve_wal_for_local_slot(remote_slot->restart_lsn);

		/*
		 * Set the slot's catalog_xmin to the oldest xid that is still safe
		 * for decoding.  ProcArrayLock is held exclusively from computing
		 * that value until the global required xmin has been recomputed with
		 * the new slot taken into account.
		 */
		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
		xmin_horizon = GetOldestSafeDecodingTransactionId(true);
		SpinLockAcquire(&slot->mutex);
		slot->effective_catalog_xmin = xmin_horizon;
		slot->data.catalog_xmin = xmin_horizon;
		SpinLockRelease(&slot->mutex);
		ReplicationSlotsComputeRequiredXmin(true);
		LWLockRelease(ProcArrayLock);

		/*
		 * Try to make the new slot sync-ready right away.  Whether or not
		 * that succeeds, a new slot was created, so report an update.
		 */
		update_and_persist_local_synced_slot(remote_slot, remote_dbid);

		slot_updated = true;
	}

	ReplicationSlotRelease();

	return slot_updated;
}
     781             : 
     782             : /*
     783             :  * Synchronize slots.
     784             :  *
     785             :  * Gets the failover logical slots info from the primary server and updates
     786             :  * the slots locally. Creates the slots if not present on the standby.
     787             :  *
     788             :  * Returns TRUE if any of the slots gets updated in this sync-cycle.
     789             :  */
     790             : static bool
     791          30 : synchronize_slots(WalReceiverConn *wrconn)
     792             : {
     793             : #define SLOTSYNC_COLUMN_COUNT 9
     794          30 :     Oid         slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
     795             :     LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
     796             : 
     797             :     WalRcvExecResult *res;
     798             :     TupleTableSlot *tupslot;
     799          30 :     List       *remote_slot_list = NIL;
     800          30 :     bool        some_slot_updated = false;
     801          30 :     bool        started_tx = false;
     802          30 :     const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
     803             :         " restart_lsn, catalog_xmin, two_phase, failover,"
     804             :         " database, invalidation_reason"
     805             :         " FROM pg_catalog.pg_replication_slots"
     806             :         " WHERE failover and NOT temporary";
     807             : 
     808             :     /* The syscache access in walrcv_exec() needs a transaction env. */
     809          30 :     if (!IsTransactionState())
     810             :     {
     811          18 :         StartTransactionCommand();
     812          18 :         started_tx = true;
     813             :     }
     814             : 
     815             :     /* Execute the query */
     816          30 :     res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
     817          30 :     if (res->status != WALRCV_OK_TUPLES)
     818           0 :         ereport(ERROR,
     819             :                 errmsg("could not fetch failover logical slots info from the primary server: %s",
     820             :                        res->err));
     821             : 
     822             :     /* Construct the remote_slot tuple and synchronize each slot locally */
     823          30 :     tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     824          86 :     while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
     825             :     {
     826             :         bool        isnull;
     827          56 :         RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot));
     828             :         Datum       d;
     829          56 :         int         col = 0;
     830             : 
     831          56 :         remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col,
     832             :                                                              &isnull));
     833             :         Assert(!isnull);
     834             : 
     835          56 :         remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, ++col,
     836             :                                                                &isnull));
     837             :         Assert(!isnull);
     838             : 
     839             :         /*
     840             :          * It is possible to get null values for LSN and Xmin if slot is
     841             :          * invalidated on the primary server, so handle accordingly.
     842             :          */
     843          56 :         d = slot_getattr(tupslot, ++col, &isnull);
     844          56 :         remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr :
     845          56 :             DatumGetLSN(d);
     846             : 
     847          56 :         d = slot_getattr(tupslot, ++col, &isnull);
     848          56 :         remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
     849             : 
     850          56 :         d = slot_getattr(tupslot, ++col, &isnull);
     851          56 :         remote_slot->catalog_xmin = isnull ? InvalidTransactionId :
     852          56 :             DatumGetTransactionId(d);
     853             : 
     854          56 :         remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col,
     855             :                                                            &isnull));
     856             :         Assert(!isnull);
     857             : 
     858          56 :         remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
     859             :                                                           &isnull));
     860             :         Assert(!isnull);
     861             : 
     862          56 :         remote_slot->database = TextDatumGetCString(slot_getattr(tupslot,
     863             :                                                                  ++col, &isnull));
     864             :         Assert(!isnull);
     865             : 
     866          56 :         d = slot_getattr(tupslot, ++col, &isnull);
     867          56 :         remote_slot->invalidated = isnull ? RS_INVAL_NONE :
     868           0 :             GetSlotInvalidationCause(TextDatumGetCString(d));
     869             : 
     870             :         /* Sanity check */
     871             :         Assert(col == SLOTSYNC_COLUMN_COUNT);
     872             : 
     873             :         /*
     874             :          * If restart_lsn, confirmed_lsn or catalog_xmin is invalid but the
     875             :          * slot is valid, that means we have fetched the remote_slot in its
     876             :          * RS_EPHEMERAL state. In such a case, don't sync it; we can always
     877             :          * sync it in the next sync cycle when the remote_slot is persisted
     878             :          * and has valid lsn(s) and xmin values.
     879             :          *
     880             :          * XXX: In future, if we plan to expose 'slot->data.persistency' in
     881             :          * pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL
     882             :          * slots in the first place.
     883             :          */
     884          56 :         if ((XLogRecPtrIsInvalid(remote_slot->restart_lsn) ||
     885          56 :              XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) ||
     886          56 :              !TransactionIdIsValid(remote_slot->catalog_xmin)) &&
     887           0 :             remote_slot->invalidated == RS_INVAL_NONE)
     888           0 :             pfree(remote_slot);
     889             :         else
     890             :             /* Create list of remote slots */
     891          56 :             remote_slot_list = lappend(remote_slot_list, remote_slot);
     892             : 
     893          56 :         ExecClearTuple(tupslot);
     894             :     }
     895             : 
     896             :     /* Drop local slots that no longer need to be synced. */
     897          30 :     drop_local_obsolete_slots(remote_slot_list);
     898             : 
     899             :     /* Now sync the slots locally */
     900         116 :     foreach_ptr(RemoteSlot, remote_slot, remote_slot_list)
     901             :     {
     902          56 :         Oid         remote_dbid = get_database_oid(remote_slot->database, false);
     903             : 
     904             :         /*
     905             :          * Use shared lock to prevent a conflict with
     906             :          * ReplicationSlotsDropDBSlots(), trying to drop the same slot during
     907             :          * a drop-database operation.
     908             :          */
     909          56 :         LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
     910             : 
     911          56 :         some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
     912             : 
     913          56 :         UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
     914             :     }
     915             : 
     916             :     /* We are done, free remote_slot_list elements */
     917          30 :     list_free_deep(remote_slot_list);
     918             : 
     919          30 :     walrcv_clear_result(res);
     920             : 
     921          30 :     if (started_tx)
     922          18 :         CommitTransactionCommand();
     923             : 
     924          30 :     return some_slot_updated;
     925             : }
     926             : 
     927             : /*
     928             :  * Checks the remote server info.
     929             :  *
     930             :  * We ensure that the 'primary_slot_name' exists on the remote server and the
     931             :  * remote server is not a standby node.
     932             :  */
     933             : static void
     934          22 : validate_remote_info(WalReceiverConn *wrconn)
     935             : {
     936             : #define PRIMARY_INFO_OUTPUT_COL_COUNT 2
     937             :     WalRcvExecResult *res;
     938          22 :     Oid         slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID};
     939             :     StringInfoData cmd;
     940             :     bool        isnull;
     941             :     TupleTableSlot *tupslot;
     942             :     bool        remote_in_recovery;
     943             :     bool        primary_slot_valid;
     944          22 :     bool        started_tx = false;
     945             : 
     946          22 :     initStringInfo(&cmd);
     947          22 :     appendStringInfo(&cmd,
     948             :                      "SELECT pg_is_in_recovery(), count(*) = 1"
     949             :                      " FROM pg_catalog.pg_replication_slots"
     950             :                      " WHERE slot_type='physical' AND slot_name=%s",
     951             :                      quote_literal_cstr(PrimarySlotName));
     952             : 
     953             :     /* The syscache access in walrcv_exec() needs a transaction env. */
     954          22 :     if (!IsTransactionState())
     955             :     {
     956           8 :         StartTransactionCommand();
     957           8 :         started_tx = true;
     958             :     }
     959             : 
     960          22 :     res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow);
     961          22 :     pfree(cmd.data);
     962             : 
     963          22 :     if (res->status != WALRCV_OK_TUPLES)
     964           0 :         ereport(ERROR,
     965             :                 errmsg("could not fetch primary_slot_name \"%s\" info from the primary server: %s",
     966             :                        PrimarySlotName, res->err),
     967             :                 errhint("Check if primary_slot_name is configured correctly."));
     968             : 
     969          22 :     tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
     970          22 :     if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
     971           0 :         elog(ERROR,
     972             :              "failed to fetch tuple for the primary server slot specified by primary_slot_name");
     973             : 
     974          22 :     remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull));
     975             :     Assert(!isnull);
     976             : 
     977             :     /*
     978             :      * Slot sync is currently not supported on a cascading standby. This is
     979             :      * because if we allow it, the primary server needs to wait for all the
     980             :      * cascading standbys, otherwise, logical subscribers can still be ahead
     981             :      * of one of the cascading standbys which we plan to promote. Thus, to
     982             :      * avoid this additional complexity, we restrict it for the time being.
     983             :      */
     984          22 :     if (remote_in_recovery)
     985           2 :         ereport(ERROR,
     986             :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     987             :                 errmsg("cannot synchronize replication slots from a standby server"));
     988             : 
     989          20 :     primary_slot_valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull));
     990             :     Assert(!isnull);
     991             : 
     992          20 :     if (!primary_slot_valid)
     993           0 :         ereport(ERROR,
     994             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     995             :                 errmsg("slot synchronization requires valid primary_slot_name"),
     996             :         /* translator: second %s is a GUC variable name */
     997             :                 errdetail("The replication slot \"%s\" specified by %s does not exist on the primary server.",
     998             :                           PrimarySlotName, "primary_slot_name"));
     999             : 
    1000          20 :     ExecClearTuple(tupslot);
    1001          20 :     walrcv_clear_result(res);
    1002             : 
    1003          20 :     if (started_tx)
    1004           8 :         CommitTransactionCommand();
    1005          20 : }
    1006             : 
    1007             : /*
    1008             :  * Checks if dbname is specified in 'primary_conninfo'.
    1009             :  *
    1010             :  * Error out if not specified otherwise return it.
    1011             :  */
    1012             : char *
    1013          24 : CheckAndGetDbnameFromConninfo(void)
    1014             : {
    1015             :     char       *dbname;
    1016             : 
    1017             :     /*
    1018             :      * The slot synchronization needs a database connection for walrcv_exec to
    1019             :      * work.
    1020             :      */
    1021          24 :     dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
    1022          24 :     if (dbname == NULL)
    1023           2 :         ereport(ERROR,
    1024             : 
    1025             :         /*
    1026             :          * translator: dbname is a specific option; %s is a GUC variable name
    1027             :          */
    1028             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1029             :                 errmsg("slot synchronization requires dbname to be specified in %s",
    1030             :                        "primary_conninfo"));
    1031          22 :     return dbname;
    1032             : }
    1033             : 
    1034             : /*
    1035             :  * Return true if all necessary GUCs for slot synchronization are set
    1036             :  * appropriately, otherwise, return false.
    1037             :  */
    1038             : bool
    1039          28 : ValidateSlotSyncParams(int elevel)
    1040             : {
    1041             :     /*
    1042             :      * Logical slot sync/creation requires wal_level >= logical.
    1043             :      *
    1044             :      * Sincle altering the wal_level requires a server restart, so error out
    1045             :      * in this case regardless of elevel provided by caller.
    1046             :      */
    1047          28 :     if (wal_level < WAL_LEVEL_LOGICAL)
    1048           0 :         ereport(ERROR,
    1049             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1050             :                 errmsg("slot synchronization requires wal_level >= \"logical\""));
    1051             : 
    1052             :     /*
    1053             :      * A physical replication slot(primary_slot_name) is required on the
    1054             :      * primary to ensure that the rows needed by the standby are not removed
    1055             :      * after restarting, so that the synchronized slot on the standby will not
    1056             :      * be invalidated.
    1057             :      */
    1058          28 :     if (PrimarySlotName == NULL || *PrimarySlotName == '\0')
    1059             :     {
    1060           0 :         ereport(elevel,
    1061             :         /* translator: %s is a GUC variable name */
    1062             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1063             :                 errmsg("slot synchronization requires %s to be defined", "primary_slot_name"));
    1064           0 :         return false;
    1065             :     }
    1066             : 
    1067             :     /*
    1068             :      * hot_standby_feedback must be enabled to cooperate with the physical
    1069             :      * replication slot, which allows informing the primary about the xmin and
    1070             :      * catalog_xmin values on the standby.
    1071             :      */
    1072          28 :     if (!hot_standby_feedback)
    1073             :     {
    1074           2 :         ereport(elevel,
    1075             :         /* translator: %s is a GUC variable name */
    1076             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1077             :                 errmsg("slot synchronization requires %s to be enabled",
    1078             :                        "hot_standby_feedback"));
    1079           2 :         return false;
    1080             :     }
    1081             : 
    1082             :     /*
    1083             :      * The primary_conninfo is required to make connection to primary for
    1084             :      * getting slots information.
    1085             :      */
    1086          26 :     if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0')
    1087             :     {
    1088           0 :         ereport(elevel,
    1089             :         /* translator: %s is a GUC variable name */
    1090             :                 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    1091             :                 errmsg("slot synchronization requires %s to be defined",
    1092             :                        "primary_conninfo"));
    1093           0 :         return false;
    1094             :     }
    1095             : 
    1096          26 :     return true;
    1097             : }
    1098             : 
    1099             : /*
    1100             :  * Re-read the config file.
    1101             :  *
    1102             :  * Exit if any of the slot sync GUCs have changed. The postmaster will
    1103             :  * restart it.
    1104             :  */
    1105             : static void
    1106           2 : slotsync_reread_config(void)
    1107             : {
    1108           2 :     char       *old_primary_conninfo = pstrdup(PrimaryConnInfo);
    1109           2 :     char       *old_primary_slotname = pstrdup(PrimarySlotName);
    1110           2 :     bool        old_sync_replication_slots = sync_replication_slots;
    1111           2 :     bool        old_hot_standby_feedback = hot_standby_feedback;
    1112             :     bool        conninfo_changed;
    1113             :     bool        primary_slotname_changed;
    1114             : 
    1115             :     Assert(sync_replication_slots);
    1116             : 
    1117           2 :     ConfigReloadPending = false;
    1118           2 :     ProcessConfigFile(PGC_SIGHUP);
    1119             : 
    1120           2 :     conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0;
    1121           2 :     primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0;
    1122           2 :     pfree(old_primary_conninfo);
    1123           2 :     pfree(old_primary_slotname);
    1124             : 
    1125           2 :     if (old_sync_replication_slots != sync_replication_slots)
    1126             :     {
    1127           0 :         ereport(LOG,
    1128             :         /* translator: %s is a GUC variable name */
    1129             :                 errmsg("slot sync worker will shutdown because %s is disabled", "sync_replication_slots"));
    1130           0 :         proc_exit(0);
    1131             :     }
    1132             : 
    1133           2 :     if (conninfo_changed ||
    1134           2 :         primary_slotname_changed ||
    1135           2 :         (old_hot_standby_feedback != hot_standby_feedback))
    1136             :     {
    1137           2 :         ereport(LOG,
    1138             :                 errmsg("slot sync worker will restart because of a parameter change"));
    1139             : 
    1140             :         /*
    1141             :          * Reset the last-start time for this worker so that the postmaster
    1142             :          * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
    1143             :          */
    1144           2 :         SlotSyncCtx->last_start_time = 0;
    1145             : 
    1146           2 :         proc_exit(0);
    1147             :     }
    1148             : 
    1149           0 : }
    1150             : 
    1151             : /*
    1152             :  * Interrupt handler for main loop of slot sync worker.
    1153             :  */
    1154             : static void
    1155          26 : ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
    1156             : {
    1157          26 :     CHECK_FOR_INTERRUPTS();
    1158             : 
    1159          22 :     if (ShutdownRequestPending)
    1160             :     {
    1161           2 :         ereport(LOG,
    1162             :                 errmsg("slot sync worker is shutting down on receiving SIGINT"));
    1163             : 
    1164           2 :         proc_exit(0);
    1165             :     }
    1166             : 
    1167          20 :     if (ConfigReloadPending)
    1168           2 :         slotsync_reread_config();
    1169          18 : }
    1170             : 
    1171             : /*
    1172             :  * Connection cleanup function for slotsync worker.
    1173             :  *
    1174             :  * Called on slotsync worker exit.
    1175             :  */
    1176             : static void
    1177           8 : slotsync_worker_disconnect(int code, Datum arg)
    1178             : {
    1179           8 :     WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
    1180             : 
    1181           8 :     walrcv_disconnect(wrconn);
    1182           8 : }
    1183             : 
    1184             : /*
    1185             :  * Cleanup function for slotsync worker.
    1186             :  *
    1187             :  * Called on slotsync worker exit.
    1188             :  */
    1189             : static void
    1190           8 : slotsync_worker_onexit(int code, Datum arg)
    1191             : {
    1192             :     /*
    1193             :      * We need to do slots cleanup here just like WalSndErrorCleanup() does.
    1194             :      *
    1195             :      * The startup process during promotion invokes ShutDownSlotSync() which
    1196             :      * waits for slot sync to finish and it does that by checking the
    1197             :      * 'syncing' flag. Thus the slot sync worker must be done with slots'
    1198             :      * release and cleanup to avoid any dangling temporary slots or active
    1199             :      * slots before it marks itself as finished syncing.
    1200             :      */
    1201             : 
    1202             :     /* Make sure active replication slots are released */
    1203           8 :     if (MyReplicationSlot != NULL)
    1204           0 :         ReplicationSlotRelease();
    1205             : 
    1206             :     /* Also cleanup the temporary slots. */
    1207           8 :     ReplicationSlotCleanup(false);
    1208             : 
    1209           8 :     SpinLockAcquire(&SlotSyncCtx->mutex);
    1210             : 
    1211           8 :     SlotSyncCtx->pid = InvalidPid;
    1212             : 
    1213             :     /*
    1214             :      * If syncing_slots is true, it indicates that the process errored out
    1215             :      * without resetting the flag. So, we need to clean up shared memory and
    1216             :      * reset the flag here.
    1217             :      */
    1218           8 :     if (syncing_slots)
    1219             :     {
    1220           8 :         SlotSyncCtx->syncing = false;
    1221           8 :         syncing_slots = false;
    1222             :     }
    1223             : 
    1224           8 :     SpinLockRelease(&SlotSyncCtx->mutex);
    1225           8 : }
    1226             : 
    1227             : /*
    1228             :  * Sleep for long enough that we believe it's likely that the slots on primary
    1229             :  * get updated.
    1230             :  *
    1231             :  * If there is no slot activity the wait time between sync-cycles will double
    1232             :  * (to a maximum of 30s). If there is some slot activity the wait time between
    1233             :  * sync-cycles is reset to the minimum (200ms).
    1234             :  */
    1235             : static void
    1236          18 : wait_for_slot_activity(bool some_slot_updated)
    1237             : {
    1238             :     int         rc;
    1239             : 
    1240          18 :     if (!some_slot_updated)
    1241             :     {
    1242             :         /*
    1243             :          * No slots were updated, so double the sleep time, but not beyond the
    1244             :          * maximum allowable value.
    1245             :          */
    1246           8 :         sleep_ms = Min(sleep_ms * 2, MAX_SLOTSYNC_WORKER_NAPTIME_MS);
    1247             :     }
    1248             :     else
    1249             :     {
    1250             :         /*
    1251             :          * Some slots were updated since the last sleep, so reset the sleep
    1252             :          * time.
    1253             :          */
    1254          10 :         sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
    1255             :     }
    1256             : 
    1257          18 :     rc = WaitLatch(MyLatch,
    1258             :                    WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1259             :                    sleep_ms,
    1260             :                    WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
    1261             : 
    1262          18 :     if (rc & WL_LATCH_SET)
    1263           8 :         ResetLatch(MyLatch);
    1264          18 : }
    1265             : 
    1266             : /*
    1267             :  * Emit an error if a promotion or a concurrent sync call is in progress.
    1268             :  * Otherwise, advertise that a sync is in progress.
    1269             :  */
    1270             : static void
    1271          22 : check_and_set_sync_info(pid_t worker_pid)
    1272             : {
    1273          22 :     SpinLockAcquire(&SlotSyncCtx->mutex);
    1274             : 
    1275             :     /* The worker pid must not be already assigned in SlotSyncCtx */
    1276             :     Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
    1277             : 
    1278             :     /*
    1279             :      * Emit an error if startup process signaled the slot sync machinery to
    1280             :      * stop. See comments atop SlotSyncCtxStruct.
    1281             :      */
    1282          22 :     if (SlotSyncCtx->stopSignaled)
    1283             :     {
    1284           0 :         SpinLockRelease(&SlotSyncCtx->mutex);
    1285           0 :         ereport(ERROR,
    1286             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1287             :                 errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
    1288             :     }
    1289             : 
    1290          22 :     if (SlotSyncCtx->syncing)
    1291             :     {
    1292           0 :         SpinLockRelease(&SlotSyncCtx->mutex);
    1293           0 :         ereport(ERROR,
    1294             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1295             :                 errmsg("cannot synchronize replication slots concurrently"));
    1296             :     }
    1297             : 
    1298          22 :     SlotSyncCtx->syncing = true;
    1299             : 
    1300             :     /*
    1301             :      * Advertise the required PID so that the startup process can kill the
    1302             :      * slot sync worker on promotion.
    1303             :      */
    1304          22 :     SlotSyncCtx->pid = worker_pid;
    1305             : 
    1306          22 :     SpinLockRelease(&SlotSyncCtx->mutex);
    1307             : 
    1308          22 :     syncing_slots = true;
    1309          22 : }
    1310             : 
    1311             : /*
    1312             :  * Reset syncing flag.
    1313             :  */
    1314             : static void
    1315          14 : reset_syncing_flag()
    1316             : {
    1317          14 :     SpinLockAcquire(&SlotSyncCtx->mutex);
    1318          14 :     SlotSyncCtx->syncing = false;
    1319          14 :     SpinLockRelease(&SlotSyncCtx->mutex);
    1320             : 
    1321          14 :     syncing_slots = false;
    1322          14 : };
    1323             : 
    1324             : /*
    1325             :  * The main loop of our worker process.
    1326             :  *
    1327             :  * It connects to the primary server, fetches logical failover slots
    1328             :  * information periodically in order to create and sync the slots.
    1329             :  */
    1330             : void
    1331           8 : ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
    1332             : {
    1333           8 :     WalReceiverConn *wrconn = NULL;
    1334             :     char       *dbname;
    1335             :     char       *err;
    1336             :     sigjmp_buf  local_sigjmp_buf;
    1337             :     StringInfoData app_name;
    1338             : 
    1339             :     Assert(startup_data_len == 0);
    1340             : 
    1341           8 :     MyBackendType = B_SLOTSYNC_WORKER;
    1342             : 
    1343           8 :     init_ps_display(NULL);
    1344             : 
    1345           8 :     SetProcessingMode(InitProcessing);
    1346             : 
    1347             :     /*
    1348             :      * Create a per-backend PGPROC struct in shared memory.  We must do this
    1349             :      * before we access any shared memory.
    1350             :      */
    1351           8 :     InitProcess();
    1352             : 
    1353             :     /*
    1354             :      * Early initialization.
    1355             :      */
    1356           8 :     BaseInit();
    1357             : 
    1358             :     Assert(SlotSyncCtx != NULL);
    1359             : 
    1360             :     /*
    1361             :      * If an exception is encountered, processing resumes here.
    1362             :      *
    1363             :      * We just need to clean up, report the error, and go away.
    1364             :      *
    1365             :      * If we do not have this handling here, then since this worker process
    1366             :      * operates at the bottom of the exception stack, ERRORs turn into FATALs.
    1367             :      * Therefore, we create our own exception handler to catch ERRORs.
    1368             :      */
    1369           8 :     if (sigsetjmp(local_sigjmp_buf, 1) != 0)
    1370             :     {
    1371             :         /* since not using PG_TRY, must reset error stack by hand */
    1372           0 :         error_context_stack = NULL;
    1373             : 
    1374             :         /* Prevents interrupts while cleaning up */
    1375           0 :         HOLD_INTERRUPTS();
    1376             : 
    1377             :         /* Report the error to the server log */
    1378           0 :         EmitErrorReport();
    1379             : 
    1380             :         /*
    1381             :          * We can now go away.  Note that because we called InitProcess, a
    1382             :          * callback was registered to do ProcKill, which will clean up
    1383             :          * necessary state.
    1384             :          */
    1385           0 :         proc_exit(0);
    1386             :     }
    1387             : 
    1388             :     /* We can now handle ereport(ERROR) */
    1389           8 :     PG_exception_stack = &local_sigjmp_buf;
    1390             : 
    1391             :     /* Setup signal handling */
    1392           8 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    1393           8 :     pqsignal(SIGINT, SignalHandlerForShutdownRequest);
    1394           8 :     pqsignal(SIGTERM, die);
    1395           8 :     pqsignal(SIGFPE, FloatExceptionHandler);
    1396           8 :     pqsignal(SIGUSR1, procsignal_sigusr1_handler);
    1397           8 :     pqsignal(SIGUSR2, SIG_IGN);
    1398           8 :     pqsignal(SIGPIPE, SIG_IGN);
    1399           8 :     pqsignal(SIGCHLD, SIG_DFL);
    1400             : 
    1401           8 :     check_and_set_sync_info(MyProcPid);
    1402             : 
    1403           8 :     ereport(LOG, errmsg("slot sync worker started"));
    1404             : 
    1405             :     /* Register it as soon as SlotSyncCtx->pid is initialized. */
    1406           8 :     before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
    1407             : 
    1408             :     /*
    1409             :      * Establishes SIGALRM handler and initialize timeout module. It is needed
    1410             :      * by InitPostgres to register different timeouts.
    1411             :      */
    1412           8 :     InitializeTimeouts();
    1413             : 
    1414             :     /* Load the libpq-specific functions */
    1415           8 :     load_file("libpqwalreceiver", false);
    1416             : 
    1417             :     /*
    1418             :      * Unblock signals (they were blocked when the postmaster forked us)
    1419             :      */
    1420           8 :     sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
    1421             : 
    1422             :     /*
    1423             :      * Set always-secure search path, so malicious users can't redirect user
    1424             :      * code (e.g. operators).
    1425             :      *
    1426             :      * It's not strictly necessary since we won't be scanning or writing to
    1427             :      * any user table locally, but it's good to retain it here for added
    1428             :      * precaution.
    1429             :      */
    1430           8 :     SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
    1431             : 
    1432           8 :     dbname = CheckAndGetDbnameFromConninfo();
    1433             : 
    1434             :     /*
    1435             :      * Connect to the database specified by the user in primary_conninfo. We
    1436             :      * need a database connection for walrcv_exec to work which we use to
    1437             :      * fetch slot information from the remote node. See comments atop
    1438             :      * libpqrcv_exec.
    1439             :      *
    1440             :      * We do not specify a specific user here since the slot sync worker will
    1441             :      * operate as a superuser. This is safe because the slot sync worker does
    1442             :      * not interact with user tables, eliminating the risk of executing
    1443             :      * arbitrary code within triggers.
    1444             :      */
    1445           8 :     InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL);
    1446             : 
    1447           8 :     SetProcessingMode(NormalProcessing);
    1448             : 
    1449           8 :     initStringInfo(&app_name);
    1450           8 :     if (cluster_name[0])
    1451           8 :         appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsync worker");
    1452             :     else
    1453           0 :         appendStringInfoString(&app_name, "slotsync worker");
    1454             : 
    1455             :     /*
    1456             :      * Establish the connection to the primary server for slot
    1457             :      * synchronization.
    1458             :      */
    1459           8 :     wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
    1460             :                             app_name.data, &err);
    1461           8 :     pfree(app_name.data);
    1462             : 
    1463           8 :     if (!wrconn)
    1464           0 :         ereport(ERROR,
    1465             :                 errcode(ERRCODE_CONNECTION_FAILURE),
    1466             :                 errmsg("could not connect to the primary server: %s", err));
    1467             : 
    1468             :     /*
    1469             :      * Register the disconnection callback.
    1470             :      *
    1471             :      * XXX: This can be combined with previous cleanup registration of
    1472             :      * slotsync_worker_onexit() but that will need the connection to be made
    1473             :      * global and we want to avoid introducing global for this purpose.
    1474             :      */
    1475           8 :     before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
    1476             : 
    1477             :     /*
    1478             :      * Using the specified primary server connection, check that we are not a
    1479             :      * cascading standby and slot configured in 'primary_slot_name' exists on
    1480             :      * the primary server.
    1481             :      */
    1482           8 :     validate_remote_info(wrconn);
    1483             : 
    1484             :     /* Main loop to synchronize slots */
    1485             :     for (;;)
    1486          18 :     {
    1487          26 :         bool        some_slot_updated = false;
    1488             : 
    1489          26 :         ProcessSlotSyncInterrupts(wrconn);
    1490             : 
    1491          18 :         some_slot_updated = synchronize_slots(wrconn);
    1492             : 
    1493          18 :         wait_for_slot_activity(some_slot_updated);
    1494             :     }
    1495             : 
    1496             :     /*
    1497             :      * The slot sync worker can't get here because it will only stop when it
    1498             :      * receives a SIGINT from the startup process, or when there is an error.
    1499             :      */
    1500             :     Assert(false);
    1501             : }
    1502             : 
    1503             : /*
    1504             :  * Update the inactive_since property for synced slots.
    1505             :  *
    1506             :  * Note that this function is currently called when we shutdown the slot
    1507             :  * sync machinery.
    1508             :  */
    1509             : static void
    1510        1420 : update_synced_slots_inactive_since(void)
    1511             : {
    1512        1420 :     TimestampTz now = 0;
    1513             : 
    1514             :     /*
    1515             :      * We need to update inactive_since only when we are promoting standby to
    1516             :      * correctly interpret the inactive_since if the standby gets promoted
    1517             :      * without a restart. We don't want the slots to appear inactive for a
    1518             :      * long time after promotion if they haven't been synchronized recently.
    1519             :      * Whoever acquires the slot i.e.makes the slot active will reset it.
    1520             :      */
    1521        1420 :     if (!StandbyMode)
    1522        1336 :         return;
    1523             : 
    1524             :     /* The slot sync worker or SQL function mustn't be running by now */
    1525             :     Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
    1526             : 
    1527          84 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1528             : 
    1529         900 :     for (int i = 0; i < max_replication_slots; i++)
    1530             :     {
    1531         816 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1532             : 
    1533             :         /* Check if it is a synchronized slot */
    1534         816 :         if (s->in_use && s->data.synced)
    1535             :         {
    1536             :             Assert(SlotIsLogical(s));
    1537             : 
    1538             :             /* The slot must not be acquired by any process */
    1539             :             Assert(s->active_pid == 0);
    1540             : 
    1541             :             /* Use the same inactive_since time for all the slots. */
    1542           4 :             if (now == 0)
    1543           2 :                 now = GetCurrentTimestamp();
    1544             : 
    1545           4 :             SpinLockAcquire(&s->mutex);
    1546           4 :             s->inactive_since = now;
    1547           4 :             SpinLockRelease(&s->mutex);
    1548             :         }
    1549             :     }
    1550             : 
    1551          84 :     LWLockRelease(ReplicationSlotControlLock);
    1552             : }
    1553             : 
    1554             : /*
    1555             :  * Shut down the slot sync worker.
    1556             :  *
    1557             :  * This function sends signal to shutdown slot sync worker, if required. It
    1558             :  * also waits till the slot sync worker has exited or
    1559             :  * pg_sync_replication_slots() has finished.
    1560             :  */
    1561             : void
    1562        1420 : ShutDownSlotSync(void)
    1563             : {
    1564             :     pid_t       worker_pid;
    1565             : 
    1566        1420 :     SpinLockAcquire(&SlotSyncCtx->mutex);
    1567             : 
    1568        1420 :     SlotSyncCtx->stopSignaled = true;
    1569             : 
    1570             :     /*
    1571             :      * Return if neither the slot sync worker is running nor the function
    1572             :      * pg_sync_replication_slots() is executing.
    1573             :      */
    1574        1420 :     if (!SlotSyncCtx->syncing)
    1575             :     {
    1576        1418 :         SpinLockRelease(&SlotSyncCtx->mutex);
    1577        1418 :         update_synced_slots_inactive_since();
    1578        1418 :         return;
    1579             :     }
    1580             : 
    1581           2 :     worker_pid = SlotSyncCtx->pid;
    1582             : 
    1583           2 :     SpinLockRelease(&SlotSyncCtx->mutex);
    1584             : 
    1585           2 :     if (worker_pid != InvalidPid)
    1586           2 :         kill(worker_pid, SIGINT);
    1587             : 
    1588             :     /* Wait for slot sync to end */
    1589             :     for (;;)
    1590           0 :     {
    1591             :         int         rc;
    1592             : 
    1593             :         /* Wait a bit, we don't expect to have to wait long */
    1594           2 :         rc = WaitLatch(MyLatch,
    1595             :                        WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1596             :                        10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
    1597             : 
    1598           2 :         if (rc & WL_LATCH_SET)
    1599             :         {
    1600           0 :             ResetLatch(MyLatch);
    1601           0 :             CHECK_FOR_INTERRUPTS();
    1602             :         }
    1603             : 
    1604           2 :         SpinLockAcquire(&SlotSyncCtx->mutex);
    1605             : 
    1606             :         /* Ensure that no process is syncing the slots. */
    1607           2 :         if (!SlotSyncCtx->syncing)
    1608           2 :             break;
    1609             : 
    1610           0 :         SpinLockRelease(&SlotSyncCtx->mutex);
    1611             :     }
    1612             : 
    1613           2 :     SpinLockRelease(&SlotSyncCtx->mutex);
    1614             : 
    1615           2 :     update_synced_slots_inactive_since();
    1616             : }
    1617             : 
    1618             : /*
    1619             :  * SlotSyncWorkerCanRestart
    1620             :  *
    1621             :  * Returns true if enough time (SLOTSYNC_RESTART_INTERVAL_SEC) has passed
    1622             :  * since it was launched last. Otherwise returns false.
    1623             :  *
    1624             :  * This is a safety valve to protect against continuous respawn attempts if the
    1625             :  * worker is dying immediately at launch. Note that since we will retry to
    1626             :  * launch the worker from the postmaster main loop, we will get another
    1627             :  * chance later.
    1628             :  */
    1629             : bool
    1630          10 : SlotSyncWorkerCanRestart(void)
    1631             : {
    1632          10 :     time_t      curtime = time(NULL);
    1633             : 
    1634             :     /* Return false if too soon since last start. */
    1635          10 :     if ((unsigned int) (curtime - SlotSyncCtx->last_start_time) <
    1636             :         (unsigned int) SLOTSYNC_RESTART_INTERVAL_SEC)
    1637           2 :         return false;
    1638             : 
    1639           8 :     SlotSyncCtx->last_start_time = curtime;
    1640             : 
    1641           8 :     return true;
    1642             : }
    1643             : 
    1644             : /*
    1645             :  * Is current process syncing replication slots?
    1646             :  *
    1647             :  * Could be either backend executing SQL function or slot sync worker.
    1648             :  */
    1649             : bool
    1650          36 : IsSyncingReplicationSlots(void)
    1651             : {
    1652          36 :     return syncing_slots;
    1653             : }
    1654             : 
    1655             : /*
    1656             :  * Amount of shared memory required for slot synchronization.
    1657             :  */
    1658             : Size
    1659        5066 : SlotSyncShmemSize(void)
    1660             : {
    1661        5066 :     return sizeof(SlotSyncCtxStruct);
    1662             : }
    1663             : 
    1664             : /*
    1665             :  * Allocate and initialize the shared memory of slot synchronization.
    1666             :  */
    1667             : void
    1668        1768 : SlotSyncShmemInit(void)
    1669             : {
    1670        1768 :     Size        size = SlotSyncShmemSize();
    1671             :     bool        found;
    1672             : 
    1673        1768 :     SlotSyncCtx = (SlotSyncCtxStruct *)
    1674        1768 :         ShmemInitStruct("Slot Sync Data", size, &found);
    1675             : 
    1676        1768 :     if (!found)
    1677             :     {
    1678        1768 :         memset(SlotSyncCtx, 0, size);
    1679        1768 :         SlotSyncCtx->pid = InvalidPid;
    1680        1768 :         SpinLockInit(&SlotSyncCtx->mutex);
    1681             :     }
    1682        1768 : }
    1683             : 
    1684             : /*
    1685             :  * Error cleanup callback for slot sync SQL function.
    1686             :  */
    1687             : static void
    1688           2 : slotsync_failure_callback(int code, Datum arg)
    1689             : {
    1690           2 :     WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
    1691             : 
    1692             :     /*
    1693             :      * We need to do slots cleanup here just like WalSndErrorCleanup() does.
    1694             :      *
    1695             :      * The startup process during promotion invokes ShutDownSlotSync() which
    1696             :      * waits for slot sync to finish and it does that by checking the
    1697             :      * 'syncing' flag. Thus the SQL function must be done with slots' release
    1698             :      * and cleanup to avoid any dangling temporary slots or active slots
    1699             :      * before it marks itself as finished syncing.
    1700             :      */
    1701             : 
    1702             :     /* Make sure active replication slots are released */
    1703           2 :     if (MyReplicationSlot != NULL)
    1704           0 :         ReplicationSlotRelease();
    1705             : 
    1706             :     /* Also cleanup the synced temporary slots. */
    1707           2 :     ReplicationSlotCleanup(true);
    1708             : 
    1709             :     /*
    1710             :      * The set syncing_slots indicates that the process errored out without
    1711             :      * resetting the flag. So, we need to clean up shared memory and reset the
    1712             :      * flag here.
    1713             :      */
    1714           2 :     if (syncing_slots)
    1715           2 :         reset_syncing_flag();
    1716             : 
    1717           2 :     walrcv_disconnect(wrconn);
    1718           2 : }
    1719             : 
    1720             : /*
    1721             :  * Synchronize the failover enabled replication slots using the specified
    1722             :  * primary server connection.
    1723             :  */
    1724             : void
    1725          14 : SyncReplicationSlots(WalReceiverConn *wrconn)
    1726             : {
    1727          14 :     PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
    1728             :     {
    1729          14 :         check_and_set_sync_info(InvalidPid);
    1730             : 
    1731          14 :         validate_remote_info(wrconn);
    1732             : 
    1733          12 :         synchronize_slots(wrconn);
    1734             : 
    1735             :         /* Cleanup the synced temporary slots */
    1736          12 :         ReplicationSlotCleanup(true);
    1737             : 
    1738             :         /* We are done with sync, so reset sync flag */
    1739          12 :         reset_syncing_flag();
    1740             :     }
    1741          14 :     PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
    1742          12 : }

Generated by: LCOV version 1.14