LCOV - code coverage report
Current view: top level - src/backend/replication - slot.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 637 742 85.8 %
Date: 2024-02-22 00:11:39 Functions: 35 36 97.2 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * slot.c
       4             :  *     Replication slot management.
       5             :  *
       6             :  *
       7             :  * Copyright (c) 2012-2024, PostgreSQL Global Development Group
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/replication/slot.c
      12             :  *
      13             :  * NOTES
      14             :  *
      15             :  * Replication slots are used to keep state about replication streams
      16             :  * originating from this cluster.  Their primary purpose is to prevent the
      17             :  * premature removal of WAL or of old tuple versions in a manner that would
      18             :  * interfere with replication; they are also useful for monitoring purposes.
      19             :  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
      20             :  * on standbys (to support cascading setups).  The requirement that slots be
      21             :  * usable on standbys precludes storing them in the system catalogs.
      22             :  *
      23             :  * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
      24             :  * directory. Inside that directory the state file will contain the slot's
      25             :  * own data. Additional data can be stored alongside that file if required.
      26             :  * While the server is running, the state data is also cached in memory for
      27             :  * efficiency.
      28             :  *
      29             :  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
      30             :  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
      31             :  * to iterate over the slots, and in exclusive mode to change the in_use flag
      32             :  * of a slot.  The remaining data in each slot is protected by its mutex.
      33             :  *
      34             :  *-------------------------------------------------------------------------
      35             :  */
      36             : 
      37             : #include "postgres.h"
      38             : 
      39             : #include <unistd.h>
      40             : #include <sys/stat.h>
      41             : 
      42             : #include "access/transam.h"
      43             : #include "access/xlog_internal.h"
      44             : #include "access/xlogrecovery.h"
      45             : #include "common/file_utils.h"
      46             : #include "common/string.h"
      47             : #include "miscadmin.h"
      48             : #include "pgstat.h"
      49             : #include "replication/slotsync.h"
      50             : #include "replication/slot.h"
      51             : #include "storage/fd.h"
      52             : #include "storage/ipc.h"
      53             : #include "storage/proc.h"
      54             : #include "storage/procarray.h"
      55             : #include "utils/builtins.h"
      56             : 
      57             : /*
      58             :  * Replication slot on-disk data structure.
      59             :  */
      60             : typedef struct ReplicationSlotOnDisk
      61             : {
      62             :     /* first part of this struct needs to be version independent */
      63             : 
      64             :     /* data not covered by checksum */
      65             :     uint32      magic;
      66             :     pg_crc32c   checksum;
      67             : 
      68             :     /* data covered by checksum */
      69             :     uint32      version;
      70             :     uint32      length;
      71             : 
      72             :     /*
      73             :      * The actual data in the slot that follows can differ based on the above
      74             :      * 'version'.
      75             :      */
      76             : 
      77             :     ReplicationSlotPersistentData slotdata;
      78             : } ReplicationSlotOnDisk;
      79             : 
      80             : /*
      81             :  * Lookup table for slot invalidation causes.
      82             :  */
      83             : const char *const SlotInvalidationCauses[] = {
      84             :     [RS_INVAL_NONE] = "none",
      85             :     [RS_INVAL_WAL_REMOVED] = "wal_removed",
      86             :     [RS_INVAL_HORIZON] = "rows_removed",
      87             :     [RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
      88             : };
      89             : 
      90             : /* Maximum number of invalidation causes */
      91             : #define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
      92             : 
      93             : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
      94             :                  "array length mismatch");
      95             : 
      96             : /* size of version independent data */
      97             : #define ReplicationSlotOnDiskConstantSize \
      98             :     offsetof(ReplicationSlotOnDisk, slotdata)
      99             : /* size of the part of the slot not covered by the checksum */
     100             : #define ReplicationSlotOnDiskNotChecksummedSize  \
     101             :     offsetof(ReplicationSlotOnDisk, version)
     102             : /* size of the part covered by the checksum */
     103             : #define ReplicationSlotOnDiskChecksummedSize \
     104             :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
     105             : /* size of the slot data that is version dependent */
     106             : #define ReplicationSlotOnDiskV2Size \
     107             :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
     108             : 
     109             : #define SLOT_MAGIC      0x1051CA1   /* format identifier */
     110             : #define SLOT_VERSION    5       /* version for new files */
     111             : 
     112             : /* Control array for replication slot management */
     113             : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
     114             : 
     115             : /* My backend's replication slot in the shared memory array */
     116             : ReplicationSlot *MyReplicationSlot = NULL;
     117             : 
     118             : /* GUC variable */
     119             : int         max_replication_slots = 10; /* the maximum number of replication
     120             :                                          * slots */
     121             : 
     122             : static void ReplicationSlotShmemExit(int code, Datum arg);
     123             : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
     124             : 
     125             : /* internal persistency functions */
     126             : static void RestoreSlotFromDisk(const char *name);
     127             : static void CreateSlotOnDisk(ReplicationSlot *slot);
     128             : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
     129             : 
     130             : /*
     131             :  * Report shared-memory space needed by ReplicationSlotsShmemInit.
     132             :  */
     133             : Size
     134        6352 : ReplicationSlotsShmemSize(void)
     135             : {
     136        6352 :     Size        size = 0;
     137             : 
     138        6352 :     if (max_replication_slots == 0)
     139           4 :         return size;
     140             : 
     141        6348 :     size = offsetof(ReplicationSlotCtlData, replication_slots);
     142        6348 :     size = add_size(size,
     143             :                     mul_size(max_replication_slots, sizeof(ReplicationSlot)));
     144             : 
     145        6348 :     return size;
     146             : }
     147             : 
     148             : /*
     149             :  * Allocate and initialize shared memory for replication slots.
     150             :  */
     151             : void
     152        1638 : ReplicationSlotsShmemInit(void)
     153             : {
     154             :     bool        found;
     155             : 
     156        1638 :     if (max_replication_slots == 0)
     157           2 :         return;
     158             : 
     159        1636 :     ReplicationSlotCtl = (ReplicationSlotCtlData *)
     160        1636 :         ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
     161             :                         &found);
     162             : 
     163        1636 :     if (!found)
     164             :     {
     165             :         int         i;
     166             : 
     167             :         /* First time through, so initialize */
     168        3132 :         MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
     169             : 
     170       17656 :         for (i = 0; i < max_replication_slots; i++)
     171             :         {
     172       16020 :             ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
     173             : 
     174             :             /* everything else is zeroed by the memset above */
     175       16020 :             SpinLockInit(&slot->mutex);
     176       16020 :             LWLockInitialize(&slot->io_in_progress_lock,
     177             :                              LWTRANCHE_REPLICATION_SLOT_IO);
     178       16020 :             ConditionVariableInit(&slot->active_cv);
     179             :         }
     180             :     }
     181             : }
     182             : 
     183             : /*
     184             :  * Register the callback for replication slot cleanup and releasing.
     185             :  */
     186             : void
     187       28828 : ReplicationSlotInitialize(void)
     188             : {
     189       28828 :     before_shmem_exit(ReplicationSlotShmemExit, 0);
     190       28828 : }
     191             : 
     192             : /*
     193             :  * Release and cleanup replication slots.
     194             :  */
     195             : static void
     196       28828 : ReplicationSlotShmemExit(int code, Datum arg)
     197             : {
     198             :     /* Make sure active replication slots are released */
     199       28828 :     if (MyReplicationSlot != NULL)
     200         338 :         ReplicationSlotRelease();
     201             : 
     202             :     /* Also cleanup all the temporary slots. */
     203       28828 :     ReplicationSlotCleanup();
     204       28828 : }
     205             : 
     206             : /*
     207             :  * Check whether the passed slot name is valid and report errors at elevel.
     208             :  *
     209             :  * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
     210             :  * the name to be used as a directory name on every supported OS.
     211             :  *
     212             :  * Returns whether the directory name is valid or not if elevel < ERROR.
     213             :  */
     214             : bool
     215        1356 : ReplicationSlotValidateName(const char *name, int elevel)
     216             : {
     217             :     const char *cp;
     218             : 
     219        1356 :     if (strlen(name) == 0)
     220             :     {
     221           6 :         ereport(elevel,
     222             :                 (errcode(ERRCODE_INVALID_NAME),
     223             :                  errmsg("replication slot name \"%s\" is too short",
     224             :                         name)));
     225           0 :         return false;
     226             :     }
     227             : 
     228        1350 :     if (strlen(name) >= NAMEDATALEN)
     229             :     {
     230           0 :         ereport(elevel,
     231             :                 (errcode(ERRCODE_NAME_TOO_LONG),
     232             :                  errmsg("replication slot name \"%s\" is too long",
     233             :                         name)));
     234           0 :         return false;
     235             :     }
     236             : 
     237       28866 :     for (cp = name; *cp; cp++)
     238             :     {
     239       27518 :         if (!((*cp >= 'a' && *cp <= 'z')
     240       14162 :               || (*cp >= '0' && *cp <= '9')
     241        2650 :               || (*cp == '_')))
     242             :         {
     243           2 :             ereport(elevel,
     244             :                     (errcode(ERRCODE_INVALID_NAME),
     245             :                      errmsg("replication slot name \"%s\" contains invalid character",
     246             :                             name),
     247             :                      errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
     248           0 :             return false;
     249             :         }
     250             :     }
     251        1348 :     return true;
     252             : }
     253             : 
     254             : /*
     255             :  * Create a new replication slot and mark it as used by this backend.
     256             :  *
     257             :  * name: Name of the slot
     258             :  * db_specific: logical decoding is db specific; if the slot is going to
     259             :  *     be used for that pass true, otherwise false.
     260             :  * two_phase: Allows decoding of prepared transactions. We allow this option
     261             :  *     to be enabled only at slot creation time. If it could be changed
     262             :  *     during decoding, a PREPARE could be skipped the first time because the
     263             :  *     option was not yet enabled, and then skipped again on the next call
     264             :  *     to get changes (with two_phase now enabled) because the start decoding
     265             :  *     point had already moved past it. The user would then receive only the
     266             :  *     COMMIT PREPARED.
     267             :  * failover: If enabled, allows the slot to be synced to standbys so
     268             :  *     that logical replication can be resumed after failover.
     269             :  * synced: True if the slot is synchronized from the primary server.
     270             :  */
     271             : void
     272        1064 : ReplicationSlotCreate(const char *name, bool db_specific,
     273             :                       ReplicationSlotPersistency persistency,
     274             :                       bool two_phase, bool failover, bool synced)
     275             : {
     276        1064 :     ReplicationSlot *slot = NULL;
     277             :     int         i;
     278             : 
     279             :     Assert(MyReplicationSlot == NULL);
     280             : 
     281        1064 :     ReplicationSlotValidateName(name, ERROR);
     282             : 
     283        1062 :     if (failover)
     284             :     {
     285             :         /*
     286             :          * Do not allow users to create the failover enabled slots on the
     287             :          * standby as we do not support sync to the cascading standby.
     288             :          *
     289             :          * However, failover enabled slots can be created during slot
     290             :          * synchronization because we need to retain the same values as the
     291             :          * remote slot.
     292             :          */
     293          24 :         if (RecoveryInProgress() && !IsSyncingReplicationSlots())
     294           0 :             ereport(ERROR,
     295             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     296             :                     errmsg("cannot enable failover for a replication slot created on the standby"));
     297             : 
     298             :         /*
     299             :          * Do not allow users to create failover enabled temporary slots,
     300             :          * because temporary slots will not be synced to the standby.
     301             :          *
     302             :          * However, failover enabled temporary slots can be created during
     303             :          * slot synchronization. See the comments atop slotsync.c for details.
     304             :          */
     305          24 :         if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
     306           2 :             ereport(ERROR,
     307             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     308             :                     errmsg("cannot enable failover for a temporary replication slot"));
     309             :     }
     310             : 
     311             :     /*
     312             :      * If some other backend ran this code concurrently with us, we'd likely
     313             :      * both allocate the same slot, and that would be bad.  We'd also be at
     314             :      * risk of missing a name collision.  Also, we don't want to try to create
     315             :      * a new slot while somebody's busy cleaning up an old one, because we
     316             :      * might both be monkeying with the same directory.
     317             :      */
     318        1060 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     319             : 
     320             :     /*
     321             :      * Check for name collision, and identify an allocatable slot.  We need to
     322             :      * hold ReplicationSlotControlLock in shared mode for this, so that nobody
     323             :      * else can change the in_use flags while we're looking at them.
     324             :      */
     325        1060 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     326        9980 :     for (i = 0; i < max_replication_slots; i++)
     327             :     {
     328        8926 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     329             : 
     330        8926 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     331           6 :             ereport(ERROR,
     332             :                     (errcode(ERRCODE_DUPLICATE_OBJECT),
     333             :                      errmsg("replication slot \"%s\" already exists", name)));
     334        8920 :         if (!s->in_use && slot == NULL)
     335        1052 :             slot = s;
     336             :     }
     337        1054 :     LWLockRelease(ReplicationSlotControlLock);
     338             : 
     339             :     /* If all slots are in use, we're out of luck. */
     340        1054 :     if (slot == NULL)
     341           2 :         ereport(ERROR,
     342             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     343             :                  errmsg("all replication slots are in use"),
     344             :                  errhint("Free one or increase max_replication_slots.")));
     345             : 
     346             :     /*
     347             :      * Since this slot is not in use, nobody should be looking at any part of
     348             :      * it other than the in_use field unless they're trying to allocate it.
     349             :      * And since we hold ReplicationSlotAllocationLock, nobody except us can
     350             :      * be doing that.  So it's safe to initialize the slot.
     351             :      */
     352             :     Assert(!slot->in_use);
     353             :     Assert(slot->active_pid == 0);
     354             : 
     355             :     /* first initialize persistent data */
     356        1052 :     memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
     357        1052 :     namestrcpy(&slot->data.name, name);
     358        1052 :     slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
     359        1052 :     slot->data.persistency = persistency;
     360        1052 :     slot->data.two_phase = two_phase;
     361        1052 :     slot->data.two_phase_at = InvalidXLogRecPtr;
     362        1052 :     slot->data.failover = failover;
     363        1052 :     slot->data.synced = synced;
     364             : 
     365             :     /* and then data only present in shared memory */
     366        1052 :     slot->just_dirtied = false;
     367        1052 :     slot->dirty = false;
     368        1052 :     slot->effective_xmin = InvalidTransactionId;
     369        1052 :     slot->effective_catalog_xmin = InvalidTransactionId;
     370        1052 :     slot->candidate_catalog_xmin = InvalidTransactionId;
     371        1052 :     slot->candidate_xmin_lsn = InvalidXLogRecPtr;
     372        1052 :     slot->candidate_restart_valid = InvalidXLogRecPtr;
     373        1052 :     slot->candidate_restart_lsn = InvalidXLogRecPtr;
     374        1052 :     slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
     375             : 
     376             :     /*
     377             :      * Create the slot on disk.  We haven't actually marked the slot allocated
     378             :      * yet, so no special cleanup is required if this errors out.
     379             :      */
     380        1052 :     CreateSlotOnDisk(slot);
     381             : 
     382             :     /*
     383             :      * We need to briefly prevent any other backend from iterating over the
     384             :      * slots while we flip the in_use flag. We also need to set the active
     385             :      * flag while holding the ControlLock as otherwise a concurrent
     386             :      * ReplicationSlotAcquire() could acquire the slot as well.
     387             :      */
     388        1052 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     389             : 
     390        1052 :     slot->in_use = true;
     391             : 
     392             :     /* We can now mark the slot active, and that makes it our slot. */
     393        1052 :     SpinLockAcquire(&slot->mutex);
     394             :     Assert(slot->active_pid == 0);
     395        1052 :     slot->active_pid = MyProcPid;
     396        1052 :     SpinLockRelease(&slot->mutex);
     397        1052 :     MyReplicationSlot = slot;
     398             : 
     399        1052 :     LWLockRelease(ReplicationSlotControlLock);
     400             : 
     401             :     /*
     402             :      * Create statistics entry for the new logical slot. We don't collect any
     403             :      * stats for physical slots, so no need to create an entry for the same.
     404             :      * See ReplicationSlotDropPtr for why we need to do this before releasing
     405             :      * ReplicationSlotAllocationLock.
     406             :      */
     407        1052 :     if (SlotIsLogical(slot))
     408         766 :         pgstat_create_replslot(slot);
     409             : 
     410             :     /*
     411             :      * Now that the slot has been marked as in_use and active, it's safe to
     412             :      * let somebody else try to allocate a slot.
     413             :      */
     414        1052 :     LWLockRelease(ReplicationSlotAllocationLock);
     415             : 
     416             :     /* Let everybody know we've modified this slot */
     417        1052 :     ConditionVariableBroadcast(&slot->active_cv);
     418        1052 : }
     419             : 
     420             : /*
     421             :  * Search for the named replication slot.
     422             :  *
     423             :  * Return the replication slot if found, otherwise NULL.
     424             :  */
     425             : ReplicationSlot *
     426        2178 : SearchNamedReplicationSlot(const char *name, bool need_lock)
     427             : {
     428             :     int         i;
     429        2178 :     ReplicationSlot *slot = NULL;
     430             : 
     431        2178 :     if (need_lock)
     432         166 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     433             : 
     434        3562 :     for (i = 0; i < max_replication_slots; i++)
     435             :     {
     436        3526 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     437             : 
     438        3526 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     439             :         {
     440        2142 :             slot = s;
     441        2142 :             break;
     442             :         }
     443             :     }
     444             : 
     445        2178 :     if (need_lock)
     446         166 :         LWLockRelease(ReplicationSlotControlLock);
     447             : 
     448        2178 :     return slot;
     449             : }
     450             : 
     451             : /*
     452             :  * Return the index of the replication slot in
     453             :  * ReplicationSlotCtl->replication_slots.
     454             :  *
     455             :  * This is mainly useful to have an efficient key for storing replication slot
     456             :  * stats.
     457             :  */
     458             : int
     459       13412 : ReplicationSlotIndex(ReplicationSlot *slot)
     460             : {
     461             :     Assert(slot >= ReplicationSlotCtl->replication_slots &&
     462             :            slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
     463             : 
     464       13412 :     return slot - ReplicationSlotCtl->replication_slots;
     465             : }
     466             : 
     467             : /*
     468             :  * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
     469             :  * the slot's name and true is returned.
     470             :  *
     471             :  * This likely is only useful for pgstat_replslot.c during shutdown; in other
     472             :  * cases there are obvious TOCTOU issues.
     473             :  */
     474             : bool
     475         116 : ReplicationSlotName(int index, Name name)
     476             : {
     477             :     ReplicationSlot *slot;
     478             :     bool        found;
     479             : 
     480         116 :     slot = &ReplicationSlotCtl->replication_slots[index];
     481             : 
     482             :     /*
     483             :      * Ensure that the slot cannot be dropped while we copy the name. Don't
     484             :      * need the spinlock as the name of an existing slot cannot change.
     485             :      */
     486         116 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     487         116 :     found = slot->in_use;
     488         116 :     if (slot->in_use)
     489         116 :         namestrcpy(name, NameStr(slot->data.name));
     490         116 :     LWLockRelease(ReplicationSlotControlLock);
     491             : 
     492         116 :     return found;
     493             : }
     494             : 
     495             : /*
     496             :  * Find a previously created slot and mark it as used by this process.
     497             :  *
     498             :  * An error is raised if nowait is true and the slot is currently in use. If
     499             :  * nowait is false, we sleep until the slot is released by the owning process.
     500             :  */
     501             : void
     502        2000 : ReplicationSlotAcquire(const char *name, bool nowait)
     503             : {
     504             :     ReplicationSlot *s;
     505             :     int         active_pid;
     506             : 
     507             :     Assert(name != NULL);
     508             : 
     509        2000 : retry:
     510             :     Assert(MyReplicationSlot == NULL);
     511             : 
     512        2000 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     513             : 
     514             :     /* Check if the slot exists with the given name. */
     515        2000 :     s = SearchNamedReplicationSlot(name, false);
     516        2000 :     if (s == NULL || !s->in_use)
     517             :     {
     518          16 :         LWLockRelease(ReplicationSlotControlLock);
     519             : 
     520          16 :         ereport(ERROR,
     521             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     522             :                  errmsg("replication slot \"%s\" does not exist",
     523             :                         name)));
     524             :     }
     525             : 
     526             :     /*
     527             :      * This is the slot we want; check if it's active under some other
     528             :      * process.  In single user mode, we don't need this check.
     529             :      */
     530        1984 :     if (IsUnderPostmaster)
     531             :     {
     532             :         /*
     533             :          * Get ready to sleep on the slot in case it is active.  (We may end
     534             :          * up not sleeping, but we don't want to do this while holding the
     535             :          * spinlock.)
     536             :          */
     537        1984 :         if (!nowait)
     538         430 :             ConditionVariablePrepareToSleep(&s->active_cv);
     539             : 
     540        1984 :         SpinLockAcquire(&s->mutex);
     541        1984 :         if (s->active_pid == 0)
     542        1758 :             s->active_pid = MyProcPid;
     543        1984 :         active_pid = s->active_pid;
     544        1984 :         SpinLockRelease(&s->mutex);
     545             :     }
     546             :     else
     547           0 :         active_pid = MyProcPid;
     548        1984 :     LWLockRelease(ReplicationSlotControlLock);
     549             : 
     550             :     /*
     551             :      * If we found the slot but it's already active in another process, we
     552             :      * wait until the owning process signals us that it's been released, or
     553             :      * error out.
     554             :      */
     555        1984 :     if (active_pid != MyProcPid)
     556             :     {
     557           0 :         if (!nowait)
     558             :         {
     559             :             /* Wait here until we get signaled, and then restart */
     560           0 :             ConditionVariableSleep(&s->active_cv,
     561             :                                    WAIT_EVENT_REPLICATION_SLOT_DROP);
     562           0 :             ConditionVariableCancelSleep();
     563           0 :             goto retry;
     564             :         }
     565             : 
     566           0 :         ereport(ERROR,
     567             :                 (errcode(ERRCODE_OBJECT_IN_USE),
     568             :                  errmsg("replication slot \"%s\" is active for PID %d",
     569             :                         NameStr(s->data.name), active_pid)));
     570             :     }
     571        1984 :     else if (!nowait)
     572         430 :         ConditionVariableCancelSleep(); /* no sleep needed after all */
     573             : 
     574             :     /* Let everybody know we've modified this slot */
     575        1984 :     ConditionVariableBroadcast(&s->active_cv);
     576             : 
     577             :     /* We made this slot active, so it's ours now. */
     578        1984 :     MyReplicationSlot = s;
     579             : 
     580             :     /*
     581             :      * The call to pgstat_acquire_replslot() protects against stats for a
     582             :      * different slot, from before a restart or such, being present during
     583             :      * pgstat_report_replslot().
     584             :      */
     585        1984 :     if (SlotIsLogical(s))
     586        1682 :         pgstat_acquire_replslot(s);
     587             : 
     588        1984 :     if (am_walsender)
     589             :     {
     590        1346 :         ereport(log_replication_commands ? LOG : DEBUG1,
     591             :                 SlotIsLogical(s)
     592             :                 ? errmsg("acquired logical replication slot \"%s\"",
     593             :                          NameStr(s->data.name))
     594             :                 : errmsg("acquired physical replication slot \"%s\"",
     595             :                          NameStr(s->data.name)));
     596             :     }
     597        1984 : }
     598             : 
     599             : /*
     600             :  * Release the replication slot that this backend considers itself to own.
     601             :  *
     602             :  * This or another backend can re-acquire the slot later.
     603             :  * Resources this slot requires will be preserved.
     604             :  */
     605             : void
     606        2412 : ReplicationSlotRelease(void)
     607             : {
     608        2412 :     ReplicationSlot *slot = MyReplicationSlot;
     609        2412 :     char       *slotname = NULL;    /* keep compiler quiet */
     610        2412 :     bool        is_logical = false; /* keep compiler quiet */
     611             : 
     612             :     Assert(slot != NULL && slot->active_pid != 0);
     613             : 
     614        2412 :     if (am_walsender)
     615             :     {
     616        1690 :         slotname = pstrdup(NameStr(slot->data.name));
     617        1690 :         is_logical = SlotIsLogical(slot);
     618             :     }
     619             : 
     620        2412 :     if (slot->data.persistency == RS_EPHEMERAL)
     621             :     {
     622             :         /*
     623             :          * Delete the slot. There is no !PANIC case where this is allowed to
     624             :          * fail; all that may happen is an incomplete cleanup of the on-disk
     625             :          * data.
     626             :          */
     627          10 :         ReplicationSlotDropAcquired();
     628             :     }
     629             : 
     630             :     /*
     631             :      * If slot needed to temporarily restrain both data and catalog xmin to
     632             :      * create the catalog snapshot, remove that temporary constraint.
     633             :      * Snapshots can only be exported while the initial snapshot is still
     634             :      * acquired.
     635             :      */
     636        2412 :     if (!TransactionIdIsValid(slot->data.xmin) &&
     637        2398 :         TransactionIdIsValid(slot->effective_xmin))
     638             :     {
     639         336 :         SpinLockAcquire(&slot->mutex);
     640         336 :         slot->effective_xmin = InvalidTransactionId;
     641         336 :         SpinLockRelease(&slot->mutex);
     642         336 :         ReplicationSlotsComputeRequiredXmin(false);
     643             :     }
     644             : 
     645        2412 :     if (slot->data.persistency == RS_PERSISTENT)
     646             :     {
     647             :         /*
     648             :          * Mark persistent slot inactive.  We're not freeing it, just
     649             :          * disconnecting, but wake up others that may be waiting for it.
     650             :          */
     651        1946 :         SpinLockAcquire(&slot->mutex);
     652        1946 :         slot->active_pid = 0;
     653        1946 :         SpinLockRelease(&slot->mutex);
     654        1946 :         ConditionVariableBroadcast(&slot->active_cv);
     655             :     }
     656             : 
     657        2412 :     MyReplicationSlot = NULL;
     658             : 
     659             :     /* might not have been set when we've been a plain slot */
     660        2412 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     661        2412 :     MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
     662        2412 :     ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
     663        2412 :     LWLockRelease(ProcArrayLock);
     664             : 
     665        2412 :     if (am_walsender)
     666             :     {
     667        1690 :         ereport(log_replication_commands ? LOG : DEBUG1,
     668             :                 is_logical
     669             :                 ? errmsg("released logical replication slot \"%s\"",
     670             :                          slotname)
     671             :                 : errmsg("released physical replication slot \"%s\"",
     672             :                          slotname));
     673             : 
     674        1690 :         pfree(slotname);
     675             :     }
     676        2412 : }
     677             : 
     678             : /*
     679             :  * Cleanup all temporary slots created in current session.
     680             :  */
     681             : void
     682       66998 : ReplicationSlotCleanup(void)
     683             : {
     684             :     int         i;
     685             : 
     686             :     Assert(MyReplicationSlot == NULL);
     687             : 
     688       66998 : restart:
     689       66998 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     690      725158 :     for (i = 0; i < max_replication_slots; i++)
     691             :     {
     692      658390 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     693             : 
     694      658390 :         if (!s->in_use)
     695      637058 :             continue;
     696             : 
     697       21332 :         SpinLockAcquire(&s->mutex);
     698       21332 :         if (s->active_pid == MyProcPid)
     699             :         {
     700             :             Assert(s->data.persistency == RS_TEMPORARY);
     701         230 :             SpinLockRelease(&s->mutex);
     702         230 :             LWLockRelease(ReplicationSlotControlLock);  /* avoid deadlock */
     703             : 
     704         230 :             ReplicationSlotDropPtr(s);
     705             : 
     706         230 :             ConditionVariableBroadcast(&s->active_cv);
     707         230 :             goto restart;
     708             :         }
     709             :         else
     710       21102 :             SpinLockRelease(&s->mutex);
     711             :     }
     712             : 
     713       66768 :     LWLockRelease(ReplicationSlotControlLock);
     714       66768 : }
     715             : 
     716             : /*
     717             :  * Permanently drop replication slot identified by the passed in name.
     718             :  */
     719             : void
     720         658 : ReplicationSlotDrop(const char *name, bool nowait)
     721             : {
     722             :     Assert(MyReplicationSlot == NULL);
     723             : 
     724         658 :     ReplicationSlotAcquire(name, nowait);
     725             : 
     726             :     /*
     727             :      * Do not allow users to drop the slots which are currently being synced
     728             :      * from the primary to the standby.
     729             :      */
     730         648 :     if (RecoveryInProgress() && MyReplicationSlot->data.synced)
     731           2 :         ereport(ERROR,
     732             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     733             :                 errmsg("cannot drop replication slot \"%s\"", name),
     734             :                 errdetail("This slot is being synced from the primary server."));
     735             : 
     736         646 :     ReplicationSlotDropAcquired();
     737         646 : }
     738             : 
     739             : /*
     740             :  * Change the definition of the slot identified by the specified name.
     741             :  */
     742             : void
     743          12 : ReplicationSlotAlter(const char *name, bool failover)
     744             : {
     745             :     Assert(MyReplicationSlot == NULL);
     746             : 
     747          12 :     ReplicationSlotAcquire(name, false);
     748             : 
     749          12 :     if (SlotIsPhysical(MyReplicationSlot))
     750           0 :         ereport(ERROR,
     751             :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     752             :                 errmsg("cannot use %s with a physical replication slot",
     753             :                        "ALTER_REPLICATION_SLOT"));
     754             : 
     755          12 :     if (RecoveryInProgress())
     756             :     {
     757             :         /*
     758             :          * Do not allow users to alter the slots which are currently being
     759             :          * synced from the primary to the standby.
     760             :          */
     761           2 :         if (MyReplicationSlot->data.synced)
     762           2 :             ereport(ERROR,
     763             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     764             :                     errmsg("cannot alter replication slot \"%s\"", name),
     765             :                     errdetail("This slot is being synced from the primary server."));
     766             : 
     767             :         /*
     768             :          * Do not allow users to enable failover on the standby as we do not
     769             :          * support sync to the cascading standby.
     770             :          */
     771           0 :         if (failover)
     772           0 :             ereport(ERROR,
     773             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     774             :                     errmsg("cannot enable failover for a replication slot"
     775             :                            " on the standby"));
     776             :     }
     777             : 
     778             :     /*
     779             :      * Do not allow users to enable failover for temporary slots as we do not
     780             :      * support syncing temporary slots to the standby.
     781             :      */
     782          10 :     if (failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
     783           0 :         ereport(ERROR,
     784             :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     785             :                 errmsg("cannot enable failover for a temporary replication slot"));
     786             : 
     787          10 :     if (MyReplicationSlot->data.failover != failover)
     788             :     {
     789           6 :         SpinLockAcquire(&MyReplicationSlot->mutex);
     790           6 :         MyReplicationSlot->data.failover = failover;
     791           6 :         SpinLockRelease(&MyReplicationSlot->mutex);
     792             : 
     793           6 :         ReplicationSlotMarkDirty();
     794           6 :         ReplicationSlotSave();
     795             :     }
     796             : 
     797          10 :     ReplicationSlotRelease();
     798          10 : }
     799             : 
     800             : /*
     801             :  * Permanently drop the currently acquired replication slot.
     802             :  */
     803             : void
     804         670 : ReplicationSlotDropAcquired(void)
     805             : {
     806         670 :     ReplicationSlot *slot = MyReplicationSlot;
     807             : 
     808             :     Assert(MyReplicationSlot != NULL);
     809             : 
     810             :     /* slot isn't acquired anymore */
     811         670 :     MyReplicationSlot = NULL;
     812             : 
     813         670 :     ReplicationSlotDropPtr(slot);
     814         670 : }
     815             : 
     816             : /*
     817             :  * Permanently drop the replication slot which will be released by the point
     818             :  * this function returns.
     819             :  */
     820             : static void
     821         900 : ReplicationSlotDropPtr(ReplicationSlot *slot)
     822             : {
     823             :     char        path[MAXPGPATH];
     824             :     char        tmppath[MAXPGPATH];
     825             : 
     826             :     /*
     827             :      * If some other backend ran this code concurrently with us, we might try
     828             :      * to delete a slot with a certain name while someone else was trying to
     829             :      * create a slot with the same name.
     830             :      */
     831         900 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     832             : 
     833             :     /* Generate pathnames. */
     834         900 :     sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
     835         900 :     sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
     836             : 
     837             :     /*
     838             :      * Rename the slot directory on disk, so that we'll no longer recognize
     839             :      * this as a valid slot.  Note that if this fails, we've got to mark the
     840             :      * slot inactive before bailing out.  If we're dropping an ephemeral or a
     841             :      * temporary slot, we better never fail hard as the caller won't expect
     842             :      * the slot to survive and this might get called during error handling.
     843             :      */
     844         900 :     if (rename(path, tmppath) == 0)
     845             :     {
     846             :         /*
     847             :          * We need to fsync() the directory we just renamed and its parent to
     848             :          * make sure that our changes are on disk in a crash-safe fashion.  If
     849             :          * fsync() fails, we can't be sure whether the changes are on disk or
     850             :          * not.  For now, we handle that by panicking;
     851             :          * StartupReplicationSlots() will try to straighten it out after
     852             :          * restart.
     853             :          */
     854         900 :         START_CRIT_SECTION();
     855         900 :         fsync_fname(tmppath, true);
     856         900 :         fsync_fname("pg_replslot", true);
     857         900 :         END_CRIT_SECTION();
     858             :     }
     859             :     else
     860             :     {
     861           0 :         bool        fail_softly = slot->data.persistency != RS_PERSISTENT;
     862             : 
     863           0 :         SpinLockAcquire(&slot->mutex);
     864           0 :         slot->active_pid = 0;
     865           0 :         SpinLockRelease(&slot->mutex);
     866             : 
     867             :         /* wake up anyone waiting on this slot */
     868           0 :         ConditionVariableBroadcast(&slot->active_cv);
     869             : 
     870           0 :         ereport(fail_softly ? WARNING : ERROR,
     871             :                 (errcode_for_file_access(),
     872             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
     873             :                         path, tmppath)));
     874             :     }
     875             : 
     876             :     /*
     877             :      * The slot is definitely gone.  Lock out concurrent scans of the array
     878             :      * long enough to kill it.  It's OK to clear the active PID here without
     879             :      * grabbing the mutex because nobody else can be scanning the array here,
     880             :      * and nobody can be attached to this slot and thus access it without
     881             :      * scanning the array.
     882             :      *
     883             :      * Also wake up processes waiting for it.
     884             :      */
     885         900 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     886         900 :     slot->active_pid = 0;
     887         900 :     slot->in_use = false;
     888         900 :     LWLockRelease(ReplicationSlotControlLock);
     889         900 :     ConditionVariableBroadcast(&slot->active_cv);
     890             : 
     891             :     /*
     892             :      * Slot is dead and doesn't prevent resource removal anymore, recompute
     893             :      * limits.
     894             :      */
     895         900 :     ReplicationSlotsComputeRequiredXmin(false);
     896         900 :     ReplicationSlotsComputeRequiredLSN();
     897             : 
     898             :     /*
     899             :      * If removing the directory fails, the worst thing that will happen is
     900             :      * that the user won't be able to create a new slot with the same name
     901             :      * until the next server restart.  We warn about it, but that's all.
     902             :      */
     903         900 :     if (!rmtree(tmppath, true))
     904           0 :         ereport(WARNING,
     905             :                 (errmsg("could not remove directory \"%s\"", tmppath)));
     906             : 
     907             :     /*
     908             :      * Drop the statistics entry for the replication slot.  Do this while
     909             :      * holding ReplicationSlotAllocationLock so that we don't drop a
     910             :      * statistics entry for another slot with the same name just created in
     911             :      * another session.
     912             :      */
     913         900 :     if (SlotIsLogical(slot))
     914         656 :         pgstat_drop_replslot(slot);
     915             : 
     916             :     /*
     917             :      * We release this at the very end, so that nobody starts trying to create
     918             :      * a slot while we're still cleaning up the detritus of the old one.
     919             :      */
     920         900 :     LWLockRelease(ReplicationSlotAllocationLock);
     921         900 : }
     922             : 
     923             : /*
     924             :  * Serialize the currently acquired slot's state from memory to disk, thereby
     925             :  * guaranteeing the current state will survive a crash.
     926             :  */
     927             : void
     928        2106 : ReplicationSlotSave(void)
     929             : {
     930             :     char        path[MAXPGPATH];
     931             : 
     932             :     Assert(MyReplicationSlot != NULL);
     933             : 
     934        2106 :     sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
     935        2106 :     SaveSlotToPath(MyReplicationSlot, path, ERROR);
     936        2106 : }
     937             : 
     938             : /*
     939             :  * Signal that it would be useful if the currently acquired slot would be
     940             :  * flushed out to disk.
     941             :  *
     942             :  * Note that the actual flush to disk can be delayed for a long time; if it is
     943             :  * required for correctness, explicitly do a ReplicationSlotSave().
     944             :  */
     945             : void
     946       38488 : ReplicationSlotMarkDirty(void)
     947             : {
     948       38488 :     ReplicationSlot *slot = MyReplicationSlot;
     949             : 
     950             :     Assert(MyReplicationSlot != NULL);
     951             : 
     952       38488 :     SpinLockAcquire(&slot->mutex);
     953       38488 :     MyReplicationSlot->just_dirtied = true;
     954       38488 :     MyReplicationSlot->dirty = true;
     955       38488 :     SpinLockRelease(&slot->mutex);
     956       38488 : }
     957             : 
     958             : /*
     959             :  * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
     960             :  * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
     961             :  */
     962             : void
     963         740 : ReplicationSlotPersist(void)
     964             : {
     965         740 :     ReplicationSlot *slot = MyReplicationSlot;
     966             : 
     967             :     Assert(slot != NULL);
     968             :     Assert(slot->data.persistency != RS_PERSISTENT);
     969             : 
     970         740 :     SpinLockAcquire(&slot->mutex);
     971         740 :     slot->data.persistency = RS_PERSISTENT;
     972         740 :     SpinLockRelease(&slot->mutex);
     973             : 
     974         740 :     ReplicationSlotMarkDirty();
     975         740 :     ReplicationSlotSave();
     976         740 : }
     977             : 
     978             : /*
     979             :  * Compute the oldest xmin across all slots and store it in the ProcArray.
     980             :  *
     981             :  * If already_locked is true, ProcArrayLock has already been acquired
     982             :  * exclusively.
     983             :  */
     984             : void
     985        3638 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
     986             : {
     987             :     int         i;
     988        3638 :     TransactionId agg_xmin = InvalidTransactionId;
     989        3638 :     TransactionId agg_catalog_xmin = InvalidTransactionId;
     990             : 
     991             :     Assert(ReplicationSlotCtl != NULL);
     992             : 
     993        3638 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     994             : 
     995       36280 :     for (i = 0; i < max_replication_slots; i++)
     996             :     {
     997       32642 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     998             :         TransactionId effective_xmin;
     999             :         TransactionId effective_catalog_xmin;
    1000             :         bool        invalidated;
    1001             : 
    1002       32642 :         if (!s->in_use)
    1003       29440 :             continue;
    1004             : 
    1005        3202 :         SpinLockAcquire(&s->mutex);
    1006        3202 :         effective_xmin = s->effective_xmin;
    1007        3202 :         effective_catalog_xmin = s->effective_catalog_xmin;
    1008        3202 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1009        3202 :         SpinLockRelease(&s->mutex);
    1010             : 
    1011             :         /* invalidated slots need not apply */
    1012        3202 :         if (invalidated)
    1013          42 :             continue;
    1014             : 
    1015             :         /* check the data xmin */
    1016        3160 :         if (TransactionIdIsValid(effective_xmin) &&
    1017           0 :             (!TransactionIdIsValid(agg_xmin) ||
    1018           0 :              TransactionIdPrecedes(effective_xmin, agg_xmin)))
    1019         458 :             agg_xmin = effective_xmin;
    1020             : 
    1021             :         /* check the catalog xmin */
    1022        3160 :         if (TransactionIdIsValid(effective_catalog_xmin) &&
    1023        1260 :             (!TransactionIdIsValid(agg_catalog_xmin) ||
    1024        1260 :              TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
    1025        1780 :             agg_catalog_xmin = effective_catalog_xmin;
    1026             :     }
    1027             : 
    1028        3638 :     LWLockRelease(ReplicationSlotControlLock);
    1029             : 
    1030        3638 :     ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
    1031        3638 : }
    1032             : 
    1033             : /*
    1034             :  * Compute the oldest restart LSN across all slots and inform xlog module.
    1035             :  *
    1036             :  * Note: while max_slot_wal_keep_size is theoretically relevant for this
    1037             :  * purpose, we don't try to account for that, because this module doesn't
    1038             :  * know what to compare against.
    1039             :  */
    1040             : void
    1041       39212 : ReplicationSlotsComputeRequiredLSN(void)
    1042             : {
    1043             :     int         i;
    1044       39212 :     XLogRecPtr  min_required = InvalidXLogRecPtr;
    1045             : 
    1046             :     Assert(ReplicationSlotCtl != NULL);
    1047             : 
    1048       39212 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1049      425184 :     for (i = 0; i < max_replication_slots; i++)
    1050             :     {
    1051      385972 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1052             :         XLogRecPtr  restart_lsn;
    1053             :         bool        invalidated;
    1054             : 
    1055      385972 :         if (!s->in_use)
    1056      347480 :             continue;
    1057             : 
    1058       38492 :         SpinLockAcquire(&s->mutex);
    1059       38492 :         restart_lsn = s->data.restart_lsn;
    1060       38492 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1061       38492 :         SpinLockRelease(&s->mutex);
    1062             : 
    1063             :         /* invalidated slots need not apply */
    1064       38492 :         if (invalidated)
    1065          44 :             continue;
    1066             : 
    1067       38448 :         if (restart_lsn != InvalidXLogRecPtr &&
    1068        1000 :             (min_required == InvalidXLogRecPtr ||
    1069             :              restart_lsn < min_required))
    1070       37476 :             min_required = restart_lsn;
    1071             :     }
    1072       39212 :     LWLockRelease(ReplicationSlotControlLock);
    1073             : 
    1074       39212 :     XLogSetReplicationSlotMinimumLSN(min_required);
    1075       39212 : }
    1076             : 
    1077             : /*
    1078             :  * Compute the oldest WAL LSN required by *logical* decoding slots.
    1079             :  *
    1080             :  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
    1081             :  * slots exist.
    1082             :  *
    1083             :  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
    1084             :  * ignores physical replication slots.
    1085             :  *
    1086             :  * The results aren't required frequently, so we don't maintain a precomputed
    1087             :  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
    1088             :  */
    1089             : XLogRecPtr
    1090        3148 : ReplicationSlotsComputeLogicalRestartLSN(void)
    1091             : {
    1092        3148 :     XLogRecPtr  result = InvalidXLogRecPtr;
    1093             :     int         i;
    1094             : 
    1095        3148 :     if (max_replication_slots <= 0)
    1096           4 :         return InvalidXLogRecPtr;
    1097             : 
    1098        3144 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1099             : 
    1100       33664 :     for (i = 0; i < max_replication_slots; i++)
    1101             :     {
    1102             :         ReplicationSlot *s;
    1103             :         XLogRecPtr  restart_lsn;
    1104             :         bool        invalidated;
    1105             : 
    1106       30520 :         s = &ReplicationSlotCtl->replication_slots[i];
    1107             : 
    1108             :         /* cannot change while ReplicationSlotControlLock is held */
    1109       30520 :         if (!s->in_use)
    1110       29944 :             continue;
    1111             : 
    1112             :         /* we're only interested in logical slots */
    1113         576 :         if (!SlotIsLogical(s))
    1114         280 :             continue;
    1115             : 
    1116             :         /* read once, it's ok if it increases while we're checking */
    1117         296 :         SpinLockAcquire(&s->mutex);
    1118         296 :         restart_lsn = s->data.restart_lsn;
    1119         296 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1120         296 :         SpinLockRelease(&s->mutex);
    1121             : 
    1122             :         /* invalidated slots need not apply */
    1123         296 :         if (invalidated)
    1124           8 :             continue;
    1125             : 
    1126         288 :         if (restart_lsn == InvalidXLogRecPtr)
    1127           0 :             continue;
    1128             : 
    1129         288 :         if (result == InvalidXLogRecPtr ||
    1130             :             restart_lsn < result)
    1131         232 :             result = restart_lsn;
    1132             :     }
    1133             : 
    1134        3144 :     LWLockRelease(ReplicationSlotControlLock);
    1135             : 
    1136        3144 :     return result;
    1137             : }
    1138             : 
    1139             : /*
    1140             :  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
    1141             :  * passed database oid.
    1142             :  *
    1143             :  * Returns true if there are any slots referencing the database. *nslots will
    1144             :  * be set to the absolute number of slots in the database, *nactive to ones
    1145             :  * currently active.
    1146             :  */
    1147             : bool
    1148          60 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
    1149             : {
    1150             :     int         i;
    1151             : 
    1152          60 :     *nslots = *nactive = 0;
    1153             : 
    1154          60 :     if (max_replication_slots <= 0)
    1155           0 :         return false;
    1156             : 
    1157          60 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1158         602 :     for (i = 0; i < max_replication_slots; i++)
    1159             :     {
    1160             :         ReplicationSlot *s;
    1161             : 
    1162         542 :         s = &ReplicationSlotCtl->replication_slots[i];
    1163             : 
    1164             :         /* cannot change while ReplicationSlotControlLock is held */
    1165         542 :         if (!s->in_use)
    1166         514 :             continue;
    1167             : 
    1168             :         /* only logical slots are database specific, skip */
    1169          28 :         if (!SlotIsLogical(s))
    1170          16 :             continue;
    1171             : 
    1172             :         /* not our database, skip */
    1173          12 :         if (s->data.database != dboid)
    1174           6 :             continue;
    1175             : 
    1176             :         /* NB: intentionally counting invalidated slots */
    1177             : 
    1178             :         /* count slots with spinlock held */
    1179           6 :         SpinLockAcquire(&s->mutex);
    1180           6 :         (*nslots)++;
    1181           6 :         if (s->active_pid != 0)
    1182           2 :             (*nactive)++;
    1183           6 :         SpinLockRelease(&s->mutex);
    1184             :     }
    1185          60 :     LWLockRelease(ReplicationSlotControlLock);
    1186             : 
    1187          60 :     if (*nslots > 0)
    1188           6 :         return true;
    1189          54 :     return false;
    1190             : }
    1191             : 
    1192             : /*
    1193             :  * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
    1194             :  * passed database oid. The caller should hold an exclusive lock on the
    1195             :  * pg_database oid for the database to prevent creation of new slots on the db
    1196             :  * or replay from existing slots.
    1197             :  *
    1198             :  * Another session that concurrently acquires an existing slot on the target DB
    1199             :  * (most likely to drop it) may cause this function to ERROR. If that happens
    1200             :  * it may have dropped some but not all slots.
    1201             :  *
    1202             :  * This routine isn't as efficient as it could be - but we don't drop
    1203             :  * databases often, especially databases with lots of slots.
    1204             :  */
    1205             : void
    1206          78 : ReplicationSlotsDropDBSlots(Oid dboid)
    1207             : {
    1208             :     int         i;
    1209             : 
    1210          78 :     if (max_replication_slots <= 0)
    1211           0 :         return;
    1212             : 
    1213          78 : restart:
    1214          88 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1215         794 :     for (i = 0; i < max_replication_slots; i++)
    1216             :     {
    1217             :         ReplicationSlot *s;
    1218             :         char       *slotname;
    1219             :         int         active_pid;
    1220             : 
    1221         716 :         s = &ReplicationSlotCtl->replication_slots[i];
    1222             : 
    1223             :         /* cannot change while ReplicationSlotControlLock is held */
    1224         716 :         if (!s->in_use)
    1225         676 :             continue;
    1226             : 
    1227             :         /* only logical slots are database specific, skip */
    1228          40 :         if (!SlotIsLogical(s))
    1229          16 :             continue;
    1230             : 
    1231             :         /* not our database, skip */
    1232          24 :         if (s->data.database != dboid)
    1233          14 :             continue;
    1234             : 
    1235             :         /* NB: intentionally including invalidated slots */
    1236             : 
    1237             :         /* acquire slot, so ReplicationSlotDropAcquired can be reused  */
    1238          10 :         SpinLockAcquire(&s->mutex);
    1239             :         /* can't change while ReplicationSlotControlLock is held */
    1240          10 :         slotname = NameStr(s->data.name);
    1241          10 :         active_pid = s->active_pid;
    1242          10 :         if (active_pid == 0)
    1243             :         {
    1244          10 :             MyReplicationSlot = s;
    1245          10 :             s->active_pid = MyProcPid;
    1246             :         }
    1247          10 :         SpinLockRelease(&s->mutex);
    1248             : 
    1249             :         /*
    1250             :          * Even though we hold an exclusive lock on the database object a
    1251             :          * logical slot for that DB can still be active, e.g. if it's
    1252             :          * concurrently being dropped by a backend connected to another DB.
    1253             :          *
    1254             :          * That's fairly unlikely in practice, so we'll just bail out.
    1255             :          */
    1256          10 :         if (active_pid)
    1257           0 :             ereport(ERROR,
    1258             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1259             :                      errmsg("replication slot \"%s\" is active for PID %d",
    1260             :                             slotname, active_pid)));
    1261             : 
    1262             :         /*
    1263             :          * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
    1264             :          * holding ReplicationSlotControlLock over filesystem operations,
    1265             :          * release ReplicationSlotControlLock and use
    1266             :          * ReplicationSlotDropAcquired.
    1267             :          *
    1268             :          * As that means the set of slots could change, restart scan from the
    1269             :          * beginning each time we release the lock.
    1270             :          */
    1271          10 :         LWLockRelease(ReplicationSlotControlLock);
    1272          10 :         ReplicationSlotDropAcquired();
    1273          10 :         goto restart;
    1274             :     }
    1275          78 :     LWLockRelease(ReplicationSlotControlLock);
    1276             : }
    1277             : 
    1278             : 
    1279             : /*
    1280             :  * Check whether the server's configuration supports using replication
    1281             :  * slots.
    1282             :  */
    1283             : void
    1284        2872 : CheckSlotRequirements(void)
    1285             : {
    1286             :     /*
    1287             :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
    1288             :      * needs the same check.
    1289             :      */
    1290             : 
    1291        2872 :     if (max_replication_slots == 0)
    1292           0 :         ereport(ERROR,
    1293             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1294             :                  errmsg("replication slots can only be used if max_replication_slots > 0")));
    1295             : 
    1296        2872 :     if (wal_level < WAL_LEVEL_REPLICA)
    1297           0 :         ereport(ERROR,
    1298             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1299             :                  errmsg("replication slots can only be used if wal_level >= replica")));
    1300        2872 : }
    1301             : 
    1302             : /*
    1303             :  * Check whether the user has privilege to use replication slots.
    1304             :  */
    1305             : void
    1306         952 : CheckSlotPermissions(void)
    1307             : {
    1308         952 :     if (!has_rolreplication(GetUserId()))
    1309          10 :         ereport(ERROR,
    1310             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1311             :                  errmsg("permission denied to use replication slots"),
    1312             :                  errdetail("Only roles with the %s attribute may use replication slots.",
    1313             :                            "REPLICATION")));
    1314         942 : }
    1315             : 
    1316             : /*
    1317             :  * Reserve WAL for the currently active slot.
    1318             :  *
    1319             :  * Compute and set restart_lsn in a manner that's appropriate for the type of
    1320             :  * the slot and concurrency safe.
    1321             :  */
    1322             : void
    1323         986 : ReplicationSlotReserveWal(void)
    1324             : {
    1325         986 :     ReplicationSlot *slot = MyReplicationSlot;
    1326             : 
    1327             :     Assert(slot != NULL);
    1328             :     Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
    1329             : 
    1330             :     /*
    1331             :      * The replication slot mechanism is used to prevent removal of required
    1332             :      * WAL. As there is no interlock between this routine and checkpoints, WAL
    1333             :      * segments could concurrently be removed when a now stale return value of
    1334             :      * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
    1335             :      * this happens we'll just retry.
    1336             :      */
    1337             :     while (true)
    1338           0 :     {
    1339             :         XLogSegNo   segno;
    1340             :         XLogRecPtr  restart_lsn;
    1341             : 
    1342             :         /*
    1343             :          * For logical slots log a standby snapshot and start logical decoding
    1344             :          * at exactly that position. That allows the slot to start up more
    1345             :          * quickly. But on a standby we cannot do WAL writes, so just use the
    1346             :          * replay pointer; effectively, an attempt to create a logical slot on
    1347             :          * standby will cause it to wait for an xl_running_xact record to be
    1348             :          * logged independently on the primary, so that a snapshot can be
    1349             :          * built using the record.
    1350             :          *
    1351             :          * None of this is needed (or indeed helpful) for physical slots as
    1352             :          * they'll start replay at the last logged checkpoint anyway. Instead
    1353             :          * use the location of the last redo LSN. While that slightly
    1354             :          * increases the chance that we have to retry, it's where a base
    1355             :          * backup has to start replay at.
    1356             :          */
    1357         986 :         if (SlotIsPhysical(slot))
    1358         242 :             restart_lsn = GetRedoRecPtr();
    1359         744 :         else if (RecoveryInProgress())
    1360          44 :             restart_lsn = GetXLogReplayRecPtr(NULL);
    1361             :         else
    1362         700 :             restart_lsn = GetXLogInsertRecPtr();
    1363             : 
    1364         986 :         SpinLockAcquire(&slot->mutex);
    1365         986 :         slot->data.restart_lsn = restart_lsn;
    1366         986 :         SpinLockRelease(&slot->mutex);
    1367             : 
    1368             :         /* prevent WAL removal as fast as possible */
    1369         986 :         ReplicationSlotsComputeRequiredLSN();
    1370             : 
    1371             :         /*
    1372             :          * If all required WAL is still there, great, otherwise retry. The
    1373             :          * slot should prevent further removal of WAL, unless there's a
    1374             :          * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
    1375             :          * the new restart_lsn above, so normally we should never need to loop
    1376             :          * more than twice.
    1377             :          */
    1378         986 :         XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
    1379         986 :         if (XLogGetLastRemovedSegno() < segno)
    1380         986 :             break;
    1381             :     }
    1382             : 
    1383         986 :     if (!RecoveryInProgress() && SlotIsLogical(slot))
    1384             :     {
    1385             :         XLogRecPtr  flushptr;
    1386             : 
    1387             :         /* make sure we have enough information to start */
    1388         700 :         flushptr = LogStandbySnapshot();
    1389             : 
    1390             :         /* and make sure it's fsynced to disk */
    1391         700 :         XLogFlush(flushptr);
    1392             :     }
    1393         986 : }
    1394             : 
    1395             : /*
    1396             :  * Report that replication slot needs to be invalidated
    1397             :  */
    1398             : static void
    1399          40 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
    1400             :                        bool terminating,
    1401             :                        int pid,
    1402             :                        NameData slotname,
    1403             :                        XLogRecPtr restart_lsn,
    1404             :                        XLogRecPtr oldestLSN,
    1405             :                        TransactionId snapshotConflictHorizon)
    1406             : {
    1407             :     StringInfoData err_detail;
    1408          40 :     bool        hint = false;
    1409             : 
    1410          40 :     initStringInfo(&err_detail);
    1411             : 
    1412          40 :     switch (cause)
    1413             :     {
    1414          10 :         case RS_INVAL_WAL_REMOVED:
    1415             :             {
    1416          10 :                 unsigned long long ex = oldestLSN - restart_lsn;
    1417             : 
    1418          10 :                 hint = true;
    1419          10 :                 appendStringInfo(&err_detail,
    1420          10 :                                  ngettext("The slot's restart_lsn %X/%X exceeds the limit by %llu byte.",
    1421             :                                           "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
    1422             :                                           ex),
    1423          10 :                                  LSN_FORMAT_ARGS(restart_lsn),
    1424             :                                  ex);
    1425          10 :                 break;
    1426             :             }
    1427          24 :         case RS_INVAL_HORIZON:
    1428          24 :             appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
    1429             :                              snapshotConflictHorizon);
    1430          24 :             break;
    1431             : 
    1432           6 :         case RS_INVAL_WAL_LEVEL:
    1433           6 :             appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server."));
    1434           6 :             break;
    1435             :         case RS_INVAL_NONE:
    1436             :             pg_unreachable();
    1437             :     }
    1438             : 
    1439          40 :     ereport(LOG,
    1440             :             terminating ?
    1441             :             errmsg("terminating process %d to release replication slot \"%s\"",
    1442             :                    pid, NameStr(slotname)) :
    1443             :             errmsg("invalidating obsolete replication slot \"%s\"",
    1444             :                    NameStr(slotname)),
    1445             :             errdetail_internal("%s", err_detail.data),
    1446             :             hint ? errhint("You might need to increase %s.", "max_slot_wal_keep_size") : 0);
    1447             : 
    1448          40 :     pfree(err_detail.data);
    1449          40 : }
    1450             : 
    1451             : /*
    1452             :  * Helper for InvalidateObsoleteReplicationSlots
    1453             :  *
    1454             :  * Acquires the given slot and marks it invalid, if necessary and possible.
    1455             :  *
    1456             :  * Returns whether ReplicationSlotControlLock was released in the interim (and
    1457             :  * in that case we're not holding the lock at return, otherwise we are).
    1458             :  *
    1459             :  * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
    1460             :  *
    1461             :  * This is inherently racy, because we release the LWLock
    1462             :  * for syscalls, so caller must restart if we return true.
    1463             :  */
    1464             : static bool
    1465         396 : InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
    1466             :                                ReplicationSlot *s,
    1467             :                                XLogRecPtr oldestLSN,
    1468             :                                Oid dboid, TransactionId snapshotConflictHorizon,
    1469             :                                bool *invalidated)
    1470             : {
    1471         396 :     int         last_signaled_pid = 0;
    1472         396 :     bool        released_lock = false;
    1473         396 :     bool        terminated = false;
    1474         396 :     XLogRecPtr  initial_effective_xmin = InvalidXLogRecPtr;
    1475         396 :     XLogRecPtr  initial_catalog_effective_xmin = InvalidXLogRecPtr;
    1476         396 :     XLogRecPtr  initial_restart_lsn = InvalidXLogRecPtr;
    1477         396 :     ReplicationSlotInvalidationCause conflict_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
    1478             : 
    1479             :     for (;;)
    1480          14 :     {
    1481             :         XLogRecPtr  restart_lsn;
    1482             :         NameData    slotname;
    1483         410 :         int         active_pid = 0;
    1484         410 :         ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;
    1485             : 
    1486             :         Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
    1487             : 
    1488         410 :         if (!s->in_use)
    1489             :         {
    1490           0 :             if (released_lock)
    1491           0 :                 LWLockRelease(ReplicationSlotControlLock);
    1492           0 :             break;
    1493             :         }
    1494             : 
    1495             :         /*
    1496             :          * Check if the slot needs to be invalidated. If it needs to be
    1497             :          * invalidated, and is not currently acquired, acquire it and mark it
    1498             :          * as having been invalidated.  We do this with the spinlock held to
    1499             :          * avoid race conditions -- for example the restart_lsn could move
    1500             :          * forward, or the slot could be dropped.
    1501             :          */
    1502         410 :         SpinLockAcquire(&s->mutex);
    1503             : 
    1504         410 :         restart_lsn = s->data.restart_lsn;
    1505             : 
    1506             :         /*
    1507             :          * If the slot is already invalid or is a non conflicting slot, we
    1508             :          * don't need to do anything.
    1509             :          */
    1510         410 :         if (s->data.invalidated == RS_INVAL_NONE)
    1511             :         {
    1512             :             /*
    1513             :              * The slot's mutex will be released soon, and the slot's
    1514             :              * restart_lsn and xmin values may change once the process holding
    1515             :              * the slot (if any) has been terminated, so record them here to
    1516             :              * ensure that we report the correct conflict cause.
    1517             :              */
    1518         334 :             if (!terminated)
    1519             :             {
    1520         320 :                 initial_restart_lsn = s->data.restart_lsn;
    1521         320 :                 initial_effective_xmin = s->effective_xmin;
    1522         320 :                 initial_catalog_effective_xmin = s->effective_catalog_xmin;
    1523             :             }
    1524             : 
    1525         334 :             switch (cause)
    1526             :             {
    1527         280 :                 case RS_INVAL_WAL_REMOVED:
    1528         280 :                     if (initial_restart_lsn != InvalidXLogRecPtr &&
    1529             :                         initial_restart_lsn < oldestLSN)
    1530          10 :                         conflict = cause;
    1531         280 :                     break;
    1532          48 :                 case RS_INVAL_HORIZON:
    1533          48 :                     if (!SlotIsLogical(s))
    1534           0 :                         break;
    1535             :                     /* invalid DB oid signals a shared relation */
    1536          48 :                     if (dboid != InvalidOid && dboid != s->data.database)
    1537           0 :                         break;
    1538          48 :                     if (TransactionIdIsValid(initial_effective_xmin) &&
    1539           0 :                         TransactionIdPrecedesOrEquals(initial_effective_xmin,
    1540             :                                                       snapshotConflictHorizon))
    1541           0 :                         conflict = cause;
    1542          96 :                     else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
    1543          48 :                              TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
    1544             :                                                            snapshotConflictHorizon))
    1545          24 :                         conflict = cause;
    1546          48 :                     break;
    1547           6 :                 case RS_INVAL_WAL_LEVEL:
    1548           6 :                     if (SlotIsLogical(s))
    1549           6 :                         conflict = cause;
    1550           6 :                     break;
    1551             :                 case RS_INVAL_NONE:
    1552             :                     pg_unreachable();
    1553             :             }
    1554          76 :         }
    1555             : 
    1556             :         /*
    1557             :          * The conflict cause recorded previously should not change after the
    1558             :          * process owning the slot (if any) has been terminated.
    1559             :          */
    1560             :         Assert(!(conflict_prev != RS_INVAL_NONE && terminated &&
    1561             :                  conflict_prev != conflict));
    1562             : 
    1563             :         /* if there's no conflict, we're done */
    1564         410 :         if (conflict == RS_INVAL_NONE)
    1565             :         {
    1566         370 :             SpinLockRelease(&s->mutex);
    1567         370 :             if (released_lock)
    1568           0 :                 LWLockRelease(ReplicationSlotControlLock);
    1569         370 :             break;
    1570             :         }
    1571             : 
    1572          40 :         slotname = s->data.name;
    1573          40 :         active_pid = s->active_pid;
    1574             : 
    1575             :         /*
    1576             :          * If the slot can be acquired, do so and mark it invalidated
    1577             :          * immediately.  Otherwise we'll signal the owning process, below, and
    1578             :          * retry.
    1579             :          */
    1580          40 :         if (active_pid == 0)
    1581             :         {
    1582          26 :             MyReplicationSlot = s;
    1583          26 :             s->active_pid = MyProcPid;
    1584          26 :             s->data.invalidated = conflict;
    1585             : 
    1586             :             /*
    1587             :              * XXX: We should consider not overwriting restart_lsn and instead
    1588             :              * just rely on .invalidated.
    1589             :              */
    1590          26 :             if (conflict == RS_INVAL_WAL_REMOVED)
    1591           6 :                 s->data.restart_lsn = InvalidXLogRecPtr;
    1592             : 
    1593             :             /* Let caller know */
    1594          26 :             *invalidated = true;
    1595             :         }
    1596             : 
    1597          40 :         SpinLockRelease(&s->mutex);
    1598             : 
    1599             :         /*
    1600             :          * The logical replication slots shouldn't be invalidated as GUC
    1601             :          * max_slot_wal_keep_size is set to -1 during the binary upgrade. See
    1602             :          * check_old_cluster_for_valid_slots() where we ensure that no slot
    1603             :          * is invalidated before the upgrade.
    1604             :          */
    1605             :         Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));
    1606             : 
    1607          40 :         if (active_pid != 0)
    1608             :         {
    1609             :             /*
    1610             :              * Prepare the sleep on the slot's condition variable before
    1611             :              * releasing the lock, to close a possible race condition if the
    1612             :              * slot is released before the sleep below.
    1613             :              */
    1614          14 :             ConditionVariablePrepareToSleep(&s->active_cv);
    1615             : 
    1616          14 :             LWLockRelease(ReplicationSlotControlLock);
    1617          14 :             released_lock = true;
    1618             : 
    1619             :             /*
    1620             :              * Signal to terminate the process that owns the slot, if we
    1621             :              * haven't already signalled it.  (Avoidance of repeated
    1622             :              * signalling is the only reason for there to be a loop in this
    1623             :              * routine; otherwise we could rely on caller's restart loop.)
    1624             :              *
    1625             :              * There is a race condition: another process may acquire the slot
    1626             :              * after its current owner process is terminated and before this
    1627             :              * process acquires it. To handle that, we signal only if the PID of
    1628             :              * the owning process has changed from the previous time. (This
    1629             :              * logic assumes that the same PID is not reused very quickly.)
    1630             :              */
    1631          14 :             if (last_signaled_pid != active_pid)
    1632             :             {
    1633          14 :                 ReportSlotInvalidation(conflict, true, active_pid,
    1634             :                                        slotname, restart_lsn,
    1635             :                                        oldestLSN, snapshotConflictHorizon);
    1636             : 
    1637          14 :                 if (MyBackendType == B_STARTUP)
    1638          10 :                     (void) SendProcSignal(active_pid,
    1639             :                                           PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
    1640             :                                           InvalidBackendId);
    1641             :                 else
    1642           4 :                     (void) kill(active_pid, SIGTERM);
    1643             : 
    1644          14 :                 last_signaled_pid = active_pid;
    1645          14 :                 terminated = true;
    1646          14 :                 conflict_prev = conflict;
    1647             :             }
    1648             : 
    1649             :             /* Wait until the slot is released. */
    1650          14 :             ConditionVariableSleep(&s->active_cv,
    1651             :                                    WAIT_EVENT_REPLICATION_SLOT_DROP);
    1652             : 
    1653             :             /*
    1654             :              * Re-acquire lock and start over; we expect to invalidate the
    1655             :              * slot next time (unless another process acquires the slot in the
    1656             :              * meantime).
    1657             :              */
    1658          14 :             LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1659          14 :             continue;
    1660             :         }
    1661             :         else
    1662             :         {
    1663             :             /*
    1664             :              * We hold the slot now and have already invalidated it; flush it
    1665             :              * to ensure that state persists.
    1666             :              *
    1667             :              * Don't want to hold ReplicationSlotControlLock across file
    1668             :              * system operations, so release it now but be sure to tell caller
    1669             :              * to restart from scratch.
    1670             :              */
    1671          26 :             LWLockRelease(ReplicationSlotControlLock);
    1672          26 :             released_lock = true;
    1673             : 
    1674             :             /* Make sure the invalidated state persists across server restart */
    1675          26 :             ReplicationSlotMarkDirty();
    1676          26 :             ReplicationSlotSave();
    1677          26 :             ReplicationSlotRelease();
    1678          26 :             pgstat_drop_replslot(s);
    1679             : 
    1680          26 :             ReportSlotInvalidation(conflict, false, active_pid,
    1681             :                                    slotname, restart_lsn,
    1682             :                                    oldestLSN, snapshotConflictHorizon);
    1683             : 
    1684             :             /* done with this slot for now */
    1685          26 :             break;
    1686             :         }
    1687             :     }
    1688             : 
    1689             :     Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
    1690             : 
    1691         396 :     return released_lock;
    1692             : }
    1693             : 
    1694             : /*
    1695             :  * Invalidate slots that require resources about to be removed.
    1696             :  *
    1697             :  * Returns true if any slot was invalidated.
    1698             :  *
    1699             :  * Whether a slot needs to be invalidated depends on the cause. A slot is
    1700             :  * invalidated if it:
    1701             :  * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
    1702             :  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
    1703             :  *   db; dboid may be InvalidOid for shared relations
    1704             :  * - RS_INVAL_WAL_LEVEL: is logical
    1705             :  *
    1706             :  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
    1707             :  */
    1708             : bool
    1709        1610 : InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
    1710             :                                    XLogSegNo oldestSegno, Oid dboid,
    1711             :                                    TransactionId snapshotConflictHorizon)
    1712             : {
    1713             :     XLogRecPtr  oldestLSN;
    1714        1610 :     bool        invalidated = false;
    1715             : 
    1716             :     Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
    1717             :     Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
    1718             :     Assert(cause != RS_INVAL_NONE);
    1719             : 
    1720        1610 :     if (max_replication_slots == 0)
    1721           2 :         return invalidated;
    1722             : 
    1723        1608 :     XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
    1724             : 
    1725        1634 : restart:
    1726        1634 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1727       17084 :     for (int i = 0; i < max_replication_slots; i++)
    1728             :     {
    1729       15476 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1730             : 
    1731       15476 :         if (!s->in_use)
    1732       15080 :             continue;
    1733             : 
    1734         396 :         if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
    1735             :                                            snapshotConflictHorizon,
    1736             :                                            &invalidated))
    1737             :         {
    1738             :             /* if the lock was released, start from scratch */
    1739          26 :             goto restart;
    1740             :         }
    1741             :     }
    1742        1608 :     LWLockRelease(ReplicationSlotControlLock);
    1743             : 
    1744             :     /*
    1745             :      * If any slots have been invalidated, recalculate the resource limits.
    1746             :      */
    1747        1608 :     if (invalidated)
    1748             :     {
    1749          16 :         ReplicationSlotsComputeRequiredXmin(false);
    1750          16 :         ReplicationSlotsComputeRequiredLSN();
    1751             :     }
    1752             : 
    1753        1608 :     return invalidated;
    1754             : }
    1755             : 
    1756             : /*
    1757             :  * Flush all replication slots to disk.
    1758             :  *
    1759             :  * It is convenient to flush dirty replication slots at the time of checkpoint.
    1760             :  * Additionally, in case of a shutdown checkpoint, we also identify the slots
    1761             :  * for which the confirmed_flush LSN has been updated since the last time it
    1762             :  * was saved and flush them.
    1763             :  */
    1764             : void
    1765        1574 : CheckPointReplicationSlots(bool is_shutdown)
    1766             : {
    1767             :     int         i;
    1768             : 
    1769        1574 :     elog(DEBUG1, "performing replication slot checkpoint");
    1770             : 
    1771             :     /*
    1772             :      * Prevent any slot from being created/dropped while we're active. As we
    1773             :      * explicitly do *not* want to block iterating over replication_slots or
    1774             :      * acquiring a slot we cannot take the control lock - but that's OK,
    1775             :      * because holding ReplicationSlotAllocationLock is strictly stronger, and
    1776             :      * enough to guarantee that nobody can change the in_use bits on us.
    1777             :      */
    1778        1574 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
    1779             : 
    1780       16834 :     for (i = 0; i < max_replication_slots; i++)
    1781             :     {
    1782       15260 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1783             :         char        path[MAXPGPATH];
    1784             : 
    1785       15260 :         if (!s->in_use)
    1786       14972 :             continue;
    1787             : 
    1788             :         /* save the slot to disk, locking is handled in SaveSlotToPath() */
    1789         288 :         sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
    1790             : 
    1791             :         /*
    1792             :          * Slot's data is not flushed each time the confirmed_flush LSN is
    1793             :          * updated as that could lead to frequent writes.  However, we decide
    1794             :          * to force a flush of all logical slots' data at the time of shutdown
    1795             :          * if the confirmed_flush LSN is changed since we last flushed it to
    1796             :          * disk.  This helps in avoiding an unnecessary retreat of the
    1797             :          * confirmed_flush LSN after restart.
    1798             :          */
    1799         288 :         if (is_shutdown && SlotIsLogical(s))
    1800             :         {
    1801         110 :             SpinLockAcquire(&s->mutex);
    1802             : 
    1803             :             Assert(s->data.confirmed_flush >= s->last_saved_confirmed_flush);
    1804             : 
    1805         110 :             if (s->data.invalidated == RS_INVAL_NONE &&
    1806         110 :                 s->data.confirmed_flush != s->last_saved_confirmed_flush)
    1807             :             {
    1808          66 :                 s->just_dirtied = true;
    1809          66 :                 s->dirty = true;
    1810             :             }
    1811         110 :             SpinLockRelease(&s->mutex);
    1812             :         }
    1813             : 
    1814         288 :         SaveSlotToPath(s, path, LOG);
    1815             :     }
    1816        1574 :     LWLockRelease(ReplicationSlotAllocationLock);
    1817        1574 : }
    1818             : 
    1819             : /*
    1820             :  * Load all replication slots from disk into memory at server startup. This
    1821             :  * needs to be run before we start crash recovery.
    1822             :  */
    1823             : void
    1824        1416 : StartupReplicationSlots(void)
    1825             : {
    1826             :     DIR        *replication_dir;
    1827             :     struct dirent *replication_de;
    1828             : 
    1829        1416 :     elog(DEBUG1, "starting up replication slots");
    1830             : 
    1831             :     /* restore all slots by iterating over all on-disk entries */
    1832        1416 :     replication_dir = AllocateDir("pg_replslot");
    1833        4338 :     while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
    1834             :     {
    1835             :         char        path[MAXPGPATH + 12];
    1836             :         PGFileType  de_type;
    1837             : 
    1838        2922 :         if (strcmp(replication_de->d_name, ".") == 0 ||
    1839        1506 :             strcmp(replication_de->d_name, "..") == 0)
    1840        2832 :             continue;
    1841             : 
    1842          90 :         snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
    1843          90 :         de_type = get_dirent_type(path, replication_de, false, DEBUG1);
    1844             : 
    1845             :         /* we're only creating directories here, skip if it's not ours */
    1846          90 :         if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
    1847           0 :             continue;
    1848             : 
    1849             :         /* we crashed while a slot was being set up or deleted, clean up */
    1850          90 :         if (pg_str_endswith(replication_de->d_name, ".tmp"))
    1851             :         {
    1852           0 :             if (!rmtree(path, true))
    1853             :             {
    1854           0 :                 ereport(WARNING,
    1855             :                         (errmsg("could not remove directory \"%s\"",
    1856             :                                 path)));
    1857           0 :                 continue;
    1858             :             }
    1859           0 :             fsync_fname("pg_replslot", true);
    1860           0 :             continue;
    1861             :         }
    1862             : 
    1863             :         /* looks like a slot in a normal state, restore */
    1864          90 :         RestoreSlotFromDisk(replication_de->d_name);
    1865             :     }
    1866        1416 :     FreeDir(replication_dir);
    1867             : 
    1868             :     /* currently no slots exist, we're done. */
    1869        1416 :     if (max_replication_slots <= 0)
    1870           2 :         return;
    1871             : 
    1872             :     /* Now that we have recovered all the data, compute replication xmin */
    1873        1414 :     ReplicationSlotsComputeRequiredXmin(false);
    1874        1414 :     ReplicationSlotsComputeRequiredLSN();
    1875             : }
    1876             : 
    1877             : /* ----
    1878             :  * Manipulation of on-disk state of replication slots
    1879             :  *
    1880             :  * NB: none of the routines below should take any notice whether a slot is the
    1881             :  * current one or not, that's all handled a layer above.
    1882             :  * ----
    1883             :  */
    1884             : static void
    1885        1052 : CreateSlotOnDisk(ReplicationSlot *slot)
    1886             : {
    1887             :     char        tmppath[MAXPGPATH];
    1888             :     char        path[MAXPGPATH];
    1889             :     struct stat st;
    1890             : 
    1891             :     /*
    1892             :      * No need to take out the io_in_progress_lock, nobody else can see this
    1893             :      * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
    1894             :      * takes out the lock, if we'd take the lock here, we'd deadlock.
    1895             :      */
    1896             : 
    1897        1052 :     sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
    1898        1052 :     sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
    1899             : 
    1900             :     /*
    1901             :      * It's just barely possible that some previous effort to create or drop a
    1902             :      * slot with this name left a temp directory lying around. If that seems
    1903             :      * to be the case, try to remove it.  If the rmtree() fails, we'll error
    1904             :      * out at the MakePGDirectory() below, so we don't bother checking
    1905             :      * success.
    1906             :      */
    1907        1052 :     if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
    1908           0 :         rmtree(tmppath, true);
    1909             : 
    1910             :     /* Create and fsync the temporary slot directory. */
    1911        1052 :     if (MakePGDirectory(tmppath) < 0)
    1912           0 :         ereport(ERROR,
    1913             :                 (errcode_for_file_access(),
    1914             :                  errmsg("could not create directory \"%s\": %m",
    1915             :                         tmppath)));
    1916        1052 :     fsync_fname(tmppath, true);
    1917             : 
    1918             :     /* Write the actual state file. */
    1919        1052 :     slot->dirty = true;          /* signal that we really need to write */
    1920        1052 :     SaveSlotToPath(slot, tmppath, ERROR);
    1921             : 
    1922             :     /* Rename the directory into place. */
    1923        1052 :     if (rename(tmppath, path) != 0)
    1924           0 :         ereport(ERROR,
    1925             :                 (errcode_for_file_access(),
    1926             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1927             :                         tmppath, path)));
    1928             : 
    1929             :     /*
    1930             :      * If we'd now fail - really unlikely - we wouldn't know whether this slot
    1931             :      * would persist after an OS crash or not - so, force a restart. The
    1932             :      * restart would try to fsync this again till it works.
    1933             :      */
    1934        1052 :     START_CRIT_SECTION();
    1935             : 
    1936        1052 :     fsync_fname(path, true);
    1937        1052 :     fsync_fname("pg_replslot", true);
    1938             : 
    1939        1052 :     END_CRIT_SECTION();
    1940        1052 : }
    1941             : 
    1942             : /*
    1943             :  * Shared functionality between saving and creating a replication slot.
    1944             :  */
    1945             : static void
    1946        3446 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
    1947             : {
    1948             :     char        tmppath[MAXPGPATH];
    1949             :     char        path[MAXPGPATH];
    1950             :     int         fd;
    1951             :     ReplicationSlotOnDisk cp;
    1952             :     bool        was_dirty;
    1953             : 
    1954             :     /* first check whether there's something to write out */
    1955        3446 :     SpinLockAcquire(&slot->mutex);
    1956        3446 :     was_dirty = slot->dirty;
    1957        3446 :     slot->just_dirtied = false;
    1958        3446 :     SpinLockRelease(&slot->mutex);
    1959             : 
    1960             :     /* and don't do anything if there's nothing to write */
    1961        3446 :     if (!was_dirty)
    1962         144 :         return;
    1963             : 
    1964        3302 :     LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
    1965             : 
    1966             :     /* silence valgrind :( */
    1967        3302 :     memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
    1968             : 
    1969        3302 :     sprintf(tmppath, "%s/state.tmp", dir);
    1970        3302 :     sprintf(path, "%s/state", dir);
    1971             : 
    1972        3302 :     fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    1973        3302 :     if (fd < 0)
    1974             :     {
    1975             :         /*
    1976             :          * If not an ERROR, then release the lock before returning.  In case
    1977             :          * of an ERROR, the error recovery path automatically releases the
    1978             :          * lock, but no harm in explicitly releasing even in that case.  Note
    1979             :          * that LWLockRelease() could affect errno.
    1980             :          */
    1981           0 :         int         save_errno = errno;
    1982             : 
    1983           0 :         LWLockRelease(&slot->io_in_progress_lock);
    1984           0 :         errno = save_errno;
    1985           0 :         ereport(elevel,
    1986             :                 (errcode_for_file_access(),
    1987             :                  errmsg("could not create file \"%s\": %m",
    1988             :                         tmppath)));
    1989           0 :         return;
    1990             :     }
    1991             : 
    1992        3302 :     cp.magic = SLOT_MAGIC;
    1993        3302 :     INIT_CRC32C(cp.checksum);
    1994        3302 :     cp.version = SLOT_VERSION;
    1995        3302 :     cp.length = ReplicationSlotOnDiskV2Size;
    1996             : 
    1997        3302 :     SpinLockAcquire(&slot->mutex);
    1998             : 
    1999        3302 :     memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
    2000             : 
    2001        3302 :     SpinLockRelease(&slot->mutex);
    2002             : 
    2003        3302 :     COMP_CRC32C(cp.checksum,
    2004             :                 (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
    2005             :                 ReplicationSlotOnDiskChecksummedSize);
    2006        3302 :     FIN_CRC32C(cp.checksum);
    2007             : 
    2008        3302 :     errno = 0;
    2009        3302 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
    2010        3302 :     if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
    2011             :     {
    2012           0 :         int         save_errno = errno;
    2013             : 
    2014           0 :         pgstat_report_wait_end();
    2015           0 :         CloseTransientFile(fd);
    2016           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2017             : 
    2018             :         /* if write didn't set errno, assume problem is no disk space */
    2019           0 :         errno = save_errno ? save_errno : ENOSPC;
    2020           0 :         ereport(elevel,
    2021             :                 (errcode_for_file_access(),
    2022             :                  errmsg("could not write to file \"%s\": %m",
    2023             :                         tmppath)));
    2024           0 :         return;
    2025             :     }
    2026        3302 :     pgstat_report_wait_end();
    2027             : 
    2028             :     /* fsync the temporary file */
    2029        3302 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
    2030        3302 :     if (pg_fsync(fd) != 0)
    2031             :     {
    2032           0 :         int         save_errno = errno;
    2033             : 
    2034           0 :         pgstat_report_wait_end();
    2035           0 :         CloseTransientFile(fd);
    2036           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2037           0 :         errno = save_errno;
    2038           0 :         ereport(elevel,
    2039             :                 (errcode_for_file_access(),
    2040             :                  errmsg("could not fsync file \"%s\": %m",
    2041             :                         tmppath)));
    2042           0 :         return;
    2043             :     }
    2044        3302 :     pgstat_report_wait_end();
    2045             : 
    2046        3302 :     if (CloseTransientFile(fd) != 0)
    2047             :     {
    2048           0 :         int         save_errno = errno;
    2049             : 
    2050           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2051           0 :         errno = save_errno;
    2052           0 :         ereport(elevel,
    2053             :                 (errcode_for_file_access(),
    2054             :                  errmsg("could not close file \"%s\": %m",
    2055             :                         tmppath)));
    2056           0 :         return;
    2057             :     }
    2058             : 
    2059             :     /* rename to permanent file, fsync file and directory */
    2060        3302 :     if (rename(tmppath, path) != 0)
    2061             :     {
    2062           0 :         int         save_errno = errno;
    2063             : 
    2064           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2065           0 :         errno = save_errno;
    2066           0 :         ereport(elevel,
    2067             :                 (errcode_for_file_access(),
    2068             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    2069             :                         tmppath, path)));
    2070           0 :         return;
    2071             :     }
    2072             : 
    2073             :     /*
    2074             :      * Check CreateSlotOnDisk() for the reasoning of using a critical section.
    2075             :      */
    2076        3302 :     START_CRIT_SECTION();
    2077             : 
    2078        3302 :     fsync_fname(path, false);
    2079        3302 :     fsync_fname(dir, true);
    2080        3302 :     fsync_fname("pg_replslot", true);
    2081             : 
    2082        3302 :     END_CRIT_SECTION();
    2083             : 
    2084             :     /*
    2085             :      * Successfully wrote: unset the dirty bit unless somebody dirtied the slot
    2086             :      * again already, and remember the confirmed_flush LSN value we saved.
    2087             :      */
    2088        3302 :     SpinLockAcquire(&slot->mutex);
    2089        3302 :     if (!slot->just_dirtied)
    2090        3298 :         slot->dirty = false;
    2091        3302 :     slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2092        3302 :     SpinLockRelease(&slot->mutex);
    2093             : 
    2094        3302 :     LWLockRelease(&slot->io_in_progress_lock);
    2095             : }
    2096             : 
    2097             : /*
    2098             :  * Load a single slot from disk into memory.
    2099             :  */
    2100             : static void
    2101          90 : RestoreSlotFromDisk(const char *name)
    2102             : {
    2103             :     ReplicationSlotOnDisk cp;
    2104             :     int         i;
    2105             :     char        slotdir[MAXPGPATH + 12];
    2106             :     char        path[MAXPGPATH + 22];
    2107             :     int         fd;
    2108          90 :     bool        restored = false;
    2109             :     int         readBytes;
    2110             :     pg_crc32c   checksum;
    2111             : 
    2112             :     /* no need to lock here, no concurrent access allowed yet */
    2113             : 
    2114             :     /* delete temp file if it exists */
    2115          90 :     sprintf(slotdir, "pg_replslot/%s", name);
    2116          90 :     sprintf(path, "%s/state.tmp", slotdir);
    2117          90 :     if (unlink(path) < 0 && errno != ENOENT)
    2118           0 :         ereport(PANIC,
    2119             :                 (errcode_for_file_access(),
    2120             :                  errmsg("could not remove file \"%s\": %m", path)));
    2121             : 
    2122          90 :     sprintf(path, "%s/state", slotdir);
    2123             : 
    2124          90 :     elog(DEBUG1, "restoring replication slot from \"%s\"", path);
    2125             : 
    2126             :     /* on some operating systems fsyncing a file requires O_RDWR */
    2127          90 :     fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
    2128             : 
    2129             :     /*
    2130             :      * We do not need to handle this as we are rename()ing the directory into
    2131             :      * place only after we fsync()ed the state file.
    2132             :      */
    2133          90 :     if (fd < 0)
    2134           0 :         ereport(PANIC,
    2135             :                 (errcode_for_file_access(),
    2136             :                  errmsg("could not open file \"%s\": %m", path)));
    2137             : 
    2138             :     /*
    2139             :      * Sync state file before we're reading from it. We might have crashed
    2140             :      * while it wasn't synced yet and we shouldn't continue on that basis.
    2141             :      */
    2142          90 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
    2143          90 :     if (pg_fsync(fd) != 0)
    2144           0 :         ereport(PANIC,
    2145             :                 (errcode_for_file_access(),
    2146             :                  errmsg("could not fsync file \"%s\": %m",
    2147             :                         path)));
    2148          90 :     pgstat_report_wait_end();
    2149             : 
    2150             :     /* Also sync the parent directory */
    2151          90 :     START_CRIT_SECTION();
    2152          90 :     fsync_fname(slotdir, true);
    2153          90 :     END_CRIT_SECTION();
    2154             : 
    2155             :     /* read part of statefile that's guaranteed to be version independent */
    2156          90 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2157          90 :     readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
    2158          90 :     pgstat_report_wait_end();
    2159          90 :     if (readBytes != ReplicationSlotOnDiskConstantSize)
    2160             :     {
    2161           0 :         if (readBytes < 0)
    2162           0 :             ereport(PANIC,
    2163             :                     (errcode_for_file_access(),
    2164             :                      errmsg("could not read file \"%s\": %m", path)));
    2165             :         else
    2166           0 :             ereport(PANIC,
    2167             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2168             :                      errmsg("could not read file \"%s\": read %d of %zu",
    2169             :                             path, readBytes,
    2170             :                             (Size) ReplicationSlotOnDiskConstantSize)));
    2171             :     }
    2172             : 
    2173             :     /* verify magic */
    2174          90 :     if (cp.magic != SLOT_MAGIC)
    2175           0 :         ereport(PANIC,
    2176             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2177             :                  errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
    2178             :                         path, cp.magic, SLOT_MAGIC)));
    2179             : 
    2180             :     /* verify version */
    2181          90 :     if (cp.version != SLOT_VERSION)
    2182           0 :         ereport(PANIC,
    2183             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2184             :                  errmsg("replication slot file \"%s\" has unsupported version %u",
    2185             :                         path, cp.version)));
    2186             : 
    2187             :     /* boundary check on length */
    2188          90 :     if (cp.length != ReplicationSlotOnDiskV2Size)
    2189           0 :         ereport(PANIC,
    2190             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2191             :                  errmsg("replication slot file \"%s\" has corrupted length %u",
    2192             :                         path, cp.length)));
    2193             : 
    2194             :     /* Now that we know the size, read the entire file */
    2195          90 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2196         180 :     readBytes = read(fd,
    2197             :                      (char *) &cp + ReplicationSlotOnDiskConstantSize,
    2198          90 :                      cp.length);
    2199          90 :     pgstat_report_wait_end();
    2200          90 :     if (readBytes != cp.length)
    2201             :     {
    2202           0 :         if (readBytes < 0)
    2203           0 :             ereport(PANIC,
    2204             :                     (errcode_for_file_access(),
    2205             :                      errmsg("could not read file \"%s\": %m", path)));
    2206             :         else
    2207           0 :             ereport(PANIC,
    2208             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2209             :                      errmsg("could not read file \"%s\": read %d of %zu",
    2210             :                             path, readBytes, (Size) cp.length)));
    2211             :     }
    2212             : 
    2213          90 :     if (CloseTransientFile(fd) != 0)
    2214           0 :         ereport(PANIC,
    2215             :                 (errcode_for_file_access(),
    2216             :                  errmsg("could not close file \"%s\": %m", path)));
    2217             : 
    2218             :     /* now verify the CRC */
    2219          90 :     INIT_CRC32C(checksum);
    2220          90 :     COMP_CRC32C(checksum,
    2221             :                 (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
    2222             :                 ReplicationSlotOnDiskChecksummedSize);
    2223          90 :     FIN_CRC32C(checksum);
    2224             : 
    2225          90 :     if (!EQ_CRC32C(checksum, cp.checksum))
    2226           0 :         ereport(PANIC,
    2227             :                 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
    2228             :                         path, checksum, cp.checksum)));
    2229             : 
    2230             :     /*
    2231             :      * If we crashed with an ephemeral slot active, don't restore but delete
    2232             :      * it.
    2233             :      */
    2234          90 :     if (cp.slotdata.persistency != RS_PERSISTENT)
    2235             :     {
    2236           0 :         if (!rmtree(slotdir, true))
    2237             :         {
    2238           0 :             ereport(WARNING,
    2239             :                     (errmsg("could not remove directory \"%s\"",
    2240             :                             slotdir)));
    2241             :         }
    2242           0 :         fsync_fname("pg_replslot", true);
    2243           0 :         return;
    2244             :     }
    2245             : 
    2246             :     /*
    2247             :      * Verify that requirements for the specific slot type are met. That's
    2248             :      * important because if these aren't met we're not guaranteed to retain
    2249             :      * all the necessary resources for the slot.
    2250             :      *
    2251             :      * NB: We have to do so *after* the above checks for ephemeral slots,
    2252             :      * because otherwise a slot that shouldn't exist anymore could prevent
    2253             :      * restarts.
    2254             :      *
    2255             :      * NB: Changing the requirements here also requires adapting
    2256             :      * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
    2257             :      */
    2258          90 :     if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
    2259           0 :         ereport(FATAL,
    2260             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2261             :                  errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
    2262             :                         NameStr(cp.slotdata.name)),
    2263             :                  errhint("Change wal_level to be logical or higher.")));
    2264          90 :     else if (wal_level < WAL_LEVEL_REPLICA)
    2265           0 :         ereport(FATAL,
    2266             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2267             :                  errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
    2268             :                         NameStr(cp.slotdata.name)),
    2269             :                  errhint("Change wal_level to be replica or higher.")));
    2270             : 
    2271             :     /* nothing can be active yet, don't lock anything */
    2272         122 :     for (i = 0; i < max_replication_slots; i++)
    2273             :     {
    2274             :         ReplicationSlot *slot;
    2275             : 
    2276         122 :         slot = &ReplicationSlotCtl->replication_slots[i];
    2277             : 
    2278         122 :         if (slot->in_use)
    2279          32 :             continue;
    2280             : 
    2281             :         /* restore the entire set of persistent data */
    2282          90 :         memcpy(&slot->data, &cp.slotdata,
    2283             :                sizeof(ReplicationSlotPersistentData));
    2284             : 
    2285             :         /* initialize in memory state */
    2286          90 :         slot->effective_xmin = cp.slotdata.xmin;
    2287          90 :         slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
    2288          90 :         slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2289             : 
    2290          90 :         slot->candidate_catalog_xmin = InvalidTransactionId;
    2291          90 :         slot->candidate_xmin_lsn = InvalidXLogRecPtr;
    2292          90 :         slot->candidate_restart_lsn = InvalidXLogRecPtr;
    2293          90 :         slot->candidate_restart_valid = InvalidXLogRecPtr;
    2294             : 
    2295          90 :         slot->in_use = true;
    2296          90 :         slot->active_pid = 0;
    2297             : 
    2298          90 :         restored = true;
    2299          90 :         break;
    2300             :     }
    2301             : 
    2302          90 :     if (!restored)
    2303           0 :         ereport(FATAL,
    2304             :                 (errmsg("too many replication slots active before shutdown"),
    2305             :                  errhint("Increase max_replication_slots and try again.")));
    2306             : }
    2307             : 
    2308             : /*
    2309             :  * Maps a conflict reason string (asserted to be non-NULL) to the
    2310             :  * ReplicationSlotInvalidationCause whose SlotInvalidationCauses[] entry equals it.
    2311             :  */
    2312             : ReplicationSlotInvalidationCause
    2313           0 : GetSlotInvalidationCause(const char *conflict_reason)
    2314             : {
    2315             :     ReplicationSlotInvalidationCause cause;
    2316           0 :     bool        found PG_USED_FOR_ASSERTS_ONLY = false; /* only read by Assert(found) below */
    2317             : 
    2318             :     Assert(conflict_reason);
    2319             : 
    2320           0 :     for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++) /* inclusive bound: RS_INVAL_MAX_CAUSES itself is used as an index */
    2321             :     {
    2322           0 :         if (strcmp(SlotInvalidationCauses[cause], conflict_reason) == 0) /* exact, case-sensitive comparison */
    2323             :         {
    2324           0 :             found = true;
    2325           0 :             break; /* leave 'cause' at the matching index */
    2326             :         }
    2327             :     }
    2328             : 
    2329             :     Assert(found); /* an unrecognized reason string is treated as a caller error */
    2330           0 :     return cause; /* NOTE(review): if nothing matched and the Assert did not stop us, this is one past RS_INVAL_MAX_CAUSES -- confirm callers never pass unknown strings */
    2331             : }

Generated by: LCOV version 1.14