LCOV - code coverage report
Current view: top level - src/backend/replication - slot.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 443 554 80.0 %
Date: 2020-06-01 09:07:10 Functions: 26 26 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * slot.c
       4             :  *     Replication slot management.
       5             :  *
       6             :  *
       7             :  * Copyright (c) 2012-2020, PostgreSQL Global Development Group
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/replication/slot.c
      12             :  *
      13             :  * NOTES
      14             :  *
      15             :  * Replication slots are used to keep state about replication streams
      16             :  * originating from this cluster.  Their primary purpose is to prevent the
      17             :  * premature removal of WAL or of old tuple versions in a manner that would
      18             :  * interfere with replication; they are also useful for monitoring purposes.
      19             :  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
      20             :  * on standbys (to support cascading setups).  The requirement that slots be
      21             :  * usable on standbys precludes storing them in the system catalogs.
      22             :  *
      23             :  * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
      24             :  * directory. Inside that directory the state file will contain the slot's
      25             :  * own data. Additional data can be stored alongside that file if required.
      26             :  * While the server is running, the state data is also cached in memory for
      27             :  * efficiency.
      28             :  *
      29             :  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
      30             :  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
      31             :  * to iterate over the slots, and in exclusive mode to change the in_use flag
      32             :  * of a slot.  The remaining data in each slot is protected by its mutex.
      33             :  *
      34             :  *-------------------------------------------------------------------------
      35             :  */
      36             : 
      37             : #include "postgres.h"
      38             : 
      39             : #include <unistd.h>
      40             : #include <sys/stat.h>
      41             : 
      42             : #include "access/transam.h"
      43             : #include "access/xlog_internal.h"
      44             : #include "common/string.h"
      45             : #include "miscadmin.h"
      46             : #include "pgstat.h"
      47             : #include "replication/slot.h"
      48             : #include "storage/fd.h"
      49             : #include "storage/proc.h"
      50             : #include "storage/procarray.h"
      51             : #include "utils/builtins.h"
      52             : 
      53             : /*
      54             :  * Replication slot on-disk data structure.
      55             :  */
      56             : typedef struct ReplicationSlotOnDisk
      57             : {
      58             :     /* first part of this struct needs to be version independent */
      59             : 
      60             :     /* data not covered by checksum */
      61             :     uint32      magic;
      62             :     pg_crc32c   checksum;
      63             : 
      64             :     /* data covered by checksum */
      65             :     uint32      version;
      66             :     uint32      length;
      67             : 
      68             :     /*
      69             :      * The actual data in the slot that follows can differ based on the above
      70             :      * 'version'.
      71             :      */
      72             : 
      73             :     ReplicationSlotPersistentData slotdata;
      74             : } ReplicationSlotOnDisk;
      75             : 
      76             : /* size of version independent data */
      77             : #define ReplicationSlotOnDiskConstantSize \
      78             :     offsetof(ReplicationSlotOnDisk, slotdata)
      79             : /* size of the part of the slot not covered by the checksum */
      80             : #define SnapBuildOnDiskNotChecksummedSize \
      81             :     offsetof(ReplicationSlotOnDisk, version)
      82             : /* size of the part covered by the checksum */
      83             : #define SnapBuildOnDiskChecksummedSize \
      84             :     sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize
      85             : /* size of the slot data that is version dependent */
      86             : #define ReplicationSlotOnDiskV2Size \
      87             :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
      88             : 
      89             : #define SLOT_MAGIC      0x1051CA1   /* format identifier */
      90             : #define SLOT_VERSION    2       /* version for new files */
      91             : 
      92             : /* Control array for replication slot management */
      93             : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
      94             : 
      95             : /* My backend's replication slot in the shared memory array */
      96             : ReplicationSlot *MyReplicationSlot = NULL;
      97             : 
      98             : /* GUCs */
      99             : int         max_replication_slots = 0;  /* the maximum number of replication
     100             :                                          * slots */
     101             : 
     102             : static void ReplicationSlotDropAcquired(void);
     103             : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
     104             : 
     105             : /* internal persistency functions */
     106             : static void RestoreSlotFromDisk(const char *name);
     107             : static void CreateSlotOnDisk(ReplicationSlot *slot);
     108             : static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
     109             : 
     110             : /*
     111             :  * Report shared-memory space needed by ReplicationSlotsShmemInit.
     112             :  */
     113             : Size
     114        6514 : ReplicationSlotsShmemSize(void)
     115             : {
     116        6514 :     Size        size = 0;
     117             : 
     118        6514 :     if (max_replication_slots == 0)
     119           0 :         return size;
     120             : 
     121        6514 :     size = offsetof(ReplicationSlotCtlData, replication_slots);
     122        6514 :     size = add_size(size,
     123             :                     mul_size(max_replication_slots, sizeof(ReplicationSlot)));
     124             : 
     125        6514 :     return size;
     126             : }
     127             : 
     128             : /*
     129             :  * Allocate and initialize shared memory for replication slots.
     130             :  */
     131             : void
     132        2170 : ReplicationSlotsShmemInit(void)
     133             : {
     134             :     bool        found;
     135             : 
     136        2170 :     if (max_replication_slots == 0)
     137           0 :         return;
     138             : 
     139        2170 :     ReplicationSlotCtl = (ReplicationSlotCtlData *)
     140        2170 :         ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
     141             :                         &found);
     142             : 
     143        2170 :     if (!found)
     144             :     {
     145             :         int         i;
     146             : 
     147             :         /* First time through, so initialize */
     148        3970 :         MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
     149             : 
     150       22020 :         for (i = 0; i < max_replication_slots; i++)
     151             :         {
     152       19850 :             ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
     153             : 
     154             :             /* everything else is zeroed by the memset above */
     155       19850 :             SpinLockInit(&slot->mutex);
     156       19850 :             LWLockInitialize(&slot->io_in_progress_lock,
     157             :                              LWTRANCHE_REPLICATION_SLOT_IO);
     158       19850 :             ConditionVariableInit(&slot->active_cv);
     159             :         }
     160             :     }
     161             : }
     162             : 
     163             : /*
     164             :  * Check whether the passed slot name is valid and report errors at elevel.
     165             :  *
     166             :  * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1}, which should allow
     167             :  * the name to be used as a directory name on every supported OS.
     168             :  *
     169             :  * If elevel < ERROR, returns whether the slot name is valid.
     170             :  */
     171             : bool
     172         560 : ReplicationSlotValidateName(const char *name, int elevel)
     173             : {
     174             :     const char *cp;
     175             : 
     176         560 :     if (strlen(name) == 0)
     177             :     {
     178           0 :         ereport(elevel,
     179             :                 (errcode(ERRCODE_INVALID_NAME),
     180             :                  errmsg("replication slot name \"%s\" is too short",
     181             :                         name)));
     182           0 :         return false;
     183             :     }
     184             : 
     185         560 :     if (strlen(name) >= NAMEDATALEN)
     186             :     {
     187           0 :         ereport(elevel,
     188             :                 (errcode(ERRCODE_NAME_TOO_LONG),
     189             :                  errmsg("replication slot name \"%s\" is too long",
     190             :                         name)));
     191           0 :         return false;
     192             :     }
     193             : 
     194        9190 :     for (cp = name; *cp; cp++)
     195             :     {
     196        8632 :         if (!((*cp >= 'a' && *cp <= 'z')
     197        2784 :               || (*cp >= '0' && *cp <= '9')
     198         966 :               || (*cp == '_')))
     199             :         {
     200           2 :             ereport(elevel,
     201             :                     (errcode(ERRCODE_INVALID_NAME),
     202             :                      errmsg("replication slot name \"%s\" contains invalid character",
     203             :                             name),
     204             :                      errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
     205           0 :             return false;
     206             :         }
     207             :     }
     208         558 :     return true;
     209             : }
     210             : 
     211             : /*
     212             :  * Create a new replication slot and mark it as used by this backend.
     213             :  *
     214             :  * name: Name of the slot
     215             :  * db_specific: logical decoding is db specific; if the slot is going to
     216             :  *     be used for that, pass true; otherwise pass false.
     217             :  */
     218             : void
     219         468 : ReplicationSlotCreate(const char *name, bool db_specific,
     220             :                       ReplicationSlotPersistency persistency)
     221             : {
     222         468 :     ReplicationSlot *slot = NULL;
     223             :     int         i;
     224             : 
     225             :     Assert(MyReplicationSlot == NULL);
     226             : 
     227         468 :     ReplicationSlotValidateName(name, ERROR);
     228             : 
     229             :     /*
     230             :      * If some other backend ran this code concurrently with us, we'd likely
     231             :      * both allocate the same slot, and that would be bad.  We'd also be at
     232             :      * risk of missing a name collision.  Also, we don't want to try to create
     233             :      * a new slot while somebody's busy cleaning up an old one, because we
     234             :      * might both be monkeying with the same directory.
     235             :      */
     236         466 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     237             : 
     238             :     /*
     239             :      * Check for name collision, and identify an allocatable slot.  We need to
     240             :      * hold ReplicationSlotControlLock in shared mode for this, so that nobody
     241             :      * else can change the in_use flags while we're looking at them.
     242             :      */
     243         466 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     244        2752 :     for (i = 0; i < max_replication_slots; i++)
     245             :     {
     246        2292 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     247             : 
     248        2292 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     249           6 :             ereport(ERROR,
     250             :                     (errcode(ERRCODE_DUPLICATE_OBJECT),
     251             :                      errmsg("replication slot \"%s\" already exists", name)));
     252        2286 :         if (!s->in_use && slot == NULL)
     253         458 :             slot = s;
     254             :     }
     255         460 :     LWLockRelease(ReplicationSlotControlLock);
     256             : 
     257             :     /* If all slots are in use, we're out of luck. */
     258         460 :     if (slot == NULL)
     259           2 :         ereport(ERROR,
     260             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     261             :                  errmsg("all replication slots are in use"),
     262             :                  errhint("Free one or increase max_replication_slots.")));
     263             : 
     264             :     /*
     265             :      * Since this slot is not in use, nobody should be looking at any part of
     266             :      * it other than the in_use field unless they're trying to allocate it.
     267             :      * And since we hold ReplicationSlotAllocationLock, nobody except us can
     268             :      * be doing that.  So it's safe to initialize the slot.
     269             :      */
     270             :     Assert(!slot->in_use);
     271             :     Assert(slot->active_pid == 0);
     272             : 
     273             :     /* first initialize persistent data */
     274         458 :     memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
     275         458 :     StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
     276         458 :     slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
     277         458 :     slot->data.persistency = persistency;
     278             : 
     279             :     /* and then data only present in shared memory */
     280         458 :     slot->just_dirtied = false;
     281         458 :     slot->dirty = false;
     282         458 :     slot->effective_xmin = InvalidTransactionId;
     283         458 :     slot->effective_catalog_xmin = InvalidTransactionId;
     284         458 :     slot->candidate_catalog_xmin = InvalidTransactionId;
     285         458 :     slot->candidate_xmin_lsn = InvalidXLogRecPtr;
     286         458 :     slot->candidate_restart_valid = InvalidXLogRecPtr;
     287         458 :     slot->candidate_restart_lsn = InvalidXLogRecPtr;
     288             : 
     289             :     /*
     290             :      * Create the slot on disk.  We haven't actually marked the slot allocated
     291             :      * yet, so no special cleanup is required if this errors out.
     292             :      */
     293         458 :     CreateSlotOnDisk(slot);
     294             : 
     295             :     /*
     296             :      * We need to briefly prevent any other backend from iterating over the
     297             :      * slots while we flip the in_use flag. We also need to set active_pid
     298             :      * while holding the ControlLock, as otherwise a concurrent
     299             :      * ReplicationSlotAcquire() could acquire the slot as well.
     300             :      */
     301         458 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     302             : 
     303         458 :     slot->in_use = true;
     304             : 
     305             :     /* We can now mark the slot active, and that makes it our slot. */
     306         458 :     SpinLockAcquire(&slot->mutex);
     307             :     Assert(slot->active_pid == 0);
     308         458 :     slot->active_pid = MyProcPid;
     309         458 :     SpinLockRelease(&slot->mutex);
     310         458 :     MyReplicationSlot = slot;
     311             : 
     312         458 :     LWLockRelease(ReplicationSlotControlLock);
     313             : 
     314             :     /*
     315             :      * Now that the slot has been marked as in_use and active, it's safe to
     316             :      * let somebody else try to allocate a slot.
     317             :      */
     318         458 :     LWLockRelease(ReplicationSlotAllocationLock);
     319             : 
     320             :     /* Let everybody know we've modified this slot */
     321         458 :     ConditionVariableBroadcast(&slot->active_cv);
     322         458 : }
     323             : 
     324             : /*
     325             :  * Find a previously created slot and mark it as used by this backend.
     326             :  *
     327             :  * The return value is only useful if behavior is SAB_Inquire, in which
     328             :  * case it's zero if we successfully acquired the slot, or the PID of the
     329             :  * owning process otherwise.  If behavior is SAB_Error, then trying to
     330             :  * acquire an owned slot is an error.  If SAB_Block, we sleep until the
     331             :  * slot is released by the owning process.
     332             :  */
     333             : int
     334         628 : ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
     335             : {
     336             :     ReplicationSlot *slot;
     337             :     int         active_pid;
     338             :     int         i;
     339             : 
     340         628 : retry:
     341             :     Assert(MyReplicationSlot == NULL);
     342             : 
     343             :     /*
     344             :      * Search for the named slot and mark it active if we find it.  If the
     345             :      * slot is already active, we exit the loop with active_pid set to the PID
     346             :      * of the backend that owns it.
     347             :      */
     348         628 :     active_pid = 0;
     349         628 :     slot = NULL;
     350         628 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     351         764 :     for (i = 0; i < max_replication_slots; i++)
     352             :     {
     353         750 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     354             : 
     355         750 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     356             :         {
     357             :             /*
     358             :              * This is the slot we want; check if it's active under some other
     359             :              * process.  In single user mode, we don't need this check.
     360             :              */
     361         614 :             if (IsUnderPostmaster)
     362             :             {
     363             :                 /*
     364             :                  * Get ready to sleep on it in case it is active.  (We may end
     365             :                  * up not sleeping, but we don't want to do this while holding
     366             :                  * the spinlock.)
     367             :                  */
     368         614 :                 ConditionVariablePrepareToSleep(&s->active_cv);
     369             : 
     370         614 :                 SpinLockAcquire(&s->mutex);
     371             : 
     372         614 :                 active_pid = s->active_pid;
     373         614 :                 if (active_pid == 0)
     374         472 :                     active_pid = s->active_pid = MyProcPid;
     375             : 
     376         614 :                 SpinLockRelease(&s->mutex);
     377             :             }
     378             :             else
     379           0 :                 active_pid = MyProcPid;
     380         614 :             slot = s;
     381             : 
     382         614 :             break;
     383             :         }
     384             :     }
     385         628 :     LWLockRelease(ReplicationSlotControlLock);
     386             : 
     387             :     /* If we did not find the slot, error out. */
     388         628 :     if (slot == NULL)
     389          14 :         ereport(ERROR,
     390             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     391             :                  errmsg("replication slot \"%s\" does not exist", name)));
     392             : 
     393             :     /*
     394             :      * If we found the slot but it's already active in another backend, we
     395             :      * error out, return the owner's PID, or wait and retry, per 'behavior'.
     396             :      */
     397         614 :     if (active_pid != MyProcPid)
     398             :     {
     399           0 :         if (behavior == SAB_Error)
     400           0 :             ereport(ERROR,
     401             :                     (errcode(ERRCODE_OBJECT_IN_USE),
     402             :                      errmsg("replication slot \"%s\" is active for PID %d",
     403             :                             name, active_pid)));
     404           0 :         else if (behavior == SAB_Inquire)
     405           0 :             return active_pid;
     406             : 
     407             :         /* Wait here until we get signaled, and then restart */
     408           0 :         ConditionVariableSleep(&slot->active_cv,
     409             :                                WAIT_EVENT_REPLICATION_SLOT_DROP);
     410           0 :         ConditionVariableCancelSleep();
     411           0 :         goto retry;
     412             :     }
     413             :     else
     414         614 :         ConditionVariableCancelSleep(); /* no sleep needed after all */
     415             : 
     416             :     /* Let everybody know we've modified this slot */
     417         614 :     ConditionVariableBroadcast(&slot->active_cv);
     418             : 
     419             :     /* We made this slot active, so it's ours now. */
     420         614 :     MyReplicationSlot = slot;
     421             : 
     422             :     /* success */
     423         614 :     return 0;
     424             : }
     425             : 
     426             : /*
     427             :  * Release the replication slot that this backend considers itself to own.
     428             :  *
     429             :  * This or another backend can re-acquire the slot later.
     430             :  * Resources this slot requires will be preserved.
     431             :  */
     432             : void
     433         924 : ReplicationSlotRelease(void)
     434             : {
     435         924 :     ReplicationSlot *slot = MyReplicationSlot;
     436             : 
     437             :     Assert(slot != NULL && slot->active_pid != 0);
     438             : 
     439         924 :     if (slot->data.persistency == RS_EPHEMERAL)
     440             :     {
     441             :         /*
     442             :          * Delete the slot. There is no !PANIC case where this is allowed to
     443             :          * fail; all that may happen is an incomplete cleanup of the on-disk
     444             :          * data.
     445             :          */
     446           6 :         ReplicationSlotDropAcquired();
     447             :     }
     448             : 
     449             :     /*
     450             :      * If slot needed to temporarily restrain both data and catalog xmin to
     451             :      * create the catalog snapshot, remove that temporary constraint.
     452             :      * Snapshots can only be exported while the initial snapshot is still
     453             :      * acquired.
     454             :      */
     455         924 :     if (!TransactionIdIsValid(slot->data.xmin) &&
     456         924 :         TransactionIdIsValid(slot->effective_xmin))
     457             :     {
     458         102 :         SpinLockAcquire(&slot->mutex);
     459         102 :         slot->effective_xmin = InvalidTransactionId;
     460         102 :         SpinLockRelease(&slot->mutex);
     461         102 :         ReplicationSlotsComputeRequiredXmin(false);
     462             :     }
     463             : 
     464         924 :     if (slot->data.persistency == RS_PERSISTENT)
     465             :     {
     466             :         /*
     467             :          * Mark persistent slot inactive.  We're not freeing it, just
     468             :          * disconnecting, but wake up others that may be waiting for it.
     469             :          */
     470         530 :         SpinLockAcquire(&slot->mutex);
     471         530 :         slot->active_pid = 0;
     472         530 :         SpinLockRelease(&slot->mutex);
     473         530 :         ConditionVariableBroadcast(&slot->active_cv);
     474             :     }
     475             : 
     476         924 :     MyReplicationSlot = NULL;
     477             : 
     478             :     /* flag might not have been set if we were a plain slot; clearing is harmless */
     479         924 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     480         924 :     MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
     481         924 :     LWLockRelease(ProcArrayLock);
     482         924 : }
     483             : 
     484             : /*
     485             :  * Cleanup all temporary slots created in current session.
     486             :  */
     487             : void
     488       30298 : ReplicationSlotCleanup(void)
     489             : {
     490             :     int         i;
     491             : 
     492             :     Assert(MyReplicationSlot == NULL);
     493             : 
     494       30298 : restart:
     495       30298 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     496      313074 :     for (i = 0; i < max_replication_slots; i++)
     497             :     {
     498      283022 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     499             : 
     500      283022 :         if (!s->in_use)
     501      280884 :             continue;
     502             : 
     503        2138 :         SpinLockAcquire(&s->mutex);
     504        2138 :         if (s->active_pid == MyProcPid)
     505             :         {
     506             :             Assert(s->data.persistency == RS_TEMPORARY);
     507         246 :             SpinLockRelease(&s->mutex);
     508         246 :             LWLockRelease(ReplicationSlotControlLock);  /* avoid deadlock */
     509             : 
     510         246 :             ReplicationSlotDropPtr(s);
     511             : 
     512         246 :             ConditionVariableBroadcast(&s->active_cv);
     513         246 :             goto restart;
     514             :         }
     515             :         else
     516        1892 :             SpinLockRelease(&s->mutex);
     517             :     }
     518             : 
     519       30052 :     LWLockRelease(ReplicationSlotControlLock);
     520       30052 : }
     521             : 
     522             : /*
     523             :  * Permanently drop replication slot identified by the passed in name.
     524             :  */
     525             : void
     526         158 : ReplicationSlotDrop(const char *name, bool nowait)
     527             : {
     528             :     Assert(MyReplicationSlot == NULL);
     529             : 
     530         158 :     (void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block);
     531             : 
     532         148 :     ReplicationSlotDropAcquired();
     533         148 : }
     534             : 
     535             : /*
     536             :  * Permanently drop the currently acquired replication slot.
     537             :  */
     538             : static void
     539         160 : ReplicationSlotDropAcquired(void)
     540             : {
     541         160 :     ReplicationSlot *slot = MyReplicationSlot;
     542             : 
     543             :     Assert(MyReplicationSlot != NULL);
     544             : 
     545             :     /* slot isn't acquired anymore */
     546         160 :     MyReplicationSlot = NULL;
     547             : 
     548         160 :     ReplicationSlotDropPtr(slot);
     549         160 : }
     550             : 
     551             : /*
     552             :  * Permanently drop the replication slot which will be released by the point
     553             :  * this function returns.
     554             :  */
     555             : static void
     556         406 : ReplicationSlotDropPtr(ReplicationSlot *slot)
     557             : {
     558             :     char        path[MAXPGPATH];
     559             :     char        tmppath[MAXPGPATH];
     560             : 
     561             :     /*
     562             :      * If some other backend ran this code concurrently with us, we might try
     563             :      * to delete a slot with a certain name while someone else was trying to
     564             :      * create a slot with the same name.
     565             :      */
     566         406 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     567             : 
     568             :     /* Generate pathnames. */
     569         406 :     sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
     570         406 :     sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
     571             : 
     572             :     /*
     573             :      * Rename the slot directory on disk, so that we'll no longer recognize
     574             :      * this as a valid slot.  Note that if this fails, we've got to mark the
     575             :      * slot inactive before bailing out.  If we're dropping an ephemeral or a
     576             :      * temporary slot, we had better never fail hard as the caller won't expect
     577             :      * the slot to survive and this might get called during error handling.
     578             :      */
     579         406 :     if (rename(path, tmppath) == 0)
     580             :     {
     581             :         /*
     582             :          * We need to fsync() the directory we just renamed and its parent to
     583             :          * make sure that our changes are on disk in a crash-safe fashion.  If
     584             :          * fsync() fails, we can't be sure whether the changes are on disk or
     585             :          * not.  For now, we handle that by panicking;
     586             :          * StartupReplicationSlots() will try to straighten it out after
     587             :          * restart.
     588             :          */
     589         406 :         START_CRIT_SECTION();
     590         406 :         fsync_fname(tmppath, true);
     591         406 :         fsync_fname("pg_replslot", true);
     592         406 :         END_CRIT_SECTION();
     593             :     }
     594             :     else
     595             :     {
     596           0 :         bool        fail_softly = slot->data.persistency != RS_PERSISTENT;
     597             : 
     598           0 :         SpinLockAcquire(&slot->mutex);
     599           0 :         slot->active_pid = 0;
     600           0 :         SpinLockRelease(&slot->mutex);
     601             : 
     602             :         /* wake up anyone waiting on this slot */
     603           0 :         ConditionVariableBroadcast(&slot->active_cv);
     604             : 
     605           0 :         ereport(fail_softly ? WARNING : ERROR,
     606             :                 (errcode_for_file_access(),
     607             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
     608             :                         path, tmppath)));
     609             :     }
     610             : 
     611             :     /*
     612             :      * The slot is definitely gone.  Lock out concurrent scans of the array
     613             :      * long enough to kill it.  It's OK to clear the active PID here without
     614             :      * grabbing the mutex because nobody else can be scanning the array here,
     615             :      * and nobody can be attached to this slot and thus access it without
     616             :      * scanning the array.
     617             :      *
     618             :      * Also wake up processes waiting for it.
     619             :      */
     620         406 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     621         406 :     slot->active_pid = 0;
     622         406 :     slot->in_use = false;
     623         406 :     LWLockRelease(ReplicationSlotControlLock);
     624         406 :     ConditionVariableBroadcast(&slot->active_cv);
     625             : 
     626             :     /*
     627             :      * Slot is dead and doesn't prevent resource removal anymore, recompute
     628             :      * limits.
     629             :      */
     630         406 :     ReplicationSlotsComputeRequiredXmin(false);
     631         406 :     ReplicationSlotsComputeRequiredLSN();
     632             : 
     633             :     /*
     634             :      * If removing the directory fails, the worst thing that will happen is
     635             :      * that the user won't be able to create a new slot with the same name
     636             :      * until the next server restart.  We warn about it, but that's all.
     637             :      */
     638         406 :     if (!rmtree(tmppath, true))
     639           0 :         ereport(WARNING,
     640             :                 (errmsg("could not remove directory \"%s\"", tmppath)));
     641             : 
     642             :     /*
     643             :      * We release this at the very end, so that nobody starts trying to create
     644             :      * a slot while we're still cleaning up the detritus of the old one.
     645             :      */
     646         406 :     LWLockRelease(ReplicationSlotAllocationLock);
     647         406 : }
     648             : 
     649             : /*
     650             :  * Serialize the currently acquired slot's state from memory to disk, thereby
     651             :  * guaranteeing the current state will survive a crash.
     652             :  */
     653             : void
     654         634 : ReplicationSlotSave(void)
     655             : {
     656             :     char        path[MAXPGPATH];
     657             : 
     658             :     Assert(MyReplicationSlot != NULL);
     659             : 
     660         634 :     sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
     661         634 :     SaveSlotToPath(MyReplicationSlot, path, ERROR);
     662         634 : }
     663             : 
     664             : /*
     665             :  * Signal that it would be useful if the currently acquired slot would be
     666             :  * flushed out to disk.
     667             :  *
     668             :  * Note that the actual flush to disk can be delayed for a long time; if it is
     669             :  * required for correctness, explicitly do a ReplicationSlotSave().
     670             :  */
     671             : void
     672        1278 : ReplicationSlotMarkDirty(void)
     673             : {
     674        1278 :     ReplicationSlot *slot = MyReplicationSlot;
     675             : 
     676             :     Assert(MyReplicationSlot != NULL);
     677             : 
     678        1278 :     SpinLockAcquire(&slot->mutex);
     679        1278 :     MyReplicationSlot->just_dirtied = true;
     680        1278 :     MyReplicationSlot->dirty = true;
     681        1278 :     SpinLockRelease(&slot->mutex);
     682        1278 : }
     683             : 
     684             : /*
     685             :  * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
     686             :  * guaranteeing it will be there after an eventual crash.
     687             :  */
     688             : void
     689         172 : ReplicationSlotPersist(void)
     690             : {
     691         172 :     ReplicationSlot *slot = MyReplicationSlot;
     692             : 
     693             :     Assert(slot != NULL);
     694             :     Assert(slot->data.persistency != RS_PERSISTENT);
     695             : 
     696         172 :     SpinLockAcquire(&slot->mutex);
     697         172 :     slot->data.persistency = RS_PERSISTENT;
     698         172 :     SpinLockRelease(&slot->mutex);
     699             : 
     700         172 :     ReplicationSlotMarkDirty();
     701         172 :     ReplicationSlotSave();
     702         172 : }
     703             : 
     704             : /*
     705             :  * Compute the oldest xmin across all slots and store it in the ProcArray.
     706             :  *
     707             :  * If already_locked is true, ProcArrayLock has already been acquired
     708             :  * exclusively.
     709             :  */
     710             : void
     711        2262 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
     712             : {
     713             :     int         i;
     714        2262 :     TransactionId agg_xmin = InvalidTransactionId;
     715        2262 :     TransactionId agg_catalog_xmin = InvalidTransactionId;
     716             : 
     717             :     Assert(ReplicationSlotCtl != NULL);
     718             : 
     719        2262 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     720             : 
     721       18772 :     for (i = 0; i < max_replication_slots; i++)
     722             :     {
     723       16510 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     724             :         TransactionId effective_xmin;
     725             :         TransactionId effective_catalog_xmin;
     726             : 
     727       16510 :         if (!s->in_use)
     728       15410 :             continue;
     729             : 
     730        1100 :         SpinLockAcquire(&s->mutex);
     731        1100 :         effective_xmin = s->effective_xmin;
     732        1100 :         effective_catalog_xmin = s->effective_catalog_xmin;
     733        1100 :         SpinLockRelease(&s->mutex);
     734             : 
     735             :         /* check the data xmin */
     736        1100 :         if (TransactionIdIsValid(effective_xmin) &&
     737           0 :             (!TransactionIdIsValid(agg_xmin) ||
     738           0 :              TransactionIdPrecedes(effective_xmin, agg_xmin)))
     739         120 :             agg_xmin = effective_xmin;
     740             : 
     741             :         /* check the catalog xmin */
     742        1100 :         if (TransactionIdIsValid(effective_catalog_xmin) &&
     743         416 :             (!TransactionIdIsValid(agg_catalog_xmin) ||
     744         416 :              TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
     745         572 :             agg_catalog_xmin = effective_catalog_xmin;
     746             :     }
     747             : 
     748        2262 :     LWLockRelease(ReplicationSlotControlLock);
     749             : 
     750        2262 :     ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
     751        2262 : }
     752             : 
     753             : /*
     754             :  * Compute the oldest restart LSN across all slots and inform xlog module.
     755             :  *
     756             :  * Note: while max_slot_wal_keep_size is theoretically relevant for this
     757             :  * purpose, we don't try to account for that, because this module doesn't
     758             :  * know what to compare against.
     759             :  */
     760             : void
     761        2502 : ReplicationSlotsComputeRequiredLSN(void)
     762             : {
     763             :     int         i;
     764        2502 :     XLogRecPtr  min_required = InvalidXLogRecPtr;
     765             : 
     766             :     Assert(ReplicationSlotCtl != NULL);
     767             : 
     768        2502 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     769       20510 :     for (i = 0; i < max_replication_slots; i++)
     770             :     {
     771       18008 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     772             :         XLogRecPtr  restart_lsn;
     773             : 
     774       18008 :         if (!s->in_use)
     775       16780 :             continue;
     776             : 
     777        1228 :         SpinLockAcquire(&s->mutex);
     778        1228 :         restart_lsn = s->data.restart_lsn;
     779        1228 :         SpinLockRelease(&s->mutex);
     780             : 
     781        1228 :         if (restart_lsn != InvalidXLogRecPtr &&
     782         328 :             (min_required == InvalidXLogRecPtr ||
     783             :              restart_lsn < min_required))
     784         894 :             min_required = restart_lsn;
     785             :     }
     786        2502 :     LWLockRelease(ReplicationSlotControlLock);
     787             : 
     788        2502 :     XLogSetReplicationSlotMinimumLSN(min_required);
     789        2502 : }
     790             : 
     791             : /*
     792             :  * Compute the oldest WAL LSN required by *logical* decoding slots.
     793             :  *
     794             :  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
     795             :  * slots exist.
     796             :  *
     797             :  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
     798             :  * ignores physical replication slots.
     799             :  *
     800             :  * The results aren't required frequently, so we don't maintain a precomputed
     801             :  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
     802             :  */
     803             : XLogRecPtr
     804        6344 : ReplicationSlotsComputeLogicalRestartLSN(void)
     805             : {
     806        6344 :     XLogRecPtr  result = InvalidXLogRecPtr;
     807             :     int         i;
     808             : 
     809        6344 :     if (max_replication_slots <= 0)
     810           0 :         return InvalidXLogRecPtr;
     811             : 
     812        6344 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     813             : 
     814       66768 :     for (i = 0; i < max_replication_slots; i++)
     815             :     {
     816             :         ReplicationSlot *s;
     817             :         XLogRecPtr  restart_lsn;
     818             : 
     819       60424 :         s = &ReplicationSlotCtl->replication_slots[i];
     820             : 
     821             :         /* cannot change while ReplicationSlotControlLock is held */
     822       60424 :         if (!s->in_use)
     823       60240 :             continue;
     824             : 
     825             :         /* we're only interested in logical slots */
     826         184 :         if (!SlotIsLogical(s))
     827          92 :             continue;
     828             : 
     829             :         /* read once, it's ok if it increases while we're checking */
     830          92 :         SpinLockAcquire(&s->mutex);
     831          92 :         restart_lsn = s->data.restart_lsn;
     832          92 :         SpinLockRelease(&s->mutex);
     833             : 
     834          92 :         if (restart_lsn == InvalidXLogRecPtr)
     835           0 :             continue;
     836             : 
     837          92 :         if (result == InvalidXLogRecPtr ||
     838             :             restart_lsn < result)
     839          84 :             result = restart_lsn;
     840             :     }
     841             : 
     842        6344 :     LWLockRelease(ReplicationSlotControlLock);
     843             : 
     844        6344 :     return result;
     845             : }
     846             : 
     847             : /*
     848             :  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
     849             :  * passed database oid.
     850             :  *
     851             :  * Returns true if there are any slots referencing the database. *nslots will
     852             :  * be set to the total number of slots in the database, *nactive to the number
     853             :  * of those that are currently active.
     854             :  */
     855             : bool
     856          20 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
     857             : {
     858             :     int         i;
     859             : 
     860          20 :     *nslots = *nactive = 0;
     861             : 
     862          20 :     if (max_replication_slots <= 0)
     863           0 :         return false;
     864             : 
     865          20 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     866         186 :     for (i = 0; i < max_replication_slots; i++)
     867             :     {
     868             :         ReplicationSlot *s;
     869             : 
     870         166 :         s = &ReplicationSlotCtl->replication_slots[i];
     871             : 
     872             :         /* cannot change while ReplicationSlotControlLock is held */
     873         166 :         if (!s->in_use)
     874         152 :             continue;
     875             : 
     876             :         /* only logical slots are database specific, skip */
     877          14 :         if (!SlotIsLogical(s))
     878           2 :             continue;
     879             : 
     880             :         /* not our database, skip */
     881          12 :         if (s->data.database != dboid)
     882           6 :             continue;
     883             : 
     884             :         /* count slots with spinlock held */
     885           6 :         SpinLockAcquire(&s->mutex);
     886           6 :         (*nslots)++;
     887           6 :         if (s->active_pid != 0)
     888           2 :             (*nactive)++;
     889           6 :         SpinLockRelease(&s->mutex);
     890             :     }
     891          20 :     LWLockRelease(ReplicationSlotControlLock);
     892             : 
     893          20 :     if (*nslots > 0)
     894           6 :         return true;
     895          14 :     return false;
     896             : }
     897             : 
     898             : /*
     899             :  * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
     900             :  * passed database oid. The caller should hold an exclusive lock on the
     901             :  * pg_database oid for the database to prevent creation of new slots on the db
     902             :  * or replay from existing slots.
     903             :  *
     904             :  * Another session that concurrently acquires an existing slot on the target DB
     905             :  * (most likely to drop it) may cause this function to ERROR. If that happens
     906             :  * it may have dropped some but not all slots.
     907             :  *
     908             :  * This routine isn't as efficient as it could be - but we don't drop
     909             :  * databases often, especially databases with lots of slots.
     910             :  */
     911             : void
     912          20 : ReplicationSlotsDropDBSlots(Oid dboid)
     913             : {
     914             :     int         i;
     915             : 
     916          20 :     if (max_replication_slots <= 0)
     917           0 :         return;
     918             : 
     919          20 : restart:
     920          26 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     921         194 :     for (i = 0; i < max_replication_slots; i++)
     922             :     {
     923             :         ReplicationSlot *s;
     924             :         char       *slotname;
     925             :         int         active_pid;
     926             : 
     927         174 :         s = &ReplicationSlotCtl->replication_slots[i];
     928             : 
     929             :         /* cannot change while ReplicationSlotControlLock is held */
     930         174 :         if (!s->in_use)
     931         154 :             continue;
     932             : 
     933             :         /* only logical slots are database specific, skip */
     934          20 :         if (!SlotIsLogical(s))
     935           2 :             continue;
     936             : 
     937             :         /* not our database, skip */
     938          18 :         if (s->data.database != dboid)
     939          12 :             continue;
     940             : 
     941             :         /* acquire slot, so ReplicationSlotDropAcquired can be reused  */
     942           6 :         SpinLockAcquire(&s->mutex);
     943             :         /* can't change while ReplicationSlotControlLock is held */
     944           6 :         slotname = NameStr(s->data.name);
     945           6 :         active_pid = s->active_pid;
     946           6 :         if (active_pid == 0)
     947             :         {
     948           6 :             MyReplicationSlot = s;
     949           6 :             s->active_pid = MyProcPid;
     950             :         }
     951           6 :         SpinLockRelease(&s->mutex);
     952             : 
     953             :         /*
     954             :          * Even though we hold an exclusive lock on the database object a
     955             :          * logical slot for that DB can still be active, e.g. if it's
     956             :          * concurrently being dropped by a backend connected to another DB.
     957             :          *
     958             :          * That's fairly unlikely in practice, so we'll just bail out.
     959             :          */
     960           6 :         if (active_pid)
     961           0 :             ereport(ERROR,
     962             :                     (errcode(ERRCODE_OBJECT_IN_USE),
     963             :                      errmsg("replication slot \"%s\" is active for PID %d",
     964             :                             slotname, active_pid)));
     965             : 
     966             :         /*
     967             :          * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
     968             :          * holding ReplicationSlotControlLock over filesystem operations,
     969             :          * release ReplicationSlotControlLock and use
     970             :          * ReplicationSlotDropAcquired.
     971             :          *
     972             :          * As that means the set of slots could change, restart scan from the
     973             :          * beginning each time we release the lock.
     974             :          */
     975           6 :         LWLockRelease(ReplicationSlotControlLock);
     976           6 :         ReplicationSlotDropAcquired();
     977           6 :         goto restart;
     978             :     }
     979          20 :     LWLockRelease(ReplicationSlotControlLock);
     980             : }
     981             : 
     982             : 
     983             : /*
     984             :  * Check whether the server's configuration supports using replication
     985             :  * slots.
     986             :  */
     987             : void
     988         792 : CheckSlotRequirements(void)
     989             : {
     990             :     /*
     991             :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
     992             :      * needs the same check.
     993             :      */
     994             : 
     995         792 :     if (max_replication_slots == 0)
     996           0 :         ereport(ERROR,
     997             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     998             :                  errmsg("replication slots can only be used if max_replication_slots > 0")));
     999             : 
    1000         792 :     if (wal_level < WAL_LEVEL_REPLICA)
    1001           0 :         ereport(ERROR,
    1002             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1003             :                  errmsg("replication slots can only be used if wal_level >= replica")));
    1004         792 : }
    1005             : 
    1006             : /*
    1007             :  * Reserve WAL for the currently active slot.
    1008             :  *
    1009             :  * Compute and set restart_lsn in a manner that's appropriate for the type of
    1010             :  * the slot and concurrency safe.
    1011             :  */
    1012             : void
    1013         414 : ReplicationSlotReserveWal(void)
    1014             : {
    1015         414 :     ReplicationSlot *slot = MyReplicationSlot;
    1016             : 
    1017             :     Assert(slot != NULL);
    1018             :     Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
    1019             : 
    1020             :     /*
    1021             :      * The replication slot mechanism is used to prevent removal of required
    1022             :      * WAL. As there is no interlock between this routine and checkpoints, WAL
    1023             :      * segments could concurrently be removed when a now stale return value of
    1024             :      * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
    1025             :      * this happens we'll just retry.
    1026             :      */
    1027             :     while (true)
    1028           0 :     {
    1029             :         XLogSegNo   segno;
    1030             :         XLogRecPtr  restart_lsn;
    1031             : 
    1032             :         /*
    1033             :          * For logical slots log a standby snapshot and start logical decoding
    1034             :          * at exactly that position. That allows the slot to start up more
    1035             :          * quickly.
    1036             :          *
    1037             :          * That's not needed (or indeed helpful) for physical slots as they'll
    1038             :          * start replay at the last logged checkpoint anyway. Instead return
    1039             :          * the location of the last redo LSN. While that slightly increases
    1040             :          * the chance that we have to retry, it's where a base backup has to
    1041             :          * start replay at.
    1042             :          */
    1043         414 :         if (!RecoveryInProgress() && SlotIsLogical(slot))
    1044         280 :         {
    1045             :             XLogRecPtr  flushptr;
    1046             : 
    1047             :             /* start at current insert position */
    1048         280 :             restart_lsn = GetXLogInsertRecPtr();
    1049         280 :             SpinLockAcquire(&slot->mutex);
    1050         280 :             slot->data.restart_lsn = restart_lsn;
    1051         280 :             SpinLockRelease(&slot->mutex);
    1052             : 
    1053             :             /* make sure we have enough information to start */
    1054         280 :             flushptr = LogStandbySnapshot();
    1055             : 
    1056             :             /* and make sure it's fsynced to disk */
    1057         280 :             XLogFlush(flushptr);
    1058             :         }
    1059             :         else
    1060             :         {
    1061         134 :             restart_lsn = GetRedoRecPtr();
    1062         134 :             SpinLockAcquire(&slot->mutex);
    1063         134 :             slot->data.restart_lsn = restart_lsn;
    1064         134 :             SpinLockRelease(&slot->mutex);
    1065             :         }
    1066             : 
    1067             :         /* prevent WAL removal as fast as possible */
    1068         414 :         ReplicationSlotsComputeRequiredLSN();
    1069             : 
    1070             :         /*
    1071             :          * If all required WAL is still there, great, otherwise retry. The
    1072             :          * slot should prevent further removal of WAL, unless there's a
    1073             :          * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
    1074             :          * the new restart_lsn above, so normally we should never need to loop
    1075             :          * more than twice.
    1076             :          */
    1077         414 :         XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
    1078         414 :         if (XLogGetLastRemovedSegno() < segno)
    1079         414 :             break;
    1080             :     }
    1081         414 : }
    1082             : 
    1083             : /*
    1084             :  * Mark any slot that points to an LSN older than the given segment
    1085             :  * as invalid; it requires WAL that's about to be removed.
    1086             :  *
    1087             :  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
    1088             :  */
    1089             : void
    1090        3172 : InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
    1091             : {
    1092             :     XLogRecPtr  oldestLSN;
    1093             : 
    1094        3172 :     XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
    1095             : 
    1096        3172 : restart:
    1097        3172 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1098       33384 :     for (int i = 0; i < max_replication_slots; i++)
    1099             :     {
    1100       30212 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1101       30212 :         XLogRecPtr  restart_lsn = InvalidXLogRecPtr;
    1102             :         char       *slotname;
    1103             : 
    1104       30212 :         if (!s->in_use)
    1105       30120 :             continue;
    1106             : 
    1107          92 :         SpinLockAcquire(&s->mutex);
    1108          92 :         if (s->data.restart_lsn == InvalidXLogRecPtr ||
    1109          84 :             s->data.restart_lsn >= oldestLSN)
    1110             :         {
    1111          92 :             SpinLockRelease(&s->mutex);
    1112          92 :             continue;
    1113             :         }
    1114             : 
    1115           0 :         slotname = pstrdup(NameStr(s->data.name));
    1116           0 :         restart_lsn = s->data.restart_lsn;
    1117             : 
    1118           0 :         SpinLockRelease(&s->mutex);
    1119           0 :         LWLockRelease(ReplicationSlotControlLock);
    1120             : 
    1121             :         for (;;)
    1122           0 :         {
    1123           0 :             int         wspid = ReplicationSlotAcquire(slotname, SAB_Inquire);
    1124             : 
    1125             :             /* no walsender? success! */
    1126           0 :             if (wspid == 0)
    1127           0 :                 break;
    1128             : 
    1129           0 :             ereport(LOG,
    1130             :                     (errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
    1131             :                             wspid, slotname)));
    1132           0 :             (void) kill(wspid, SIGTERM);
    1133             : 
    1134           0 :             ConditionVariableTimedSleep(&s->active_cv, 10,
    1135             :                                         WAIT_EVENT_REPLICATION_SLOT_DROP);
    1136             :         }
    1137           0 :         ConditionVariableCancelSleep();
    1138             : 
    1139           0 :         ereport(LOG,
    1140             :                 (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
    1141             :                         slotname,
    1142             :                         (uint32) (restart_lsn >> 32),
    1143             :                         (uint32) restart_lsn)));
    1144             : 
    1145           0 :         SpinLockAcquire(&s->mutex);
    1146           0 :         s->data.restart_lsn = InvalidXLogRecPtr;
    1147           0 :         SpinLockRelease(&s->mutex);
    1148           0 :         ReplicationSlotRelease();
    1149             : 
    1150             :         /* if we did anything, start from scratch */
    1151           0 :         CHECK_FOR_INTERRUPTS();
    1152           0 :         goto restart;
    1153             :     }
    1154        3172 :     LWLockRelease(ReplicationSlotControlLock);
    1155        3172 : }
    1156             : 
    1157             : /*
    1158             :  * Flush all replication slots to disk.
    1159             :  *
    1160             :  * This needn't actually be part of a checkpoint, but it's a convenient
    1161             :  * location.
    1162             :  */
    1163             : void
    1164        3172 : CheckPointReplicationSlots(void)
    1165             : {
    1166             :     int         i;
    1167             : 
    1168        3172 :     elog(DEBUG1, "performing replication slot checkpoint");
    1169             : 
    1170             :     /*
    1171             :      * Prevent any slot from being created/dropped while we're active. As we
    1172             :      * explicitly do *not* want to block iterating over replication_slots or
    1173             :      * acquiring a slot we cannot take the control lock - but that's OK,
    1174             :      * because holding ReplicationSlotAllocationLock is strictly stronger, and
    1175             :      * enough to guarantee that nobody can change the in_use bits on us.
    1176             :      */
    1177        3172 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
    1178             : 
    1179       33384 :     for (i = 0; i < max_replication_slots; i++)
    1180             :     {
    1181       30212 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1182             :         char        path[MAXPGPATH];
    1183             : 
    1184       30212 :         if (!s->in_use)
    1185       30120 :             continue;
    1186             : 
    1187             :         /* save the slot to disk; locking is handled in SaveSlotToPath() */
    1188          92 :         sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
    1189          92 :         SaveSlotToPath(s, path, LOG);
    1190             :     }
    1191        3172 :     LWLockRelease(ReplicationSlotAllocationLock);
    1192        3172 : }
    1193             : 
    1194             : /*
    1195             :  * Load all replication slots from disk into memory at server startup. This
    1196             :  * needs to be run before we start crash recovery.
    1197             :  */
    1198             : void
    1199        1390 : StartupReplicationSlots(void)
    1200             : {
    1201             :     DIR        *replication_dir;
    1202             :     struct dirent *replication_de;
    1203             : 
    1204        1390 :     elog(DEBUG1, "starting up replication slots");
    1205             : 
    1206             :     /* restore all slots by iterating over all on-disk entries */
    1207        1390 :     replication_dir = AllocateDir("pg_replslot");
    1208        4200 :     while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
    1209             :     {
    1210             :         struct stat statbuf;
    1211             :         char        path[MAXPGPATH + 12];
    1212             : 
    1213        2810 :         if (strcmp(replication_de->d_name, ".") == 0 ||
    1214        1420 :             strcmp(replication_de->d_name, "..") == 0)
    1215        2780 :             continue;
    1216             : 
    1217          30 :         snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
    1218             : 
    1219             :         /* we're only creating directories here, skip if it's not ours */
    1220          30 :         if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
    1221           0 :             continue;
    1222             : 
    1223             :         /* we crashed while a slot was being set up or deleted, clean up */
    1224          30 :         if (pg_str_endswith(replication_de->d_name, ".tmp"))
    1225             :         {
    1226           0 :             if (!rmtree(path, true))
    1227             :             {
    1228           0 :                 ereport(WARNING,
    1229             :                         (errmsg("could not remove directory \"%s\"",
    1230             :                                 path)));
    1231           0 :                 continue;
    1232             :             }
    1233           0 :             fsync_fname("pg_replslot", true);
    1234           0 :             continue;
    1235             :         }
    1236             : 
    1237             :         /* looks like a slot in a normal state, restore */
    1238          30 :         RestoreSlotFromDisk(replication_de->d_name);
    1239             :     }
    1240        1390 :     FreeDir(replication_dir);
    1241             : 
    1242             :     /* replication slots are disabled, so no slots exist; we're done. */
    1243        1390 :     if (max_replication_slots <= 0)
    1244           0 :         return;
    1245             : 
    1246             :     /* Now that we have recovered all the data, compute required xmin and LSN */
    1247        1390 :     ReplicationSlotsComputeRequiredXmin(false);
    1248        1390 :     ReplicationSlotsComputeRequiredLSN();
    1249             : }
    1250             : 
    1251             : /* ----
    1252             :  * Manipulation of on-disk state of replication slots
    1253             :  *
    1254             :  * NB: none of the routines below should take any notice whether a slot is the
    1255             :  * current one or not, that's all handled a layer above.
    1256             :  * ----
    1257             :  */
    1258             : static void
    1259         458 : CreateSlotOnDisk(ReplicationSlot *slot)
    1260             : {
    1261             :     char        tmppath[MAXPGPATH];
    1262             :     char        path[MAXPGPATH];
    1263             :     struct stat st;
    1264             : 
    1265             :     /*
    1266             :      * No need to take out the io_in_progress_lock: nobody else can see this
    1267             :      * slot yet, so nobody else will write. We're reusing SaveSlotToPath, which
    1268             :      * takes out the lock itself; if we took the lock here, we'd deadlock.
    1269             :      */
    1270             : 
    1271         458 :     sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
    1272         458 :     sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
    1273             : 
    1274             :     /*
    1275             :      * It's just barely possible that some previous effort to create or drop a
    1276             :      * slot with this name left a temp directory lying around. If that seems
    1277             :      * to be the case, try to remove it.  If the rmtree() fails, we'll error
    1278             :      * out at the MakePGDirectory() below, so we don't bother checking
    1279             :      * success.
    1280             :      */
    1281         458 :     if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
    1282           0 :         rmtree(tmppath, true);
    1283             : 
    1284             :     /* Create and fsync the temporary slot directory. */
    1285         458 :     if (MakePGDirectory(tmppath) < 0)
    1286           0 :         ereport(ERROR,
    1287             :                 (errcode_for_file_access(),
    1288             :                  errmsg("could not create directory \"%s\": %m",
    1289             :                         tmppath)));
    1290         458 :     fsync_fname(tmppath, true);
    1291             : 
    1292             :     /* Write the actual state file. */
    1293         458 :     slot->dirty = true;          /* signal that we really need to write */
    1294         458 :     SaveSlotToPath(slot, tmppath, ERROR);
    1295             : 
    1296             :     /* Rename the directory into place. */
    1297         458 :     if (rename(tmppath, path) != 0)
    1298           0 :         ereport(ERROR,
    1299             :                 (errcode_for_file_access(),
    1300             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1301             :                         tmppath, path)));
    1302             : 
    1303             :     /*
    1304             :      * If we failed now - really unlikely - we wouldn't know whether this slot
    1305             :      * would persist after an OS crash or not, so force a restart. The
    1306             :      * restart would try to fsync this again until it works.
    1307             :      */
    1308         458 :     START_CRIT_SECTION();
    1309             : 
    1310         458 :     fsync_fname(path, true);
    1311         458 :     fsync_fname("pg_replslot", true);
    1312             : 
    1313         458 :     END_CRIT_SECTION();
    1314         458 : }
    1315             : 
    1316             : /*
    1317             :  * Shared functionality between saving and creating a replication slot.
    1318             :  */
    1319             : static void
    1320        1184 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
    1321             : {
    1322             :     char        tmppath[MAXPGPATH];
    1323             :     char        path[MAXPGPATH];
    1324             :     int         fd;
    1325             :     ReplicationSlotOnDisk cp;
    1326             :     bool        was_dirty;
    1327             : 
    1328             :     /* first check whether there's something to write out */
    1329        1184 :     SpinLockAcquire(&slot->mutex);
    1330        1184 :     was_dirty = slot->dirty;
    1331        1184 :     slot->just_dirtied = false;
    1332        1184 :     SpinLockRelease(&slot->mutex);
    1333             : 
    1334             :     /* and don't do anything if there's nothing to write */
    1335        1184 :     if (!was_dirty)
    1336          66 :         return;
    1337             : 
    1338        1118 :     LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
    1339             : 
    1340             :     /* silence valgrind :( */
    1341        1118 :     memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
    1342             : 
    1343        1118 :     sprintf(tmppath, "%s/state.tmp", dir);
    1344        1118 :     sprintf(path, "%s/state", dir);
    1345             : 
    1346        1118 :     fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    1347        1118 :     if (fd < 0)
    1348             :     {
    1349             :         /*
    1350             :          * If not an ERROR, then release the lock before returning.  In case
    1351             :          * of an ERROR, the error recovery path automatically releases the
    1352             :          * lock, but no harm in explicitly releasing even in that case.  Note
    1353             :          * that LWLockRelease() could affect errno.
    1354             :          */
    1355           0 :         int         save_errno = errno;
    1356             : 
    1357           0 :         LWLockRelease(&slot->io_in_progress_lock);
    1358           0 :         errno = save_errno;
    1359           0 :         ereport(elevel,
    1360             :                 (errcode_for_file_access(),
    1361             :                  errmsg("could not create file \"%s\": %m",
    1362             :                         tmppath)));
    1363           0 :         return;
    1364             :     }
    1365             : 
    1366        1118 :     cp.magic = SLOT_MAGIC;
    1367        1118 :     INIT_CRC32C(cp.checksum);
    1368        1118 :     cp.version = SLOT_VERSION;
    1369        1118 :     cp.length = ReplicationSlotOnDiskV2Size;
    1370             : 
    1371        1118 :     SpinLockAcquire(&slot->mutex);
    1372             : 
    1373        1118 :     memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
    1374             : 
    1375        1118 :     SpinLockRelease(&slot->mutex);
    1376             : 
    1377        1118 :     COMP_CRC32C(cp.checksum,
    1378             :                 (char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
    1379             :                 SnapBuildOnDiskChecksummedSize);
    1380        1118 :     FIN_CRC32C(cp.checksum);
    1381             : 
    1382        1118 :     errno = 0;
    1383        1118 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
    1384        1118 :     if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
    1385             :     {
    1386           0 :         int         save_errno = errno;
    1387             : 
    1388           0 :         pgstat_report_wait_end();
    1389           0 :         CloseTransientFile(fd);
    1390           0 :         LWLockRelease(&slot->io_in_progress_lock);
    1391             : 
    1392             :         /* if write didn't set errno, assume problem is no disk space */
    1393           0 :         errno = save_errno ? save_errno : ENOSPC;
    1394           0 :         ereport(elevel,
    1395             :                 (errcode_for_file_access(),
    1396             :                  errmsg("could not write to file \"%s\": %m",
    1397             :                         tmppath)));
    1398           0 :         return;
    1399             :     }
    1400        1118 :     pgstat_report_wait_end();
    1401             : 
    1402             :     /* fsync the temporary file */
    1403        1118 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
    1404        1118 :     if (pg_fsync(fd) != 0)
    1405             :     {
    1406           0 :         int         save_errno = errno;
    1407             : 
    1408           0 :         pgstat_report_wait_end();
    1409           0 :         CloseTransientFile(fd);
    1410           0 :         LWLockRelease(&slot->io_in_progress_lock);
    1411           0 :         errno = save_errno;
    1412           0 :         ereport(elevel,
    1413             :                 (errcode_for_file_access(),
    1414             :                  errmsg("could not fsync file \"%s\": %m",
    1415             :                         tmppath)));
    1416           0 :         return;
    1417             :     }
    1418        1118 :     pgstat_report_wait_end();
    1419             : 
    1420        1118 :     if (CloseTransientFile(fd) != 0)
    1421             :     {
    1422           0 :         int         save_errno = errno;
    1423             : 
    1424           0 :         LWLockRelease(&slot->io_in_progress_lock);
    1425           0 :         errno = save_errno;
    1426           0 :         ereport(elevel,
    1427             :                 (errcode_for_file_access(),
    1428             :                  errmsg("could not close file \"%s\": %m",
    1429             :                         tmppath)));
    1430           0 :         return;
    1431             :     }
    1432             : 
    1433             :     /* rename to permanent file, fsync file and directory */
    1434        1118 :     if (rename(tmppath, path) != 0)
    1435             :     {
    1436           0 :         int         save_errno = errno;
    1437             : 
    1438           0 :         LWLockRelease(&slot->io_in_progress_lock);
    1439           0 :         errno = save_errno;
    1440           0 :         ereport(elevel,
    1441             :                 (errcode_for_file_access(),
    1442             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1443             :                         tmppath, path)));
    1444           0 :         return;
    1445             :     }
    1446             : 
    1447             :     /*
    1448             :      * Check CreateSlotOnDisk() for the reasoning of using a critical section.
    1449             :      */
    1450        1118 :     START_CRIT_SECTION();
    1451             : 
    1452        1118 :     fsync_fname(path, false);
    1453        1118 :     fsync_fname(dir, true);
    1454        1118 :     fsync_fname("pg_replslot", true);
    1455             : 
    1456        1118 :     END_CRIT_SECTION();
    1457             : 
    1458             :     /*
    1459             :      * Successfully wrote, unset dirty bit, unless somebody dirtied again
    1460             :      * already.
    1461             :      */
    1462        1118 :     SpinLockAcquire(&slot->mutex);
    1463        1118 :     if (!slot->just_dirtied)
    1464        1118 :         slot->dirty = false;
    1465        1118 :     SpinLockRelease(&slot->mutex);
    1466             : 
    1467        1118 :     LWLockRelease(&slot->io_in_progress_lock);
    1468             : }
    1469             : 
    1470             : /*
    1471             :  * Load a single slot from disk into memory.
    1472             :  */
    1473             : static void
    1474          30 : RestoreSlotFromDisk(const char *name)
    1475             : {
    1476             :     ReplicationSlotOnDisk cp;
    1477             :     int         i;
    1478             :     char        slotdir[MAXPGPATH + 12];
    1479             :     char        path[MAXPGPATH + 22];
    1480             :     int         fd;
    1481          30 :     bool        restored = false;
    1482             :     int         readBytes;
    1483             :     pg_crc32c   checksum;
    1484             : 
    1485             :     /* no need to lock here, no concurrent access allowed yet */
    1486             : 
    1487             :     /* delete temp file if it exists */
    1488          30 :     sprintf(slotdir, "pg_replslot/%s", name);
    1489          30 :     sprintf(path, "%s/state.tmp", slotdir);
    1490          30 :     if (unlink(path) < 0 && errno != ENOENT)
    1491           0 :         ereport(PANIC,
    1492             :                 (errcode_for_file_access(),
    1493             :                  errmsg("could not remove file \"%s\": %m", path)));
    1494             : 
    1495          30 :     sprintf(path, "%s/state", slotdir);
    1496             : 
    1497          30 :     elog(DEBUG1, "restoring replication slot from \"%s\"", path);
    1498             : 
    1499             :     /* on some operating systems fsyncing a file requires O_RDWR */
    1500          30 :     fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
    1501             : 
    1502             :     /*
    1503             :      * We do not need to handle a missing state file gracefully, because the
    1504             :      * directory is rename()d into place only after the state file is fsync()ed.
    1505             :      */
    1506          30 :     if (fd < 0)
    1507           0 :         ereport(PANIC,
    1508             :                 (errcode_for_file_access(),
    1509             :                  errmsg("could not open file \"%s\": %m", path)));
    1510             : 
    1511             :     /*
    1512             :      * Sync the state file before we read from it. We might have crashed
    1513             :      * before it was synced, and we shouldn't continue on that basis.
    1514             :      */
    1515          30 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
    1516          30 :     if (pg_fsync(fd) != 0)
    1517           0 :         ereport(PANIC,
    1518             :                 (errcode_for_file_access(),
    1519             :                  errmsg("could not fsync file \"%s\": %m",
    1520             :                         path)));
    1521          30 :     pgstat_report_wait_end();
    1522             : 
    1523             :     /* Also sync the parent directory */
    1524          30 :     START_CRIT_SECTION();
    1525          30 :     fsync_fname(slotdir, true);
    1526          30 :     END_CRIT_SECTION();
    1527             : 
    1528             :     /* read part of statefile that's guaranteed to be version independent */
    1529          30 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    1530          30 :     readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
    1531          30 :     pgstat_report_wait_end();
    1532          30 :     if (readBytes != ReplicationSlotOnDiskConstantSize)
    1533             :     {
    1534           0 :         if (readBytes < 0)
    1535           0 :             ereport(PANIC,
    1536             :                     (errcode_for_file_access(),
    1537             :                      errmsg("could not read file \"%s\": %m", path)));
    1538             :         else
    1539           0 :             ereport(PANIC,
    1540             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    1541             :                      errmsg("could not read file \"%s\": read %d of %zu",
    1542             :                             path, readBytes,
    1543             :                             (Size) ReplicationSlotOnDiskConstantSize)));
    1544             :     }
    1545             : 
    1546             :     /* verify magic */
    1547          30 :     if (cp.magic != SLOT_MAGIC)
    1548           0 :         ereport(PANIC,
    1549             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1550             :                  errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
    1551             :                         path, cp.magic, SLOT_MAGIC)));
    1552             : 
    1553             :     /* verify version */
    1554          30 :     if (cp.version != SLOT_VERSION)
    1555           0 :         ereport(PANIC,
    1556             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1557             :                  errmsg("replication slot file \"%s\" has unsupported version %u",
    1558             :                         path, cp.version)));
    1559             : 
    1560             :     /* boundary check on length */
    1561          30 :     if (cp.length != ReplicationSlotOnDiskV2Size)
    1562           0 :         ereport(PANIC,
    1563             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1564             :                  errmsg("replication slot file \"%s\" has corrupted length %u",
    1565             :                         path, cp.length)));
    1566             : 
    1567             :     /* Now that we know the size, read the entire file */
    1568          30 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    1569          60 :     readBytes = read(fd,
    1570             :                      (char *) &cp + ReplicationSlotOnDiskConstantSize,
    1571          30 :                      cp.length);
    1572          30 :     pgstat_report_wait_end();
    1573          30 :     if (readBytes != cp.length)
    1574             :     {
    1575           0 :         if (readBytes < 0)
    1576           0 :             ereport(PANIC,
    1577             :                     (errcode_for_file_access(),
    1578             :                      errmsg("could not read file \"%s\": %m", path)));
    1579             :         else
    1580           0 :             ereport(PANIC,
    1581             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    1582             :                      errmsg("could not read file \"%s\": read %d of %zu",
    1583             :                             path, readBytes, (Size) cp.length)));
    1584             :     }
    1585             : 
    1586          30 :     if (CloseTransientFile(fd) != 0)
    1587           0 :         ereport(PANIC,
    1588             :                 (errcode_for_file_access(),
    1589             :                  errmsg("could not close file \"%s\": %m", path)));
    1590             : 
    1591             :     /* now verify the CRC */
    1592          30 :     INIT_CRC32C(checksum);
    1593          30 :     COMP_CRC32C(checksum,
    1594             :                 (char *) &cp + SnapBuildOnDiskNotChecksummedSize,
    1595             :                 SnapBuildOnDiskChecksummedSize);
    1596          30 :     FIN_CRC32C(checksum);
    1597             : 
    1598          30 :     if (!EQ_CRC32C(checksum, cp.checksum))
    1599           0 :         ereport(PANIC,
    1600             :                 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
    1601             :                         path, checksum, cp.checksum)));
    1602             : 
    1603             :     /*
    1604             :      * If we crashed with an ephemeral slot active, don't restore but delete
    1605             :      * it.
    1606             :      */
    1607          30 :     if (cp.slotdata.persistency != RS_PERSISTENT)
    1608             :     {
    1609           0 :         if (!rmtree(slotdir, true))
    1610             :         {
    1611           0 :             ereport(WARNING,
    1612             :                     (errmsg("could not remove directory \"%s\"",
    1613             :                             slotdir)));
    1614             :         }
    1615           0 :         fsync_fname("pg_replslot", true);
    1616           0 :         return;
    1617             :     }
    1618             : 
    1619             :     /*
    1620             :      * Verify that requirements for the specific slot type are met. That's
    1621             :      * important because if these aren't met we're not guaranteed to retain
    1622             :      * all the necessary resources for the slot.
    1623             :      *
    1624             :      * NB: We have to do so *after* the above checks for ephemeral slots,
    1625             :      * because otherwise a slot that shouldn't exist anymore could prevent
    1626             :      * restarts.
    1627             :      *
    1628             :      * NB: Changing the requirements here also requires adapting
    1629             :      * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
    1630             :      */
    1631          30 :     if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
    1632           0 :         ereport(FATAL,
    1633             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1634             :                  errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
    1635             :                         NameStr(cp.slotdata.name)),
    1636             :                  errhint("Change wal_level to be logical or higher.")));
    1637          30 :     else if (wal_level < WAL_LEVEL_REPLICA)
    1638           0 :         ereport(FATAL,
    1639             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1640             :                  errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
    1641             :                         NameStr(cp.slotdata.name)),
    1642             :                  errhint("Change wal_level to be replica or higher.")));
    1643             : 
    1644             :     /* nothing can be active yet, don't lock anything */
    1645          42 :     for (i = 0; i < max_replication_slots; i++)
    1646             :     {
    1647             :         ReplicationSlot *slot;
    1648             : 
    1649          42 :         slot = &ReplicationSlotCtl->replication_slots[i];
    1650             : 
    1651          42 :         if (slot->in_use)
    1652          12 :             continue;
    1653             : 
    1654             :         /* restore the entire set of persistent data */
    1655          30 :         memcpy(&slot->data, &cp.slotdata,
    1656             :                sizeof(ReplicationSlotPersistentData));
    1657             : 
    1658             :         /* initialize in memory state */
    1659          30 :         slot->effective_xmin = cp.slotdata.xmin;
    1660          30 :         slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
    1661             : 
    1662          30 :         slot->candidate_catalog_xmin = InvalidTransactionId;
    1663          30 :         slot->candidate_xmin_lsn = InvalidXLogRecPtr;
    1664          30 :         slot->candidate_restart_lsn = InvalidXLogRecPtr;
    1665          30 :         slot->candidate_restart_valid = InvalidXLogRecPtr;
    1666             : 
    1667          30 :         slot->in_use = true;
    1668          30 :         slot->active_pid = 0;
    1669             : 
    1670          30 :         restored = true;
    1671          30 :         break;
    1672             :     }
    1673             : 
    1674          30 :     if (!restored)
    1675           0 :         ereport(FATAL,
    1676             :                 (errmsg("too many replication slots active before shutdown"),
    1677             :                  errhint("Increase max_replication_slots and try again.")));
    1678             : }

Generated by: LCOV version 1.13