LCOV - code coverage report
Current view: top level - src/backend/replication - slot.c (source / functions)
Test:     PostgreSQL 14devel
Date:     2020-11-27 11:06:40
Coverage: Lines: 458 of 574 hit (79.8 %)    Functions: 28 of 28 hit (100.0 %)
Legend:   Lines: hit / not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * slot.c
       4             :  *     Replication slot management.
       5             :  *
       6             :  *
       7             :  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/replication/slot.c
      12             :  *
      13             :  * NOTES
      14             :  *
      15             :  * Replication slots are used to keep state about replication streams
      16             :  * originating from this cluster.  Their primary purpose is to prevent the
      17             :  * premature removal of WAL or of old tuple versions in a manner that would
      18             :  * interfere with replication; they are also useful for monitoring purposes.
      19             :  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
      20             :  * on standbys (to support cascading setups).  The requirement that slots be
      21             :  * usable on standbys precludes storing them in the system catalogs.
      22             :  *
      23             :  * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
      24             :  * directory. Inside that directory the state file will contain the slot's
      25             :  * own data. Additional data can be stored alongside that file if required.
      26             :  * While the server is running, the state data is also cached in memory for
      27             :  * efficiency.
      28             :  *
      29             :  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
      30             :  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
      31             :  * to iterate over the slots, and in exclusive mode to change the in_use flag
      32             :  * of a slot.  The remaining data in each slot is protected by its mutex.
      33             :  *
      34             :  *-------------------------------------------------------------------------
      35             :  */
      36             : 
      37             : #include "postgres.h"
      38             : 
      39             : #include <unistd.h>
      40             : #include <sys/stat.h>
      41             : 
      42             : #include "access/transam.h"
      43             : #include "access/xlog_internal.h"
      44             : #include "common/string.h"
      45             : #include "miscadmin.h"
      46             : #include "pgstat.h"
      47             : #include "replication/slot.h"
      48             : #include "storage/fd.h"
      49             : #include "storage/proc.h"
      50             : #include "storage/procarray.h"
      51             : #include "utils/builtins.h"
      52             : 
      53             : /*
      54             :  * Replication slot on-disk data structure.
      55             :  */
      56             : typedef struct ReplicationSlotOnDisk
      57             : {
      58             :     /* first part of this struct needs to be version independent */
      59             : 
      60             :     /* data not covered by checksum */
      61             :     uint32      magic;
      62             :     pg_crc32c   checksum;
      63             : 
      64             :     /* data covered by checksum */
      65             :     uint32      version;
      66             :     uint32      length;
      67             : 
      68             :     /*
      69             :      * The actual data in the slot that follows can differ based on the above
      70             :      * 'version'.
      71             :      */
      72             : 
      73             :     ReplicationSlotPersistentData slotdata;
      74             : } ReplicationSlotOnDisk;
      75             : 
      76             : /* size of version independent data */
      77             : #define ReplicationSlotOnDiskConstantSize \
      78             :     offsetof(ReplicationSlotOnDisk, slotdata)
      79             : /* size of the part of the slot not covered by the checksum */
      80             : #define SnapBuildOnDiskNotChecksummedSize \
      81             :     offsetof(ReplicationSlotOnDisk, version)
      82             : /* size of the part covered by the checksum */
      83             : #define SnapBuildOnDiskChecksummedSize \
      84             :     sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize
      85             : /* size of the slot data that is version dependent */
      86             : #define ReplicationSlotOnDiskV2Size \
      87             :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
      88             : 
      89             : #define SLOT_MAGIC      0x1051CA1   /* format identifier */
      90             : #define SLOT_VERSION    2       /* version for new files */
      91             : 
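
As an editorial illustration of how the size macros above delimit the checksummed portion of a slot state file: a minimal sketch, assuming PostgreSQL's CRC helpers from port/pg_crc32c.h (INIT_CRC32C, COMP_CRC32C, FIN_CRC32C, EQ_CRC32C); the helper name is hypothetical and is not part of slot.c.

/*
 * Hypothetical helper: check the CRC of an on-disk image "cp" that has been
 * read back into memory.  The checksum covers everything from 'version'
 * onward, i.e. SnapBuildOnDiskChecksummedSize bytes starting at offset
 * SnapBuildOnDiskNotChecksummedSize.
 */
static bool
slot_on_disk_checksum_ok(ReplicationSlotOnDisk *cp)
{
    pg_crc32c   checksum;

    INIT_CRC32C(checksum);
    COMP_CRC32C(checksum,
                (char *) cp + SnapBuildOnDiskNotChecksummedSize,
                SnapBuildOnDiskChecksummedSize);
    FIN_CRC32C(checksum);

    return EQ_CRC32C(checksum, cp->checksum);
}
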
      92             : /* Control array for replication slot management */
      93             : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
      94             : 
      95             : /* My backend's replication slot in the shared memory array */
      96             : ReplicationSlot *MyReplicationSlot = NULL;
      97             : 
      98             : /* GUCs */
      99             : int         max_replication_slots = 0;  /* the maximum number of replication
     100             :                                          * slots */
     101             : 
     102             : static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
     103             :                                           const char *name, SlotAcquireBehavior behavior);
     104             : static void ReplicationSlotDropAcquired(void);
     105             : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
     106             : 
     107             : /* internal persistency functions */
     108             : static void RestoreSlotFromDisk(const char *name);
     109             : static void CreateSlotOnDisk(ReplicationSlot *slot);
     110             : static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
     111             : 
     112             : /*
     113             :  * Report shared-memory space needed by ReplicationSlotsShmemInit.
     114             :  */
     115             : Size
     116        7120 : ReplicationSlotsShmemSize(void)
     117             : {
     118        7120 :     Size        size = 0;
     119             : 
     120        7120 :     if (max_replication_slots == 0)
     121           0 :         return size;
     122             : 
     123        7120 :     size = offsetof(ReplicationSlotCtlData, replication_slots);
     124        7120 :     size = add_size(size,
     125             :                     mul_size(max_replication_slots, sizeof(ReplicationSlot)));
     126             : 
     127        7120 :     return size;
     128             : }
     129             : 
     130             : /*
     131             :  * Allocate and initialize shared memory for replication slots.
     132             :  */
     133             : void
     134        2372 : ReplicationSlotsShmemInit(void)
     135             : {
     136             :     bool        found;
     137             : 
     138        2372 :     if (max_replication_slots == 0)
     139           0 :         return;
     140             : 
     141        2372 :     ReplicationSlotCtl = (ReplicationSlotCtlData *)
     142        2372 :         ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
     143             :                         &found);
     144             : 
     145        2372 :     if (!found)
     146             :     {
     147             :         int         i;
     148             : 
     149             :         /* First time through, so initialize */
     150        4232 :         MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
     151             : 
     152       25992 :         for (i = 0; i < max_replication_slots; i++)
     153             :         {
     154       23620 :             ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
     155             : 
     156             :             /* everything else is zeroed by the memset above */
     157       23620 :             SpinLockInit(&slot->mutex);
     158       23620 :             LWLockInitialize(&slot->io_in_progress_lock,
     159             :                              LWTRANCHE_REPLICATION_SLOT_IO);
     160       23620 :             ConditionVariableInit(&slot->active_cv);
     161             :         }
     162             :     }
     163             : }
     164             : 
     165             : /*
     166             :  * Check whether the passed slot name is valid and report errors at elevel.
     167             :  *
      168             :  * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1}, which should allow
     169             :  * the name to be used as a directory name on every supported OS.
     170             :  *
     171             :  * Returns whether the directory name is valid or not if elevel < ERROR.
     172             :  */
     173             : bool
     174         602 : ReplicationSlotValidateName(const char *name, int elevel)
     175             : {
     176             :     const char *cp;
     177             : 
     178         602 :     if (strlen(name) == 0)
     179             :     {
     180           0 :         ereport(elevel,
     181             :                 (errcode(ERRCODE_INVALID_NAME),
     182             :                  errmsg("replication slot name \"%s\" is too short",
     183             :                         name)));
     184           0 :         return false;
     185             :     }
     186             : 
     187         602 :     if (strlen(name) >= NAMEDATALEN)
     188             :     {
     189           0 :         ereport(elevel,
     190             :                 (errcode(ERRCODE_NAME_TOO_LONG),
     191             :                  errmsg("replication slot name \"%s\" is too long",
     192             :                         name)));
     193           0 :         return false;
     194             :     }
     195             : 
     196        9948 :     for (cp = name; *cp; cp++)
     197             :     {
     198        9348 :         if (!((*cp >= 'a' && *cp <= 'z')
     199        3072 :               || (*cp >= '0' && *cp <= '9')
     200        1062 :               || (*cp == '_')))
     201             :         {
     202           2 :             ereport(elevel,
     203             :                     (errcode(ERRCODE_INVALID_NAME),
     204             :                      errmsg("replication slot name \"%s\" contains invalid character",
     205             :                             name),
     206             :                      errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
     207           0 :             return false;
     208             :         }
     209             :     }
     210         600 :     return true;
     211             : }
     212             : 
     213             : /*
     214             :  * Create a new replication slot and mark it as used by this backend.
     215             :  *
     216             :  * name: Name of the slot
     217             :  * db_specific: logical decoding is db specific; if the slot is going to
     218             :  *     be used for that pass true, otherwise false.
     219             :  */
     220             : void
     221         512 : ReplicationSlotCreate(const char *name, bool db_specific,
     222             :                       ReplicationSlotPersistency persistency)
     223             : {
     224         512 :     ReplicationSlot *slot = NULL;
     225             :     int         i;
     226             : 
     227             :     Assert(MyReplicationSlot == NULL);
     228             : 
     229         512 :     ReplicationSlotValidateName(name, ERROR);
     230             : 
     231             :     /*
     232             :      * If some other backend ran this code concurrently with us, we'd likely
     233             :      * both allocate the same slot, and that would be bad.  We'd also be at
     234             :      * risk of missing a name collision.  Also, we don't want to try to create
     235             :      * a new slot while somebody's busy cleaning up an old one, because we
     236             :      * might both be monkeying with the same directory.
     237             :      */
     238         510 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     239             : 
     240             :     /*
     241             :      * Check for name collision, and identify an allocatable slot.  We need to
     242             :      * hold ReplicationSlotControlLock in shared mode for this, so that nobody
     243             :      * else can change the in_use flags while we're looking at them.
     244             :      */
     245         510 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     246        4450 :     for (i = 0; i < max_replication_slots; i++)
     247             :     {
     248        3946 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     249             : 
     250        3946 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     251           6 :             ereport(ERROR,
     252             :                     (errcode(ERRCODE_DUPLICATE_OBJECT),
     253             :                      errmsg("replication slot \"%s\" already exists", name)));
     254        3940 :         if (!s->in_use && slot == NULL)
     255         502 :             slot = s;
     256             :     }
     257         504 :     LWLockRelease(ReplicationSlotControlLock);
     258             : 
     259             :     /* If all slots are in use, we're out of luck. */
     260         504 :     if (slot == NULL)
     261           2 :         ereport(ERROR,
     262             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     263             :                  errmsg("all replication slots are in use"),
     264             :                  errhint("Free one or increase max_replication_slots.")));
     265             : 
     266             :     /*
     267             :      * Since this slot is not in use, nobody should be looking at any part of
     268             :      * it other than the in_use field unless they're trying to allocate it.
     269             :      * And since we hold ReplicationSlotAllocationLock, nobody except us can
     270             :      * be doing that.  So it's safe to initialize the slot.
     271             :      */
     272             :     Assert(!slot->in_use);
     273             :     Assert(slot->active_pid == 0);
     274             : 
     275             :     /* first initialize persistent data */
     276         502 :     memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
     277         502 :     namestrcpy(&slot->data.name, name);
     278         502 :     slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
     279         502 :     slot->data.persistency = persistency;
     280             : 
     281             :     /* and then data only present in shared memory */
     282         502 :     slot->just_dirtied = false;
     283         502 :     slot->dirty = false;
     284         502 :     slot->effective_xmin = InvalidTransactionId;
     285         502 :     slot->effective_catalog_xmin = InvalidTransactionId;
     286         502 :     slot->candidate_catalog_xmin = InvalidTransactionId;
     287         502 :     slot->candidate_xmin_lsn = InvalidXLogRecPtr;
     288         502 :     slot->candidate_restart_valid = InvalidXLogRecPtr;
     289         502 :     slot->candidate_restart_lsn = InvalidXLogRecPtr;
     290             : 
     291             :     /*
     292             :      * Create the slot on disk.  We haven't actually marked the slot allocated
     293             :      * yet, so no special cleanup is required if this errors out.
     294             :      */
     295         502 :     CreateSlotOnDisk(slot);
     296             : 
     297             :     /*
     298             :      * We need to briefly prevent any other backend from iterating over the
     299             :      * slots while we flip the in_use flag. We also need to set the active
     300             :      * flag while holding the ControlLock as otherwise a concurrent
     301             :      * ReplicationSlotAcquire() could acquire the slot as well.
     302             :      */
     303         502 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     304             : 
     305         502 :     slot->in_use = true;
     306             : 
     307             :     /* We can now mark the slot active, and that makes it our slot. */
     308         502 :     SpinLockAcquire(&slot->mutex);
     309             :     Assert(slot->active_pid == 0);
     310         502 :     slot->active_pid = MyProcPid;
     311         502 :     SpinLockRelease(&slot->mutex);
     312         502 :     MyReplicationSlot = slot;
     313             : 
     314         502 :     LWLockRelease(ReplicationSlotControlLock);
     315             : 
     316             :     /*
      317             :      * stats for physical slots, so no need to create an entry for them.
     318             :      * stats for physical slots, so no need to create an entry for the same.
     319             :      * See ReplicationSlotDropPtr for why we need to do this before releasing
     320             :      * ReplicationSlotAllocationLock.
     321             :      */
     322         502 :     if (SlotIsLogical(slot))
     323         334 :         pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0);
     324             : 
     325             :     /*
     326             :      * Now that the slot has been marked as in_use and active, it's safe to
     327             :      * let somebody else try to allocate a slot.
     328             :      */
     329         502 :     LWLockRelease(ReplicationSlotAllocationLock);
     330             : 
     331             :     /* Let everybody know we've modified this slot */
     332         502 :     ConditionVariableBroadcast(&slot->active_cv);
     333         502 : }
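
A hedged usage sketch of the call above (the slot name is made up; actual callers include the walsender and the SQL-level slot functions):

/* Create a temporary physical (not database-specific) slot owned by us. */
ReplicationSlotCreate("illustrative_slot", false, RS_TEMPORARY);
Assert(MyReplicationSlot != NULL);  /* the new slot is now acquired */
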
     334             : 
     335             : /*
     336             :  * Search for the named replication slot.
     337             :  *
     338             :  * Return the replication slot if found, otherwise NULL.
     339             :  *
     340             :  * The caller must hold ReplicationSlotControlLock in shared mode.
     341             :  */
     342             : ReplicationSlot *
     343         794 : SearchNamedReplicationSlot(const char *name)
     344             : {
     345             :     int         i;
     346         794 :     ReplicationSlot *slot = NULL;
     347             : 
     348             :     Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,
     349             :                                 LW_SHARED));
     350             : 
     351        1072 :     for (i = 0; i < max_replication_slots; i++)
     352             :     {
     353        1058 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     354             : 
     355        1058 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     356             :         {
     357         780 :             slot = s;
     358         780 :             break;
     359             :         }
     360             :     }
     361             : 
     362         794 :     return slot;
     363             : }
     364             : 
     365             : /*
     366             :  * Find a previously created slot and mark it as used by this process.
     367             :  *
      368             :  * The return value is only useful if behavior is SAB_Inquire, in which case
     369             :  * it's zero if we successfully acquired the slot, -1 if the slot no longer
     370             :  * exists, or the PID of the owning process otherwise.  If behavior is
     371             :  * SAB_Error, then trying to acquire an owned slot is an error.
     372             :  * If SAB_Block, we sleep until the slot is released by the owning process.
     373             :  */
     374             : int
     375         792 : ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
     376             : {
     377         792 :     return ReplicationSlotAcquireInternal(NULL, name, behavior);
     378             : }
     379             : 
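
For illustration, a sketch of how a caller might interpret the return value under SAB_Inquire (slot name and log messages are hypothetical; real callers live outside this file):

int rc = ReplicationSlotAcquire("illustrative_slot", SAB_Inquire);

if (rc == 0)
    ;                                   /* acquired; MyReplicationSlot is set */
else if (rc == -1)
    elog(LOG, "slot does not exist anymore");
else
    elog(LOG, "slot is active for PID %d", rc);
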
     380             : /*
     381             :  * Mark the specified slot as used by this process.
     382             :  *
     383             :  * Only one of slot and name can be specified.
     384             :  * If slot == NULL, search for the slot with the given name.
     385             :  *
     386             :  * See comments about the return value in ReplicationSlotAcquire().
     387             :  */
     388             : static int
     389         792 : ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
     390             :                                SlotAcquireBehavior behavior)
     391             : {
     392             :     ReplicationSlot *s;
     393             :     int         active_pid;
     394             : 
     395             :     AssertArg((slot == NULL) ^ (name == NULL));
     396             : 
     397         792 : retry:
     398             :     Assert(MyReplicationSlot == NULL);
     399             : 
     400         792 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     401             : 
     402             :     /*
     403             :      * Search for the slot with the specified name if the slot to acquire is
     404             :      * not given. If the slot is not found, we either return -1 or error out.
     405             :      */
     406         792 :     s = slot ? slot : SearchNamedReplicationSlot(name);
     407         792 :     if (s == NULL || !s->in_use)
     408             :     {
     409          14 :         LWLockRelease(ReplicationSlotControlLock);
     410             : 
     411          14 :         if (behavior == SAB_Inquire)
     412           0 :             return -1;
     413          14 :         ereport(ERROR,
     414             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     415             :                  errmsg("replication slot \"%s\" does not exist",
     416             :                         name ? name : NameStr(slot->data.name))));
     417             :     }
     418             : 
     419             :     /*
     420             :      * This is the slot we want; check if it's active under some other
     421             :      * process.  In single user mode, we don't need this check.
     422             :      */
     423         778 :     if (IsUnderPostmaster)
     424             :     {
     425             :         /*
     426             :          * Get ready to sleep on the slot in case it is active if SAB_Block.
     427             :          * (We may end up not sleeping, but we don't want to do this while
     428             :          * holding the spinlock.)
     429             :          */
     430         778 :         if (behavior == SAB_Block)
     431          14 :             ConditionVariablePrepareToSleep(&s->active_cv);
     432             : 
     433         778 :         SpinLockAcquire(&s->mutex);
     434         778 :         if (s->active_pid == 0)
     435         516 :             s->active_pid = MyProcPid;
     436         778 :         active_pid = s->active_pid;
     437         778 :         SpinLockRelease(&s->mutex);
     438             :     }
     439             :     else
     440           0 :         active_pid = MyProcPid;
     441         778 :     LWLockRelease(ReplicationSlotControlLock);
     442             : 
     443             :     /*
     444             :      * If we found the slot but it's already active in another process, we
     445             :      * either error out, return the PID of the owning process, or retry
     446             :      * after a short wait, as caller specified.
     447             :      */
     448         778 :     if (active_pid != MyProcPid)
     449             :     {
     450           0 :         if (behavior == SAB_Error)
     451           0 :             ereport(ERROR,
     452             :                     (errcode(ERRCODE_OBJECT_IN_USE),
     453             :                      errmsg("replication slot \"%s\" is active for PID %d",
     454             :                             NameStr(s->data.name), active_pid)));
     455           0 :         else if (behavior == SAB_Inquire)
     456           0 :             return active_pid;
     457             : 
     458             :         /* Wait here until we get signaled, and then restart */
     459           0 :         ConditionVariableSleep(&s->active_cv,
     460             :                                WAIT_EVENT_REPLICATION_SLOT_DROP);
     461           0 :         ConditionVariableCancelSleep();
     462           0 :         goto retry;
     463             :     }
     464         778 :     else if (behavior == SAB_Block)
     465          14 :         ConditionVariableCancelSleep(); /* no sleep needed after all */
     466             : 
     467             :     /* Let everybody know we've modified this slot */
     468         778 :     ConditionVariableBroadcast(&s->active_cv);
     469             : 
     470             :     /* We made this slot active, so it's ours now. */
     471         778 :     MyReplicationSlot = s;
     472             : 
     473             :     /* success */
     474         778 :     return 0;
     475             : }
     476             : 
     477             : /*
     478             :  * Release the replication slot that this backend considers to own.
     479             :  *
     480             :  * This or another backend can re-acquire the slot later.
     481             :  * Resources this slot requires will be preserved.
     482             :  */
     483             : void
     484        1122 : ReplicationSlotRelease(void)
     485             : {
     486        1122 :     ReplicationSlot *slot = MyReplicationSlot;
     487             : 
     488             :     Assert(slot != NULL && slot->active_pid != 0);
     489             : 
     490        1122 :     if (slot->data.persistency == RS_EPHEMERAL)
     491             :     {
     492             :         /*
     493             :          * Delete the slot. There is no !PANIC case where this is allowed to
      494             :          * fail; all that may happen is an incomplete cleanup of the on-disk
     495             :          * data.
     496             :          */
     497           6 :         ReplicationSlotDropAcquired();
     498             :     }
     499             : 
     500             :     /*
     501             :      * If slot needed to temporarily restrain both data and catalog xmin to
     502             :      * create the catalog snapshot, remove that temporary constraint.
     503             :      * Snapshots can only be exported while the initial snapshot is still
     504             :      * acquired.
     505             :      */
     506        1122 :     if (!TransactionIdIsValid(slot->data.xmin) &&
     507        1122 :         TransactionIdIsValid(slot->effective_xmin))
     508             :     {
     509         120 :         SpinLockAcquire(&slot->mutex);
     510         120 :         slot->effective_xmin = InvalidTransactionId;
     511         120 :         SpinLockRelease(&slot->mutex);
     512         120 :         ReplicationSlotsComputeRequiredXmin(false);
     513             :     }
     514             : 
     515        1122 :     if (slot->data.persistency == RS_PERSISTENT)
     516             :     {
     517             :         /*
     518             :          * Mark persistent slot inactive.  We're not freeing it, just
     519             :          * disconnecting, but wake up others that may be waiting for it.
     520             :          */
     521         584 :         SpinLockAcquire(&slot->mutex);
     522         584 :         slot->active_pid = 0;
     523         584 :         SpinLockRelease(&slot->mutex);
     524         584 :         ConditionVariableBroadcast(&slot->active_cv);
     525             :     }
     526             : 
     527        1122 :     MyReplicationSlot = NULL;
     528             : 
     529             :     /* might not have been set when we've been a plain slot */
     530        1122 :     LWLockAcquire(ProcArrayLock, LW_SHARED);
     531        1122 :     MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
     532        1122 :     ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
     533        1122 :     LWLockRelease(ProcArrayLock);
     534        1122 : }
     535             : 
     536             : /*
     537             :  * Cleanup all temporary slots created in current session.
     538             :  */
     539             : void
     540       31768 : ReplicationSlotCleanup(void)
     541             : {
     542             :     int         i;
     543             : 
     544             :     Assert(MyReplicationSlot == NULL);
     545             : 
     546       31768 : restart:
     547       31768 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     548      344396 :     for (i = 0; i < max_replication_slots; i++)
     549             :     {
     550      312898 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     551             : 
     552      312898 :         if (!s->in_use)
     553      310540 :             continue;
     554             : 
     555        2358 :         SpinLockAcquire(&s->mutex);
     556        2358 :         if (s->active_pid == MyProcPid)
     557             :         {
     558             :             Assert(s->data.persistency == RS_TEMPORARY);
     559         270 :             SpinLockRelease(&s->mutex);
     560         270 :             LWLockRelease(ReplicationSlotControlLock);  /* avoid deadlock */
     561             : 
     562         270 :             ReplicationSlotDropPtr(s);
     563             : 
     564         270 :             ConditionVariableBroadcast(&s->active_cv);
     565         270 :             goto restart;
     566             :         }
     567             :         else
     568        2088 :             SpinLockRelease(&s->mutex);
     569             :     }
     570             : 
     571       31498 :     LWLockRelease(ReplicationSlotControlLock);
     572       31498 : }
     573             : 
     574             : /*
     575             :  * Permanently drop replication slot identified by the passed in name.
     576             :  */
     577             : void
     578         168 : ReplicationSlotDrop(const char *name, bool nowait)
     579             : {
     580             :     Assert(MyReplicationSlot == NULL);
     581             : 
     582         168 :     (void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block);
     583             : 
     584         158 :     ReplicationSlotDropAcquired();
     585         158 : }
     586             : 
     587             : /*
     588             :  * Permanently drop the currently acquired replication slot.
     589             :  */
     590             : static void
     591         170 : ReplicationSlotDropAcquired(void)
     592             : {
     593         170 :     ReplicationSlot *slot = MyReplicationSlot;
     594             : 
     595             :     Assert(MyReplicationSlot != NULL);
     596             : 
     597             :     /* slot isn't acquired anymore */
     598         170 :     MyReplicationSlot = NULL;
     599             : 
     600         170 :     ReplicationSlotDropPtr(slot);
     601         170 : }
     602             : 
     603             : /*
     604             :  * Permanently drop the replication slot which will be released by the point
     605             :  * this function returns.
     606             :  */
     607             : static void
     608         440 : ReplicationSlotDropPtr(ReplicationSlot *slot)
     609             : {
     610             :     char        path[MAXPGPATH];
     611             :     char        tmppath[MAXPGPATH];
     612             : 
     613             :     /*
     614             :      * If some other backend ran this code concurrently with us, we might try
     615             :      * to delete a slot with a certain name while someone else was trying to
     616             :      * create a slot with the same name.
     617             :      */
     618         440 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     619             : 
     620             :     /* Generate pathnames. */
     621         440 :     sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
     622         440 :     sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
     623             : 
     624             :     /*
     625             :      * Rename the slot directory on disk, so that we'll no longer recognize
     626             :      * this as a valid slot.  Note that if this fails, we've got to mark the
     627             :      * slot inactive before bailing out.  If we're dropping an ephemeral or a
     628             :      * temporary slot, we better never fail hard as the caller won't expect
     629             :      * the slot to survive and this might get called during error handling.
     630             :      */
     631         440 :     if (rename(path, tmppath) == 0)
     632             :     {
     633             :         /*
     634             :          * We need to fsync() the directory we just renamed and its parent to
     635             :          * make sure that our changes are on disk in a crash-safe fashion.  If
     636             :          * fsync() fails, we can't be sure whether the changes are on disk or
     637             :          * not.  For now, we handle that by panicking;
     638             :          * StartupReplicationSlots() will try to straighten it out after
     639             :          * restart.
     640             :          */
     641         440 :         START_CRIT_SECTION();
     642         440 :         fsync_fname(tmppath, true);
     643         440 :         fsync_fname("pg_replslot", true);
     644         440 :         END_CRIT_SECTION();
     645             :     }
     646             :     else
     647             :     {
     648           0 :         bool        fail_softly = slot->data.persistency != RS_PERSISTENT;
     649             : 
     650           0 :         SpinLockAcquire(&slot->mutex);
     651           0 :         slot->active_pid = 0;
     652           0 :         SpinLockRelease(&slot->mutex);
     653             : 
     654             :         /* wake up anyone waiting on this slot */
     655           0 :         ConditionVariableBroadcast(&slot->active_cv);
     656             : 
     657           0 :         ereport(fail_softly ? WARNING : ERROR,
     658             :                 (errcode_for_file_access(),
     659             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
     660             :                         path, tmppath)));
     661             :     }
     662             : 
     663             :     /*
     664             :      * The slot is definitely gone.  Lock out concurrent scans of the array
     665             :      * long enough to kill it.  It's OK to clear the active PID here without
     666             :      * grabbing the mutex because nobody else can be scanning the array here,
     667             :      * and nobody can be attached to this slot and thus access it without
     668             :      * scanning the array.
     669             :      *
     670             :      * Also wake up processes waiting for it.
     671             :      */
     672         440 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     673         440 :     slot->active_pid = 0;
     674         440 :     slot->in_use = false;
     675         440 :     LWLockRelease(ReplicationSlotControlLock);
     676         440 :     ConditionVariableBroadcast(&slot->active_cv);
     677             : 
     678             :     /*
     679             :      * Slot is dead and doesn't prevent resource removal anymore, recompute
     680             :      * limits.
     681             :      */
     682         440 :     ReplicationSlotsComputeRequiredXmin(false);
     683         440 :     ReplicationSlotsComputeRequiredLSN();
     684             : 
     685             :     /*
     686             :      * If removing the directory fails, the worst thing that will happen is
     687             :      * that the user won't be able to create a new slot with the same name
     688             :      * until the next server restart.  We warn about it, but that's all.
     689             :      */
     690         440 :     if (!rmtree(tmppath, true))
     691           0 :         ereport(WARNING,
     692             :                 (errmsg("could not remove directory \"%s\"", tmppath)));
     693             : 
     694             :     /*
     695             :      * Send a message to drop the replication slot to the stats collector.
     696             :      * Since there is no guarantee of the order of message transfer on a UDP
     697             :      * connection, it's possible that a message for creating a new slot
     698             :      * reaches before a message for removing the old slot. We send the drop
     699             :      * and create messages while holding ReplicationSlotAllocationLock to
     700             :      * reduce that possibility. If the messages reached in reverse, we would
     701             :      * lose one statistics update message. But the next update message will
     702             :      * create the statistics for the replication slot.
     703             :      */
     704         440 :     if (SlotIsLogical(slot))
     705         286 :         pgstat_report_replslot_drop(NameStr(slot->data.name));
     706             : 
     707             :     /*
     708             :      * We release this at the very end, so that nobody starts trying to create
     709             :      * a slot while we're still cleaning up the detritus of the old one.
     710             :      */
     711         440 :     LWLockRelease(ReplicationSlotAllocationLock);
     712         440 : }
     713             : 
     714             : /*
     715             :  * Serialize the currently acquired slot's state from memory to disk, thereby
     716             :  * guaranteeing the current state will survive a crash.
     717             :  */
     718             : void
     719         704 : ReplicationSlotSave(void)
     720             : {
     721             :     char        path[MAXPGPATH];
     722             : 
     723             :     Assert(MyReplicationSlot != NULL);
     724             : 
     725         704 :     sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
     726         704 :     SaveSlotToPath(MyReplicationSlot, path, ERROR);
     727         704 : }
     728             : 
     729             : /*
     730             :  * Signal that it would be useful if the currently acquired slot would be
     731             :  * flushed out to disk.
     732             :  *
      733             :  * Note that the actual flush to disk can be delayed for a long time; if
      734             :  * required for correctness, explicitly do a ReplicationSlotSave().
     735             :  */
     736             : void
     737        1372 : ReplicationSlotMarkDirty(void)
     738             : {
     739        1372 :     ReplicationSlot *slot = MyReplicationSlot;
     740             : 
     741             :     Assert(MyReplicationSlot != NULL);
     742             : 
     743        1372 :     SpinLockAcquire(&slot->mutex);
     744        1372 :     MyReplicationSlot->just_dirtied = true;
     745        1372 :     MyReplicationSlot->dirty = true;
     746        1372 :     SpinLockRelease(&slot->mutex);
     747        1372 : }
     748             : 
     749             : /*
     750             :  * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
     751             :  * guaranteeing it will be there after an eventual crash.
     752             :  */
     753             : void
     754         192 : ReplicationSlotPersist(void)
     755             : {
     756         192 :     ReplicationSlot *slot = MyReplicationSlot;
     757             : 
     758             :     Assert(slot != NULL);
     759             :     Assert(slot->data.persistency != RS_PERSISTENT);
     760             : 
     761         192 :     SpinLockAcquire(&slot->mutex);
     762         192 :     slot->data.persistency = RS_PERSISTENT;
     763         192 :     SpinLockRelease(&slot->mutex);
     764             : 
     765         192 :     ReplicationSlotMarkDirty();
     766         192 :     ReplicationSlotSave();
     767         192 : }
     768             : 
     769             : /*
     770             :  * Compute the oldest xmin across all slots and store it in the ProcArray.
     771             :  *
     772             :  * If already_locked is true, ProcArrayLock has already been acquired
     773             :  * exclusively.
     774             :  */
     775             : void
     776        2486 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
     777             : {
     778             :     int         i;
     779        2486 :     TransactionId agg_xmin = InvalidTransactionId;
     780        2486 :     TransactionId agg_catalog_xmin = InvalidTransactionId;
     781             : 
     782             :     Assert(ReplicationSlotCtl != NULL);
     783             : 
     784        2486 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     785             : 
     786       24920 :     for (i = 0; i < max_replication_slots; i++)
     787             :     {
     788       22434 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     789             :         TransactionId effective_xmin;
     790             :         TransactionId effective_catalog_xmin;
     791             : 
     792       22434 :         if (!s->in_use)
     793       21214 :             continue;
     794             : 
     795        1220 :         SpinLockAcquire(&s->mutex);
     796        1220 :         effective_xmin = s->effective_xmin;
     797        1220 :         effective_catalog_xmin = s->effective_catalog_xmin;
     798        1220 :         SpinLockRelease(&s->mutex);
     799             : 
     800             :         /* check the data xmin */
     801        1220 :         if (TransactionIdIsValid(effective_xmin) &&
     802           0 :             (!TransactionIdIsValid(agg_xmin) ||
     803           0 :              TransactionIdPrecedes(effective_xmin, agg_xmin)))
     804         142 :             agg_xmin = effective_xmin;
     805             : 
     806             :         /* check the catalog xmin */
     807        1220 :         if (TransactionIdIsValid(effective_catalog_xmin) &&
     808         452 :             (!TransactionIdIsValid(agg_catalog_xmin) ||
     809         452 :              TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
     810         656 :             agg_catalog_xmin = effective_catalog_xmin;
     811             :     }
     812             : 
     813        2486 :     LWLockRelease(ReplicationSlotControlLock);
     814             : 
     815        2486 :     ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
     816        2486 : }
     817             : 
     818             : /*
     819             :  * Compute the oldest restart LSN across all slots and inform xlog module.
     820             :  *
     821             :  * Note: while max_slot_wal_keep_size is theoretically relevant for this
     822             :  * purpose, we don't try to account for that, because this module doesn't
     823             :  * know what to compare against.
     824             :  */
     825             : void
     826        2712 : ReplicationSlotsComputeRequiredLSN(void)
     827             : {
     828             :     int         i;
     829        2712 :     XLogRecPtr  min_required = InvalidXLogRecPtr;
     830             : 
     831             :     Assert(ReplicationSlotCtl != NULL);
     832             : 
     833        2712 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     834       27250 :     for (i = 0; i < max_replication_slots; i++)
     835             :     {
     836       24538 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     837             :         XLogRecPtr  restart_lsn;
     838             : 
     839       24538 :         if (!s->in_use)
     840       23222 :             continue;
     841             : 
     842        1316 :         SpinLockAcquire(&s->mutex);
     843        1316 :         restart_lsn = s->data.restart_lsn;
     844        1316 :         SpinLockRelease(&s->mutex);
     845             : 
     846        1316 :         if (restart_lsn != InvalidXLogRecPtr &&
     847         344 :             (min_required == InvalidXLogRecPtr ||
     848             :              restart_lsn < min_required))
     849         962 :             min_required = restart_lsn;
     850             :     }
     851        2712 :     LWLockRelease(ReplicationSlotControlLock);
     852             : 
     853        2712 :     XLogSetReplicationSlotMinimumLSN(min_required);
     854        2712 : }
     855             : 
     856             : /*
      857             :  * Compute the oldest WAL LSN required by *logical* decoding slots.
     858             :  *
     859             :  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
     860             :  * slots exist.
     861             :  *
     862             :  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
     863             :  * ignores physical replication slots.
     864             :  *
     865             :  * The results aren't required frequently, so we don't maintain a precomputed
     866             :  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
     867             :  */
     868             : XLogRecPtr
     869        6916 : ReplicationSlotsComputeLogicalRestartLSN(void)
     870             : {
     871        6916 :     XLogRecPtr  result = InvalidXLogRecPtr;
     872             :     int         i;
     873             : 
     874        6916 :     if (max_replication_slots <= 0)
     875           0 :         return InvalidXLogRecPtr;
     876             : 
     877        6916 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     878             : 
     879       75740 :     for (i = 0; i < max_replication_slots; i++)
     880             :     {
     881             :         ReplicationSlot *s;
     882             :         XLogRecPtr  restart_lsn;
     883             : 
     884       68824 :         s = &ReplicationSlotCtl->replication_slots[i];
     885             : 
     886             :         /* cannot change while ReplicationSlotCtlLock is held */
     887       68824 :         if (!s->in_use)
     888       68624 :             continue;
     889             : 
     890             :         /* we're only interested in logical slots */
     891         200 :         if (!SlotIsLogical(s))
     892          88 :             continue;
     893             : 
     894             :         /* read once, it's ok if it increases while we're checking */
     895         112 :         SpinLockAcquire(&s->mutex);
     896         112 :         restart_lsn = s->data.restart_lsn;
     897         112 :         SpinLockRelease(&s->mutex);
     898             : 
     899         112 :         if (restart_lsn == InvalidXLogRecPtr)
     900           0 :             continue;
     901             : 
     902         112 :         if (result == InvalidXLogRecPtr ||
     903             :             restart_lsn < result)
     904         104 :             result = restart_lsn;
     905             :     }
     906             : 
     907        6916 :     LWLockRelease(ReplicationSlotControlLock);
     908             : 
     909        6916 :     return result;
     910             : }
     911             : 
     912             : /*
     913             :  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
     914             :  * passed database oid.
     915             :  *
     916             :  * Returns true if there are any slots referencing the database. *nslots will
     917             :  * be set to the absolute number of slots in the database, *nactive to ones
     918             :  * currently active.
     919             :  */
     920             : bool
     921          20 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
     922             : {
     923             :     int         i;
     924             : 
     925          20 :     *nslots = *nactive = 0;
     926             : 
     927          20 :     if (max_replication_slots <= 0)
     928           0 :         return false;
     929             : 
     930          20 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     931         206 :     for (i = 0; i < max_replication_slots; i++)
     932             :     {
     933             :         ReplicationSlot *s;
     934             : 
     935         186 :         s = &ReplicationSlotCtl->replication_slots[i];
     936             : 
     937             :         /* cannot change while ReplicationSlotCtlLock is held */
     938         186 :         if (!s->in_use)
     939         172 :             continue;
     940             : 
     941             :         /* only logical slots are database specific, skip */
     942          14 :         if (!SlotIsLogical(s))
     943           2 :             continue;
     944             : 
     945             :         /* not our database, skip */
     946          12 :         if (s->data.database != dboid)
     947           6 :             continue;
     948             : 
     949             :         /* count slots with spinlock held */
     950           6 :         SpinLockAcquire(&s->mutex);
     951           6 :         (*nslots)++;
     952           6 :         if (s->active_pid != 0)
     953           2 :             (*nactive)++;
     954           6 :         SpinLockRelease(&s->mutex);
     955             :     }
     956          20 :     LWLockRelease(ReplicationSlotControlLock);
     957             : 
     958          20 :     if (*nslots > 0)
     959           6 :         return true;
     960          14 :     return false;
     961             : }
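
A minimal caller sketch, assuming a dropdb-style check (variable names and the message wording are illustrative, not taken from this file):

int nslots, nactive;

if (ReplicationSlotsCountDBSlots(db_oid, &nslots, &nactive))
    ereport(ERROR,
            (errcode(ERRCODE_OBJECT_IN_USE),
             errmsg("database is used by %d replication slot(s), %d of them active",
                    nslots, nactive)));
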
     962             : 
     963             : /*
     964             :  * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
     965             :  * passed database oid. The caller should hold an exclusive lock on the
     966             :  * pg_database oid for the database to prevent creation of new slots on the db
     967             :  * or replay from existing slots.
     968             :  *
     969             :  * Another session that concurrently acquires an existing slot on the target DB
     970             :  * (most likely to drop it) may cause this function to ERROR. If that happens
     971             :  * it may have dropped some but not all slots.
     972             :  *
     973             :  * This routine isn't as efficient as it could be - but we don't drop
     974             :  * databases often, especially databases with lots of slots.
     975             :  */
     976             : void
     977          20 : ReplicationSlotsDropDBSlots(Oid dboid)
     978             : {
     979             :     int         i;
     980             : 
     981          20 :     if (max_replication_slots <= 0)
     982           0 :         return;
     983             : 
     984          20 : restart:
     985          26 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     986         204 :     for (i = 0; i < max_replication_slots; i++)
     987             :     {
     988             :         ReplicationSlot *s;
     989             :         char       *slotname;
     990             :         int         active_pid;
     991             : 
     992         184 :         s = &ReplicationSlotCtl->replication_slots[i];
     993             : 
     994             :         /* cannot change while ReplicationSlotCtlLock is held */
     995         184 :         if (!s->in_use)
     996         164 :             continue;
     997             : 
     998             :         /* only logical slots are database specific, skip */
     999          20 :         if (!SlotIsLogical(s))
    1000           2 :             continue;
    1001             : 
    1002             :         /* not our database, skip */
    1003          18 :         if (s->data.database != dboid)
    1004          12 :             continue;
    1005             : 
    1006             :         /* acquire slot, so ReplicationSlotDropAcquired can be reused  */
    1007           6 :         SpinLockAcquire(&s->mutex);
    1008             :         /* can't change while ReplicationSlotControlLock is held */
    1009           6 :         slotname = NameStr(s->data.name);
    1010           6 :         active_pid = s->active_pid;
    1011           6 :         if (active_pid == 0)
    1012             :         {
    1013           6 :             MyReplicationSlot = s;
    1014           6 :             s->active_pid = MyProcPid;
    1015             :         }
    1016           6 :         SpinLockRelease(&s->mutex);
    1017             : 
    1018             :         /*
    1019             :          * Even though we hold an exclusive lock on the database object a
    1020             :          * logical slot for that DB can still be active, e.g. if it's
    1021             :          * concurrently being dropped by a backend connected to another DB.
    1022             :          *
    1023             :          * That's fairly unlikely in practice, so we'll just bail out.
    1024             :          */
    1025           6 :         if (active_pid)
    1026           0 :             ereport(ERROR,
    1027             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1028             :                      errmsg("replication slot \"%s\" is active for PID %d",
    1029             :                             slotname, active_pid)));
    1030             : 
    1031             :         /*
    1032             :          * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
    1033             :          * holding ReplicationSlotControlLock over filesystem operations,
    1034             :          * release ReplicationSlotControlLock and use
    1035             :          * ReplicationSlotDropAcquired.
    1036             :          *
     1037             :          * As that means the set of slots could change, restart the scan from
     1038             :          * the beginning each time we release the lock (see sketch below).
    1039             :          */
    1040           6 :         LWLockRelease(ReplicationSlotControlLock);
    1041           6 :         ReplicationSlotDropAcquired();
    1042           6 :         goto restart;
    1043             :     }
    1044          20 :     LWLockRelease(ReplicationSlotControlLock);
    1045             : }
    1046             : 
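
The drop loop above follows a pattern used repeatedly in this file: release ReplicationSlotControlLock before any slow operation, then rescan the slot array from the start, because its contents may have changed while the lock was not held. A minimal, generic sketch of that rescan pattern; the lock, array, nslots, needs_work() and process_one() names are hypothetical, not part of slot.c:

    /* illustrative sketch only, not part of slot.c */
    restart:
        LWLockAcquire(lock, LW_SHARED);
        for (int i = 0; i < nslots; i++)
        {
            if (!needs_work(&array[i]))
                continue;

            /* don't hold the LWLock across a slow (e.g. filesystem) operation */
            LWLockRelease(lock);
            process_one(&array[i]);

            /* the set of entries may have changed meanwhile; start over */
            goto restart;
        }
        LWLockRelease(lock);
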
    1047             : 
    1048             : /*
    1049             :  * Check whether the server's configuration supports using replication
    1050             :  * slots.
    1051             :  */
    1052             : void
    1053         990 : CheckSlotRequirements(void)
    1054             : {
    1055             :     /*
    1056             :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
    1057             :      * needs the same check.
    1058             :      */
    1059             : 
    1060         990 :     if (max_replication_slots == 0)
    1061           0 :         ereport(ERROR,
    1062             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1063             :                  errmsg("replication slots can only be used if max_replication_slots > 0")));
    1064             : 
    1065         990 :     if (wal_level < WAL_LEVEL_REPLICA)
    1066           0 :         ereport(ERROR,
    1067             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1068             :                  errmsg("replication slots can only be used if wal_level >= replica")));
    1069         990 : }
    1070             : 
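
Callers are expected to run this check before doing anything with slots, so that misconfiguration surfaces as a clean ERROR instead of a half-working slot. A minimal sketch of the expected call pattern; create_physical_slot() is a hypothetical caller, not a function in this file:

    /* illustrative sketch only, not part of slot.c */
    static void
    create_physical_slot(const char *name)
    {
        /* errors out unless max_replication_slots > 0 and wal_level >= replica */
        CheckSlotRequirements();

        /* ... proceed to create the slot and reserve WAL for it ... */
    }
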
    1071             : /*
    1072             :  * Reserve WAL for the currently active slot.
    1073             :  *
    1074             :  * Compute and set restart_lsn in a manner that's appropriate for the type of
    1075             :  * the slot and concurrency safe.
    1076             :  */
    1077             : void
    1078         458 : ReplicationSlotReserveWal(void)
    1079             : {
    1080         458 :     ReplicationSlot *slot = MyReplicationSlot;
    1081             : 
    1082             :     Assert(slot != NULL);
    1083             :     Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
    1084             : 
    1085             :     /*
    1086             :      * The replication slot mechanism is used to prevent removal of required
    1087             :      * WAL. As there is no interlock between this routine and checkpoints, WAL
    1088             :      * segments could concurrently be removed when a now stale return value of
    1089             :      * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
    1090             :      * this happens we'll just retry.
    1091             :      */
    1092             :     while (true)
    1093           0 :     {
    1094             :         XLogSegNo   segno;
    1095             :         XLogRecPtr  restart_lsn;
    1096             : 
    1097             :         /*
     1098             :          * For logical slots, log a standby snapshot and start logical
     1099             :          * decoding at exactly that position; that allows the slot to start
     1100             :          * up more quickly.
     1101             :          *
     1102             :          * That's not needed (or indeed helpful) for physical slots, as they
     1103             :          * will start replay at the last logged checkpoint anyway. Instead,
     1104             :          * return the last redo LSN. While that slightly increases the chance
     1105             :          * that we have to retry, it is where a base backup has to start
     1106             :          * replay.
    1107             :          */
    1108         458 :         if (!RecoveryInProgress() && SlotIsLogical(slot))
    1109         318 :         {
    1110             :             XLogRecPtr  flushptr;
    1111             : 
    1112             :             /* start at current insert position */
    1113         318 :             restart_lsn = GetXLogInsertRecPtr();
    1114         318 :             SpinLockAcquire(&slot->mutex);
    1115         318 :             slot->data.restart_lsn = restart_lsn;
    1116         318 :             SpinLockRelease(&slot->mutex);
    1117             : 
    1118             :             /* make sure we have enough information to start */
    1119         318 :             flushptr = LogStandbySnapshot();
    1120             : 
    1121             :             /* and make sure it's fsynced to disk */
    1122         318 :             XLogFlush(flushptr);
    1123             :         }
    1124             :         else
    1125             :         {
    1126         140 :             restart_lsn = GetRedoRecPtr();
    1127         140 :             SpinLockAcquire(&slot->mutex);
    1128         140 :             slot->data.restart_lsn = restart_lsn;
    1129         140 :             SpinLockRelease(&slot->mutex);
    1130             :         }
    1131             : 
     1132             :         /* prevent further WAL removal as soon as possible */
    1133         458 :         ReplicationSlotsComputeRequiredLSN();
    1134             : 
    1135             :         /*
    1136             :          * If all required WAL is still there, great, otherwise retry. The
    1137             :          * slot should prevent further removal of WAL, unless there's a
    1138             :          * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
    1139             :          * the new restart_lsn above, so normally we should never need to loop
    1140             :          * more than twice.
    1141             :          */
    1142         458 :         XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
    1143         458 :         if (XLogGetLastRemovedSegno() < segno)
    1144         458 :             break;
    1145             :     }
    1146         458 : }
    1147             : 
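
The final check in the loop above relies on the mapping from an LSN to a WAL segment number; XLByteToSeg() simply divides the LSN by wal_segment_size. A small worked sketch, assuming the default 16 MB segment size (the values are illustrative only, not taken from this file):

    /* illustrative sketch only, not part of slot.c */
    XLogRecPtr  restart_lsn = (XLogRecPtr) 0x5000000;    /* LSN 0/5000000 */
    XLogSegNo   segno;

    XLByteToSeg(restart_lsn, segno, 16 * 1024 * 1024);
    /* segno == 5, since 0x5000000 / 0x1000000 == 5 */

    /* the reservation holds as long as that segment has not been removed */
    if (XLogGetLastRemovedSegno() < segno)
        /* restart_lsn is still covered by WAL on disk */ ;
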
    1148             : /*
    1149             :  * Mark any slot that points to an LSN older than the given segment
    1150             :  * as invalid; it requires WAL that's about to be removed.
    1151             :  *
     1152             :  * NB: this runs as part of a checkpoint, so avoid raising errors if possible.
    1153             :  */
    1154             : void
    1155        3458 : InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
    1156             : {
    1157             :     XLogRecPtr  oldestLSN;
    1158             : 
    1159        3458 :     XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
    1160             : 
    1161        3458 : restart:
    1162        3458 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1163       37870 :     for (int i = 0; i < max_replication_slots; i++)
    1164             :     {
    1165       34412 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1166       34412 :         XLogRecPtr  restart_lsn = InvalidXLogRecPtr;
    1167             :         NameData    slotname;
    1168             :         int     wspid;
    1169       34412 :         int     last_signaled_pid = 0;
    1170             : 
    1171       34412 :         if (!s->in_use)
    1172       34412 :             continue;
    1173             : 
    1174         100 :         SpinLockAcquire(&s->mutex);
    1175         100 :         slotname = s->data.name;
    1176         100 :         restart_lsn = s->data.restart_lsn;
    1177         100 :         SpinLockRelease(&s->mutex);
    1178             : 
    1179         100 :         if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
    1180         100 :             continue;
    1181           0 :         LWLockRelease(ReplicationSlotControlLock);
    1182           0 :         CHECK_FOR_INTERRUPTS();
    1183             : 
    1184             :         /* Get ready to sleep on the slot in case it is active */
    1185           0 :         ConditionVariablePrepareToSleep(&s->active_cv);
    1186             : 
    1187             :         for (;;)
    1188             :         {
    1189             :             /*
    1190             :              * Try to mark this slot as used by this process.
    1191             :              *
     1192             :              * Note that ReplicationSlotAcquireInternal(SAB_Inquire)
     1193             :              * must not cancel the prepared condition variable if this
     1194             :              * slot is active in another process, because in that case we
     1195             :              * will later have to wait on that CV for the process owning
     1196             :              * the slot to be terminated.
    1197             :              */
    1198           0 :             wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
    1199             : 
    1200             :             /*
     1201             :              * Exit the loop if we successfully acquired the slot, or if
     1202             :              * the slot was dropped while we were waiting for the owning
     1203             :              * process to terminate. The latter is likely, for example,
     1204             :              * when the slot is temporary, since such slots are dropped
     1205             :              * automatically when the owning process exits.
    1206             :              */
    1207           0 :             if (wspid <= 0)
    1208           0 :                 break;
    1209             : 
    1210             :             /*
    1211             :              * Signal to terminate the process that owns the slot.
    1212             :              *
     1213             :              * There is a race condition: another process may acquire the
     1214             :              * slot after the process using it was terminated and before
     1215             :              * this process acquires it. To handle this case, we signal
     1216             :              * again if the owning PID differs from the last one we signaled.
    1217             :              *
    1218             :              * XXX This logic assumes that the same PID is not reused
    1219             :              * very quickly.
    1220             :              */
    1221           0 :             if (last_signaled_pid != wspid)
    1222             :             {
    1223           0 :                 ereport(LOG,
    1224             :                         (errmsg("terminating process %d because replication slot \"%s\" is too far behind",
    1225             :                                 wspid, NameStr(slotname))));
    1226           0 :                 (void) kill(wspid, SIGTERM);
    1227           0 :                 last_signaled_pid = wspid;
    1228             :             }
    1229             : 
    1230           0 :             ConditionVariableTimedSleep(&s->active_cv, 10,
    1231             :                                         WAIT_EVENT_REPLICATION_SLOT_DROP);
    1232             :         }
    1233           0 :         ConditionVariableCancelSleep();
    1234             : 
    1235             :         /*
    1236             :          * Do nothing here and start from scratch if the slot has
    1237             :          * already been dropped.
    1238             :          */
    1239           0 :         if (wspid == -1)
    1240           0 :             goto restart;
    1241             : 
    1242           0 :         ereport(LOG,
    1243             :                 (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
    1244             :                         NameStr(slotname),
    1245             :                         (uint32) (restart_lsn >> 32),
    1246             :                         (uint32) restart_lsn)));
    1247             : 
    1248           0 :         SpinLockAcquire(&s->mutex);
    1249           0 :         s->data.invalidated_at = s->data.restart_lsn;
    1250           0 :         s->data.restart_lsn = InvalidXLogRecPtr;
    1251           0 :         SpinLockRelease(&s->mutex);
    1252             : 
    1253             :         /* Make sure the invalidated state persists across server restart */
    1254           0 :         ReplicationSlotMarkDirty();
    1255           0 :         ReplicationSlotSave();
    1256           0 :         ReplicationSlotRelease();
    1257             : 
    1258             :         /* if we did anything, start from scratch */
    1259           0 :         goto restart;
    1260             :     }
    1261        3458 :     LWLockRelease(ReplicationSlotControlLock);
    1262        3458 : }
    1263             : 
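
The log message above prints restart_lsn in the usual %X/%X form: the high 32 bits of the 64-bit LSN, a slash, then the low 32 bits. A tiny sketch of that formatting; format_lsn() is a hypothetical helper, not part of this file:

    /* illustrative sketch only, not part of slot.c */
    static void
    format_lsn(char *buf, size_t len, XLogRecPtr lsn)
    {
        /* e.g. lsn 0x000000010000A3B0 prints as "1/A3B0" */
        snprintf(buf, len, "%X/%X", (uint32) (lsn >> 32), (uint32) lsn);
    }
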
    1264             : /*
    1265             :  * Flush all replication slots to disk.
    1266             :  *
    1267             :  * This needn't actually be part of a checkpoint, but it's a convenient
    1268             :  * location.
    1269             :  */
    1270             : void
    1271        3458 : CheckPointReplicationSlots(void)
    1272             : {
    1273             :     int         i;
    1274             : 
    1275        3458 :     elog(DEBUG1, "performing replication slot checkpoint");
    1276             : 
    1277             :     /*
    1278             :      * Prevent any slot from being created/dropped while we're active. As we
    1279             :      * explicitly do *not* want to block iterating over replication_slots or
     1280             :      * acquiring a slot, we cannot take the control lock. But that's OK,
    1281             :      * because holding ReplicationSlotAllocationLock is strictly stronger, and
    1282             :      * enough to guarantee that nobody can change the in_use bits on us.
    1283             :      */
    1284        3458 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
    1285             : 
    1286       37870 :     for (i = 0; i < max_replication_slots; i++)
    1287             :     {
    1288       34412 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1289             :         char        path[MAXPGPATH];
    1290             : 
    1291       34412 :         if (!s->in_use)
    1292       34312 :             continue;
    1293             : 
    1294             :         /* save the slot to disk, locking is handled in SaveSlotToPath() */
    1295         100 :         sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
    1296         100 :         SaveSlotToPath(s, path, LOG);
    1297             :     }
    1298        3458 :     LWLockRelease(ReplicationSlotAllocationLock);
    1299        3458 : }
    1300             : 
    1301             : /*
    1302             :  * Load all replication slots from disk into memory at server startup. This
    1303             :  * needs to be run before we start crash recovery.
    1304             :  */
    1305             : void
    1306        1512 : StartupReplicationSlots(void)
    1307             : {
    1308             :     DIR        *replication_dir;
    1309             :     struct dirent *replication_de;
    1310             : 
    1311        1512 :     elog(DEBUG1, "starting up replication slots");
    1312             : 
    1313             :     /* restore all slots by iterating over all on-disk entries */
    1314        1512 :     replication_dir = AllocateDir("pg_replslot");
    1315        4564 :     while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
    1316             :     {
    1317             :         struct stat statbuf;
    1318             :         char        path[MAXPGPATH + 12];
    1319             : 
    1320        3052 :         if (strcmp(replication_de->d_name, ".") == 0 ||
    1321        1540 :             strcmp(replication_de->d_name, "..") == 0)
    1322        3024 :             continue;
    1323             : 
    1324          28 :         snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
    1325             : 
     1326             :         /* we're only creating directories here; skip anything that isn't ours */
    1327          28 :         if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
    1328           0 :             continue;
    1329             : 
     1330             :         /* we crashed while a slot was being set up or deleted; clean up */
    1331          28 :         if (pg_str_endswith(replication_de->d_name, ".tmp"))
    1332             :         {
    1333           0 :             if (!rmtree(path, true))
    1334             :             {
    1335           0 :                 ereport(WARNING,
    1336             :                         (errmsg("could not remove directory \"%s\"",
    1337             :                                 path)));
    1338           0 :                 continue;
    1339             :             }
    1340           0 :             fsync_fname("pg_replslot", true);
    1341           0 :             continue;
    1342             :         }
    1343             : 
    1344             :         /* looks like a slot in a normal state, restore */
    1345          28 :         RestoreSlotFromDisk(replication_de->d_name);
    1346             :     }
    1347        1512 :     FreeDir(replication_dir);
    1348             : 
     1349             :     /* if no slots can exist, we're done */
    1350        1512 :     if (max_replication_slots <= 0)
    1351           0 :         return;
    1352             : 
    1353             :     /* Now that we have recovered all the data, compute replication xmin */
    1354        1512 :     ReplicationSlotsComputeRequiredXmin(false);
    1355        1512 :     ReplicationSlotsComputeRequiredLSN();
    1356             : }
    1357             : 
    1358             : /* ----
    1359             :  * Manipulation of on-disk state of replication slots
    1360             :  *
     1361             :  * NB: none of the routines below should take any notice of whether a slot is
     1362             :  * the current one or not; that's all handled a layer above.
    1363             :  * ----
    1364             :  */
    1365             : static void
    1366         502 : CreateSlotOnDisk(ReplicationSlot *slot)
    1367             : {
    1368             :     char        tmppath[MAXPGPATH];
    1369             :     char        path[MAXPGPATH];
    1370             :     struct stat st;
    1371             : 
    1372             :     /*
     1373             :      * No need to take out the io_in_progress_lock: nobody else can see this
     1374             :      * slot yet, so nobody else will write. We're reusing SaveSlotToPath(),
     1375             :      * which takes out the lock; if we took the lock here, we'd deadlock.
    1376             :      */
    1377             : 
    1378         502 :     sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
    1379         502 :     sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
    1380             : 
    1381             :     /*
    1382             :      * It's just barely possible that some previous effort to create or drop a
    1383             :      * slot with this name left a temp directory lying around. If that seems
    1384             :      * to be the case, try to remove it.  If the rmtree() fails, we'll error
    1385             :      * out at the MakePGDirectory() below, so we don't bother checking
    1386             :      * success.
    1387             :      */
    1388         502 :     if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
    1389           0 :         rmtree(tmppath, true);
    1390             : 
    1391             :     /* Create and fsync the temporary slot directory. */
    1392         502 :     if (MakePGDirectory(tmppath) < 0)
    1393           0 :         ereport(ERROR,
    1394             :                 (errcode_for_file_access(),
    1395             :                  errmsg("could not create directory \"%s\": %m",
    1396             :                         tmppath)));
    1397         502 :     fsync_fname(tmppath, true);
    1398             : 
    1399             :     /* Write the actual state file. */
    1400         502 :     slot->dirty = true;          /* signal that we really need to write */
    1401         502 :     SaveSlotToPath(slot, tmppath, ERROR);
    1402             : 
    1403             :     /* Rename the directory into place. */
    1404         502 :     if (rename(tmppath, path) != 0)
    1405           0 :         ereport(ERROR,
    1406             :                 (errcode_for_file_access(),
    1407             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1408             :                         tmppath, path)));
    1409             : 
    1410             :     /*
     1411             :      * If we failed now (really unlikely), we wouldn't know whether this slot
     1412             :      * would persist after an OS crash or not, so force a restart. The
     1413             :      * restart will retry the fsync until it works.
    1414             :      */
    1415         502 :     START_CRIT_SECTION();
    1416             : 
    1417         502 :     fsync_fname(path, true);
    1418         502 :     fsync_fname("pg_replslot", true);
    1419             : 
    1420         502 :     END_CRIT_SECTION();
    1421         502 : }
    1422             : 
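
CreateSlotOnDisk() above uses the standard durable-create recipe: build everything under a .tmp name, fsync it, write and fsync the payload, rename() into place, then fsync the parent directory so the rename itself survives a crash. A condensed sketch of that recipe; durable_create_dir() and write_state() are hypothetical helpers, not part of slot.c:

    /* illustrative sketch only, not part of slot.c */
    static void
    durable_create_dir(const char *final_path, const char *tmp_path)
    {
        if (MakePGDirectory(tmp_path) < 0)
            elog(ERROR, "could not create directory \"%s\": %m", tmp_path);
        fsync_fname(tmp_path, true);        /* make the new directory durable */

        write_state(tmp_path);              /* write and fsync the payload */

        if (rename(tmp_path, final_path) != 0)
            elog(ERROR, "could not rename \"%s\" to \"%s\": %m",
                 tmp_path, final_path);

        fsync_fname(final_path, true);      /* directory under its final name */
        fsync_fname("pg_replslot", true);   /* parent, so the rename is durable */
    }
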
    1423             : /*
    1424             :  * Shared functionality between saving and creating a replication slot.
    1425             :  */
    1426             : static void
    1427        1306 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
    1428             : {
    1429             :     char        tmppath[MAXPGPATH];
    1430             :     char        path[MAXPGPATH];
    1431             :     int         fd;
    1432             :     ReplicationSlotOnDisk cp;
    1433             :     bool        was_dirty;
    1434             : 
    1435             :     /* first check whether there's something to write out */
    1436        1306 :     SpinLockAcquire(&slot->mutex);
    1437        1306 :     was_dirty = slot->dirty;
    1438        1306 :     slot->just_dirtied = false;
    1439        1306 :     SpinLockRelease(&slot->mutex);
    1440             : 
    1441             :     /* and don't do anything if there's nothing to write */
    1442        1306 :     if (!was_dirty)
    1443          78 :         return;
    1444             : 
    1445        1228 :     LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
    1446             : 
    1447             :     /* silence valgrind :( */
    1448        1228 :     memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
    1449             : 
    1450        1228 :     sprintf(tmppath, "%s/state.tmp", dir);
    1451        1228 :     sprintf(path, "%s/state", dir);
    1452             : 
    1453        1228 :     fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    1454        1228 :     if (fd < 0)
    1455             :     {
    1456             :         /*
    1457             :          * If not an ERROR, then release the lock before returning.  In case
    1458             :          * of an ERROR, the error recovery path automatically releases the
    1459             :          * lock, but no harm in explicitly releasing even in that case.  Note
    1460             :          * that LWLockRelease() could affect errno.
    1461             :          */
    1462           0 :         int         save_errno = errno;
    1463             : 
    1464           0 :         LWLockRelease(&slot->io_in_progress_lock);
    1465           0 :         errno = save_errno;
    1466           0 :         ereport(elevel,
    1467             :                 (errcode_for_file_access(),
    1468             :                  errmsg("could not create file \"%s\": %m",
    1469             :                         tmppath)));
    1470           0 :         return;
    1471             :     }
    1472             : 
    1473        1228 :     cp.magic = SLOT_MAGIC;
    1474        1228 :     INIT_CRC32C(cp.checksum);
    1475        1228 :     cp.version = SLOT_VERSION;
    1476        1228 :     cp.length = ReplicationSlotOnDiskV2Size;
    1477             : 
    1478        1228 :     SpinLockAcquire(&slot->mutex);
    1479             : 
    1480        1228 :     memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
    1481             : 
    1482        1228 :     SpinLockRelease(&slot->mutex);
    1483             : 
    1484        1228 :     COMP_CRC32C(cp.checksum,
    1485             :                 (char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
    1486             :                 SnapBuildOnDiskChecksummedSize);
    1487        1228 :     FIN_CRC32C(cp.checksum);
    1488             : 
    1489        1228 :     errno = 0;
    1490        1228 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
    1491        1228 :     if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
    1492             :     {
    1493           0 :         int         save_errno = errno;
    1494             : 
    1495           0 :         pgstat_report_wait_end();
    1496           0 :         CloseTransientFile(fd);
    1497           0 :         LWLockRelease(&slot->io_in_progress_lock);
    1498             : 
    1499             :         /* if write didn't set errno, assume problem is no disk space */
    1500           0 :         errno = save_errno ? save_errno : ENOSPC;
    1501           0 :         ereport(elevel,
    1502             :                 (errcode_for_file_access(),
    1503             :                  errmsg("could not write to file \"%s\": %m",
    1504             :                         tmppath)));
    1505           0 :         return;
    1506             :     }
    1507        1228 :     pgstat_report_wait_end();
    1508             : 
    1509             :     /* fsync the temporary file */
    1510        1228 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
    1511        1228 :     if (pg_fsync(fd) != 0)
    1512             :     {
    1513           0 :         int         save_errno = errno;
    1514             : 
    1515           0 :         pgstat_report_wait_end();
    1516           0 :         CloseTransientFile(fd);
    1517           0 :         LWLockRelease(&slot->io_in_progress_lock);
    1518           0 :         errno = save_errno;
    1519           0 :         ereport(elevel,
    1520             :                 (errcode_for_file_access(),
    1521             :                  errmsg("could not fsync file \"%s\": %m",
    1522             :                         tmppath)));
    1523           0 :         return;
    1524             :     }
    1525        1228 :     pgstat_report_wait_end();
    1526             : 
    1527        1228 :     if (CloseTransientFile(fd) != 0)
    1528             :     {
    1529           0 :         int         save_errno = errno;
    1530             : 
    1531           0 :         LWLockRelease(&slot->io_in_progress_lock);
    1532           0 :         errno = save_errno;
    1533           0 :         ereport(elevel,
    1534             :                 (errcode_for_file_access(),
    1535             :                  errmsg("could not close file \"%s\": %m",
    1536             :                         tmppath)));
    1537           0 :         return;
    1538             :     }
    1539             : 
    1540             :     /* rename to permanent file, fsync file and directory */
    1541        1228 :     if (rename(tmppath, path) != 0)
    1542             :     {
    1543           0 :         int         save_errno = errno;
    1544             : 
    1545           0 :         LWLockRelease(&slot->io_in_progress_lock);
    1546           0 :         errno = save_errno;
    1547           0 :         ereport(elevel,
    1548             :                 (errcode_for_file_access(),
    1549             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1550             :                         tmppath, path)));
    1551           0 :         return;
    1552             :     }
    1553             : 
    1554             :     /*
    1555             :      * Check CreateSlotOnDisk() for the reasoning of using a critical section.
    1556             :      */
    1557        1228 :     START_CRIT_SECTION();
    1558             : 
    1559        1228 :     fsync_fname(path, false);
    1560        1228 :     fsync_fname(dir, true);
    1561        1228 :     fsync_fname("pg_replslot", true);
    1562             : 
    1563        1228 :     END_CRIT_SECTION();
    1564             : 
    1565             :     /*
     1566             :      * Successfully written; unset the dirty bit, unless somebody has
     1567             :      * dirtied the slot again already.
    1568             :      */
    1569        1228 :     SpinLockAcquire(&slot->mutex);
    1570        1228 :     if (!slot->just_dirtied)
    1571        1228 :         slot->dirty = false;
    1572        1228 :     SpinLockRelease(&slot->mutex);
    1573             : 
    1574        1228 :     LWLockRelease(&slot->io_in_progress_lock);
    1575             : }
    1576             : 
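
SaveSlotToPath() above and RestoreSlotFromDisk() below compute the CRC over the same byte range: the on-disk struct minus its leading, never-checksummed fields (the magic number and the checksum itself). A small sketch of that shared computation, using the same macros as the surrounding code; the helper slot_on_disk_crc() is illustrative, not a function in this file:

    /* illustrative sketch only, not part of slot.c */
    static pg_crc32c
    slot_on_disk_crc(const ReplicationSlotOnDisk *cp)
    {
        pg_crc32c   crc;

        INIT_CRC32C(crc);
        COMP_CRC32C(crc,
                    (const char *) cp + SnapBuildOnDiskNotChecksummedSize,
                    SnapBuildOnDiskChecksummedSize);
        FIN_CRC32C(crc);
        return crc;
    }
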
    1577             : /*
    1578             :  * Load a single slot from disk into memory.
    1579             :  */
    1580             : static void
    1581          28 : RestoreSlotFromDisk(const char *name)
    1582             : {
    1583             :     ReplicationSlotOnDisk cp;
    1584             :     int         i;
    1585             :     char        slotdir[MAXPGPATH + 12];
    1586             :     char        path[MAXPGPATH + 22];
    1587             :     int         fd;
    1588          28 :     bool        restored = false;
    1589             :     int         readBytes;
    1590             :     pg_crc32c   checksum;
    1591             : 
    1592             :     /* no need to lock here, no concurrent access allowed yet */
    1593             : 
    1594             :     /* delete temp file if it exists */
    1595          28 :     sprintf(slotdir, "pg_replslot/%s", name);
    1596          28 :     sprintf(path, "%s/state.tmp", slotdir);
    1597          28 :     if (unlink(path) < 0 && errno != ENOENT)
    1598           0 :         ereport(PANIC,
    1599             :                 (errcode_for_file_access(),
    1600             :                  errmsg("could not remove file \"%s\": %m", path)));
    1601             : 
    1602          28 :     sprintf(path, "%s/state", slotdir);
    1603             : 
    1604          28 :     elog(DEBUG1, "restoring replication slot from \"%s\"", path);
    1605             : 
    1606             :     /* on some operating systems fsyncing a file requires O_RDWR */
    1607          28 :     fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
    1608             : 
    1609             :     /*
     1610             :      * An open failure here need not be handled gracefully: we rename() the
     1611             :      * directory into place only after the state file has been fsync()ed.
    1612             :      */
    1613          28 :     if (fd < 0)
    1614           0 :         ereport(PANIC,
    1615             :                 (errcode_for_file_access(),
    1616             :                  errmsg("could not open file \"%s\": %m", path)));
    1617             : 
    1618             :     /*
     1619             :      * Sync the state file before reading from it. We might have crashed
     1620             :      * before it was synced, and we shouldn't continue on that basis.
    1621             :      */
    1622          28 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
    1623          28 :     if (pg_fsync(fd) != 0)
    1624           0 :         ereport(PANIC,
    1625             :                 (errcode_for_file_access(),
    1626             :                  errmsg("could not fsync file \"%s\": %m",
    1627             :                         path)));
    1628          28 :     pgstat_report_wait_end();
    1629             : 
    1630             :     /* Also sync the parent directory */
    1631          28 :     START_CRIT_SECTION();
    1632          28 :     fsync_fname(slotdir, true);
    1633          28 :     END_CRIT_SECTION();
    1634             : 
     1635             :     /* read the part of the state file that's guaranteed to be version independent */
    1636          28 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    1637          28 :     readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
    1638          28 :     pgstat_report_wait_end();
    1639          28 :     if (readBytes != ReplicationSlotOnDiskConstantSize)
    1640             :     {
    1641           0 :         if (readBytes < 0)
    1642           0 :             ereport(PANIC,
    1643             :                     (errcode_for_file_access(),
    1644             :                      errmsg("could not read file \"%s\": %m", path)));
    1645             :         else
    1646           0 :             ereport(PANIC,
    1647             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    1648             :                      errmsg("could not read file \"%s\": read %d of %zu",
    1649             :                             path, readBytes,
    1650             :                             (Size) ReplicationSlotOnDiskConstantSize)));
    1651             :     }
    1652             : 
    1653             :     /* verify magic */
    1654          28 :     if (cp.magic != SLOT_MAGIC)
    1655           0 :         ereport(PANIC,
    1656             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1657             :                  errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
    1658             :                         path, cp.magic, SLOT_MAGIC)));
    1659             : 
    1660             :     /* verify version */
    1661          28 :     if (cp.version != SLOT_VERSION)
    1662           0 :         ereport(PANIC,
    1663             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1664             :                  errmsg("replication slot file \"%s\" has unsupported version %u",
    1665             :                         path, cp.version)));
    1666             : 
    1667             :     /* boundary check on length */
    1668          28 :     if (cp.length != ReplicationSlotOnDiskV2Size)
    1669           0 :         ereport(PANIC,
    1670             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1671             :                  errmsg("replication slot file \"%s\" has corrupted length %u",
    1672             :                         path, cp.length)));
    1673             : 
    1674             :     /* Now that we know the size, read the entire file */
    1675          28 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    1676          56 :     readBytes = read(fd,
    1677             :                      (char *) &cp + ReplicationSlotOnDiskConstantSize,
    1678          28 :                      cp.length);
    1679          28 :     pgstat_report_wait_end();
    1680          28 :     if (readBytes != cp.length)
    1681             :     {
    1682           0 :         if (readBytes < 0)
    1683           0 :             ereport(PANIC,
    1684             :                     (errcode_for_file_access(),
    1685             :                      errmsg("could not read file \"%s\": %m", path)));
    1686             :         else
    1687           0 :             ereport(PANIC,
    1688             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    1689             :                      errmsg("could not read file \"%s\": read %d of %zu",
    1690             :                             path, readBytes, (Size) cp.length)));
    1691             :     }
    1692             : 
    1693          28 :     if (CloseTransientFile(fd) != 0)
    1694           0 :         ereport(PANIC,
    1695             :                 (errcode_for_file_access(),
    1696             :                  errmsg("could not close file \"%s\": %m", path)));
    1697             : 
    1698             :     /* now verify the CRC */
    1699          28 :     INIT_CRC32C(checksum);
    1700          28 :     COMP_CRC32C(checksum,
    1701             :                 (char *) &cp + SnapBuildOnDiskNotChecksummedSize,
    1702             :                 SnapBuildOnDiskChecksummedSize);
    1703          28 :     FIN_CRC32C(checksum);
    1704             : 
    1705          28 :     if (!EQ_CRC32C(checksum, cp.checksum))
    1706           0 :         ereport(PANIC,
    1707             :                 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
    1708             :                         path, checksum, cp.checksum)));
    1709             : 
    1710             :     /*
    1711             :      * If we crashed with an ephemeral slot active, don't restore but delete
    1712             :      * it.
    1713             :      */
    1714          28 :     if (cp.slotdata.persistency != RS_PERSISTENT)
    1715             :     {
    1716           0 :         if (!rmtree(slotdir, true))
    1717             :         {
    1718           0 :             ereport(WARNING,
    1719             :                     (errmsg("could not remove directory \"%s\"",
    1720             :                             slotdir)));
    1721             :         }
    1722           0 :         fsync_fname("pg_replslot", true);
    1723           0 :         return;
    1724             :     }
    1725             : 
    1726             :     /*
    1727             :      * Verify that requirements for the specific slot type are met. That's
    1728             :      * important because if these aren't met we're not guaranteed to retain
    1729             :      * all the necessary resources for the slot.
    1730             :      *
    1731             :      * NB: We have to do so *after* the above checks for ephemeral slots,
    1732             :      * because otherwise a slot that shouldn't exist anymore could prevent
    1733             :      * restarts.
    1734             :      *
    1735             :      * NB: Changing the requirements here also requires adapting
    1736             :      * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
    1737             :      */
    1738          28 :     if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
    1739           0 :         ereport(FATAL,
    1740             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1741             :                  errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
    1742             :                         NameStr(cp.slotdata.name)),
    1743             :                  errhint("Change wal_level to be logical or higher.")));
    1744          28 :     else if (wal_level < WAL_LEVEL_REPLICA)
    1745           0 :         ereport(FATAL,
    1746             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1747             :                  errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
    1748             :                         NameStr(cp.slotdata.name)),
    1749             :                  errhint("Change wal_level to be replica or higher.")));
    1750             : 
    1751             :     /* nothing can be active yet, don't lock anything */
    1752          38 :     for (i = 0; i < max_replication_slots; i++)
    1753             :     {
    1754             :         ReplicationSlot *slot;
    1755             : 
    1756          38 :         slot = &ReplicationSlotCtl->replication_slots[i];
    1757             : 
    1758          38 :         if (slot->in_use)
    1759          10 :             continue;
    1760             : 
    1761             :         /* restore the entire set of persistent data */
    1762          28 :         memcpy(&slot->data, &cp.slotdata,
    1763             :                sizeof(ReplicationSlotPersistentData));
    1764             : 
    1765             :         /* initialize in memory state */
    1766          28 :         slot->effective_xmin = cp.slotdata.xmin;
    1767          28 :         slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
    1768             : 
    1769          28 :         slot->candidate_catalog_xmin = InvalidTransactionId;
    1770          28 :         slot->candidate_xmin_lsn = InvalidXLogRecPtr;
    1771          28 :         slot->candidate_restart_lsn = InvalidXLogRecPtr;
    1772          28 :         slot->candidate_restart_valid = InvalidXLogRecPtr;
    1773             : 
    1774          28 :         slot->in_use = true;
    1775          28 :         slot->active_pid = 0;
    1776             : 
    1777          28 :         restored = true;
    1778          28 :         break;
    1779             :     }
    1780             : 
    1781          28 :     if (!restored)
    1782           0 :         ereport(FATAL,
    1783             :                 (errmsg("too many replication slots active before shutdown"),
    1784             :                  errhint("Increase max_replication_slots and try again.")));
    1785             : }

Generated by: LCOV version 1.13