LCOV - code coverage report
Current view: top level - src/backend/replication - slot.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18beta1 Lines: 781 925 84.4 %
Date: 2025-06-28 09:17:06 Functions: 45 46 97.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * slot.c
       4             :  *     Replication slot management.
       5             :  *
       6             :  *
       7             :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/replication/slot.c
      12             :  *
      13             :  * NOTES
      14             :  *
      15             :  * Replication slots are used to keep state about replication streams
      16             :  * originating from this cluster.  Their primary purpose is to prevent the
      17             :  * premature removal of WAL or of old tuple versions in a manner that would
      18             :  * interfere with replication; they are also useful for monitoring purposes.
      19             :  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
      20             :  * on standbys (to support cascading setups).  The requirement that slots be
      21             :  * usable on standbys precludes storing them in the system catalogs.
      22             :  *
      23             :  * Each replication slot gets its own directory inside the directory
      24             :  * $PGDATA / PG_REPLSLOT_DIR.  Inside that directory the state file will
      25             :  * contain the slot's own data.  Additional data can be stored alongside that
      26             :  * file if required.  While the server is running, the state data is also
      27             :  * cached in memory for efficiency.
      28             :  *
      29             :  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
      30             :  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
      31             :  * to iterate over the slots, and in exclusive mode to change the in_use flag
      32             :  * of a slot.  The remaining data in each slot is protected by its mutex.
      33             :  *
      34             :  *-------------------------------------------------------------------------
      35             :  */
      36             : 
      37             : #include "postgres.h"
      38             : 
      39             : #include <unistd.h>
      40             : #include <sys/stat.h>
      41             : 
      42             : #include "access/transam.h"
      43             : #include "access/xlog_internal.h"
      44             : #include "access/xlogrecovery.h"
      45             : #include "common/file_utils.h"
      46             : #include "common/string.h"
      47             : #include "miscadmin.h"
      48             : #include "pgstat.h"
      49             : #include "postmaster/interrupt.h"
      50             : #include "replication/slotsync.h"
      51             : #include "replication/slot.h"
      52             : #include "replication/walsender_private.h"
      53             : #include "storage/fd.h"
      54             : #include "storage/ipc.h"
      55             : #include "storage/proc.h"
      56             : #include "storage/procarray.h"
      57             : #include "utils/builtins.h"
      58             : #include "utils/guc_hooks.h"
      59             : #include "utils/injection_point.h"
      60             : #include "utils/varlena.h"
      61             : 
      62             : /*
      63             :  * Replication slot on-disk data structure.
      64             :  */
      65             : typedef struct ReplicationSlotOnDisk
      66             : {
      67             :     /* first part of this struct needs to be version independent */
      68             : 
      69             :     /* data not covered by checksum */
      70             :     uint32      magic;
      71             :     pg_crc32c   checksum;
      72             : 
      73             :     /* data covered by checksum */
      74             :     uint32      version;
      75             :     uint32      length;
      76             : 
      77             :     /*
      78             :      * The actual data in the slot that follows can differ based on the above
      79             :      * 'version'.
      80             :      */
      81             : 
      82             :     ReplicationSlotPersistentData slotdata;
      83             : } ReplicationSlotOnDisk;
      84             : 
      85             : /*
      86             :  * Struct for the configuration of synchronized_standby_slots.
      87             :  *
      88             :  * Note: this must be a flat representation that can be held in a single chunk
      89             :  * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
      90             :  * synchronized_standby_slots GUC.
      91             :  */
      92             : typedef struct
      93             : {
      94             :     /* Number of slot names in the slot_names[] */
      95             :     int         nslotnames;
      96             : 
      97             :     /*
      98             :      * slot_names contains 'nslotnames' consecutive null-terminated C strings.
      99             :      */
     100             :     char        slot_names[FLEXIBLE_ARRAY_MEMBER];
     101             : } SyncStandbySlotsConfigData;
     102             : 
     103             : /*
     104             :  * Lookup table for slot invalidation causes.
     105             :  */
     106             : typedef struct SlotInvalidationCauseMap
     107             : {
     108             :     ReplicationSlotInvalidationCause cause;
     109             :     const char *cause_name;
     110             : } SlotInvalidationCauseMap;
     111             : 
     112             : static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
     113             :     {RS_INVAL_NONE, "none"},
     114             :     {RS_INVAL_WAL_REMOVED, "wal_removed"},
     115             :     {RS_INVAL_HORIZON, "rows_removed"},
     116             :     {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
     117             :     {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
     118             : };
     119             : 
     120             : /*
     121             :  * Ensure that the lookup table is up-to-date with the enums defined in
     122             :  * ReplicationSlotInvalidationCause.
     123             :  */
     124             : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
     125             :                  "array length mismatch");
     126             : 
     127             : /* size of version independent data */
     128             : #define ReplicationSlotOnDiskConstantSize \
     129             :     offsetof(ReplicationSlotOnDisk, slotdata)
     130             : /* size of the part of the slot not covered by the checksum */
     131             : #define ReplicationSlotOnDiskNotChecksummedSize  \
     132             :     offsetof(ReplicationSlotOnDisk, version)
     133             : /* size of the part covered by the checksum */
     134             : #define ReplicationSlotOnDiskChecksummedSize \
     135             :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
     136             : /* size of the slot data that is version dependent */
     137             : #define ReplicationSlotOnDiskV2Size \
     138             :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
     139             : 
     140             : #define SLOT_MAGIC      0x1051CA1   /* format identifier */
     141             : #define SLOT_VERSION    5       /* version for new files */
     142             : 
     143             : /* Control array for replication slot management */
     144             : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
     145             : 
     146             : /* My backend's replication slot in the shared memory array */
     147             : ReplicationSlot *MyReplicationSlot = NULL;
     148             : 
     149             : /* GUC variables */
     150             : int         max_replication_slots = 10; /* the maximum number of replication
     151             :                                          * slots */
     152             : 
     153             : /*
     154             :  * Invalidate replication slots that have remained idle longer than this
     155             :  * duration; '0' disables it.
     156             :  */
     157             : int         idle_replication_slot_timeout_mins = 0;
     158             : 
     159             : /*
     160             :  * This GUC lists streaming replication standby server slot names that
     161             :  * logical WAL sender processes will wait for.
     162             :  */
     163             : char       *synchronized_standby_slots;
     164             : 
     165             : /* This is the parsed and cached configuration for synchronized_standby_slots */
     166             : static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
     167             : 
     168             : /*
     169             :  * Oldest LSN that has been confirmed to be flushed to the standbys
     170             :  * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
     171             :  */
     172             : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
     173             : 
     174             : static void ReplicationSlotShmemExit(int code, Datum arg);
     175             : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
     176             : 
     177             : /* internal persistency functions */
     178             : static void RestoreSlotFromDisk(const char *name);
     179             : static void CreateSlotOnDisk(ReplicationSlot *slot);
     180             : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
     181             : 
     182             : /*
     183             :  * Report shared-memory space needed by ReplicationSlotsShmemInit.
     184             :  */
     185             : Size
     186        8204 : ReplicationSlotsShmemSize(void)
     187             : {
     188        8204 :     Size        size = 0;
     189             : 
     190        8204 :     if (max_replication_slots == 0)
     191           0 :         return size;
     192             : 
     193        8204 :     size = offsetof(ReplicationSlotCtlData, replication_slots);
     194        8204 :     size = add_size(size,
     195             :                     mul_size(max_replication_slots, sizeof(ReplicationSlot)));
     196             : 
     197        8204 :     return size;
     198             : }
     199             : 
     200             : /*
     201             :  * Allocate and initialize shared memory for replication slots.
     202             :  */
     203             : void
     204        2126 : ReplicationSlotsShmemInit(void)
     205             : {
     206             :     bool        found;
     207             : 
     208        2126 :     if (max_replication_slots == 0)
     209           0 :         return;
     210             : 
     211        2126 :     ReplicationSlotCtl = (ReplicationSlotCtlData *)
     212        2126 :         ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
     213             :                         &found);
     214             : 
     215        2126 :     if (!found)
     216             :     {
     217             :         int         i;
     218             : 
     219             :         /* First time through, so initialize */
     220        3926 :         MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
     221             : 
     222       22992 :         for (i = 0; i < max_replication_slots; i++)
     223             :         {
     224       20866 :             ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
     225             : 
     226             :             /* everything else is zeroed by the memset above */
     227       20866 :             SpinLockInit(&slot->mutex);
     228       20866 :             LWLockInitialize(&slot->io_in_progress_lock,
     229             :                              LWTRANCHE_REPLICATION_SLOT_IO);
     230       20866 :             ConditionVariableInit(&slot->active_cv);
     231             :         }
     232             :     }
     233             : }
     234             : 
     235             : /*
     236             :  * Register the callback for replication slot cleanup and releasing.
     237             :  */
     238             : void
     239       41876 : ReplicationSlotInitialize(void)
     240             : {
     241       41876 :     before_shmem_exit(ReplicationSlotShmemExit, 0);
     242       41876 : }
     243             : 
     244             : /*
     245             :  * Release and cleanup replication slots.
     246             :  */
     247             : static void
     248       41876 : ReplicationSlotShmemExit(int code, Datum arg)
     249             : {
     250             :     /* Make sure active replication slots are released */
     251       41876 :     if (MyReplicationSlot != NULL)
     252         454 :         ReplicationSlotRelease();
     253             : 
     254             :     /* Also cleanup all the temporary slots. */
     255       41876 :     ReplicationSlotCleanup(false);
     256       41876 : }
     257             : 
     258             : /*
     259             :  * Check whether the passed slot name is valid and report errors at elevel.
     260             :  *
     261             :  * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
     262             :  * the name to be used as a directory name on every supported OS.
     263             :  *
     264             :  * Returns whether the directory name is valid or not if elevel < ERROR.
     265             :  */
     266             : bool
     267        1742 : ReplicationSlotValidateName(const char *name, int elevel)
     268             : {
     269             :     const char *cp;
     270             : 
     271        1742 :     if (strlen(name) == 0)
     272             :     {
     273           6 :         ereport(elevel,
     274             :                 (errcode(ERRCODE_INVALID_NAME),
     275             :                  errmsg("replication slot name \"%s\" is too short",
     276             :                         name)));
     277           0 :         return false;
     278             :     }
     279             : 
     280        1736 :     if (strlen(name) >= NAMEDATALEN)
     281             :     {
     282           0 :         ereport(elevel,
     283             :                 (errcode(ERRCODE_NAME_TOO_LONG),
     284             :                  errmsg("replication slot name \"%s\" is too long",
     285             :                         name)));
     286           0 :         return false;
     287             :     }
     288             : 
     289       35178 :     for (cp = name; *cp; cp++)
     290             :     {
     291       33444 :         if (!((*cp >= 'a' && *cp <= 'z')
     292       16764 :               || (*cp >= '0' && *cp <= '9')
     293        3240 :               || (*cp == '_')))
     294             :         {
     295           2 :             ereport(elevel,
     296             :                     (errcode(ERRCODE_INVALID_NAME),
     297             :                      errmsg("replication slot name \"%s\" contains invalid character",
     298             :                             name),
     299             :                      errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
     300           0 :             return false;
     301             :         }
     302             :     }
     303        1734 :     return true;
     304             : }
     305             : 
     306             : /*
     307             :  * Create a new replication slot and mark it as used by this backend.
     308             :  *
     309             :  * name: Name of the slot
     310             :  * db_specific: logical decoding is db specific; if the slot is going to
     311             :  *     be used for that pass true, otherwise false.
     312             :  * two_phase: Allows decoding of prepared transactions. We allow this option
     313             :  *     to be enabled only at the slot creation time. If we allow this option
     314             :  *     to be changed during decoding then it is quite possible that we skip
     315             :  *     prepare first time because this option was not enabled. Now next time
     316             :  *     during getting changes, if the two_phase option is enabled it can skip
     317             :  *     prepare because by that time start decoding point has been moved. So the
     318             :  *     user will only get commit prepared.
     319             :  * failover: If enabled, allows the slot to be synced to standbys so
     320             :  *     that logical replication can be resumed after failover.
     321             :  * synced: True if the slot is synchronized from the primary server.
     322             :  */
     323             : void
     324        1270 : ReplicationSlotCreate(const char *name, bool db_specific,
     325             :                       ReplicationSlotPersistency persistency,
     326             :                       bool two_phase, bool failover, bool synced)
     327             : {
     328        1270 :     ReplicationSlot *slot = NULL;
     329             :     int         i;
     330             : 
     331             :     Assert(MyReplicationSlot == NULL);
     332             : 
     333        1270 :     ReplicationSlotValidateName(name, ERROR);
     334             : 
     335        1268 :     if (failover)
     336             :     {
     337             :         /*
     338             :          * Do not allow users to create the failover enabled slots on the
     339             :          * standby as we do not support sync to the cascading standby.
     340             :          *
     341             :          * However, failover enabled slots can be created during slot
     342             :          * synchronization because we need to retain the same values as the
     343             :          * remote slot.
     344             :          */
     345          44 :         if (RecoveryInProgress() && !IsSyncingReplicationSlots())
     346           0 :             ereport(ERROR,
     347             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     348             :                     errmsg("cannot enable failover for a replication slot created on the standby"));
     349             : 
     350             :         /*
     351             :          * Do not allow users to create failover enabled temporary slots,
     352             :          * because temporary slots will not be synced to the standby.
     353             :          *
     354             :          * However, failover enabled temporary slots can be created during
     355             :          * slot synchronization. See the comments atop slotsync.c for details.
     356             :          */
     357          44 :         if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
     358           2 :             ereport(ERROR,
     359             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     360             :                     errmsg("cannot enable failover for a temporary replication slot"));
     361             :     }
     362             : 
     363             :     /*
     364             :      * If some other backend ran this code concurrently with us, we'd likely
     365             :      * both allocate the same slot, and that would be bad.  We'd also be at
     366             :      * risk of missing a name collision.  Also, we don't want to try to create
     367             :      * a new slot while somebody's busy cleaning up an old one, because we
     368             :      * might both be monkeying with the same directory.
     369             :      */
     370        1266 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     371             : 
     372             :     /*
     373             :      * Check for name collision, and identify an allocatable slot.  We need to
     374             :      * hold ReplicationSlotControlLock in shared mode for this, so that nobody
     375             :      * else can change the in_use flags while we're looking at them.
     376             :      */
     377        1266 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     378       12120 :     for (i = 0; i < max_replication_slots; i++)
     379             :     {
     380       10860 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     381             : 
     382       10860 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     383           6 :             ereport(ERROR,
     384             :                     (errcode(ERRCODE_DUPLICATE_OBJECT),
     385             :                      errmsg("replication slot \"%s\" already exists", name)));
     386       10854 :         if (!s->in_use && slot == NULL)
     387        1258 :             slot = s;
     388             :     }
     389        1260 :     LWLockRelease(ReplicationSlotControlLock);
     390             : 
     391             :     /* If all slots are in use, we're out of luck. */
     392        1260 :     if (slot == NULL)
     393           2 :         ereport(ERROR,
     394             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     395             :                  errmsg("all replication slots are in use"),
     396             :                  errhint("Free one or increase \"max_replication_slots\".")));
     397             : 
     398             :     /*
     399             :      * Since this slot is not in use, nobody should be looking at any part of
     400             :      * it other than the in_use field unless they're trying to allocate it.
     401             :      * And since we hold ReplicationSlotAllocationLock, nobody except us can
     402             :      * be doing that.  So it's safe to initialize the slot.
     403             :      */
     404             :     Assert(!slot->in_use);
     405             :     Assert(slot->active_pid == 0);
     406             : 
     407             :     /* first initialize persistent data */
     408        1258 :     memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
     409        1258 :     namestrcpy(&slot->data.name, name);
     410        1258 :     slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
     411        1258 :     slot->data.persistency = persistency;
     412        1258 :     slot->data.two_phase = two_phase;
     413        1258 :     slot->data.two_phase_at = InvalidXLogRecPtr;
     414        1258 :     slot->data.failover = failover;
     415        1258 :     slot->data.synced = synced;
     416             : 
     417             :     /* and then data only present in shared memory */
     418        1258 :     slot->just_dirtied = false;
     419        1258 :     slot->dirty = false;
     420        1258 :     slot->effective_xmin = InvalidTransactionId;
     421        1258 :     slot->effective_catalog_xmin = InvalidTransactionId;
     422        1258 :     slot->candidate_catalog_xmin = InvalidTransactionId;
     423        1258 :     slot->candidate_xmin_lsn = InvalidXLogRecPtr;
     424        1258 :     slot->candidate_restart_valid = InvalidXLogRecPtr;
     425        1258 :     slot->candidate_restart_lsn = InvalidXLogRecPtr;
     426        1258 :     slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
     427        1258 :     slot->last_saved_restart_lsn = InvalidXLogRecPtr;
     428        1258 :     slot->inactive_since = 0;
     429             : 
     430             :     /*
     431             :      * Create the slot on disk.  We haven't actually marked the slot allocated
     432             :      * yet, so no special cleanup is required if this errors out.
     433             :      */
     434        1258 :     CreateSlotOnDisk(slot);
     435             : 
     436             :     /*
     437             :      * We need to briefly prevent any other backend from iterating over the
     438             :      * slots while we flip the in_use flag. We also need to set the active
     439             :      * flag while holding the ControlLock as otherwise a concurrent
     440             :      * ReplicationSlotAcquire() could acquire the slot as well.
     441             :      */
     442        1258 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     443             : 
     444        1258 :     slot->in_use = true;
     445             : 
     446             :     /* We can now mark the slot active, and that makes it our slot. */
     447        1258 :     SpinLockAcquire(&slot->mutex);
     448             :     Assert(slot->active_pid == 0);
     449        1258 :     slot->active_pid = MyProcPid;
     450        1258 :     SpinLockRelease(&slot->mutex);
     451        1258 :     MyReplicationSlot = slot;
     452             : 
     453        1258 :     LWLockRelease(ReplicationSlotControlLock);
     454             : 
     455             :     /*
     456             :      * Create statistics entry for the new logical slot. We don't collect any
     457             :      * stats for physical slots, so no need to create an entry for the same.
     458             :      * See ReplicationSlotDropPtr for why we need to do this before releasing
     459             :      * ReplicationSlotAllocationLock.
     460             :      */
     461        1258 :     if (SlotIsLogical(slot))
     462         914 :         pgstat_create_replslot(slot);
     463             : 
     464             :     /*
     465             :      * Now that the slot has been marked as in_use and active, it's safe to
     466             :      * let somebody else try to allocate a slot.
     467             :      */
     468        1258 :     LWLockRelease(ReplicationSlotAllocationLock);
     469             : 
     470             :     /* Let everybody know we've modified this slot */
     471        1258 :     ConditionVariableBroadcast(&slot->active_cv);
     472        1258 : }
     473             : 
     474             : /*
     475             :  * Search for the named replication slot.
     476             :  *
     477             :  * Return the replication slot if found, otherwise NULL.
     478             :  */
     479             : ReplicationSlot *
     480        2794 : SearchNamedReplicationSlot(const char *name, bool need_lock)
     481             : {
     482             :     int         i;
     483        2794 :     ReplicationSlot *slot = NULL;
     484             : 
     485        2794 :     if (need_lock)
     486         184 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     487             : 
     488        4896 :     for (i = 0; i < max_replication_slots; i++)
     489             :     {
     490        4854 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     491             : 
     492        4854 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     493             :         {
     494        2752 :             slot = s;
     495        2752 :             break;
     496             :         }
     497             :     }
     498             : 
     499        2794 :     if (need_lock)
     500         184 :         LWLockRelease(ReplicationSlotControlLock);
     501             : 
     502        2794 :     return slot;
     503             : }
     504             : 
     505             : /*
     506             :  * Return the index of the replication slot in
     507             :  * ReplicationSlotCtl->replication_slots.
     508             :  *
     509             :  * This is mainly useful to have an efficient key for storing replication slot
     510             :  * stats.
     511             :  */
     512             : int
     513       16236 : ReplicationSlotIndex(ReplicationSlot *slot)
     514             : {
     515             :     Assert(slot >= ReplicationSlotCtl->replication_slots &&
     516             :            slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
     517             : 
     518       16236 :     return slot - ReplicationSlotCtl->replication_slots;
     519             : }
     520             : 
     521             : /*
     522             :  * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
     523             :  * the slot's name and true is returned.
     524             :  *
     525             :  * This likely is only useful for pgstat_replslot.c during shutdown, in other
     526             :  * cases there are obvious TOCTOU issues.
     527             :  */
     528             : bool
     529         156 : ReplicationSlotName(int index, Name name)
     530             : {
     531             :     ReplicationSlot *slot;
     532             :     bool        found;
     533             : 
     534         156 :     slot = &ReplicationSlotCtl->replication_slots[index];
     535             : 
     536             :     /*
     537             :      * Ensure that the slot cannot be dropped while we copy the name. Don't
     538             :      * need the spinlock as the name of an existing slot cannot change.
     539             :      */
     540         156 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     541         156 :     found = slot->in_use;
     542         156 :     if (slot->in_use)
     543         156 :         namestrcpy(name, NameStr(slot->data.name));
     544         156 :     LWLockRelease(ReplicationSlotControlLock);
     545             : 
     546         156 :     return found;
     547             : }
     548             : 
     549             : /*
     550             :  * Find a previously created slot and mark it as used by this process.
     551             :  *
     552             :  * An error is raised if nowait is true and the slot is currently in use. If
     553             :  * nowait is false, we sleep until the slot is released by the owning process.
     554             :  *
     555             :  * An error is raised if error_if_invalid is true and the slot is found to
     556             :  * be invalid. It should always be set to true, except when we are temporarily
     557             :  * acquiring the slot and don't intend to change it.
     558             :  */
     559             : void
     560        2462 : ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
     561             : {
     562             :     ReplicationSlot *s;
     563             :     int         active_pid;
     564             : 
     565             :     Assert(name != NULL);
     566             : 
     567        2462 : retry:
     568             :     Assert(MyReplicationSlot == NULL);
     569             : 
     570        2462 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     571             : 
     572             :     /* Check if the slot exits with the given name. */
     573        2462 :     s = SearchNamedReplicationSlot(name, false);
     574        2462 :     if (s == NULL || !s->in_use)
     575             :     {
     576          18 :         LWLockRelease(ReplicationSlotControlLock);
     577             : 
     578          18 :         ereport(ERROR,
     579             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     580             :                  errmsg("replication slot \"%s\" does not exist",
     581             :                         name)));
     582             :     }
     583             : 
     584             :     /*
     585             :      * This is the slot we want; check if it's active under some other
     586             :      * process.  In single user mode, we don't need this check.
     587             :      */
     588        2444 :     if (IsUnderPostmaster)
     589             :     {
     590             :         /*
     591             :          * Get ready to sleep on the slot in case it is active.  (We may end
     592             :          * up not sleeping, but we don't want to do this while holding the
     593             :          * spinlock.)
     594             :          */
     595        2444 :         if (!nowait)
     596         528 :             ConditionVariablePrepareToSleep(&s->active_cv);
     597             : 
     598             :         /*
     599             :          * It is important to reset the inactive_since under spinlock here to
     600             :          * avoid race conditions with slot invalidation. See comments related
     601             :          * to inactive_since in InvalidatePossiblyObsoleteSlot.
     602             :          */
     603        2444 :         SpinLockAcquire(&s->mutex);
     604        2444 :         if (s->active_pid == 0)
     605        2170 :             s->active_pid = MyProcPid;
     606        2444 :         active_pid = s->active_pid;
     607        2444 :         ReplicationSlotSetInactiveSince(s, 0, false);
     608        2444 :         SpinLockRelease(&s->mutex);
     609             :     }
     610             :     else
     611             :     {
     612           0 :         active_pid = MyProcPid;
     613           0 :         ReplicationSlotSetInactiveSince(s, 0, true);
     614             :     }
     615        2444 :     LWLockRelease(ReplicationSlotControlLock);
     616             : 
     617             :     /*
     618             :      * If we found the slot but it's already active in another process, we
     619             :      * wait until the owning process signals us that it's been released, or
     620             :      * error out.
     621             :      */
     622        2444 :     if (active_pid != MyProcPid)
     623             :     {
     624           0 :         if (!nowait)
     625             :         {
     626             :             /* Wait here until we get signaled, and then restart */
     627           0 :             ConditionVariableSleep(&s->active_cv,
     628             :                                    WAIT_EVENT_REPLICATION_SLOT_DROP);
     629           0 :             ConditionVariableCancelSleep();
     630           0 :             goto retry;
     631             :         }
     632             : 
     633           0 :         ereport(ERROR,
     634             :                 (errcode(ERRCODE_OBJECT_IN_USE),
     635             :                  errmsg("replication slot \"%s\" is active for PID %d",
     636             :                         NameStr(s->data.name), active_pid)));
     637             :     }
     638        2444 :     else if (!nowait)
     639         528 :         ConditionVariableCancelSleep(); /* no sleep needed after all */
     640             : 
     641             :     /* We made this slot active, so it's ours now. */
     642        2444 :     MyReplicationSlot = s;
     643             : 
     644             :     /*
     645             :      * We need to check for invalidation after making the slot ours to avoid
     646             :      * the possible race condition with the checkpointer that can otherwise
     647             :      * invalidate the slot immediately after the check.
     648             :      */
     649        2444 :     if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
     650          14 :         ereport(ERROR,
     651             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     652             :                 errmsg("can no longer access replication slot \"%s\"",
     653             :                        NameStr(s->data.name)),
     654             :                 errdetail("This replication slot has been invalidated due to \"%s\".",
     655             :                           GetSlotInvalidationCauseName(s->data.invalidated)));
     656             : 
     657             :     /* Let everybody know we've modified this slot */
     658        2430 :     ConditionVariableBroadcast(&s->active_cv);
     659             : 
     660             :     /*
     661             :      * The call to pgstat_acquire_replslot() protects against stats for a
     662             :      * different slot, from before a restart or such, being present during
     663             :      * pgstat_report_replslot().
     664             :      */
     665        2430 :     if (SlotIsLogical(s))
     666        2034 :         pgstat_acquire_replslot(s);
     667             : 
     668             : 
     669        2430 :     if (am_walsender)
     670             :     {
     671        1646 :         ereport(log_replication_commands ? LOG : DEBUG1,
     672             :                 SlotIsLogical(s)
     673             :                 ? errmsg("acquired logical replication slot \"%s\"",
     674             :                          NameStr(s->data.name))
     675             :                 : errmsg("acquired physical replication slot \"%s\"",
     676             :                          NameStr(s->data.name)));
     677             :     }
     678        2430 : }
     679             : 
     680             : /*
     681             :  * Release the replication slot that this backend considers to own.
     682             :  *
     683             :  * This or another backend can re-acquire the slot later.
     684             :  * Resources this slot requires will be preserved.
     685             :  */
     686             : void
     687        2948 : ReplicationSlotRelease(void)
     688             : {
     689        2948 :     ReplicationSlot *slot = MyReplicationSlot;
     690        2948 :     char       *slotname = NULL;    /* keep compiler quiet */
     691        2948 :     bool        is_logical = false; /* keep compiler quiet */
     692        2948 :     TimestampTz now = 0;
     693             : 
     694             :     Assert(slot != NULL && slot->active_pid != 0);
     695             : 
     696        2948 :     if (am_walsender)
     697             :     {
     698        2060 :         slotname = pstrdup(NameStr(slot->data.name));
     699        2060 :         is_logical = SlotIsLogical(slot);
     700             :     }
     701             : 
     702        2948 :     if (slot->data.persistency == RS_EPHEMERAL)
     703             :     {
     704             :         /*
     705             :          * Delete the slot. There is no !PANIC case where this is allowed to
     706             :          * fail, all that may happen is an incomplete cleanup of the on-disk
     707             :          * data.
     708             :          */
     709          10 :         ReplicationSlotDropAcquired();
     710             :     }
     711             : 
     712             :     /*
     713             :      * If slot needed to temporarily restrain both data and catalog xmin to
     714             :      * create the catalog snapshot, remove that temporary constraint.
     715             :      * Snapshots can only be exported while the initial snapshot is still
     716             :      * acquired.
     717             :      */
     718        2948 :     if (!TransactionIdIsValid(slot->data.xmin) &&
     719        2896 :         TransactionIdIsValid(slot->effective_xmin))
     720             :     {
     721         390 :         SpinLockAcquire(&slot->mutex);
     722         390 :         slot->effective_xmin = InvalidTransactionId;
     723         390 :         SpinLockRelease(&slot->mutex);
     724         390 :         ReplicationSlotsComputeRequiredXmin(false);
     725             :     }
     726             : 
     727             :     /*
     728             :      * Set the time since the slot has become inactive. We get the current
     729             :      * time beforehand to avoid system call while holding the spinlock.
     730             :      */
     731        2948 :     now = GetCurrentTimestamp();
     732             : 
     733        2948 :     if (slot->data.persistency == RS_PERSISTENT)
     734             :     {
     735             :         /*
     736             :          * Mark persistent slot inactive.  We're not freeing it, just
     737             :          * disconnecting, but wake up others that may be waiting for it.
     738             :          */
     739        2386 :         SpinLockAcquire(&slot->mutex);
     740        2386 :         slot->active_pid = 0;
     741        2386 :         ReplicationSlotSetInactiveSince(slot, now, false);
     742        2386 :         SpinLockRelease(&slot->mutex);
     743        2386 :         ConditionVariableBroadcast(&slot->active_cv);
     744             :     }
     745             :     else
     746         562 :         ReplicationSlotSetInactiveSince(slot, now, true);
     747             : 
     748        2948 :     MyReplicationSlot = NULL;
     749             : 
     750             :     /* might not have been set when we've been a plain slot */
     751        2948 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     752        2948 :     MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
     753        2948 :     ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
     754        2948 :     LWLockRelease(ProcArrayLock);
     755             : 
     756        2948 :     if (am_walsender)
     757             :     {
     758        2060 :         ereport(log_replication_commands ? LOG : DEBUG1,
     759             :                 is_logical
     760             :                 ? errmsg("released logical replication slot \"%s\"",
     761             :                          slotname)
     762             :                 : errmsg("released physical replication slot \"%s\"",
     763             :                          slotname));
     764             : 
     765        2060 :         pfree(slotname);
     766             :     }
     767        2948 : }
     768             : 
     769             : /*
     770             :  * Cleanup temporary slots created in current session.
     771             :  *
     772             :  * Cleanup only synced temporary slots if 'synced_only' is true, else
     773             :  * cleanup all temporary slots.
     774             :  */
     775             : void
     776       85200 : ReplicationSlotCleanup(bool synced_only)
     777             : {
     778             :     int         i;
     779             : 
     780             :     Assert(MyReplicationSlot == NULL);
     781             : 
     782       85478 : restart:
     783       85478 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     784      926206 :     for (i = 0; i < max_replication_slots; i++)
     785             :     {
     786      841006 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     787             : 
     788      841006 :         if (!s->in_use)
     789      814504 :             continue;
     790             : 
     791       26502 :         SpinLockAcquire(&s->mutex);
     792       26502 :         if ((s->active_pid == MyProcPid &&
     793         278 :              (!synced_only || s->data.synced)))
     794             :         {
     795             :             Assert(s->data.persistency == RS_TEMPORARY);
     796         278 :             SpinLockRelease(&s->mutex);
     797         278 :             LWLockRelease(ReplicationSlotControlLock);  /* avoid deadlock */
     798             : 
     799         278 :             ReplicationSlotDropPtr(s);
     800             : 
     801         278 :             ConditionVariableBroadcast(&s->active_cv);
     802         278 :             goto restart;
     803             :         }
     804             :         else
     805       26224 :             SpinLockRelease(&s->mutex);
     806             :     }
     807             : 
     808       85200 :     LWLockRelease(ReplicationSlotControlLock);
     809       85200 : }
     810             : 
     811             : /*
     812             :  * Permanently drop replication slot identified by the passed in name.
     813             :  */
     814             : void
     815         794 : ReplicationSlotDrop(const char *name, bool nowait)
     816             : {
     817             :     Assert(MyReplicationSlot == NULL);
     818             : 
     819         794 :     ReplicationSlotAcquire(name, nowait, false);
     820             : 
     821             :     /*
     822             :      * Do not allow users to drop the slots which are currently being synced
     823             :      * from the primary to the standby.
     824             :      */
     825         780 :     if (RecoveryInProgress() && MyReplicationSlot->data.synced)
     826           2 :         ereport(ERROR,
     827             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     828             :                 errmsg("cannot drop replication slot \"%s\"", name),
     829             :                 errdetail("This replication slot is being synchronized from the primary server."));
     830             : 
     831         778 :     ReplicationSlotDropAcquired();
     832         778 : }
     833             : 
     834             : /*
     835             :  * Change the definition of the slot identified by the specified name.
     836             :  */
     837             : void
     838          12 : ReplicationSlotAlter(const char *name, const bool *failover,
     839             :                      const bool *two_phase)
     840             : {
     841          12 :     bool        update_slot = false;
     842             : 
     843             :     Assert(MyReplicationSlot == NULL);
     844             :     Assert(failover || two_phase);
     845             : 
     846          12 :     ReplicationSlotAcquire(name, false, true);
     847             : 
     848          10 :     if (SlotIsPhysical(MyReplicationSlot))
     849           0 :         ereport(ERROR,
     850             :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     851             :                 errmsg("cannot use %s with a physical replication slot",
     852             :                        "ALTER_REPLICATION_SLOT"));
     853             : 
     854          10 :     if (RecoveryInProgress())
     855             :     {
     856             :         /*
     857             :          * Do not allow users to alter the slots which are currently being
     858             :          * synced from the primary to the standby.
     859             :          */
     860           2 :         if (MyReplicationSlot->data.synced)
     861           2 :             ereport(ERROR,
     862             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     863             :                     errmsg("cannot alter replication slot \"%s\"", name),
     864             :                     errdetail("This replication slot is being synchronized from the primary server."));
     865             : 
     866             :         /*
     867             :          * Do not allow users to enable failover on the standby as we do not
     868             :          * support sync to the cascading standby.
     869             :          */
     870           0 :         if (failover && *failover)
     871           0 :             ereport(ERROR,
     872             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     873             :                     errmsg("cannot enable failover for a replication slot"
     874             :                            " on the standby"));
     875             :     }
     876             : 
     877           8 :     if (failover)
     878             :     {
     879             :         /*
     880             :          * Do not allow users to enable failover for temporary slots as we do
     881             :          * not support syncing temporary slots to the standby.
     882             :          */
     883           6 :         if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
     884           0 :             ereport(ERROR,
     885             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     886             :                     errmsg("cannot enable failover for a temporary replication slot"));
     887             : 
     888           6 :         if (MyReplicationSlot->data.failover != *failover)
     889             :         {
     890           6 :             SpinLockAcquire(&MyReplicationSlot->mutex);
     891           6 :             MyReplicationSlot->data.failover = *failover;
     892           6 :             SpinLockRelease(&MyReplicationSlot->mutex);
     893             : 
     894           6 :             update_slot = true;
     895             :         }
     896             :     }
     897             : 
     898           8 :     if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
     899             :     {
     900           2 :         SpinLockAcquire(&MyReplicationSlot->mutex);
     901           2 :         MyReplicationSlot->data.two_phase = *two_phase;
     902           2 :         SpinLockRelease(&MyReplicationSlot->mutex);
     903             : 
     904           2 :         update_slot = true;
     905             :     }
     906             : 
     907           8 :     if (update_slot)
     908             :     {
     909           8 :         ReplicationSlotMarkDirty();
     910           8 :         ReplicationSlotSave();
     911             :     }
     912             : 
     913           8 :     ReplicationSlotRelease();
     914           8 : }
     915             : 
     916             : /*
     917             :  * Permanently drop the currently acquired replication slot.
     918             :  */
     919             : void
     920         802 : ReplicationSlotDropAcquired(void)
     921             : {
     922         802 :     ReplicationSlot *slot = MyReplicationSlot;
     923             : 
     924             :     Assert(MyReplicationSlot != NULL);
     925             : 
     926             :     /* slot isn't acquired anymore */
     927         802 :     MyReplicationSlot = NULL;
     928             : 
     929         802 :     ReplicationSlotDropPtr(slot);
     930         802 : }
     931             : 
/*
 * Permanently drop the replication slot which will be released by the point
 * this function returns.
 *
 * The whole sequence runs while holding ReplicationSlotAllocationLock in
 * exclusive mode:
 *   1. Rename the slot directory to "<name>.tmp", then fsync it and
 *      PG_REPLSLOT_DIR inside a critical section.
 *   2. Clear active_pid and in_use under ReplicationSlotControlLock, then
 *      wake up waiters.
 *   3. Recompute the required xmin and LSN.
 *   4. Remove the renamed directory, warning on failure.
 *   5. Drop the statistics entry for logical slots.
 *
 * If the rename fails, the slot is marked inactive and waiters are woken.
 * For non-persistent slots only a WARNING is reported and the drop continues.
 * For persistent slots an ERROR is raised.
 */
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
    char        path[MAXPGPATH];
    char        tmppath[MAXPGPATH];

    /*
     * If some other backend ran this code concurrently with us, we might try
     * to delete a slot with a certain name while someone else was trying to
     * create a slot with the same name.
     */
    LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

    /* Generate pathnames. */
    sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));

    /*
     * Rename the slot directory on disk, so that we'll no longer recognize
     * this as a valid slot.  Note that if this fails, we've got to mark the
     * slot inactive before bailing out.  If we're dropping an ephemeral or a
     * temporary slot, we better never fail hard as the caller won't expect
     * the slot to survive and this might get called during error handling.
     */
    if (rename(path, tmppath) == 0)
    {
        /*
         * We need to fsync() the directory we just renamed and its parent to
         * make sure that our changes are on disk in a crash-safe fashion.  If
         * fsync() fails, we can't be sure whether the changes are on disk or
         * not.  For now, we handle that by panicking;
         * StartupReplicationSlots() will try to straighten it out after
         * restart.
         */
        START_CRIT_SECTION();
        fsync_fname(tmppath, true);
        fsync_fname(PG_REPLSLOT_DIR, true);
        END_CRIT_SECTION();
    }
    else
    {
        /* Only RS_PERSISTENT slots turn a failed rename into an ERROR. */
        bool        fail_softly = slot->data.persistency != RS_PERSISTENT;

        SpinLockAcquire(&slot->mutex);
        slot->active_pid = 0;
        SpinLockRelease(&slot->mutex);

        /* wake up anyone waiting on this slot */
        ConditionVariableBroadcast(&slot->active_cv);

        /*
         * With WARNING, execution continues below and the slot is still
         * removed from the array.
         */
        ereport(fail_softly ? WARNING : ERROR,
                (errcode_for_file_access(),
                 errmsg("could not rename file \"%s\" to \"%s\": %m",
                        path, tmppath)));
    }

    /*
     * The slot is definitely gone.  Lock out concurrent scans of the array
     * long enough to kill it.  It's OK to clear the active PID here without
     * grabbing the mutex because nobody else can be scanning the array here,
     * and nobody can be attached to this slot and thus access it without
     * scanning the array.
     *
     * Also wake up processes waiting for it.
     */
    LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
    slot->active_pid = 0;
    slot->in_use = false;
    LWLockRelease(ReplicationSlotControlLock);
    ConditionVariableBroadcast(&slot->active_cv);

    /*
     * Slot is dead and doesn't prevent resource removal anymore, recompute
     * limits.
     */
    ReplicationSlotsComputeRequiredXmin(false);
    ReplicationSlotsComputeRequiredLSN();

    /*
     * If removing the directory fails, the worst thing that will happen is
     * that the user won't be able to create a new slot with the same name
     * until the next server restart.  We warn about it, but that's all.
     */
    if (!rmtree(tmppath, true))
        ereport(WARNING,
                (errmsg("could not remove directory \"%s\"", tmppath)));

    /*
     * Drop the statistics entry for the replication slot.  Do this while
     * holding ReplicationSlotAllocationLock so that we don't drop a
     * statistics entry for another slot with the same name just created in
     * another session.
     */
    if (SlotIsLogical(slot))
        pgstat_drop_replslot(slot);

    /*
     * We release this at the very end, so that nobody starts trying to create
     * a slot while we're still cleaning up the detritus of the old one.
     */
    LWLockRelease(ReplicationSlotAllocationLock);
}
    1038             : 
    1039             : /*
    1040             :  * Serialize the currently acquired slot's state from memory to disk, thereby
    1041             :  * guaranteeing the current state will survive a crash.
    1042             :  */
    1043             : void
    1044        2648 : ReplicationSlotSave(void)
    1045             : {
    1046             :     char        path[MAXPGPATH];
    1047             : 
    1048             :     Assert(MyReplicationSlot != NULL);
    1049             : 
    1050        2648 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
    1051        2648 :     SaveSlotToPath(MyReplicationSlot, path, ERROR);
    1052        2648 : }
    1053             : 
    1054             : /*
    1055             :  * Signal that it would be useful if the currently acquired slot would be
    1056             :  * flushed out to disk.
    1057             :  *
    1058             :  * Note that the actual flush to disk can be delayed for a long time, if
    1059             :  * required for correctness explicitly do a ReplicationSlotSave().
    1060             :  */
    1061             : void
    1062       61970 : ReplicationSlotMarkDirty(void)
    1063             : {
    1064       61970 :     ReplicationSlot *slot = MyReplicationSlot;
    1065             : 
    1066             :     Assert(MyReplicationSlot != NULL);
    1067             : 
    1068       61970 :     SpinLockAcquire(&slot->mutex);
    1069       61970 :     MyReplicationSlot->just_dirtied = true;
    1070       61970 :     MyReplicationSlot->dirty = true;
    1071       61970 :     SpinLockRelease(&slot->mutex);
    1072       61970 : }
    1073             : 
    1074             : /*
    1075             :  * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
    1076             :  * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
    1077             :  */
    1078             : void
    1079         888 : ReplicationSlotPersist(void)
    1080             : {
    1081         888 :     ReplicationSlot *slot = MyReplicationSlot;
    1082             : 
    1083             :     Assert(slot != NULL);
    1084             :     Assert(slot->data.persistency != RS_PERSISTENT);
    1085             : 
    1086         888 :     SpinLockAcquire(&slot->mutex);
    1087         888 :     slot->data.persistency = RS_PERSISTENT;
    1088         888 :     SpinLockRelease(&slot->mutex);
    1089             : 
    1090         888 :     ReplicationSlotMarkDirty();
    1091         888 :     ReplicationSlotSave();
    1092         888 : }
    1093             : 
    1094             : /*
    1095             :  * Compute the oldest xmin across all slots and store it in the ProcArray.
    1096             :  *
    1097             :  * If already_locked is true, ProcArrayLock has already been acquired
    1098             :  * exclusively.
    1099             :  */
    1100             : void
    1101        4538 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
    1102             : {
    1103             :     int         i;
    1104        4538 :     TransactionId agg_xmin = InvalidTransactionId;
    1105        4538 :     TransactionId agg_catalog_xmin = InvalidTransactionId;
    1106             : 
    1107             :     Assert(ReplicationSlotCtl != NULL);
    1108             : 
    1109        4538 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1110             : 
    1111       45776 :     for (i = 0; i < max_replication_slots; i++)
    1112             :     {
    1113       41238 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1114             :         TransactionId effective_xmin;
    1115             :         TransactionId effective_catalog_xmin;
    1116             :         bool        invalidated;
    1117             : 
    1118       41238 :         if (!s->in_use)
    1119       36970 :             continue;
    1120             : 
    1121        4268 :         SpinLockAcquire(&s->mutex);
    1122        4268 :         effective_xmin = s->effective_xmin;
    1123        4268 :         effective_catalog_xmin = s->effective_catalog_xmin;
    1124        4268 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1125        4268 :         SpinLockRelease(&s->mutex);
    1126             : 
    1127             :         /* invalidated slots need not apply */
    1128        4268 :         if (invalidated)
    1129          44 :             continue;
    1130             : 
    1131             :         /* check the data xmin */
    1132        4224 :         if (TransactionIdIsValid(effective_xmin) &&
    1133          34 :             (!TransactionIdIsValid(agg_xmin) ||
    1134          34 :              TransactionIdPrecedes(effective_xmin, agg_xmin)))
    1135         624 :             agg_xmin = effective_xmin;
    1136             : 
    1137             :         /* check the catalog xmin */
    1138        4224 :         if (TransactionIdIsValid(effective_catalog_xmin) &&
    1139        1846 :             (!TransactionIdIsValid(agg_catalog_xmin) ||
    1140        1846 :              TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
    1141        2238 :             agg_catalog_xmin = effective_catalog_xmin;
    1142             :     }
    1143             : 
    1144        4538 :     LWLockRelease(ReplicationSlotControlLock);
    1145             : 
    1146        4538 :     ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
    1147        4538 : }
    1148             : 
    1149             : /*
    1150             :  * Compute the oldest restart LSN across all slots and inform xlog module.
    1151             :  *
    1152             :  * Note: while max_slot_wal_keep_size is theoretically relevant for this
    1153             :  * purpose, we don't try to account for that, because this module doesn't
    1154             :  * know what to compare against.
    1155             :  */
    1156             : void
    1157       63264 : ReplicationSlotsComputeRequiredLSN(void)
    1158             : {
    1159             :     int         i;
    1160       63264 :     XLogRecPtr  min_required = InvalidXLogRecPtr;
    1161             : 
    1162             :     Assert(ReplicationSlotCtl != NULL);
    1163             : 
    1164       63264 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1165      687306 :     for (i = 0; i < max_replication_slots; i++)
    1166             :     {
    1167      624042 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1168             :         XLogRecPtr  restart_lsn;
    1169             :         XLogRecPtr  last_saved_restart_lsn;
    1170             :         bool        invalidated;
    1171             :         ReplicationSlotPersistency persistency;
    1172             : 
    1173      624042 :         if (!s->in_use)
    1174      561024 :             continue;
    1175             : 
    1176       63018 :         SpinLockAcquire(&s->mutex);
    1177       63018 :         persistency = s->data.persistency;
    1178       63018 :         restart_lsn = s->data.restart_lsn;
    1179       63018 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1180       63018 :         last_saved_restart_lsn = s->last_saved_restart_lsn;
    1181       63018 :         SpinLockRelease(&s->mutex);
    1182             : 
    1183             :         /* invalidated slots need not apply */
    1184       63018 :         if (invalidated)
    1185          46 :             continue;
    1186             : 
    1187             :         /*
    1188             :          * For persistent slot use last_saved_restart_lsn to compute the
    1189             :          * oldest LSN for removal of WAL segments.  The segments between
    1190             :          * last_saved_restart_lsn and restart_lsn might be needed by a
    1191             :          * persistent slot in the case of database crash.  Non-persistent
    1192             :          * slots can't survive the database crash, so we don't care about
    1193             :          * last_saved_restart_lsn for them.
    1194             :          */
    1195       62972 :         if (persistency == RS_PERSISTENT)
    1196             :         {
    1197       61384 :             if (last_saved_restart_lsn != InvalidXLogRecPtr &&
    1198             :                 restart_lsn > last_saved_restart_lsn)
    1199             :             {
    1200       57472 :                 restart_lsn = last_saved_restart_lsn;
    1201             :             }
    1202             :         }
    1203             : 
    1204       62972 :         if (restart_lsn != InvalidXLogRecPtr &&
    1205        1894 :             (min_required == InvalidXLogRecPtr ||
    1206             :              restart_lsn < min_required))
    1207       61208 :             min_required = restart_lsn;
    1208             :     }
    1209       63264 :     LWLockRelease(ReplicationSlotControlLock);
    1210             : 
    1211       63264 :     XLogSetReplicationSlotMinimumLSN(min_required);
    1212       63264 : }
    1213             : 
    1214             : /*
    1215             :  * Compute the oldest WAL LSN required by *logical* decoding slots..
    1216             :  *
    1217             :  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
    1218             :  * slots exist.
    1219             :  *
    1220             :  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
    1221             :  * ignores physical replication slots.
    1222             :  *
    1223             :  * The results aren't required frequently, so we don't maintain a precomputed
    1224             :  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
    1225             :  */
    1226             : XLogRecPtr
    1227        6700 : ReplicationSlotsComputeLogicalRestartLSN(void)
    1228             : {
    1229        6700 :     XLogRecPtr  result = InvalidXLogRecPtr;
    1230             :     int         i;
    1231             : 
    1232        6700 :     if (max_replication_slots <= 0)
    1233           0 :         return InvalidXLogRecPtr;
    1234             : 
    1235        6700 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1236             : 
    1237       72608 :     for (i = 0; i < max_replication_slots; i++)
    1238             :     {
    1239             :         ReplicationSlot *s;
    1240             :         XLogRecPtr  restart_lsn;
    1241             :         XLogRecPtr  last_saved_restart_lsn;
    1242             :         bool        invalidated;
    1243             :         ReplicationSlotPersistency persistency;
    1244             : 
    1245       65908 :         s = &ReplicationSlotCtl->replication_slots[i];
    1246             : 
    1247             :         /* cannot change while ReplicationSlotCtlLock is held */
    1248       65908 :         if (!s->in_use)
    1249       64596 :             continue;
    1250             : 
    1251             :         /* we're only interested in logical slots */
    1252        1312 :         if (!SlotIsLogical(s))
    1253         964 :             continue;
    1254             : 
    1255             :         /* read once, it's ok if it increases while we're checking */
    1256         348 :         SpinLockAcquire(&s->mutex);
    1257         348 :         persistency = s->data.persistency;
    1258         348 :         restart_lsn = s->data.restart_lsn;
    1259         348 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1260         348 :         last_saved_restart_lsn = s->last_saved_restart_lsn;
    1261         348 :         SpinLockRelease(&s->mutex);
    1262             : 
    1263             :         /* invalidated slots need not apply */
    1264         348 :         if (invalidated)
    1265           8 :             continue;
    1266             : 
    1267             :         /*
    1268             :          * For persistent slot use last_saved_restart_lsn to compute the
    1269             :          * oldest LSN for removal of WAL segments.  The segments between
    1270             :          * last_saved_restart_lsn and restart_lsn might be needed by a
    1271             :          * persistent slot in the case of database crash.  Non-persistent
    1272             :          * slots can't survive the database crash, so we don't care about
    1273             :          * last_saved_restart_lsn for them.
    1274             :          */
    1275         340 :         if (persistency == RS_PERSISTENT)
    1276             :         {
    1277         336 :             if (last_saved_restart_lsn != InvalidXLogRecPtr &&
    1278             :                 restart_lsn > last_saved_restart_lsn)
    1279             :             {
    1280           0 :                 restart_lsn = last_saved_restart_lsn;
    1281             :             }
    1282             :         }
    1283             : 
    1284         340 :         if (restart_lsn == InvalidXLogRecPtr)
    1285           0 :             continue;
    1286             : 
    1287         340 :         if (result == InvalidXLogRecPtr ||
    1288             :             restart_lsn < result)
    1289         272 :             result = restart_lsn;
    1290             :     }
    1291             : 
    1292        6700 :     LWLockRelease(ReplicationSlotControlLock);
    1293             : 
    1294        6700 :     return result;
    1295             : }
    1296             : 
    1297             : /*
    1298             :  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
    1299             :  * passed database oid.
    1300             :  *
    1301             :  * Returns true if there are any slots referencing the database. *nslots will
    1302             :  * be set to the absolute number of slots in the database, *nactive to ones
    1303             :  * currently active.
    1304             :  */
    1305             : bool
    1306          90 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
    1307             : {
    1308             :     int         i;
    1309             : 
    1310          90 :     *nslots = *nactive = 0;
    1311             : 
    1312          90 :     if (max_replication_slots <= 0)
    1313           0 :         return false;
    1314             : 
    1315          90 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1316         932 :     for (i = 0; i < max_replication_slots; i++)
    1317             :     {
    1318             :         ReplicationSlot *s;
    1319             : 
    1320         842 :         s = &ReplicationSlotCtl->replication_slots[i];
    1321             : 
    1322             :         /* cannot change while ReplicationSlotCtlLock is held */
    1323         842 :         if (!s->in_use)
    1324         806 :             continue;
    1325             : 
    1326             :         /* only logical slots are database specific, skip */
    1327          36 :         if (!SlotIsLogical(s))
    1328          20 :             continue;
    1329             : 
    1330             :         /* not our database, skip */
    1331          16 :         if (s->data.database != dboid)
    1332          10 :             continue;
    1333             : 
    1334             :         /* NB: intentionally counting invalidated slots */
    1335             : 
    1336             :         /* count slots with spinlock held */
    1337           6 :         SpinLockAcquire(&s->mutex);
    1338           6 :         (*nslots)++;
    1339           6 :         if (s->active_pid != 0)
    1340           2 :             (*nactive)++;
    1341           6 :         SpinLockRelease(&s->mutex);
    1342             :     }
    1343          90 :     LWLockRelease(ReplicationSlotControlLock);
    1344             : 
    1345          90 :     if (*nslots > 0)
    1346           6 :         return true;
    1347          84 :     return false;
    1348             : }
    1349             : 
    1350             : /*
    1351             :  * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
    1352             :  * passed database oid. The caller should hold an exclusive lock on the
    1353             :  * pg_database oid for the database to prevent creation of new slots on the db
    1354             :  * or replay from existing slots.
    1355             :  *
    1356             :  * Another session that concurrently acquires an existing slot on the target DB
    1357             :  * (most likely to drop it) may cause this function to ERROR. If that happens
    1358             :  * it may have dropped some but not all slots.
    1359             :  *
    1360             :  * This routine isn't as efficient as it could be - but we don't drop
    1361             :  * databases often, especially databases with lots of slots.
    1362             :  */
    1363             : void
    1364         114 : ReplicationSlotsDropDBSlots(Oid dboid)
    1365             : {
    1366             :     int         i;
    1367             : 
    1368         114 :     if (max_replication_slots <= 0)
    1369           0 :         return;
    1370             : 
    1371         114 : restart:
    1372         124 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1373        1190 :     for (i = 0; i < max_replication_slots; i++)
    1374             :     {
    1375             :         ReplicationSlot *s;
    1376             :         char       *slotname;
    1377             :         int         active_pid;
    1378             : 
    1379        1076 :         s = &ReplicationSlotCtl->replication_slots[i];
    1380             : 
    1381             :         /* cannot change while ReplicationSlotCtlLock is held */
    1382        1076 :         if (!s->in_use)
    1383        1022 :             continue;
    1384             : 
    1385             :         /* only logical slots are database specific, skip */
    1386          54 :         if (!SlotIsLogical(s))
    1387          22 :             continue;
    1388             : 
    1389             :         /* not our database, skip */
    1390          32 :         if (s->data.database != dboid)
    1391          22 :             continue;
    1392             : 
    1393             :         /* NB: intentionally including invalidated slots */
    1394             : 
    1395             :         /* acquire slot, so ReplicationSlotDropAcquired can be reused  */
    1396          10 :         SpinLockAcquire(&s->mutex);
    1397             :         /* can't change while ReplicationSlotControlLock is held */
    1398          10 :         slotname = NameStr(s->data.name);
    1399          10 :         active_pid = s->active_pid;
    1400          10 :         if (active_pid == 0)
    1401             :         {
    1402          10 :             MyReplicationSlot = s;
    1403          10 :             s->active_pid = MyProcPid;
    1404             :         }
    1405          10 :         SpinLockRelease(&s->mutex);
    1406             : 
    1407             :         /*
    1408             :          * Even though we hold an exclusive lock on the database object a
    1409             :          * logical slot for that DB can still be active, e.g. if it's
    1410             :          * concurrently being dropped by a backend connected to another DB.
    1411             :          *
    1412             :          * That's fairly unlikely in practice, so we'll just bail out.
    1413             :          *
    1414             :          * The slot sync worker holds a shared lock on the database before
    1415             :          * operating on synced logical slots to avoid conflict with the drop
    1416             :          * happening here. The persistent synced slots are thus safe but there
    1417             :          * is a possibility that the slot sync worker has created a temporary
    1418             :          * slot (which stays active even on release) and we are trying to drop
    1419             :          * that here. In practice, the chances of hitting this scenario are
    1420             :          * less as during slot synchronization, the temporary slot is
    1421             :          * immediately converted to persistent and thus is safe due to the
    1422             :          * shared lock taken on the database. So, we'll just bail out in such
    1423             :          * a case.
    1424             :          *
    1425             :          * XXX: We can consider shutting down the slot sync worker before
    1426             :          * trying to drop synced temporary slots here.
    1427             :          */
    1428          10 :         if (active_pid)
    1429           0 :             ereport(ERROR,
    1430             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1431             :                      errmsg("replication slot \"%s\" is active for PID %d",
    1432             :                             slotname, active_pid)));
    1433             : 
    1434             :         /*
    1435             :          * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
    1436             :          * holding ReplicationSlotControlLock over filesystem operations,
    1437             :          * release ReplicationSlotControlLock and use
    1438             :          * ReplicationSlotDropAcquired.
    1439             :          *
    1440             :          * As that means the set of slots could change, restart scan from the
    1441             :          * beginning each time we release the lock.
    1442             :          */
    1443          10 :         LWLockRelease(ReplicationSlotControlLock);
    1444          10 :         ReplicationSlotDropAcquired();
    1445          10 :         goto restart;
    1446             :     }
    1447         114 :     LWLockRelease(ReplicationSlotControlLock);
    1448             : }
    1449             : 
    1450             : 
    1451             : /*
    1452             :  * Check whether the server's configuration supports using replication
    1453             :  * slots.
    1454             :  */
    1455             : void
    1456        3364 : CheckSlotRequirements(void)
    1457             : {
    1458             :     /*
    1459             :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
    1460             :      * needs the same check.
    1461             :      */
    1462             : 
    1463        3364 :     if (max_replication_slots == 0)
    1464           0 :         ereport(ERROR,
    1465             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1466             :                  errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
    1467             : 
    1468        3364 :     if (wal_level < WAL_LEVEL_REPLICA)
    1469           0 :         ereport(ERROR,
    1470             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1471             :                  errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
    1472        3364 : }
    1473             : 
    1474             : /*
    1475             :  * Check whether the user has privilege to use replication slots.
    1476             :  */
    1477             : void
    1478        1086 : CheckSlotPermissions(void)
    1479             : {
    1480        1086 :     if (!has_rolreplication(GetUserId()))
    1481          10 :         ereport(ERROR,
    1482             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1483             :                  errmsg("permission denied to use replication slots"),
    1484             :                  errdetail("Only roles with the %s attribute may use replication slots.",
    1485             :                            "REPLICATION")));
    1486        1076 : }
    1487             : 
    1488             : /*
    1489             :  * Reserve WAL for the currently active slot.
    1490             :  *
    1491             :  * Compute and set restart_lsn in a manner that's appropriate for the type of
    1492             :  * the slot and concurrency safe.
    1493             :  */
    1494             : void
    1495        1182 : ReplicationSlotReserveWal(void)
    1496             : {
    1497        1182 :     ReplicationSlot *slot = MyReplicationSlot;
    1498             : 
    1499             :     Assert(slot != NULL);
    1500             :     Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
    1501             :     Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr);
    1502             : 
    1503             :     /*
    1504             :      * The replication slot mechanism is used to prevent removal of required
    1505             :      * WAL. As there is no interlock between this routine and checkpoints, WAL
    1506             :      * segments could concurrently be removed when a now stale return value of
    1507             :      * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
    1508             :      * this happens we'll just retry.
    1509             :      */
    1510             :     while (true)
    1511           0 :     {
    1512             :         XLogSegNo   segno;
    1513             :         XLogRecPtr  restart_lsn;
    1514             : 
    1515             :         /*
    1516             :          * For logical slots log a standby snapshot and start logical decoding
    1517             :          * at exactly that position. That allows the slot to start up more
    1518             :          * quickly. But on a standby we cannot do WAL writes, so just use the
    1519             :          * replay pointer; effectively, an attempt to create a logical slot on
    1520             :          * standby will cause it to wait for an xl_running_xact record to be
    1521             :          * logged independently on the primary, so that a snapshot can be
    1522             :          * built using the record.
    1523             :          *
    1524             :          * None of this is needed (or indeed helpful) for physical slots as
    1525             :          * they'll start replay at the last logged checkpoint anyway. Instead
    1526             :          * return the location of the last redo LSN. While that slightly
    1527             :          * increases the chance that we have to retry, it's where a base
    1528             :          * backup has to start replay at.
    1529             :          */
    1530        1182 :         if (SlotIsPhysical(slot))
    1531         294 :             restart_lsn = GetRedoRecPtr();
    1532         888 :         else if (RecoveryInProgress())
    1533          46 :             restart_lsn = GetXLogReplayRecPtr(NULL);
    1534             :         else
    1535         842 :             restart_lsn = GetXLogInsertRecPtr();
    1536             : 
    1537        1182 :         SpinLockAcquire(&slot->mutex);
    1538        1182 :         slot->data.restart_lsn = restart_lsn;
    1539        1182 :         SpinLockRelease(&slot->mutex);
    1540             : 
    1541             :         /* prevent WAL removal as fast as possible */
    1542        1182 :         ReplicationSlotsComputeRequiredLSN();
    1543             : 
    1544             :         /*
    1545             :          * If all required WAL is still there, great, otherwise retry. The
    1546             :          * slot should prevent further removal of WAL, unless there's a
    1547             :          * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
    1548             :          * the new restart_lsn above, so normally we should never need to loop
    1549             :          * more than twice.
    1550             :          */
    1551        1182 :         XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
    1552        1182 :         if (XLogGetLastRemovedSegno() < segno)
    1553        1182 :             break;
    1554             :     }
    1555             : 
    1556        1182 :     if (!RecoveryInProgress() && SlotIsLogical(slot))
    1557             :     {
    1558             :         XLogRecPtr  flushptr;
    1559             : 
    1560             :         /* make sure we have enough information to start */
    1561         842 :         flushptr = LogStandbySnapshot();
    1562             : 
    1563             :         /* and make sure it's fsynced to disk */
    1564         842 :         XLogFlush(flushptr);
    1565             :     }
    1566        1182 : }
    1567             : 
    1568             : /*
    1569             :  * Report that replication slot needs to be invalidated
    1570             :  */
    1571             : static void
    1572          42 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
    1573             :                        bool terminating,
    1574             :                        int pid,
    1575             :                        NameData slotname,
    1576             :                        XLogRecPtr restart_lsn,
    1577             :                        XLogRecPtr oldestLSN,
    1578             :                        TransactionId snapshotConflictHorizon,
    1579             :                        long slot_idle_seconds)
    1580             : {
    1581             :     StringInfoData err_detail;
    1582             :     StringInfoData err_hint;
    1583             : 
    1584          42 :     initStringInfo(&err_detail);
    1585          42 :     initStringInfo(&err_hint);
    1586             : 
    1587          42 :     switch (cause)
    1588             :     {
    1589          12 :         case RS_INVAL_WAL_REMOVED:
    1590             :             {
    1591          12 :                 uint64      ex = oldestLSN - restart_lsn;
    1592             : 
    1593          12 :                 appendStringInfo(&err_detail,
    1594          12 :                                  ngettext("The slot's restart_lsn %X/%X exceeds the limit by %" PRIu64 " byte.",
    1595             :                                           "The slot's restart_lsn %X/%X exceeds the limit by %" PRIu64 " bytes.",
    1596             :                                           ex),
    1597          12 :                                  LSN_FORMAT_ARGS(restart_lsn),
    1598             :                                  ex);
    1599             :                 /* translator: %s is a GUC variable name */
    1600          12 :                 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
    1601             :                                  "max_slot_wal_keep_size");
    1602          12 :                 break;
    1603             :             }
    1604          24 :         case RS_INVAL_HORIZON:
    1605          24 :             appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
    1606             :                              snapshotConflictHorizon);
    1607          24 :             break;
    1608             : 
    1609           6 :         case RS_INVAL_WAL_LEVEL:
    1610           6 :             appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
    1611           6 :             break;
    1612             : 
    1613           0 :         case RS_INVAL_IDLE_TIMEOUT:
    1614             :             {
    1615           0 :                 int         minutes = slot_idle_seconds / SECS_PER_MINUTE;
    1616           0 :                 int         secs = slot_idle_seconds % SECS_PER_MINUTE;
    1617             : 
    1618             :                 /* translator: %s is a GUC variable name */
    1619           0 :                 appendStringInfo(&err_detail, _("The slot's idle time of %dmin %02ds exceeds the configured \"%s\" duration of %dmin."),
    1620             :                                  minutes, secs, "idle_replication_slot_timeout",
    1621             :                                  idle_replication_slot_timeout_mins);
    1622             :                 /* translator: %s is a GUC variable name */
    1623           0 :                 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
    1624             :                                  "idle_replication_slot_timeout");
    1625           0 :                 break;
    1626             :             }
    1627             :         case RS_INVAL_NONE:
    1628             :             pg_unreachable();
    1629             :     }
    1630             : 
    1631          42 :     ereport(LOG,
    1632             :             terminating ?
    1633             :             errmsg("terminating process %d to release replication slot \"%s\"",
    1634             :                    pid, NameStr(slotname)) :
    1635             :             errmsg("invalidating obsolete replication slot \"%s\"",
    1636             :                    NameStr(slotname)),
    1637             :             errdetail_internal("%s", err_detail.data),
    1638             :             err_hint.len ? errhint("%s", err_hint.data) : 0);
    1639             : 
    1640          42 :     pfree(err_detail.data);
    1641          42 :     pfree(err_hint.data);
    1642          42 : }
    1643             : 
    1644             : /*
    1645             :  * Can we invalidate an idle replication slot?
    1646             :  *
    1647             :  * Idle timeout invalidation is allowed only when:
    1648             :  *
    1649             :  * 1. Idle timeout is set
    1650             :  * 2. Slot has reserved WAL
    1651             :  * 3. Slot is inactive
    1652             :  * 4. The slot is not being synced from the primary while the server is in
    1653             :  *    recovery. This is because synced slots are always considered to be
    1654             :  *    inactive because they don't perform logical decoding to produce changes.
    1655             :  */
    1656             : static inline bool
    1657         634 : CanInvalidateIdleSlot(ReplicationSlot *s)
    1658             : {
    1659         634 :     return (idle_replication_slot_timeout_mins != 0 &&
    1660           0 :             !XLogRecPtrIsInvalid(s->data.restart_lsn) &&
    1661         634 :             s->inactive_since > 0 &&
    1662           0 :             !(RecoveryInProgress() && s->data.synced));
    1663             : }
    1664             : 
    1665             : /*
    1666             :  * DetermineSlotInvalidationCause - Determine the cause for which a slot
    1667             :  * becomes invalid among the given possible causes.
    1668             :  *
    1669             :  * This function sequentially checks all possible invalidation causes and
    1670             :  * returns the first one for which the slot is eligible for invalidation.
    1671             :  */
    1672             : static ReplicationSlotInvalidationCause
DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
							   XLogRecPtr oldestLSN, Oid dboid,
							   TransactionId snapshotConflictHorizon,
							   TransactionId initial_effective_xmin,
							   TransactionId initial_catalog_effective_xmin,
							   XLogRecPtr initial_restart_lsn,
							   TimestampTz *inactive_since, TimestampTz now)
{
	/*
	 * Return the first cause in possible_causes for which slot s must be
	 * invalidated, checked in the order WAL_REMOVED, HORIZON, WAL_LEVEL,
	 * IDLE_TIMEOUT; return RS_INVAL_NONE if none applies.
	 *
	 * The xmin and restart_lsn values are passed in by the caller instead of
	 * being read from s, so that the caller can keep evaluating the values it
	 * recorded before terminating the slot's owning process.
	 *
	 * On RS_INVAL_IDLE_TIMEOUT, *inactive_since is set to the timestamp from
	 * which idleness is measured; otherwise it is left untouched.  "now" must
	 * be supplied when RS_INVAL_IDLE_TIMEOUT is among possible_causes.
	 *
	 * InvalidatePossiblyObsoleteSlot calls this with s->mutex held, so it
	 * avoids system calls (hence "now" is passed in).
	 */
	Assert(possible_causes != RS_INVAL_NONE);

	if (possible_causes & RS_INVAL_WAL_REMOVED)
	{
		/* the slot still needs WAL older than the oldest LSN being kept */
		if (initial_restart_lsn != InvalidXLogRecPtr &&
			initial_restart_lsn < oldestLSN)
			return RS_INVAL_WAL_REMOVED;
	}

	if (possible_causes & RS_INVAL_HORIZON)
	{
		/* invalid DB oid signals a shared relation */
		if (SlotIsLogical(s) &&
			(dboid == InvalidOid || dboid == s->data.database))
		{
			if (TransactionIdIsValid(initial_effective_xmin) &&
				TransactionIdPrecedesOrEquals(initial_effective_xmin,
											  snapshotConflictHorizon))
				return RS_INVAL_HORIZON;
			else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
					 TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
												   snapshotConflictHorizon))
				return RS_INVAL_HORIZON;
		}
	}

	if (possible_causes & RS_INVAL_WAL_LEVEL)
	{
		/* only logical slots are affected by an insufficient wal_level */
		if (SlotIsLogical(s))
			return RS_INVAL_WAL_LEVEL;
	}

	if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
	{
		Assert(now > 0);

		if (CanInvalidateIdleSlot(s))
		{
			/*
			 * We simulate the invalidation due to idle_timeout because the
			 * minimum idle_replication_slot_timeout is one minute, which
			 * would make tests take a long time.
			 */
#ifdef USE_INJECTION_POINTS
			if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
			{
				*inactive_since = 0;	/* since the beginning of time */
				return RS_INVAL_IDLE_TIMEOUT;
			}
#endif

			/*
			 * Check if the slot needs to be invalidated due to
			 * idle_replication_slot_timeout GUC.
			 */
			if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
												  idle_replication_slot_timeout_mins * SECS_PER_MINUTE))
			{
				*inactive_since = s->inactive_since;
				return RS_INVAL_IDLE_TIMEOUT;
			}
		}
	}

	return RS_INVAL_NONE;
}
    1747             : 
