LCOV - code coverage report
Current view: top level - src/backend/replication - slot.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 870 1002 86.8 %
Date: 2026-02-11 07:17:17 Functions: 47 48 97.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * slot.c
       4             :  *     Replication slot management.
       5             :  *
       6             :  *
       7             :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/replication/slot.c
      12             :  *
      13             :  * NOTES
      14             :  *
      15             :  * Replication slots are used to keep state about replication streams
      16             :  * originating from this cluster.  Their primary purpose is to prevent the
      17             :  * premature removal of WAL or of old tuple versions in a manner that would
      18             :  * interfere with replication; they are also useful for monitoring purposes.
      19             :  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
      20             :  * on standbys (to support cascading setups).  The requirement that slots be
      21             :  * usable on standbys precludes storing them in the system catalogs.
      22             :  *
      23             :  * Each replication slot gets its own directory inside the directory
      24             :  * $PGDATA / PG_REPLSLOT_DIR.  Inside that directory the state file will
      25             :  * contain the slot's own data.  Additional data can be stored alongside that
      26             :  * file if required.  While the server is running, the state data is also
      27             :  * cached in memory for efficiency.
      28             :  *
      29             :  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
      30             :  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
      31             :  * to iterate over the slots, and in exclusive mode to change the in_use flag
      32             :  * of a slot.  The remaining data in each slot is protected by its mutex.
      33             :  *
      34             :  *-------------------------------------------------------------------------
      35             :  */
      36             : 
      37             : #include "postgres.h"
      38             : 
      39             : #include <unistd.h>
      40             : #include <sys/stat.h>
      41             : 
      42             : #include "access/transam.h"
      43             : #include "access/xlog_internal.h"
      44             : #include "access/xlogrecovery.h"
      45             : #include "common/file_utils.h"
      46             : #include "common/string.h"
      47             : #include "miscadmin.h"
      48             : #include "pgstat.h"
      49             : #include "postmaster/interrupt.h"
      50             : #include "replication/logicallauncher.h"
      51             : #include "replication/slotsync.h"
      52             : #include "replication/slot.h"
      53             : #include "replication/walsender_private.h"
      54             : #include "storage/fd.h"
      55             : #include "storage/ipc.h"
      56             : #include "storage/proc.h"
      57             : #include "storage/procarray.h"
      58             : #include "utils/builtins.h"
      59             : #include "utils/guc_hooks.h"
      60             : #include "utils/injection_point.h"
      61             : #include "utils/varlena.h"
      62             : 
      63             : /*
      64             :  * Replication slot on-disk data structure.
      65             :  */
      66             : typedef struct ReplicationSlotOnDisk
      67             : {
      68             :     /* first part of this struct needs to be version independent */
      69             : 
      70             :     /* data not covered by checksum */
      71             :     uint32      magic;
      72             :     pg_crc32c   checksum;
      73             : 
      74             :     /* data covered by checksum */
      75             :     uint32      version;
      76             :     uint32      length;
      77             : 
      78             :     /*
      79             :      * The actual data in the slot that follows can differ based on the above
      80             :      * 'version'.
      81             :      */
      82             : 
      83             :     ReplicationSlotPersistentData slotdata;
      84             : } ReplicationSlotOnDisk;
      85             : 
      86             : /*
      87             :  * Struct for the configuration of synchronized_standby_slots.
      88             :  *
      89             :  * Note: this must be a flat representation that can be held in a single chunk
      90             :  * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
      91             :  * synchronized_standby_slots GUC.
      92             :  */
      93             : typedef struct
      94             : {
      95             :     /* Number of slot names in the slot_names[] */
      96             :     int         nslotnames;
      97             : 
      98             :     /*
      99             :      * slot_names contains 'nslotnames' consecutive null-terminated C strings.
     100             :      */
     101             :     char        slot_names[FLEXIBLE_ARRAY_MEMBER];
     102             : } SyncStandbySlotsConfigData;
     103             : 
     104             : /*
     105             :  * Lookup table for slot invalidation causes.
     106             :  */
     107             : typedef struct SlotInvalidationCauseMap
     108             : {
     109             :     ReplicationSlotInvalidationCause cause;
     110             :     const char *cause_name;
     111             : } SlotInvalidationCauseMap;
     112             : 
     113             : static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
     114             :     {RS_INVAL_NONE, "none"},
     115             :     {RS_INVAL_WAL_REMOVED, "wal_removed"},
     116             :     {RS_INVAL_HORIZON, "rows_removed"},
     117             :     {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
     118             :     {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
     119             : };
     120             : 
     121             : /*
     122             :  * Ensure that the lookup table is up-to-date with the enums defined in
     123             :  * ReplicationSlotInvalidationCause.
     124             :  */
     125             : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
     126             :                  "array length mismatch");
     127             : 
     128             : /* size of version independent data */
     129             : #define ReplicationSlotOnDiskConstantSize \
     130             :     offsetof(ReplicationSlotOnDisk, slotdata)
     131             : /* size of the part of the slot not covered by the checksum */
     132             : #define ReplicationSlotOnDiskNotChecksummedSize  \
     133             :     offsetof(ReplicationSlotOnDisk, version)
     134             : /* size of the part covered by the checksum */
     135             : #define ReplicationSlotOnDiskChecksummedSize \
     136             :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
     137             : /* size of the slot data that is version dependent */
     138             : #define ReplicationSlotOnDiskV2Size \
     139             :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
     140             : 
     141             : #define SLOT_MAGIC      0x1051CA1   /* format identifier */
     142             : #define SLOT_VERSION    5       /* version for new files */
     143             : 
     144             : /* Control array for replication slot management */
     145             : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
     146             : 
     147             : /* My backend's replication slot in the shared memory array */
     148             : ReplicationSlot *MyReplicationSlot = NULL;
     149             : 
     150             : /* GUC variables */
     151             : int         max_replication_slots = 10; /* the maximum number of replication
     152             :                                          * slots */
     153             : 
     154             : /*
     155             :  * Invalidate replication slots that have remained idle longer than this
     156             :  * duration; '0' disables it.
     157             :  */
     158             : int         idle_replication_slot_timeout_secs = 0;
     159             : 
     160             : /*
     161             :  * This GUC lists streaming replication standby server slot names that
     162             :  * logical WAL sender processes will wait for.
     163             :  */
     164             : char       *synchronized_standby_slots;
     165             : 
     166             : /* This is the parsed and cached configuration for synchronized_standby_slots */
     167             : static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
     168             : 
     169             : /*
     170             :  * Oldest LSN that has been confirmed to be flushed to the standbys
     171             :  * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
     172             :  */
     173             : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
     174             : 
     175             : static void ReplicationSlotShmemExit(int code, Datum arg);
     176             : static bool IsSlotForConflictCheck(const char *name);
     177             : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
     178             : 
     179             : /* internal persistency functions */
     180             : static void RestoreSlotFromDisk(const char *name);
     181             : static void CreateSlotOnDisk(ReplicationSlot *slot);
     182             : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
     183             : 
     184             : /*
     185             :  * Report shared-memory space needed by ReplicationSlotsShmemInit.
     186             :  */
     187             : Size
     188        8810 : ReplicationSlotsShmemSize(void)
     189             : {
     190        8810 :     Size        size = 0;
     191             : 
     192        8810 :     if (max_replication_slots == 0)
     193           4 :         return size;
     194             : 
     195        8806 :     size = offsetof(ReplicationSlotCtlData, replication_slots);
     196        8806 :     size = add_size(size,
     197             :                     mul_size(max_replication_slots, sizeof(ReplicationSlot)));
     198             : 
     199        8806 :     return size;
     200             : }
     201             : 
     202             : /*
     203             :  * Allocate and initialize shared memory for replication slots.
     204             :  */
     205             : void
     206        2280 : ReplicationSlotsShmemInit(void)
     207             : {
     208             :     bool        found;
     209             : 
     210        2280 :     if (max_replication_slots == 0)
     211           2 :         return;
     212             : 
     213        2278 :     ReplicationSlotCtl = (ReplicationSlotCtlData *)
     214        2278 :         ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
     215             :                         &found);
     216             : 
     217        2278 :     if (!found)
     218             :     {
     219             :         int         i;
     220             : 
     221             :         /* First time through, so initialize */
     222        4498 :         MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
     223             : 
     224       24674 :         for (i = 0; i < max_replication_slots; i++)
     225             :         {
     226       22396 :             ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
     227             : 
     228             :             /* everything else is zeroed by the memset above */
     229       22396 :             slot->active_proc = INVALID_PROC_NUMBER;
     230       22396 :             SpinLockInit(&slot->mutex);
     231       22396 :             LWLockInitialize(&slot->io_in_progress_lock,
     232             :                              LWTRANCHE_REPLICATION_SLOT_IO);
     233       22396 :             ConditionVariableInit(&slot->active_cv);
     234             :         }
     235             :     }
     236             : }
     237             : 
     238             : /*
     239             :  * Register the callback for replication slot cleanup and releasing.
     240             :  */
     241             : void
     242       44972 : ReplicationSlotInitialize(void)
     243             : {
     244       44972 :     before_shmem_exit(ReplicationSlotShmemExit, 0);
     245       44972 : }
     246             : 
     247             : /*
     248             :  * Release and cleanup replication slots.
     249             :  */
     250             : static void
     251       44972 : ReplicationSlotShmemExit(int code, Datum arg)
     252             : {
     253             :     /* Make sure active replication slots are released */
     254       44972 :     if (MyReplicationSlot != NULL)
     255         542 :         ReplicationSlotRelease();
     256             : 
     257             :     /* Also cleanup all the temporary slots. */
     258       44972 :     ReplicationSlotCleanup(false);
     259       44972 : }
     260             : 
     261             : /*
     262             :  * Check whether the passed slot name is valid and report errors at elevel.
     263             :  *
     264             :  * See comments for ReplicationSlotValidateNameInternal().
     265             :  */
     266             : bool
     267        1642 : ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
     268             :                             int elevel)
     269             : {
     270             :     int         err_code;
     271        1642 :     char       *err_msg = NULL;
     272        1642 :     char       *err_hint = NULL;
     273             : 
     274        1642 :     if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
     275             :                                              &err_code, &err_msg, &err_hint))
     276             :     {
     277             :         /*
     278             :          * Use errmsg_internal() and errhint_internal() instead of errmsg()
     279             :          * and errhint(), since the messages from
     280             :          * ReplicationSlotValidateNameInternal() are already translated. This
     281             :          * avoids double translation.
     282             :          */
     283           8 :         ereport(elevel,
     284             :                 errcode(err_code),
     285             :                 errmsg_internal("%s", err_msg),
     286             :                 (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
     287             : 
     288           0 :         pfree(err_msg);
     289           0 :         if (err_hint != NULL)
     290           0 :             pfree(err_hint);
     291           0 :         return false;
     292             :     }
     293             : 
     294        1634 :     return true;
     295             : }
     296             : 
     297             : /*
     298             :  * Check whether the passed slot name is valid.
     299             :  *
     300             :  * An error will be reported for a reserved replication slot name if
     301             :  * allow_reserved_name is set to false.
     302             :  *
     303             :  * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
     304             :  * the name to be used as a directory name on every supported OS.
     305             :  *
     306             :  * Returns true if the slot name is valid. Otherwise, returns false and stores
     307             :  * the error code, error message, and optional hint in err_code, err_msg, and
     308             :  * err_hint, respectively. The caller is responsible for freeing err_msg and
     309             :  * err_hint, which are palloc'd.
     310             :  */
     311             : bool
     312        2080 : ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
     313             :                                     int *err_code, char **err_msg, char **err_hint)
     314             : {
     315             :     const char *cp;
     316             : 
     317        2080 :     if (strlen(name) == 0)
     318             :     {
     319           6 :         *err_code = ERRCODE_INVALID_NAME;
     320           6 :         *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
     321           6 :         *err_hint = NULL;
     322           6 :         return false;
     323             :     }
     324             : 
     325        2074 :     if (strlen(name) >= NAMEDATALEN)
     326             :     {
     327           0 :         *err_code = ERRCODE_NAME_TOO_LONG;
     328           0 :         *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
     329           0 :         *err_hint = NULL;
     330           0 :         return false;
     331             :     }
     332             : 
     333       39842 :     for (cp = name; *cp; cp++)
     334             :     {
     335       37772 :         if (!((*cp >= 'a' && *cp <= 'z')
     336       17674 :               || (*cp >= '0' && *cp <= '9')
     337        3668 :               || (*cp == '_')))
     338             :         {
     339           4 :             *err_code = ERRCODE_INVALID_NAME;
     340           4 :             *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
     341           4 :             *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
     342           4 :             return false;
     343             :         }
     344             :     }
     345             : 
     346        2070 :     if (!allow_reserved_name && IsSlotForConflictCheck(name))
     347             :     {
     348           2 :         *err_code = ERRCODE_RESERVED_NAME;
     349           2 :         *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
     350           2 :         *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
     351             :                              CONFLICT_DETECTION_SLOT);
     352           2 :         return false;
     353             :     }
     354             : 
     355        2068 :     return true;
     356             : }
     357             : 
     358             : /*
     359             :  * Return true if the replication slot name is "pg_conflict_detection".
     360             :  */
     361             : static bool
     362        4450 : IsSlotForConflictCheck(const char *name)
     363             : {
     364        4450 :     return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
     365             : }
     366             : 
     367             : /*
     368             :  * Create a new replication slot and mark it as used by this backend.
     369             :  *
     370             :  * name: Name of the slot
     371             :  * db_specific: logical decoding is db specific; if the slot is going to
     372             :  *     be used for that pass true, otherwise false.
     373             :  * two_phase: If enabled, allows decoding of prepared transactions.
     374             :  * failover: If enabled, allows the slot to be synced to standbys so
     375             :  *     that logical replication can be resumed after failover.
     376             :  * synced: True if the slot is synchronized from the primary server.
     377             :  */
     378             : void
     379        1368 : ReplicationSlotCreate(const char *name, bool db_specific,
     380             :                       ReplicationSlotPersistency persistency,
     381             :                       bool two_phase, bool failover, bool synced)
     382             : {
     383        1368 :     ReplicationSlot *slot = NULL;
     384             :     int         i;
     385             : 
     386             :     Assert(MyReplicationSlot == NULL);
     387             : 
     388             :     /*
     389             :      * The logical launcher or pg_upgrade may create or migrate an internal
     390             :      * slot, so using a reserved name is allowed in these cases.
     391             :      */
     392        1368 :     ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
     393             :                                 ERROR);
     394             : 
     395        1366 :     if (failover)
     396             :     {
     397             :         /*
     398             :          * Do not allow users to create the failover enabled slots on the
     399             :          * standby as we do not support sync to the cascading standby.
     400             :          *
     401             :          * However, failover enabled slots can be created during slot
     402             :          * synchronization because we need to retain the same values as the
     403             :          * remote slot.
     404             :          */
     405          56 :         if (RecoveryInProgress() && !IsSyncingReplicationSlots())
     406           0 :             ereport(ERROR,
     407             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     408             :                     errmsg("cannot enable failover for a replication slot created on the standby"));
     409             : 
     410             :         /*
     411             :          * Do not allow users to create failover enabled temporary slots,
     412             :          * because temporary slots will not be synced to the standby.
     413             :          *
     414             :          * However, failover enabled temporary slots can be created during
     415             :          * slot synchronization. See the comments atop slotsync.c for details.
     416             :          */
     417          56 :         if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
     418           2 :             ereport(ERROR,
     419             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     420             :                     errmsg("cannot enable failover for a temporary replication slot"));
     421             :     }
     422             : 
     423             :     /*
     424             :      * If some other backend ran this code concurrently with us, we'd likely
     425             :      * both allocate the same slot, and that would be bad.  We'd also be at
     426             :      * risk of missing a name collision.  Also, we don't want to try to create
     427             :      * a new slot while somebody's busy cleaning up an old one, because we
     428             :      * might both be monkeying with the same directory.
     429             :      */
     430        1364 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     431             : 
     432             :     /*
     433             :      * Check for name collision, and identify an allocatable slot.  We need to
     434             :      * hold ReplicationSlotControlLock in shared mode for this, so that nobody
     435             :      * else can change the in_use flags while we're looking at them.
     436             :      */
     437        1364 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     438       13188 :     for (i = 0; i < max_replication_slots; i++)
     439             :     {
     440       11830 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     441             : 
     442       11830 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     443           6 :             ereport(ERROR,
     444             :                     (errcode(ERRCODE_DUPLICATE_OBJECT),
     445             :                      errmsg("replication slot \"%s\" already exists", name)));
     446       11824 :         if (!s->in_use && slot == NULL)
     447        1356 :             slot = s;
     448             :     }
     449        1358 :     LWLockRelease(ReplicationSlotControlLock);
     450             : 
     451             :     /* If all slots are in use, we're out of luck. */
     452        1358 :     if (slot == NULL)
     453           2 :         ereport(ERROR,
     454             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     455             :                  errmsg("all replication slots are in use"),
     456             :                  errhint("Free one or increase \"max_replication_slots\".")));
     457             : 
     458             :     /*
     459             :      * Since this slot is not in use, nobody should be looking at any part of
     460             :      * it other than the in_use field unless they're trying to allocate it.
     461             :      * And since we hold ReplicationSlotAllocationLock, nobody except us can
     462             :      * be doing that.  So it's safe to initialize the slot.
     463             :      */
     464             :     Assert(!slot->in_use);
     465             :     Assert(slot->active_proc == INVALID_PROC_NUMBER);
     466             : 
     467             :     /* first initialize persistent data */
     468        1356 :     memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
     469        1356 :     namestrcpy(&slot->data.name, name);
     470        1356 :     slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
     471        1356 :     slot->data.persistency = persistency;
     472        1356 :     slot->data.two_phase = two_phase;
     473        1356 :     slot->data.two_phase_at = InvalidXLogRecPtr;
     474        1356 :     slot->data.failover = failover;
     475        1356 :     slot->data.synced = synced;
     476             : 
     477             :     /* and then data only present in shared memory */
     478        1356 :     slot->just_dirtied = false;
     479        1356 :     slot->dirty = false;
     480        1356 :     slot->effective_xmin = InvalidTransactionId;
     481        1356 :     slot->effective_catalog_xmin = InvalidTransactionId;
     482        1356 :     slot->candidate_catalog_xmin = InvalidTransactionId;
     483        1356 :     slot->candidate_xmin_lsn = InvalidXLogRecPtr;
     484        1356 :     slot->candidate_restart_valid = InvalidXLogRecPtr;
     485        1356 :     slot->candidate_restart_lsn = InvalidXLogRecPtr;
     486        1356 :     slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
     487        1356 :     slot->last_saved_restart_lsn = InvalidXLogRecPtr;
     488        1356 :     slot->inactive_since = 0;
     489        1356 :     slot->slotsync_skip_reason = SS_SKIP_NONE;
     490             : 
     491             :     /*
     492             :      * Create the slot on disk.  We haven't actually marked the slot allocated
     493             :      * yet, so no special cleanup is required if this errors out.
     494             :      */
     495        1356 :     CreateSlotOnDisk(slot);
     496             : 
     497             :     /*
     498             :      * We need to briefly prevent any other backend from iterating over the
     499             :      * slots while we flip the in_use flag. We also need to set the active
     500             :      * flag while holding the ControlLock as otherwise a concurrent
     501             :      * ReplicationSlotAcquire() could acquire the slot as well.
     502             :      */
     503        1356 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     504             : 
     505        1356 :     slot->in_use = true;
     506             : 
     507             :     /* We can now mark the slot active, and that makes it our slot. */
     508        1356 :     SpinLockAcquire(&slot->mutex);
     509             :     Assert(slot->active_proc == INVALID_PROC_NUMBER);
     510        1356 :     slot->active_proc = MyProcNumber;
     511        1356 :     SpinLockRelease(&slot->mutex);
     512        1356 :     MyReplicationSlot = slot;
     513             : 
     514        1356 :     LWLockRelease(ReplicationSlotControlLock);
     515             : 
     516             :     /*
     517             :      * Create statistics entry for the new logical slot. We don't collect any
     518             :      * stats for physical slots, so no need to create an entry for the same.
     519             :      * See ReplicationSlotDropPtr for why we need to do this before releasing
     520             :      * ReplicationSlotAllocationLock.
     521             :      */
     522        1356 :     if (SlotIsLogical(slot))
     523         970 :         pgstat_create_replslot(slot);
     524             : 
     525             :     /*
     526             :      * Now that the slot has been marked as in_use and active, it's safe to
     527             :      * let somebody else try to allocate a slot.
     528             :      */
     529        1356 :     LWLockRelease(ReplicationSlotAllocationLock);
     530             : 
     531             :     /* Let everybody know we've modified this slot */
     532        1356 :     ConditionVariableBroadcast(&slot->active_cv);
     533        1356 : }
     534             : 
     535             : /*
     536             :  * Search for the named replication slot.
     537             :  *
     538             :  * Return the replication slot if found, otherwise NULL.
     539             :  */
     540             : ReplicationSlot *
     541        3956 : SearchNamedReplicationSlot(const char *name, bool need_lock)
     542             : {
     543             :     int         i;
     544        3956 :     ReplicationSlot *slot = NULL;
     545             : 
     546        3956 :     if (need_lock)
     547        1152 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     548             : 
     549       15110 :     for (i = 0; i < max_replication_slots; i++)
     550             :     {
     551       14164 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     552             : 
     553       14164 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     554             :         {
     555        3010 :             slot = s;
     556        3010 :             break;
     557             :         }
     558             :     }
     559             : 
     560        3956 :     if (need_lock)
     561        1152 :         LWLockRelease(ReplicationSlotControlLock);
     562             : 
     563        3956 :     return slot;
     564             : }
     565             : 
     566             : /*
     567             :  * Return the index of the replication slot in
     568             :  * ReplicationSlotCtl->replication_slots.
     569             :  *
     570             :  * This is mainly useful to have an efficient key for storing replication slot
     571             :  * stats.
     572             :  */
     573             : int
     574       15340 : ReplicationSlotIndex(ReplicationSlot *slot)
     575             : {
     576             :     Assert(slot >= ReplicationSlotCtl->replication_slots &&
     577             :            slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
     578             : 
     579       15340 :     return slot - ReplicationSlotCtl->replication_slots;
     580             : }
     581             : 
     582             : /*
     583             :  * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
     584             :  * the slot's name and true is returned.
     585             :  *
     586             :  * This likely is only useful for pgstat_replslot.c during shutdown, in other
     587             :  * cases there are obvious TOCTOU issues.
     588             :  */
     589             : bool
     590         212 : ReplicationSlotName(int index, Name name)
     591             : {
     592             :     ReplicationSlot *slot;
     593             :     bool        found;
     594             : 
     595         212 :     slot = &ReplicationSlotCtl->replication_slots[index];
     596             : 
     597             :     /*
     598             :      * Ensure that the slot cannot be dropped while we copy the name. Don't
     599             :      * need the spinlock as the name of an existing slot cannot change.
     600             :      */
     601         212 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     602         212 :     found = slot->in_use;
     603         212 :     if (slot->in_use)
     604         212 :         namestrcpy(name, NameStr(slot->data.name));
     605         212 :     LWLockRelease(ReplicationSlotControlLock);
     606             : 
     607         212 :     return found;
     608             : }
     609             : 
     610             : /*
     611             :  * Find a previously created slot and mark it as used by this process.
     612             :  *
     613             :  * An error is raised if nowait is true and the slot is currently in use. If
     614             :  * nowait is false, we sleep until the slot is released by the owning process.
     615             :  *
     616             :  * An error is raised if error_if_invalid is true and the slot is found to
     617             :  * be invalid. It should always be set to true, except when we are temporarily
     618             :  * acquiring the slot and don't intend to change it.
     619             :  */
     620             : void
     621        2650 : ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
     622             : {
     623             :     ReplicationSlot *s;
     624             :     ProcNumber  active_proc;
     625             :     int         active_pid;
     626             : 
     627             :     Assert(name != NULL);
     628             : 
     629        2650 : retry:
     630             :     Assert(MyReplicationSlot == NULL);
     631             : 
     632        2650 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     633             : 
     634             :     /* Check if the slot exists with the given name. */
     635        2650 :     s = SearchNamedReplicationSlot(name, false);
     636        2650 :     if (s == NULL || !s->in_use)
     637             :     {
     638          20 :         LWLockRelease(ReplicationSlotControlLock);
     639             : 
     640          20 :         ereport(ERROR,
     641             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     642             :                  errmsg("replication slot \"%s\" does not exist",
     643             :                         name)));
     644             :     }
     645             : 
     646             :     /*
     647             :      * Do not allow users to acquire the reserved slot. This scenario may
     648             :      * occur if the launcher that owns the slot has terminated unexpectedly
     649             :      * due to an error, and a backend process attempts to reuse the slot.
     650             :      */
     651        2630 :     if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
     652           0 :         ereport(ERROR,
     653             :                 errcode(ERRCODE_UNDEFINED_OBJECT),
     654             :                 errmsg("cannot acquire replication slot \"%s\"", name),
     655             :                 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
     656             : 
     657             :     /*
     658             :      * This is the slot we want; check if it's active under some other
     659             :      * process.  In single user mode, we don't need this check.
     660             :      */
     661        2630 :     if (IsUnderPostmaster)
     662             :     {
     663             :         /*
     664             :          * Get ready to sleep on the slot in case it is active.  (We may end
     665             :          * up not sleeping, but we don't want to do this while holding the
     666             :          * spinlock.)
     667             :          */
     668        2620 :         if (!nowait)
     669         548 :             ConditionVariablePrepareToSleep(&s->active_cv);
     670             : 
     671             :         /*
     672             :          * It is important to reset the inactive_since under spinlock here to
     673             :          * avoid race conditions with slot invalidation. See comments related
     674             :          * to inactive_since in InvalidatePossiblyObsoleteSlot.
     675             :          */
     676        2620 :         SpinLockAcquire(&s->mutex);
     677        2620 :         if (s->active_proc == INVALID_PROC_NUMBER)
     678        2322 :             s->active_proc = MyProcNumber;
     679        2620 :         active_proc = s->active_proc;
     680        2620 :         ReplicationSlotSetInactiveSince(s, 0, false);
     681        2620 :         SpinLockRelease(&s->mutex);
     682             :     }
     683             :     else
     684             :     {
     685          10 :         s->active_proc = active_proc = MyProcNumber;
     686          10 :         ReplicationSlotSetInactiveSince(s, 0, true);
     687             :     }
     688        2630 :     active_pid = GetPGProcByNumber(active_proc)->pid;
     689        2630 :     LWLockRelease(ReplicationSlotControlLock);
     690             : 
     691             :     /*
     692             :      * If we found the slot but it's already active in another process, we
     693             :      * wait until the owning process signals us that it's been released, or
     694             :      * error out.
     695             :      */
     696        2630 :     if (active_proc != MyProcNumber)
     697             :     {
     698           0 :         if (!nowait)
     699             :         {
     700             :             /* Wait here until we get signaled, and then restart */
     701           0 :             ConditionVariableSleep(&s->active_cv,
     702             :                                    WAIT_EVENT_REPLICATION_SLOT_DROP);
     703           0 :             ConditionVariableCancelSleep();
     704           0 :             goto retry;
     705             :         }
     706             : 
     707           0 :         ereport(ERROR,
     708             :                 (errcode(ERRCODE_OBJECT_IN_USE),
     709             :                  errmsg("replication slot \"%s\" is active for PID %d",
     710             :                         NameStr(s->data.name), active_pid)));
     711             :     }
     712        2630 :     else if (!nowait)
     713         548 :         ConditionVariableCancelSleep(); /* no sleep needed after all */
     714             : 
     715             :     /* We made this slot active, so it's ours now. */
     716        2630 :     MyReplicationSlot = s;
     717             : 
     718             :     /*
     719             :      * We need to check for invalidation after making the slot ours to avoid
     720             :      * the possible race condition with the checkpointer that can otherwise
     721             :      * invalidate the slot immediately after the check.
     722             :      */
     723        2630 :     if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
     724          14 :         ereport(ERROR,
     725             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     726             :                 errmsg("can no longer access replication slot \"%s\"",
     727             :                        NameStr(s->data.name)),
     728             :                 errdetail("This replication slot has been invalidated due to \"%s\".",
     729             :                           GetSlotInvalidationCauseName(s->data.invalidated)));
     730             : 
     731             :     /* Let everybody know we've modified this slot */
     732        2616 :     ConditionVariableBroadcast(&s->active_cv);
     733             : 
     734             :     /*
     735             :      * The call to pgstat_acquire_replslot() protects against stats for a
     736             :      * different slot, from before a restart or such, being present during
     737             :      * pgstat_report_replslot().
     738             :      */
     739        2616 :     if (SlotIsLogical(s))
     740        2182 :         pgstat_acquire_replslot(s);
     741             : 
     742             : 
     743        2616 :     if (am_walsender)
     744             :     {
     745        1774 :         ereport(log_replication_commands ? LOG : DEBUG1,
     746             :                 SlotIsLogical(s)
     747             :                 ? errmsg("acquired logical replication slot \"%s\"",
     748             :                          NameStr(s->data.name))
     749             :                 : errmsg("acquired physical replication slot \"%s\"",
     750             :                          NameStr(s->data.name)));
     751             :     }
     752        2616 : }
     753             : 
     754             : /*
     755             :  * Release the replication slot that this backend considers to own.
     756             :  *
     757             :  * This or another backend can re-acquire the slot later.
     758             :  * Resources this slot requires will be preserved.
     759             :  */
     760             : void
     761        3186 : ReplicationSlotRelease(void)
     762             : {
     763        3186 :     ReplicationSlot *slot = MyReplicationSlot;
     764        3186 :     char       *slotname = NULL;    /* keep compiler quiet */
     765             :     bool        is_logical;
     766        3186 :     TimestampTz now = 0;
     767             : 
     768             :     Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
     769             : 
     770        3186 :     is_logical = SlotIsLogical(slot);
     771             : 
     772        3186 :     if (am_walsender)
     773        2214 :         slotname = pstrdup(NameStr(slot->data.name));
     774             : 
     775        3186 :     if (slot->data.persistency == RS_EPHEMERAL)
     776             :     {
     777             :         /*
     778             :          * Delete the slot. There is no !PANIC case where this is allowed to
     779             :          * fail, all that may happen is an incomplete cleanup of the on-disk
     780             :          * data.
     781             :          */
     782          12 :         ReplicationSlotDropAcquired();
     783             : 
     784             :         /*
     785             :          * Request to disable logical decoding, even though this slot may not
     786             :          * have been the last logical slot. The checkpointer will verify if
     787             :          * logical decoding should actually be disabled.
     788             :          */
     789          12 :         if (is_logical)
     790          12 :             RequestDisableLogicalDecoding();
     791             :     }
     792             : 
     793             :     /*
     794             :      * If slot needed to temporarily restrain both data and catalog xmin to
     795             :      * create the catalog snapshot, remove that temporary constraint.
     796             :      * Snapshots can only be exported while the initial snapshot is still
     797             :      * acquired.
     798             :      */
     799        3186 :     if (!TransactionIdIsValid(slot->data.xmin) &&
     800        3124 :         TransactionIdIsValid(slot->effective_xmin))
     801             :     {
     802         398 :         SpinLockAcquire(&slot->mutex);
     803         398 :         slot->effective_xmin = InvalidTransactionId;
     804         398 :         SpinLockRelease(&slot->mutex);
     805         398 :         ReplicationSlotsComputeRequiredXmin(false);
     806             :     }
     807             : 
     808             :     /*
     809             :      * Set the time since the slot has become inactive. We get the current
     810             :      * time beforehand to avoid system call while holding the spinlock.
     811             :      */
     812        3186 :     now = GetCurrentTimestamp();
     813             : 
     814        3186 :     if (slot->data.persistency == RS_PERSISTENT)
     815             :     {
     816             :         /*
     817             :          * Mark persistent slot inactive.  We're not freeing it, just
     818             :          * disconnecting, but wake up others that may be waiting for it.
     819             :          */
     820        2574 :         SpinLockAcquire(&slot->mutex);
     821        2574 :         slot->active_proc = INVALID_PROC_NUMBER;
     822        2574 :         ReplicationSlotSetInactiveSince(slot, now, false);
     823        2574 :         SpinLockRelease(&slot->mutex);
     824        2574 :         ConditionVariableBroadcast(&slot->active_cv);
     825             :     }
     826             :     else
     827         612 :         ReplicationSlotSetInactiveSince(slot, now, true);
     828             : 
     829        3186 :     MyReplicationSlot = NULL;
     830             : 
     831             :     /* might not have been set when we've been a plain slot */
     832        3186 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     833        3186 :     MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
     834        3186 :     ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
     835        3186 :     LWLockRelease(ProcArrayLock);
     836             : 
     837        3186 :     if (am_walsender)
     838             :     {
     839        2214 :         ereport(log_replication_commands ? LOG : DEBUG1,
     840             :                 is_logical
     841             :                 ? errmsg("released logical replication slot \"%s\"",
     842             :                          slotname)
     843             :                 : errmsg("released physical replication slot \"%s\"",
     844             :                          slotname));
     845             : 
     846        2214 :         pfree(slotname);
     847             :     }
     848        3186 : }
     849             : 
     850             : /*
     851             :  * Cleanup temporary slots created in current session.
     852             :  *
     853             :  * Cleanup only synced temporary slots if 'synced_only' is true, else
     854             :  * cleanup all temporary slots.
     855             :  *
     856             :  * If it drops the last logical slot in the cluster, requests to disable
     857             :  * logical decoding.
     858             :  */
     859             : void
     860       90482 : ReplicationSlotCleanup(bool synced_only)
     861             : {
     862             :     int         i;
     863             :     bool        found_valid_logicalslot;
     864       90482 :     bool        dropped_logical = false;
     865             : 
     866             :     Assert(MyReplicationSlot == NULL);
     867             : 
     868       90784 : restart:
     869       90784 :     found_valid_logicalslot = false;
     870       90784 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     871      983710 :     for (i = 0; i < max_replication_slots; i++)
     872             :     {
     873      893228 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     874             : 
     875      893228 :         if (!s->in_use)
     876      863528 :             continue;
     877             : 
     878       29700 :         SpinLockAcquire(&s->mutex);
     879             : 
     880       59400 :         found_valid_logicalslot |=
     881       29700 :             (SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);
     882             : 
     883       29700 :         if ((s->active_proc == MyProcNumber &&
     884         302 :              (!synced_only || s->data.synced)))
     885             :         {
     886             :             Assert(s->data.persistency == RS_TEMPORARY);
     887         302 :             SpinLockRelease(&s->mutex);
     888         302 :             LWLockRelease(ReplicationSlotControlLock);  /* avoid deadlock */
     889             : 
     890         302 :             if (SlotIsLogical(s))
     891          20 :                 dropped_logical = true;
     892             : 
     893         302 :             ReplicationSlotDropPtr(s);
     894             : 
     895         302 :             ConditionVariableBroadcast(&s->active_cv);
     896         302 :             goto restart;
     897             :         }
     898             :         else
     899       29398 :             SpinLockRelease(&s->mutex);
     900             :     }
     901             : 
     902       90482 :     LWLockRelease(ReplicationSlotControlLock);
     903             : 
     904       90482 :     if (dropped_logical && !found_valid_logicalslot)
     905           6 :         RequestDisableLogicalDecoding();
     906       90482 : }
     907             : 
     908             : /*
     909             :  * Permanently drop replication slot identified by the passed in name.
     910             :  */
     911             : void
     912         844 : ReplicationSlotDrop(const char *name, bool nowait)
     913             : {
     914             :     bool        is_logical;
     915             : 
     916             :     Assert(MyReplicationSlot == NULL);
     917             : 
     918         844 :     ReplicationSlotAcquire(name, nowait, false);
     919             : 
     920             :     /*
     921             :      * Do not allow users to drop the slots which are currently being synced
     922             :      * from the primary to the standby.
     923             :      */
     924         828 :     if (RecoveryInProgress() && MyReplicationSlot->data.synced)
     925           2 :         ereport(ERROR,
     926             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     927             :                 errmsg("cannot drop replication slot \"%s\"", name),
     928             :                 errdetail("This replication slot is being synchronized from the primary server."));
     929             : 
     930         826 :     is_logical = SlotIsLogical(MyReplicationSlot);
     931             : 
     932         826 :     ReplicationSlotDropAcquired();
     933             : 
     934         826 :     if (is_logical)
     935         786 :         RequestDisableLogicalDecoding();
     936         826 : }
     937             : 
     938             : /*
     939             :  * Change the definition of the slot identified by the specified name.
     940             :  *
     941             :  * Altering the two_phase property of a slot requires caution on the
     942             :  * client-side. Enabling it at any random point during decoding has the
     943             :  * risk that transactions prepared before this change may be skipped by
     944             :  * the decoder, leading to missing prepare records on the client. So, we
     945             :  * enable it for subscription related slots only once the initial tablesync
     946             :  * is finished. See comments atop worker.c. Disabling it is safe only when
     947             :  * there are no pending prepared transaction, otherwise, the changes of
     948             :  * already prepared transactions can be replicated again along with their
     949             :  * corresponding commit leading to duplicate data or errors.
     950             :  */
     951             : void
     952          14 : ReplicationSlotAlter(const char *name, const bool *failover,
     953             :                      const bool *two_phase)
     954             : {
     955          14 :     bool        update_slot = false;
     956             : 
     957             :     Assert(MyReplicationSlot == NULL);
     958             :     Assert(failover || two_phase);
     959             : 
     960          14 :     ReplicationSlotAcquire(name, false, true);
     961             : 
     962          12 :     if (SlotIsPhysical(MyReplicationSlot))
     963           0 :         ereport(ERROR,
     964             :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     965             :                 errmsg("cannot use %s with a physical replication slot",
     966             :                        "ALTER_REPLICATION_SLOT"));
     967             : 
     968          12 :     if (RecoveryInProgress())
     969             :     {
     970             :         /*
     971             :          * Do not allow users to alter the slots which are currently being
     972             :          * synced from the primary to the standby.
     973             :          */
     974           2 :         if (MyReplicationSlot->data.synced)
     975           2 :             ereport(ERROR,
     976             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     977             :                     errmsg("cannot alter replication slot \"%s\"", name),
     978             :                     errdetail("This replication slot is being synchronized from the primary server."));
     979             : 
     980             :         /*
     981             :          * Do not allow users to enable failover on the standby as we do not
     982             :          * support sync to the cascading standby.
     983             :          */
     984           0 :         if (failover && *failover)
     985           0 :             ereport(ERROR,
     986             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     987             :                     errmsg("cannot enable failover for a replication slot"
     988             :                            " on the standby"));
     989             :     }
     990             : 
     991          10 :     if (failover)
     992             :     {
     993             :         /*
     994             :          * Do not allow users to enable failover for temporary slots as we do
     995             :          * not support syncing temporary slots to the standby.
     996             :          */
     997           8 :         if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
     998           0 :             ereport(ERROR,
     999             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1000             :                     errmsg("cannot enable failover for a temporary replication slot"));
    1001             : 
    1002           8 :         if (MyReplicationSlot->data.failover != *failover)
    1003             :         {
    1004           8 :             SpinLockAcquire(&MyReplicationSlot->mutex);
    1005           8 :             MyReplicationSlot->data.failover = *failover;
    1006           8 :             SpinLockRelease(&MyReplicationSlot->mutex);
    1007             : 
    1008           8 :             update_slot = true;
    1009             :         }
    1010             :     }
    1011             : 
    1012          10 :     if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
    1013             :     {
    1014           2 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1015           2 :         MyReplicationSlot->data.two_phase = *two_phase;
    1016           2 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1017             : 
    1018           2 :         update_slot = true;
    1019             :     }
    1020             : 
    1021          10 :     if (update_slot)
    1022             :     {
    1023          10 :         ReplicationSlotMarkDirty();
    1024          10 :         ReplicationSlotSave();
    1025             :     }
    1026             : 
    1027          10 :     ReplicationSlotRelease();
    1028          10 : }
    1029             : 
    1030             : /*
    1031             :  * Permanently drop the currently acquired replication slot.
    1032             :  */
    1033             : void
    1034         854 : ReplicationSlotDropAcquired(void)
    1035             : {
    1036         854 :     ReplicationSlot *slot = MyReplicationSlot;
    1037             : 
    1038             :     Assert(MyReplicationSlot != NULL);
    1039             : 
    1040             :     /* slot isn't acquired anymore */
    1041         854 :     MyReplicationSlot = NULL;
    1042             : 
    1043         854 :     ReplicationSlotDropPtr(slot);
    1044         854 : }
    1045             : 
    1046             : /*
    1047             :  * Permanently drop the replication slot which will be released by the point
    1048             :  * this function returns.
    1049             :  */
    1050             : static void
    1051        1156 : ReplicationSlotDropPtr(ReplicationSlot *slot)
    1052             : {
    1053             :     char        path[MAXPGPATH];
    1054             :     char        tmppath[MAXPGPATH];
    1055             : 
    1056             :     /*
    1057             :      * If some other backend ran this code concurrently with us, we might try
    1058             :      * to delete a slot with a certain name while someone else was trying to
    1059             :      * create a slot with the same name.
    1060             :      */
    1061        1156 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
    1062             : 
    1063             :     /* Generate pathnames. */
    1064        1156 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    1065        1156 :     sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    1066             : 
    1067             :     /*
    1068             :      * Rename the slot directory on disk, so that we'll no longer recognize
    1069             :      * this as a valid slot.  Note that if this fails, we've got to mark the
    1070             :      * slot inactive before bailing out.  If we're dropping an ephemeral or a
    1071             :      * temporary slot, we better never fail hard as the caller won't expect
    1072             :      * the slot to survive and this might get called during error handling.
    1073             :      */
    1074        1156 :     if (rename(path, tmppath) == 0)
    1075             :     {
    1076             :         /*
    1077             :          * We need to fsync() the directory we just renamed and its parent to
    1078             :          * make sure that our changes are on disk in a crash-safe fashion.  If
    1079             :          * fsync() fails, we can't be sure whether the changes are on disk or
    1080             :          * not.  For now, we handle that by panicking;
    1081             :          * StartupReplicationSlots() will try to straighten it out after
    1082             :          * restart.
    1083             :          */
    1084        1156 :         START_CRIT_SECTION();
    1085        1156 :         fsync_fname(tmppath, true);
    1086        1156 :         fsync_fname(PG_REPLSLOT_DIR, true);
    1087        1156 :         END_CRIT_SECTION();
    1088             :     }
    1089             :     else
    1090             :     {
    1091           0 :         bool        fail_softly = slot->data.persistency != RS_PERSISTENT;
    1092             : 
    1093           0 :         SpinLockAcquire(&slot->mutex);
    1094           0 :         slot->active_proc = INVALID_PROC_NUMBER;
    1095           0 :         SpinLockRelease(&slot->mutex);
    1096             : 
    1097             :         /* wake up anyone waiting on this slot */
    1098           0 :         ConditionVariableBroadcast(&slot->active_cv);
    1099             : 
    1100           0 :         ereport(fail_softly ? WARNING : ERROR,
    1101             :                 (errcode_for_file_access(),
    1102             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1103             :                         path, tmppath)));
    1104             :     }
    1105             : 
    1106             :     /*
    1107             :      * The slot is definitely gone.  Lock out concurrent scans of the array
    1108             :      * long enough to kill it.  It's OK to clear the active PID here without
    1109             :      * grabbing the mutex because nobody else can be scanning the array here,
    1110             :      * and nobody can be attached to this slot and thus access it without
    1111             :      * scanning the array.
    1112             :      *
    1113             :      * Also wake up processes waiting for it.
    1114             :      */
    1115        1156 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
    1116        1156 :     slot->active_proc = INVALID_PROC_NUMBER;
    1117        1156 :     slot->in_use = false;
    1118        1156 :     LWLockRelease(ReplicationSlotControlLock);
    1119        1156 :     ConditionVariableBroadcast(&slot->active_cv);
    1120             : 
    1121             :     /*
    1122             :      * Slot is dead and doesn't prevent resource removal anymore, recompute
    1123             :      * limits.
    1124             :      */
    1125        1156 :     ReplicationSlotsComputeRequiredXmin(false);
    1126        1156 :     ReplicationSlotsComputeRequiredLSN();
    1127             : 
    1128             :     /*
    1129             :      * If removing the directory fails, the worst thing that will happen is
    1130             :      * that the user won't be able to create a new slot with the same name
    1131             :      * until the next server restart.  We warn about it, but that's all.
    1132             :      */
    1133        1156 :     if (!rmtree(tmppath, true))
    1134           0 :         ereport(WARNING,
    1135             :                 (errmsg("could not remove directory \"%s\"", tmppath)));
    1136             : 
    1137             :     /*
    1138             :      * Drop the statistics entry for the replication slot.  Do this while
    1139             :      * holding ReplicationSlotAllocationLock so that we don't drop a
    1140             :      * statistics entry for another slot with the same name just created in
    1141             :      * another session.
    1142             :      */
    1143        1156 :     if (SlotIsLogical(slot))
    1144         832 :         pgstat_drop_replslot(slot);
    1145             : 
    1146             :     /*
    1147             :      * We release this at the very end, so that nobody starts trying to create
    1148             :      * a slot while we're still cleaning up the detritus of the old one.
    1149             :      */
    1150        1156 :     LWLockRelease(ReplicationSlotAllocationLock);
    1151        1156 : }
    1152             : 
    1153             : /*
    1154             :  * Serialize the currently acquired slot's state from memory to disk, thereby
    1155             :  * guaranteeing the current state will survive a crash.
    1156             :  */
    1157             : void
    1158        2822 : ReplicationSlotSave(void)
    1159             : {
    1160             :     char        path[MAXPGPATH];
    1161             : 
    1162             :     Assert(MyReplicationSlot != NULL);
    1163             : 
    1164        2822 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
    1165        2822 :     SaveSlotToPath(MyReplicationSlot, path, ERROR);
    1166        2822 : }
    1167             : 
    1168             : /*
    1169             :  * Signal that it would be useful if the currently acquired slot would be
    1170             :  * flushed out to disk.
    1171             :  *
    1172             :  * Note that the actual flush to disk can be delayed for a long time, if
    1173             :  * required for correctness explicitly do a ReplicationSlotSave().
    1174             :  */
    1175             : void
    1176       72880 : ReplicationSlotMarkDirty(void)
    1177             : {
    1178       72880 :     ReplicationSlot *slot = MyReplicationSlot;
    1179             : 
    1180             :     Assert(MyReplicationSlot != NULL);
    1181             : 
    1182       72880 :     SpinLockAcquire(&slot->mutex);
    1183       72880 :     MyReplicationSlot->just_dirtied = true;
    1184       72880 :     MyReplicationSlot->dirty = true;
    1185       72880 :     SpinLockRelease(&slot->mutex);
    1186       72880 : }
    1187             : 
    1188             : /*
    1189             :  * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
    1190             :  * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
    1191             :  */
    1192             : void
    1193         938 : ReplicationSlotPersist(void)
    1194             : {
    1195         938 :     ReplicationSlot *slot = MyReplicationSlot;
    1196             : 
    1197             :     Assert(slot != NULL);
    1198             :     Assert(slot->data.persistency != RS_PERSISTENT);
    1199             : 
    1200         938 :     SpinLockAcquire(&slot->mutex);
    1201         938 :     slot->data.persistency = RS_PERSISTENT;
    1202         938 :     SpinLockRelease(&slot->mutex);
    1203             : 
    1204         938 :     ReplicationSlotMarkDirty();
    1205         938 :     ReplicationSlotSave();
    1206         938 : }
    1207             : 
    1208             : /*
    1209             :  * Compute the oldest xmin across all slots and store it in the ProcArray.
    1210             :  *
    1211             :  * If already_locked is true, both the ReplicationSlotControlLock and the
    1212             :  * ProcArrayLock have already been acquired exclusively. It is crucial that the
    1213             :  * caller first acquires the ReplicationSlotControlLock, followed by the
    1214             :  * ProcArrayLock, to prevent any undetectable deadlocks since this function
    1215             :  * acquires them in that order.
    1216             :  */
    1217             : void
    1218        4910 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
    1219             : {
    1220             :     int         i;
    1221        4910 :     TransactionId agg_xmin = InvalidTransactionId;
    1222        4910 :     TransactionId agg_catalog_xmin = InvalidTransactionId;
    1223             : 
    1224             :     Assert(ReplicationSlotCtl != NULL);
    1225             :     Assert(!already_locked ||
    1226             :            (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
    1227             :             LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
    1228             : 
    1229             :     /*
    1230             :      * Hold the ReplicationSlotControlLock until after updating the slot xmin
    1231             :      * values, so no backend updates the initial xmin for newly created slot
    1232             :      * concurrently. A shared lock is used here to minimize lock contention,
    1233             :      * especially when many slots exist and advancements occur frequently.
    1234             :      * This is safe since an exclusive lock is taken during initial slot xmin
    1235             :      * update in slot creation.
    1236             :      *
    1237             :      * One might think that we can hold the ProcArrayLock exclusively and
    1238             :      * update the slot xmin values, but it could increase lock contention on
    1239             :      * the ProcArrayLock, which is not great since this function can be called
    1240             :      * at non-negligible frequency.
    1241             :      *
    1242             :      * Concurrent invocation of this function may cause the computed slot xmin
    1243             :      * to regress. However, this is harmless because tuples prior to the most
    1244             :      * recent xmin are no longer useful once advancement occurs (see
    1245             :      * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
    1246             :      * before updating the effective_xmin). Thus, such regression merely
    1247             :      * prevents VACUUM from prematurely removing tuples without causing the
    1248             :      * early deletion of required data.
    1249             :      */
    1250        4910 :     if (!already_locked)
    1251        3938 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1252             : 
    1253       49876 :     for (i = 0; i < max_replication_slots; i++)
    1254             :     {
    1255       44966 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1256             :         TransactionId effective_xmin;
    1257             :         TransactionId effective_catalog_xmin;
    1258             :         bool        invalidated;
    1259             : 
    1260       44966 :         if (!s->in_use)
    1261       40302 :             continue;
    1262             : 
    1263        4664 :         SpinLockAcquire(&s->mutex);
    1264        4664 :         effective_xmin = s->effective_xmin;
    1265        4664 :         effective_catalog_xmin = s->effective_catalog_xmin;
    1266        4664 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1267        4664 :         SpinLockRelease(&s->mutex);
    1268             : 
    1269             :         /* invalidated slots need not apply */
    1270        4664 :         if (invalidated)
    1271          52 :             continue;
    1272             : 
    1273             :         /* check the data xmin */
    1274        4612 :         if (TransactionIdIsValid(effective_xmin) &&
    1275          24 :             (!TransactionIdIsValid(agg_xmin) ||
    1276          24 :              TransactionIdPrecedes(effective_xmin, agg_xmin)))
    1277         692 :             agg_xmin = effective_xmin;
    1278             : 
    1279             :         /* check the catalog xmin */
    1280        4612 :         if (TransactionIdIsValid(effective_catalog_xmin) &&
    1281        1886 :             (!TransactionIdIsValid(agg_catalog_xmin) ||
    1282        1886 :              TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
    1283        2450 :             agg_catalog_xmin = effective_catalog_xmin;
    1284             :     }
    1285             : 
    1286        4910 :     ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
    1287             : 
    1288        4910 :     if (!already_locked)
    1289        3938 :         LWLockRelease(ReplicationSlotControlLock);
    1290        4910 : }
    1291             : 
    1292             : /*
    1293             :  * Compute the oldest restart LSN across all slots and inform xlog module.
    1294             :  *
    1295             :  * Note: while max_slot_wal_keep_size is theoretically relevant for this
    1296             :  * purpose, we don't try to account for that, because this module doesn't
    1297             :  * know what to compare against.
    1298             :  */
    1299             : void
    1300       74284 : ReplicationSlotsComputeRequiredLSN(void)
    1301             : {
    1302             :     int         i;
    1303       74284 :     XLogRecPtr  min_required = InvalidXLogRecPtr;
    1304             : 
    1305             :     Assert(ReplicationSlotCtl != NULL);
    1306             : 
    1307       74284 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1308      808284 :     for (i = 0; i < max_replication_slots; i++)
    1309             :     {
    1310      734000 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1311             :         XLogRecPtr  restart_lsn;
    1312             :         XLogRecPtr  last_saved_restart_lsn;
    1313             :         bool        invalidated;
    1314             :         ReplicationSlotPersistency persistency;
    1315             : 
    1316      734000 :         if (!s->in_use)
    1317      659568 :             continue;
    1318             : 
    1319       74432 :         SpinLockAcquire(&s->mutex);
    1320       74432 :         persistency = s->data.persistency;
    1321       74432 :         restart_lsn = s->data.restart_lsn;
    1322       74432 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1323       74432 :         last_saved_restart_lsn = s->last_saved_restart_lsn;
    1324       74432 :         SpinLockRelease(&s->mutex);
    1325             : 
    1326             :         /* invalidated slots need not apply */
    1327       74432 :         if (invalidated)
    1328          54 :             continue;
    1329             : 
    1330             :         /*
    1331             :          * For persistent slot use last_saved_restart_lsn to compute the
    1332             :          * oldest LSN for removal of WAL segments.  The segments between
    1333             :          * last_saved_restart_lsn and restart_lsn might be needed by a
    1334             :          * persistent slot in the case of database crash.  Non-persistent
    1335             :          * slots can't survive the database crash, so we don't care about
    1336             :          * last_saved_restart_lsn for them.
    1337             :          */
    1338       74378 :         if (persistency == RS_PERSISTENT)
    1339             :         {
    1340       72658 :             if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
    1341             :                 restart_lsn > last_saved_restart_lsn)
    1342             :             {
    1343       68260 :                 restart_lsn = last_saved_restart_lsn;
    1344             :             }
    1345             :         }
    1346             : 
    1347       74378 :         if (XLogRecPtrIsValid(restart_lsn) &&
    1348        2270 :             (!XLogRecPtrIsValid(min_required) ||
    1349             :              restart_lsn < min_required))
    1350       72274 :             min_required = restart_lsn;
    1351             :     }
    1352       74284 :     LWLockRelease(ReplicationSlotControlLock);
    1353             : 
    1354       74284 :     XLogSetReplicationSlotMinimumLSN(min_required);
    1355       74284 : }
    1356             : 
    1357             : /*
    1358             :  * Compute the oldest WAL LSN required by *logical* decoding slots..
    1359             :  *
    1360             :  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
    1361             :  * slots exist.
    1362             :  *
    1363             :  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
    1364             :  * ignores physical replication slots.
    1365             :  *
    1366             :  * The results aren't required frequently, so we don't maintain a precomputed
    1367             :  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
    1368             :  */
    1369             : XLogRecPtr
    1370        7160 : ReplicationSlotsComputeLogicalRestartLSN(void)
    1371             : {
    1372        7160 :     XLogRecPtr  result = InvalidXLogRecPtr;
    1373             :     int         i;
    1374             : 
    1375        7160 :     if (max_replication_slots <= 0)
    1376           4 :         return InvalidXLogRecPtr;
    1377             : 
    1378        7156 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1379             : 
    1380       77656 :     for (i = 0; i < max_replication_slots; i++)
    1381             :     {
    1382             :         ReplicationSlot *s;
    1383             :         XLogRecPtr  restart_lsn;
    1384             :         XLogRecPtr  last_saved_restart_lsn;
    1385             :         bool        invalidated;
    1386             :         ReplicationSlotPersistency persistency;
    1387             : 
    1388       70500 :         s = &ReplicationSlotCtl->replication_slots[i];
    1389             : 
    1390             :         /* cannot change while ReplicationSlotCtlLock is held */
    1391       70500 :         if (!s->in_use)
    1392       68916 :             continue;
    1393             : 
    1394             :         /* we're only interested in logical slots */
    1395        1584 :         if (!SlotIsLogical(s))
    1396        1092 :             continue;
    1397             : 
    1398             :         /* read once, it's ok if it increases while we're checking */
    1399         492 :         SpinLockAcquire(&s->mutex);
    1400         492 :         persistency = s->data.persistency;
    1401         492 :         restart_lsn = s->data.restart_lsn;
    1402         492 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1403         492 :         last_saved_restart_lsn = s->last_saved_restart_lsn;
    1404         492 :         SpinLockRelease(&s->mutex);
    1405             : 
    1406             :         /* invalidated slots need not apply */
    1407         492 :         if (invalidated)
    1408          20 :             continue;
    1409             : 
    1410             :         /*
    1411             :          * For persistent slot use last_saved_restart_lsn to compute the
    1412             :          * oldest LSN for removal of WAL segments.  The segments between
    1413             :          * last_saved_restart_lsn and restart_lsn might be needed by a
    1414             :          * persistent slot in the case of database crash.  Non-persistent
    1415             :          * slots can't survive the database crash, so we don't care about
    1416             :          * last_saved_restart_lsn for them.
    1417             :          */
    1418         472 :         if (persistency == RS_PERSISTENT)
    1419             :         {
    1420         468 :             if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
    1421             :                 restart_lsn > last_saved_restart_lsn)
    1422             :             {
    1423           0 :                 restart_lsn = last_saved_restart_lsn;
    1424             :             }
    1425             :         }
    1426             : 
    1427         472 :         if (!XLogRecPtrIsValid(restart_lsn))
    1428           0 :             continue;
    1429             : 
    1430         472 :         if (!XLogRecPtrIsValid(result) ||
    1431             :             restart_lsn < result)
    1432         372 :             result = restart_lsn;
    1433             :     }
    1434             : 
    1435        7156 :     LWLockRelease(ReplicationSlotControlLock);
    1436             : 
    1437        7156 :     return result;
    1438             : }
    1439             : 
    1440             : /*
    1441             :  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
    1442             :  * passed database oid.
    1443             :  *
    1444             :  * Returns true if there are any slots referencing the database. *nslots will
    1445             :  * be set to the absolute number of slots in the database, *nactive to ones
    1446             :  * currently active.
    1447             :  */
    1448             : bool
    1449          96 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
    1450             : {
    1451             :     int         i;
    1452             : 
    1453          96 :     *nslots = *nactive = 0;
    1454             : 
    1455          96 :     if (max_replication_slots <= 0)
    1456           0 :         return false;
    1457             : 
    1458          96 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1459        1002 :     for (i = 0; i < max_replication_slots; i++)
    1460             :     {
    1461             :         ReplicationSlot *s;
    1462             : 
    1463         906 :         s = &ReplicationSlotCtl->replication_slots[i];
    1464             : 
    1465             :         /* cannot change while ReplicationSlotCtlLock is held */
    1466         906 :         if (!s->in_use)
    1467         864 :             continue;
    1468             : 
    1469             :         /* only logical slots are database specific, skip */
    1470          42 :         if (!SlotIsLogical(s))
    1471          20 :             continue;
    1472             : 
    1473             :         /* not our database, skip */
    1474          22 :         if (s->data.database != dboid)
    1475          16 :             continue;
    1476             : 
    1477             :         /* NB: intentionally counting invalidated slots */
    1478             : 
    1479             :         /* count slots with spinlock held */
    1480           6 :         SpinLockAcquire(&s->mutex);
    1481           6 :         (*nslots)++;
    1482           6 :         if (s->active_proc != INVALID_PROC_NUMBER)
    1483           2 :             (*nactive)++;
    1484           6 :         SpinLockRelease(&s->mutex);
    1485             :     }
    1486          96 :     LWLockRelease(ReplicationSlotControlLock);
    1487             : 
    1488          96 :     if (*nslots > 0)
    1489           6 :         return true;
    1490          90 :     return false;
    1491             : }
    1492             : 
    1493             : /*
    1494             :  * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
    1495             :  * passed database oid. The caller should hold an exclusive lock on the
    1496             :  * pg_database oid for the database to prevent creation of new slots on the db
    1497             :  * or replay from existing slots.
    1498             :  *
    1499             :  * Another session that concurrently acquires an existing slot on the target DB
    1500             :  * (most likely to drop it) may cause this function to ERROR. If that happens
    1501             :  * it may have dropped some but not all slots.
    1502             :  *
    1503             :  * This routine isn't as efficient as it could be - but we don't drop
    1504             :  * databases often, especially databases with lots of slots.
    1505             :  *
    1506             :  * If it drops the last logical slot in the cluster, it requests to disable
    1507             :  * logical decoding.
    1508             :  */
    1509             : void
    1510         122 : ReplicationSlotsDropDBSlots(Oid dboid)
    1511             : {
    1512             :     int         i;
    1513             :     bool        found_valid_logicalslot;
    1514         122 :     bool        dropped = false;
    1515             : 
    1516         122 :     if (max_replication_slots <= 0)
    1517           0 :         return;
    1518             : 
    1519         122 : restart:
    1520         132 :     found_valid_logicalslot = false;
    1521         132 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1522        1282 :     for (i = 0; i < max_replication_slots; i++)
    1523             :     {
    1524             :         ReplicationSlot *s;
    1525             :         char       *slotname;
    1526             :         ProcNumber  active_proc;
    1527             : 
    1528        1160 :         s = &ReplicationSlotCtl->replication_slots[i];
    1529             : 
    1530             :         /* cannot change while ReplicationSlotCtlLock is held */
    1531        1160 :         if (!s->in_use)
    1532        1100 :             continue;
    1533             : 
    1534             :         /* only logical slots are database specific, skip */
    1535          60 :         if (!SlotIsLogical(s))
    1536          22 :             continue;
    1537             : 
    1538             :         /*
    1539             :          * Check logical slots on other databases too so we can disable
    1540             :          * logical decoding only if no slots in the cluster.
    1541             :          */
    1542          38 :         SpinLockAcquire(&s->mutex);
    1543          38 :         found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
    1544          38 :         SpinLockRelease(&s->mutex);
    1545             : 
    1546             :         /* not our database, skip */
    1547          38 :         if (s->data.database != dboid)
    1548          28 :             continue;
    1549             : 
    1550             :         /* NB: intentionally including invalidated slots to drop */
    1551             : 
    1552             :         /* acquire slot, so ReplicationSlotDropAcquired can be reused  */
    1553          10 :         SpinLockAcquire(&s->mutex);
    1554             :         /* can't change while ReplicationSlotControlLock is held */
    1555          10 :         slotname = NameStr(s->data.name);
    1556          10 :         active_proc = s->active_proc;
    1557          10 :         if (active_proc == INVALID_PROC_NUMBER)
    1558             :         {
    1559          10 :             MyReplicationSlot = s;
    1560          10 :             s->active_proc = MyProcNumber;
    1561             :         }
    1562          10 :         SpinLockRelease(&s->mutex);
    1563             : 
    1564             :         /*
    1565             :          * Even though we hold an exclusive lock on the database object a
    1566             :          * logical slot for that DB can still be active, e.g. if it's
    1567             :          * concurrently being dropped by a backend connected to another DB.
    1568             :          *
    1569             :          * That's fairly unlikely in practice, so we'll just bail out.
    1570             :          *
    1571             :          * The slot sync worker holds a shared lock on the database before
    1572             :          * operating on synced logical slots to avoid conflict with the drop
    1573             :          * happening here. The persistent synced slots are thus safe but there
    1574             :          * is a possibility that the slot sync worker has created a temporary
    1575             :          * slot (which stays active even on release) and we are trying to drop
    1576             :          * that here. In practice, the chances of hitting this scenario are
    1577             :          * less as during slot synchronization, the temporary slot is
    1578             :          * immediately converted to persistent and thus is safe due to the
    1579             :          * shared lock taken on the database. So, we'll just bail out in such
    1580             :          * a case.
    1581             :          *
    1582             :          * XXX: We can consider shutting down the slot sync worker before
    1583             :          * trying to drop synced temporary slots here.
    1584             :          */
    1585          10 :         if (active_proc != INVALID_PROC_NUMBER)
    1586           0 :             ereport(ERROR,
    1587             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1588             :                      errmsg("replication slot \"%s\" is active for PID %d",
    1589             :                             slotname, GetPGProcByNumber(active_proc)->pid)));
    1590             : 
    1591             :         /*
    1592             :          * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
    1593             :          * holding ReplicationSlotControlLock over filesystem operations,
    1594             :          * release ReplicationSlotControlLock and use
    1595             :          * ReplicationSlotDropAcquired.
    1596             :          *
    1597             :          * As that means the set of slots could change, restart scan from the
    1598             :          * beginning each time we release the lock.
    1599             :          */
    1600          10 :         LWLockRelease(ReplicationSlotControlLock);
    1601          10 :         ReplicationSlotDropAcquired();
    1602          10 :         dropped = true;
    1603          10 :         goto restart;
    1604             :     }
    1605         122 :     LWLockRelease(ReplicationSlotControlLock);
    1606             : 
    1607         122 :     if (dropped && !found_valid_logicalslot)
    1608           0 :         RequestDisableLogicalDecoding();
    1609             : }
    1610             : 
    1611             : /*
    1612             :  * Returns true if there is at least one in-use valid logical replication slot.
    1613             :  */
    1614             : bool
    1615         936 : CheckLogicalSlotExists(void)
    1616             : {
    1617         936 :     bool        found = false;
    1618             : 
    1619         936 :     if (max_replication_slots <= 0)
    1620           2 :         return false;
    1621             : 
    1622         934 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1623       10106 :     for (int i = 0; i < max_replication_slots; i++)
    1624             :     {
    1625             :         ReplicationSlot *s;
    1626             :         bool        invalidated;
    1627             : 
    1628        9184 :         s = &ReplicationSlotCtl->replication_slots[i];
    1629             : 
    1630             :         /* cannot change while ReplicationSlotCtlLock is held */
    1631        9184 :         if (!s->in_use)
    1632        9128 :             continue;
    1633             : 
    1634          56 :         if (SlotIsPhysical(s))
    1635          38 :             continue;
    1636             : 
    1637          18 :         SpinLockAcquire(&s->mutex);
    1638          18 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1639          18 :         SpinLockRelease(&s->mutex);
    1640             : 
    1641          18 :         if (invalidated)
    1642           6 :             continue;
    1643             : 
    1644          12 :         found = true;
    1645          12 :         break;
    1646             :     }
    1647         934 :     LWLockRelease(ReplicationSlotControlLock);
    1648             : 
    1649         934 :     return found;
    1650             : }
    1651             : 
    1652             : /*
    1653             :  * Check whether the server's configuration supports using replication
    1654             :  * slots.
    1655             :  */
    1656             : void
    1657        3606 : CheckSlotRequirements(void)
    1658             : {
    1659             :     /*
    1660             :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
    1661             :      * needs the same check.
    1662             :      */
    1663             : 
    1664        3606 :     if (max_replication_slots == 0)
    1665           0 :         ereport(ERROR,
    1666             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1667             :                  errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
    1668             : 
    1669        3606 :     if (wal_level < WAL_LEVEL_REPLICA)
    1670           0 :         ereport(ERROR,
    1671             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1672             :                  errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
    1673        3606 : }
    1674             : 
    1675             : /*
    1676             :  * Check whether the user has privilege to use replication slots.
    1677             :  */
    1678             : void
    1679        1178 : CheckSlotPermissions(void)
    1680             : {
    1681        1178 :     if (!has_rolreplication(GetUserId()))
    1682          10 :         ereport(ERROR,
    1683             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1684             :                  errmsg("permission denied to use replication slots"),
    1685             :                  errdetail("Only roles with the %s attribute may use replication slots.",
    1686             :                            "REPLICATION")));
    1687        1168 : }
    1688             : 
/*
 * Reserve WAL for the currently active slot.
 *
 * Compute and set restart_lsn in a manner that's appropriate for the type of
 * the slot and concurrency safe.
 *
 * Operates on MyReplicationSlot, which must already be set and must not yet
 * have a valid restart_lsn or last_saved_restart_lsn (see the Asserts below).
 * Raises ERROR if the WAL segment containing the chosen restart_lsn has
 * already been removed.  For a logical slot on a primary, this additionally
 * logs a standby snapshot and flushes WAL up to it before returning.
 */
void
ReplicationSlotReserveWal(void)
{
    ReplicationSlot *slot = MyReplicationSlot;
    XLogSegNo   segno;
    XLogRecPtr  restart_lsn;

    Assert(slot != NULL);
    Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
    Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn));

    /*
     * The replication slot mechanism is used to prevent the removal of
     * required WAL.
     *
     * Acquire an exclusive lock to prevent the checkpoint process from
     * concurrently computing the minimum slot LSN (see
     * CheckPointReplicationSlots). This ensures that the WAL reserved for
     * replication cannot be removed during a checkpoint.
     *
     * The mechanism is reliable because if WAL reservation occurs first, the
     * checkpoint must wait for the restart_lsn update before determining the
     * minimum non-removable LSN. On the other hand, if the checkpoint happens
     * first, subsequent WAL reservations will select positions at or beyond
     * the redo pointer of that checkpoint.
     */
    LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

    /*
     * For logical slots log a standby snapshot and start logical decoding at
     * exactly that position. That allows the slot to start up more quickly.
     * But on a standby we cannot do WAL writes, so just use the replay
     * pointer; effectively, an attempt to create a logical slot on standby
     * will cause it to wait for an xl_running_xact record to be logged
     * independently on the primary, so that a snapshot can be built using the
     * record.
     *
     * None of this is needed (or indeed helpful) for physical slots as
     * they'll start replay at the last logged checkpoint anyway. Instead,
     * return the location of the last redo LSN, where a base backup has to
     * start replay at.
     */
    if (SlotIsPhysical(slot))
        restart_lsn = GetRedoRecPtr();
    else if (RecoveryInProgress())
        restart_lsn = GetXLogReplayRecPtr(NULL);
    else
        restart_lsn = GetXLogInsertRecPtr();

    /* Publish the new restart_lsn under the slot's spinlock. */
    SpinLockAcquire(&slot->mutex);
    slot->data.restart_lsn = restart_lsn;
    SpinLockRelease(&slot->mutex);

    /* prevent WAL removal as fast as possible */
    ReplicationSlotsComputeRequiredLSN();

    /*
     * Checkpoint shouldn't remove the required WAL.
     *
     * NOTE(review): slot->data.restart_lsn is read here without slot->mutex;
     * presumably fine because this backend just wrote it and owns the slot.
     * Confirm.
     */
    XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
    if (XLogGetLastRemovedSegno() >= segno)
        elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
             NameStr(slot->data.name));

    LWLockRelease(ReplicationSlotAllocationLock);

    /*
     * Only a logical slot on a primary writes WAL here.  This happens after
     * ReplicationSlotAllocationLock has been released.
     */
    if (!RecoveryInProgress() && SlotIsLogical(slot))
    {
        XLogRecPtr  flushptr;

        /* make sure we have enough information to start */
        flushptr = LogStandbySnapshot();

        /* and make sure it's fsynced to disk */
        XLogFlush(flushptr);
    }
}
    1770             : 
    1771             : /*
    1772             :  * Report that replication slot needs to be invalidated
    1773             :  */
    1774             : static void
    1775          46 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
    1776             :                        bool terminating,
    1777             :                        int pid,
    1778             :                        NameData slotname,
    1779             :                        XLogRecPtr restart_lsn,
    1780             :                        XLogRecPtr oldestLSN,
    1781             :                        TransactionId snapshotConflictHorizon,
    1782             :                        long slot_idle_seconds)
    1783             : {
    1784             :     StringInfoData err_detail;
    1785             :     StringInfoData err_hint;
    1786             : 
    1787          46 :     initStringInfo(&err_detail);
    1788          46 :     initStringInfo(&err_hint);
    1789             : 
    1790          46 :     switch (cause)
    1791             :     {
    1792          14 :         case RS_INVAL_WAL_REMOVED:
    1793             :             {
    1794          14 :                 uint64      ex = oldestLSN - restart_lsn;
    1795             : 
    1796          14 :                 appendStringInfo(&err_detail,
    1797          14 :                                  ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
    1798             :                                           "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
    1799             :                                           ex),
    1800          14 :                                  LSN_FORMAT_ARGS(restart_lsn),
    1801             :                                  ex);
    1802             :                 /* translator: %s is a GUC variable name */
    1803          14 :                 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
    1804             :                                  "max_slot_wal_keep_size");
    1805          14 :                 break;
    1806             :             }
    1807          24 :         case RS_INVAL_HORIZON:
    1808          24 :             appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
    1809             :                              snapshotConflictHorizon);
    1810          24 :             break;
    1811             : 
    1812           8 :         case RS_INVAL_WAL_LEVEL:
    1813           8 :             appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
    1814           8 :             break;
    1815             : 
    1816           0 :         case RS_INVAL_IDLE_TIMEOUT:
    1817             :             {
    1818             :                 /* translator: %s is a GUC variable name */
    1819           0 :                 appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
    1820             :                                  slot_idle_seconds, "idle_replication_slot_timeout",
    1821             :                                  idle_replication_slot_timeout_secs);
    1822             :                 /* translator: %s is a GUC variable name */
    1823           0 :                 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
    1824             :                                  "idle_replication_slot_timeout");
    1825           0 :                 break;
    1826             :             }
    1827             :         case RS_INVAL_NONE:
    1828             :             pg_unreachable();
    1829             :     }
    1830             : 
    1831          46 :     ereport(LOG,
    1832             :             terminating ?
    1833             :             errmsg("terminating process %d to release replication slot \"%s\"",
    1834             :                    pid, NameStr(slotname)) :
    1835             :             errmsg("invalidating obsolete replication slot \"%s\"",
    1836             :                    NameStr(slotname)),
    1837             :             errdetail_internal("%s", err_detail.data),
    1838             :             err_hint.len ? errhint("%s", err_hint.data) : 0);
    1839             : 
    1840          46 :     pfree(err_detail.data);
    1841          46 :     pfree(err_hint.data);
    1842          46 : }
    1843             : 
    1844             : /*
    1845             :  * Can we invalidate an idle replication slot?
    1846             :  *
    1847             :  * Idle timeout invalidation is allowed only when:
    1848             :  *
    1849             :  * 1. Idle timeout is set
    1850             :  * 2. Slot has reserved WAL
    1851             :  * 3. Slot is inactive
    1852             :  * 4. The slot is not being synced from the primary while the server is in
    1853             :  *    recovery. This is because synced slots are always considered to be
    1854             :  *    inactive because they don't perform logical decoding to produce changes.
    1855             :  */
    1856             : static inline bool
    1857         746 : CanInvalidateIdleSlot(ReplicationSlot *s)
    1858             : {
    1859         746 :     return (idle_replication_slot_timeout_secs != 0 &&
    1860           0 :             XLogRecPtrIsValid(s->data.restart_lsn) &&
    1861         746 :             s->inactive_since > 0 &&
    1862           0 :             !(RecoveryInProgress() && s->data.synced));
    1863             : }
    1864             : 
    1865             : /*
    1866             :  * DetermineSlotInvalidationCause - Determine the cause for which a slot
    1867             :  * becomes invalid among the given possible causes.
    1868             :  *
    1869             :  * This function sequentially checks all possible invalidation causes and
    1870             :  * returns the first one for which the slot is eligible for invalidation.
    1871             :  */
    1872             : static ReplicationSlotInvalidationCause
    1873         820 : DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
    1874             :                                XLogRecPtr oldestLSN, Oid dboid,
    1875             :                                TransactionId snapshotConflictHorizon,
    1876             :                                TimestampTz *inactive_since, TimestampTz now)
    1877             : {
    1878             :     Assert(possible_causes != RS_INVAL_NONE);
    1879             : 
    1880         820 :     if (possible_causes & RS_INVAL_WAL_REMOVED)
    1881             :     {
    1882         760 :         XLogRecPtr  restart_lsn = s->data.restart_lsn;
    1883             : 
    1884         760 :         if (XLogRecPtrIsValid(restart_lsn) &&
    1885             :             restart_lsn < oldestLSN)
    1886          14 :             return RS_INVAL_WAL_REMOVED;
    1887             :     }
    1888             : 
    1889         806 :     if (possible_causes & RS_INVAL_HORIZON)
    1890             :     {
    1891             :         /* invalid DB oid signals a shared relation */
    1892          52 :         if (SlotIsLogical(s) &&
    1893          42 :             (dboid == InvalidOid || dboid == s->data.database))
    1894             :         {
    1895          52 :             TransactionId effective_xmin = s->effective_xmin;
    1896          52 :             TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
    1897             : 
    1898          52 :             if (TransactionIdIsValid(effective_xmin) &&
    1899           0 :                 TransactionIdPrecedesOrEquals(effective_xmin,
    1900             :                                               snapshotConflictHorizon))
    1901           0 :                 return RS_INVAL_HORIZON;
    1902         104 :             else if (TransactionIdIsValid(catalog_effective_xmin) &&
    1903          52 :                      TransactionIdPrecedesOrEquals(catalog_effective_xmin,
    1904             :                                                    snapshotConflictHorizon))
    1905          24 :                 return RS_INVAL_HORIZON;
    1906             :         }
    1907             :     }
    1908             : 
    1909         782 :     if (possible_causes & RS_INVAL_WAL_LEVEL)
    1910             :     {
    1911           8 :         if (SlotIsLogical(s))
    1912           8 :             return RS_INVAL_WAL_LEVEL;
    1913             :     }
    1914             : 
    1915         774 :     if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
    1916             :     {
    1917             :         Assert(now > 0);
    1918             : 
    1919         746 :         if (CanInvalidateIdleSlot(s))
    1920             :         {
    1921             :             /*
    1922             :              * Simulate the invalidation due to idle_timeout to test the
    1923             :              * timeout behavior promptly, without waiting for it to trigger
    1924             :              * naturally.
    1925             :              */
    1926             : #ifdef USE_INJECTION_POINTS
    1927           0 :             if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
    1928             :             {
    1929           0 :                 *inactive_since = 0;    /* since the beginning of time */
    1930           0 :                 return RS_INVAL_IDLE_TIMEOUT;
    1931             :             }
    1932             : #endif
    1933             : 
    1934             :             /*
    1935             :              * Check if the slot needs to be invalidated due to
    1936             :              * idle_replication_slot_timeout GUC.
    1937             :              */
    1938           0 :             if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
    1939             :                                                   idle_replication_slot_timeout_secs))
    1940             :             {
    1941           0 :                 *inactive_since = s->inactive_since;
    1942           0 :                 return RS_INVAL_IDLE_TIMEOUT;
    1943             :             }
    1944             :         }
    1945             :     }
    1946             : 
    1947         774 :     return RS_INVAL_NONE;
    1948             : }
    1949             : 
