LCOV - code coverage report
Current view: top level - src/backend/replication - slot.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 789 927 85.1 %
Date: 2025-07-30 12:18:05 Functions: 45 46 97.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * slot.c
       4             :  *     Replication slot management.
       5             :  *
       6             :  *
       7             :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/replication/slot.c
      12             :  *
      13             :  * NOTES
      14             :  *
      15             :  * Replication slots are used to keep state about replication streams
      16             :  * originating from this cluster.  Their primary purpose is to prevent the
      17             :  * premature removal of WAL or of old tuple versions in a manner that would
      18             :  * interfere with replication; they are also useful for monitoring purposes.
      19             :  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
      20             :  * on standbys (to support cascading setups).  The requirement that slots be
      21             :  * usable on standbys precludes storing them in the system catalogs.
      22             :  *
      23             :  * Each replication slot gets its own directory inside the directory
      24             :  * $PGDATA / PG_REPLSLOT_DIR.  Inside that directory the state file will
      25             :  * contain the slot's own data.  Additional data can be stored alongside that
      26             :  * file if required.  While the server is running, the state data is also
      27             :  * cached in memory for efficiency.
      28             :  *
      29             :  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
      30             :  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
      31             :  * to iterate over the slots, and in exclusive mode to change the in_use flag
      32             :  * of a slot.  The remaining data in each slot is protected by its mutex.
      33             :  *
      34             :  *-------------------------------------------------------------------------
      35             :  */
      36             : 
      37             : #include "postgres.h"
      38             : 
      39             : #include <unistd.h>
      40             : #include <sys/stat.h>
      41             : 
      42             : #include "access/transam.h"
      43             : #include "access/xlog_internal.h"
      44             : #include "access/xlogrecovery.h"
      45             : #include "common/file_utils.h"
      46             : #include "common/string.h"
      47             : #include "miscadmin.h"
      48             : #include "pgstat.h"
      49             : #include "postmaster/interrupt.h"
      50             : #include "replication/logicallauncher.h"
      51             : #include "replication/slotsync.h"
      52             : #include "replication/slot.h"
      53             : #include "replication/walsender_private.h"
      54             : #include "storage/fd.h"
      55             : #include "storage/ipc.h"
      56             : #include "storage/proc.h"
      57             : #include "storage/procarray.h"
      58             : #include "utils/builtins.h"
      59             : #include "utils/guc_hooks.h"
      60             : #include "utils/injection_point.h"
      61             : #include "utils/varlena.h"
      62             : 
      63             : /*
      64             :  * Replication slot on-disk data structure.
      65             :  */
      66             : typedef struct ReplicationSlotOnDisk
      67             : {
      68             :     /* first part of this struct needs to be version independent */
      69             : 
      70             :     /* data not covered by checksum */
      71             :     uint32      magic;
      72             :     pg_crc32c   checksum;
      73             : 
      74             :     /* data covered by checksum */
      75             :     uint32      version;
      76             :     uint32      length;
      77             : 
      78             :     /*
      79             :      * The actual data in the slot that follows can differ based on the above
      80             :      * 'version'.
      81             :      */
      82             : 
      83             :     ReplicationSlotPersistentData slotdata;
      84             : } ReplicationSlotOnDisk;
      85             : 
      86             : /*
      87             :  * Struct for the configuration of synchronized_standby_slots.
      88             :  *
      89             :  * Note: this must be a flat representation that can be held in a single chunk
      90             :  * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
      91             :  * synchronized_standby_slots GUC.
      92             :  */
      93             : typedef struct
      94             : {
      95             :     /* Number of slot names in slot_names[] */
      96             :     int         nslotnames;
      97             : 
      98             :     /*
      99             :      * slot_names contains 'nslotnames' consecutive null-terminated C strings.
     100             :      */
     101             :     char        slot_names[FLEXIBLE_ARRAY_MEMBER];
     102             : } SyncStandbySlotsConfigData;
     103             : 
     104             : /*
     105             :  * Lookup table for slot invalidation causes.
     106             :  */
     107             : typedef struct SlotInvalidationCauseMap
     108             : {
     109             :     ReplicationSlotInvalidationCause cause;
     110             :     const char *cause_name;
     111             : } SlotInvalidationCauseMap;
     112             : 
     113             : static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
     114             :     {RS_INVAL_NONE, "none"},
     115             :     {RS_INVAL_WAL_REMOVED, "wal_removed"},
     116             :     {RS_INVAL_HORIZON, "rows_removed"},
     117             :     {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
     118             :     {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
     119             : };
     120             : 
     121             : /*
     122             :  * Ensure that the lookup table is up-to-date with the enums defined in
     123             :  * ReplicationSlotInvalidationCause.
     124             :  */
     125             : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
     126             :                  "array length mismatch");
     127             : 
     128             : /* size of version independent data */
     129             : #define ReplicationSlotOnDiskConstantSize \
     130             :     offsetof(ReplicationSlotOnDisk, slotdata)
     131             : /* size of the part of the slot not covered by the checksum */
     132             : #define ReplicationSlotOnDiskNotChecksummedSize  \
     133             :     offsetof(ReplicationSlotOnDisk, version)
     134             : /* size of the part covered by the checksum */
     135             : #define ReplicationSlotOnDiskChecksummedSize \
     136             :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
     137             : /* size of the slot data that is version dependent */
     138             : #define ReplicationSlotOnDiskV2Size \
     139             :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
     140             : 
     141             : #define SLOT_MAGIC      0x1051CA1   /* format identifier */
     142             : #define SLOT_VERSION    5       /* version for new files */
     143             : 
     144             : /* Control array for replication slot management */
     145             : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
     146             : 
     147             : /* My backend's replication slot in the shared memory array */
     148             : ReplicationSlot *MyReplicationSlot = NULL;
     149             : 
     150             : /* GUC variables */
     151             : int         max_replication_slots = 10; /* the maximum number of replication
     152             :                                          * slots */
     153             : 
     154             : /*
     155             :  * Invalidate replication slots that have remained idle longer than this
     156             :  * duration; '0' disables it.
     157             :  */
     158             : int         idle_replication_slot_timeout_secs = 0;
     159             : 
     160             : /*
     161             :  * This GUC lists streaming replication standby server slot names that
     162             :  * logical WAL sender processes will wait for.
     163             :  */
     164             : char       *synchronized_standby_slots;
     165             : 
     166             : /* This is the parsed and cached configuration for synchronized_standby_slots */
     167             : static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
     168             : 
     169             : /*
     170             :  * Oldest LSN that has been confirmed to be flushed to the standbys
     171             :  * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
     172             :  */
     173             : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
     174             : 
     175             : static void ReplicationSlotShmemExit(int code, Datum arg);
     176             : static bool IsSlotForConflictCheck(const char *name);
     177             : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
     178             : 
     179             : /* internal persistency functions */
     180             : static void RestoreSlotFromDisk(const char *name);
     181             : static void CreateSlotOnDisk(ReplicationSlot *slot);
     182             : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
     183             : 
     184             : /*
     185             :  * Report shared-memory space needed by ReplicationSlotsShmemInit.
     186             :  */
     187             : Size
     188        8298 : ReplicationSlotsShmemSize(void)
     189             : {
     190        8298 :     Size        size = 0;
     191             : 
     192        8298 :     if (max_replication_slots == 0)
     193           4 :         return size;
     194             : 
     195        8294 :     size = offsetof(ReplicationSlotCtlData, replication_slots);
     196        8294 :     size = add_size(size,
     197             :                     mul_size(max_replication_slots, sizeof(ReplicationSlot)));
     198             : 
     199        8294 :     return size;
     200             : }
     201             : 
     202             : /*
     203             :  * Allocate and initialize shared memory for replication slots.
     204             :  */
     205             : void
     206        2152 : ReplicationSlotsShmemInit(void)
     207             : {
     208             :     bool        found;
     209             : 
     210        2152 :     if (max_replication_slots == 0)
     211           2 :         return;
     212             : 
     213        2150 :     ReplicationSlotCtl = (ReplicationSlotCtlData *)
     214        2150 :         ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
     215             :                         &found);
     216             : 
     217        2150 :     if (!found)
     218             :     {
     219             :         int         i;
     220             : 
     221             :         /* First time through, so initialize */
     222        3950 :         MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
     223             : 
     224       23256 :         for (i = 0; i < max_replication_slots; i++)
     225             :         {
     226       21106 :             ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
     227             : 
     228             :             /* everything else is zeroed by the memset above */
     229       21106 :             SpinLockInit(&slot->mutex);
     230       21106 :             LWLockInitialize(&slot->io_in_progress_lock,
     231             :                              LWTRANCHE_REPLICATION_SLOT_IO);
     232       21106 :             ConditionVariableInit(&slot->active_cv);
     233             :         }
     234             :     }
     235             : }
     236             : 
     237             : /*
     238             :  * Register the callback for replication slot cleanup and releasing.
     239             :  */
     240             : void
     241       42654 : ReplicationSlotInitialize(void)
     242             : {
     243       42654 :     before_shmem_exit(ReplicationSlotShmemExit, 0);
     244       42654 : }
     245             : 
     246             : /*
     247             :  * Release and cleanup replication slots.
     248             :  */
     249             : static void
     250       42654 : ReplicationSlotShmemExit(int code, Datum arg)
     251             : {
     252             :     /* Make sure active replication slots are released */
     253       42654 :     if (MyReplicationSlot != NULL)
     254         498 :         ReplicationSlotRelease();
     255             : 
     256             :     /* Also cleanup all the temporary slots. */
     257       42654 :     ReplicationSlotCleanup(false);
     258       42654 : }
     259             : 
     260             : /*
     261             :  * Check whether the passed slot name is valid and report errors at elevel.
     262             :  *
     263             :  * An error will be reported for a reserved replication slot name if
     264             :  * allow_reserved_name is set to false.
     265             :  *
     266             :  * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
     267             :  * the name to be used as a directory name on every supported OS.
     268             :  *
     269             :  * Returns whether the slot name is valid if elevel < ERROR.
     270             :  */
     271             : bool
     272        1884 : ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
     273             :                             int elevel)
     274             : {
     275             :     const char *cp;
     276             : 
     277        1884 :     if (strlen(name) == 0)
     278             :     {
     279           6 :         ereport(elevel,
     280             :                 (errcode(ERRCODE_INVALID_NAME),
     281             :                  errmsg("replication slot name \"%s\" is too short",
     282             :                         name)));
     283           0 :         return false;
     284             :     }
     285             : 
     286        1878 :     if (strlen(name) >= NAMEDATALEN)
     287             :     {
     288           0 :         ereport(elevel,
     289             :                 (errcode(ERRCODE_NAME_TOO_LONG),
     290             :                  errmsg("replication slot name \"%s\" is too long",
     291             :                         name)));
     292           0 :         return false;
     293             :     }
     294             : 
     295       37076 :     for (cp = name; *cp; cp++)
     296             :     {
     297       35200 :         if (!((*cp >= 'a' && *cp <= 'z')
     298       17016 :               || (*cp >= '0' && *cp <= '9')
     299        3408 :               || (*cp == '_')))
     300             :         {
     301           2 :             ereport(elevel,
     302             :                     (errcode(ERRCODE_INVALID_NAME),
     303             :                      errmsg("replication slot name \"%s\" contains invalid character",
     304             :                             name),
     305             :                      errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
     306           0 :             return false;
     307             :         }
     308             :     }
     309             : 
     310        1876 :     if (!allow_reserved_name && IsSlotForConflictCheck(name))
     311             :     {
     312           0 :         ereport(elevel,
     313             :                 errcode(ERRCODE_RESERVED_NAME),
     314             :                 errmsg("replication slot name \"%s\" is reserved",
     315             :                        name),
     316             :                 errdetail("The name \"%s\" is reserved for the conflict detection slot.",
     317             :                           CONFLICT_DETECTION_SLOT));
     318             : 
     319           0 :         return false;
     320             :     }
     321             : 
     322        1876 :     return true;
     323             : }
     324             : 
     325             : /*
     326             :  * Return true if the replication slot name is "pg_conflict_detection".
     327             :  */
     328             : static bool
     329        4228 : IsSlotForConflictCheck(const char *name)
     330             : {
     331        4228 :     return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
     332             : }
     333             : 
     334             : /*
     335             :  * Create a new replication slot and mark it as used by this backend.
     336             :  *
     337             :  * name: Name of the slot
     338             :  * db_specific: logical decoding is db specific; if the slot is going to
     339             :  *     be used for that pass true, otherwise false.
     340             :  * two_phase: Allows decoding of prepared transactions. We allow this option
     341             :  *     to be enabled only at slot creation time. If it could be changed
     342             :  *     during decoding, we might skip a PREPARE the first time because the
     343             :  *     option was not yet enabled, and then skip it again on the next call
     344             :  *     to get changes, even with two_phase now enabled, because the start
     345             :  *     decoding point has already moved past it. The user would then
     346             :  *     receive only the COMMIT PREPARED.
     347             :  * failover: If enabled, allows the slot to be synced to standbys so
     348             :  *     that logical replication can be resumed after failover.
     349             :  * synced: True if the slot is synchronized from the primary server.
     350             :  */
     351             : void
     352        1288 : ReplicationSlotCreate(const char *name, bool db_specific,
     353             :                       ReplicationSlotPersistency persistency,
     354             :                       bool two_phase, bool failover, bool synced)
     355             : {
     356        1288 :     ReplicationSlot *slot = NULL;
     357             :     int         i;
     358             : 
     359             :     Assert(MyReplicationSlot == NULL);
     360             : 
     361             :     /*
     362             :      * The logical launcher or pg_upgrade may create or migrate an internal
     363             :      * slot, so using a reserved name is allowed in these cases.
     364             :      */
     365        1288 :     ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
     366             :                                 ERROR);
     367             : 
     368        1286 :     if (failover)
     369             :     {
     370             :         /*
     371             :          * Do not allow users to create the failover enabled slots on the
     372             :          * standby as we do not support sync to the cascading standby.
     373             :          *
     374             :          * However, failover enabled slots can be created during slot
     375             :          * synchronization because we need to retain the same values as the
     376             :          * remote slot.
     377             :          */
     378          44 :         if (RecoveryInProgress() && !IsSyncingReplicationSlots())
     379           0 :             ereport(ERROR,
     380             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     381             :                     errmsg("cannot enable failover for a replication slot created on the standby"));
     382             : 
     383             :         /*
     384             :          * Do not allow users to create failover enabled temporary slots,
     385             :          * because temporary slots will not be synced to the standby.
     386             :          *
     387             :          * However, failover enabled temporary slots can be created during
     388             :          * slot synchronization. See the comments atop slotsync.c for details.
     389             :          */
     390          44 :         if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
     391           2 :             ereport(ERROR,
     392             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     393             :                     errmsg("cannot enable failover for a temporary replication slot"));
     394             :     }
     395             : 
     396             :     /*
     397             :      * If some other backend ran this code concurrently with us, we'd likely
     398             :      * both allocate the same slot, and that would be bad.  We'd also be at
     399             :      * risk of missing a name collision.  Also, we don't want to try to create
     400             :      * a new slot while somebody's busy cleaning up an old one, because we
     401             :      * might both be monkeying with the same directory.
     402             :      */
     403        1284 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     404             : 
     405             :     /*
     406             :      * Check for name collision, and identify an allocatable slot.  We need to
     407             :      * hold ReplicationSlotControlLock in shared mode for this, so that nobody
     408             :      * else can change the in_use flags while we're looking at them.
     409             :      */
     410        1284 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     411       12318 :     for (i = 0; i < max_replication_slots; i++)
     412             :     {
     413       11040 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     414             : 
     415       11040 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     416           6 :             ereport(ERROR,
     417             :                     (errcode(ERRCODE_DUPLICATE_OBJECT),
     418             :                      errmsg("replication slot \"%s\" already exists", name)));
     419       11034 :         if (!s->in_use && slot == NULL)
     420        1276 :             slot = s;
     421             :     }
     422        1278 :     LWLockRelease(ReplicationSlotControlLock);
     423             : 
     424             :     /* If all slots are in use, we're out of luck. */
     425        1278 :     if (slot == NULL)
     426           2 :         ereport(ERROR,
     427             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     428             :                  errmsg("all replication slots are in use"),
     429             :                  errhint("Free one or increase \"max_replication_slots\".")));
     430             : 
     431             :     /*
     432             :      * Since this slot is not in use, nobody should be looking at any part of
     433             :      * it other than the in_use field unless they're trying to allocate it.
     434             :      * And since we hold ReplicationSlotAllocationLock, nobody except us can
     435             :      * be doing that.  So it's safe to initialize the slot.
     436             :      */
     437             :     Assert(!slot->in_use);
     438             :     Assert(slot->active_pid == 0);
     439             : 
     440             :     /* first initialize persistent data */
     441        1276 :     memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
     442        1276 :     namestrcpy(&slot->data.name, name);
     443        1276 :     slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
     444        1276 :     slot->data.persistency = persistency;
     445        1276 :     slot->data.two_phase = two_phase;
     446        1276 :     slot->data.two_phase_at = InvalidXLogRecPtr;
     447        1276 :     slot->data.failover = failover;
     448        1276 :     slot->data.synced = synced;
     449             : 
     450             :     /* and then data only present in shared memory */
     451        1276 :     slot->just_dirtied = false;
     452        1276 :     slot->dirty = false;
     453        1276 :     slot->effective_xmin = InvalidTransactionId;
     454        1276 :     slot->effective_catalog_xmin = InvalidTransactionId;
     455        1276 :     slot->candidate_catalog_xmin = InvalidTransactionId;
     456        1276 :     slot->candidate_xmin_lsn = InvalidXLogRecPtr;
     457        1276 :     slot->candidate_restart_valid = InvalidXLogRecPtr;
     458        1276 :     slot->candidate_restart_lsn = InvalidXLogRecPtr;
     459        1276 :     slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
     460        1276 :     slot->last_saved_restart_lsn = InvalidXLogRecPtr;
     461        1276 :     slot->inactive_since = 0;
     462             : 
     463             :     /*
     464             :      * Create the slot on disk.  We haven't actually marked the slot allocated
     465             :      * yet, so no special cleanup is required if this errors out.
     466             :      */
     467        1276 :     CreateSlotOnDisk(slot);
     468             : 
     469             :     /*
     470             :      * We need to briefly prevent any other backend from iterating over the
     471             :      * slots while we flip the in_use flag. We also need to set the active
     472             :      * flag while holding the ControlLock as otherwise a concurrent
     473             :      * ReplicationSlotAcquire() could acquire the slot as well.
     474             :      */
     475        1276 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     476             : 
     477        1276 :     slot->in_use = true;
     478             : 
     479             :     /* We can now mark the slot active, and that makes it our slot. */
     480        1276 :     SpinLockAcquire(&slot->mutex);
     481             :     Assert(slot->active_pid == 0);
     482        1276 :     slot->active_pid = MyProcPid;
     483        1276 :     SpinLockRelease(&slot->mutex);
     484        1276 :     MyReplicationSlot = slot;
     485             : 
     486        1276 :     LWLockRelease(ReplicationSlotControlLock);
     487             : 
     488             :     /*
     489             :      * Create statistics entry for the new logical slot. We don't collect any
     490             :      * stats for physical slots, so no need to create an entry for the same.
     491             :      * See ReplicationSlotDropPtr for why we need to do this before releasing
     492             :      * ReplicationSlotAllocationLock.
     493             :      */
     494        1276 :     if (SlotIsLogical(slot))
     495         922 :         pgstat_create_replslot(slot);
     496             : 
     497             :     /*
     498             :      * Now that the slot has been marked as in_use and active, it's safe to
     499             :      * let somebody else try to allocate a slot.
     500             :      */
     501        1276 :     LWLockRelease(ReplicationSlotAllocationLock);
     502             : 
     503             :     /* Let everybody know we've modified this slot */
     504        1276 :     ConditionVariableBroadcast(&slot->active_cv);
     505        1276 : }
     506             : 
     507             : /*
     508             :  * Search for the named replication slot.
     509             :  *
     510             :  * Return the replication slot if found, otherwise NULL.
     511             :  */
     512             : ReplicationSlot *
     513        3716 : SearchNamedReplicationSlot(const char *name, bool need_lock)
     514             : {
     515             :     int         i;
     516        3716 :     ReplicationSlot *slot = NULL;
     517             : 
     518        3716 :     if (need_lock)
     519        1028 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     520             : 
     521       13946 :     for (i = 0; i < max_replication_slots; i++)
     522             :     {
     523       13082 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     524             : 
     525       13082 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     526             :         {
     527        2852 :             slot = s;
     528        2852 :             break;
     529             :         }
     530             :     }
     531             : 
     532        3716 :     if (need_lock)
     533        1028 :         LWLockRelease(ReplicationSlotControlLock);
     534             : 
     535        3716 :     return slot;
     536             : }
     537             : 
     538             : /*
     539             :  * Return the index of the replication slot in
     540             :  * ReplicationSlotCtl->replication_slots.
     541             :  *
     542             :  * This is mainly useful to have an efficient key for storing replication slot
     543             :  * stats.
     544             :  */
     545             : int
     546       14724 : ReplicationSlotIndex(ReplicationSlot *slot)
     547             : {
     548             :     Assert(slot >= ReplicationSlotCtl->replication_slots &&
     549             :            slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
     550             : 
     551       14724 :     return slot - ReplicationSlotCtl->replication_slots;
     552             : }
     553             : 
     554             : /*
     555             :  * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
     556             :  * the slot's name and true is returned.
     557             :  *
     558             :  * This is likely only useful for pgstat_replslot.c during shutdown; in other
     559             :  * cases there are obvious TOCTOU issues.
     560             :  */
     561             : bool
     562         158 : ReplicationSlotName(int index, Name name)
     563             : {
     564             :     ReplicationSlot *slot;
     565             :     bool        found;
     566             : 
     567         158 :     slot = &ReplicationSlotCtl->replication_slots[index];
     568             : 
     569             :     /*
     570             :      * Ensure that the slot cannot be dropped while we copy the name. Don't
     571             :      * need the spinlock as the name of an existing slot cannot change.
     572             :      */
     573         158 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     574         158 :     found = slot->in_use;
     575         158 :     if (slot->in_use)
     576         158 :         namestrcpy(name, NameStr(slot->data.name));
     577         158 :     LWLockRelease(ReplicationSlotControlLock);
     578             : 
     579         158 :     return found;
     580             : }
     581             : 
     582             : /*
     583             :  * Find a previously created slot and mark it as used by this process.
     584             :  *
     585             :  * An error is raised if nowait is true and the slot is currently in use. If
     586             :  * nowait is false, we sleep until the slot is released by the owning process.
     587             :  *
     588             :  * An error is raised if error_if_invalid is true and the slot is found to
     589             :  * be invalid. It should always be set to true, except when we are temporarily
     590             :  * acquiring the slot and don't intend to change it.
                     :  *
                     :  * An error is also raised if no in-use slot has the given name, or if the
                     :  * name denotes the slot reserved for conflict detection and the caller is
                     :  * not the logical replication launcher.
                     :  *
                     :  * The caller must not already hold a slot (asserted).  On success
                     :  * MyReplicationSlot points to the acquired slot.  Note that the
                     :  * invalidation check runs only after MyReplicationSlot has been set, so
                     :  * that error is thrown with the slot still assigned to this process.
                     :  *
                     :  * In a walsender the acquisition is logged, at LOG if
                     :  * log_replication_commands is set and at DEBUG1 otherwise.
     591             :  */
     592             : void
     593        2542 : ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
     594             : {
     595             :     ReplicationSlot *s;
     596             :     int         active_pid;
     597             : 
     598             :     Assert(name != NULL);
     599             : 
     600        2542 : retry:
     601             :     Assert(MyReplicationSlot == NULL);
     602             : 
     603        2542 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     604             : 
     605             :     /* Check if the slot exists with the given name. */
     606        2542 :     s = SearchNamedReplicationSlot(name, false);
     607        2542 :     if (s == NULL || !s->in_use)
     608             :     {
     609          18 :         LWLockRelease(ReplicationSlotControlLock);
     610             : 
     611          18 :         ereport(ERROR,
     612             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     613             :                  errmsg("replication slot \"%s\" does not exist",
     614             :                         name)));
     615             :     }
     616             : 
     617             :     /*
     618             :      * Do not allow users to acquire the reserved slot. This scenario may
     619             :      * occur if the launcher that owns the slot has terminated unexpectedly
     620             :      * due to an error, and a backend process attempts to reuse the slot.
     621             :      */
     622        2524 :     if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
     623           0 :         ereport(ERROR,
     624             :                 errcode(ERRCODE_UNDEFINED_OBJECT),
     625             :                 errmsg("cannot acquire replication slot \"%s\"", name),
     626             :                 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
     627             : 
     628             :     /*
     629             :      * This is the slot we want; check if it's active under some other
     630             :      * process.  In single user mode, we don't need this check.
                     :      * There, the local active_pid is simply set to MyProcPid (so the
                     :      * comparison further down always succeeds) and s->active_pid is left
                     :      * untouched.
     631             :      */
     632        2524 :     if (IsUnderPostmaster)
     633             :     {
     634             :         /*
     635             :          * Get ready to sleep on the slot in case it is active.  (We may end
     636             :          * up not sleeping, but we don't want to do this while holding the
     637             :          * spinlock.)
     638             :          */
     639        2524 :         if (!nowait)
     640         538 :             ConditionVariablePrepareToSleep(&s->active_cv);
     641             : 
     642             :         /*
     643             :          * It is important to reset the inactive_since under spinlock here to
     644             :          * avoid race conditions with slot invalidation. See comments related
     645             :          * to inactive_since in InvalidatePossiblyObsoleteSlot.
                     :          *
                     :          * The slot is claimed only if its active_pid is currently zero; the
                     :          * value read back afterwards tells us below whether this process or
                     :          * some other one owns the slot.
     646             :          */
     647        2524 :         SpinLockAcquire(&s->mutex);
     648        2524 :         if (s->active_pid == 0)
     649        2248 :             s->active_pid = MyProcPid;
     650        2524 :         active_pid = s->active_pid;
     651        2524 :         ReplicationSlotSetInactiveSince(s, 0, false);
     652        2524 :         SpinLockRelease(&s->mutex);
     653             :     }
     654             :     else
     655             :     {
     656           0 :         active_pid = MyProcPid;
     657           0 :         ReplicationSlotSetInactiveSince(s, 0, true);
     658             :     }
     659        2524 :     LWLockRelease(ReplicationSlotControlLock);
     660             : 
     661             :     /*
     662             :      * If we found the slot but it's already active in another process, we
     663             :      * wait until the owning process signals us that it's been released, or
     664             :      * error out.
     665             :      */
     666        2524 :     if (active_pid != MyProcPid)
     667             :     {
     668           0 :         if (!nowait)
     669             :         {
     670             :             /* Wait here until we get signaled, and then restart */
     671           0 :             ConditionVariableSleep(&s->active_cv,
     672             :                                    WAIT_EVENT_REPLICATION_SLOT_DROP);
     673           0 :             ConditionVariableCancelSleep();
     674           0 :             goto retry;
     675             :         }
     676             : 
     677           0 :         ereport(ERROR,
     678             :                 (errcode(ERRCODE_OBJECT_IN_USE),
     679             :                  errmsg("replication slot \"%s\" is active for PID %d",
     680             :                         NameStr(s->data.name), active_pid)));
     681             :     }
     682        2524 :     else if (!nowait)
     683         538 :         ConditionVariableCancelSleep(); /* no sleep needed after all */
     684             : 
     685             :     /* We made this slot active, so it's ours now. */
     686        2524 :     MyReplicationSlot = s;
     687             : 
     688             :     /*
     689             :      * We need to check for invalidation after making the slot ours to avoid
     690             :      * the possible race condition with the checkpointer that can otherwise
     691             :      * invalidate the slot immediately after the check.
     692             :      */
     693        2524 :     if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
     694          14 :         ereport(ERROR,
     695             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     696             :                 errmsg("can no longer access replication slot \"%s\"",
     697             :                        NameStr(s->data.name)),
     698             :                 errdetail("This replication slot has been invalidated due to \"%s\".",
     699             :                           GetSlotInvalidationCauseName(s->data.invalidated)));
     700             : 
     701             :     /* Let everybody know we've modified this slot */
     702        2510 :     ConditionVariableBroadcast(&s->active_cv);
     703             : 
     704             :     /*
     705             :      * The call to pgstat_acquire_replslot() protects against stats for a
     706             :      * different slot, from before a restart or such, being present during
     707             :      * pgstat_report_replslot().
                     :      * It is made for logical slots only.
     708             :      */
     709        2510 :     if (SlotIsLogical(s))
     710        2104 :         pgstat_acquire_replslot(s);
     711             : 
     712             : 
     713        2510 :     if (am_walsender)
     714             :     {
     715        1700 :         ereport(log_replication_commands ? LOG : DEBUG1,
     716             :                 SlotIsLogical(s)
     717             :                 ? errmsg("acquired logical replication slot \"%s\"",
     718             :                          NameStr(s->data.name))
     719             :                 : errmsg("acquired physical replication slot \"%s\"",
     720             :                          NameStr(s->data.name)));
     721             :     }
     722        2510 : }
     723             : 
     724             : /*
     725             :  * Release the replication slot that this backend considers to own.
     726             :  *
     727             :  * This or another backend can re-acquire the slot later.
     728             :  * Resources this slot requires will be preserved.
                     :  *
                     :  * MyReplicationSlot must be set and its active_pid nonzero (asserted).
                     :  * An RS_EPHEMERAL slot is dropped instead of being kept.  For an
                     :  * RS_PERSISTENT slot active_pid is cleared and waiters on active_cv are
                     :  * woken up; for any other persistency only inactive_since is updated here
                     :  * and active_pid is left alone.
                     :  *
                     :  * In all cases MyReplicationSlot is reset to NULL and the
                     :  * PROC_IN_LOGICAL_DECODING flag is cleared for this process.  In a
                     :  * walsender the slot name and kind are copied up front so that the release
                     :  * can be logged at the end without touching the slot again.
     729             :  */
     730             : void
     731        3034 : ReplicationSlotRelease(void)
     732             : {
     733        3034 :     ReplicationSlot *slot = MyReplicationSlot;
     734        3034 :     char       *slotname = NULL;    /* keep compiler quiet */
     735        3034 :     bool        is_logical = false; /* keep compiler quiet */
     736        3034 :     TimestampTz now = 0;
     737             : 
     738             :     Assert(slot != NULL && slot->active_pid != 0);
     739             : 
     740        3034 :     if (am_walsender)
     741             :     {
     742        2112 :         slotname = pstrdup(NameStr(slot->data.name));
     743        2112 :         is_logical = SlotIsLogical(slot);
     744             :     }
     745             : 
     746        3034 :     if (slot->data.persistency == RS_EPHEMERAL)
     747             :     {
     748             :         /*
     749             :          * Delete the slot. There is no !PANIC case where this is allowed to
     750             :          * fail, all that may happen is an incomplete cleanup of the on-disk
     751             :          * data.
                     :          *
                     :          * NOTE(review): execution continues below and still reads and
                     :          * writes the array entry through 'slot' after the drop -- confirm
                     :          * that this is intended.
     752             :          */
     753          10 :         ReplicationSlotDropAcquired();
     754             :     }
     755             : 
     756             :     /*
     757             :      * If slot needed to temporarily restrain both data and catalog xmin to
     758             :      * create the catalog snapshot, remove that temporary constraint.
     759             :      * Snapshots can only be exported while the initial snapshot is still
     760             :      * acquired.
                     :      *
                     :      * This applies when data.xmin is invalid but effective_xmin is still
                     :      * valid; the required xmin is recomputed afterwards.
     761             :      */
     762        3034 :     if (!TransactionIdIsValid(slot->data.xmin) &&
     763        2978 :         TransactionIdIsValid(slot->effective_xmin))
     764             :     {
     765         390 :         SpinLockAcquire(&slot->mutex);
     766         390 :         slot->effective_xmin = InvalidTransactionId;
     767         390 :         SpinLockRelease(&slot->mutex);
     768         390 :         ReplicationSlotsComputeRequiredXmin(false);
     769             :     }
     770             : 
     771             :     /*
     772             :      * Set the time since the slot has become inactive. We get the current
     773             :      * time beforehand to avoid system call while holding the spinlock.
     774             :      */
     775        3034 :     now = GetCurrentTimestamp();
     776             : 
     777        3034 :     if (slot->data.persistency == RS_PERSISTENT)
     778             :     {
     779             :         /*
     780             :          * Mark persistent slot inactive.  We're not freeing it, just
     781             :          * disconnecting, but wake up others that may be waiting for it.
     782             :          */
     783        2468 :         SpinLockAcquire(&slot->mutex);
     784        2468 :         slot->active_pid = 0;
     785        2468 :         ReplicationSlotSetInactiveSince(slot, now, false);
     786        2468 :         SpinLockRelease(&slot->mutex);
     787        2468 :         ConditionVariableBroadcast(&slot->active_cv);
     788             :     }
     789             :     else
     790         566 :         ReplicationSlotSetInactiveSince(slot, now, true);
     791             : 
     792        3034 :     MyReplicationSlot = NULL;
     793             : 
     794             :     /* might not have been set when we've been a plain slot */
     795        3034 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     796        3034 :     MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
     797        3034 :     ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
     798        3034 :     LWLockRelease(ProcArrayLock);
     799             : 
     800        3034 :     if (am_walsender)
     801             :     {
     802        2112 :         ereport(log_replication_commands ? LOG : DEBUG1,
     803             :                 is_logical
     804             :                 ? errmsg("released logical replication slot \"%s\"",
     805             :                          slotname)
     806             :                 : errmsg("released physical replication slot \"%s\"",
     807             :                          slotname));
     808             : 
     809        2112 :         pfree(slotname);
     810             :     }
     811        3034 : }
     812             : 
     813             : /*
     814             :  * Cleanup temporary slots created in current session.
     815             :  *
     816             :  * Cleanup only synced temporary slots if 'synced_only' is true, else
     817             :  * cleanup all temporary slots.
                     :  *
                     :  * Must be called with no slot acquired (asserted).  The slot array is
                     :  * scanned under ReplicationSlotControlLock in shared mode; a slot is
                     :  * selected when it is in use and its active_pid equals MyProcPid (and, with
                     :  * 'synced_only', its data.synced flag is set).  Such a slot is expected to
                     :  * be RS_TEMPORARY (asserted).
                     :  *
                     :  * The control lock has to be given up before a slot is dropped, because
                     :  * ReplicationSlotDropPtr takes that lock itself in exclusive mode.  Hence
                     :  * the scan starts over from the first slot after every drop, and finishes
                     :  * once a full pass finds nothing left to drop.
     818             :  */
     819             : void
     820       86124 : ReplicationSlotCleanup(bool synced_only)
     821             : {
     822             :     int         i;
     823             : 
     824             :     Assert(MyReplicationSlot == NULL);
     825             : 
     826       86404 : restart:
     827       86404 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     828      936190 :     for (i = 0; i < max_replication_slots; i++)
     829             :     {
     830      850066 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     831             : 
     832      850066 :         if (!s->in_use)
     833      822718 :             continue;
     834             : 
     835       27348 :         SpinLockAcquire(&s->mutex);
     836       27348 :         if ((s->active_pid == MyProcPid &&
     837         280 :              (!synced_only || s->data.synced)))
     838             :         {
     839             :             Assert(s->data.persistency == RS_TEMPORARY);
     840         280 :             SpinLockRelease(&s->mutex);
     841         280 :             LWLockRelease(ReplicationSlotControlLock);  /* avoid deadlock */
     842             : 
     843         280 :             ReplicationSlotDropPtr(s);
     844             : 
     845         280 :             ConditionVariableBroadcast(&s->active_cv);
     846         280 :             goto restart;
     847             :         }
     848             :         else
     849       27068 :             SpinLockRelease(&s->mutex);
     850             :     }
     851             : 
     852       86124 :     LWLockRelease(ReplicationSlotControlLock);
     853       86124 : }
     854             : 
     855             : /*
     856             :  * Permanently drop replication slot identified by the passed in name.
                     :  *
                     :  * The slot is first acquired; 'nowait' is passed through to
                     :  * ReplicationSlotAcquire and decides whether we error out or wait when the
                     :  * slot is active in another process.  error_if_invalid is passed as false,
                     :  * so an invalidated slot can still be dropped.
                     :  *
                     :  * Must be called with no slot acquired (asserted).
     857             :  */
     858             : void
     859         804 : ReplicationSlotDrop(const char *name, bool nowait)
     860             : {
     861             :     Assert(MyReplicationSlot == NULL);
     862             : 
     863         804 :     ReplicationSlotAcquire(name, nowait, false);
     864             : 
     865             :     /*
     866             :      * Do not allow users to drop the slots which are currently being synced
     867             :      * from the primary to the standby.
                     :      * The restriction applies only while recovery is in progress.
     868             :      */
     869         790 :     if (RecoveryInProgress() && MyReplicationSlot->data.synced)
     870           2 :         ereport(ERROR,
     871             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     872             :                 errmsg("cannot drop replication slot \"%s\"", name),
     873             :                 errdetail("This replication slot is being synchronized from the primary server."));
     874             : 
     875         788 :     ReplicationSlotDropAcquired();
     876         788 : }
     877             : 
     878             : /*
     879             :  * Change the definition of the slot identified by the specified name.
                     :  *
                     :  * 'failover' and 'two_phase' each point to the new value for the property
                     :  * of the same name, or are NULL to leave that property alone; at least one
                     :  * of them must be non-NULL (asserted).
                     :  *
                     :  * The slot is acquired (waiting if it is active elsewhere, and erroring
                     :  * out if it has been invalidated) and released again before returning.
                     :  * Physical slots are rejected.  The slot is marked dirty and saved to disk
                     :  * only if at least one property actually changes.
     880             :  */
     881             : void
     882          12 : ReplicationSlotAlter(const char *name, const bool *failover,
     883             :                      const bool *two_phase)
     884             : {
     885          12 :     bool        update_slot = false;
     886             : 
     887             :     Assert(MyReplicationSlot == NULL);
     888             :     Assert(failover || two_phase);
     889             : 
     890          12 :     ReplicationSlotAcquire(name, false, true);
     891             : 
     892          10 :     if (SlotIsPhysical(MyReplicationSlot))
     893           0 :         ereport(ERROR,
     894             :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     895             :                 errmsg("cannot use %s with a physical replication slot",
     896             :                        "ALTER_REPLICATION_SLOT"));
     897             : 
     898          10 :     if (RecoveryInProgress())
     899             :     {
     900             :         /*
     901             :          * Do not allow users to alter the slots which are currently being
     902             :          * synced from the primary to the standby.
     903             :          */
     904           2 :         if (MyReplicationSlot->data.synced)
     905           2 :             ereport(ERROR,
     906             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     907             :                     errmsg("cannot alter replication slot \"%s\"", name),
     908             :                     errdetail("This replication slot is being synchronized from the primary server."));
     909             : 
     910             :         /*
     911             :          * Do not allow users to enable failover on the standby as we do not
     912             :          * support sync to the cascading standby.
     913             :          */
     914           0 :         if (failover && *failover)
     915           0 :             ereport(ERROR,
     916             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     917             :                     errmsg("cannot enable failover for a replication slot"
     918             :                            " on the standby"));
     919             :     }
     920             : 
     921           8 :     if (failover)
     922             :     {
     923             :         /*
     924             :          * Do not allow users to enable failover for temporary slots as we do
     925             :          * not support syncing temporary slots to the standby.
     926             :          */
     927           6 :         if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
     928           0 :             ereport(ERROR,
     929             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     930             :                     errmsg("cannot enable failover for a temporary replication slot"));
     931             : 
     932           6 :         if (MyReplicationSlot->data.failover != *failover)
     933             :         {
     934           6 :             SpinLockAcquire(&MyReplicationSlot->mutex);
     935           6 :             MyReplicationSlot->data.failover = *failover;
     936           6 :             SpinLockRelease(&MyReplicationSlot->mutex);
     937             : 
     938           6 :             update_slot = true;
     939             :         }
     940             :     }
     941             : 
     942           8 :     if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
     943             :     {
     944           2 :         SpinLockAcquire(&MyReplicationSlot->mutex);
     945           2 :         MyReplicationSlot->data.two_phase = *two_phase;
     946           2 :         SpinLockRelease(&MyReplicationSlot->mutex);
     947             : 
     948           2 :         update_slot = true;
     949             :     }
     950             : 
     951           8 :     if (update_slot)
     952             :     {
     953           8 :         ReplicationSlotMarkDirty();
     954           8 :         ReplicationSlotSave();
     955             :     }
     956             : 
     957           8 :     ReplicationSlotRelease();
     958           8 : }
     959             : 
     960             : /*
     961             :  * Permanently drop the currently acquired replication slot.
                     :  *
                     :  * MyReplicationSlot must be set (asserted).  It is reset to NULL before
                     :  * the drop itself is carried out by ReplicationSlotDropPtr, so this process
                     :  * no longer counts as holding the slot while that runs.
     962             :  */
     963             : void
     964         814 : ReplicationSlotDropAcquired(void)
     965             : {
     966         814 :     ReplicationSlot *slot = MyReplicationSlot;
     967             : 
     968             :     Assert(MyReplicationSlot != NULL);
     969             : 
     970             :     /* slot isn't acquired anymore */
     971         814 :     MyReplicationSlot = NULL;
     972             : 
     973         814 :     ReplicationSlotDropPtr(slot);
     974         814 : }
     975             : 
     976             : /*
     977             :  * Permanently drop the replication slot which will be released by the point
     978             :  * this function returns.
                     :  *
                     :  * ReplicationSlotAllocationLock is held exclusively for the whole
                     :  * operation.  In order, this function:
                     :  *   - renames the slot's directory to "<name>.tmp" and fsyncs it and its
                     :  *     parent directory;
                     :  *   - clears active_pid and in_use in the array entry while holding
                     :  *     ReplicationSlotControlLock exclusively, and wakes up waiters;
                     :  *   - recomputes the required xmin and LSN;
                     :  *   - removes the renamed directory (a failure only produces a WARNING);
                     :  *   - drops the statistics entry if the slot is logical.
     979             :  */
     980             : static void
     981        1094 : ReplicationSlotDropPtr(ReplicationSlot *slot)
     982             : {
     983             :     char        path[MAXPGPATH];
     984             :     char        tmppath[MAXPGPATH];
     985             : 
     986             :     /*
     987             :      * If some other backend ran this code concurrently with us, we might try
     988             :      * to delete a slot with a certain name while someone else was trying to
     989             :      * create a slot with the same name.
     990             :      */
     991        1094 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     992             : 
     993             :     /* Generate pathnames. */
     994        1094 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
     995        1094 :     sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
     996             : 
     997             :     /*
     998             :      * Rename the slot directory on disk, so that we'll no longer recognize
     999             :      * this as a valid slot.  Note that if this fails, we've got to mark the
    1000             :      * slot inactive before bailing out.  If we're dropping an ephemeral or a
    1001             :      * temporary slot, we better never fail hard as the caller won't expect
    1002             :      * the slot to survive and this might get called during error handling.
                     :      *
                     :      * Accordingly, a failed rename is reported at ERROR for a persistent
                     :      * slot and at WARNING for any other persistency.
    1003             :      */
    1004        1094 :     if (rename(path, tmppath) == 0)
    1005             :     {
    1006             :         /*
    1007             :          * We need to fsync() the directory we just renamed and its parent to
    1008             :          * make sure that our changes are on disk in a crash-safe fashion.  If
    1009             :          * fsync() fails, we can't be sure whether the changes are on disk or
    1010             :          * not.  For now, we handle that by panicking;
    1011             :          * StartupReplicationSlots() will try to straighten it out after
    1012             :          * restart.
    1013             :          */
    1014        1094 :         START_CRIT_SECTION();
    1015        1094 :         fsync_fname(tmppath, true);
    1016        1094 :         fsync_fname(PG_REPLSLOT_DIR, true);
    1017        1094 :         END_CRIT_SECTION();
    1018             :     }
    1019             :     else
    1020             :     {
    1021           0 :         bool        fail_softly = slot->data.persistency != RS_PERSISTENT;
    1022             : 
    1023           0 :         SpinLockAcquire(&slot->mutex);
    1024           0 :         slot->active_pid = 0;
    1025           0 :         SpinLockRelease(&slot->mutex);
    1026             : 
    1027             :         /* wake up anyone waiting on this slot */
    1028           0 :         ConditionVariableBroadcast(&slot->active_cv);
    1029             : 
    1030           0 :         ereport(fail_softly ? WARNING : ERROR,
    1031             :                 (errcode_for_file_access(),
    1032             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1033             :                         path, tmppath)));
    1034             :     }
    1035             : 
    1036             :     /*
    1037             :      * The slot is definitely gone.  Lock out concurrent scans of the array
    1038             :      * long enough to kill it.  It's OK to clear the active PID here without
    1039             :      * grabbing the mutex because nobody else can be scanning the array here,
    1040             :      * and nobody can be attached to this slot and thus access it without
    1041             :      * scanning the array.
    1042             :      *
    1043             :      * Also wake up processes waiting for it.
    1044             :      */
    1045        1094 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
    1046        1094 :     slot->active_pid = 0;
    1047        1094 :     slot->in_use = false;
    1048        1094 :     LWLockRelease(ReplicationSlotControlLock);
    1049        1094 :     ConditionVariableBroadcast(&slot->active_cv);
    1050             : 
    1051             :     /*
    1052             :      * Slot is dead and doesn't prevent resource removal anymore, recompute
    1053             :      * limits.
    1054             :      */
    1055        1094 :     ReplicationSlotsComputeRequiredXmin(false);
    1056        1094 :     ReplicationSlotsComputeRequiredLSN();
    1057             : 
    1058             :     /*
    1059             :      * If removing the directory fails, the worst thing that will happen is
    1060             :      * that the user won't be able to create a new slot with the same name
    1061             :      * until the next server restart.  We warn about it, but that's all.
    1062             :      */
    1063        1094 :     if (!rmtree(tmppath, true))
    1064           0 :         ereport(WARNING,
    1065             :                 (errmsg("could not remove directory \"%s\"", tmppath)));
    1066             : 
    1067             :     /*
    1068             :      * Drop the statistics entry for the replication slot.  Do this while
    1069             :      * holding ReplicationSlotAllocationLock so that we don't drop a
    1070             :      * statistics entry for another slot with the same name just created in
    1071             :      * another session.
    1072             :      */
    1073        1094 :     if (SlotIsLogical(slot))
    1074         796 :         pgstat_drop_replslot(slot);
    1075             : 
    1076             :     /*
    1077             :      * We release this at the very end, so that nobody starts trying to create
    1078             :      * a slot while we're still cleaning up the detritus of the old one.
    1079             :      */
    1080        1094 :     LWLockRelease(ReplicationSlotAllocationLock);
    1081        1094 : }
    1082             : 
    1083             : /*
    1084             :  * Serialize the currently acquired slot's state from memory to disk, thereby
    1085             :  * guaranteeing the current state will survive a crash.
                     :  *
                     :  * A slot must be acquired (asserted).  The target is the slot's own
                     :  * directory, PG_REPLSLOT_DIR/<slot name>, and SaveSlotToPath is invoked with
                     :  * ERROR as its error level.
    1086             :  */
    1087             : void
    1088        2672 : ReplicationSlotSave(void)
    1089             : {
    1090             :     char        path[MAXPGPATH];
    1091             : 
    1092             :     Assert(MyReplicationSlot != NULL);
    1093             : 
    1094        2672 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
    1095        2672 :     SaveSlotToPath(MyReplicationSlot, path, ERROR);
    1096        2672 : }
    1097             : 
    1098             : /*
    1099             :  * Signal that it would be useful if the currently acquired slot would be
    1100             :  * flushed out to disk.
    1101             :  *
    1102             :  * Note that the actual flush to disk can be delayed for a long time, if
    1103             :  * required for correctness explicitly do a ReplicationSlotSave().
                     :  *
                     :  * A slot must be acquired (asserted).  Nothing is written here; only the
                     :  * slot's just_dirtied and dirty flags are set, under the slot's spinlock.
    1104             :  */
    1105             : void
    1106       54430 : ReplicationSlotMarkDirty(void)
    1107             : {
    1108       54430 :     ReplicationSlot *slot = MyReplicationSlot;
    1109             : 
    1110             :     Assert(MyReplicationSlot != NULL);
    1111             : 
    1112       54430 :     SpinLockAcquire(&slot->mutex);
    1113       54430 :     MyReplicationSlot->just_dirtied = true;
    1114       54430 :     MyReplicationSlot->dirty = true;
    1115       54430 :     SpinLockRelease(&slot->mutex);
    1116       54430 : }
    1117             : 
    1118             : /*
    1119             :  * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
    1120             :  * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
                     :  *
                     :  * A slot must be acquired and must not already be RS_PERSISTENT (both
                     :  * asserted).  The persistency is switched under the slot's spinlock, after
                     :  * which the slot is marked dirty and saved to disk right away.
    1121             :  */
    1122             : void
    1123         896 : ReplicationSlotPersist(void)
    1124             : {
    1125         896 :     ReplicationSlot *slot = MyReplicationSlot;
    1126             : 
    1127             :     Assert(slot != NULL);
    1128             :     Assert(slot->data.persistency != RS_PERSISTENT);
    1129             : 
    1130         896 :     SpinLockAcquire(&slot->mutex);
    1131         896 :     slot->data.persistency = RS_PERSISTENT;
    1132         896 :     SpinLockRelease(&slot->mutex);
    1133             : 
    1134         896 :     ReplicationSlotMarkDirty();
    1135         896 :     ReplicationSlotSave();
    1136         896 : }
    1137             : 
    1138             : /*
    1139             :  * Compute the oldest xmin across all slots and store it in the ProcArray.
    1140             :  *
    1141             :  * If already_locked is true, ProcArrayLock has already been acquired
    1142             :  * exclusively.
    1143             :  */
    1144             : void
    1145        4602 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
    1146             : {
    1147             :     int         i;
    1148        4602 :     TransactionId agg_xmin = InvalidTransactionId;
    1149        4602 :     TransactionId agg_catalog_xmin = InvalidTransactionId;
    1150             : 
    1151             :     Assert(ReplicationSlotCtl != NULL);
    1152             : 
    1153        4602 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1154             : 
    1155       46492 :     for (i = 0; i < max_replication_slots; i++)
    1156             :     {
    1157       41890 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1158             :         TransactionId effective_xmin;
    1159             :         TransactionId effective_catalog_xmin;
    1160             :         bool        invalidated;
    1161             : 
    1162       41890 :         if (!s->in_use)
    1163       37614 :             continue;
    1164             : 
    1165        4276 :         SpinLockAcquire(&s->mutex);
    1166        4276 :         effective_xmin = s->effective_xmin;
    1167        4276 :         effective_catalog_xmin = s->effective_catalog_xmin;
    1168        4276 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1169        4276 :         SpinLockRelease(&s->mutex);
    1170             : 
    1171             :         /* invalidated slots need not apply */
    1172        4276 :         if (invalidated)
    1173          44 :             continue;
    1174             : 
    1175             :         /* check the data xmin */
    1176        4232 :         if (TransactionIdIsValid(effective_xmin) &&
    1177          14 :             (!TransactionIdIsValid(agg_xmin) ||
    1178          14 :              TransactionIdPrecedes(effective_xmin, agg_xmin)))
    1179         588 :             agg_xmin = effective_xmin;
    1180             : 
    1181             :         /* check the catalog xmin */
    1182        4232 :         if (TransactionIdIsValid(effective_catalog_xmin) &&
    1183        1776 :             (!TransactionIdIsValid(agg_catalog_xmin) ||
    1184        1776 :              TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
    1185        2244 :             agg_catalog_xmin = effective_catalog_xmin;
    1186             :     }
    1187             : 
    1188        4602 :     LWLockRelease(ReplicationSlotControlLock);
    1189             : 
    1190        4602 :     ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
    1191        4602 : }
    1192             : 
    1193             : /*
    1194             :  * Compute the oldest restart LSN across all slots and inform xlog module.
    1195             :  *
    1196             :  * Note: while max_slot_wal_keep_size is theoretically relevant for this
    1197             :  * purpose, we don't try to account for that, because this module doesn't
    1198             :  * know what to compare against.
    1199             :  */
    1200             : void
    1201       55742 : ReplicationSlotsComputeRequiredLSN(void)
    1202             : {
    1203             :     int         i;
    1204       55742 :     XLogRecPtr  min_required = InvalidXLogRecPtr;
    1205             : 
    1206             :     Assert(ReplicationSlotCtl != NULL);
    1207             : 
    1208       55742 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1209      604060 :     for (i = 0; i < max_replication_slots; i++)
    1210             :     {
    1211      548318 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1212             :         XLogRecPtr  restart_lsn;
    1213             :         XLogRecPtr  last_saved_restart_lsn;
    1214             :         bool        invalidated;
    1215             :         ReplicationSlotPersistency persistency;
    1216             : 
    1217      548318 :         if (!s->in_use)
    1218      492194 :             continue;
    1219             : 
    1220       56124 :         SpinLockAcquire(&s->mutex);
    1221       56124 :         persistency = s->data.persistency;
    1222       56124 :         restart_lsn = s->data.restart_lsn;
    1223       56124 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1224       56124 :         last_saved_restart_lsn = s->last_saved_restart_lsn;
    1225       56124 :         SpinLockRelease(&s->mutex);
    1226             : 
    1227             :         /* invalidated slots need not apply */
    1228       56124 :         if (invalidated)
    1229          46 :             continue;
    1230             : 
    1231             :         /*
    1232             :          * For a persistent slot, use last_saved_restart_lsn to compute the
    1233             :          * oldest LSN for removal of WAL segments.  The segments between
    1234             :          * last_saved_restart_lsn and restart_lsn might be needed by a
    1235             :          * persistent slot in the case of a database crash.  Non-persistent
    1236             :          * slots can't survive a database crash, so we don't care about
    1237             :          * last_saved_restart_lsn for them.
    1238             :          */
    1239       56078 :         if (persistency == RS_PERSISTENT)
    1240             :         {
    1241       54440 :             if (last_saved_restart_lsn != InvalidXLogRecPtr &&
    1242             :                 restart_lsn > last_saved_restart_lsn)
    1243             :             {
    1244       49756 :                 restart_lsn = last_saved_restart_lsn;
    1245             :             }
    1246             :         }
    1247             : 
    1248       56078 :         if (restart_lsn != InvalidXLogRecPtr &&
    1249        2326 :             (min_required == InvalidXLogRecPtr ||
    1250             :              restart_lsn < min_required))
    1251       53884 :             min_required = restart_lsn;
    1252             :     }
    1253       55742 :     LWLockRelease(ReplicationSlotControlLock);
    1254             : 
    1255       55742 :     XLogSetReplicationSlotMinimumLSN(min_required);
    1256       55742 : }
    1257             : 
    1258             : /*
    1259             :  * Compute the oldest WAL LSN required by *logical* decoding slots.
    1260             :  *
    1261             :  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
    1262             :  * slots exist.
    1263             :  *
    1264             :  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
    1265             :  * ignores physical replication slots.
    1266             :  *
    1267             :  * The results aren't required frequently, so we don't maintain a precomputed
    1268             :  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
    1269             :  */
    1270             : XLogRecPtr
    1271        6776 : ReplicationSlotsComputeLogicalRestartLSN(void)
    1272             : {
    1273        6776 :     XLogRecPtr  result = InvalidXLogRecPtr;
    1274             :     int         i;
    1275             : 
    1276        6776 :     if (max_replication_slots <= 0)
    1277           4 :         return InvalidXLogRecPtr;
    1278             : 
    1279        6772 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1280             : 
    1281       73400 :     for (i = 0; i < max_replication_slots; i++)
    1282             :     {
    1283             :         ReplicationSlot *s;
    1284             :         XLogRecPtr  restart_lsn;
    1285             :         XLogRecPtr  last_saved_restart_lsn;
    1286             :         bool        invalidated;
    1287             :         ReplicationSlotPersistency persistency;
    1288             : 
    1289       66628 :         s = &ReplicationSlotCtl->replication_slots[i];
    1290             : 
    1291             :         /* cannot change while ReplicationSlotControlLock is held */
    1292       66628 :         if (!s->in_use)
    1293       65276 :             continue;
    1294             : 
    1295             :         /* we're only interested in logical slots */
    1296        1352 :         if (!SlotIsLogical(s))
    1297        1000 :             continue;
    1298             : 
    1299             :         /* read once, it's ok if it increases while we're checking */
    1300         352 :         SpinLockAcquire(&s->mutex);
    1301         352 :         persistency = s->data.persistency;
    1302         352 :         restart_lsn = s->data.restart_lsn;
    1303         352 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1304         352 :         last_saved_restart_lsn = s->last_saved_restart_lsn;
    1305         352 :         SpinLockRelease(&s->mutex);
    1306             : 
    1307             :         /* invalidated slots need not apply */
    1308         352 :         if (invalidated)
    1309           8 :             continue;
    1310             : 
    1311             :         /*
    1312             :          * For a persistent slot, use last_saved_restart_lsn to compute the
    1313             :          * oldest LSN for removal of WAL segments.  The segments between
    1314             :          * last_saved_restart_lsn and restart_lsn might be needed by a
    1315             :          * persistent slot in the case of a database crash.  Non-persistent
    1316             :          * slots can't survive a database crash, so we don't care about
    1317             :          * last_saved_restart_lsn for them.
    1318             :          */
    1319         344 :         if (persistency == RS_PERSISTENT)
    1320             :         {
    1321         340 :             if (last_saved_restart_lsn != InvalidXLogRecPtr &&
    1322             :                 restart_lsn > last_saved_restart_lsn)
    1323             :             {
    1324           0 :                 restart_lsn = last_saved_restart_lsn;
    1325             :             }
    1326             :         }
    1327             : 
    1328         344 :         if (restart_lsn == InvalidXLogRecPtr)
    1329           0 :             continue;
    1330             : 
    1331         344 :         if (result == InvalidXLogRecPtr ||
    1332             :             restart_lsn < result)
    1333         276 :             result = restart_lsn;
    1334             :     }
    1335             : 
    1336        6772 :     LWLockRelease(ReplicationSlotControlLock);
    1337             : 
    1338        6772 :     return result;
    1339             : }
    1340             : 
    1341             : /*
    1342             :  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
    1343             :  * passed database oid.
    1344             :  *
    1345             :  * Returns true if there are any slots referencing the database. *nslots will
    1346             :  * be set to the absolute number of slots in the database, *nactive to ones
    1347             :  * currently active.
    1348             :  */
    1349             : bool
    1350          90 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
    1351             : {
    1352             :     int         i;
    1353             : 
    1354          90 :     *nslots = *nactive = 0;
    1355             : 
    1356          90 :     if (max_replication_slots <= 0)
    1357           0 :         return false;
    1358             : 
    1359          90 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1360         932 :     for (i = 0; i < max_replication_slots; i++)
    1361             :     {
    1362             :         ReplicationSlot *s;
    1363             : 
    1364         842 :         s = &ReplicationSlotCtl->replication_slots[i];
    1365             : 
    1366             :         /* cannot change while ReplicationSlotControlLock is held */
    1367         842 :         if (!s->in_use)
    1368         806 :             continue;
    1369             : 
    1370             :         /* only logical slots are database specific, skip */
    1371          36 :         if (!SlotIsLogical(s))
    1372          20 :             continue;
    1373             : 
    1374             :         /* not our database, skip */
    1375          16 :         if (s->data.database != dboid)
    1376          10 :             continue;
    1377             : 
    1378             :         /* NB: intentionally counting invalidated slots */
    1379             : 
    1380             :         /* count slots with spinlock held */
    1381           6 :         SpinLockAcquire(&s->mutex);
    1382           6 :         (*nslots)++;
    1383           6 :         if (s->active_pid != 0)
    1384           2 :             (*nactive)++;
    1385           6 :         SpinLockRelease(&s->mutex);
    1386             :     }
    1387          90 :     LWLockRelease(ReplicationSlotControlLock);
    1388             : 
    1389          90 :     if (*nslots > 0)
    1390           6 :         return true;
    1391          84 :     return false;
    1392             : }
    1393             : 
    1394             : /*
    1395             :  * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
    1396             :  * passed database oid. The caller should hold an exclusive lock on the
    1397             :  * pg_database oid for the database to prevent creation of new slots on the db
    1398             :  * or replay from existing slots.
    1399             :  *
    1400             :  * Another session that concurrently acquires an existing slot on the target DB
    1401             :  * (most likely to drop it) may cause this function to ERROR. If that happens
    1402             :  * it may have dropped some but not all slots.
    1403             :  *
    1404             :  * This routine isn't as efficient as it could be - but we don't drop
    1405             :  * databases often, especially databases with lots of slots.
    1406             :  */
    1407             : void
    1408         114 : ReplicationSlotsDropDBSlots(Oid dboid)
    1409             : {
    1410             :     int         i;
    1411             : 
    1412         114 :     if (max_replication_slots <= 0)
    1413           0 :         return;
    1414             : 
    1415         114 : restart:
    1416         124 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1417        1190 :     for (i = 0; i < max_replication_slots; i++)
    1418             :     {
    1419             :         ReplicationSlot *s;
    1420             :         char       *slotname;
    1421             :         int         active_pid;
    1422             : 
    1423        1076 :         s = &ReplicationSlotCtl->replication_slots[i];
    1424             : 
    1425             :         /* cannot change while ReplicationSlotControlLock is held */
    1426        1076 :         if (!s->in_use)
    1427        1022 :             continue;
    1428             : 
    1429             :         /* only logical slots are database specific, skip */
    1430          54 :         if (!SlotIsLogical(s))
    1431          22 :             continue;
    1432             : 
    1433             :         /* not our database, skip */
    1434          32 :         if (s->data.database != dboid)
    1435          22 :             continue;
    1436             : 
    1437             :         /* NB: intentionally including invalidated slots */
    1438             : 
    1439             :         /* acquire slot, so ReplicationSlotDropAcquired can be reused  */
    1440          10 :         SpinLockAcquire(&s->mutex);
    1441             :         /* can't change while ReplicationSlotControlLock is held */
    1442          10 :         slotname = NameStr(s->data.name);
    1443          10 :         active_pid = s->active_pid;
    1444          10 :         if (active_pid == 0)
    1445             :         {
    1446          10 :             MyReplicationSlot = s;
    1447          10 :             s->active_pid = MyProcPid;
    1448             :         }
    1449          10 :         SpinLockRelease(&s->mutex);
    1450             : 
    1451             :         /*
    1452             :          * Even though we hold an exclusive lock on the database object a
    1453             :          * logical slot for that DB can still be active, e.g. if it's
    1454             :          * concurrently being dropped by a backend connected to another DB.
    1455             :          *
    1456             :          * That's fairly unlikely in practice, so we'll just bail out.
    1457             :          *
    1458             :          * The slot sync worker holds a shared lock on the database before
    1459             :          * operating on synced logical slots to avoid conflict with the drop
    1460             :          * happening here. The persistent synced slots are thus safe but there
    1461             :          * is a possibility that the slot sync worker has created a temporary
    1462             :          * slot (which stays active even on release) and we are trying to drop
    1463             :          * that here. In practice, the chances of hitting this scenario are
    1464             :          * low because, during slot synchronization, the temporary slot is
    1465             :          * immediately converted to persistent and thus is safe due to the
    1466             :          * shared lock taken on the database. So, we'll just bail out in such
    1467             :          * a case.
    1468             :          *
    1469             :          * XXX: We can consider shutting down the slot sync worker before
    1470             :          * trying to drop synced temporary slots here.
    1471             :          */
    1472          10 :         if (active_pid)
    1473           0 :             ereport(ERROR,
    1474             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1475             :                      errmsg("replication slot \"%s\" is active for PID %d",
    1476             :                             slotname, active_pid)));
    1477             : 
    1478             :         /*
    1479             :          * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
    1480             :          * holding ReplicationSlotControlLock over filesystem operations,
    1481             :          * release ReplicationSlotControlLock and use
    1482             :          * ReplicationSlotDropAcquired.
    1483             :          *
    1484             :          * As that means the set of slots could change, restart scan from the
    1485             :          * beginning each time we release the lock.
    1486             :          */
    1487          10 :         LWLockRelease(ReplicationSlotControlLock);
    1488          10 :         ReplicationSlotDropAcquired();
    1489          10 :         goto restart;
    1490             :     }
    1491         114 :     LWLockRelease(ReplicationSlotControlLock);
    1492             : }
    1493             : 
    1494             : 
    1495             : /*
    1496             :  * Check whether the server's configuration supports using replication
    1497             :  * slots.
    1498             :  */
    1499             : void
    1500        3422 : CheckSlotRequirements(void)
    1501             : {
    1502             :     /*
    1503             :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
    1504             :      * needs the same check.
    1505             :      */
    1506             : 
    1507        3422 :     if (max_replication_slots == 0)
    1508           0 :         ereport(ERROR,
    1509             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1510             :                  errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
    1511             : 
    1512        3422 :     if (wal_level < WAL_LEVEL_REPLICA)
    1513           0 :         ereport(ERROR,
    1514             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1515             :                  errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
    1516        3422 : }
    1517             : 
    1518             : /*
    1519             :  * Check whether the user has privilege to use replication slots.
    1520             :  */
    1521             : void
    1522        1096 : CheckSlotPermissions(void)
    1523             : {
    1524        1096 :     if (!has_rolreplication(GetUserId()))
    1525          10 :         ereport(ERROR,
    1526             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1527             :                  errmsg("permission denied to use replication slots"),
    1528             :                  errdetail("Only roles with the %s attribute may use replication slots.",
    1529             :                            "REPLICATION")));
    1530        1086 : }
    1531             : 
    1532             : /*
    1533             :  * Reserve WAL for the currently active slot.
    1534             :  *
    1535             :  * Compute and set restart_lsn in a manner that's appropriate for the type of
    1536             :  * the slot and concurrency safe.
    1537             :  */
    1538             : void
    1539        1194 : ReplicationSlotReserveWal(void)
    1540             : {
    1541        1194 :     ReplicationSlot *slot = MyReplicationSlot;
    1542             : 
    1543             :     Assert(slot != NULL);
    1544             :     Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
    1545             :     Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr);
    1546             : 
    1547             :     /*
    1548             :      * The replication slot mechanism is used to prevent removal of required
    1549             :      * WAL. As there is no interlock between this routine and checkpoints, WAL
    1550             :      * segments could concurrently be removed when a now stale return value of
    1551             :      * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
    1552             :      * this happens we'll just retry.
    1553             :      */
    1554             :     while (true)
    1555           0 :     {
    1556             :         XLogSegNo   segno;
    1557             :         XLogRecPtr  restart_lsn;
    1558             : 
    1559             :         /*
    1560             :          * For logical slots log a standby snapshot and start logical decoding
    1561             :          * at exactly that position. That allows the slot to start up more
    1562             :          * quickly. But on a standby we cannot do WAL writes, so just use the
    1563             :          * replay pointer; effectively, an attempt to create a logical slot on
    1564             :          * standby will cause it to wait for an xl_running_xact record to be
    1565             :          * logged independently on the primary, so that a snapshot can be
    1566             :          * built using the record.
    1567             :          *
    1568             :          * None of this is needed (or indeed helpful) for physical slots as
    1569             :          * they'll start replay at the last logged checkpoint anyway. Instead
    1570             :          * return the location of the last redo LSN. While that slightly
    1571             :          * increases the chance that we have to retry, it's where a base
    1572             :          * backup has to start replay at.
    1573             :          */
    1574        1194 :         if (SlotIsPhysical(slot))
    1575         298 :             restart_lsn = GetRedoRecPtr();
    1576         896 :         else if (RecoveryInProgress())
    1577          46 :             restart_lsn = GetXLogReplayRecPtr(NULL);
    1578             :         else
    1579         850 :             restart_lsn = GetXLogInsertRecPtr();
    1580             : 
    1581        1194 :         SpinLockAcquire(&slot->mutex);
    1582        1194 :         slot->data.restart_lsn = restart_lsn;
    1583        1194 :         SpinLockRelease(&slot->mutex);
    1584             : 
    1585             :         /* prevent WAL removal as fast as possible */
    1586        1194 :         ReplicationSlotsComputeRequiredLSN();
    1587             : 
    1588             :         /*
    1589             :          * If all required WAL is still there, great, otherwise retry. The
    1590             :          * slot should prevent further removal of WAL, unless there's a
    1591             :          * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
    1592             :          * the new restart_lsn above, so normally we should never need to loop
    1593             :          * more than twice.
    1594             :          */
    1595        1194 :         XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
    1596        1194 :         if (XLogGetLastRemovedSegno() < segno)
    1597        1194 :             break;
    1598             :     }
    1599             : 
    1600        1194 :     if (!RecoveryInProgress() && SlotIsLogical(slot))
    1601             :     {
    1602             :         XLogRecPtr  flushptr;
    1603             : 
    1604             :         /* make sure we have enough information to start */
    1605         850 :         flushptr = LogStandbySnapshot();
    1606             : 
    1607             :         /* and make sure it's fsynced to disk */
    1608         850 :         XLogFlush(flushptr);
    1609             :     }
    1610        1194 : }
    1611             : 
    1612             : /*
    1613             :  * Report that a replication slot needs to be invalidated
    1614             :  */
    1615             : static void
    1616          42 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
    1617             :                        bool terminating,
    1618             :                        int pid,
    1619             :                        NameData slotname,
    1620             :                        XLogRecPtr restart_lsn,
    1621             :                        XLogRecPtr oldestLSN,
    1622             :                        TransactionId snapshotConflictHorizon,
    1623             :                        long slot_idle_seconds)
    1624             : {
    1625             :     StringInfoData err_detail;
    1626             :     StringInfoData err_hint;
    1627             : 
    1628          42 :     initStringInfo(&err_detail);
    1629          42 :     initStringInfo(&err_hint);
    1630             : 
    1631          42 :     switch (cause)
    1632             :     {
    1633          12 :         case RS_INVAL_WAL_REMOVED:
    1634             :             {
    1635          12 :                 uint64      ex = oldestLSN - restart_lsn;
    1636             : 
    1637          12 :                 appendStringInfo(&err_detail,
    1638          12 :                                  ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
    1639             :                                           "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
    1640             :                                           ex),
    1641          12 :                                  LSN_FORMAT_ARGS(restart_lsn),
    1642             :                                  ex);
    1643             :                 /* translator: %s is a GUC variable name */
    1644          12 :                 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
    1645             :                                  "max_slot_wal_keep_size");
    1646          12 :                 break;
    1647             :             }
    1648          24 :         case RS_INVAL_HORIZON:
    1649          24 :             appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
    1650             :                              snapshotConflictHorizon);
    1651          24 :             break;
    1652             : 
    1653           6 :         case RS_INVAL_WAL_LEVEL:
    1654           6 :             appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
    1655           6 :             break;
    1656             : 
    1657           0 :         case RS_INVAL_IDLE_TIMEOUT:
    1658             :             {
    1659             :                 /* translator: %s is a GUC variable name */
    1660           0 :                 appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
    1661             :                                  slot_idle_seconds, "idle_replication_slot_timeout",
    1662             :                                  idle_replication_slot_timeout_secs);
    1663             :                 /* translator: %s is a GUC variable name */
    1664           0 :                 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
    1665             :                                  "idle_replication_slot_timeout");
    1666           0 :                 break;
    1667             :             }
    1668             :         case RS_INVAL_NONE:
    1669             :             pg_unreachable();
    1670             :     }
    1671             : 
    1672          42 :     ereport(LOG,
    1673             :             terminating ?
    1674             :             errmsg("terminating process %d to release replication slot \"%s\"",
    1675             :                    pid, NameStr(slotname)) :
    1676             :             errmsg("invalidating obsolete replication slot \"%s\"",
    1677             :                    NameStr(slotname)),
    1678             :             errdetail_internal("%s", err_detail.data),
    1679             :             err_hint.len ? errhint("%s", err_hint.data) : 0);
    1680             : 
    1681          42 :     pfree(err_detail.data);
    1682          42 :     pfree(err_hint.data);
    1683          42 : }
    1684             : 
    1685             : /*
    1686             :  * Can we invalidate an idle replication slot?
    1687             :  *
    1688             :  * Idle timeout invalidation is allowed only when:
    1689             :  *
    1690             :  * 1. Idle timeout is set (idle_replication_slot_timeout_secs is nonzero)
    1691             :  * 2. Slot has reserved WAL (its restart_lsn is valid)
    1692             :  * 3. Slot is inactive (its inactive_since is set)
    1693             :  * 4. The slot is not being synced from the primary while the server is in
    1694             :  *    recovery.  Synced slots are excluded as they are always considered
    1695             :  *    inactive, since they don't perform logical decoding to produce changes.
    1696             :  */
    1697             : static inline bool
    1698         642 : CanInvalidateIdleSlot(ReplicationSlot *s)
    1699             : {
    1700         642 :     return (idle_replication_slot_timeout_secs != 0 &&
    1701           0 :             !XLogRecPtrIsInvalid(s->data.restart_lsn) &&
    1702         642 :             s->inactive_since > 0 &&
    1703           0 :             !(RecoveryInProgress() && s->data.synced));
    1704             : }
    1705             : 
    1706             : /*
    1707             :  * DetermineSlotInvalidationCause - Determine which of the given possible
    1708             :  * causes, if any, makes the slot eligible for invalidation.
    1709             :  *
    1710             :  * The requested causes are checked sequentially and the first one that applies
    1711             :  * is returned; RS_INVAL_NONE is returned if none of them applies.
    1712             :  */
    1713             : static ReplicationSlotInvalidationCause
    1714         704 : DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
    1715             :                                XLogRecPtr oldestLSN, Oid dboid,
    1716             :                                TransactionId snapshotConflictHorizon,
    1717             :                                TransactionId initial_effective_xmin,
    1718             :                                TransactionId initial_catalog_effective_xmin,
    1719             :                                XLogRecPtr initial_restart_lsn,
    1720             :                                TimestampTz *inactive_since, TimestampTz now)
    1721             : {
    1722             :     Assert(possible_causes != RS_INVAL_NONE);
    1723             : 
    1724         704 :     if (possible_causes & RS_INVAL_WAL_REMOVED)
    1725             :     {
    1726         654 :         if (initial_restart_lsn != InvalidXLogRecPtr &&
    1727             :             initial_restart_lsn < oldestLSN)
    1728          12 :             return RS_INVAL_WAL_REMOVED;
    1729             :     }
    1730             : 
    1731         692 :     if (possible_causes & RS_INVAL_HORIZON)
    1732             :     {
    1733             :         /* an invalid DB oid signals a shared relation */
    1734          44 :         if (SlotIsLogical(s) &&
    1735          34 :             (dboid == InvalidOid || dboid == s->data.database))
    1736             :         {
    1737          44 :             if (TransactionIdIsValid(initial_effective_xmin) &&
    1738           0 :                 TransactionIdPrecedesOrEquals(initial_effective_xmin,
    1739             :                                               snapshotConflictHorizon))
    1740           0 :                 return RS_INVAL_HORIZON;
    1741          88 :             else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
    1742          44 :                      TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
    1743             :                                                    snapshotConflictHorizon))
    1744          24 :                 return RS_INVAL_HORIZON;
    1745             :         }
    1746             :     }
    1747             : 
    1748         668 :     if (possible_causes & RS_INVAL_WAL_LEVEL)
    1749             :     {
    1750           6 :         if (SlotIsLogical(s))
    1751           6 :             return RS_INVAL_WAL_LEVEL;
    1752             :     }
    1753             : 
    1754         662 :     if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
    1755             :     {
    1756             :         Assert(now > 0);
    1757             : 
    1758         642 :         if (CanInvalidateIdleSlot(s))
    1759             :         {
    1760             :             /*
    1761             :              * Simulate the invalidation due to idle_timeout to test the
    1762             :              * timeout behavior promptly, without waiting for it to trigger
    1763             :              * naturally.
    1764             :              */
    1765             : #ifdef USE_INJECTION_POINTS
    1766           0 :             if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
    1767             :             {
    1768           0 :                 *inactive_since = 0;    /* since the beginning of time */
    1769           0 :                 return RS_INVAL_IDLE_TIMEOUT;
    1770             :             }
    1771             : #endif
    1772             : 
    1773             :             /*
    1774             :              * Check if the slot needs to be invalidated due to the
    1775             :              * idle_replication_slot_timeout GUC.
    1776             :              */
    1777           0 :             if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
    1778             :                                                   idle_replication_slot_timeout_secs))
    1779             :             {
    1780           0 :                 *inactive_since = s->inactive_since;
    1781           0 :                 return RS_INVAL_IDLE_TIMEOUT;
    1782             :             }
    1783             :         }
    1784             :     }
    1785             : 
    1786         662 :     return RS_INVAL_NONE;
    1787             : }
    1788             : 
    1789             : /*
    1790             :  * Helper for InvalidateObsoleteReplicationSlots
    1791             :  *
    1792             :  * Acquires the given slot and marks it invalid, if necessary and possible.
    1793             :  *
    1794             :  * Returns whether ReplicationSlotControlLock was released in the interim (and
    1795             :  * in that case we're not holding the lock at return, otherwise we are).
    1796             :  *
    1797             :  * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
    1798             :  *
    1799             :  * This is inherently racy, because we release the LWLock
    1800             :  * for syscalls, so caller must restart if we return true.
    1801             :  */
    1802             : static bool
    1803         768 : InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
    1804             :                                ReplicationSlot *s,
    1805             :                                XLogRecPtr oldestLSN,
    1806             :                                Oid dboid, TransactionId snapshotConflictHorizon,
    1807             :                                bool *invalidated)
    1808             : {
    1809         768 :     int         last_signaled_pid = 0;
    1810         768 :     bool        released_lock = false;
    1811         768 :     bool        terminated = false;
    1812         768 :     TransactionId initial_effective_xmin = InvalidTransactionId;
    1813         768 :     TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
    1814         768 :     XLogRecPtr  initial_restart_lsn = InvalidXLogRecPtr;
    1815         768 :     ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
    1816         768 :     TimestampTz inactive_since = 0;
    1817             : 
    1818             :     for (;;)
    1819          14 :     {
    1820             :         XLogRecPtr  restart_lsn;
    1821             :         NameData    slotname;
    1822         782 :         int         active_pid = 0;
    1823         782 :         ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
    1824         782 :         TimestampTz now = 0;
    1825         782 :         long        slot_idle_secs = 0;
    1826             : 
    1827             :         Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
    1828             : 
    1829         782 :         if (!s->in_use)
    1830             :         {
    1831           0 :             if (released_lock)
    1832           0 :                 LWLockRelease(ReplicationSlotControlLock);
    1833           0 :             break;
    1834             :         }
    1835             : 
    1836         782 :         if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
    1837             :         {
    1838             :             /*
    1839             :              * Assign the current time here to avoid system call overhead
    1840             :              * while holding the spinlock in subsequent code.
    1841             :              */
    1842         674 :             now = GetCurrentTimestamp();
    1843             :         }
    1844             : 
    1845             :         /*
    1846             :          * Check if the slot needs to be invalidated. If it needs to be
    1847             :          * invalidated, and is not currently acquired, acquire it and mark it
    1848             :          * as having been invalidated.  We do this with the spinlock held to
    1849             :          * avoid race conditions -- for example the restart_lsn could move
    1850             :          * forward, or the slot could be dropped.
    1851             :          */
    1852         782 :         SpinLockAcquire(&s->mutex);
    1853             : 
    1854         782 :         restart_lsn = s->data.restart_lsn;
    1855             : 
    1856             :         /* we do nothing if the slot is already invalid */
    1857         782 :         if (s->data.invalidated == RS_INVAL_NONE)
    1858             :         {
    1859             :             /*
    1860             :              * The slot's mutex will be released soon, and it is possible that
    1861             :              * those values change since the process holding the slot has been
    1862             :              * terminated (if any), so record them here to ensure that we
    1863             :              * report the correct invalidation cause.
    1864             :              *
    1865             :              * Unlike other slot attributes, slot's inactive_since can't be
    1866             :              * changed until the acquired slot is released or the owning
    1867             :              * process is terminated. So, the inactive slot can only be
    1868             :              * invalidated immediately without being terminated.
    1869             :              */
    1870         704 :             if (!terminated)
    1871             :             {
    1872         690 :                 initial_restart_lsn = s->data.restart_lsn;
    1873         690 :                 initial_effective_xmin = s->effective_xmin;
    1874         690 :                 initial_catalog_effective_xmin = s->effective_catalog_xmin;
    1875             :             }
    1876             : 
    1877         704 :             invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
    1878             :                                                                 s, oldestLSN,
    1879             :                                                                 dboid,
    1880             :                                                                 snapshotConflictHorizon,
    1881             :                                                                 initial_effective_xmin,
    1882             :                                                                 initial_catalog_effective_xmin,
    1883             :                                                                 initial_restart_lsn,
    1884             :                                                                 &inactive_since,
    1885             :                                                                 now);
    1886             :         }
    1887             : 
    1888             :         /*
    1889             :          * The invalidation cause recorded previously should not change once
    1890             :          * the process owning the slot (if any) has been terminated.
    1891             :          */
    1892             :         Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
    1893             :                  invalidation_cause_prev != invalidation_cause));
    1894             : 
    1895             :         /* if there's no invalidation, we're done */
    1896         782 :         if (invalidation_cause == RS_INVAL_NONE)
    1897             :         {
    1898         740 :             SpinLockRelease(&s->mutex);
    1899         740 :             if (released_lock)
    1900           0 :                 LWLockRelease(ReplicationSlotControlLock);
    1901         740 :             break;
    1902             :         }
    1903             : 
    1904          42 :         slotname = s->data.name;
    1905          42 :         active_pid = s->active_pid;
    1906             : 
    1907             :         /*
    1908             :          * If the slot can be acquired, do so and mark it invalidated
    1909             :          * immediately.  Otherwise we'll signal the owning process, below, and
    1910             :          * retry.
    1911             :          */
    1912          42 :         if (active_pid == 0)
    1913             :         {
    1914          28 :             MyReplicationSlot = s;
    1915          28 :             s->active_pid = MyProcPid;
    1916          28 :             s->data.invalidated = invalidation_cause;
    1917             : 
    1918             :             /*
    1919             :              * XXX: We should consider not overwriting restart_lsn and instead
    1920             :              * just rely on .invalidated.
    1921             :              */
    1922          28 :             if (invalidation_cause == RS_INVAL_WAL_REMOVED)
    1923             :             {
    1924           8 :                 s->data.restart_lsn = InvalidXLogRecPtr;
    1925           8 :                 s->last_saved_restart_lsn = InvalidXLogRecPtr;
    1926             :             }
    1927             : 
    1928             :             /* Let caller know */
    1929          28 :             *invalidated = true;
    1930             :         }
    1931             : 
    1932          42 :         SpinLockRelease(&s->mutex);
    1933             : 
    1934             :         /*
    1935             :          * Calculate the idle time duration of the slot if the invalidation
    1936             :          * cause is RS_INVAL_IDLE_TIMEOUT.
    1937             :          */
    1938          42 :         if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
    1939             :         {
    1940             :             int         slot_idle_usecs;
    1941             : 
    1942           0 :             TimestampDifference(inactive_since, now, &slot_idle_secs,
    1943             :                                 &slot_idle_usecs);
    1944             :         }
    1945             : 
    1946          42 :         if (active_pid != 0)
    1947             :         {
    1948             :             /*
    1949             :              * Prepare the sleep on the slot's condition variable before
    1950             :              * releasing the lock, to close a possible race condition if the
    1951             :              * slot is released before the sleep below.
    1952             :              */
    1953          14 :             ConditionVariablePrepareToSleep(&s->active_cv);
    1954             : 
    1955          14 :             LWLockRelease(ReplicationSlotControlLock);
    1956          14 :             released_lock = true;
    1957             : 
    1958             :             /*
    1959             :              * Signal to terminate the process that owns the slot, if we
    1960             :              * haven't already signalled it.  (Avoidance of repeated
    1961             :              * signalling is the only reason for there to be a loop in this
    1962             :              * routine; otherwise we could rely on caller's restart loop.)
    1963             :              *
    1964             :              * There is a race condition in that another process may own the
    1965             :              * slot after its current owner process is terminated and before
    1966             :              * this process owns it. To handle that, we signal only if the PID
    1967             :              * of the owning process has changed from the previous time. (This
    1968             :              * logic assumes that the same PID is not reused very quickly.)
    1969             :              */
    1970          14 :             if (last_signaled_pid != active_pid)
    1971             :             {
    1972          14 :                 ReportSlotInvalidation(invalidation_cause, true, active_pid,
    1973             :                                        slotname, restart_lsn,
    1974             :                                        oldestLSN, snapshotConflictHorizon,
    1975             :                                        slot_idle_secs);
    1976             : 
    1977          14 :                 if (MyBackendType == B_STARTUP)
    1978          10 :                     (void) SendProcSignal(active_pid,
    1979             :                                           PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
    1980             :                                           INVALID_PROC_NUMBER);
    1981             :                 else
    1982           4 :                     (void) kill(active_pid, SIGTERM);
    1983             : 
    1984          14 :                 last_signaled_pid = active_pid;
    1985          14 :                 terminated = true;
    1986          14 :                 invalidation_cause_prev = invalidation_cause;
    1987             :             }
    1988             : 
    1989             :             /* Wait until the slot is released. */
    1990          14 :             ConditionVariableSleep(&s->active_cv,
    1991             :                                    WAIT_EVENT_REPLICATION_SLOT_DROP);
    1992             : 
    1993             :             /*
    1994             :              * Re-acquire lock and start over; we expect to invalidate the
    1995             :              * slot next time (unless another process acquires the slot in the
    1996             :              * meantime).
    1997             :              */
    1998          14 :             LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1999          14 :             continue;
    2000             :         }
    2001             :         else
    2002             :         {
    2003             :             /*
    2004             :              * We hold the slot now and have already invalidated it; flush it
    2005             :              * to ensure that state persists.
    2006             :              *
    2007             :              * Don't want to hold ReplicationSlotControlLock across file
    2008             :              * system operations, so release it now but be sure to tell caller
    2009             :              * to restart from scratch.
    2010             :              */
    2011          28 :             LWLockRelease(ReplicationSlotControlLock);
    2012          28 :             released_lock = true;
    2013             : 
    2014             :             /* Make sure the invalidated state persists across server restart */
    2015          28 :             ReplicationSlotMarkDirty();
    2016          28 :             ReplicationSlotSave();
    2017          28 :             ReplicationSlotRelease();
    2018             : 
    2019          28 :             ReportSlotInvalidation(invalidation_cause, false, active_pid,
    2020             :                                    slotname, restart_lsn,
    2021             :                                    oldestLSN, snapshotConflictHorizon,
    2022             :                                    slot_idle_secs);
    2023             : 
    2024             :             /* done with this slot for now */
    2025          28 :             break;
    2026             :         }
    2027             :     }
    2028             : 
    2029             :     Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
    2030             : 
    2031         768 :     return released_lock;
    2032             : }
    2033             : 
    2034             : /*
    2035             :  * Invalidate slots that require resources about to be removed.
    2036             :  *
    2037             :  * Returns true if any slot was invalidated.
    2038             :  *
    2039             :  * Whether a slot needs to be invalidated depends on the invalidation cause.
    2040             :  * A slot is invalidated if it:
    2041             :  * - RS_INVAL_WAL_REMOVED: requires an LSN older than the given segment
    2042             :  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
    2043             :  *   db; dboid may be InvalidOid for shared relations
    2044             :  * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient
    2045             :  * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
    2046             :  *   "idle_replication_slot_timeout" duration.
    2047             :  *
    2048             :  * Note: This function attempts to invalidate the slot for multiple possible
    2049             :  * causes in a single pass, minimizing redundant iterations. "possible_causes"
    2050             :  * is a bitmask representing one or more of the defined causes.
    2051             :  *
    2052             :  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
    2053             :  */
    2054             : bool
    2055        3428 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
    2056             :                                    XLogSegNo oldestSegno, Oid dboid,
    2057             :                                    TransactionId snapshotConflictHorizon)
    2058             : {
    2059             :     XLogRecPtr  oldestLSN;
    2060        3428 :     bool        invalidated = false;
    2061             : 
    2062             :     Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
    2063             :     Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
    2064             :     Assert(possible_causes != RS_INVAL_NONE);
    2065             : 
    2066        3428 :     if (max_replication_slots == 0)
    2067           2 :         return invalidated;
    2068             : 
    2069        3426 :     XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
    2070             : 
    2071        3454 : restart:
    2072        3454 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    2073       37008 :     for (int i = 0; i < max_replication_slots; i++)
    2074             :     {
    2075       33582 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    2076             : 
    2077       33582 :         if (!s->in_use)
    2078       32800 :             continue;
    2079             : 
    2080             :         /* Prevent invalidation of logical slots during binary upgrade */
    2081         782 :         if (SlotIsLogical(s) && IsBinaryUpgrade)
    2082          14 :             continue;
    2083             : 
    2084         768 :         if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid,
    2085             :                                            snapshotConflictHorizon,
    2086             :                                            &invalidated))
    2087             :         {
    2088             :             /* if the lock was released, start from scratch */
    2089          28 :             goto restart;
    2090             :         }
    2091             :     }
    2092        3426 :     LWLockRelease(ReplicationSlotControlLock);
    2093             : 
    2094             :     /*
    2095             :      * If any slots have been invalidated, recalculate the resource limits.
    2096             :      */
    2097        3426 :     if (invalidated)
    2098             :     {
    2099          18 :         ReplicationSlotsComputeRequiredXmin(false);
    2100          18 :         ReplicationSlotsComputeRequiredLSN();
    2101             :     }
    2102             : 
    2103        3426 :     return invalidated;
    2104             : }
    2105             : 
    2106             : /*
    2107             :  * Flush all replication slots to disk.
    2108             :  *
    2109             :  * It is convenient to flush dirty replication slots at the time of checkpoint.
    2110             :  * Additionally, in case of a shutdown checkpoint, we also identify the slots
    2111             :  * for which the confirmed_flush LSN has been updated since the last time it
    2112             :  * was saved and flush them.
    2113             :  */
    2114             : void
    2115        3388 : CheckPointReplicationSlots(bool is_shutdown)
    2116             : {
    2117             :     int         i;
    2118        3388 :     bool        last_saved_restart_lsn_updated = false;
    2119             : 
    2120        3388 :     elog(DEBUG1, "performing replication slot checkpoint");
    2121             : 
    2122             :     /*
    2123             :      * Prevent any slot from being created/dropped while we're active. As we
    2124             :      * explicitly do *not* want to block iterating over replication_slots or
    2125             :      * acquiring a slot we cannot take the control lock - but that's OK,
    2126             :      * because holding ReplicationSlotAllocationLock is strictly stronger, and
    2127             :      * enough to guarantee that nobody can change the in_use bits on us.
    2128             :      */
    2129        3388 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
    2130             : 
    2131       36702 :     for (i = 0; i < max_replication_slots; i++)
    2132             :     {
    2133       33314 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    2134             :         char        path[MAXPGPATH];
    2135             : 
    2136       33314 :         if (!s->in_use)
    2137       32638 :             continue;
    2138             : 
    2139             :         /* save the slot to disk, locking is handled in SaveSlotToPath() */
    2140         676 :         sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
    2141             : 
    2142             :         /*
    2143             :          * Slot's data is not flushed each time the confirmed_flush LSN is
    2144             :          * updated as that could lead to frequent writes.  However, we decide
    2145             :          * to force a flush of all logical slots' data at the time of shutdown
    2146             :          * if the confirmed_flush LSN is changed since we last flushed it to
    2147             :          * disk.  This helps in avoiding an unnecessary retreat of the
    2148             :          * confirmed_flush LSN after restart.
    2149             :          */
    2150         676 :         if (is_shutdown && SlotIsLogical(s))
    2151             :         {
    2152         124 :             SpinLockAcquire(&s->mutex);
    2153             : 
    2154         124 :             if (s->data.invalidated == RS_INVAL_NONE &&
    2155         124 :                 s->data.confirmed_flush > s->last_saved_confirmed_flush)
    2156             :             {
    2157          66 :                 s->just_dirtied = true;
    2158          66 :                 s->dirty = true;
    2159             :             }
    2160         124 :             SpinLockRelease(&s->mutex);
    2161             :         }
    2162             : 
    2163             :         /*
    2164             :          * Track if we're going to update slot's last_saved_restart_lsn. We
    2165             :          * need this to know if we need to recompute the required LSN.
    2166             :          */
    2167         676 :         if (s->last_saved_restart_lsn != s->data.restart_lsn)
    2168         384 :             last_saved_restart_lsn_updated = true;
    2169             : 
    2170         676 :         SaveSlotToPath(s, path, LOG);
    2171             :     }
    2172        3388 :     LWLockRelease(ReplicationSlotAllocationLock);
    2173             : 
    2174             :     /*
    2175             :      * Recompute the required LSN if SaveSlotToPath() updated
    2176             :      * last_saved_restart_lsn for any slot.
    2177             :      */
    2178        3388 :     if (last_saved_restart_lsn_updated)
    2179         384 :         ReplicationSlotsComputeRequiredLSN();
    2180        3388 : }
    2181             : 
    2182             : /*
    2183             :  * Load all replication slots from disk into memory at server startup. This
    2184             :  * needs to be run before we start crash recovery.
    2185             :  */
    2186             : void
    2187        1864 : StartupReplicationSlots(void)
    2188             : {
    2189             :     DIR        *replication_dir;
    2190             :     struct dirent *replication_de;
    2191             : 
    2192        1864 :     elog(DEBUG1, "starting up replication slots");
    2193             : 
    2194             :     /* restore all slots by iterating over all on-disk entries */
    2195        1864 :     replication_dir = AllocateDir(PG_REPLSLOT_DIR);
    2196        5752 :     while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
    2197             :     {
    2198             :         char        path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
    2199             :         PGFileType  de_type;
    2200             : 
    2201        3890 :         if (strcmp(replication_de->d_name, ".") == 0 ||
    2202        2028 :             strcmp(replication_de->d_name, "..") == 0)
    2203        3724 :             continue;
    2204             : 
    2205         166 :         snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
    2206         166 :         de_type = get_dirent_type(path, replication_de, false, DEBUG1);
    2207             : 
    2208             :         /* we're only creating directories here, skip if it's not ours */
    2209         166 :         if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
    2210           0 :             continue;
    2211             : 
    2212             :         /* we crashed while a slot was being set up or deleted, clean up */
    2213         166 :         if (pg_str_endswith(replication_de->d_name, ".tmp"))
    2214             :         {
    2215           0 :             if (!rmtree(path, true))
    2216             :             {
    2217           0 :                 ereport(WARNING,
    2218             :                         (errmsg("could not remove directory \"%s\"",
    2219             :                                 path)));
    2220           0 :                 continue;
    2221             :             }
    2222           0 :             fsync_fname(PG_REPLSLOT_DIR, true);
    2223           0 :             continue;
    2224             :         }
    2225             : 
    2226             :         /* looks like a slot in a normal state, restore */
    2227         166 :         RestoreSlotFromDisk(replication_de->d_name);
    2228             :     }
    2229        1862 :     FreeDir(replication_dir);
    2230             : 
    2231             :     /* no slots can exist when max_replication_slots is 0, we're done. */
    2232        1862 :     if (max_replication_slots <= 0)
    2233           2 :         return;
    2234             : 
    2235             :     /* Now that all slots are restored, compute required xmin and LSN */
    2236        1860 :     ReplicationSlotsComputeRequiredXmin(false);
    2237        1860 :     ReplicationSlotsComputeRequiredLSN();
    2238             : }
    2239             : 
    2240             : /* ----
    2241             :  * Manipulation of on-disk state of replication slots
    2242             :  *
    2243             :  * NB: none of the routines below should take any notice whether a slot is the
    2244             :  * current one or not, that's all handled a layer above.
    2245             :  * ----
    2246             :  */
    2247             : static void
    2248        1276 : CreateSlotOnDisk(ReplicationSlot *slot)
    2249             : {
    2250             :     char        tmppath[MAXPGPATH];
    2251             :     char        path[MAXPGPATH];
    2252             :     struct stat st;
    2253             : 
    2254             :     /*
    2255             :      * No need to take out the io_in_progress_lock, nobody else can see this
    2256             :      * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
    2257             :      * takes out the lock, if we'd take the lock here, we'd deadlock.
    2258             :      */
    2259             : 
    2260        1276 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    2261        1276 :     sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    2262             : 
    2263             :     /*
    2264             :      * It's just barely possible that some previous effort to create or drop a
    2265             :      * slot with this name left a temp directory lying around. If that seems
    2266             :      * to be the case, try to remove it.  If the rmtree() fails, we'll error
    2267             :      * out at the MakePGDirectory() below, so we don't bother checking
    2268             :      * success.
    2269             :      */
    2270        1276 :     if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
    2271           0 :         rmtree(tmppath, true);
    2272             : 
    2273             :     /* Create and fsync the temporary slot directory. */
    2274        1276 :     if (MakePGDirectory(tmppath) < 0)
    2275           0 :         ereport(ERROR,
    2276             :                 (errcode_for_file_access(),
    2277             :                  errmsg("could not create directory \"%s\": %m",
    2278             :                         tmppath)));
    2279        1276 :     fsync_fname(tmppath, true);
    2280             : 
    2281             :     /* Write the actual state file. */
    2282        1276 :     slot->dirty = true;          /* signal that we really need to write */
    2283        1276 :     SaveSlotToPath(slot, tmppath, ERROR);
    2284             : 
    2285             :     /* Rename the directory into place. */
    2286        1276 :     if (rename(tmppath, path) != 0)
    2287           0 :         ereport(ERROR,
    2288             :                 (errcode_for_file_access(),
    2289             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    2290             :                         tmppath, path)));
    2291             : 
    2292             :     /*
    2293             :      * If we'd now fail - really unlikely - we wouldn't know whether this slot
    2294             :      * would persist after an OS crash or not - so, force a restart. The
    2295             :      * restart would try to fsync this again till it works.
    2296             :      */
    2297        1276 :     START_CRIT_SECTION();
    2298             : 
    2299        1276 :     fsync_fname(path, true);
    2300        1276 :     fsync_fname(PG_REPLSLOT_DIR, true);
    2301             : 
    2302        1276 :     END_CRIT_SECTION();
    2303        1276 : }
    2304             : 
    2305             : /*
    2306             :  * Shared functionality between saving and creating a replication slot.
    2307             :  */
    2308             : static void
    2309        4624 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
    2310             : {
    2311             :     char        tmppath[MAXPGPATH];
    2312             :     char        path[MAXPGPATH];
    2313             :     int         fd;
    2314             :     ReplicationSlotOnDisk cp;
    2315             :     bool        was_dirty;
    2316             : 
    2317             :     /* first check whether there's something to write out */
    2318        4624 :     SpinLockAcquire(&slot->mutex);
    2319        4624 :     was_dirty = slot->dirty;
    2320        4624 :     slot->just_dirtied = false;
    2321        4624 :     SpinLockRelease(&slot->mutex);
    2322             : 
    2323             :     /* and don't do anything if there's nothing to write */
    2324        4624 :     if (!was_dirty)
    2325         206 :         return;
    2326             : 
    2327        4418 :     LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
    2328             : 
    2329             :     /* silence valgrind :( */
    2330        4418 :     memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
    2331             : 
    2332        4418 :     sprintf(tmppath, "%s/state.tmp", dir);
    2333        4418 :     sprintf(path, "%s/state", dir);
    2334             : 
    2335        4418 :     fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    2336        4418 :     if (fd < 0)
    2337             :     {
    2338             :         /*
    2339             :          * If not an ERROR, then release the lock before returning.  In case
    2340             :          * of an ERROR, the error recovery path automatically releases the
    2341             :          * lock, but no harm in explicitly releasing even in that case.  Note
    2342             :          * that LWLockRelease() could affect errno.
    2343             :          */
    2344           0 :         int         save_errno = errno;
    2345             : 
    2346           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2347           0 :         errno = save_errno;
    2348           0 :         ereport(elevel,
    2349             :                 (errcode_for_file_access(),
    2350             :                  errmsg("could not create file \"%s\": %m",
    2351             :                         tmppath)));
    2352           0 :         return;
    2353             :     }
    2354             : 
    2355        4418 :     cp.magic = SLOT_MAGIC;
    2356        4418 :     INIT_CRC32C(cp.checksum);
    2357        4418 :     cp.version = SLOT_VERSION;
    2358        4418 :     cp.length = ReplicationSlotOnDiskV2Size;
    2359             : 
    2360        4418 :     SpinLockAcquire(&slot->mutex);
    2361             : 
    2362        4418 :     memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
    2363             : 
    2364        4418 :     SpinLockRelease(&slot->mutex);
    2365             : 
    2366        4418 :     COMP_CRC32C(cp.checksum,
    2367             :                 (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
    2368             :                 ReplicationSlotOnDiskChecksummedSize);
    2369        4418 :     FIN_CRC32C(cp.checksum);
    2370             : 
    2371        4418 :     errno = 0;
    2372        4418 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
    2373        4418 :     if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
    2374             :     {
    2375           0 :         int         save_errno = errno;
    2376             : 
    2377           0 :         pgstat_report_wait_end();
    2378           0 :         CloseTransientFile(fd);
    2379           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2380             : 
    2381             :         /* if write didn't set errno, assume problem is no disk space */
    2382           0 :         errno = save_errno ? save_errno : ENOSPC;
    2383           0 :         ereport(elevel,
    2384             :                 (errcode_for_file_access(),
    2385             :                  errmsg("could not write to file \"%s\": %m",
    2386             :                         tmppath)));
    2387           0 :         return;
    2388             :     }
    2389        4418 :     pgstat_report_wait_end();
    2390             : 
    2391             :     /* fsync the temporary file */
    2392        4418 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
    2393        4418 :     if (pg_fsync(fd) != 0)
    2394             :     {
    2395           0 :         int         save_errno = errno;
    2396             : 
    2397           0 :         pgstat_report_wait_end();
    2398           0 :         CloseTransientFile(fd);
    2399           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2400           0 :         errno = save_errno;
    2401           0 :         ereport(elevel,
    2402             :                 (errcode_for_file_access(),
    2403             :                  errmsg("could not fsync file \"%s\": %m",
    2404             :                         tmppath)));
    2405           0 :         return;
    2406             :     }
    2407        4418 :     pgstat_report_wait_end();
    2408             : 
    2409        4418 :     if (CloseTransientFile(fd) != 0)
    2410             :     {
    2411           0 :         int         save_errno = errno;
    2412             : 
    2413           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2414           0 :         errno = save_errno;
    2415           0 :         ereport(elevel,
    2416             :                 (errcode_for_file_access(),
    2417             :                  errmsg("could not close file \"%s\": %m",
    2418             :                         tmppath)));
    2419           0 :         return;
    2420             :     }
    2421             : 
    2422             :     /* rename to permanent file, fsync file and directory */
    2423        4418 :     if (rename(tmppath, path) != 0)
    2424             :     {
    2425           0 :         int         save_errno = errno;
    2426             : 
    2427           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2428           0 :         errno = save_errno;
    2429           0 :         ereport(elevel,
    2430             :                 (errcode_for_file_access(),
    2431             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    2432             :                         tmppath, path)));
    2433           0 :         return;
    2434             :     }
    2435             : 
    2436             :     /*
    2437             :      * Check CreateSlotOnDisk() for the reasoning of using a critical section.
    2438             :      */
    2439        4418 :     START_CRIT_SECTION();
    2440             : 
    2441        4418 :     fsync_fname(path, false);
    2442        4418 :     fsync_fname(dir, true);
    2443        4418 :     fsync_fname(PG_REPLSLOT_DIR, true);
    2444             : 
    2445        4418 :     END_CRIT_SECTION();
    2446             : 
    2447             :     /*
    2448             :      * Successfully wrote, unset dirty bit, unless somebody dirtied again
    2449             :      * already and remember the confirmed_flush LSN value.
    2450             :      */
    2451        4418 :     SpinLockAcquire(&slot->mutex);
    2452        4418 :     if (!slot->just_dirtied)
    2453        4394 :         slot->dirty = false;
    2454        4418 :     slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2455        4418 :     slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
    2456        4418 :     SpinLockRelease(&slot->mutex);
    2457             : 
    2458        4418 :     LWLockRelease(&slot->io_in_progress_lock);
    2459             : }
    2460             : 
    2461             : /*
    2462             :  * Load a single slot from disk into memory.
    2463             :  */
    2464             : static void
    2465         166 : RestoreSlotFromDisk(const char *name)
    2466             : {
    2467             :     ReplicationSlotOnDisk cp;
    2468             :     int         i;
    2469             :     char        slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
    2470             :     char        path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
    2471             :     int         fd;
    2472         166 :     bool        restored = false;
    2473             :     int         readBytes;
    2474             :     pg_crc32c   checksum;
    2475         166 :     TimestampTz now = 0;
    2476             : 
    2477             :     /* no need to lock here, no concurrent access allowed yet */
    2478             : 
    2479             :     /* delete temp file if it exists */
    2480         166 :     sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
    2481         166 :     sprintf(path, "%s/state.tmp", slotdir);
    2482         166 :     if (unlink(path) < 0 && errno != ENOENT)
    2483           0 :         ereport(PANIC,
    2484             :                 (errcode_for_file_access(),
    2485             :                  errmsg("could not remove file \"%s\": %m", path)));
    2486             : 
    2487         166 :     sprintf(path, "%s/state", slotdir);
    2488             : 
    2489         166 :     elog(DEBUG1, "restoring replication slot from \"%s\"", path);
    2490             : 
    2491             :     /* on some operating systems fsyncing a file requires O_RDWR */
    2492         166 :     fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
    2493             : 
    2494             :     /*
    2495             :      * We do not need to handle this as we are rename()ing the directory into
    2496             :      * place only after we fsync()ed the state file.
    2497             :      */
    2498         166 :     if (fd < 0)
    2499           0 :         ereport(PANIC,
    2500             :                 (errcode_for_file_access(),
    2501             :                  errmsg("could not open file \"%s\": %m", path)));
    2502             : 
    2503             :     /*
    2504             :      * Sync state file before we're reading from it. We might have crashed
    2505             :      * while it wasn't synced yet and we shouldn't continue on that basis.
    2506             :      */
    2507         166 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
    2508         166 :     if (pg_fsync(fd) != 0)
    2509           0 :         ereport(PANIC,
    2510             :                 (errcode_for_file_access(),
    2511             :                  errmsg("could not fsync file \"%s\": %m",
    2512             :                         path)));
    2513         166 :     pgstat_report_wait_end();
    2514             : 
    2515             :     /* Also sync the parent directory */
    2516         166 :     START_CRIT_SECTION();
    2517         166 :     fsync_fname(slotdir, true);
    2518         166 :     END_CRIT_SECTION();
    2519             : 
    2520             :     /* read part of statefile that's guaranteed to be version independent */
    2521         166 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2522         166 :     readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
    2523         166 :     pgstat_report_wait_end();
    2524         166 :     if (readBytes != ReplicationSlotOnDiskConstantSize)
    2525             :     {
    2526           0 :         if (readBytes < 0)
    2527           0 :             ereport(PANIC,
    2528             :                     (errcode_for_file_access(),
    2529             :                      errmsg("could not read file \"%s\": %m", path)));
    2530             :         else
    2531           0 :             ereport(PANIC,
    2532             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2533             :                      errmsg("could not read file \"%s\": read %d of %zu",
    2534             :                             path, readBytes,
    2535             :                             (Size) ReplicationSlotOnDiskConstantSize)));
    2536             :     }
    2537             : 
    2538             :     /* verify magic */
    2539         166 :     if (cp.magic != SLOT_MAGIC)
    2540           0 :         ereport(PANIC,
    2541             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2542             :                  errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
    2543             :                         path, cp.magic, SLOT_MAGIC)));
    2544             : 
    2545             :     /* verify version */
    2546         166 :     if (cp.version != SLOT_VERSION)
    2547           0 :         ereport(PANIC,
    2548             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2549             :                  errmsg("replication slot file \"%s\" has unsupported version %u",
    2550             :                         path, cp.version)));
    2551             : 
    2552             :     /* boundary check on length */
    2553         166 :     if (cp.length != ReplicationSlotOnDiskV2Size)
    2554           0 :         ereport(PANIC,
    2555             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2556             :                  errmsg("replication slot file \"%s\" has corrupted length %u",
    2557             :                         path, cp.length)));
    2558             : 
    2559             :     /* Now that we know the size, read the entire file */
    2560         166 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2561         332 :     readBytes = read(fd,
    2562             :                      (char *) &cp + ReplicationSlotOnDiskConstantSize,
    2563         166 :                      cp.length);
    2564         166 :     pgstat_report_wait_end();
    2565         166 :     if (readBytes != cp.length)
    2566             :     {
    2567           0 :         if (readBytes < 0)
    2568           0 :             ereport(PANIC,
    2569             :                     (errcode_for_file_access(),
    2570             :                      errmsg("could not read file \"%s\": %m", path)));
    2571             :         else
    2572           0 :             ereport(PANIC,
    2573             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2574             :                      errmsg("could not read file \"%s\": read %d of %zu",
    2575             :                             path, readBytes, (Size) cp.length)));
    2576             :     }
    2577             : 
    2578         166 :     if (CloseTransientFile(fd) != 0)
    2579           0 :         ereport(PANIC,
    2580             :                 (errcode_for_file_access(),
    2581             :                  errmsg("could not close file \"%s\": %m", path)));
    2582             : 
    2583             :     /* now verify the CRC */
    2584         166 :     INIT_CRC32C(checksum);
    2585         166 :     COMP_CRC32C(checksum,
    2586             :                 (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
    2587             :                 ReplicationSlotOnDiskChecksummedSize);
    2588         166 :     FIN_CRC32C(checksum);
    2589             : 
    2590         166 :     if (!EQ_CRC32C(checksum, cp.checksum))
    2591           0 :         ereport(PANIC,
    2592             :                 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
    2593             :                         path, checksum, cp.checksum)));
    2594             : 
    2595             :     /*
    2596             :      * If we crashed with an ephemeral slot active, don't restore but delete
    2597             :      * it.
    2598             :      */
    2599         166 :     if (cp.slotdata.persistency != RS_PERSISTENT)
    2600             :     {
    2601           0 :         if (!rmtree(slotdir, true))
    2602             :         {
    2603           0 :             ereport(WARNING,
    2604             :                     (errmsg("could not remove directory \"%s\"",
    2605             :                             slotdir)));
    2606             :         }
    2607           0 :         fsync_fname(PG_REPLSLOT_DIR, true);
    2608           0 :         return;
    2609             :     }
    2610             : 
    2611             :     /*
    2612             :      * Verify that requirements for the specific slot type are met. That's
    2613             :      * important because if these aren't met we're not guaranteed to retain
    2614             :      * all the necessary resources for the slot.
    2615             :      *
    2616             :      * NB: We have to do so *after* the above checks for ephemeral slots,
    2617             :      * because otherwise a slot that shouldn't exist anymore could prevent
    2618             :      * restarts.
    2619             :      *
    2620             :      * NB: Changing the requirements here also requires adapting
    2621             :      * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
    2622             :      */
    2623         166 :     if (cp.slotdata.database != InvalidOid)
    2624             :     {
    2625         116 :         if (wal_level < WAL_LEVEL_LOGICAL)
    2626           0 :             ereport(FATAL,
    2627             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2628             :                      errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
    2629             :                             NameStr(cp.slotdata.name)),
    2630             :                      errhint("Change \"wal_level\" to be \"logical\" or higher.")));
    2631             : 
    2632             :         /*
    2633             :          * In standby mode, the hot standby must be enabled. This check is
    2634             :          * necessary to ensure logical slots are invalidated when they become
    2635             :          * incompatible due to insufficient wal_level. Otherwise, if the
    2636             :          * primary reduces wal_level < logical while hot standby is disabled,
    2637             :          * logical slots would remain valid even after promotion.
    2638             :          */
    2639         116 :         if (StandbyMode && !EnableHotStandby)
    2640           2 :             ereport(FATAL,
    2641             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2642             :                      errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
    2643             :                             NameStr(cp.slotdata.name)),
    2644             :                      errhint("Change \"hot_standby\" to be \"on\".")));
    2645             :     }
    2646          50 :     else if (wal_level < WAL_LEVEL_REPLICA)
    2647           0 :         ereport(FATAL,
    2648             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2649             :                  errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
    2650             :                         NameStr(cp.slotdata.name)),
    2651             :                  errhint("Change \"wal_level\" to be \"replica\" or higher.")));
    2652             : 
    2653             :     /* nothing can be active yet, don't lock anything */
    2654         220 :     for (i = 0; i < max_replication_slots; i++)
    2655             :     {
    2656             :         ReplicationSlot *slot;
    2657             : 
    2658         220 :         slot = &ReplicationSlotCtl->replication_slots[i];
    2659             : 
    2660         220 :         if (slot->in_use)
    2661          56 :             continue;
    2662             : 
    2663             :         /* restore the entire set of persistent data */
    2664         164 :         memcpy(&slot->data, &cp.slotdata,
    2665             :                sizeof(ReplicationSlotPersistentData));
    2666             : 
    2667             :         /* initialize in memory state */
    2668         164 :         slot->effective_xmin = cp.slotdata.xmin;
    2669         164 :         slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
    2670         164 :         slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2671         164 :         slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
    2672             : 
    2673         164 :         slot->candidate_catalog_xmin = InvalidTransactionId;
    2674         164 :         slot->candidate_xmin_lsn = InvalidXLogRecPtr;
    2675         164 :         slot->candidate_restart_lsn = InvalidXLogRecPtr;
    2676         164 :         slot->candidate_restart_valid = InvalidXLogRecPtr;
    2677             : 
    2678         164 :         slot->in_use = true;
    2679         164 :         slot->active_pid = 0;
    2680             : 
    2681             :         /*
    2682             :          * Set the time since the slot has become inactive after loading the
    2683             :          * slot from the disk into memory. Whoever acquires the slot i.e.
    2684             :          * makes the slot active will reset it. Use the same inactive_since
    2685             :          * time for all the slots.
    2686             :          */
    2687         164 :         if (now == 0)
    2688         164 :             now = GetCurrentTimestamp();
    2689             : 
    2690         164 :         ReplicationSlotSetInactiveSince(slot, now, false);
    2691             : 
    2692         164 :         restored = true;
    2693         164 :         break;
    2694             :     }
    2695             : 
    2696         164 :     if (!restored)
    2697           0 :         ereport(FATAL,
    2698             :                 (errmsg("too many replication slots active before shutdown"),
    2699             :                  errhint("Increase \"max_replication_slots\" and try again.")));
    2700             : }
    2701             : 
    2702             : /*
    2703             :  * Maps an invalidation reason for a replication slot to
    2704             :  * ReplicationSlotInvalidationCause.
    2705             :  */
    2706             : ReplicationSlotInvalidationCause
    2707           0 : GetSlotInvalidationCause(const char *cause_name)
    2708             : {
    2709             :     Assert(cause_name);
    2710             : 
    2711             :     /* Search lookup table for the cause having this name */
    2712           0 :     for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
    2713             :     {
    2714           0 :         if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
    2715           0 :             return SlotInvalidationCauses[i].cause;
    2716             :     }
    2717             : 
    2718             :     Assert(false);
    2719           0 :     return RS_INVAL_NONE;       /* to keep compiler quiet */
    2720             : }
    2721             : 
    2722             : /*
    2723             :  * Maps an ReplicationSlotInvalidationCause to the invalidation
    2724             :  * reason for a replication slot.
    2725             :  */
    2726             : const char *
    2727          80 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
    2728             : {
    2729             :     /* Search lookup table for the name of this cause */
    2730         250 :     for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
    2731             :     {
    2732         250 :         if (SlotInvalidationCauses[i].cause == cause)
    2733          80 :             return SlotInvalidationCauses[i].cause_name;
    2734             :     }
    2735             : 
    2736             :     Assert(false);
    2737           0 :     return "none";                /* to keep compiler quiet */
    2738             : }
    2739             : 
    2740             : /*
    2741             :  * A helper function to validate slots specified in GUC synchronized_standby_slots.
    2742             :  *
    2743             :  * The rawname will be parsed, and the result will be saved into *elemlist.
    2744             :  */
    2745             : static bool
    2746          12 : validate_sync_standby_slots(char *rawname, List **elemlist)
    2747             : {
    2748             :     bool        ok;
    2749             : 
    2750             :     /* Verify syntax and parse string into a list of identifiers */
    2751          12 :     ok = SplitIdentifierString(rawname, ',', elemlist);
    2752             : 
    2753          12 :     if (!ok)
    2754             :     {
    2755           0 :         GUC_check_errdetail("List syntax is invalid.");
    2756             :     }
    2757          12 :     else if (MyProc)
    2758             :     {
    2759             :         /*
    2760             :          * Check that each specified slot exist and is physical.
    2761             :          *
    2762             :          * Because we need an LWLock, we cannot do this on processes without a
    2763             :          * PGPROC, so we skip it there; but see comments in
    2764             :          * StandbySlotsHaveCaughtup() as to why that's not a problem.
    2765             :          */
    2766           6 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    2767             : 
    2768          18 :         foreach_ptr(char, name, *elemlist)
    2769             :         {
    2770             :             ReplicationSlot *slot;
    2771             : 
    2772           6 :             slot = SearchNamedReplicationSlot(name, false);
    2773             : 
    2774           6 :             if (!slot)
    2775             :             {
    2776           0 :                 GUC_check_errdetail("Replication slot \"%s\" does not exist.",
    2777             :                                     name);
    2778           0 :                 ok = false;
    2779           0 :                 break;
    2780             :             }
    2781             : 
    2782           6 :             if (!SlotIsPhysical(slot))
    2783             :             {
    2784           0 :                 GUC_check_errdetail("\"%s\" is not a physical replication slot.",
    2785             :                                     name);
    2786           0 :                 ok = false;
    2787           0 :                 break;
    2788             :             }
    2789             :         }
    2790             : 
    2791           6 :         LWLockRelease(ReplicationSlotControlLock);
    2792             :     }
    2793             : 
    2794          12 :     return ok;
    2795             : }
    2796             : 
    2797             : /*
    2798             :  * GUC check_hook for synchronized_standby_slots
                     :  *
                     :  * An empty string is accepted immediately and leaves *extra untouched.
                     :  * Otherwise a palloc-ed copy of the value is passed to
                     :  * validate_sync_standby_slots().  If that succeeds and yields a non-empty
                     :  * list, the names are packed into one guc_malloc-allocated
                     :  * SyncStandbySlotsConfigData (nslotnames, then the names stored back to
                     :  * back, each with its terminating NUL) which is handed back through *extra.
                     :  *
                     :  * Returns false if validation fails or if the allocation fails, true
                     :  * otherwise.  The source argument is not used by this function.
    2799             :  */
    2800             : bool
    2801        2246 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
    2802             : {
    2803             :     char       *rawname;
    2804             :     char       *ptr;
    2805             :     List       *elemlist;
    2806             :     int         size;
    2807             :     bool        ok;
    2808             :     SyncStandbySlotsConfigData *config;
    2809             : 
    2810        2246 :     if ((*newval)[0] == '\0')
    2811        2234 :         return true;
    2812             : 
    2813             :     /* Need a modifiable copy of the GUC string */
    2814          12 :     rawname = pstrdup(*newval);
    2815             : 
    2816             :     /* Now verify if the specified slots exist and have correct type */
    2817          12 :     ok = validate_sync_standby_slots(rawname, &elemlist);
    2818             : 
    2819          12 :     if (!ok || elemlist == NIL)
    2820             :     {
    2821           0 :         pfree(rawname);
    2822           0 :         list_free(elemlist);
    2823           0 :         return ok;
    2824             :     }
    2825             : 
    2826             :     /*
                     :      * Compute the size required for the SyncStandbySlotsConfigData struct:
                     :      * the fixed part up to slot_names, plus every name together with its
                     :      * terminating NUL.
                     :      */
    2827          12 :     size = offsetof(SyncStandbySlotsConfigData, slot_names);
    2828          36 :     foreach_ptr(char, slot_name, elemlist)
    2829          12 :         size += strlen(slot_name) + 1;
    2830             : 
    2831             :     /*
                     :      * GUC extra value must be guc_malloc'd, not palloc'd.
                     :      *
                     :      * NOTE(review): the failure exit below returns without the
                     :      * pfree(rawname) / list_free(elemlist) calls that the other exits of
                     :      * this function perform -- confirm whether that is intentional.
                     :      */
    2832          12 :     config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
    2833          12 :     if (!config)
    2834           0 :         return false;
    2835             : 
    2836             :     /*
                     :      * Transform the data into SyncStandbySlotsConfigData.  Names are copied
                     :      * in list order, each immediately after the previous one's NUL, so the
                     :      * buffer can later be walked in steps of strlen() + 1.
                     :      */
    2837          12 :     config->nslotnames = list_length(elemlist);
    2838             : 
    2839          12 :     ptr = config->slot_names;
    2840          36 :     foreach_ptr(char, slot_name, elemlist)
    2841             :     {
    2842          12 :         strcpy(ptr, slot_name);
    2843          12 :         ptr += strlen(slot_name) + 1;
    2844             :     }
    2845             : 
    2846          12 :     *extra = config;
    2847             : 
    2848          12 :     pfree(rawname);
    2849          12 :     list_free(elemlist);
    2850          12 :     return true;
    2851             : }
    2852             : 
    2853             : /*
    2854             :  * GUC assign_hook for synchronized_standby_slots
                     :  *
                     :  * Installs extra as the active synchronized_standby_slots_config and
                     :  * resets the cached ss_oldest_flush_lsn.  The newval string itself is not
                     :  * examined here.
                     :  *
                     :  * NOTE(review): extra is presumably the struct built by
                     :  * check_synchronized_standby_slots(), or NULL for an empty setting --
                     :  * confirm against the GUC machinery.
    2855             :  */
    2856             : void
    2857        2246 : assign_synchronized_standby_slots(const char *newval, void *extra)
    2858             : {
    2859             :     /*
    2860             :      * The standby slots may have changed, so we must recompute the oldest
    2861             :      * LSN.  Resetting it to InvalidXLogRecPtr makes the cached fast path in
                     :      * StandbySlotsHaveCaughtup() fail its validity test, forcing a rescan.
    2862             :      */
    2863        2246 :     ss_oldest_flush_lsn = InvalidXLogRecPtr;
    2864             : 
    2865        2246 :     synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
    2866        2246 : }
    2867             : 
    2868             : /*
    2869             :  * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
                     :  *
                     :  * Walks the names stored back to back in synchronized_standby_slots_config
                     :  * and compares each one with strcmp(), so only an exact match counts.
                     :  * Returns false when the config pointer is NULL or no name matches.
    2870             :  */
    2871             : bool
    2872       50868 : SlotExistsInSyncStandbySlots(const char *slot_name)
    2873             : {
    2874             :     const char *standby_slot_name;
    2875             : 
    2876             :     /* Return false if there is no value in synchronized_standby_slots */
    2877       50868 :     if (synchronized_standby_slots_config == NULL)
    2878       50856 :         return false;
    2879             : 
    2880             :     /*
    2881             :      * XXX: We are not expecting this list to be long so a linear search
    2882             :      * shouldn't hurt but if that turns out not to be true then we can cache
    2883             :      * this information for each WalSender as well.
                     :      *
                     :      * Each step skips past the current name and its terminating NUL to
                     :      * reach the start of the next stored name.
    2884             :      */
    2885          12 :     standby_slot_name = synchronized_standby_slots_config->slot_names;
    2886          12 :     for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
    2887             :     {
    2888          12 :         if (strcmp(standby_slot_name, slot_name) == 0)
    2889          12 :             return true;
    2890             : 
    2891           0 :         standby_slot_name += strlen(standby_slot_name) + 1;
    2892             :     }
    2893             : 
    2894           0 :     return false;
    2895             : }
    2896             : 
    2897             : /*
    2898             :  * Return true if the slots specified in synchronized_standby_slots have caught up to
    2899             :  * the given WAL location, false otherwise.
    2900             :  *
    2901             :  * The elevel parameter specifies the error level used for logging messages
    2902             :  * related to slots that do not exist, are logical, are invalidated, or are
                     :  * inactive.
                     :  *
                     :  * Slots are examined in the order they are stored.  The scan stops at the
                     :  * first slot that is missing, logical, invalidated, or whose restart_lsn is
                     :  * invalid or behind wait_for_lsn, and the function then returns false.
                     :  * When every slot passes, the smallest restart_lsn seen is remembered in
                     :  * ss_oldest_flush_lsn so that a later call asking for an LSN at or below
                     :  * it can return true without rescanning.
                     :  *
                     :  * Also returns true without scanning when the parameter has no value or
                     :  * when RecoveryInProgress() reports true.
    2903             :  */
    2904             : bool
    2905        1220 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
    2906             : {
    2907             :     const char *name;
    2908        1220 :     int         caught_up_slot_num = 0;
    2909        1220 :     XLogRecPtr  min_restart_lsn = InvalidXLogRecPtr;
    2910             : 
    2911             :     /*
    2912             :      * Don't need to wait for the standbys to catch up if there is no value in
    2913             :      * synchronized_standby_slots.
    2914             :      */
    2915        1220 :     if (synchronized_standby_slots_config == NULL)
    2916        1190 :         return true;
    2917             : 
    2918             :     /*
    2919             :      * Don't need to wait for the standbys to catch up if we are on a standby
    2920             :      * server, since we do not support syncing slots to cascading standbys.
    2921             :      */
    2922          30 :     if (RecoveryInProgress())
    2923           0 :         return true;
    2924             : 
    2925             :     /*
    2926             :      * Don't need to wait for the standbys to catch up if they are already
    2927             :      * beyond the specified WAL location.  ss_oldest_flush_lsn holds the
                     :      * minimum restart_lsn recorded by the last fully successful scan below.
    2928             :      */
    2929          30 :     if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
    2930          18 :         ss_oldest_flush_lsn >= wait_for_lsn)
    2931          12 :         return true;
    2932             : 
    2933             :     /*
    2934             :      * To prevent concurrent slot dropping and creation while filtering the
    2935             :      * slots, take the ReplicationSlotControlLock outside of the loop.
    2936             :      */
    2937          18 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    2938             : 
    2939          18 :     name = synchronized_standby_slots_config->slot_names;
    2940          28 :     for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
    2941             :     {
    2942             :         XLogRecPtr  restart_lsn;
    2943             :         bool        invalidated;
    2944             :         bool        inactive;
    2945             :         ReplicationSlot *slot;
    2946             : 
    2947          18 :         slot = SearchNamedReplicationSlot(name, false);
    2948             : 
    2949             :         /*
    2950             :          * If a slot name provided in synchronized_standby_slots does not
    2951             :          * exist, report a message and exit the loop.
    2952             :          *
    2953             :          * Though validate_sync_standby_slots (the GUC check_hook) tries to
    2954             :          * avoid this, it can nonetheless happen because the user can specify
    2955             :          * a nonexistent slot name before server startup. That function cannot
    2956             :          * validate such a slot during startup, as ReplicationSlotCtl is not
    2957             :          * initialized by then.  Also, the user might have dropped one slot.
    2958             :          */
    2959          18 :         if (!slot)
    2960             :         {
    2961           0 :             ereport(elevel,
    2962             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2963             :                     errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
    2964             :                            name, "synchronized_standby_slots"),
    2965             :                     errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    2966             :                               name),
    2967             :                     errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
    2968             :                             name, "synchronized_standby_slots"));
    2969           0 :             break;
    2970             :         }
    2971             : 
    2972             :         /* Same as above: if a slot is not physical, exit the loop. */
    2973          18 :         if (SlotIsLogical(slot))
    2974             :         {
    2975           0 :             ereport(elevel,
    2976             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2977             :                     errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
    2978             :                            name, "synchronized_standby_slots"),
    2979             :                     errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
    2980             :                               name),
    2981             :                     errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
    2982             :                             name, "synchronized_standby_slots"));
    2983           0 :             break;
    2984             :         }
    2985             : 
    2986          18 :         SpinLockAcquire(&slot->mutex);
    2987          18 :         restart_lsn = slot->data.restart_lsn;
    2988          18 :         invalidated = slot->data.invalidated != RS_INVAL_NONE;
    2989          18 :         inactive = slot->active_pid == 0;
    2990          18 :         SpinLockRelease(&slot->mutex);
    2991             : 
    2992          18 :         if (invalidated)
    2993             :         {
    2994             :             /* Specified physical slot has been invalidated */
    2995           0 :             ereport(elevel,
    2996             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2997             :                     errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
    2998             :                            name, "synchronized_standby_slots"),
    2999             :                     errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    3000             :                               name),
    3001             :                     errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
    3002             :                             name, "synchronized_standby_slots"));
    3003           0 :             break;
    3004             :         }
    3005             : 
    3006          18 :         if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
    3007             :         {
    3008             :             /* Log a message if no active_pid for this physical slot */
    3009           8 :             if (inactive)
    3010           6 :                 ereport(elevel,
    3011             :                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3012             :                         errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
    3013             :                                name, "synchronized_standby_slots"),
    3014             :                         errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    3015             :                                   name),
    3016             :                         errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
    3017             :                                 name, "synchronized_standby_slots"));
    3018             : 
    3019             :             /* Stop scanning: the current slot has not caught up, so the result is false. */
    3020           8 :             break;
    3021             :         }
    3022             : 
    3023             :         Assert(restart_lsn >= wait_for_lsn);
    3024             : 
    3025          10 :         if (XLogRecPtrIsInvalid(min_restart_lsn) ||
    3026             :             min_restart_lsn > restart_lsn)
    3027          10 :             min_restart_lsn = restart_lsn;
    3028             : 
    3029          10 :         caught_up_slot_num++;
    3030             : 
    3031          10 :         name += strlen(name) + 1;
    3032             :     }
    3033             : 
    3034          18 :     LWLockRelease(ReplicationSlotControlLock);
    3035             : 
    3036             :     /*
    3037             :      * Return false if not all the standbys have caught up to the specified
    3038             :      * WAL location.  Any early exit from the loop above lands here, because
                     :      * the counter is only advanced at the end of a fully successful
                     :      * iteration.
    3039             :      */
    3040          18 :     if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
    3041           8 :         return false;
    3042             : 
    3043             :     /* The ss_oldest_flush_lsn must not retreat. */
    3044             :     Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
    3045             :            min_restart_lsn >= ss_oldest_flush_lsn);
    3046             : 
    3047          10 :     ss_oldest_flush_lsn = min_restart_lsn;
    3048             : 
    3049          10 :     return true;
    3050             : }
    3051             : 
    3052             : /*
    3053             :  * Wait for physical standbys to confirm receiving the given lsn.
    3054             :  *
    3055             :  * Used by logical decoding SQL functions. It waits for physical standbys
    3056             :  * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
                     :  *
                     :  * Returns immediately when MyReplicationSlot is not a failover slot or the
                     :  * GUC has no value.  Otherwise loops until
                     :  * StandbySlotsHaveCaughtup(wait_for_lsn, WARNING) returns true, checking
                     :  * for interrupts and reloading the configuration file when a reload is
                     :  * pending on each pass.  Slot problems are therefore reported at WARNING
                     :  * level while waiting.
    3057             :  */
    3058             : void
    3059         448 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
    3060             : {
    3061             :     /*
    3062             :      * Don't need to wait for the standby to catch up if the current acquired
    3063             :      * slot is not a logical failover slot, or there is no value in
    3064             :      * synchronized_standby_slots.
    3065             :      */
    3066         448 :     if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
    3067         446 :         return;
    3068             : 
    3069           2 :     ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
    3070             : 
    3071             :     for (;;)
    3072             :     {
    3073           4 :         CHECK_FOR_INTERRUPTS();
    3074             : 
    3075           4 :         if (ConfigReloadPending)
    3076             :         {
    3077           2 :             ConfigReloadPending = false;
    3078           2 :             ProcessConfigFile(PGC_SIGHUP);
    3079             :         }
    3080             : 
    3081             :         /* Exit if done waiting for every slot. */
    3082           4 :         if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
    3083           2 :             break;
    3084             : 
    3085             :         /*
    3086             :          * Wait for the slots in the synchronized_standby_slots to catch up,
    3087             :          * but use a timeout (1s) so we can also check if the
    3088             :          * synchronized_standby_slots has been changed.
    3089             :          */
    3090           2 :         ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
    3091             :                                     WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
    3092             :     }
    3093             : 
    3094           2 :     ConditionVariableCancelSleep();
    3095             : }

Generated by: LCOV version 1.16