/*
 * Helper for InvalidateObsoleteReplicationSlots
 *
 * Acquires the given slot and marks it invalid, if necessary and possible.
 * If another process currently owns the slot, that process is signalled
 * (at most once per PID) and we wait on the slot's condition variable
 * before re-checking.
 *
 * Must be called with ReplicationSlotControlLock held in shared mode.
 *
 * Returns whether ReplicationSlotControlLock was released in the interim (and
 * in that case we're not holding the lock at return, otherwise we are).
 *
 * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
 *
 * This is inherently racy, because we release the LWLock
 * for syscalls, so caller must restart if we return true.
 */
static bool
InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
							   ReplicationSlot *s,
							   XLogRecPtr oldestLSN,
							   Oid dboid, TransactionId snapshotConflictHorizon,
							   bool *invalidated)
{
	/* PID we last signalled; used to avoid signalling the same owner twice */
	int			last_signaled_pid = 0;
	bool		released_lock = false;
	/* set once we have signalled an owning process to terminate */
	bool		terminated = false;
	/* slot values captured before any termination; see comment below */
	TransactionId initial_effective_xmin = InvalidTransactionId;
	TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
	ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
	TimestampTz inactive_since = 0;

	for (;;)
	{
		XLogRecPtr	restart_lsn;
		NameData	slotname;
		int			active_pid = 0;
		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
		TimestampTz now = 0;
		long		slot_idle_secs = 0;

		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));

		/*
		 * Presumably only reachable on a retry: the slot was dropped while we
		 * had released the lock.  Nothing left to do.
		 */
		if (!s->in_use)
		{
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
		{
			/*
			 * Assign the current time here to avoid system call overhead
			 * while holding the spinlock in subsequent code.
			 */
			now = GetCurrentTimestamp();
		}

		/*
		 * Check if the slot needs to be invalidated. If it needs to be
		 * invalidated, and is not currently acquired, acquire it and mark it
		 * as having been invalidated.  We do this with the spinlock held to
		 * avoid race conditions -- for example the restart_lsn could move
		 * forward, or the slot could be dropped.
		 */
		SpinLockAcquire(&s->mutex);

		restart_lsn = s->data.restart_lsn;

		/* we do nothing if the slot is already invalid */
		if (s->data.invalidated == RS_INVAL_NONE)
		{
			/*
			 * The slot's mutex will be released soon, and it is possible that
			 * those values change since the process holding the slot has been
			 * terminated (if any), so record them here to ensure that we
			 * would report the correct invalidation cause.
			 *
			 * Unlike other slot attributes, slot's inactive_since can't be
			 * changed until the acquired slot is released or the owning
			 * process is terminated. So, the inactive slot can only be
			 * invalidated immediately without being terminated.
			 */
			if (!terminated)
			{
				initial_restart_lsn = s->data.restart_lsn;
				initial_effective_xmin = s->effective_xmin;
				initial_catalog_effective_xmin = s->effective_catalog_xmin;
			}

			invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
																s, oldestLSN,
																dboid,
																snapshotConflictHorizon,
																initial_effective_xmin,
																initial_catalog_effective_xmin,
																initial_restart_lsn,
																&inactive_since,
																now);
		}

		/*
		 * The invalidation cause recorded previously should not change while
		 * the process owning the slot (if any) has been terminated.
		 */
		Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
				 invalidation_cause_prev != invalidation_cause));

		/* if there's no invalidation, we're done */
		if (invalidation_cause == RS_INVAL_NONE)
		{
			SpinLockRelease(&s->mutex);
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		/* copy what we need for reporting while still holding the spinlock */
		slotname = s->data.name;
		active_pid = s->active_pid;

		/*
		 * If the slot can be acquired, do so and mark it invalidated
		 * immediately.  Otherwise we'll signal the owning process, below, and
		 * retry.
		 */
		if (active_pid == 0)
		{
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
			s->data.invalidated = invalidation_cause;

			/*
			 * XXX: We should consider not overwriting restart_lsn and instead
			 * just rely on .invalidated.
			 */
			if (invalidation_cause == RS_INVAL_WAL_REMOVED)
			{
				s->data.restart_lsn = InvalidXLogRecPtr;
				s->last_saved_restart_lsn = InvalidXLogRecPtr;
			}

			/* Let caller know */
			*invalidated = true;
		}

		SpinLockRelease(&s->mutex);

		/*
		 * Logical replication slots shouldn't be invalidated during a binary
		 * upgrade, because the GUC max_slot_wal_keep_size is set to -1 and
		 * idle_replication_slot_timeout is set to 0 then.  See
		 * check_old_cluster_for_valid_slots(), where we ensure that no slot
		 * was invalidated before the upgrade.
		 */
		Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));

		/*
		 * Calculate the idle time duration of the slot if slot is marked
		 * invalidated with RS_INVAL_IDLE_TIMEOUT.
		 */
		if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
		{
			int			slot_idle_usecs;

			TimestampDifference(inactive_since, now, &slot_idle_secs,
								&slot_idle_usecs);
		}

		if (active_pid != 0)
		{
			/*
			 * Prepare the sleep on the slot's condition variable before
			 * releasing the lock, to close a possible race condition if the
			 * slot is released before the sleep below.
			 */
			ConditionVariablePrepareToSleep(&s->active_cv);

			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/*
			 * Signal to terminate the process that owns the slot, if we
			 * haven't already signalled it.  (Avoidance of repeated
			 * signalling is the only reason for there to be a loop in this
			 * routine; otherwise we could rely on caller's restart loop.)
			 *
			 * There is the race condition that other process may own the slot
			 * after its current owner process is terminated and before this
			 * process owns it. To handle that, we signal only if the PID of
			 * the owning process has changed from the previous time. (This
			 * logic assumes that the same PID is not reused very quickly.)
			 */
			if (last_signaled_pid != active_pid)
			{
				ReportSlotInvalidation(invalidation_cause, true, active_pid,
									   slotname, restart_lsn,
									   oldestLSN, snapshotConflictHorizon,
									   slot_idle_secs);

				/* the startup process raises a recovery conflict instead */
				if (MyBackendType == B_STARTUP)
					(void) SendProcSignal(active_pid,
										  PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
										  INVALID_PROC_NUMBER);
				else
					(void) kill(active_pid, SIGTERM);

				last_signaled_pid = active_pid;
				terminated = true;
				invalidation_cause_prev = invalidation_cause;
			}

			/* Wait until the slot is released. */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);

			/*
			 * Re-acquire lock and start over; we expect to invalidate the
			 * slot next time (unless another process acquires the slot in the
			 * meantime).
			 */
			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
			continue;
		}
		else
		{
			/*
			 * We hold the slot now and have already invalidated it; flush it
			 * to ensure that state persists.
			 *
			 * Don't want to hold ReplicationSlotControlLock across file
			 * system operations, so release it now but be sure to tell caller
			 * to restart from scratch.
			 */
			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/* Make sure the invalidated state persists across server restart */
			ReplicationSlotMarkDirty();
			ReplicationSlotSave();
			ReplicationSlotRelease();

			ReportSlotInvalidation(invalidation_cause, false, active_pid,
								   slotname, restart_lsn,
								   oldestLSN, snapshotConflictHorizon,
								   slot_idle_secs);

			/* done with this slot for now */
			break;
		}
	}

	Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));

	return released_lock;
}
    2001             : 
    2002             : /*
    2003             :  * Invalidate slots that require resources about to be removed.
    2004             :  *
    2005             :  * Returns true when any slot have got invalidated.
    2006             :  *
    2007             :  * Whether a slot needs to be invalidated depends on the invalidation cause.
    2008             :  * A slot is invalidated if it:
    2009             :  * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
    2010             :  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
    2011             :  *   db; dboid may be InvalidOid for shared relations
    2012             :  * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient
    2013             :  * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
    2014             :  *   "idle_replication_slot_timeout" duration.
    2015             :  *
    2016             :  * Note: This function attempts to invalidate the slot for multiple possible
    2017             :  * causes in a single pass, minimizing redundant iterations. The "cause"
    2018             :  * parameter can be a MASK representing one or more of the defined causes.
    2019             :  *
    2020             :  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
    2021             :  */
    2022             : bool
    2023        3394 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
    2024             :                                    XLogSegNo oldestSegno, Oid dboid,
    2025             :                                    TransactionId snapshotConflictHorizon)
    2026             : {
    2027             :     XLogRecPtr  oldestLSN;
    2028        3394 :     bool        invalidated = false;
    2029             : 
    2030             :     Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
    2031             :     Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
    2032             :     Assert(possible_causes != RS_INVAL_NONE);
    2033             : 
    2034        3394 :     if (max_replication_slots == 0)
    2035           0 :         return invalidated;
    2036             : 
    2037        3394 :     XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
    2038             : 
    2039        3422 : restart:
    2040        3422 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    2041       36636 :     for (int i = 0; i < max_replication_slots; i++)
    2042             :     {
    2043       33242 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    2044             : 
    2045       33242 :         if (!s->in_use)
    2046       32474 :             continue;
    2047             : 
    2048         768 :         if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid,
    2049             :                                            snapshotConflictHorizon,
    2050             :                                            &invalidated))
    2051             :         {
    2052             :             /* if the lock was released, start from scratch */
    2053          28 :             goto restart;
    2054             :         }
    2055             :     }
    2056        3394 :     LWLockRelease(ReplicationSlotControlLock);
    2057             : 
    2058             :     /*
    2059             :      * If any slots have been invalidated, recalculate the resource limits.
    2060             :      */
    2061        3394 :     if (invalidated)
    2062             :     {
    2063          18 :         ReplicationSlotsComputeRequiredXmin(false);
    2064          18 :         ReplicationSlotsComputeRequiredLSN();
    2065             :     }
    2066             : 
    2067        3394 :     return invalidated;
    2068             : }
    2069             : 
    2070             : /*
    2071             :  * Flush all replication slots to disk.
    2072             :  *
    2073             :  * It is convenient to flush dirty replication slots at the time of checkpoint.
    2074             :  * Additionally, in case of a shutdown checkpoint, we also identify the slots
    2075             :  * for which the confirmed_flush LSN has been updated since the last time it
    2076             :  * was saved and flush them.
    2077             :  */
    2078             : void
    2079        3350 : CheckPointReplicationSlots(bool is_shutdown)
    2080             : {
    2081             :     int         i;
    2082        3350 :     bool        last_saved_restart_lsn_updated = false;
    2083             : 
    2084        3350 :     elog(DEBUG1, "performing replication slot checkpoint");
    2085             : 
    2086             :     /*
    2087             :      * Prevent any slot from being created/dropped while we're active. As we
    2088             :      * explicitly do *not* want to block iterating over replication_slots or
    2089             :      * acquiring a slot we cannot take the control lock - but that's OK,
    2090             :      * because holding ReplicationSlotAllocationLock is strictly stronger, and
    2091             :      * enough to guarantee that nobody can change the in_use bits on us.
    2092             :      */
    2093        3350 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
    2094             : 
    2095       36304 :     for (i = 0; i < max_replication_slots; i++)
    2096             :     {
    2097       32954 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    2098             :         char        path[MAXPGPATH];
    2099             : 
    2100       32954 :         if (!s->in_use)
    2101       32298 :             continue;
    2102             : 
    2103             :         /* save the slot to disk, locking is handled in SaveSlotToPath() */
    2104         656 :         sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
    2105             : 
    2106             :         /*
    2107             :          * Slot's data is not flushed each time the confirmed_flush LSN is
    2108             :          * updated as that could lead to frequent writes.  However, we decide
    2109             :          * to force a flush of all logical slot's data at the time of shutdown
    2110             :          * if the confirmed_flush LSN is changed since we last flushed it to
    2111             :          * disk.  This helps in avoiding an unnecessary retreat of the
    2112             :          * confirmed_flush LSN after restart.
    2113             :          */
    2114         656 :         if (is_shutdown && SlotIsLogical(s))
    2115             :         {
    2116         122 :             SpinLockAcquire(&s->mutex);
    2117             : 
    2118         122 :             if (s->data.invalidated == RS_INVAL_NONE &&
    2119         122 :                 s->data.confirmed_flush > s->last_saved_confirmed_flush)
    2120             :             {
    2121          62 :                 s->just_dirtied = true;
    2122          62 :                 s->dirty = true;
    2123             :             }
    2124         122 :             SpinLockRelease(&s->mutex);
    2125             :         }
    2126             : 
    2127             :         /*
    2128             :          * Track if we're going to update slot's last_saved_restart_lsn. We
    2129             :          * need this to know if we need to recompute the required LSN.
    2130             :          */
    2131         656 :         if (s->last_saved_restart_lsn != s->data.restart_lsn)
    2132         378 :             last_saved_restart_lsn_updated = true;
    2133             : 
    2134         656 :         SaveSlotToPath(s, path, LOG);
    2135             :     }
    2136        3350 :     LWLockRelease(ReplicationSlotAllocationLock);
    2137             : 
    2138             :     /*
    2139             :      * Recompute the required LSN if SaveSlotToPath() updated
    2140             :      * last_saved_restart_lsn for any slot.
    2141             :      */
    2142        3350 :     if (last_saved_restart_lsn_updated)
    2143         378 :         ReplicationSlotsComputeRequiredLSN();
    2144        3350 : }
    2145             : 
    2146             : /*
    2147             :  * Load all replication slots from disk into memory at server startup. This
    2148             :  * needs to be run before we start crash recovery.
    2149             :  */
    2150             : void
    2151        1842 : StartupReplicationSlots(void)
    2152             : {
    2153             :     DIR        *replication_dir;
    2154             :     struct dirent *replication_de;
    2155             : 
    2156        1842 :     elog(DEBUG1, "starting up replication slots");
    2157             : 
    2158             :     /* restore all slots by iterating over all on-disk entries */
    2159        1842 :     replication_dir = AllocateDir(PG_REPLSLOT_DIR);
    2160        5670 :     while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
    2161             :     {
    2162             :         char        path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
    2163             :         PGFileType  de_type;
    2164             : 
    2165        3830 :         if (strcmp(replication_de->d_name, ".") == 0 ||
    2166        1990 :             strcmp(replication_de->d_name, "..") == 0)
    2167        3680 :             continue;
    2168             : 
    2169         150 :         snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
    2170         150 :         de_type = get_dirent_type(path, replication_de, false, DEBUG1);
    2171             : 
    2172             :         /* we're only creating directories here, skip if it's not our's */
    2173         150 :         if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
    2174           0 :             continue;
    2175             : 
    2176             :         /* we crashed while a slot was being setup or deleted, clean up */
    2177         150 :         if (pg_str_endswith(replication_de->d_name, ".tmp"))
    2178             :         {
    2179           0 :             if (!rmtree(path, true))
    2180             :             {
    2181           0 :                 ereport(WARNING,
    2182             :                         (errmsg("could not remove directory \"%s\"",
    2183             :                                 path)));
    2184           0 :                 continue;
    2185             :             }
    2186           0 :             fsync_fname(PG_REPLSLOT_DIR, true);
    2187           0 :             continue;
    2188             :         }
    2189             : 
    2190             :         /* looks like a slot in a normal state, restore */
    2191         150 :         RestoreSlotFromDisk(replication_de->d_name);
    2192             :     }
    2193        1840 :     FreeDir(replication_dir);
    2194             : 
    2195             :     /* currently no slots exist, we're done. */
    2196        1840 :     if (max_replication_slots <= 0)
    2197           0 :         return;
    2198             : 
    2199             :     /* Now that we have recovered all the data, compute replication xmin */
    2200        1840 :     ReplicationSlotsComputeRequiredXmin(false);
    2201        1840 :     ReplicationSlotsComputeRequiredLSN();
    2202             : }
    2203             : 
    2204             : /* ----
    2205             :  * Manipulation of on-disk state of replication slots
    2206             :  *
    2207             :  * NB: none of the routines below should take any notice whether a slot is the
    2208             :  * current one or not, that's all handled a layer above.
    2209             :  * ----
    2210             :  */
    2211             : static void
    2212        1258 : CreateSlotOnDisk(ReplicationSlot *slot)
    2213             : {
    2214             :     char        tmppath[MAXPGPATH];
    2215             :     char        path[MAXPGPATH];
    2216             :     struct stat st;
    2217             : 
    2218             :     /*
    2219             :      * No need to take out the io_in_progress_lock, nobody else can see this
    2220             :      * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
    2221             :      * takes out the lock, if we'd take the lock here, we'd deadlock.
    2222             :      */
    2223             : 
    2224        1258 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    2225        1258 :     sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    2226             : 
    2227             :     /*
    2228             :      * It's just barely possible that some previous effort to create or drop a
    2229             :      * slot with this name left a temp directory lying around. If that seems
    2230             :      * to be the case, try to remove it.  If the rmtree() fails, we'll error
    2231             :      * out at the MakePGDirectory() below, so we don't bother checking
    2232             :      * success.
    2233             :      */
    2234        1258 :     if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
    2235           0 :         rmtree(tmppath, true);
    2236             : 
    2237             :     /* Create and fsync the temporary slot directory. */
    2238        1258 :     if (MakePGDirectory(tmppath) < 0)
    2239           0 :         ereport(ERROR,
    2240             :                 (errcode_for_file_access(),
    2241             :                  errmsg("could not create directory \"%s\": %m",
    2242             :                         tmppath)));
    2243        1258 :     fsync_fname(tmppath, true);
    2244             : 
    2245             :     /* Write the actual state file. */
    2246        1258 :     slot->dirty = true;          /* signal that we really need to write */
    2247        1258 :     SaveSlotToPath(slot, tmppath, ERROR);
    2248             : 
    2249             :     /* Rename the directory into place. */
    2250        1258 :     if (rename(tmppath, path) != 0)
    2251           0 :         ereport(ERROR,
    2252             :                 (errcode_for_file_access(),
    2253             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    2254             :                         tmppath, path)));
    2255             : 
    2256             :     /*
    2257             :      * If we'd now fail - really unlikely - we wouldn't know whether this slot
    2258             :      * would persist after an OS crash or not - so, force a restart. The
    2259             :      * restart would try to fsync this again till it works.
    2260             :      */
    2261        1258 :     START_CRIT_SECTION();
    2262             : 
    2263        1258 :     fsync_fname(path, true);
    2264        1258 :     fsync_fname(PG_REPLSLOT_DIR, true);
    2265             : 
    2266        1258 :     END_CRIT_SECTION();
    2267        1258 : }
    2268             : 
    2269             : /*
    2270             :  * Shared functionality between saving and creating a replication slot.
    2271             :  */
    2272             : static void
    2273        4562 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
    2274             : {
    2275             :     char        tmppath[MAXPGPATH];
    2276             :     char        path[MAXPGPATH];
    2277             :     int         fd;
    2278             :     ReplicationSlotOnDisk cp;
    2279             :     bool        was_dirty;
    2280             : 
    2281             :     /* first check whether there's something to write out */
    2282        4562 :     SpinLockAcquire(&slot->mutex);
    2283        4562 :     was_dirty = slot->dirty;
    2284        4562 :     slot->just_dirtied = false;
    2285        4562 :     SpinLockRelease(&slot->mutex);
    2286             : 
    2287             :     /* and don't do anything if there's nothing to write */
    2288        4562 :     if (!was_dirty)
    2289         198 :         return;
    2290             : 
    2291        4364 :     LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
    2292             : 
    2293             :     /* silence valgrind :( */
    2294        4364 :     memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
    2295             : 
    2296        4364 :     sprintf(tmppath, "%s/state.tmp", dir);
    2297        4364 :     sprintf(path, "%s/state", dir);
    2298             : 
    2299        4364 :     fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    2300        4364 :     if (fd < 0)
    2301             :     {
    2302             :         /*
    2303             :          * If not an ERROR, then release the lock before returning.  In case
    2304             :          * of an ERROR, the error recovery path automatically releases the
    2305             :          * lock, but no harm in explicitly releasing even in that case.  Note
    2306             :          * that LWLockRelease() could affect errno.
    2307             :          */
    2308           0 :         int         save_errno = errno;
    2309             : 
    2310           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2311           0 :         errno = save_errno;
    2312           0 :         ereport(elevel,
    2313             :                 (errcode_for_file_access(),
    2314             :                  errmsg("could not create file \"%s\": %m",
    2315             :                         tmppath)));
    2316           0 :         return;
    2317             :     }
    2318             : 
    2319        4364 :     cp.magic = SLOT_MAGIC;
    2320        4364 :     INIT_CRC32C(cp.checksum);
    2321        4364 :     cp.version = SLOT_VERSION;
    2322        4364 :     cp.length = ReplicationSlotOnDiskV2Size;
    2323             : 
    2324        4364 :     SpinLockAcquire(&slot->mutex);
    2325             : 
    2326        4364 :     memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
    2327             : 
    2328        4364 :     SpinLockRelease(&slot->mutex);
    2329             : 
    2330        4364 :     COMP_CRC32C(cp.checksum,
    2331             :                 (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
    2332             :                 ReplicationSlotOnDiskChecksummedSize);
    2333        4364 :     FIN_CRC32C(cp.checksum);
    2334             : 
    2335        4364 :     errno = 0;
    2336        4364 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
    2337        4364 :     if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
    2338             :     {
    2339           0 :         int         save_errno = errno;
    2340             : 
    2341           0 :         pgstat_report_wait_end();
    2342           0 :         CloseTransientFile(fd);
    2343           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2344             : 
    2345             :         /* if write didn't set errno, assume problem is no disk space */
    2346           0 :         errno = save_errno ? save_errno : ENOSPC;
    2347           0 :         ereport(elevel,
    2348             :                 (errcode_for_file_access(),
    2349             :                  errmsg("could not write to file \"%s\": %m",
    2350             :                         tmppath)));
    2351           0 :         return;
    2352             :     }
    2353        4364 :     pgstat_report_wait_end();
    2354             : 
    2355             :     /* fsync the temporary file */
    2356        4364 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
    2357        4364 :     if (pg_fsync(fd) != 0)
    2358             :     {
    2359           0 :         int         save_errno = errno;
    2360             : 
    2361           0 :         pgstat_report_wait_end();
    2362           0 :         CloseTransientFile(fd);
    2363           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2364           0 :         errno = save_errno;
    2365           0 :         ereport(elevel,
    2366             :                 (errcode_for_file_access(),
    2367             :                  errmsg("could not fsync file \"%s\": %m",
    2368             :                         tmppath)));
    2369           0 :         return;
    2370             :     }
    2371        4364 :     pgstat_report_wait_end();
    2372             : 
    2373        4364 :     if (CloseTransientFile(fd) != 0)
    2374             :     {
    2375           0 :         int         save_errno = errno;
    2376             : 
    2377           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2378           0 :         errno = save_errno;
    2379           0 :         ereport(elevel,
    2380             :                 (errcode_for_file_access(),
    2381             :                  errmsg("could not close file \"%s\": %m",
    2382             :                         tmppath)));
    2383           0 :         return;
    2384             :     }
    2385             : 
    2386             :     /* rename to permanent file, fsync file and directory */
    2387        4364 :     if (rename(tmppath, path) != 0)
    2388             :     {
    2389           0 :         int         save_errno = errno;
    2390             : 
    2391           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2392           0 :         errno = save_errno;
    2393           0 :         ereport(elevel,
    2394             :                 (errcode_for_file_access(),
    2395             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    2396             :                         tmppath, path)));
    2397           0 :         return;
    2398             :     }
    2399             : 
    2400             :     /*
    2401             :      * Check CreateSlotOnDisk() for the reasoning of using a critical section.
    2402             :      */
    2403        4364 :     START_CRIT_SECTION();
    2404             : 
    2405        4364 :     fsync_fname(path, false);
    2406        4364 :     fsync_fname(dir, true);
    2407        4364 :     fsync_fname(PG_REPLSLOT_DIR, true);
    2408             : 
    2409        4364 :     END_CRIT_SECTION();
    2410             : 
    2411             :     /*
    2412             :      * Successfully wrote, unset dirty bit, unless somebody dirtied again
    2413             :      * already and remember the confirmed_flush LSN value.
    2414             :      */
    2415        4364 :     SpinLockAcquire(&slot->mutex);
    2416        4364 :     if (!slot->just_dirtied)
    2417        4322 :         slot->dirty = false;
    2418        4364 :     slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2419        4364 :     slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
    2420        4364 :     SpinLockRelease(&slot->mutex);
    2421             : 
    2422        4364 :     LWLockRelease(&slot->io_in_progress_lock);
    2423             : }
    2424             : 
    2425             : /*
    2426             :  * Load a single slot from disk into memory.
    2427             :  */
    2428             : static void
    2429         150 : RestoreSlotFromDisk(const char *name)
    2430             : {
    2431             :     ReplicationSlotOnDisk cp;
    2432             :     int         i;
    2433             :     char        slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
    2434             :     char        path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
    2435             :     int         fd;
    2436         150 :     bool        restored = false;
    2437             :     int         readBytes;
    2438             :     pg_crc32c   checksum;
    2439         150 :     TimestampTz now = 0;
    2440             : 
    2441             :     /* no need to lock here, no concurrent access allowed yet */
    2442             : 
    2443             :     /* delete temp file if it exists */
    2444         150 :     sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
    2445         150 :     sprintf(path, "%s/state.tmp", slotdir);
    2446         150 :     if (unlink(path) < 0 && errno != ENOENT)
    2447           0 :         ereport(PANIC,
    2448             :                 (errcode_for_file_access(),
    2449             :                  errmsg("could not remove file \"%s\": %m", path)));
    2450             : 
    2451         150 :     sprintf(path, "%s/state", slotdir);
    2452             : 
    2453         150 :     elog(DEBUG1, "restoring replication slot from \"%s\"", path);
    2454             : 
    2455             :     /* on some operating systems fsyncing a file requires O_RDWR */
    2456         150 :     fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
    2457             : 
    2458             :     /*
    2459             :      * We do not need to handle this as we are rename()ing the directory into
    2460             :      * place only after we fsync()ed the state file.
    2461             :      */
    2462         150 :     if (fd < 0)
    2463           0 :         ereport(PANIC,
    2464             :                 (errcode_for_file_access(),
    2465             :                  errmsg("could not open file \"%s\": %m", path)));
    2466             : 
    2467             :     /*
    2468             :      * Sync state file before we're reading from it. We might have crashed
    2469             :      * while it wasn't synced yet and we shouldn't continue on that basis.
    2470             :      */
    2471         150 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
    2472         150 :     if (pg_fsync(fd) != 0)
    2473           0 :         ereport(PANIC,
    2474             :                 (errcode_for_file_access(),
    2475             :                  errmsg("could not fsync file \"%s\": %m",
    2476             :                         path)));
    2477         150 :     pgstat_report_wait_end();
    2478             : 
    2479             :     /* Also sync the parent directory */
    2480         150 :     START_CRIT_SECTION();
    2481         150 :     fsync_fname(slotdir, true);
    2482         150 :     END_CRIT_SECTION();
    2483             : 
    2484             :     /* read part of statefile that's guaranteed to be version independent */
    2485         150 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2486         150 :     readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
    2487         150 :     pgstat_report_wait_end();
    2488         150 :     if (readBytes != ReplicationSlotOnDiskConstantSize)
    2489             :     {
    2490           0 :         if (readBytes < 0)
    2491           0 :             ereport(PANIC,
    2492             :                     (errcode_for_file_access(),
    2493             :                      errmsg("could not read file \"%s\": %m", path)));
    2494             :         else
    2495           0 :             ereport(PANIC,
    2496             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2497             :                      errmsg("could not read file \"%s\": read %d of %zu",
    2498             :                             path, readBytes,
    2499             :                             (Size) ReplicationSlotOnDiskConstantSize)));
    2500             :     }
    2501             : 
    2502             :     /* verify magic */
    2503         150 :     if (cp.magic != SLOT_MAGIC)
    2504           0 :         ereport(PANIC,
    2505             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2506             :                  errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
    2507             :                         path, cp.magic, SLOT_MAGIC)));
    2508             : 
    2509             :     /* verify version */
    2510         150 :     if (cp.version != SLOT_VERSION)
    2511           0 :         ereport(PANIC,
    2512             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2513             :                  errmsg("replication slot file \"%s\" has unsupported version %u",
    2514             :                         path, cp.version)));
    2515             : 
    2516             :     /* boundary check on length */
    2517         150 :     if (cp.length != ReplicationSlotOnDiskV2Size)
    2518           0 :         ereport(PANIC,
    2519             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2520             :                  errmsg("replication slot file \"%s\" has corrupted length %u",
    2521             :                         path, cp.length)));
    2522             : 
    2523             :     /* Now that we know the size, read the entire file */
    2524         150 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2525         300 :     readBytes = read(fd,
    2526             :                      (char *) &cp + ReplicationSlotOnDiskConstantSize,
    2527         150 :                      cp.length);
    2528         150 :     pgstat_report_wait_end();
    2529         150 :     if (readBytes != cp.length)
    2530             :     {
    2531           0 :         if (readBytes < 0)
    2532           0 :             ereport(PANIC,
    2533             :                     (errcode_for_file_access(),
    2534             :                      errmsg("could not read file \"%s\": %m", path)));
    2535             :         else
    2536           0 :             ereport(PANIC,
    2537             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2538             :                      errmsg("could not read file \"%s\": read %d of %zu",
    2539             :                             path, readBytes, (Size) cp.length)));
    2540             :     }
    2541             : 
    2542         150 :     if (CloseTransientFile(fd) != 0)
    2543           0 :         ereport(PANIC,
    2544             :                 (errcode_for_file_access(),
    2545             :                  errmsg("could not close file \"%s\": %m", path)));
    2546             : 
    2547             :     /* now verify the CRC */
    2548         150 :     INIT_CRC32C(checksum);
    2549         150 :     COMP_CRC32C(checksum,
    2550             :                 (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
    2551             :                 ReplicationSlotOnDiskChecksummedSize);
    2552         150 :     FIN_CRC32C(checksum);
    2553             : 
    2554         150 :     if (!EQ_CRC32C(checksum, cp.checksum))
    2555           0 :         ereport(PANIC,
    2556             :                 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
    2557             :                         path, checksum, cp.checksum)));
    2558             : 
    2559             :     /*
    2560             :      * If we crashed with an ephemeral slot active, don't restore but delete
    2561             :      * it.
    2562             :      */
    2563         150 :     if (cp.slotdata.persistency != RS_PERSISTENT)
    2564             :     {
    2565           0 :         if (!rmtree(slotdir, true))
    2566             :         {
    2567           0 :             ereport(WARNING,
    2568             :                     (errmsg("could not remove directory \"%s\"",
    2569             :                             slotdir)));
    2570             :         }
    2571           0 :         fsync_fname(PG_REPLSLOT_DIR, true);
    2572           0 :         return;
    2573             :     }
    2574             : 
    2575             :     /*
    2576             :      * Verify that requirements for the specific slot type are met. That's
    2577             :      * important because if these aren't met we're not guaranteed to retain
    2578             :      * all the necessary resources for the slot.
    2579             :      *
    2580             :      * NB: We have to do so *after* the above checks for ephemeral slots,
    2581             :      * because otherwise a slot that shouldn't exist anymore could prevent
    2582             :      * restarts.
    2583             :      *
    2584             :      * NB: Changing the requirements here also requires adapting
    2585             :      * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
    2586             :      */
    2587         150 :     if (cp.slotdata.database != InvalidOid)
    2588             :     {
    2589         112 :         if (wal_level < WAL_LEVEL_LOGICAL)
    2590           0 :             ereport(FATAL,
    2591             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2592             :                      errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
    2593             :                             NameStr(cp.slotdata.name)),
    2594             :                      errhint("Change \"wal_level\" to be \"logical\" or higher.")));
    2595             : 
    2596             :         /*
    2597             :          * In standby mode, the hot standby must be enabled. This check is
    2598             :          * necessary to ensure logical slots are invalidated when they become
    2599             :          * incompatible due to insufficient wal_level. Otherwise, if the
    2600             :          * primary reduces wal_level < logical while hot standby is disabled,
    2601             :          * logical slots would remain valid even after promotion.
    2602             :          */
    2603         112 :         if (StandbyMode && !EnableHotStandby)
    2604           2 :             ereport(FATAL,
    2605             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2606             :                      errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
    2607             :                             NameStr(cp.slotdata.name)),
    2608             :                      errhint("Change \"hot_standby\" to be \"on\".")));
    2609             :     }
    2610          38 :     else if (wal_level < WAL_LEVEL_REPLICA)
    2611           0 :         ereport(FATAL,
    2612             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2613             :                  errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
    2614             :                         NameStr(cp.slotdata.name)),
    2615             :                  errhint("Change \"wal_level\" to be \"replica\" or higher.")));
    2616             : 
    2617             :     /* nothing can be active yet, don't lock anything */
    2618         202 :     for (i = 0; i < max_replication_slots; i++)
    2619             :     {
    2620             :         ReplicationSlot *slot;
    2621             : 
    2622         202 :         slot = &ReplicationSlotCtl->replication_slots[i];
    2623             : 
    2624         202 :         if (slot->in_use)
    2625          54 :             continue;
    2626             : 
    2627             :         /* restore the entire set of persistent data */
    2628         148 :         memcpy(&slot->data, &cp.slotdata,
    2629             :                sizeof(ReplicationSlotPersistentData));
    2630             : 
    2631             :         /* initialize in memory state */
    2632         148 :         slot->effective_xmin = cp.slotdata.xmin;
    2633         148 :         slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
    2634         148 :         slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2635         148 :         slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
    2636             : 
    2637         148 :         slot->candidate_catalog_xmin = InvalidTransactionId;
    2638         148 :         slot->candidate_xmin_lsn = InvalidXLogRecPtr;
    2639         148 :         slot->candidate_restart_lsn = InvalidXLogRecPtr;
    2640         148 :         slot->candidate_restart_valid = InvalidXLogRecPtr;
    2641             : 
    2642         148 :         slot->in_use = true;
    2643         148 :         slot->active_pid = 0;
    2644             : 
    2645             :         /*
    2646             :          * Set the time since the slot has become inactive after loading the
    2647             :          * slot from the disk into memory. Whoever acquires the slot i.e.
    2648             :          * makes the slot active will reset it. Use the same inactive_since
    2649             :          * time for all the slots.
    2650             :          */
    2651         148 :         if (now == 0)
    2652         148 :             now = GetCurrentTimestamp();
    2653             : 
    2654         148 :         ReplicationSlotSetInactiveSince(slot, now, false);
    2655             : 
    2656         148 :         restored = true;
    2657         148 :         break;
    2658             :     }
    2659             : 
    2660         148 :     if (!restored)
    2661           0 :         ereport(FATAL,
    2662             :                 (errmsg("too many replication slots active before shutdown"),
    2663             :                  errhint("Increase \"max_replication_slots\" and try again.")));
    2664             : }
    2665             : 
    2666             : /*
    2667             :  * Maps an invalidation reason for a replication slot to
    2668             :  * ReplicationSlotInvalidationCause.
    2669             :  */
    2670             : ReplicationSlotInvalidationCause
    2671           0 : GetSlotInvalidationCause(const char *cause_name)
    2672             : {
    2673             :     Assert(cause_name);
    2674             : 
    2675             :     /* Search lookup table for the cause having this name */
    2676           0 :     for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
    2677             :     {
    2678           0 :         if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
    2679           0 :             return SlotInvalidationCauses[i].cause;
    2680             :     }
    2681             : 
    2682             :     Assert(false);
    2683           0 :     return RS_INVAL_NONE;       /* to keep compiler quiet */
    2684             : }
    2685             : 
    2686             : /*
    2687             :  * Maps an ReplicationSlotInvalidationCause to the invalidation
    2688             :  * reason for a replication slot.
    2689             :  */
    2690             : const char *
    2691          84 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
    2692             : {
    2693             :     /* Search lookup table for the name of this cause */
    2694         270 :     for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
    2695             :     {
    2696         270 :         if (SlotInvalidationCauses[i].cause == cause)
    2697          84 :             return SlotInvalidationCauses[i].cause_name;
    2698             :     }
    2699             : 
    2700             :     Assert(false);
    2701           0 :     return "none";                /* to keep compiler quiet */
    2702             : }
    2703             : 
    2704             : /*
    2705             :  * A helper function to validate slots specified in GUC synchronized_standby_slots.
    2706             :  *
    2707             :  * The rawname will be parsed, and the result will be saved into *elemlist.
    2708             :  */
    2709             : static bool
    2710          12 : validate_sync_standby_slots(char *rawname, List **elemlist)
    2711             : {
    2712             :     bool        ok;
    2713             : 
    2714             :     /* Verify syntax and parse string into a list of identifiers */
    2715          12 :     ok = SplitIdentifierString(rawname, ',', elemlist);
    2716             : 
    2717          12 :     if (!ok)
    2718             :     {
    2719           0 :         GUC_check_errdetail("List syntax is invalid.");
    2720             :     }
    2721          12 :     else if (MyProc)
    2722             :     {
    2723             :         /*
    2724             :          * Check that each specified slot exist and is physical.
    2725             :          *
    2726             :          * Because we need an LWLock, we cannot do this on processes without a
    2727             :          * PGPROC, so we skip it there; but see comments in
    2728             :          * StandbySlotsHaveCaughtup() as to why that's not a problem.
    2729             :          */
    2730           6 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    2731             : 
    2732          18 :         foreach_ptr(char, name, *elemlist)
    2733             :         {
    2734             :             ReplicationSlot *slot;
    2735             : 
    2736           6 :             slot = SearchNamedReplicationSlot(name, false);
    2737             : 
    2738           6 :             if (!slot)
    2739             :             {
    2740           0 :                 GUC_check_errdetail("Replication slot \"%s\" does not exist.",
    2741             :                                     name);
    2742           0 :                 ok = false;
    2743           0 :                 break;
    2744             :             }
    2745             : 
    2746           6 :             if (!SlotIsPhysical(slot))
    2747             :             {
    2748           0 :                 GUC_check_errdetail("\"%s\" is not a physical replication slot.",
    2749             :                                     name);
    2750           0 :                 ok = false;
    2751           0 :                 break;
    2752             :             }
    2753             :         }
    2754             : 
    2755           6 :         LWLockRelease(ReplicationSlotControlLock);
    2756             :     }
    2757             : 
    2758          12 :     return ok;
    2759             : }
    2760             : 
    2761             : /*
    2762             :  * GUC check_hook for synchronized_standby_slots
    2763             :  */
    2764             : bool
    2765        2216 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
    2766             : {
    2767             :     char       *rawname;
    2768             :     char       *ptr;
    2769             :     List       *elemlist;
    2770             :     int         size;
    2771             :     bool        ok;
    2772             :     SyncStandbySlotsConfigData *config;
    2773             : 
    2774        2216 :     if ((*newval)[0] == '\0')
    2775        2204 :         return true;
    2776             : 
    2777             :     /* Need a modifiable copy of the GUC string */
    2778          12 :     rawname = pstrdup(*newval);
    2779             : 
    2780             :     /* Now verify if the specified slots exist and have correct type */
    2781          12 :     ok = validate_sync_standby_slots(rawname, &elemlist);
    2782             : 
    2783          12 :     if (!ok || elemlist == NIL)
    2784             :     {
    2785           0 :         pfree(rawname);
    2786           0 :         list_free(elemlist);
    2787           0 :         return ok;
    2788             :     }
    2789             : 
    2790             :     /* Compute the size required for the SyncStandbySlotsConfigData struct */
    2791          12 :     size = offsetof(SyncStandbySlotsConfigData, slot_names);
    2792          36 :     foreach_ptr(char, slot_name, elemlist)
    2793          12 :         size += strlen(slot_name) + 1;
    2794             : 
    2795             :     /* GUC extra value must be guc_malloc'd, not palloc'd */
    2796          12 :     config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
    2797          12 :     if (!config)
    2798           0 :         return false;
    2799             : 
    2800             :     /* Transform the data into SyncStandbySlotsConfigData */
    2801          12 :     config->nslotnames = list_length(elemlist);
    2802             : 
    2803          12 :     ptr = config->slot_names;
    2804          36 :     foreach_ptr(char, slot_name, elemlist)
    2805             :     {
    2806          12 :         strcpy(ptr, slot_name);
    2807          12 :         ptr += strlen(slot_name) + 1;
    2808             :     }
    2809             : 
    2810          12 :     *extra = config;
    2811             : 
    2812          12 :     pfree(rawname);
    2813          12 :     list_free(elemlist);
    2814          12 :     return true;
    2815             : }
    2816             : 
    2817             : /*
    2818             :  * GUC assign_hook for synchronized_standby_slots
    2819             :  */
    2820             : void
    2821        2216 : assign_synchronized_standby_slots(const char *newval, void *extra)
    2822             : {
    2823             :     /*
    2824             :      * The standby slots may have changed, so we must recompute the oldest
    2825             :      * LSN.
    2826             :      */
    2827        2216 :     ss_oldest_flush_lsn = InvalidXLogRecPtr;
    2828             : 
    2829        2216 :     synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
    2830        2216 : }
    2831             : 
    2832             : /*
    2833             :  * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
    2834             :  */
    2835             : bool
    2836       58452 : SlotExistsInSyncStandbySlots(const char *slot_name)
    2837             : {
    2838             :     const char *standby_slot_name;
    2839             : 
    2840             :     /* Return false if there is no value in synchronized_standby_slots */
    2841       58452 :     if (synchronized_standby_slots_config == NULL)
    2842       58440 :         return false;
    2843             : 
    2844             :     /*
    2845             :      * XXX: We are not expecting this list to be long so a linear search
    2846             :      * shouldn't hurt but if that turns out not to be true then we can cache
    2847             :      * this information for each WalSender as well.
    2848             :      */
    2849          12 :     standby_slot_name = synchronized_standby_slots_config->slot_names;
    2850          12 :     for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
    2851             :     {
    2852          12 :         if (strcmp(standby_slot_name, slot_name) == 0)
    2853          12 :             return true;
    2854             : 
    2855           0 :         standby_slot_name += strlen(standby_slot_name) + 1;
    2856             :     }
    2857             : 
    2858           0 :     return false;
    2859             : }
    2860             : 
    2861             : /*
    2862             :  * Return true if the slots specified in synchronized_standby_slots have caught up to
    2863             :  * the given WAL location, false otherwise.
    2864             :  *
    2865             :  * The elevel parameter specifies the error level used for logging messages
    2866             :  * related to slots that do not exist, are invalidated, or are inactive.
    2867             :  */
    2868             : bool
    2869        1226 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
    2870             : {
    2871             :     const char *name;
    2872        1226 :     int         caught_up_slot_num = 0;
    2873        1226 :     XLogRecPtr  min_restart_lsn = InvalidXLogRecPtr;
    2874             : 
    2875             :     /*
    2876             :      * Don't need to wait for the standbys to catch up if there is no value in
    2877             :      * synchronized_standby_slots.
    2878             :      */
    2879        1226 :     if (synchronized_standby_slots_config == NULL)
    2880        1194 :         return true;
    2881             : 
    2882             :     /*
    2883             :      * Don't need to wait for the standbys to catch up if we are on a standby
    2884             :      * server, since we do not support syncing slots to cascading standbys.
    2885             :      */
    2886          32 :     if (RecoveryInProgress())
    2887           0 :         return true;
    2888             : 
    2889             :     /*
    2890             :      * Don't need to wait for the standbys to catch up if they are already
    2891             :      * beyond the specified WAL location.
    2892             :      */
    2893          32 :     if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
    2894          20 :         ss_oldest_flush_lsn >= wait_for_lsn)
    2895          12 :         return true;
    2896             : 
    2897             :     /*
    2898             :      * To prevent concurrent slot dropping and creation while filtering the
    2899             :      * slots, take the ReplicationSlotControlLock outside of the loop.
    2900             :      */
    2901          20 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    2902             : 
    2903          20 :     name = synchronized_standby_slots_config->slot_names;
    2904          30 :     for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
    2905             :     {
    2906             :         XLogRecPtr  restart_lsn;
    2907             :         bool        invalidated;
    2908             :         bool        inactive;
    2909             :         ReplicationSlot *slot;
    2910             : 
    2911          20 :         slot = SearchNamedReplicationSlot(name, false);
    2912             : 
    2913             :         /*
    2914             :          * If a slot name provided in synchronized_standby_slots does not
    2915             :          * exist, report a message and exit the loop.
    2916             :          *
    2917             :          * Though validate_sync_standby_slots (the GUC check_hook) tries to
    2918             :          * avoid this, it can nonetheless happen because the user can specify
    2919             :          * a nonexistent slot name before server startup. That function cannot
    2920             :          * validate such a slot during startup, as ReplicationSlotCtl is not
    2921             :          * initialized by then.  Also, the user might have dropped one slot.
    2922             :          */
    2923          20 :         if (!slot)
    2924             :         {
    2925           0 :             ereport(elevel,
    2926             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2927             :                     errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
    2928             :                            name, "synchronized_standby_slots"),
    2929             :                     errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    2930             :                               name),
    2931             :                     errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
    2932             :                             name, "synchronized_standby_slots"));
    2933           0 :             break;
    2934             :         }
    2935             : 
    2936             :         /* Same as above: if a slot is not physical, exit the loop. */
    2937          20 :         if (SlotIsLogical(slot))
    2938             :         {
    2939           0 :             ereport(elevel,
    2940             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2941             :                     errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
    2942             :                            name, "synchronized_standby_slots"),
    2943             :                     errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
    2944             :                               name),
    2945             :                     errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
    2946             :                             name, "synchronized_standby_slots"));
    2947           0 :             break;
    2948             :         }
    2949             : 
    2950          20 :         SpinLockAcquire(&slot->mutex);
    2951          20 :         restart_lsn = slot->data.restart_lsn;
    2952          20 :         invalidated = slot->data.invalidated != RS_INVAL_NONE;
    2953          20 :         inactive = slot->active_pid == 0;
    2954          20 :         SpinLockRelease(&slot->mutex);
    2955             : 
    2956          20 :         if (invalidated)
    2957             :         {
    2958             :             /* Specified physical slot has been invalidated */
    2959           0 :             ereport(elevel,
    2960             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2961             :                     errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
    2962             :                            name, "synchronized_standby_slots"),
    2963             :                     errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    2964             :                               name),
    2965             :                     errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
    2966             :                             name, "synchronized_standby_slots"));
    2967           0 :             break;
    2968             :         }
    2969             : 
    2970          20 :         if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
    2971             :         {
    2972             :             /* Log a message if no active_pid for this physical slot */
    2973          10 :             if (inactive)
    2974           6 :                 ereport(elevel,
    2975             :                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2976             :                         errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
    2977             :                                name, "synchronized_standby_slots"),
    2978             :                         errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    2979             :                                   name),
    2980             :                         errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
    2981             :                                 name, "synchronized_standby_slots"));
    2982             : 
    2983             :             /* Continue if the current slot hasn't caught up. */
    2984          10 :             break;
    2985             :         }
    2986             : 
    2987             :         Assert(restart_lsn >= wait_for_lsn);
    2988             : 
    2989          10 :         if (XLogRecPtrIsInvalid(min_restart_lsn) ||
    2990             :             min_restart_lsn > restart_lsn)
    2991          10 :             min_restart_lsn = restart_lsn;
    2992             : 
    2993          10 :         caught_up_slot_num++;
    2994             : 
    2995          10 :         name += strlen(name) + 1;
    2996             :     }
    2997             : 
    2998          20 :     LWLockRelease(ReplicationSlotControlLock);
    2999             : 
    3000             :     /*
    3001             :      * Return false if not all the standbys have caught up to the specified
    3002             :      * WAL location.
    3003             :      */
    3004          20 :     if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
    3005          10 :         return false;
    3006             : 
    3007             :     /* The ss_oldest_flush_lsn must not retreat. */
    3008             :     Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
    3009             :            min_restart_lsn >= ss_oldest_flush_lsn);
    3010             : 
    3011          10 :     ss_oldest_flush_lsn = min_restart_lsn;
    3012             : 
    3013          10 :     return true;
    3014             : }
    3015             : 
    3016             : /*
    3017             :  * Wait for physical standbys to confirm receiving the given lsn.
    3018             :  *
    3019             :  * Used by logical decoding SQL functions. It waits for physical standbys
    3020             :  * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
    3021             :  */
    3022             : void
    3023         440 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
    3024             : {
    3025             :     /*
    3026             :      * Don't need to wait for the standby to catch up if the current acquired
    3027             :      * slot is not a logical failover slot, or there is no value in
    3028             :      * synchronized_standby_slots.
    3029             :      */
    3030         440 :     if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
    3031         438 :         return;
    3032             : 
    3033           2 :     ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
    3034             : 
    3035             :     for (;;)
    3036             :     {
    3037           4 :         CHECK_FOR_INTERRUPTS();
    3038             : 
    3039           4 :         if (ConfigReloadPending)
    3040             :         {
    3041           2 :             ConfigReloadPending = false;
    3042           2 :             ProcessConfigFile(PGC_SIGHUP);
    3043             :         }
    3044             : 
    3045             :         /* Exit if done waiting for every slot. */
    3046           4 :         if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
    3047           2 :             break;
    3048             : 
    3049             :         /*
    3050             :          * Wait for the slots in the synchronized_standby_slots to catch up,
    3051             :          * but use a timeout (1s) so we can also check if the
    3052             :          * synchronized_standby_slots has been changed.
    3053             :          */
    3054           2 :         ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
    3055             :                                     WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
    3056             :     }
    3057             : 
    3058           2 :     ConditionVariableCancelSleep();
    3059             : }
    3060             : 
    3061             : /*
    3062             :  * GUC check_hook for idle_replication_slot_timeout
    3063             :  *
    3064             :  * The value of idle_replication_slot_timeout must be set to 0 during
    3065             :  * a binary upgrade. See start_postmaster() in pg_upgrade for more details.
    3066             :  */
    3067             : bool
    3068        2310 : check_idle_replication_slot_timeout(int *newval, void **extra, GucSource source)
    3069             : {
    3070        2310 :     if (IsBinaryUpgrade && *newval != 0)
    3071             :     {
    3072           0 :         GUC_check_errdetail("\"%s\" must be set to 0 during binary upgrade mode.",
    3073             :                             "idle_replication_slot_timeout");
    3074           0 :         return false;
    3075             :     }
    3076             : 
    3077        2310 :     return true;
    3078             : }

Generated by: LCOV version 1.16