/*
 * Helper for InvalidateObsoleteReplicationSlots
 *
 * Acquires the given slot and marks it invalid, if necessary and possible.
 *
 * possible_causes is a bitmask of RS_INVAL_* causes to test (see
 * DetermineSlotInvalidationCause).  oldestLSN, dboid and
 * snapshotConflictHorizon are the parameters those tests compare against.
 *
 * Returns true if the slot was invalidated.
 *
 * Set *released_lock_out if ReplicationSlotControlLock was released in the
 * interim (and in that case we're not holding the lock at return, otherwise
 * we are).  On entry the caller must hold ReplicationSlotControlLock in
 * LW_SHARED mode (asserted at the top of every loop iteration).
 *
 * If the slot is currently owned by another process, that process is
 * signalled (the startup process uses SignalRecoveryConflict, any other
 * process sends SIGTERM).  We then wait on the slot's condition variable and
 * retry.
 *
 * This is inherently racy, because we release the LWLock
 * for syscalls, so caller must restart if we return true.
 */
static bool
InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
                               ReplicationSlot *s,
                               XLogRecPtr oldestLSN,
                               Oid dboid, TransactionId snapshotConflictHorizon,
                               bool *released_lock_out)
{
    /* PID most recently signalled, so that one owner is signalled only once */
    int         last_signaled_pid = 0;
    bool        released_lock = false;
    bool        invalidated = false;
    /* filled in by DetermineSlotInvalidationCause for RS_INVAL_IDLE_TIMEOUT */
    TimestampTz inactive_since = 0;

    for (;;)
    {
        XLogRecPtr  restart_lsn;
        NameData    slotname;
        ProcNumber  active_proc;
        int         active_pid = 0; /* stays 0 when we acquire the slot */
        ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
        TimestampTz now = 0;
        long        slot_idle_secs = 0;

        Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));

        /*
         * The slot is no longer in use (this can also be observed after
         * waiting below with the lock released), so there is nothing to do.
         */
        if (!s->in_use)
        {
            if (released_lock)
                LWLockRelease(ReplicationSlotControlLock);
            break;
        }

        if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
        {
            /*
             * Assign the current time here to avoid system call overhead
             * while holding the spinlock in subsequent code.
             */
            now = GetCurrentTimestamp();
        }

        /*
         * Check if the slot needs to be invalidated. If it needs to be
         * invalidated, and is not currently acquired, acquire it and mark it
         * as having been invalidated.  We do this with the spinlock held to
         * avoid race conditions -- for example the restart_lsn could move
         * forward, or the slot could be dropped.
         */
        SpinLockAcquire(&s->mutex);

        /* snapshot for reporting; the slot's copy may be cleared below */
        restart_lsn = s->data.restart_lsn;

        /* we do nothing if the slot is already invalid */
        if (s->data.invalidated == RS_INVAL_NONE)
            invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
                                                                s, oldestLSN,
                                                                dboid,
                                                                snapshotConflictHorizon,
                                                                &inactive_since,
                                                                now);

        /* if there's no invalidation, we're done */
        if (invalidation_cause == RS_INVAL_NONE)
        {
            SpinLockRelease(&s->mutex);
            if (released_lock)
                LWLockRelease(ReplicationSlotControlLock);
            break;
        }

        /* copy what the reporting code needs before dropping the spinlock */
        slotname = s->data.name;
        active_proc = s->active_proc;

        /*
         * If the slot can be acquired, do so and mark it invalidated
         * immediately.  Otherwise we'll signal the owning process, below, and
         * retry.
         *
         * Note: Unlike other slot attributes, slot's inactive_since can't be
         * changed until the acquired slot is released or the owning process
         * is terminated. So, the inactive slot can only be invalidated
         * immediately without being terminated.
         */
        if (active_proc == INVALID_PROC_NUMBER)
        {
            MyReplicationSlot = s;
            s->active_proc = MyProcNumber;
            s->data.invalidated = invalidation_cause;

            /*
             * XXX: We should consider not overwriting restart_lsn and instead
             * just rely on .invalidated.
             */
            if (invalidation_cause == RS_INVAL_WAL_REMOVED)
            {
                s->data.restart_lsn = InvalidXLogRecPtr;
                s->last_saved_restart_lsn = InvalidXLogRecPtr;
            }

            /* Let caller know */
            invalidated = true;
        }
        else
        {
            active_pid = GetPGProcByNumber(active_proc)->pid;
            Assert(active_pid != 0);
        }

        SpinLockRelease(&s->mutex);

        /*
         * Calculate the idle time duration of the slot if slot is marked
         * invalidated with RS_INVAL_IDLE_TIMEOUT.
         */
        if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
        {
            int         slot_idle_usecs;    /* unused; only seconds are reported */

            TimestampDifference(inactive_since, now, &slot_idle_secs,
                                &slot_idle_usecs);
        }

        if (active_proc != INVALID_PROC_NUMBER)
        {
            /*
             * Prepare the sleep on the slot's condition variable before
             * releasing the lock, to close a possible race condition if the
             * slot is released before the sleep below.
             */
            ConditionVariablePrepareToSleep(&s->active_cv);

            LWLockRelease(ReplicationSlotControlLock);
            released_lock = true;

            /*
             * Signal to terminate the process that owns the slot, if we
             * haven't already signalled it.  (Avoidance of repeated
             * signalling is the only reason for there to be a loop in this
             * routine; otherwise we could rely on caller's restart loop.)
             *
             * There is the race condition that other process may own the slot
             * after its current owner process is terminated and before this
             * process owns it. To handle that, we signal only if the PID of
             * the owning process has changed from the previous time. (This
             * logic assumes that the same PID is not reused very quickly.)
             */
            if (last_signaled_pid != active_pid)
            {
                ReportSlotInvalidation(invalidation_cause, true, active_pid,
                                       slotname, restart_lsn,
                                       oldestLSN, snapshotConflictHorizon,
                                       slot_idle_secs);

                if (MyBackendType == B_STARTUP)
                    (void) SignalRecoveryConflict(GetPGProcByNumber(active_proc),
                                                  active_pid,
                                                  RECOVERY_CONFLICT_LOGICALSLOT);
                else
                    (void) kill(active_pid, SIGTERM);

                last_signaled_pid = active_pid;
            }

            /*
             * Wait until the slot is released.
             *
             * NOTE(review): no ConditionVariableCancelSleep() is visible on
             * any exit path of this loop, so this process may stay prepared
             * on s->active_cv after returning.  Confirm that is intended.
             */
            ConditionVariableSleep(&s->active_cv,
                                   WAIT_EVENT_REPLICATION_SLOT_DROP);

            /*
             * Re-acquire lock and start over; we expect to invalidate the
             * slot next time (unless another process acquires the slot in the
             * meantime).
             *
             * Note: It is possible for a slot to advance its restart_lsn or
             * xmin values sufficiently between when we release the mutex and
             * when we recheck, moving from a conflicting state to a non
             * conflicting state.  This is intentional and safe: if the slot
             * has caught up while we're busy here, the resources we were
             * concerned about (WAL segments or tuples) have not yet been
             * removed, and there's no reason to invalidate the slot.
             */
            LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
            continue;
        }
        else
        {
            /*
             * We hold the slot now and have already invalidated it; flush it
             * to ensure that state persists.
             *
             * Don't want to hold ReplicationSlotControlLock across file
             * system operations, so release it now but be sure to tell caller
             * to restart from scratch.
             */
            LWLockRelease(ReplicationSlotControlLock);
            released_lock = true;

            /* Make sure the invalidated state persists across server restart */
            ReplicationSlotMarkDirty();
            ReplicationSlotSave();
            ReplicationSlotRelease();

            ReportSlotInvalidation(invalidation_cause, false, active_pid,
                                   slotname, restart_lsn,
                                   oldestLSN, snapshotConflictHorizon,
                                   slot_idle_secs);

            /* done with this slot for now */
            break;
        }
    }

    Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));

    *released_lock_out = released_lock;
    return invalidated;
}
    2179             : 
    2180             : /*
    2181             :  * Invalidate slots that require resources about to be removed.
    2182             :  *
    2183             :  * Returns true when any slot have got invalidated.
    2184             :  *
    2185             :  * Whether a slot needs to be invalidated depends on the invalidation cause.
    2186             :  * A slot is invalidated if it:
    2187             :  * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
    2188             :  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
    2189             :  *   db; dboid may be InvalidOid for shared relations
    2190             :  * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
    2191             :  *   logical.
    2192             :  * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
    2193             :  *   "idle_replication_slot_timeout" duration.
    2194             :  *
    2195             :  * Note: This function attempts to invalidate the slot for multiple possible
    2196             :  * causes in a single pass, minimizing redundant iterations. The "cause"
    2197             :  * parameter can be a MASK representing one or more of the defined causes.
    2198             :  *
    2199             :  * If it invalidates the last logical slot in the cluster, it requests to
    2200             :  * disable logical decoding.
    2201             :  *
    2202             :  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
    2203             :  */
    2204             : bool
    2205        3630 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
    2206             :                                    XLogSegNo oldestSegno, Oid dboid,
    2207             :                                    TransactionId snapshotConflictHorizon)
    2208             : {
    2209             :     XLogRecPtr  oldestLSN;
    2210        3630 :     bool        invalidated = false;
    2211        3630 :     bool        invalidated_logical = false;
    2212             :     bool        found_valid_logicalslot;
    2213             : 
    2214             :     Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
    2215             :     Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
    2216             :     Assert(possible_causes != RS_INVAL_NONE);
    2217             : 
    2218        3630 :     if (max_replication_slots == 0)
    2219           2 :         return invalidated;
    2220             : 
    2221        3628 :     XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
    2222             : 
    2223        3660 : restart:
    2224        3660 :     found_valid_logicalslot = false;
    2225        3660 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    2226       39250 :     for (int i = 0; i < max_replication_slots; i++)
    2227             :     {
    2228       35622 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    2229       35622 :         bool        released_lock = false;
    2230             : 
    2231       35622 :         if (!s->in_use)
    2232       34732 :             continue;
    2233             : 
    2234             :         /* Prevent invalidation of logical slots during binary upgrade */
    2235         908 :         if (SlotIsLogical(s) && IsBinaryUpgrade)
    2236             :         {
    2237          18 :             SpinLockAcquire(&s->mutex);
    2238          18 :             found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
    2239          18 :             SpinLockRelease(&s->mutex);
    2240             : 
    2241          18 :             continue;
    2242             :         }
    2243             : 
    2244         890 :         if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
    2245             :                                            dboid, snapshotConflictHorizon,
    2246             :                                            &released_lock))
    2247             :         {
    2248             :             Assert(released_lock);
    2249             : 
    2250             :             /* Remember we have invalidated a physical or logical slot */
    2251          32 :             invalidated = true;
    2252             : 
    2253             :             /*
    2254             :              * Additionally, remember we have invalidated a logical slot as we
    2255             :              * can request disabling logical decoding later.
    2256             :              */
    2257          32 :             if (SlotIsLogical(s))
    2258          26 :                 invalidated_logical = true;
    2259             :         }
    2260             :         else
    2261             :         {
    2262             :             /*
    2263             :              * We need to check if the slot is invalidated here since
    2264             :              * InvalidatePossiblyObsoleteSlot() returns false also if the slot
    2265             :              * is already invalidated.
    2266             :              */
    2267         858 :             SpinLockAcquire(&s->mutex);
    2268        1716 :             found_valid_logicalslot |=
    2269         858 :                 (SlotIsLogical(s) && (s->data.invalidated == RS_INVAL_NONE));
    2270         858 :             SpinLockRelease(&s->mutex);
    2271             :         }
    2272             : 
    2273             :         /* if the lock was released, start from scratch */
    2274         890 :         if (released_lock)
    2275          32 :             goto restart;
    2276             :     }
    2277        3628 :     LWLockRelease(ReplicationSlotControlLock);
    2278             : 
    2279             :     /*
    2280             :      * If any slots have been invalidated, recalculate the resource limits.
    2281             :      */
    2282        3628 :     if (invalidated)
    2283             :     {
    2284          22 :         ReplicationSlotsComputeRequiredXmin(false);
    2285          22 :         ReplicationSlotsComputeRequiredLSN();
    2286             :     }
    2287             : 
    2288             :     /*
    2289             :      * Request the checkpointer to disable logical decoding if no valid
    2290             :      * logical slots remain. If called by the checkpointer during a
    2291             :      * checkpoint, only the request is initiated; actual deactivation is
    2292             :      * deferred until after the checkpoint completes.
    2293             :      */
    2294        3628 :     if (invalidated_logical && !found_valid_logicalslot)
    2295          16 :         RequestDisableLogicalDecoding();
    2296             : 
    2297        3628 :     return invalidated;
    2298             : }
    2299             : 
    2300             : /*
    2301             :  * Flush all replication slots to disk.
    2302             :  *
    2303             :  * It is convenient to flush dirty replication slots at the time of checkpoint.
    2304             :  * Additionally, in case of a shutdown checkpoint, we also identify the slots
    2305             :  * for which the confirmed_flush LSN has been updated since the last time it
    2306             :  * was saved and flush them.
    2307             :  */
    2308             : void
    2309        3580 : CheckPointReplicationSlots(bool is_shutdown)
    2310             : {
    2311             :     int         i;
    2312        3580 :     bool        last_saved_restart_lsn_updated = false;
    2313             : 
    2314        3580 :     elog(DEBUG1, "performing replication slot checkpoint");
    2315             : 
    2316             :     /*
    2317             :      * Prevent any slot from being created/dropped while we're active. As we
    2318             :      * explicitly do *not* want to block iterating over replication_slots or
    2319             :      * acquiring a slot we cannot take the control lock - but that's OK,
    2320             :      * because holding ReplicationSlotAllocationLock is strictly stronger, and
    2321             :      * enough to guarantee that nobody can change the in_use bits on us.
    2322             :      *
    2323             :      * Additionally, acquiring the Allocation lock is necessary to serialize
    2324             :      * the slot flush process with concurrent slot WAL reservation. This
    2325             :      * ensures that the WAL position being reserved is either flushed to disk
    2326             :      * or is beyond or equal to the redo pointer of the current checkpoint
    2327             :      * (See ReplicationSlotReserveWal for details).
    2328             :      */
    2329        3580 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
    2330             : 
    2331       38830 :     for (i = 0; i < max_replication_slots; i++)
    2332             :     {
    2333       35250 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    2334             :         char        path[MAXPGPATH];
    2335             : 
    2336       35250 :         if (!s->in_use)
    2337       34458 :             continue;
    2338             : 
    2339             :         /* save the slot to disk, locking is handled in SaveSlotToPath() */
    2340         792 :         sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
    2341             : 
    2342             :         /*
    2343             :          * Slot's data is not flushed each time the confirmed_flush LSN is
    2344             :          * updated as that could lead to frequent writes.  However, we decide
    2345             :          * to force a flush of all logical slot's data at the time of shutdown
    2346             :          * if the confirmed_flush LSN is changed since we last flushed it to
    2347             :          * disk.  This helps in avoiding an unnecessary retreat of the
    2348             :          * confirmed_flush LSN after restart.
    2349             :          */
    2350         792 :         if (is_shutdown && SlotIsLogical(s))
    2351             :         {
    2352         176 :             SpinLockAcquire(&s->mutex);
    2353             : 
    2354         176 :             if (s->data.invalidated == RS_INVAL_NONE &&
    2355         174 :                 s->data.confirmed_flush > s->last_saved_confirmed_flush)
    2356             :             {
    2357          74 :                 s->just_dirtied = true;
    2358          74 :                 s->dirty = true;
    2359             :             }
    2360         176 :             SpinLockRelease(&s->mutex);
    2361             :         }
    2362             : 
    2363             :         /*
    2364             :          * Track if we're going to update slot's last_saved_restart_lsn. We
    2365             :          * need this to know if we need to recompute the required LSN.
    2366             :          */
    2367         792 :         if (s->last_saved_restart_lsn != s->data.restart_lsn)
    2368         404 :             last_saved_restart_lsn_updated = true;
    2369             : 
    2370         792 :         SaveSlotToPath(s, path, LOG);
    2371             :     }
    2372        3580 :     LWLockRelease(ReplicationSlotAllocationLock);
    2373             : 
    2374             :     /*
    2375             :      * Recompute the required LSN if SaveSlotToPath() updated
    2376             :      * last_saved_restart_lsn for any slot.
    2377             :      */
    2378        3580 :     if (last_saved_restart_lsn_updated)
    2379         404 :         ReplicationSlotsComputeRequiredLSN();
    2380        3580 : }
    2381             : 
    2382             : /*
    2383             :  * Load all replication slots from disk into memory at server startup. This
    2384             :  * needs to be run before we start crash recovery.
    2385             :  */
    2386             : void
    2387        1988 : StartupReplicationSlots(void)
    2388             : {
    2389             :     DIR        *replication_dir;
    2390             :     struct dirent *replication_de;
    2391             : 
    2392        1988 :     elog(DEBUG1, "starting up replication slots");
    2393             : 
    2394             :     /* restore all slots by iterating over all on-disk entries */
    2395        1988 :     replication_dir = AllocateDir(PG_REPLSLOT_DIR);
    2396        6194 :     while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
    2397             :     {
    2398             :         char        path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
    2399             :         PGFileType  de_type;
    2400             : 
    2401        4210 :         if (strcmp(replication_de->d_name, ".") == 0 ||
    2402        2226 :             strcmp(replication_de->d_name, "..") == 0)
    2403        3968 :             continue;
    2404             : 
    2405         242 :         snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
    2406         242 :         de_type = get_dirent_type(path, replication_de, false, DEBUG1);
    2407             : 
    2408             :         /* we're only creating directories here, skip if it's not our's */
    2409         242 :         if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
    2410           0 :             continue;
    2411             : 
    2412             :         /* we crashed while a slot was being setup or deleted, clean up */
    2413         242 :         if (pg_str_endswith(replication_de->d_name, ".tmp"))
    2414             :         {
    2415           0 :             if (!rmtree(path, true))
    2416             :             {
    2417           0 :                 ereport(WARNING,
    2418             :                         (errmsg("could not remove directory \"%s\"",
    2419             :                                 path)));
    2420           0 :                 continue;
    2421             :             }
    2422           0 :             fsync_fname(PG_REPLSLOT_DIR, true);
    2423           0 :             continue;
    2424             :         }
    2425             : 
    2426             :         /* looks like a slot in a normal state, restore */
    2427         242 :         RestoreSlotFromDisk(replication_de->d_name);
    2428             :     }
    2429        1984 :     FreeDir(replication_dir);
    2430             : 
    2431             :     /* currently no slots exist, we're done. */
    2432        1984 :     if (max_replication_slots <= 0)
    2433           2 :         return;
    2434             : 
    2435             :     /* Now that we have recovered all the data, compute replication xmin */
    2436        1982 :     ReplicationSlotsComputeRequiredXmin(false);
    2437        1982 :     ReplicationSlotsComputeRequiredLSN();
    2438             : }
    2439             : 
    2440             : /* ----
    2441             :  * Manipulation of on-disk state of replication slots
    2442             :  *
    2443             :  * NB: none of the routines below should take any notice whether a slot is the
    2444             :  * current one or not, that's all handled a layer above.
    2445             :  * ----
    2446             :  */
    2447             : static void
    2448        1356 : CreateSlotOnDisk(ReplicationSlot *slot)
    2449             : {
    2450             :     char        tmppath[MAXPGPATH];
    2451             :     char        path[MAXPGPATH];
    2452             :     struct stat st;
    2453             : 
    2454             :     /*
    2455             :      * No need to take out the io_in_progress_lock, nobody else can see this
    2456             :      * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
    2457             :      * takes out the lock, if we'd take the lock here, we'd deadlock.
    2458             :      */
    2459             : 
    2460        1356 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    2461        1356 :     sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    2462             : 
    2463             :     /*
    2464             :      * It's just barely possible that some previous effort to create or drop a
    2465             :      * slot with this name left a temp directory lying around. If that seems
    2466             :      * to be the case, try to remove it.  If the rmtree() fails, we'll error
    2467             :      * out at the MakePGDirectory() below, so we don't bother checking
    2468             :      * success.
    2469             :      */
    2470        1356 :     if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
    2471           0 :         rmtree(tmppath, true);
    2472             : 
    2473             :     /* Create and fsync the temporary slot directory. */
    2474        1356 :     if (MakePGDirectory(tmppath) < 0)
    2475           0 :         ereport(ERROR,
    2476             :                 (errcode_for_file_access(),
    2477             :                  errmsg("could not create directory \"%s\": %m",
    2478             :                         tmppath)));
    2479        1356 :     fsync_fname(tmppath, true);
    2480             : 
    2481             :     /* Write the actual state file. */
    2482        1356 :     slot->dirty = true;          /* signal that we really need to write */
    2483        1356 :     SaveSlotToPath(slot, tmppath, ERROR);
    2484             : 
    2485             :     /* Rename the directory into place. */
    2486        1356 :     if (rename(tmppath, path) != 0)
    2487           0 :         ereport(ERROR,
    2488             :                 (errcode_for_file_access(),
    2489             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    2490             :                         tmppath, path)));
    2491             : 
    2492             :     /*
    2493             :      * If we'd now fail - really unlikely - we wouldn't know whether this slot
    2494             :      * would persist after an OS crash or not - so, force a restart. The
    2495             :      * restart would try to fsync this again till it works.
    2496             :      */
    2497        1356 :     START_CRIT_SECTION();
    2498             : 
    2499        1356 :     fsync_fname(path, true);
    2500        1356 :     fsync_fname(PG_REPLSLOT_DIR, true);
    2501             : 
    2502        1356 :     END_CRIT_SECTION();
    2503        1356 : }
    2504             : 
    2505             : /*
    2506             :  * Shared functionality between saving and creating a replication slot.
    2507             :  */
    2508             : static void
    2509        4970 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
    2510             : {
    2511             :     char        tmppath[MAXPGPATH];
    2512             :     char        path[MAXPGPATH];
    2513             :     int         fd;
    2514             :     ReplicationSlotOnDisk cp;
    2515             :     bool        was_dirty;
    2516             : 
    2517             :     /* first check whether there's something to write out */
    2518        4970 :     SpinLockAcquire(&slot->mutex);
    2519        4970 :     was_dirty = slot->dirty;
    2520        4970 :     slot->just_dirtied = false;
    2521        4970 :     SpinLockRelease(&slot->mutex);
    2522             : 
    2523             :     /* and don't do anything if there's nothing to write */
    2524        4970 :     if (!was_dirty)
    2525         292 :         return;
    2526             : 
    2527        4678 :     LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
    2528             : 
    2529             :     /* silence valgrind :( */
    2530        4678 :     memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
    2531             : 
    2532        4678 :     sprintf(tmppath, "%s/state.tmp", dir);
    2533        4678 :     sprintf(path, "%s/state", dir);
    2534             : 
    2535        4678 :     fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    2536        4678 :     if (fd < 0)
    2537             :     {
    2538             :         /*
    2539             :          * If not an ERROR, then release the lock before returning.  In case
    2540             :          * of an ERROR, the error recovery path automatically releases the
    2541             :          * lock, but no harm in explicitly releasing even in that case.  Note
    2542             :          * that LWLockRelease() could affect errno.
    2543             :          */
    2544           0 :         int         save_errno = errno;
    2545             : 
    2546           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2547           0 :         errno = save_errno;
    2548           0 :         ereport(elevel,
    2549             :                 (errcode_for_file_access(),
    2550             :                  errmsg("could not create file \"%s\": %m",
    2551             :                         tmppath)));
    2552           0 :         return;
    2553             :     }
    2554             : 
    2555        4678 :     cp.magic = SLOT_MAGIC;
    2556        4678 :     INIT_CRC32C(cp.checksum);
    2557        4678 :     cp.version = SLOT_VERSION;
    2558        4678 :     cp.length = ReplicationSlotOnDiskV2Size;
    2559             : 
    2560        4678 :     SpinLockAcquire(&slot->mutex);
    2561             : 
    2562        4678 :     memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
    2563             : 
    2564        4678 :     SpinLockRelease(&slot->mutex);
    2565             : 
    2566        4678 :     COMP_CRC32C(cp.checksum,
    2567             :                 (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
    2568             :                 ReplicationSlotOnDiskChecksummedSize);
    2569        4678 :     FIN_CRC32C(cp.checksum);
    2570             : 
    2571        4678 :     errno = 0;
    2572        4678 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
    2573        4678 :     if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
    2574             :     {
    2575           0 :         int         save_errno = errno;
    2576             : 
    2577           0 :         pgstat_report_wait_end();
    2578           0 :         CloseTransientFile(fd);
    2579           0 :         unlink(tmppath);
    2580           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2581             : 
    2582             :         /* if write didn't set errno, assume problem is no disk space */
    2583           0 :         errno = save_errno ? save_errno : ENOSPC;
    2584           0 :         ereport(elevel,
    2585             :                 (errcode_for_file_access(),
    2586             :                  errmsg("could not write to file \"%s\": %m",
    2587             :                         tmppath)));
    2588           0 :         return;
    2589             :     }
    2590        4678 :     pgstat_report_wait_end();
    2591             : 
    2592             :     /* fsync the temporary file */
    2593        4678 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
    2594        4678 :     if (pg_fsync(fd) != 0)
    2595             :     {
    2596           0 :         int         save_errno = errno;
    2597             : 
    2598           0 :         pgstat_report_wait_end();
    2599           0 :         CloseTransientFile(fd);
    2600           0 :         unlink(tmppath);
    2601           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2602             : 
    2603           0 :         errno = save_errno;
    2604           0 :         ereport(elevel,
    2605             :                 (errcode_for_file_access(),
    2606             :                  errmsg("could not fsync file \"%s\": %m",
    2607             :                         tmppath)));
    2608           0 :         return;
    2609             :     }
    2610        4678 :     pgstat_report_wait_end();
    2611             : 
    2612        4678 :     if (CloseTransientFile(fd) != 0)
    2613             :     {
    2614           0 :         int         save_errno = errno;
    2615             : 
    2616           0 :         unlink(tmppath);
    2617           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2618             : 
    2619           0 :         errno = save_errno;
    2620           0 :         ereport(elevel,
    2621             :                 (errcode_for_file_access(),
    2622             :                  errmsg("could not close file \"%s\": %m",
    2623             :                         tmppath)));
    2624           0 :         return;
    2625             :     }
    2626             : 
    2627             :     /* rename to permanent file, fsync file and directory */
    2628        4678 :     if (rename(tmppath, path) != 0)
    2629             :     {
    2630           0 :         int         save_errno = errno;
    2631             : 
    2632           0 :         unlink(tmppath);
    2633           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2634             : 
    2635           0 :         errno = save_errno;
    2636           0 :         ereport(elevel,
    2637             :                 (errcode_for_file_access(),
    2638             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    2639             :                         tmppath, path)));
    2640           0 :         return;
    2641             :     }
    2642             : 
    2643             :     /*
    2644             :      * Check CreateSlotOnDisk() for the reasoning of using a critical section.
    2645             :      */
    2646        4678 :     START_CRIT_SECTION();
    2647             : 
    2648        4678 :     fsync_fname(path, false);
    2649        4678 :     fsync_fname(dir, true);
    2650        4678 :     fsync_fname(PG_REPLSLOT_DIR, true);
    2651             : 
    2652        4678 :     END_CRIT_SECTION();
    2653             : 
    2654             :     /*
    2655             :      * Successfully wrote, unset dirty bit, unless somebody dirtied again
    2656             :      * already and remember the confirmed_flush LSN value.
    2657             :      */
    2658        4678 :     SpinLockAcquire(&slot->mutex);
    2659        4678 :     if (!slot->just_dirtied)
    2660        4656 :         slot->dirty = false;
    2661        4678 :     slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2662        4678 :     slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
    2663        4678 :     SpinLockRelease(&slot->mutex);
    2664             : 
    2665        4678 :     LWLockRelease(&slot->io_in_progress_lock);
    2666             : }
    2667             : 
    2668             : /*
    2669             :  * Load a single slot from disk into memory.
    2670             :  */
    2671             : static void
    2672         242 : RestoreSlotFromDisk(const char *name)
    2673             : {
    2674             :     ReplicationSlotOnDisk cp;
    2675             :     int         i;
    2676             :     char        slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
    2677             :     char        path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
    2678             :     int         fd;
    2679         242 :     bool        restored = false;
    2680             :     int         readBytes;
    2681             :     pg_crc32c   checksum;
    2682         242 :     TimestampTz now = 0;
    2683             : 
    2684             :     /* no need to lock here, no concurrent access allowed yet */
    2685             : 
    2686             :     /* delete temp file if it exists */
    2687         242 :     sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
    2688         242 :     sprintf(path, "%s/state.tmp", slotdir);
    2689         242 :     if (unlink(path) < 0 && errno != ENOENT)
    2690           0 :         ereport(PANIC,
    2691             :                 (errcode_for_file_access(),
    2692             :                  errmsg("could not remove file \"%s\": %m", path)));
    2693             : 
    2694         242 :     sprintf(path, "%s/state", slotdir);
    2695             : 
    2696         242 :     elog(DEBUG1, "restoring replication slot from \"%s\"", path);
    2697             : 
    2698             :     /* on some operating systems fsyncing a file requires O_RDWR */
    2699         242 :     fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
    2700             : 
    2701             :     /*
    2702             :      * We do not need to handle this as we are rename()ing the directory into
    2703             :      * place only after we fsync()ed the state file.
    2704             :      */
    2705         242 :     if (fd < 0)
    2706           0 :         ereport(PANIC,
    2707             :                 (errcode_for_file_access(),
    2708             :                  errmsg("could not open file \"%s\": %m", path)));
    2709             : 
    2710             :     /*
    2711             :      * Sync state file before we're reading from it. We might have crashed
    2712             :      * while it wasn't synced yet and we shouldn't continue on that basis.
    2713             :      */
    2714         242 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
    2715         242 :     if (pg_fsync(fd) != 0)
    2716           0 :         ereport(PANIC,
    2717             :                 (errcode_for_file_access(),
    2718             :                  errmsg("could not fsync file \"%s\": %m",
    2719             :                         path)));
    2720         242 :     pgstat_report_wait_end();
    2721             : 
    2722             :     /* Also sync the parent directory */
    2723         242 :     START_CRIT_SECTION();
    2724         242 :     fsync_fname(slotdir, true);
    2725         242 :     END_CRIT_SECTION();
    2726             : 
    2727             :     /* read part of statefile that's guaranteed to be version independent */
    2728         242 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2729         242 :     readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
    2730         242 :     pgstat_report_wait_end();
    2731         242 :     if (readBytes != ReplicationSlotOnDiskConstantSize)
    2732             :     {
    2733           0 :         if (readBytes < 0)
    2734           0 :             ereport(PANIC,
    2735             :                     (errcode_for_file_access(),
    2736             :                      errmsg("could not read file \"%s\": %m", path)));
    2737             :         else
    2738           0 :             ereport(PANIC,
    2739             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2740             :                      errmsg("could not read file \"%s\": read %d of %zu",
    2741             :                             path, readBytes,
    2742             :                             (Size) ReplicationSlotOnDiskConstantSize)));
    2743             :     }
    2744             : 
    2745             :     /* verify magic */
    2746         242 :     if (cp.magic != SLOT_MAGIC)
    2747           0 :         ereport(PANIC,
    2748             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2749             :                  errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
    2750             :                         path, cp.magic, SLOT_MAGIC)));
    2751             : 
    2752             :     /* verify version */
    2753         242 :     if (cp.version != SLOT_VERSION)
    2754           0 :         ereport(PANIC,
    2755             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2756             :                  errmsg("replication slot file \"%s\" has unsupported version %u",
    2757             :                         path, cp.version)));
    2758             : 
    2759             :     /* boundary check on length */
    2760         242 :     if (cp.length != ReplicationSlotOnDiskV2Size)
    2761           0 :         ereport(PANIC,
    2762             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2763             :                  errmsg("replication slot file \"%s\" has corrupted length %u",
    2764             :                         path, cp.length)));
    2765             : 
    2766             :     /* Now that we know the size, read the entire file */
    2767         242 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2768         484 :     readBytes = read(fd,
    2769             :                      (char *) &cp + ReplicationSlotOnDiskConstantSize,
    2770         242 :                      cp.length);
    2771         242 :     pgstat_report_wait_end();
    2772         242 :     if (readBytes != cp.length)
    2773             :     {
    2774           0 :         if (readBytes < 0)
    2775           0 :             ereport(PANIC,
    2776             :                     (errcode_for_file_access(),
    2777             :                      errmsg("could not read file \"%s\": %m", path)));
    2778             :         else
    2779           0 :             ereport(PANIC,
    2780             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2781             :                      errmsg("could not read file \"%s\": read %d of %zu",
    2782             :                             path, readBytes, (Size) cp.length)));
    2783             :     }
    2784             : 
    2785         242 :     if (CloseTransientFile(fd) != 0)
    2786           0 :         ereport(PANIC,
    2787             :                 (errcode_for_file_access(),
    2788             :                  errmsg("could not close file \"%s\": %m", path)));
    2789             : 
    2790             :     /* now verify the CRC */
    2791         242 :     INIT_CRC32C(checksum);
    2792         242 :     COMP_CRC32C(checksum,
    2793             :                 (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
    2794             :                 ReplicationSlotOnDiskChecksummedSize);
    2795         242 :     FIN_CRC32C(checksum);
    2796             : 
    2797         242 :     if (!EQ_CRC32C(checksum, cp.checksum))
    2798           0 :         ereport(PANIC,
    2799             :                 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
    2800             :                         path, checksum, cp.checksum)));
    2801             : 
    2802             :     /*
    2803             :      * If we crashed with an ephemeral slot active, don't restore but delete
    2804             :      * it.
    2805             :      */
    2806         242 :     if (cp.slotdata.persistency != RS_PERSISTENT)
    2807             :     {
    2808           0 :         if (!rmtree(slotdir, true))
    2809             :         {
    2810           0 :             ereport(WARNING,
    2811             :                     (errmsg("could not remove directory \"%s\"",
    2812             :                             slotdir)));
    2813             :         }
    2814           0 :         fsync_fname(PG_REPLSLOT_DIR, true);
    2815           0 :         return;
    2816             :     }
    2817             : 
    2818             :     /*
    2819             :      * Verify that requirements for the specific slot type are met. That's
    2820             :      * important because if these aren't met we're not guaranteed to retain
    2821             :      * all the necessary resources for the slot.
    2822             :      *
    2823             :      * NB: We have to do so *after* the above checks for ephemeral slots,
    2824             :      * because otherwise a slot that shouldn't exist anymore could prevent
    2825             :      * restarts.
    2826             :      *
    2827             :      * NB: Changing the requirements here also requires adapting
    2828             :      * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
    2829             :      */
    2830         242 :     if (cp.slotdata.database != InvalidOid)
    2831             :     {
    2832         166 :         if (wal_level < WAL_LEVEL_REPLICA)
    2833           2 :             ereport(FATAL,
    2834             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2835             :                      errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
    2836             :                             NameStr(cp.slotdata.name)),
    2837             :                      errhint("Change \"wal_level\" to be \"replica\" or higher.")));
    2838             : 
    2839             :         /*
    2840             :          * In standby mode, the hot standby must be enabled. This check is
    2841             :          * necessary to ensure logical slots are invalidated when they become
    2842             :          * incompatible due to insufficient wal_level. Otherwise, if the
    2843             :          * primary reduces effective_wal_level < logical while hot standby is
    2844             :          * disabled, primary disable logical decoding while hot standby is
    2845             :          * disabled, logical slots would remain valid even after promotion.
    2846             :          */
    2847         164 :         if (StandbyMode && !EnableHotStandby)
    2848           2 :             ereport(FATAL,
    2849             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2850             :                      errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
    2851             :                             NameStr(cp.slotdata.name)),
    2852             :                      errhint("Change \"hot_standby\" to be \"on\".")));
    2853             :     }
    2854          76 :     else if (wal_level < WAL_LEVEL_REPLICA)
    2855           0 :         ereport(FATAL,
    2856             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2857             :                  errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
    2858             :                         NameStr(cp.slotdata.name)),
    2859             :                  errhint("Change \"wal_level\" to be \"replica\" or higher.")));
    2860             : 
    2861             :     /* nothing can be active yet, don't lock anything */
    2862         352 :     for (i = 0; i < max_replication_slots; i++)
    2863             :     {
    2864             :         ReplicationSlot *slot;
    2865             : 
    2866         352 :         slot = &ReplicationSlotCtl->replication_slots[i];
    2867             : 
    2868         352 :         if (slot->in_use)
    2869         114 :             continue;
    2870             : 
    2871             :         /* restore the entire set of persistent data */
    2872         238 :         memcpy(&slot->data, &cp.slotdata,
    2873             :                sizeof(ReplicationSlotPersistentData));
    2874             : 
    2875             :         /* initialize in memory state */
    2876         238 :         slot->effective_xmin = cp.slotdata.xmin;
    2877         238 :         slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
    2878         238 :         slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2879         238 :         slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
    2880             : 
    2881         238 :         slot->candidate_catalog_xmin = InvalidTransactionId;
    2882         238 :         slot->candidate_xmin_lsn = InvalidXLogRecPtr;
    2883         238 :         slot->candidate_restart_lsn = InvalidXLogRecPtr;
    2884         238 :         slot->candidate_restart_valid = InvalidXLogRecPtr;
    2885             : 
    2886         238 :         slot->in_use = true;
    2887         238 :         slot->active_proc = INVALID_PROC_NUMBER;
    2888             : 
    2889             :         /*
    2890             :          * Set the time since the slot has become inactive after loading the
    2891             :          * slot from the disk into memory. Whoever acquires the slot i.e.
    2892             :          * makes the slot active will reset it. Use the same inactive_since
    2893             :          * time for all the slots.
    2894             :          */
    2895         238 :         if (now == 0)
    2896         238 :             now = GetCurrentTimestamp();
    2897             : 
    2898         238 :         ReplicationSlotSetInactiveSince(slot, now, false);
    2899             : 
    2900         238 :         restored = true;
    2901         238 :         break;
    2902             :     }
    2903             : 
    2904         238 :     if (!restored)
    2905           0 :         ereport(FATAL,
    2906             :                 (errmsg("too many replication slots active before shutdown"),
    2907             :                  errhint("Increase \"max_replication_slots\" and try again.")));
    2908             : }
    2909             : 
    2910             : /*
    2911             :  * Maps an invalidation reason for a replication slot to
    2912             :  * ReplicationSlotInvalidationCause.
    2913             :  */
    2914             : ReplicationSlotInvalidationCause
    2915           0 : GetSlotInvalidationCause(const char *cause_name)
    2916             : {
    2917             :     Assert(cause_name);
    2918             : 
    2919             :     /* Search lookup table for the cause having this name */
    2920           0 :     for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
    2921             :     {
    2922           0 :         if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
    2923           0 :             return SlotInvalidationCauses[i].cause;
    2924             :     }
    2925             : 
    2926             :     Assert(false);
    2927           0 :     return RS_INVAL_NONE;       /* to keep compiler quiet */
    2928             : }
    2929             : 
    2930             : /*
    2931             :  * Maps a ReplicationSlotInvalidationCause to the invalidation
    2932             :  * reason for a replication slot.
    2933             :  */
    2934             : const char *
    2935          88 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
    2936             : {
    2937             :     /* Search lookup table for the name of this cause */
    2938         282 :     for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
    2939             :     {
    2940         282 :         if (SlotInvalidationCauses[i].cause == cause)
    2941          88 :             return SlotInvalidationCauses[i].cause_name;
    2942             :     }
    2943             : 
    2944             :     Assert(false);
    2945           0 :     return "none";                /* to keep compiler quiet */
    2946             : }
    2947             : 
    2948             : /*
    2949             :  * A helper function to validate slots specified in GUC synchronized_standby_slots.
    2950             :  *
    2951             :  * The rawname will be parsed, and the result will be saved into *elemlist.
    2952             :  */
    2953             : static bool
    2954          38 : validate_sync_standby_slots(char *rawname, List **elemlist)
    2955             : {
    2956             :     /* Verify syntax and parse string into a list of identifiers */
    2957          38 :     if (!SplitIdentifierString(rawname, ',', elemlist))
    2958             :     {
    2959           0 :         GUC_check_errdetail("List syntax is invalid.");
    2960           0 :         return false;
    2961             :     }
    2962             : 
    2963             :     /* Iterate the list to validate each slot name */
    2964         106 :     foreach_ptr(char, name, *elemlist)
    2965             :     {
    2966             :         int         err_code;
    2967          38 :         char       *err_msg = NULL;
    2968          38 :         char       *err_hint = NULL;
    2969             : 
    2970          38 :         if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
    2971             :                                                  &err_msg, &err_hint))
    2972             :         {
    2973           4 :             GUC_check_errcode(err_code);
    2974           4 :             GUC_check_errdetail("%s", err_msg);
    2975           4 :             if (err_hint != NULL)
    2976           4 :                 GUC_check_errhint("%s", err_hint);
    2977           4 :             return false;
    2978             :         }
    2979             :     }
    2980             : 
    2981          34 :     return true;
    2982             : }
    2983             : 
    2984             : /*
    2985             :  * GUC check_hook for synchronized_standby_slots
    2986             :  */
    2987             : bool
    2988        2418 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
    2989             : {
    2990             :     char       *rawname;
    2991             :     char       *ptr;
    2992             :     List       *elemlist;
    2993             :     int         size;
    2994             :     bool        ok;
    2995             :     SyncStandbySlotsConfigData *config;
    2996             : 
    2997        2418 :     if ((*newval)[0] == '\0')
    2998        2380 :         return true;
    2999             : 
    3000             :     /* Need a modifiable copy of the GUC string */
    3001          38 :     rawname = pstrdup(*newval);
    3002             : 
    3003             :     /* Now verify if the specified slots exist and have correct type */
    3004          38 :     ok = validate_sync_standby_slots(rawname, &elemlist);
    3005             : 
    3006          38 :     if (!ok || elemlist == NIL)
    3007             :     {
    3008           4 :         pfree(rawname);
    3009           4 :         list_free(elemlist);
    3010           4 :         return ok;
    3011             :     }
    3012             : 
    3013             :     /* Compute the size required for the SyncStandbySlotsConfigData struct */
    3014          34 :     size = offsetof(SyncStandbySlotsConfigData, slot_names);
    3015         102 :     foreach_ptr(char, slot_name, elemlist)
    3016          34 :         size += strlen(slot_name) + 1;
    3017             : 
    3018             :     /* GUC extra value must be guc_malloc'd, not palloc'd */
    3019          34 :     config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
    3020          34 :     if (!config)
    3021           0 :         return false;
    3022             : 
    3023             :     /* Transform the data into SyncStandbySlotsConfigData */
    3024          34 :     config->nslotnames = list_length(elemlist);
    3025             : 
    3026          34 :     ptr = config->slot_names;
    3027         102 :     foreach_ptr(char, slot_name, elemlist)
    3028             :     {
    3029          34 :         strcpy(ptr, slot_name);
    3030          34 :         ptr += strlen(slot_name) + 1;
    3031             :     }
    3032             : 
    3033          34 :     *extra = config;
    3034             : 
    3035          34 :     pfree(rawname);
    3036          34 :     list_free(elemlist);
    3037          34 :     return true;
    3038             : }
    3039             : 
    3040             : /*
    3041             :  * GUC assign_hook for synchronized_standby_slots
    3042             :  */
    3043             : void
    3044        2412 : assign_synchronized_standby_slots(const char *newval, void *extra)
    3045             : {
    3046             :     /*
    3047             :      * The standby slots may have changed, so we must recompute the oldest
    3048             :      * LSN.
    3049             :      */
    3050        2412 :     ss_oldest_flush_lsn = InvalidXLogRecPtr;
    3051             : 
    3052        2412 :     synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
    3053        2412 : }
    3054             : 
    3055             : /*
    3056             :  * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
    3057             :  */
    3058             : bool
    3059       69120 : SlotExistsInSyncStandbySlots(const char *slot_name)
    3060             : {
    3061             :     const char *standby_slot_name;
    3062             : 
    3063             :     /* Return false if there is no value in synchronized_standby_slots */
    3064       69120 :     if (synchronized_standby_slots_config == NULL)
    3065       69096 :         return false;
    3066             : 
    3067             :     /*
    3068             :      * XXX: We are not expecting this list to be long so a linear search
    3069             :      * shouldn't hurt but if that turns out not to be true then we can cache
    3070             :      * this information for each WalSender as well.
    3071             :      */
    3072          24 :     standby_slot_name = synchronized_standby_slots_config->slot_names;
    3073          36 :     for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
    3074             :     {
    3075          24 :         if (strcmp(standby_slot_name, slot_name) == 0)
    3076          12 :             return true;
    3077             : 
    3078          12 :         standby_slot_name += strlen(standby_slot_name) + 1;
    3079             :     }
    3080             : 
    3081          12 :     return false;
    3082             : }
    3083             : 
    3084             : /*
    3085             :  * Return true if the slots specified in synchronized_standby_slots have caught up to
    3086             :  * the given WAL location, false otherwise.
    3087             :  *
    3088             :  * The elevel parameter specifies the error level used for logging messages
    3089             :  * related to slots that do not exist, are invalidated, or are inactive.
    3090             :  */
    3091             : bool
    3092        1352 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
    3093             : {
    3094             :     const char *name;
    3095        1352 :     int         caught_up_slot_num = 0;
    3096        1352 :     XLogRecPtr  min_restart_lsn = InvalidXLogRecPtr;
    3097             : 
    3098             :     /*
    3099             :      * Don't need to wait for the standbys to catch up if there is no value in
    3100             :      * synchronized_standby_slots.
    3101             :      */
    3102        1352 :     if (synchronized_standby_slots_config == NULL)
    3103        1310 :         return true;
    3104             : 
    3105             :     /*
    3106             :      * Don't need to wait for the standbys to catch up if we are on a standby
    3107             :      * server, since we do not support syncing slots to cascading standbys.
    3108             :      */
    3109          42 :     if (RecoveryInProgress())
    3110           0 :         return true;
    3111             : 
    3112             :     /*
    3113             :      * Don't need to wait for the standbys to catch up if they are already
    3114             :      * beyond the specified WAL location.
    3115             :      */
    3116          42 :     if (XLogRecPtrIsValid(ss_oldest_flush_lsn) &&
    3117          20 :         ss_oldest_flush_lsn >= wait_for_lsn)
    3118          12 :         return true;
    3119             : 
    3120             :     /*
    3121             :      * To prevent concurrent slot dropping and creation while filtering the
    3122             :      * slots, take the ReplicationSlotControlLock outside of the loop.
    3123             :      */
    3124          30 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    3125             : 
    3126          30 :     name = synchronized_standby_slots_config->slot_names;
    3127          40 :     for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
    3128             :     {
    3129             :         XLogRecPtr  restart_lsn;
    3130             :         bool        invalidated;
    3131             :         bool        inactive;
    3132             :         ReplicationSlot *slot;
    3133             : 
    3134          30 :         slot = SearchNamedReplicationSlot(name, false);
    3135             : 
    3136             :         /*
    3137             :          * If a slot name provided in synchronized_standby_slots does not
    3138             :          * exist, report a message and exit the loop.
    3139             :          */
    3140          30 :         if (!slot)
    3141             :         {
    3142           0 :             ereport(elevel,
    3143             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3144             :                     errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
    3145             :                            name, "synchronized_standby_slots"),
    3146             :                     errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    3147             :                               name),
    3148             :                     errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
    3149             :                             name, "synchronized_standby_slots"));
    3150           0 :             break;
    3151             :         }
    3152             : 
    3153             :         /* Same as above: if a slot is not physical, exit the loop. */
    3154          30 :         if (SlotIsLogical(slot))
    3155             :         {
    3156           0 :             ereport(elevel,
    3157             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3158             :                     errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
    3159             :                            name, "synchronized_standby_slots"),
    3160             :                     errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
    3161             :                               name),
    3162             :                     errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
    3163             :                             name, "synchronized_standby_slots"));
    3164           0 :             break;
    3165             :         }
    3166             : 
    3167          30 :         SpinLockAcquire(&slot->mutex);
    3168          30 :         restart_lsn = slot->data.restart_lsn;
    3169          30 :         invalidated = slot->data.invalidated != RS_INVAL_NONE;
    3170          30 :         inactive = slot->active_proc == INVALID_PROC_NUMBER;
    3171          30 :         SpinLockRelease(&slot->mutex);
    3172             : 
    3173          30 :         if (invalidated)
    3174             :         {
    3175             :             /* Specified physical slot has been invalidated */
    3176           0 :             ereport(elevel,
    3177             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3178             :                     errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
    3179             :                            name, "synchronized_standby_slots"),
    3180             :                     errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    3181             :                               name),
    3182             :                     errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
    3183             :                             name, "synchronized_standby_slots"));
    3184           0 :             break;
    3185             :         }
    3186             : 
    3187          30 :         if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
    3188             :         {
    3189             :             /* Log a message if no active_pid for this physical slot */
    3190          20 :             if (inactive)
    3191          16 :                 ereport(elevel,
    3192             :                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3193             :                         errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
    3194             :                                name, "synchronized_standby_slots"),
    3195             :                         errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    3196             :                                   name),
    3197             :                         errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
    3198             :                                 name, "synchronized_standby_slots"));
    3199             : 
    3200             :             /* Continue if the current slot hasn't caught up. */
    3201          20 :             break;
    3202             :         }
    3203             : 
    3204             :         Assert(restart_lsn >= wait_for_lsn);
    3205             : 
    3206          10 :         if (!XLogRecPtrIsValid(min_restart_lsn) ||
    3207             :             min_restart_lsn > restart_lsn)
    3208          10 :             min_restart_lsn = restart_lsn;
    3209             : 
    3210          10 :         caught_up_slot_num++;
    3211             : 
    3212          10 :         name += strlen(name) + 1;
    3213             :     }
    3214             : 
    3215          30 :     LWLockRelease(ReplicationSlotControlLock);
    3216             : 
    3217             :     /*
    3218             :      * Return false if not all the standbys have caught up to the specified
    3219             :      * WAL location.
    3220             :      */
    3221          30 :     if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
    3222          20 :         return false;
    3223             : 
    3224             :     /* The ss_oldest_flush_lsn must not retreat. */
    3225             :     Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
    3226             :            min_restart_lsn >= ss_oldest_flush_lsn);
    3227             : 
    3228          10 :     ss_oldest_flush_lsn = min_restart_lsn;
    3229             : 
    3230          10 :     return true;
    3231             : }
    3232             : 
    3233             : /*
    3234             :  * Wait for physical standbys to confirm receiving the given lsn.
    3235             :  *
    3236             :  * Used by logical decoding SQL functions. It waits for physical standbys
    3237             :  * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
    3238             :  */
    3239             : void
    3240         452 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
    3241             : {
    3242             :     /*
    3243             :      * Don't need to wait for the standby to catch up if the current acquired
    3244             :      * slot is not a logical failover slot, or there is no value in
    3245             :      * synchronized_standby_slots.
    3246             :      */
    3247         452 :     if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
    3248         450 :         return;
    3249             : 
    3250           2 :     ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
    3251             : 
    3252             :     for (;;)
    3253             :     {
    3254           4 :         CHECK_FOR_INTERRUPTS();
    3255             : 
    3256           4 :         if (ConfigReloadPending)
    3257             :         {
    3258           2 :             ConfigReloadPending = false;
    3259           2 :             ProcessConfigFile(PGC_SIGHUP);
    3260             :         }
    3261             : 
    3262             :         /* Exit if done waiting for every slot. */
    3263           4 :         if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
    3264           2 :             break;
    3265             : 
    3266             :         /*
    3267             :          * Wait for the slots in the synchronized_standby_slots to catch up,
    3268             :          * but use a timeout (1s) so we can also check if the
    3269             :          * synchronized_standby_slots has been changed.
    3270             :          */
    3271           2 :         ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
    3272             :                                     WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
    3273             :     }
    3274             : 
    3275           2 :     ConditionVariableCancelSleep();
    3276             : }

Generated by: LCOV version 1.16