LCOV - code coverage report
Current view: top level - src/backend/replication - slot.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19beta1 Lines: 86.2 % 1000 862
Test Date: 2026-06-13 02:16:31 Functions: 97.9 % 48 47
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * slot.c
       4              :  *     Replication slot management.
       5              :  *
       6              :  *
       7              :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
       8              :  *
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *    src/backend/replication/slot.c
      12              :  *
      13              :  * NOTES
      14              :  *
      15              :  * Replication slots are used to keep state about replication streams
      16              :  * originating from this cluster.  Their primary purpose is to prevent the
      17              :  * premature removal of WAL or of old tuple versions in a manner that would
      18              :  * interfere with replication; they are also useful for monitoring purposes.
      19              :  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
      20              :  * on standbys (to support cascading setups).  The requirement that slots be
      21              :  * usable on standbys precludes storing them in the system catalogs.
      22              :  *
      23              :  * Each replication slot gets its own directory inside the directory
      24              :  * $PGDATA / PG_REPLSLOT_DIR.  Inside that directory the state file will
      25              :  * contain the slot's own data.  Additional data can be stored alongside that
      26              :  * file if required.  While the server is running, the state data is also
      27              :  * cached in memory for efficiency.
      28              :  *
      29              :  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
      30              :  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
      31              :  * to iterate over the slots, and in exclusive mode to change the in_use flag
      32              :  * of a slot.  The remaining data in each slot is protected by its mutex.
      33              :  *
      34              :  *-------------------------------------------------------------------------
      35              :  */
      36              : 
      37              : #include "postgres.h"
      38              : 
      39              : #include <unistd.h>
      40              : #include <sys/stat.h>
      41              : 
      42              : #include "access/transam.h"
      43              : #include "access/xlog_internal.h"
      44              : #include "access/xlogrecovery.h"
      45              : #include "common/file_utils.h"
      46              : #include "common/string.h"
      47              : #include "miscadmin.h"
      48              : #include "pgstat.h"
      49              : #include "postmaster/interrupt.h"
      50              : #include "replication/logicallauncher.h"
      51              : #include "replication/slotsync.h"
      52              : #include "replication/slot.h"
      53              : #include "replication/walsender_private.h"
      54              : #include "storage/fd.h"
      55              : #include "storage/ipc.h"
      56              : #include "storage/proc.h"
      57              : #include "storage/procarray.h"
      58              : #include "storage/subsystems.h"
      59              : #include "utils/builtins.h"
      60              : #include "utils/guc_hooks.h"
      61              : #include "utils/injection_point.h"
      62              : #include "utils/varlena.h"
      63              : #include "utils/wait_event.h"
      64              : 
      65              : /*
      66              :  * Replication slot on-disk data structure.
      67              :  */
      68              : typedef struct ReplicationSlotOnDisk
      69              : {
      70              :     /* first part of this struct needs to be version independent */
      71              : 
      72              :     /* data not covered by checksum */
      73              :     uint32      magic;
      74              :     pg_crc32c   checksum;
      75              : 
      76              :     /* data covered by checksum */
      77              :     uint32      version;
      78              :     uint32      length;
      79              : 
      80              :     /*
      81              :      * The actual data in the slot that follows can differ based on the above
      82              :      * 'version'.
      83              :      */
      84              : 
      85              :     ReplicationSlotPersistentData slotdata;
      86              : } ReplicationSlotOnDisk;
      87              : 
      88              : /*
      89              :  * Struct for the configuration of synchronized_standby_slots.
      90              :  *
      91              :  * Note: this must be a flat representation that can be held in a single chunk
      92              :  * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
      93              :  * synchronized_standby_slots GUC.
      94              :  */
      95              : typedef struct
      96              : {
      97              :     /* Number of slot names in the slot_names[] */
      98              :     int         nslotnames;
      99              : 
     100              :     /*
     101              :      * slot_names contains 'nslotnames' consecutive null-terminated C strings.
     102              :      */
     103              :     char        slot_names[FLEXIBLE_ARRAY_MEMBER];
     104              : } SyncStandbySlotsConfigData;
     105              : 
     106              : /*
     107              :  * Lookup table for slot invalidation causes.
     108              :  */
     109              : typedef struct SlotInvalidationCauseMap
     110              : {
     111              :     ReplicationSlotInvalidationCause cause;
     112              :     const char *cause_name;
     113              : } SlotInvalidationCauseMap;
     114              : 
     115              : static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
     116              :     {RS_INVAL_NONE, "none"},
     117              :     {RS_INVAL_WAL_REMOVED, "wal_removed"},
     118              :     {RS_INVAL_HORIZON, "rows_removed"},
     119              :     {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
     120              :     {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
     121              : };
     122              : 
     123              : /*
     124              :  * Ensure that the lookup table is up-to-date with the enums defined in
     125              :  * ReplicationSlotInvalidationCause.
     126              :  */
     127              : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
     128              :                  "array length mismatch");
     129              : 
     130              : /* size of version independent data */
     131              : #define ReplicationSlotOnDiskConstantSize \
     132              :     offsetof(ReplicationSlotOnDisk, slotdata)
     133              : /* size of the part of the slot not covered by the checksum */
     134              : #define ReplicationSlotOnDiskNotChecksummedSize  \
     135              :     offsetof(ReplicationSlotOnDisk, version)
     136              : /* size of the part covered by the checksum */
     137              : #define ReplicationSlotOnDiskChecksummedSize \
     138              :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
     139              : /* size of the slot data that is version dependent */
     140              : #define ReplicationSlotOnDiskV2Size \
     141              :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
     142              : 
     143              : #define SLOT_MAGIC      0x1051CA1   /* format identifier */
     144              : #define SLOT_VERSION    5       /* version for new files */
     145              : 
     146              : /* Control array for replication slot management */
     147              : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
     148              : 
     149              : static void ReplicationSlotsShmemRequest(void *arg);
     150              : static void ReplicationSlotsShmemInit(void *arg);
     151              : 
     152              : const ShmemCallbacks ReplicationSlotsShmemCallbacks = {
     153              :     .request_fn = ReplicationSlotsShmemRequest,
     154              :     .init_fn = ReplicationSlotsShmemInit,
     155              : };
     156              : 
     157              : /* My backend's replication slot in the shared memory array */
     158              : ReplicationSlot *MyReplicationSlot = NULL;
     159              : 
     160              : /* GUC variables */
     161              : int         max_replication_slots = 10; /* the maximum number of replication
     162              :                                          * slots */
     163              : int         max_repack_replication_slots = 5;   /* the maximum number of slots
     164              :                                                  * for REPACK */
     165              : 
     166              : /*
     167              :  * Invalidate replication slots that have remained idle longer than this
     168              :  * duration; '0' disables it.
     169              :  */
     170              : int         idle_replication_slot_timeout_secs = 0;
     171              : 
     172              : /*
     173              :  * This GUC lists streaming replication standby server slot names that
     174              :  * logical WAL sender processes will wait for.
     175              :  */
     176              : char       *synchronized_standby_slots;
     177              : 
     178              : /* This is the parsed and cached configuration for synchronized_standby_slots */
     179              : static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
     180              : 
     181              : /*
     182              :  * Oldest LSN that has been confirmed to be flushed to the standbys
     183              :  * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
     184              :  */
     185              : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
     186              : 
     187              : static void ReplicationSlotShmemExit(int code, Datum arg);
     188              : static bool IsSlotForConflictCheck(const char *name);
     189              : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
     190              : 
     191              : /* internal persistency functions */
     192              : static void RestoreSlotFromDisk(const char *name);
     193              : static void CreateSlotOnDisk(ReplicationSlot *slot);
     194              : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
     195              : 
     196              : /*
     197              :  * Register shared memory space needed for replication slots.
     198              :  */
     199              : static void
     200         1255 : ReplicationSlotsShmemRequest(void *arg)
     201              : {
     202              :     Size        size;
     203              : 
     204         1255 :     if (max_replication_slots + max_repack_replication_slots == 0)
     205            0 :         return;
     206              : 
     207         1255 :     size = offsetof(ReplicationSlotCtlData, replication_slots);
     208         1255 :     size = add_size(size,
     209         1255 :                     mul_size(max_replication_slots + max_repack_replication_slots,
     210              :                              sizeof(ReplicationSlot)));
     211         1255 :     ShmemRequestStruct(.name = "ReplicationSlot Ctl",
     212              :                        .size = size,
     213              :                        .ptr = (void **) &ReplicationSlotCtl,
     214              :         );
     215              : }
     216              : 
     217              : /*
     218              :  * Initialize shared memory for replication slots.
     219              :  */
     220              : static void
     221         1252 : ReplicationSlotsShmemInit(void *arg)
     222              : {
     223        19824 :     for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
     224              :     {
     225        18572 :         ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
     226              : 
     227              :         /* everything else is zeroed by the memset above */
     228        18572 :         slot->active_proc = INVALID_PROC_NUMBER;
     229        18572 :         SpinLockInit(&slot->mutex);
     230        18572 :         LWLockInitialize(&slot->io_in_progress_lock,
     231              :                          LWTRANCHE_REPLICATION_SLOT_IO);
     232        18572 :         ConditionVariableInit(&slot->active_cv);
     233              :     }
     234         1252 : }
     235              : 
     236              : /*
     237              :  * Register the callback for replication slot cleanup and releasing.
     238              :  */
     239              : void
     240        24570 : ReplicationSlotInitialize(void)
     241              : {
     242        24570 :     before_shmem_exit(ReplicationSlotShmemExit, 0);
     243        24570 : }
     244              : 
     245              : /*
     246              :  * Release and cleanup replication slots.
     247              :  */
     248              : static void
     249        24570 : ReplicationSlotShmemExit(int code, Datum arg)
     250              : {
     251              :     /* Make sure active replication slots are released */
     252        24570 :     if (MyReplicationSlot != NULL)
     253          305 :         ReplicationSlotRelease();
     254              : 
     255              :     /* Also cleanup all the temporary slots. */
     256        24570 :     ReplicationSlotCleanup(false);
     257        24570 : }
     258              : 
     259              : /*
     260              :  * Check whether the passed slot name is valid and report errors at elevel.
     261              :  *
     262              :  * See comments for ReplicationSlotValidateNameInternal().
     263              :  */
     264              : bool
     265          879 : ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
     266              :                             int elevel)
     267              : {
     268              :     int         err_code;
     269          879 :     char       *err_msg = NULL;
     270          879 :     char       *err_hint = NULL;
     271              : 
     272          879 :     if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
     273              :                                              &err_code, &err_msg, &err_hint))
     274              :     {
     275              :         /*
     276              :          * Use errmsg_internal() and errhint_internal() instead of errmsg()
     277              :          * and errhint(), since the messages from
     278              :          * ReplicationSlotValidateNameInternal() are already translated. This
     279              :          * avoids double translation.
     280              :          */
     281            5 :         ereport(elevel,
     282              :                 errcode(err_code),
     283              :                 errmsg_internal("%s", err_msg),
     284              :                 (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
     285              : 
     286            0 :         pfree(err_msg);
     287            0 :         if (err_hint != NULL)
     288            0 :             pfree(err_hint);
     289            0 :         return false;
     290              :     }
     291              : 
     292          874 :     return true;
     293              : }
     294              : 
     295              : /*
     296              :  * Check whether the passed slot name is valid.
     297              :  *
     298              :  * An error will be reported for a reserved replication slot name if
     299              :  * allow_reserved_name is set to false.
     300              :  *
     301              :  * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
     302              :  * the name to be used as a directory name on every supported OS.
     303              :  *
     304              :  * Returns true if the slot name is valid. Otherwise, returns false and stores
     305              :  * the error code, error message, and optional hint in err_code, err_msg, and
     306              :  * err_hint, respectively. The caller is responsible for freeing err_msg and
     307              :  * err_hint, which are palloc'd.
     308              :  */
     309              : bool
     310         1097 : ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
     311              :                                     int *err_code, char **err_msg, char **err_hint)
     312              : {
     313              :     const char *cp;
     314              : 
     315         1097 :     if (strlen(name) == 0)
     316              :     {
     317            4 :         *err_code = ERRCODE_INVALID_NAME;
     318            4 :         *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
     319            4 :         *err_hint = NULL;
     320            4 :         return false;
     321              :     }
     322              : 
     323         1093 :     if (strlen(name) >= NAMEDATALEN)
     324              :     {
     325            0 :         *err_code = ERRCODE_NAME_TOO_LONG;
     326            0 :         *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
     327            0 :         *err_hint = NULL;
     328            0 :         return false;
     329              :     }
     330              : 
     331        21091 :     for (cp = name; *cp; cp++)
     332              :     {
     333        20000 :         if (!((*cp >= 'a' && *cp <= 'z')
     334         9525 :               || (*cp >= '0' && *cp <= '9')
     335         1951 :               || (*cp == '_')))
     336              :         {
     337            2 :             *err_code = ERRCODE_INVALID_NAME;
     338            2 :             *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
     339            2 :             *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
     340            2 :             return false;
     341              :         }
     342              :     }
     343              : 
     344         1091 :     if (!allow_reserved_name && IsSlotForConflictCheck(name))
     345              :     {
     346            1 :         *err_code = ERRCODE_RESERVED_NAME;
     347            1 :         *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
     348            1 :         *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
     349              :                              CONFLICT_DETECTION_SLOT);
     350            1 :         return false;
     351              :     }
     352              : 
     353         1090 :     return true;
     354              : }
     355              : 
     356              : /*
     357              :  * Return true if the replication slot name is "pg_conflict_detection".
     358              :  */
     359              : static bool
     360         2384 : IsSlotForConflictCheck(const char *name)
     361              : {
     362         2384 :     return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
     363              : }
     364              : 
     365              : /*
     366              :  * Create a new replication slot and mark it as used by this backend.
     367              :  *
     368              :  * name: Name of the slot
     369              :  * db_specific: logical decoding is db specific; if the slot is going to
     370              :  *     be used for that pass true, otherwise false.
     371              :  * two_phase: If enabled, allows decoding of prepared transactions.
     372              :  * repack: If true, use a slot from the pool for REPACK.
     373              :  * failover: If enabled, allows the slot to be synced to standbys so
     374              :  *     that logical replication can be resumed after failover.
     375              :  * synced: True if the slot is synchronized from the primary server.
     376              :  */
     377              : void
     378          731 : ReplicationSlotCreate(const char *name, bool db_specific,
     379              :                       ReplicationSlotPersistency persistency,
     380              :                       bool two_phase, bool repack, bool failover, bool synced)
     381              : {
     382          731 :     ReplicationSlot *slot = NULL;
     383              :     int         startpoint,
     384              :                 endpoint;
     385              : 
     386              :     Assert(MyReplicationSlot == NULL);
     387              : 
     388              :     /*
     389              :      * The logical launcher or pg_upgrade may create or migrate an internal
     390              :      * slot, so using a reserved name is allowed in these cases.
     391              :      */
     392          731 :     ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
     393          731 :                                 ERROR);
     394              : 
     395          730 :     if (failover)
     396              :     {
     397              :         /*
     398              :          * Do not allow users to create the failover enabled slots on the
     399              :          * standby as we do not support sync to the cascading standby.
     400              :          *
     401              :          * However, failover enabled slots can be created during slot
     402              :          * synchronization because we need to retain the same values as the
     403              :          * remote slot.
     404              :          */
     405           31 :         if (RecoveryInProgress() && !IsSyncingReplicationSlots())
     406            0 :             ereport(ERROR,
     407              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     408              :                     errmsg("cannot enable failover for a replication slot created on the standby"));
     409              : 
     410              :         /*
     411              :          * Do not allow users to create failover enabled temporary slots,
     412              :          * because temporary slots will not be synced to the standby.
     413              :          *
     414              :          * However, failover enabled temporary slots can be created during
     415              :          * slot synchronization. See the comments atop slotsync.c for details.
     416              :          */
     417           31 :         if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
     418            1 :             ereport(ERROR,
     419              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     420              :                     errmsg("cannot enable failover for a temporary replication slot"));
     421              :     }
     422              : 
     423              :     /*
     424              :      * If some other backend ran this code concurrently with us, we'd likely
     425              :      * both allocate the same slot, and that would be bad.  We'd also be at
     426              :      * risk of missing a name collision.  Also, we don't want to try to create
     427              :      * a new slot while somebody's busy cleaning up an old one, because we
     428              :      * might both be monkeying with the same directory.
     429              :      */
     430          729 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     431              : 
     432              :     /*
     433              :      * Check for name collision (across the whole array), and identify an
     434              :      * allocatable slot (in the array slice specific to our current use case:
     435              :      * either general, or REPACK only).  We need to hold
     436              :      * ReplicationSlotControlLock in shared mode for this, so that nobody else
     437              :      * can change the in_use flags while we're looking at them.
     438              :      */
     439          729 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     440          729 :     startpoint = !repack ? 0 : max_replication_slots;
     441          729 :     endpoint = max_replication_slots + (repack ? max_repack_replication_slots : 0);
     442        10713 :     for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
     443              :     {
     444         9987 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     445              : 
     446         9987 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     447            3 :             ereport(ERROR,
     448              :                     (errcode(ERRCODE_DUPLICATE_OBJECT),
     449              :                      errmsg("replication slot \"%s\" already exists", name)));
     450              : 
     451         9984 :         if (i >= startpoint && i < endpoint &&
     452         6331 :             !s->in_use && slot == NULL)
     453          725 :             slot = s;
     454              :     }
     455          726 :     LWLockRelease(ReplicationSlotControlLock);
     456              : 
     457              :     /* If all slots are in use, we're out of luck. */
     458          726 :     if (slot == NULL)
     459            1 :         ereport(ERROR,
     460              :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     461              :                  errmsg("all replication slots are in use"),
     462              :                  errhint("Free one or increase \"%s\".",
     463              :                          repack ? "max_repack_replication_slots" : "max_replication_slots")));
     464              : 
     465              :     /*
     466              :      * Since this slot is not in use, nobody should be looking at any part of
     467              :      * it other than the in_use field unless they're trying to allocate it.
     468              :      * And since we hold ReplicationSlotAllocationLock, nobody except us can
     469              :      * be doing that.  So it's safe to initialize the slot.
     470              :      */
     471              :     Assert(!slot->in_use);
     472              :     Assert(slot->active_proc == INVALID_PROC_NUMBER);
     473              : 
     474              :     /* first initialize persistent data */
     475          725 :     memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
     476          725 :     namestrcpy(&slot->data.name, name);
     477          725 :     slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
     478          725 :     slot->data.persistency = persistency;
     479          725 :     slot->data.two_phase = two_phase;
     480          725 :     slot->data.two_phase_at = InvalidXLogRecPtr;
     481          725 :     slot->data.failover = failover;
     482          725 :     slot->data.synced = synced;
     483              : 
     484              :     /* and then data only present in shared memory */
     485          725 :     slot->just_dirtied = false;
     486          725 :     slot->dirty = false;
     487          725 :     slot->effective_xmin = InvalidTransactionId;
     488          725 :     slot->effective_catalog_xmin = InvalidTransactionId;
     489          725 :     slot->candidate_catalog_xmin = InvalidTransactionId;
     490          725 :     slot->candidate_xmin_lsn = InvalidXLogRecPtr;
     491          725 :     slot->candidate_restart_valid = InvalidXLogRecPtr;
     492          725 :     slot->candidate_restart_lsn = InvalidXLogRecPtr;
     493          725 :     slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
     494          725 :     slot->last_saved_restart_lsn = InvalidXLogRecPtr;
     495          725 :     slot->inactive_since = 0;
     496          725 :     slot->slotsync_skip_reason = SS_SKIP_NONE;
     497              : 
     498              :     /*
     499              :      * Create the slot on disk.  We haven't actually marked the slot allocated
     500              :      * yet, so no special cleanup is required if this errors out.
     501              :      */
     502          725 :     CreateSlotOnDisk(slot);
     503              : 
     504              :     /*
     505              :      * We need to briefly prevent any other backend from iterating over the
     506              :      * slots while we flip the in_use flag. We also need to set the active
     507              :      * flag while holding the ControlLock as otherwise a concurrent
     508              :      * ReplicationSlotAcquire() could acquire the slot as well.
     509              :      */
     510          725 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     511              : 
     512          725 :     slot->in_use = true;
     513              : 
     514              :     /* We can now mark the slot active, and that makes it our slot. */
     515          725 :     SpinLockAcquire(&slot->mutex);
     516              :     Assert(slot->active_proc == INVALID_PROC_NUMBER);
     517          725 :     slot->active_proc = MyProcNumber;
     518          725 :     SpinLockRelease(&slot->mutex);
     519          725 :     MyReplicationSlot = slot;
     520              : 
     521          725 :     LWLockRelease(ReplicationSlotControlLock);
     522              : 
     523              :     /*
     524              :      * Create statistics entry for the new logical slot. We don't collect any
     525              :      * stats for physical slots, so no need to create an entry for the same.
     526              :      * See ReplicationSlotDropPtr for why we need to do this before releasing
     527              :      * ReplicationSlotAllocationLock.
     528              :      */
     529          725 :     if (SlotIsLogical(slot))
     530          521 :         pgstat_create_replslot(slot);
     531              : 
     532              :     /*
     533              :      * Now that the slot has been marked as in_use and active, it's safe to
     534              :      * let somebody else try to allocate a slot.
     535              :      */
     536          725 :     LWLockRelease(ReplicationSlotAllocationLock);
     537              : 
     538              :     /* Let everybody know we've modified this slot */
     539          725 :     ConditionVariableBroadcast(&slot->active_cv);
     540          725 : }
     541              : 
     542              : /*
     543              :  * Search for the named replication slot.
     544              :  *
     545              :  * Return the replication slot if found, otherwise NULL.
     546              :  */
     547              : ReplicationSlot *
     548         2192 : SearchNamedReplicationSlot(const char *name, bool need_lock)
     549              : {
     550              :     int         i;
     551         2192 :     ReplicationSlot *slot = NULL;
     552              : 
     553         2192 :     if (need_lock)
     554          672 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     555              : 
     556        11412 :     for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
     557              :     {
     558        10863 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     559              : 
     560        10863 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     561              :         {
     562         1643 :             slot = s;
     563         1643 :             break;
     564              :         }
     565              :     }
     566              : 
     567         2192 :     if (need_lock)
     568          672 :         LWLockRelease(ReplicationSlotControlLock);
     569              : 
     570         2192 :     return slot;
     571              : }
     572              : 
     573              : /*
     574              :  * Return the index of the replication slot in
     575              :  * ReplicationSlotCtl->replication_slots.
     576              :  *
     577              :  * This is mainly useful to have an efficient key for storing replication slot
     578              :  * stats.
     579              :  */
     580              : int
     581         8525 : ReplicationSlotIndex(ReplicationSlot *slot)
     582              : {
     583              :     Assert(slot >= ReplicationSlotCtl->replication_slots &&
     584              :            slot < ReplicationSlotCtl->replication_slots +
     585              :            (max_replication_slots + max_repack_replication_slots));
     586              : 
     587         8525 :     return slot - ReplicationSlotCtl->replication_slots;
     588              : }
     589              : 
     590              : /*
     591              :  * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
     592              :  * the slot's name and true is returned.
     593              :  *
     594              :  * This likely is only useful for pgstat_replslot.c during shutdown, in other
     595              :  * cases there are obvious TOCTOU issues.
     596              :  */
     597              : bool
     598          114 : ReplicationSlotName(int index, Name name)
     599              : {
     600              :     ReplicationSlot *slot;
     601              :     bool        found;
     602              : 
     603          114 :     slot = &ReplicationSlotCtl->replication_slots[index];
     604              : 
     605              :     /*
     606              :      * Ensure that the slot cannot be dropped while we copy the name. Don't
     607              :      * need the spinlock as the name of an existing slot cannot change.
     608              :      */
     609          114 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     610          114 :     found = slot->in_use;
     611          114 :     if (slot->in_use)
     612          114 :         namestrcpy(name, NameStr(slot->data.name));
     613          114 :     LWLockRelease(ReplicationSlotControlLock);
     614              : 
     615          114 :     return found;
     616              : }
     617              : 
     618              : /*
     619              :  * Find a previously created slot and mark it as used by this process.
     620              :  *
     621              :  * An error is raised if nowait is true and the slot is currently in use. If
     622              :  * nowait is false, we sleep until the slot is released by the owning process.
     623              :  *
     624              :  * An error is raised if error_if_invalid is true and the slot is found to
     625              :  * be invalid. It should always be set to true, except when we are temporarily
     626              :  * acquiring the slot and don't intend to change it.
     627              :  */
     628              : void
     629         1433 : ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
     630              : {
     631              :     ReplicationSlot *s;
     632              :     ProcNumber  active_proc;
     633              :     int         active_pid;
     634              : 
     635              :     Assert(name != NULL);
     636              : 
     637         1433 : retry:
     638              :     Assert(MyReplicationSlot == NULL);
     639              : 
     640         1433 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     641              : 
     642              :     /* Check if the slot exists with the given name. */
     643         1433 :     s = SearchNamedReplicationSlot(name, false);
     644         1433 :     if (s == NULL || !s->in_use)
     645              :     {
     646           10 :         LWLockRelease(ReplicationSlotControlLock);
     647              : 
     648           10 :         ereport(ERROR,
     649              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     650              :                  errmsg("replication slot \"%s\" does not exist",
     651              :                         name)));
     652              :     }
     653              : 
     654              :     /*
     655              :      * Do not allow users to acquire the reserved slot. This scenario may
     656              :      * occur if the launcher that owns the slot has terminated unexpectedly
     657              :      * due to an error, and a backend process attempts to reuse the slot.
     658              :      */
     659         1423 :     if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
     660            0 :         ereport(ERROR,
     661              :                 errcode(ERRCODE_UNDEFINED_OBJECT),
     662              :                 errmsg("cannot acquire replication slot \"%s\"", name),
     663              :                 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
     664              : 
     665              :     /*
     666              :      * This is the slot we want; check if it's active under some other
     667              :      * process.  In single user mode, we don't need this check.
     668              :      */
     669         1423 :     if (IsUnderPostmaster)
     670              :     {
     671              :         /*
     672              :          * Get ready to sleep on the slot in case it is active.  (We may end
     673              :          * up not sleeping, but we don't want to do this while holding the
     674              :          * spinlock.)
     675              :          */
     676         1418 :         if (!nowait)
     677          297 :             ConditionVariablePrepareToSleep(&s->active_cv);
     678              : 
     679              :         /*
     680              :          * It is important to reset the inactive_since under spinlock here to
     681              :          * avoid race conditions with slot invalidation. See comments related
     682              :          * to inactive_since in InvalidatePossiblyObsoleteSlot.
     683              :          */
     684         1418 :         SpinLockAcquire(&s->mutex);
     685         1418 :         if (s->active_proc == INVALID_PROC_NUMBER)
     686         1261 :             s->active_proc = MyProcNumber;
     687         1418 :         active_proc = s->active_proc;
     688         1418 :         ReplicationSlotSetInactiveSince(s, 0, false);
     689         1418 :         SpinLockRelease(&s->mutex);
     690              :     }
     691              :     else
     692              :     {
     693            5 :         s->active_proc = active_proc = MyProcNumber;
     694            5 :         ReplicationSlotSetInactiveSince(s, 0, true);
     695              :     }
     696         1423 :     active_pid = GetPGProcByNumber(active_proc)->pid;
     697         1423 :     LWLockRelease(ReplicationSlotControlLock);
     698              : 
     699              :     /*
     700              :      * If we found the slot but it's already active in another process, we
     701              :      * wait until the owning process signals us that it's been released, or
     702              :      * error out.
     703              :      */
     704         1423 :     if (active_proc != MyProcNumber)
     705              :     {
     706            0 :         if (!nowait)
     707              :         {
     708              :             /* Wait here until we get signaled, and then restart */
     709            0 :             ConditionVariableSleep(&s->active_cv,
     710              :                                    WAIT_EVENT_REPLICATION_SLOT_DROP);
     711            0 :             ConditionVariableCancelSleep();
     712            0 :             goto retry;
     713              :         }
     714              : 
     715            0 :         ereport(ERROR,
     716              :                 (errcode(ERRCODE_OBJECT_IN_USE),
     717              :                  errmsg("replication slot \"%s\" is active for PID %d",
     718              :                         NameStr(s->data.name), active_pid)));
     719              :     }
     720         1423 :     else if (!nowait)
     721          297 :         ConditionVariableCancelSleep(); /* no sleep needed after all */
     722              : 
     723              :     /* We made this slot active, so it's ours now. */
     724         1423 :     MyReplicationSlot = s;
     725              : 
     726              :     /*
     727              :      * We need to check for invalidation after making the slot ours to avoid
     728              :      * the possible race condition with the checkpointer that can otherwise
     729              :      * invalidate the slot immediately after the check.
     730              :      */
     731         1423 :     if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
     732            8 :         ereport(ERROR,
     733              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     734              :                 errmsg("can no longer access replication slot \"%s\"",
     735              :                        NameStr(s->data.name)),
     736              :                 errdetail("This replication slot has been invalidated due to \"%s\".",
     737              :                           GetSlotInvalidationCauseName(s->data.invalidated)));
     738              : 
     739              :     /* Let everybody know we've modified this slot */
     740         1415 :     ConditionVariableBroadcast(&s->active_cv);
     741              : 
     742              :     /*
     743              :      * The call to pgstat_acquire_replslot() protects against stats for a
     744              :      * different slot, from before a restart or such, being present during
     745              :      * pgstat_report_replslot().
     746              :      */
     747         1415 :     if (SlotIsLogical(s))
     748         1182 :         pgstat_acquire_replslot(s);
     749              : 
     750              : 
     751         1415 :     if (am_walsender)
     752              :     {
     753          970 :         ereport(log_replication_commands ? LOG : DEBUG1,
     754              :                 SlotIsLogical(s)
     755              :                 ? errmsg("acquired logical replication slot \"%s\"",
     756              :                          NameStr(s->data.name))
     757              :                 : errmsg("acquired physical replication slot \"%s\"",
     758              :                          NameStr(s->data.name)));
     759              :     }
     760         1415 : }
     761              : 
     762              : /*
     763              :  * Release the replication slot that this backend considers to own.
     764              :  *
     765              :  * This or another backend can re-acquire the slot later.
     766              :  * Resources this slot requires will be preserved.
     767              :  */
     768              : void
     769         1718 : ReplicationSlotRelease(void)
     770              : {
     771         1718 :     ReplicationSlot *slot = MyReplicationSlot;
     772         1718 :     char       *slotname = NULL;    /* keep compiler quiet */
     773              :     bool        is_logical;
     774         1718 :     TimestampTz now = 0;
     775              : 
     776              :     Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
     777              : 
     778         1718 :     is_logical = SlotIsLogical(slot);
     779              : 
     780         1718 :     if (am_walsender)
     781         1201 :         slotname = pstrdup(NameStr(slot->data.name));
     782              : 
     783         1718 :     if (slot->data.persistency == RS_EPHEMERAL)
     784              :     {
     785              :         /*
     786              :          * If slot is ephemeral, we drop it upon release, and request logical
     787              :          * decoding be disabled.
     788              :          */
     789            8 :         ReplicationSlotDropAcquired(is_logical);
     790              :     }
     791              :     else
     792              :     {
     793              :         /*
     794              :          * If slot needed to temporarily restrain both data and catalog xmin
     795              :          * to create the catalog snapshot, remove that temporary constraint.
     796              :          * Snapshots can only be exported while the initial snapshot is still
     797              :          * acquired.
     798              :          */
     799         1710 :         if (!TransactionIdIsValid(slot->data.xmin) &&
     800         1675 :             TransactionIdIsValid(slot->effective_xmin))
     801              :         {
     802          215 :             SpinLockAcquire(&slot->mutex);
     803          215 :             slot->effective_xmin = InvalidTransactionId;
     804          215 :             SpinLockRelease(&slot->mutex);
     805          215 :             ReplicationSlotsComputeRequiredXmin(false);
     806              :         }
     807              : 
     808              :         /*
     809              :          * Set the time since the slot has become inactive. We get the current
     810              :          * time beforehand to avoid system call while holding the spinlock.
     811              :          */
     812         1710 :         now = GetCurrentTimestamp();
     813              : 
     814         1710 :         if (slot->data.persistency == RS_PERSISTENT)
     815              :         {
     816              :             /*
     817              :              * Mark persistent slot inactive.  We're not freeing it, just
     818              :              * disconnecting, but wake up others that may be waiting for it.
     819              :              */
     820         1393 :             SpinLockAcquire(&slot->mutex);
     821         1393 :             slot->active_proc = INVALID_PROC_NUMBER;
     822         1393 :             ReplicationSlotSetInactiveSince(slot, now, false);
     823         1393 :             SpinLockRelease(&slot->mutex);
     824         1393 :             ConditionVariableBroadcast(&slot->active_cv);
     825              :         }
     826              :         else
     827          317 :             ReplicationSlotSetInactiveSince(slot, now, true);
     828              : 
     829         1710 :         MyReplicationSlot = NULL;
     830              :     }
     831              : 
     832              :     /* might not have been set when we've been a plain slot */
     833         1718 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     834         1718 :     MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
     835         1718 :     ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
     836         1718 :     LWLockRelease(ProcArrayLock);
     837              : 
     838         1718 :     if (am_walsender)
     839              :     {
     840         1201 :         ereport(log_replication_commands ? LOG : DEBUG1,
     841              :                 is_logical
     842              :                 ? errmsg("released logical replication slot \"%s\"",
     843              :                          slotname)
     844              :                 : errmsg("released physical replication slot \"%s\"",
     845              :                          slotname));
     846              : 
     847         1201 :         pfree(slotname);
     848              :     }
     849         1718 : }
     850              : 
     851              : /*
     852              :  * Cleanup temporary slots created in current session.
     853              :  *
     854              :  * Cleanup only synced temporary slots if 'synced_only' is true, else
     855              :  * cleanup all temporary slots.
     856              :  *
     857              :  * If it drops the last logical slot in the cluster, requests to disable
     858              :  * logical decoding.
     859              :  */
     860              : void
     861        55915 : ReplicationSlotCleanup(bool synced_only)
     862              : {
     863              :     int         i;
     864              :     bool        found_valid_logicalslot;
     865        55915 :     bool        dropped_logical = false;
     866              : 
     867              :     Assert(MyReplicationSlot == NULL);
     868              : 
     869        56075 : restart:
     870        56075 :     found_valid_logicalslot = false;
     871        56075 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     872       888879 :     for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
     873              :     {
     874       832964 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     875              : 
     876       832964 :         if (!s->in_use)
     877       817513 :             continue;
     878              : 
     879        15451 :         SpinLockAcquire(&s->mutex);
     880              : 
     881        30902 :         found_valid_logicalslot |=
     882        15451 :             (SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);
     883              : 
     884        15451 :         if ((s->active_proc == MyProcNumber &&
     885          160 :              (!synced_only || s->data.synced)))
     886              :         {
     887              :             Assert(s->data.persistency == RS_TEMPORARY);
     888          160 :             SpinLockRelease(&s->mutex);
     889          160 :             LWLockRelease(ReplicationSlotControlLock);  /* avoid deadlock */
     890              : 
     891          160 :             if (SlotIsLogical(s))
     892           10 :                 dropped_logical = true;
     893              : 
     894          160 :             ReplicationSlotDropPtr(s);
     895              : 
     896          160 :             ConditionVariableBroadcast(&s->active_cv);
     897          160 :             goto restart;
     898              :         }
     899              :         else
     900        15291 :             SpinLockRelease(&s->mutex);
     901              :     }
     902              : 
     903        55915 :     LWLockRelease(ReplicationSlotControlLock);
     904              : 
     905        55915 :     if (dropped_logical && !found_valid_logicalslot)
     906            3 :         RequestDisableLogicalDecoding();
     907        55915 : }
     908              : 
     909              : /*
     910              :  * Permanently drop the replication slot identified by the passed-in name.
     911              :  *
     912              :  * If this is a logical slot, request that logical decoding be disabled.
     913              :  */
     914              : void
     915          444 : ReplicationSlotDrop(const char *name, bool nowait)
     916              : {
     917          444 :     ReplicationSlotAcquire(name, nowait, false);
     918              : 
     919              :     /*
     920              :      * Do not allow users to drop the slots which are currently being synced
     921              :      * from the primary to the standby.
     922              :      */
     923          437 :     if (RecoveryInProgress() && MyReplicationSlot->data.synced)
     924            1 :         ereport(ERROR,
     925              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     926              :                 errmsg("cannot drop replication slot \"%s\"", name),
     927              :                 errdetail("This replication slot is being synchronized from the primary server."));
     928              : 
     929          436 :     ReplicationSlotDropAcquired(SlotIsLogical(MyReplicationSlot));
     930          436 : }
     931              : 
     932              : /*
     933              :  * Change the definition of the slot identified by the specified name.
     934              :  *
     935              :  * Altering the two_phase property of a slot requires caution on the
     936              :  * client-side. Enabling it at any random point during decoding has the
     937              :  * risk that transactions prepared before this change may be skipped by
     938              :  * the decoder, leading to missing prepare records on the client. So, we
     939              :  * enable it for subscription related slots only once the initial tablesync
     940              :  * is finished. See comments atop worker.c. Disabling it is safe only when
     941              :  * there are no pending prepared transaction, otherwise, the changes of
     942              :  * already prepared transactions can be replicated again along with their
     943              :  * corresponding commit leading to duplicate data or errors.
     944              :  */
     945              : void
     946            7 : ReplicationSlotAlter(const char *name, const bool *failover,
     947              :                      const bool *two_phase)
     948              : {
     949            7 :     bool        update_slot = false;
     950              : 
     951              :     Assert(MyReplicationSlot == NULL);
     952              :     Assert(failover || two_phase);
     953              : 
     954            7 :     ReplicationSlotAcquire(name, false, true);
     955              : 
     956            6 :     if (SlotIsPhysical(MyReplicationSlot))
     957            0 :         ereport(ERROR,
     958              :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     959              :                 errmsg("cannot use %s with a physical replication slot",
     960              :                        "ALTER_REPLICATION_SLOT"));
     961              : 
     962            6 :     if (RecoveryInProgress())
     963              :     {
     964              :         /*
     965              :          * Do not allow users to alter the slots which are currently being
     966              :          * synced from the primary to the standby.
     967              :          */
     968            1 :         if (MyReplicationSlot->data.synced)
     969            1 :             ereport(ERROR,
     970              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     971              :                     errmsg("cannot alter replication slot \"%s\"", name),
     972              :                     errdetail("This replication slot is being synchronized from the primary server."));
     973              : 
     974              :         /*
     975              :          * Do not allow users to enable failover on the standby as we do not
     976              :          * support sync to the cascading standby.
     977              :          */
     978            0 :         if (failover && *failover)
     979            0 :             ereport(ERROR,
     980              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     981              :                     errmsg("cannot enable failover for a replication slot"
     982              :                            " on the standby"));
     983              :     }
     984              : 
     985            5 :     if (failover)
     986              :     {
     987              :         /*
     988              :          * Do not allow users to enable failover for temporary slots as we do
     989              :          * not support syncing temporary slots to the standby.
     990              :          */
     991            4 :         if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
     992            0 :             ereport(ERROR,
     993              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     994              :                     errmsg("cannot enable failover for a temporary replication slot"));
     995              : 
     996            4 :         if (MyReplicationSlot->data.failover != *failover)
     997              :         {
     998            4 :             SpinLockAcquire(&MyReplicationSlot->mutex);
     999            4 :             MyReplicationSlot->data.failover = *failover;
    1000            4 :             SpinLockRelease(&MyReplicationSlot->mutex);
    1001              : 
    1002            4 :             update_slot = true;
    1003              :         }
    1004              :     }
    1005              : 
    1006            5 :     if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
    1007              :     {
    1008            1 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1009            1 :         MyReplicationSlot->data.two_phase = *two_phase;
    1010            1 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1011              : 
    1012            1 :         update_slot = true;
    1013              :     }
    1014              : 
    1015            5 :     if (update_slot)
    1016              :     {
    1017            5 :         ReplicationSlotMarkDirty();
    1018            5 :         ReplicationSlotSave();
    1019              :     }
    1020              : 
    1021            5 :     ReplicationSlotRelease();
    1022            5 : }
    1023              : 
    1024              : /*
    1025              :  * Permanently drop the currently acquired replication slot.
    1026              :  *
    1027              :  * If caller requests it, have checkpointer attempt to disable logical
    1028              :  * decoding.  Obviously, this should only be done if the slot is logical.
    1029              :  */
    1030              : void
    1031          459 : ReplicationSlotDropAcquired(bool try_disable)
    1032              : {
    1033              :     ReplicationSlot *slot;
    1034              : 
    1035              :     Assert(MyReplicationSlot != NULL);
    1036          459 :     slot = MyReplicationSlot;
    1037              : 
    1038              :     /* Can only disable logical decoding if slot is logical */
    1039              :     Assert(!try_disable || SlotIsLogical(slot));
    1040              : 
    1041              :     /* slot isn't acquired anymore */
    1042          459 :     MyReplicationSlot = NULL;
    1043              : 
    1044          459 :     ReplicationSlotDropPtr(slot);
    1045              : 
    1046          459 :     if (try_disable)
    1047          431 :         RequestDisableLogicalDecoding();
    1048          459 : }
    1049              : 
    1050              : /*
    1051              :  * Permanently drop the replication slot which will be released by the point
    1052              :  * this function returns.
    1053              :  */
    1054              : static void
    1055          619 : ReplicationSlotDropPtr(ReplicationSlot *slot)
    1056              : {
    1057              :     char        path[MAXPGPATH];
    1058              :     char        tmppath[MAXPGPATH];
    1059              : 
    1060              :     /*
    1061              :      * If some other backend ran this code concurrently with us, we might try
    1062              :      * to delete a slot with a certain name while someone else was trying to
    1063              :      * create a slot with the same name.
    1064              :      */
    1065          619 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
    1066              : 
    1067              :     /* Generate pathnames. */
    1068          619 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    1069          619 :     sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    1070              : 
    1071              :     /*
    1072              :      * Rename the slot directory on disk, so that we'll no longer recognize
    1073              :      * this as a valid slot.  Note that if this fails, we've got to mark the
    1074              :      * slot inactive before bailing out.  If we're dropping an ephemeral or a
    1075              :      * temporary slot, we better never fail hard as the caller won't expect
    1076              :      * the slot to survive and this might get called during error handling.
    1077              :      */
    1078          619 :     if (rename(path, tmppath) == 0)
    1079              :     {
    1080              :         /*
    1081              :          * We need to fsync() the directory we just renamed and its parent to
    1082              :          * make sure that our changes are on disk in a crash-safe fashion.  If
    1083              :          * fsync() fails, we can't be sure whether the changes are on disk or
    1084              :          * not.  For now, we handle that by panicking;
    1085              :          * StartupReplicationSlots() will try to straighten it out after
    1086              :          * restart.
    1087              :          */
    1088          619 :         START_CRIT_SECTION();
    1089          619 :         fsync_fname(tmppath, true);
    1090          619 :         fsync_fname(PG_REPLSLOT_DIR, true);
    1091          619 :         END_CRIT_SECTION();
    1092              :     }
    1093              :     else
    1094              :     {
    1095            0 :         bool        fail_softly = slot->data.persistency != RS_PERSISTENT;
    1096              : 
    1097            0 :         SpinLockAcquire(&slot->mutex);
    1098            0 :         slot->active_proc = INVALID_PROC_NUMBER;
    1099            0 :         SpinLockRelease(&slot->mutex);
    1100              : 
    1101              :         /* wake up anyone waiting on this slot */
    1102            0 :         ConditionVariableBroadcast(&slot->active_cv);
    1103              : 
    1104            0 :         ereport(fail_softly ? WARNING : ERROR,
    1105              :                 (errcode_for_file_access(),
    1106              :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1107              :                         path, tmppath)));
    1108              :     }
    1109              : 
    1110              :     /*
    1111              :      * The slot is definitely gone.  Lock out concurrent scans of the array
    1112              :      * long enough to kill it.  It's OK to clear the active PID here without
    1113              :      * grabbing the mutex because nobody else can be scanning the array here,
    1114              :      * and nobody can be attached to this slot and thus access it without
    1115              :      * scanning the array.
    1116              :      *
    1117              :      * Also wake up processes waiting for it.
    1118              :      */
    1119          619 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
    1120          619 :     slot->active_proc = INVALID_PROC_NUMBER;
    1121          619 :     slot->in_use = false;
    1122          619 :     LWLockRelease(ReplicationSlotControlLock);
    1123          619 :     ConditionVariableBroadcast(&slot->active_cv);
    1124              : 
    1125              :     /*
    1126              :      * Slot is dead and doesn't prevent resource removal anymore, recompute
    1127              :      * limits.
    1128              :      */
    1129          619 :     ReplicationSlotsComputeRequiredXmin(false);
    1130          619 :     ReplicationSlotsComputeRequiredLSN();
    1131              : 
    1132              :     /*
    1133              :      * If removing the directory fails, the worst thing that will happen is
    1134              :      * that the user won't be able to create a new slot with the same name
    1135              :      * until the next server restart.  We warn about it, but that's all.
    1136              :      */
    1137          619 :     if (!rmtree(tmppath, true))
    1138            0 :         ereport(WARNING,
    1139              :                 (errmsg("could not remove directory \"%s\"", tmppath)));
    1140              : 
    1141              :     /*
    1142              :      * Drop the statistics entry for the replication slot.  Do this while
    1143              :      * holding ReplicationSlotAllocationLock so that we don't drop a
    1144              :      * statistics entry for another slot with the same name just created in
    1145              :      * another session.
    1146              :      */
    1147          619 :     if (SlotIsLogical(slot))
    1148          448 :         pgstat_drop_replslot(slot);
    1149              : 
    1150              :     /*
    1151              :      * We release this at the very end, so that nobody starts trying to create
    1152              :      * a slot while we're still cleaning up the detritus of the old one.
    1153              :      */
    1154          619 :     LWLockRelease(ReplicationSlotAllocationLock);
    1155          619 : }
    1156              : 
    1157              : /*
    1158              :  * Serialize the currently acquired slot's state from memory to disk, thereby
    1159              :  * guaranteeing the current state will survive a crash.
    1160              :  */
    1161              : void
    1162         1520 : ReplicationSlotSave(void)
    1163              : {
    1164              :     char        path[MAXPGPATH];
    1165              : 
    1166              :     Assert(MyReplicationSlot != NULL);
    1167              : 
    1168         1520 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
    1169         1520 :     SaveSlotToPath(MyReplicationSlot, path, ERROR);
    1170         1520 : }
    1171              : 
    1172              : /*
    1173              :  * Signal that it would be useful if the currently acquired slot would be
    1174              :  * flushed out to disk.
    1175              :  *
    1176              :  * Note that the actual flush to disk can be delayed for a long time, if
    1177              :  * required for correctness explicitly do a ReplicationSlotSave().
    1178              :  */
    1179              : void
    1180        43163 : ReplicationSlotMarkDirty(void)
    1181              : {
    1182        43163 :     ReplicationSlot *slot = MyReplicationSlot;
    1183              : 
    1184              :     Assert(MyReplicationSlot != NULL);
    1185              : 
    1186        43163 :     SpinLockAcquire(&slot->mutex);
    1187        43163 :     MyReplicationSlot->just_dirtied = true;
    1188        43163 :     MyReplicationSlot->dirty = true;
    1189        43163 :     SpinLockRelease(&slot->mutex);
    1190        43163 : }
    1191              : 
    1192              : /*
    1193              :  * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
    1194              :  * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
    1195              :  */
    1196              : void
    1197          496 : ReplicationSlotPersist(void)
    1198              : {
    1199          496 :     ReplicationSlot *slot = MyReplicationSlot;
    1200              : 
    1201              :     Assert(slot != NULL);
    1202              :     Assert(slot->data.persistency != RS_PERSISTENT);
    1203              : 
    1204          496 :     SpinLockAcquire(&slot->mutex);
    1205          496 :     slot->data.persistency = RS_PERSISTENT;
    1206          496 :     SpinLockRelease(&slot->mutex);
    1207              : 
    1208          496 :     ReplicationSlotMarkDirty();
    1209          496 :     ReplicationSlotSave();
    1210          496 : }
    1211              : 
    1212              : /*
    1213              :  * Compute the oldest xmin across all slots and store it in the ProcArray.
    1214              :  *
    1215              :  * If already_locked is true, both the ReplicationSlotControlLock and the
    1216              :  * ProcArrayLock have already been acquired exclusively. It is crucial that the
    1217              :  * caller first acquires the ReplicationSlotControlLock, followed by the
    1218              :  * ProcArrayLock, to prevent any undetectable deadlocks since this function
    1219              :  * acquires them in that order.
    1220              :  */
    1221              : void
    1222         2664 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
    1223              : {
    1224              :     int         i;
    1225         2664 :     TransactionId agg_xmin = InvalidTransactionId;
    1226         2664 :     TransactionId agg_catalog_xmin = InvalidTransactionId;
    1227              : 
    1228              :     Assert(ReplicationSlotCtl != NULL);
    1229              :     Assert(!already_locked ||
    1230              :            (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
    1231              :             LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
    1232              : 
    1233              :     /*
    1234              :      * Hold the ReplicationSlotControlLock until after updating the slot xmin
    1235              :      * values, so no backend updates the initial xmin for newly created slot
    1236              :      * concurrently. A shared lock is used here to minimize lock contention,
    1237              :      * especially when many slots exist and advancements occur frequently.
    1238              :      * This is safe since an exclusive lock is taken during initial slot xmin
    1239              :      * update in slot creation.
    1240              :      *
    1241              :      * One might think that we can hold the ProcArrayLock exclusively and
    1242              :      * update the slot xmin values, but it could increase lock contention on
    1243              :      * the ProcArrayLock, which is not great since this function can be called
    1244              :      * at non-negligible frequency.
    1245              :      *
    1246              :      * Concurrent invocation of this function may cause the computed slot xmin
    1247              :      * to regress. However, this is harmless because tuples prior to the most
    1248              :      * recent xmin are no longer useful once advancement occurs (see
    1249              :      * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
    1250              :      * before updating the effective_xmin). Thus, such regression merely
    1251              :      * prevents VACUUM from prematurely removing tuples without causing the
    1252              :      * early deletion of required data.
    1253              :      */
    1254         2664 :     if (!already_locked)
    1255         2143 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1256              : 
    1257        40487 :     for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
    1258              :     {
    1259        37823 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1260              :         TransactionId effective_xmin;
    1261              :         TransactionId effective_catalog_xmin;
    1262              :         bool        invalidated;
    1263              : 
    1264        37823 :         if (!s->in_use)
    1265        35271 :             continue;
    1266              : 
    1267         2552 :         SpinLockAcquire(&s->mutex);
    1268         2552 :         effective_xmin = s->effective_xmin;
    1269         2552 :         effective_catalog_xmin = s->effective_catalog_xmin;
    1270         2552 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1271         2552 :         SpinLockRelease(&s->mutex);
    1272              : 
    1273              :         /* invalidated slots need not apply */
    1274         2552 :         if (invalidated)
    1275           26 :             continue;
    1276              : 
    1277              :         /* check the data xmin */
    1278         2526 :         if (TransactionIdIsValid(effective_xmin) &&
    1279            7 :             (!TransactionIdIsValid(agg_xmin) ||
    1280            7 :              TransactionIdPrecedes(effective_xmin, agg_xmin)))
    1281          355 :             agg_xmin = effective_xmin;
    1282              : 
    1283              :         /* check the catalog xmin */
    1284         2526 :         if (TransactionIdIsValid(effective_catalog_xmin) &&
    1285         1058 :             (!TransactionIdIsValid(agg_catalog_xmin) ||
    1286         1058 :              TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
    1287         1312 :             agg_catalog_xmin = effective_catalog_xmin;
    1288              :     }
    1289              : 
    1290         2664 :     ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
    1291              : 
    1292         2664 :     if (!already_locked)
    1293         2143 :         LWLockRelease(ReplicationSlotControlLock);
    1294         2664 : }
    1295              : 
    1296              : /*
    1297              :  * Compute the oldest restart LSN across all slots and inform xlog module.
    1298              :  *
    1299              :  * Note: while max_slot_wal_keep_size is theoretically relevant for this
    1300              :  * purpose, we don't try to account for that, because this module doesn't
    1301              :  * know what to compare against.
    1302              :  */
    1303              : void
    1304        44218 : ReplicationSlotsComputeRequiredLSN(void)
    1305              : {
    1306              :     int         i;
    1307        44218 :     XLogRecPtr  min_required = InvalidXLogRecPtr;
    1308              : 
    1309              :     Assert(ReplicationSlotCtl != NULL);
    1310              : 
    1311        44218 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1312       703221 :     for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
    1313              :     {
    1314       659003 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1315              :         XLogRecPtr  restart_lsn;
    1316              :         XLogRecPtr  last_saved_restart_lsn;
    1317              :         bool        invalidated;
    1318              :         ReplicationSlotPersistency persistency;
    1319              : 
    1320       659003 :         if (!s->in_use)
    1321       614231 :             continue;
    1322              : 
    1323        44772 :         SpinLockAcquire(&s->mutex);
    1324        44772 :         persistency = s->data.persistency;
    1325        44772 :         restart_lsn = s->data.restart_lsn;
    1326        44772 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1327        44772 :         last_saved_restart_lsn = s->last_saved_restart_lsn;
    1328        44772 :         SpinLockRelease(&s->mutex);
    1329              : 
    1330              :         /* invalidated slots need not apply */
    1331        44772 :         if (invalidated)
    1332           27 :             continue;
    1333              : 
    1334              :         /*
    1335              :          * For persistent slot use last_saved_restart_lsn to compute the
    1336              :          * oldest LSN for removal of WAL segments.  The segments between
    1337              :          * last_saved_restart_lsn and restart_lsn might be needed by a
    1338              :          * persistent slot in the case of database crash.  Non-persistent
    1339              :          * slots can't survive the database crash, so we don't care about
    1340              :          * last_saved_restart_lsn for them.
    1341              :          */
    1342        44745 :         if (persistency == RS_PERSISTENT)
    1343              :         {
    1344        43745 :             if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
    1345              :                 restart_lsn > last_saved_restart_lsn)
    1346              :             {
    1347        40620 :                 restart_lsn = last_saved_restart_lsn;
    1348              :             }
    1349              :         }
    1350              : 
    1351        44745 :         if (XLogRecPtrIsValid(restart_lsn) &&
    1352         1718 :             (!XLogRecPtrIsValid(min_required) ||
    1353              :              restart_lsn < min_required))
    1354        43212 :             min_required = restart_lsn;
    1355              :     }
    1356        44218 :     LWLockRelease(ReplicationSlotControlLock);
    1357              : 
    1358        44218 :     XLogSetReplicationSlotMinimumLSN(min_required);
    1359        44218 : }
    1360              : 
    1361              : /*
    1362              :  * Compute the oldest WAL LSN required by *logical* decoding slots..
    1363              :  *
    1364              :  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
    1365              :  * slots exist.
    1366              :  *
    1367              :  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
    1368              :  * ignores physical replication slots.
    1369              :  *
    1370              :  * The results aren't required frequently, so we don't maintain a precomputed
    1371              :  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
    1372              :  */
    1373              : XLogRecPtr
    1374         3896 : ReplicationSlotsComputeLogicalRestartLSN(void)
    1375              : {
    1376         3896 :     XLogRecPtr  result = InvalidXLogRecPtr;
    1377              :     int         i;
    1378              : 
    1379         3896 :     if (max_replication_slots + max_repack_replication_slots <= 0)
    1380            0 :         return InvalidXLogRecPtr;
    1381              : 
    1382         3896 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1383              : 
    1384        61774 :     for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
    1385              :     {
    1386              :         ReplicationSlot *s;
    1387              :         XLogRecPtr  restart_lsn;
    1388              :         XLogRecPtr  last_saved_restart_lsn;
    1389              :         bool        invalidated;
    1390              :         ReplicationSlotPersistency persistency;
    1391              : 
    1392        57878 :         s = &ReplicationSlotCtl->replication_slots[i];
    1393              : 
    1394              :         /* cannot change while ReplicationSlotCtlLock is held */
    1395        57878 :         if (!s->in_use)
    1396        57052 :             continue;
    1397              : 
    1398              :         /* we're only interested in logical slots */
    1399          826 :         if (!SlotIsLogical(s))
    1400          564 :             continue;
    1401              : 
    1402              :         /* read once, it's ok if it increases while we're checking */
    1403          262 :         SpinLockAcquire(&s->mutex);
    1404          262 :         persistency = s->data.persistency;
    1405          262 :         restart_lsn = s->data.restart_lsn;
    1406          262 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1407          262 :         last_saved_restart_lsn = s->last_saved_restart_lsn;
    1408          262 :         SpinLockRelease(&s->mutex);
    1409              : 
    1410              :         /* invalidated slots need not apply */
    1411          262 :         if (invalidated)
    1412           10 :             continue;
    1413              : 
    1414              :         /*
    1415              :          * For persistent slot use last_saved_restart_lsn to compute the
    1416              :          * oldest LSN for removal of WAL segments.  The segments between
    1417              :          * last_saved_restart_lsn and restart_lsn might be needed by a
    1418              :          * persistent slot in the case of database crash.  Non-persistent
    1419              :          * slots can't survive the database crash, so we don't care about
    1420              :          * last_saved_restart_lsn for them.
    1421              :          */
    1422          252 :         if (persistency == RS_PERSISTENT)
    1423              :         {
    1424          250 :             if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
    1425              :                 restart_lsn > last_saved_restart_lsn)
    1426              :             {
    1427            0 :                 restart_lsn = last_saved_restart_lsn;
    1428              :             }
    1429              :         }
    1430              : 
    1431          252 :         if (!XLogRecPtrIsValid(restart_lsn))
    1432            0 :             continue;
    1433              : 
    1434          252 :         if (!XLogRecPtrIsValid(result) ||
    1435              :             restart_lsn < result)
    1436          200 :             result = restart_lsn;
    1437              :     }
    1438              : 
    1439         3896 :     LWLockRelease(ReplicationSlotControlLock);
    1440              : 
    1441         3896 :     return result;
    1442              : }
    1443              : 
    1444              : /*
    1445              :  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
    1446              :  * passed database oid.
    1447              :  *
    1448              :  * Returns true if there are any slots referencing the database. *nslots will
    1449              :  * be set to the absolute number of slots in the database, *nactive to ones
    1450              :  * currently active.
    1451              :  */
    1452              : bool
    1453           52 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
    1454              : {
    1455              :     int         i;
    1456              : 
    1457           52 :     *nslots = *nactive = 0;
    1458              : 
    1459           52 :     if (max_replication_slots + max_repack_replication_slots <= 0)
    1460            0 :         return false;
    1461              : 
    1462           52 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1463          805 :     for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
    1464              :     {
    1465              :         ReplicationSlot *s;
    1466              : 
    1467          753 :         s = &ReplicationSlotCtl->replication_slots[i];
    1468              : 
    1469              :         /* cannot change while ReplicationSlotCtlLock is held */
    1470          753 :         if (!s->in_use)
    1471          732 :             continue;
    1472              : 
    1473              :         /* only logical slots are database specific, skip */
    1474           21 :         if (!SlotIsLogical(s))
    1475           10 :             continue;
    1476              : 
    1477              :         /* not our database, skip */
    1478           11 :         if (s->data.database != dboid)
    1479            8 :             continue;
    1480              : 
    1481              :         /* NB: intentionally counting invalidated slots */
    1482              : 
    1483              :         /* count slots with spinlock held */
    1484            3 :         SpinLockAcquire(&s->mutex);
    1485            3 :         (*nslots)++;
    1486            3 :         if (s->active_proc != INVALID_PROC_NUMBER)
    1487            1 :             (*nactive)++;
    1488            3 :         SpinLockRelease(&s->mutex);
    1489              :     }
    1490           52 :     LWLockRelease(ReplicationSlotControlLock);
    1491              : 
    1492           52 :     if (*nslots > 0)
    1493            3 :         return true;
    1494           49 :     return false;
    1495              : }
    1496              : 
/*
 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
 * passed database oid. The caller should hold an exclusive lock on the
 * pg_database oid for the database to prevent creation of new slots on the db
 * or replay from existing slots.
 *
 * Both valid and invalidated logical slots of the database are dropped.
 *
 * Another session that concurrently acquires an existing slot on the target DB
 * (most likely to drop it) may cause this function to ERROR. If that happens
 * it may have dropped some but not all slots.
 *
 * This routine isn't as efficient as it could be - but we don't drop
 * databases often, especially databases with lots of slots.  Every time a
 * slot is dropped the scan restarts from the beginning.
 *
 * If at least one slot was dropped and no valid (non-invalidated) logical
 * slot remains anywhere in the cluster afterwards, request to disable
 * logical decoding.
 */
void
ReplicationSlotsDropDBSlots(Oid dboid)
{
    int         i;
    bool        found_valid_logicalslot;
    bool        dropped = false;

    if (max_replication_slots + max_repack_replication_slots <= 0)
        return;

restart:
    /*
     * Recomputed on every pass.  Only a pass that completes without finding
     * a slot of dboid reaches the check after the loop, so the value tested
     * there reflects only the logical slots of other databases.
     */
    found_valid_logicalslot = false;
    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
    {
        ReplicationSlot *s;
        char       *slotname;
        ProcNumber  active_proc;

        s = &ReplicationSlotCtl->replication_slots[i];

        /* cannot change while ReplicationSlotControlLock is held */
        if (!s->in_use)
            continue;

        /* only logical slots are database specific, skip */
        if (!SlotIsLogical(s))
            continue;

        /*
         * Check logical slots on other databases too so we can disable
         * logical decoding only if no slots in the cluster.
         */
        SpinLockAcquire(&s->mutex);
        found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
        SpinLockRelease(&s->mutex);

        /* not our database, skip */
        if (s->data.database != dboid)
            continue;

        /* NB: intentionally including invalidated slots to drop */

        /* acquire slot, so ReplicationSlotDropAcquired can be reused */
        SpinLockAcquire(&s->mutex);
        /* can't change while ReplicationSlotControlLock is held */
        slotname = NameStr(s->data.name);
        active_proc = s->active_proc;
        if (active_proc == INVALID_PROC_NUMBER)
        {
            /* Nobody owns it: take ownership ourselves. */
            MyReplicationSlot = s;
            s->active_proc = MyProcNumber;
        }
        SpinLockRelease(&s->mutex);

        /*
         * Even though we hold an exclusive lock on the database object a
         * logical slot for that DB can still be active, e.g. if it's
         * concurrently being dropped by a backend connected to another DB.
         *
         * That's fairly unlikely in practice, so we'll just bail out.
         *
         * The slot sync worker holds a shared lock on the database before
         * operating on synced logical slots to avoid conflict with the drop
         * happening here. The persistent synced slots are thus safe but there
         * is a possibility that the slot sync worker has created a temporary
         * slot (which stays active even on release) and we are trying to drop
         * that here. In practice, the chances of hitting this scenario are
         * less as during slot synchronization, the temporary slot is
         * immediately converted to persistent and thus is safe due to the
         * shared lock taken on the database. So, we'll just bail out in such
         * a case.
         *
         * XXX: We can consider shutting down the slot sync worker before
         * trying to drop synced temporary slots here.
         */
        if (active_proc != INVALID_PROC_NUMBER)
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_IN_USE),
                     errmsg("replication slot \"%s\" is active for PID %d",
                            slotname, GetPGProcByNumber(active_proc)->pid)));

        /*
         * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
         * holding ReplicationSlotControlLock over filesystem operations,
         * release ReplicationSlotControlLock and use
         * ReplicationSlotDropAcquired.
         *
         * As that means the set of slots could change, restart scan from the
         * beginning each time we release the lock.
         */
        LWLockRelease(ReplicationSlotControlLock);
        ReplicationSlotDropAcquired(false);
        dropped = true;
        goto restart;
    }
    LWLockRelease(ReplicationSlotControlLock);

    /* We removed something and no valid logical slot is left anywhere. */
    if (dropped && !found_valid_logicalslot)
        RequestDisableLogicalDecoding();
}
    1614              : 
    1615              : /*
    1616              :  * Returns true if there is at least one in-use valid logical replication slot.
    1617              :  */
    1618              : bool
    1619          505 : CheckLogicalSlotExists(void)
    1620              : {
    1621          505 :     bool        found = false;
    1622              : 
    1623          505 :     if (max_replication_slots + max_repack_replication_slots <= 0)
    1624            0 :         return false;
    1625              : 
    1626          505 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1627         7942 :     for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
    1628              :     {
    1629              :         ReplicationSlot *s;
    1630              :         bool        invalidated;
    1631              : 
    1632         7444 :         s = &ReplicationSlotCtl->replication_slots[i];
    1633              : 
    1634              :         /* cannot change while ReplicationSlotCtlLock is held */
    1635         7444 :         if (!s->in_use)
    1636         7414 :             continue;
    1637              : 
    1638           30 :         if (SlotIsPhysical(s))
    1639           20 :             continue;
    1640              : 
    1641           10 :         SpinLockAcquire(&s->mutex);
    1642           10 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1643           10 :         SpinLockRelease(&s->mutex);
    1644              : 
    1645           10 :         if (invalidated)
    1646            3 :             continue;
    1647              : 
    1648            7 :         found = true;
    1649            7 :         break;
    1650              :     }
    1651          505 :     LWLockRelease(ReplicationSlotControlLock);
    1652              : 
    1653          505 :     return found;
    1654              : }
    1655              : 
    1656              : /*
    1657              :  * Check whether the server's configuration supports using replication
    1658              :  * slots.
    1659              :  */
    1660              : void
    1661         1925 : CheckSlotRequirements(bool repack)
    1662              : {
    1663              :     /*
    1664              :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
    1665              :      * needs the same check.
    1666              :      */
    1667              : 
    1668         1925 :     if (!repack && max_replication_slots == 0)
    1669            0 :         ereport(ERROR,
    1670              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1671              :                 errmsg("replication slots can only be used if \"%s\" > 0",
    1672              :                        "max_replication_slots"));
    1673              : 
    1674         1925 :     if (repack && max_repack_replication_slots == 0)
    1675            0 :         ereport(ERROR,
    1676              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1677              :                 errmsg("REPACK can only be used if \"%s\" > 0",
    1678              :                        "max_repack_replication_slots"));
    1679              : 
    1680         1925 :     if (wal_level < WAL_LEVEL_REPLICA)
    1681            0 :         ereport(ERROR,
    1682              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1683              :                  errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
    1684         1925 : }
    1685              : 
    1686              : /*
    1687              :  * Check whether the user has privilege to use replication slots.
    1688              :  */
    1689              : void
    1690          605 : CheckSlotPermissions(void)
    1691              : {
    1692          605 :     if (!has_rolreplication(GetUserId()))
    1693            5 :         ereport(ERROR,
    1694              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1695              :                  errmsg("permission denied to use replication slots"),
    1696              :                  errdetail("Only roles with the %s attribute may use replication slots.",
    1697              :                            "REPLICATION")));
    1698          600 : }
    1699              : 
/*
 * Reserve WAL for the currently active slot.
 *
 * Compute and set restart_lsn in a manner that's appropriate for the type of
 * the slot and concurrency safe.
 *
 * Preconditions (asserted): MyReplicationSlot is set, and neither its
 * restart_lsn nor its last_saved_restart_lsn is valid yet, i.e. WAL has not
 * been reserved for this slot before.
 *
 * Raises an ERROR if the WAL segment containing the chosen restart_lsn turns
 * out to have been removed already.  For a logical slot outside recovery,
 * also logs a standby snapshot and flushes WAL up to it before returning.
 */
void
ReplicationSlotReserveWal(void)
{
	ReplicationSlot *slot = MyReplicationSlot;
	XLogSegNo	segno;
	XLogRecPtr	restart_lsn;

	Assert(slot != NULL);
	Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
	Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn));

	/*
	 * The replication slot mechanism is used to prevent the removal of
	 * required WAL.
	 *
	 * Acquire an exclusive lock to prevent the checkpoint process from
	 * concurrently computing the minimum slot LSN (see
	 * CheckPointReplicationSlots). This ensures that the WAL reserved for
	 * replication cannot be removed during a checkpoint.
	 *
	 * The mechanism is reliable because if WAL reservation occurs first, the
	 * checkpoint must wait for the restart_lsn update before determining the
	 * minimum non-removable LSN. On the other hand, if the checkpoint happens
	 * first, subsequent WAL reservations will select positions at or beyond
	 * the redo pointer of that checkpoint.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * For logical slots log a standby snapshot and start logical decoding at
	 * exactly that position. That allows the slot to start up more quickly.
	 * But on a standby we cannot do WAL writes, so just use the replay
	 * pointer; effectively, an attempt to create a logical slot on standby
	 * will cause it to wait for an xl_running_xact record to be logged
	 * independently on the primary, so that a snapshot can be built using the
	 * record.
	 *
	 * None of this is needed (or indeed helpful) for physical slots as
	 * they'll start replay at the last logged checkpoint anyway. Instead,
	 * return the location of the last redo LSN, where a base backup has to
	 * start replay at.
	 */
	if (SlotIsPhysical(slot))
		restart_lsn = GetRedoRecPtr();
	else if (RecoveryInProgress())
		restart_lsn = GetXLogReplayRecPtr(NULL);
	else
		restart_lsn = GetXLogInsertRecPtr();

	/* Publish the new restart_lsn under the slot's spinlock. */
	SpinLockAcquire(&slot->mutex);
	slot->data.restart_lsn = restart_lsn;
	SpinLockRelease(&slot->mutex);

	/* prevent WAL removal as fast as possible */
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * Checkpoint shouldn't remove the required WAL.  Still holding
	 * ReplicationSlotAllocationLock, verify that the segment containing
	 * restart_lsn has not already been removed; if it has, the reservation
	 * cannot be honoured and we error out.
	 */
	XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
	if (XLogGetLastRemovedSegno() >= segno)
		elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
			 NameStr(slot->data.name));

	LWLockRelease(ReplicationSlotAllocationLock);

	/*
	 * Logical slot on a server that is not in recovery: write the standby
	 * snapshot promised in the comment above.  This happens after releasing
	 * ReplicationSlotAllocationLock, presumably so that WAL insertion and
	 * flushing are not done while holding it.
	 */
	if (!RecoveryInProgress() && SlotIsLogical(slot))
	{
		XLogRecPtr	flushptr;

		/* make sure we have enough information to start */
		flushptr = LogStandbySnapshot();

		/* and make sure it's fsynced to disk */
		XLogFlush(flushptr);
	}
}
    1781              : 
    1782              : /*
    1783              :  * Report that replication slot needs to be invalidated
    1784              :  */
    1785              : static void
    1786           23 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
    1787              :                        bool terminating,
    1788              :                        int pid,
    1789              :                        NameData slotname,
    1790              :                        XLogRecPtr restart_lsn,
    1791              :                        XLogRecPtr oldestLSN,
    1792              :                        TransactionId snapshotConflictHorizon,
    1793              :                        long slot_idle_seconds)
    1794              : {
    1795              :     StringInfoData err_detail;
    1796              :     StringInfoData err_hint;
    1797              : 
    1798           23 :     initStringInfo(&err_detail);
    1799           23 :     initStringInfo(&err_hint);
    1800              : 
    1801           23 :     switch (cause)
    1802              :     {
    1803            7 :         case RS_INVAL_WAL_REMOVED:
    1804              :             {
    1805            7 :                 uint64      ex = oldestLSN - restart_lsn;
    1806              : 
    1807            7 :                 appendStringInfo(&err_detail,
    1808            7 :                                  ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
    1809              :                                           "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
    1810              :                                           ex),
    1811            7 :                                  LSN_FORMAT_ARGS(restart_lsn),
    1812              :                                  ex);
    1813              :                 /* translator: %s is a GUC variable name */
    1814            7 :                 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
    1815              :                                  "max_slot_wal_keep_size");
    1816            7 :                 break;
    1817              :             }
    1818           12 :         case RS_INVAL_HORIZON:
    1819           12 :             appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
    1820              :                              snapshotConflictHorizon);
    1821           12 :             break;
    1822              : 
    1823            4 :         case RS_INVAL_WAL_LEVEL:
    1824            4 :             appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
    1825            4 :             break;
    1826              : 
    1827            0 :         case RS_INVAL_IDLE_TIMEOUT:
    1828              :             {
    1829              :                 /* translator: %s is a GUC variable name */
    1830            0 :                 appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
    1831              :                                  slot_idle_seconds, "idle_replication_slot_timeout",
    1832              :                                  idle_replication_slot_timeout_secs);
    1833              :                 /* translator: %s is a GUC variable name */
    1834            0 :                 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
    1835              :                                  "idle_replication_slot_timeout");
    1836            0 :                 break;
    1837              :             }
    1838              :         case RS_INVAL_NONE:
    1839              :             pg_unreachable();
    1840              :     }
    1841              : 
    1842           23 :     ereport(LOG,
    1843              :             terminating ?
    1844              :             errmsg("terminating process %d to release replication slot \"%s\"",
    1845              :                    pid, NameStr(slotname)) :
    1846              :             errmsg("invalidating obsolete replication slot \"%s\"",
    1847              :                    NameStr(slotname)),
    1848              :             errdetail_internal("%s", err_detail.data),
    1849              :             err_hint.len ? errhint("%s", err_hint.data) : 0);
    1850              : 
    1851           23 :     pfree(err_detail.data);
    1852           23 :     pfree(err_hint.data);
    1853           23 : }
    1854              : 
    1855              : /*
    1856              :  * Can we invalidate an idle replication slot?
    1857              :  *
    1858              :  * Idle timeout invalidation is allowed only when:
    1859              :  *
    1860              :  * 1. Idle timeout is set
    1861              :  * 2. Slot has reserved WAL
    1862              :  * 3. Slot is inactive
    1863              :  * 4. The slot is not being synced from the primary while the server is in
    1864              :  *    recovery. This is because synced slots are always considered to be
    1865              :  *    inactive because they don't perform logical decoding to produce changes.
    1866              :  */
    1867              : static inline bool
    1868          390 : CanInvalidateIdleSlot(ReplicationSlot *s)
    1869              : {
    1870          390 :     return (idle_replication_slot_timeout_secs != 0 &&
    1871            0 :             XLogRecPtrIsValid(s->data.restart_lsn) &&
    1872          390 :             s->inactive_since > 0 &&
    1873            0 :             !(RecoveryInProgress() && s->data.synced));
    1874              : }
    1875              : 
    1876              : /*
    1877              :  * DetermineSlotInvalidationCause - Determine the cause for which a slot
    1878              :  * becomes invalid among the given possible causes.
    1879              :  *
    1880              :  * This function sequentially checks all possible invalidation causes and
    1881              :  * returns the first one for which the slot is eligible for invalidation.
    1882              :  */
    1883              : static ReplicationSlotInvalidationCause
    1884          423 : DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
    1885              :                                XLogRecPtr oldestLSN, Oid dboid,
    1886              :                                TransactionId snapshotConflictHorizon,
    1887              :                                TimestampTz *inactive_since, TimestampTz now)
    1888              : {
    1889              :     Assert(possible_causes != RS_INVAL_NONE);
    1890              : 
    1891          423 :     if (possible_causes & RS_INVAL_WAL_REMOVED)
    1892              :     {
    1893          397 :         XLogRecPtr  restart_lsn = s->data.restart_lsn;
    1894              : 
    1895          397 :         if (XLogRecPtrIsValid(restart_lsn) &&
    1896              :             restart_lsn < oldestLSN)
    1897            7 :             return RS_INVAL_WAL_REMOVED;
    1898              :     }
    1899              : 
    1900          416 :     if (possible_causes & RS_INVAL_HORIZON)
    1901              :     {
    1902              :         /* invalid DB oid signals a shared relation */
    1903           22 :         if (SlotIsLogical(s) &&
    1904           17 :             (dboid == InvalidOid || dboid == s->data.database))
    1905              :         {
    1906           22 :             TransactionId effective_xmin = s->effective_xmin;
    1907           22 :             TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
    1908              : 
    1909           22 :             if (TransactionIdIsValid(effective_xmin) &&
    1910            0 :                 TransactionIdPrecedesOrEquals(effective_xmin,
    1911              :                                               snapshotConflictHorizon))
    1912            0 :                 return RS_INVAL_HORIZON;
    1913           44 :             else if (TransactionIdIsValid(catalog_effective_xmin) &&
    1914           22 :                      TransactionIdPrecedesOrEquals(catalog_effective_xmin,
    1915              :                                                    snapshotConflictHorizon))
    1916           12 :                 return RS_INVAL_HORIZON;
    1917              :         }
    1918              :     }
    1919              : 
    1920          404 :     if (possible_causes & RS_INVAL_WAL_LEVEL)
    1921              :     {
    1922            4 :         if (SlotIsLogical(s))
    1923            4 :             return RS_INVAL_WAL_LEVEL;
    1924              :     }
    1925              : 
    1926          400 :     if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
    1927              :     {
    1928              :         Assert(now > 0);
    1929              : 
    1930          390 :         if (CanInvalidateIdleSlot(s))
    1931              :         {
    1932              :             /*
    1933              :              * Simulate the invalidation due to idle_timeout to test the
    1934              :              * timeout behavior promptly, without waiting for it to trigger
    1935              :              * naturally.
    1936              :              */
    1937              : #ifdef USE_INJECTION_POINTS
    1938            0 :             if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
    1939              :             {
    1940            0 :                 *inactive_since = 0;    /* since the beginning of time */
    1941            0 :                 return RS_INVAL_IDLE_TIMEOUT;
    1942              :             }
    1943              : #endif
    1944              : 
    1945              :             /*
    1946              :              * Check if the slot needs to be invalidated due to
    1947              :              * idle_replication_slot_timeout GUC.
    1948              :              */
    1949            0 :             if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
    1950              :                                                   idle_replication_slot_timeout_secs))
    1951              :             {
    1952            0 :                 *inactive_since = s->inactive_since;
    1953            0 :                 return RS_INVAL_IDLE_TIMEOUT;
    1954              :             }
    1955              :         }
    1956              :     }
    1957              : 
    1958          400 :     return RS_INVAL_NONE;
    1959              : }
    1960              : 
/*
 * Helper for InvalidateObsoleteReplicationSlots
 *
 * Checks whether slot "s" must be invalidated for one of possible_causes
 * (the choice is made by DetermineSlotInvalidationCause) and, if so, marks it
 * invalid when that is possible.
 *
 * If nobody has the slot acquired, we acquire it ourselves, mark it invalid,
 * save it and release it again.  If another process has it acquired, that
 * process is signalled to terminate (at most once per PID) and we wait on the
 * slot's condition variable before re-checking from scratch.
 *
 * Must be called with ReplicationSlotControlLock held in shared mode.
 *
 * Returns true if this call invalidated the slot.
 *
 * Sets *released_lock_out to true if ReplicationSlotControlLock was released
 * in the interim; in that case the lock is NOT held at return, otherwise it
 * still is.
 *
 * This is inherently racy, because we release the LWLock for syscalls and
 * while waiting for the owning process, so the caller must restart its scan
 * of the slots whenever *released_lock_out is set.  (Returning true always
 * implies *released_lock_out is set, but it can also be set when we return
 * false, e.g. after waiting for an owner that then caught up.)
 */
static bool
InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
							   ReplicationSlot *s,
							   XLogRecPtr oldestLSN,
							   Oid dboid, TransactionId snapshotConflictHorizon,
							   bool *released_lock_out)
{
	/* PID we last sent a termination signal to; avoids repeat signals */
	int			last_signaled_pid = 0;
	/* true once ReplicationSlotControlLock has been released at least once */
	bool		released_lock = false;
	bool		invalidated = false;
	TimestampTz inactive_since = 0;

	for (;;)
	{
		XLogRecPtr	restart_lsn;
		NameData	slotname;
		ProcNumber	active_proc;
		int			active_pid = 0;
		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
		TimestampTz now = 0;
		long		slot_idle_secs = 0;

		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));

		/*
		 * Slot is not in use (for instance it was dropped while we waited
		 * below): nothing to do.  If the lock was released on an earlier
		 * iteration, release it again so the return state matches
		 * *released_lock_out.
		 */
		if (!s->in_use)
		{
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
		{
			/*
			 * Assign the current time here to avoid system call overhead
			 * while holding the spinlock in subsequent code.
			 */
			now = GetCurrentTimestamp();
		}

		/*
		 * Check if the slot needs to be invalidated. If it needs to be
		 * invalidated, and is not currently acquired, acquire it and mark it
		 * as having been invalidated.  We do this with the spinlock held to
		 * avoid race conditions -- for example the restart_lsn could move
		 * forward, or the slot could be dropped.
		 */
		SpinLockAcquire(&s->mutex);

		/* remembered for the log message emitted after the spinlock is gone */
		restart_lsn = s->data.restart_lsn;

		/* we do nothing if the slot is already invalid */
		if (s->data.invalidated == RS_INVAL_NONE)
			invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
																s, oldestLSN,
																dboid,
																snapshotConflictHorizon,
																&inactive_since,
																now);

		/* if there's no invalidation, we're done */
		if (invalidation_cause == RS_INVAL_NONE)
		{
			SpinLockRelease(&s->mutex);
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		slotname = s->data.name;
		active_proc = s->active_proc;

		/*
		 * If the slot can be acquired, do so and mark it invalidated
		 * immediately.  Otherwise we'll signal the owning process, below, and
		 * retry.
		 *
		 * Note: Unlike other slot attributes, slot's inactive_since can't be
		 * changed until the acquired slot is released or the owning process
		 * is terminated. So, the inactive slot can only be invalidated
		 * immediately without being terminated.
		 */
		if (active_proc == INVALID_PROC_NUMBER)
		{
			MyReplicationSlot = s;
			s->active_proc = MyProcNumber;
			s->data.invalidated = invalidation_cause;

			/*
			 * XXX: We should consider not overwriting restart_lsn and instead
			 * just rely on .invalidated.
			 */
			if (invalidation_cause == RS_INVAL_WAL_REMOVED)
			{
				s->data.restart_lsn = InvalidXLogRecPtr;
				s->last_saved_restart_lsn = InvalidXLogRecPtr;
			}

			/* Let caller know */
			invalidated = true;
		}
		else
		{
			/* Owned by another backend: look up its PID to signal it below. */
			active_pid = GetPGProcByNumber(active_proc)->pid;
			Assert(active_pid != 0);
		}

		SpinLockRelease(&s->mutex);

		/*
		 * Calculate the idle time duration of the slot if slot is marked
		 * invalidated with RS_INVAL_IDLE_TIMEOUT.
		 */
		if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
		{
			int			slot_idle_usecs;

			TimestampDifference(inactive_since, now, &slot_idle_secs,
								&slot_idle_usecs);
		}

		if (active_proc != INVALID_PROC_NUMBER)
		{
			/*
			 * Prepare the sleep on the slot's condition variable before
			 * releasing the lock, to close a possible race condition if the
			 * slot is released before the sleep below.
			 */
			ConditionVariablePrepareToSleep(&s->active_cv);

			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/*
			 * Signal to terminate the process that owns the slot, if we
			 * haven't already signalled it.  (Avoidance of repeated
			 * signalling is the only reason for there to be a loop in this
			 * routine; otherwise we could rely on caller's restart loop.)
			 *
			 * There is the race condition that other process may own the slot
			 * after its current owner process is terminated and before this
			 * process owns it. To handle that, we signal only if the PID of
			 * the owning process has changed from the previous time. (This
			 * logic assumes that the same PID is not reused very quickly.)
			 *
			 * The startup process reports a recovery conflict; any other
			 * process sends SIGTERM.
			 */
			if (last_signaled_pid != active_pid)
			{
				ReportSlotInvalidation(invalidation_cause, true, active_pid,
									   slotname, restart_lsn,
									   oldestLSN, snapshotConflictHorizon,
									   slot_idle_secs);

				if (MyBackendType == B_STARTUP)
					(void) SignalRecoveryConflict(GetPGProcByNumber(active_proc),
												  active_pid,
												  RECOVERY_CONFLICT_LOGICALSLOT);
				else
					(void) kill(active_pid, SIGTERM);

				last_signaled_pid = active_pid;
			}

			/*
			 * Wait until the slot is released.
			 *
			 * NOTE(review): no ConditionVariableCancelSleep() follows this
			 * sleep anywhere in this function, whereas the condition
			 * variable API's documented usage pattern ends waiting with it.
			 * Confirm whether remaining registered on s->active_cv after the
			 * loop exits is intended or handled elsewhere.
			 */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);

			/*
			 * Re-acquire lock and start over; we expect to invalidate the
			 * slot next time (unless another process acquires the slot in the
			 * meantime).
			 *
			 * Note: It is possible for a slot to advance its restart_lsn or
			 * xmin values sufficiently between when we release the mutex and
			 * when we recheck, moving from a conflicting state to a non
			 * conflicting state.  This is intentional and safe: if the slot
			 * has caught up while we're busy here, the resources we were
			 * concerned about (WAL segments or tuples) have not yet been
			 * removed, and there's no reason to invalidate the slot.
			 */
			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
			continue;
		}
		else
		{
			/*
			 * We hold the slot now and have already invalidated it; flush it
			 * to ensure that state persists.
			 *
			 * Don't want to hold ReplicationSlotControlLock across file
			 * system operations, so release it now but be sure to tell caller
			 * to restart from scratch.
			 */
			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/*
			 * Make sure the invalidated state persists across server restart.
			 * These calls take no slot argument and presumably act on
			 * MyReplicationSlot, which was pointed at "s" above.
			 */
			ReplicationSlotMarkDirty();
			ReplicationSlotSave();
			ReplicationSlotRelease();

			ReportSlotInvalidation(invalidation_cause, false, active_pid,
								   slotname, restart_lsn,
								   oldestLSN, snapshotConflictHorizon,
								   slot_idle_secs);

			/* done with this slot for now */
			break;
		}
	}

	Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));

	*released_lock_out = released_lock;
	return invalidated;
}
    2190              : 
    2191              : /*
    2192              :  * Invalidate slots that require resources about to be removed.
    2193              :  *
    2194              :  * Returns true if any slot was invalidated.
    2195              :  *
    2196              :  * Whether a slot needs to be invalidated depends on the invalidation cause.
    2197              :  * A slot is invalidated if it:
    2198              :  * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
    2199              :  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
    2200              :  *   db; dboid may be InvalidOid for shared relations
    2201              :  * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
    2202              :  *   logical.
    2203              :  * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
    2204              :  *   "idle_replication_slot_timeout" duration.
    2205              :  *
    2206              :  * Note: This function attempts to invalidate the slot for multiple possible
    2207              :  * causes in a single pass, minimizing redundant iterations. The "cause"
    2208              :  * parameter can be a MASK representing one or more of the defined causes.
    2209              :  *
    2210              :  * If it invalidates the last logical slot in the cluster, it requests that
    2211              :  * logical decoding be disabled.
    2212              :  *
    2213              :  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
    2214              :  */
    2215              : bool
    2216         1973 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
    2217              :                                    XLogSegNo oldestSegno, Oid dboid,
    2218              :                                    TransactionId snapshotConflictHorizon)
    2219              : {
    2220              :     XLogRecPtr  oldestLSN;
    2221         1973 :     bool        invalidated = false;
    2222         1973 :     bool        invalidated_logical = false;
    2223              :     bool        found_valid_logicalslot;
    2224              : 
    2225              :     Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
    2226              :     Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
    2227              :     Assert(possible_causes != RS_INVAL_NONE);
    2228              : 
    2229         1973 :     if (max_replication_slots == 0 && max_repack_replication_slots == 0)
    2230            0 :         return invalidated;
    2231              : 
    2232         1973 :     XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
    2233              : 
    2234         1989 : restart:
    2235         1989 :     found_valid_logicalslot = false;
    2236         1989 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    2237        31223 :     for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
    2238              :     {
    2239        29250 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    2240        29250 :         bool        released_lock = false;
    2241              : 
    2242        29250 :         if (!s->in_use)
    2243        28788 :             continue;
    2244              : 
    2245              :         /* Prevent invalidation of logical slots during binary upgrade */
    2246          471 :         if (SlotIsLogical(s) && IsBinaryUpgrade)
    2247              :         {
    2248            9 :             SpinLockAcquire(&s->mutex);
    2249            9 :             found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
    2250            9 :             SpinLockRelease(&s->mutex);
    2251              : 
    2252            9 :             continue;
    2253              :         }
    2254              : 
    2255          462 :         if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
    2256              :                                            dboid, snapshotConflictHorizon,
    2257              :                                            &released_lock))
    2258              :         {
    2259              :             Assert(released_lock);
    2260              : 
    2261              :             /* Remember we have invalidated a physical or logical slot */
    2262           16 :             invalidated = true;
    2263              : 
    2264              :             /*
    2265              :              * Additionally, remember we have invalidated a logical slot as we
    2266              :              * can request disabling logical decoding later.
    2267              :              */
    2268           16 :             if (SlotIsLogical(s))
    2269           13 :                 invalidated_logical = true;
    2270              :         }
    2271              :         else
    2272              :         {
    2273              :             /*
    2274              :              * We need to check if the slot is invalidated here since
    2275              :              * InvalidatePossiblyObsoleteSlot() also returns false if the slot
    2276              :              * is already invalidated.
    2277              :              */
    2278          446 :             SpinLockAcquire(&s->mutex);
    2279          892 :             found_valid_logicalslot |=
    2280          446 :                 (SlotIsLogical(s) && (s->data.invalidated == RS_INVAL_NONE));
    2281          446 :             SpinLockRelease(&s->mutex);
    2282              :         }
    2283              : 
    2284              :         /* if the lock was released, start from scratch */
    2285          462 :         if (released_lock)
    2286           16 :             goto restart;
    2287              :     }
    2288         1973 :     LWLockRelease(ReplicationSlotControlLock);
    2289              : 
    2290              :     /*
    2291              :      * If any slots have been invalidated, recalculate the resource limits.
    2292              :      */
    2293         1973 :     if (invalidated)
    2294              :     {
    2295           11 :         ReplicationSlotsComputeRequiredXmin(false);
    2296           11 :         ReplicationSlotsComputeRequiredLSN();
    2297              :     }
    2298              : 
    2299              :     /*
    2300              :      * Request the checkpointer to disable logical decoding if no valid
    2301              :      * logical slots remain. If called by the checkpointer during a
    2302              :      * checkpoint, only the request is initiated; actual deactivation is
    2303              :      * deferred until after the checkpoint completes.
    2304              :      */
    2305         1973 :     if (invalidated_logical && !found_valid_logicalslot)
    2306            8 :         RequestDisableLogicalDecoding();
    2307              : 
    2308         1973 :     return invalidated;
    2309              : }
    2310              : 
    2311              : /*
    2312              :  * Flush all replication slots to disk.
    2313              :  *
    2314              :  * It is convenient to flush dirty replication slots at the time of checkpoint.
    2315              :  * Additionally, in case of a shutdown checkpoint, we also identify the slots
    2316              :  * for which the confirmed_flush LSN has been updated since the last time it
    2317              :  * was saved and flush them.
    2318              :  */
    2319              : void
    2320         1948 : CheckPointReplicationSlots(bool is_shutdown)
    2321              : {
    2322              :     int         i;
    2323         1948 :     bool        last_saved_restart_lsn_updated = false;
    2324              : 
    2325         1948 :     elog(DEBUG1, "performing replication slot checkpoint");
    2326              : 
    2327              :     /*
    2328              :      * Prevent any slot from being created/dropped while we're active. As we
    2329              :      * explicitly do *not* want to block iterating over replication_slots or
    2330              :      * acquiring a slot we cannot take the control lock - but that's OK,
    2331              :      * because holding ReplicationSlotAllocationLock is strictly stronger, and
    2332              :      * enough to guarantee that nobody can change the in_use bits on us.
    2333              :      *
    2334              :      * Additionally, acquiring the Allocation lock is necessary to serialize
    2335              :      * the slot flush process with concurrent slot WAL reservation. This
    2336              :      * ensures that the WAL position being reserved is either flushed to disk
    2337              :      * or is beyond or equal to the redo pointer of the current checkpoint
    2338              :      * (See ReplicationSlotReserveWal for details).
    2339              :      */
    2340         1948 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
    2341              : 
    2342        30887 :     for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
    2343              :     {
    2344        28939 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    2345              :         char        path[MAXPGPATH];
    2346              : 
    2347        28939 :         if (!s->in_use)
    2348        28526 :             continue;
    2349              : 
    2350              :         /* save the slot to disk, locking is handled in SaveSlotToPath() */
    2351          413 :         sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
    2352              : 
    2353              :         /*
    2354              :          * Slot's data is not flushed each time the confirmed_flush LSN is
    2355              :          * updated as that could lead to frequent writes.  However, we decide
    2356              :          * to force a flush of all logical slots' data at the time of shutdown
    2357              :          * if the confirmed_flush LSN is changed since we last flushed it to
    2358              :          * disk.  This helps in avoiding an unnecessary retreat of the
    2359              :          * confirmed_flush LSN after restart.
    2360              :          */
    2361          413 :         if (is_shutdown && SlotIsLogical(s))
    2362              :         {
    2363           95 :             SpinLockAcquire(&s->mutex);
    2364              : 
    2365           95 :             if (s->data.invalidated == RS_INVAL_NONE &&
    2366           94 :                 s->data.confirmed_flush > s->last_saved_confirmed_flush)
    2367              :             {
    2368           42 :                 s->just_dirtied = true;
    2369           42 :                 s->dirty = true;
    2370              :             }
    2371           95 :             SpinLockRelease(&s->mutex);
    2372              :         }
    2373              : 
    2374              :         /*
    2375              :          * Track if we're going to update slot's last_saved_restart_lsn. We
    2376              :          * need this to know if we need to recompute the required LSN.
    2377              :          */
    2378          413 :         if (s->last_saved_restart_lsn != s->data.restart_lsn)
    2379          215 :             last_saved_restart_lsn_updated = true;
    2380              : 
    2381          413 :         SaveSlotToPath(s, path, LOG);
    2382              :     }
    2383         1948 :     LWLockRelease(ReplicationSlotAllocationLock);
    2384              : 
    2385              :     /*
    2386              :      * Recompute the required LSN if SaveSlotToPath() updated
    2387              :      * last_saved_restart_lsn for any slot.
    2388              :      */
    2389         1948 :     if (last_saved_restart_lsn_updated)
    2390          215 :         ReplicationSlotsComputeRequiredLSN();
    2391         1948 : }
    2392              : 
    2393              : /*
    2394              :  * Load all replication slots from disk into memory at server startup. This
    2395              :  * needs to be run before we start crash recovery.
    2396              :  */
    2397              : void
    2398         1093 : StartupReplicationSlots(void)
    2399              : {
    2400              :     DIR        *replication_dir;
    2401              :     struct dirent *replication_de;
    2402              : 
    2403         1093 :     elog(DEBUG1, "starting up replication slots");
    2404              : 
    2405              :     /* restore all slots by iterating over all on-disk entries */
    2406         1093 :     replication_dir = AllocateDir(PG_REPLSLOT_DIR);
    2407         3399 :     while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
    2408              :     {
    2409              :         char        path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
    2410              :         PGFileType  de_type;
    2411              : 
    2412         2308 :         if (strcmp(replication_de->d_name, ".") == 0 ||
    2413         1217 :             strcmp(replication_de->d_name, "..") == 0)
    2414         2182 :             continue;
    2415              : 
    2416          126 :         snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
    2417          126 :         de_type = get_dirent_type(path, replication_de, false, DEBUG1);
    2418              : 
    2419              :         /* we're only creating directories here, skip if it's not ours */
    2420          126 :         if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
    2421            0 :             continue;
    2422              : 
    2423              :         /* we crashed while a slot was being setup or deleted, clean up */
    2424          126 :         if (pg_str_endswith(replication_de->d_name, ".tmp"))
    2425              :         {
    2426            0 :             if (!rmtree(path, true))
    2427              :             {
    2428            0 :                 ereport(WARNING,
    2429              :                         (errmsg("could not remove directory \"%s\"",
    2430              :                                 path)));
    2431            0 :                 continue;
    2432              :             }
    2433            0 :             fsync_fname(PG_REPLSLOT_DIR, true);
    2434            0 :             continue;
    2435              :         }
    2436              : 
    2437              :         /* looks like a slot in a normal state, restore */
    2438          126 :         RestoreSlotFromDisk(replication_de->d_name);
    2439              :     }
    2440         1091 :     FreeDir(replication_dir);
    2441              : 
    2442              :     /* currently no slots exist, we're done. */
    2443         1091 :     if (max_replication_slots + max_repack_replication_slots <= 0)
    2444            0 :         return;
    2445              : 
    2446              :     /* Now that we have recovered all the data, compute replication xmin */
    2447         1091 :     ReplicationSlotsComputeRequiredXmin(false);
    2448         1091 :     ReplicationSlotsComputeRequiredLSN();
    2449              : }
    2450              : 
    2451              : /* ----
    2452              :  * Manipulation of on-disk state of replication slots
    2453              :  *
    2454              :  * NB: none of the routines below should take any notice whether a slot is the
    2455              :  * current one or not, that's all handled a layer above.
    2456              :  * ----
    2457              :  */
    2458              : static void
    2459          725 : CreateSlotOnDisk(ReplicationSlot *slot)
    2460              : {
    2461              :     char        tmppath[MAXPGPATH];
    2462              :     char        path[MAXPGPATH];
    2463              :     struct stat st;
    2464              : 
    2465              :     /*
    2466              :      * No need to take out the io_in_progress_lock: nobody else can see this
    2467              :      * slot yet, so nobody else will write. We're reusing SaveSlotToPath, which
    2468              :      * takes out the lock itself; if we took the lock here, we'd deadlock.
    2469              :      */
    2470              : 
    2471          725 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    2472          725 :     sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    2473              : 
    2474              :     /*
    2475              :      * It's just barely possible that some previous effort to create or drop a
    2476              :      * slot with this name left a temp directory lying around. If that seems
    2477              :      * to be the case, try to remove it.  If the rmtree() fails, we'll error
    2478              :      * out at the MakePGDirectory() below, so we don't bother checking
    2479              :      * success.
    2480              :      */
    2481          725 :     if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
    2482            0 :         rmtree(tmppath, true);
    2483              : 
    2484              :     /* Create and fsync the temporary slot directory. */
    2485          725 :     if (MakePGDirectory(tmppath) < 0)
    2486            0 :         ereport(ERROR,
    2487              :                 (errcode_for_file_access(),
    2488              :                  errmsg("could not create directory \"%s\": %m",
    2489              :                         tmppath)));
    2490          725 :     fsync_fname(tmppath, true);
    2491              : 
    2492              :     /* Write the actual state file. */
    2493          725 :     slot->dirty = true;          /* signal that we really need to write */
    2494          725 :     SaveSlotToPath(slot, tmppath, ERROR);
    2495              : 
    2496              :     /* Rename the directory into place. */
    2497          725 :     if (rename(tmppath, path) != 0)
    2498            0 :         ereport(ERROR,
    2499              :                 (errcode_for_file_access(),
    2500              :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    2501              :                         tmppath, path)));
    2502              : 
    2503              :     /*
    2504              :      * If we'd now fail - really unlikely - we wouldn't know whether this slot
    2505              :      * would persist after an OS crash or not - so, force a restart. The
    2506              :      * restart would try to fsync this again till it works.
    2507              :      */
    2508          725 :     START_CRIT_SECTION();
    2509              : 
    2510          725 :     fsync_fname(path, true);
    2511          725 :     fsync_fname(PG_REPLSLOT_DIR, true);
    2512              : 
    2513          725 :     END_CRIT_SECTION();
    2514          725 : }
    2515              : 
    2516              : /*
    2517              :  * Shared functionality between saving and creating a replication slot.
    2518              :  */
    2519              : static void
    2520         2658 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
    2521              : {
    2522              :     char        tmppath[MAXPGPATH];
    2523              :     char        path[MAXPGPATH];
    2524              :     int         fd;
    2525              :     ReplicationSlotOnDisk cp;
    2526              :     bool        was_dirty;
    2527              : 
    2528              :     /* first check whether there's something to write out */
    2529         2658 :     SpinLockAcquire(&slot->mutex);
    2530         2658 :     was_dirty = slot->dirty;
    2531         2658 :     slot->just_dirtied = false;
    2532         2658 :     SpinLockRelease(&slot->mutex);
    2533              : 
    2534              :     /* and don't do anything if there's nothing to write */
    2535         2658 :     if (!was_dirty)
    2536          145 :         return;
    2537              : 
    2538         2513 :     LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
    2539              : 
    2540              :     /* silence valgrind :( */
    2541         2513 :     memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
    2542              : 
    2543         2513 :     sprintf(tmppath, "%s/state.tmp", dir);
    2544         2513 :     sprintf(path, "%s/state", dir);
    2545              : 
    2546         2513 :     fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    2547         2513 :     if (fd < 0)
    2548              :     {
    2549              :         /*
    2550              :          * If not an ERROR, then release the lock before returning.  In case
    2551              :          * of an ERROR, the error recovery path automatically releases the
    2552              :          * lock, but no harm in explicitly releasing even in that case.  Note
    2553              :          * that LWLockRelease() could affect errno.
    2554              :          */
    2555            0 :         int         save_errno = errno;
    2556              : 
    2557            0 :         LWLockRelease(&slot->io_in_progress_lock);
    2558            0 :         errno = save_errno;
    2559            0 :         ereport(elevel,
    2560              :                 (errcode_for_file_access(),
    2561              :                  errmsg("could not create file \"%s\": %m",
    2562              :                         tmppath)));
    2563            0 :         return;
    2564              :     }
    2565              : 
    2566         2513 :     cp.magic = SLOT_MAGIC;
    2567         2513 :     INIT_CRC32C(cp.checksum);
    2568         2513 :     cp.version = SLOT_VERSION;
    2569         2513 :     cp.length = ReplicationSlotOnDiskV2Size;
    2570              : 
    2571         2513 :     SpinLockAcquire(&slot->mutex);
    2572              : 
    2573         2513 :     memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
    2574              : 
    2575         2513 :     SpinLockRelease(&slot->mutex);
    2576              : 
    2577         2513 :     COMP_CRC32C(cp.checksum,
    2578              :                 (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
    2579              :                 ReplicationSlotOnDiskChecksummedSize);
    2580         2513 :     FIN_CRC32C(cp.checksum);
    2581              : 
    2582         2513 :     errno = 0;
    2583         2513 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
    2584         2513 :     if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
    2585              :     {
    2586            0 :         int         save_errno = errno;
    2587              : 
    2588            0 :         pgstat_report_wait_end();
    2589            0 :         CloseTransientFile(fd);
    2590            0 :         unlink(tmppath);
    2591            0 :         LWLockRelease(&slot->io_in_progress_lock);
    2592              : 
    2593              :         /* if write didn't set errno, assume problem is no disk space */
    2594            0 :         errno = save_errno ? save_errno : ENOSPC;
    2595            0 :         ereport(elevel,
    2596              :                 (errcode_for_file_access(),
    2597              :                  errmsg("could not write to file \"%s\": %m",
    2598              :                         tmppath)));
    2599            0 :         return;
    2600              :     }
    2601         2513 :     pgstat_report_wait_end();
    2602              : 
    2603              :     /* fsync the temporary file */
    2604         2513 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
    2605         2513 :     if (pg_fsync(fd) != 0)
    2606              :     {
    2607            0 :         int         save_errno = errno;
    2608              : 
    2609            0 :         pgstat_report_wait_end();
    2610            0 :         CloseTransientFile(fd);
    2611            0 :         unlink(tmppath);
    2612            0 :         LWLockRelease(&slot->io_in_progress_lock);
    2613              : 
    2614            0 :         errno = save_errno;
    2615            0 :         ereport(elevel,
    2616              :                 (errcode_for_file_access(),
    2617              :                  errmsg("could not fsync file \"%s\": %m",
    2618              :                         tmppath)));
    2619            0 :         return;
    2620              :     }
    2621         2513 :     pgstat_report_wait_end();
    2622              : 
    2623         2513 :     if (CloseTransientFile(fd) != 0)
    2624              :     {
    2625            0 :         int         save_errno = errno;
    2626              : 
    2627            0 :         unlink(tmppath);
    2628            0 :         LWLockRelease(&slot->io_in_progress_lock);
    2629              : 
    2630            0 :         errno = save_errno;
    2631            0 :         ereport(elevel,
    2632              :                 (errcode_for_file_access(),
    2633              :                  errmsg("could not close file \"%s\": %m",
    2634              :                         tmppath)));
    2635            0 :         return;
    2636              :     }
    2637              : 
    2638              :     /* rename to permanent file, fsync file and directory */
    2639         2513 :     if (rename(tmppath, path) != 0)
    2640              :     {
    2641            0 :         int         save_errno = errno;
    2642              : 
    2643            0 :         unlink(tmppath);
    2644            0 :         LWLockRelease(&slot->io_in_progress_lock);
    2645              : 
    2646            0 :         errno = save_errno;
    2647            0 :         ereport(elevel,
    2648              :                 (errcode_for_file_access(),
    2649              :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    2650              :                         tmppath, path)));
    2651            0 :         return;
    2652              :     }
    2653              : 
    2654              :     /*
    2655              :      * See CreateSlotOnDisk() for the reasoning behind using a critical section.
    2656              :      */
    2657         2513 :     START_CRIT_SECTION();
    2658              : 
    2659         2513 :     fsync_fname(path, false);
    2660         2513 :     fsync_fname(dir, true);
    2661         2513 :     fsync_fname(PG_REPLSLOT_DIR, true);
    2662              : 
    2663         2513 :     END_CRIT_SECTION();
    2664              : 
    2665              :     /*
    2666              :      * Successfully wrote; unset the dirty bit unless somebody dirtied it again
    2667              :      * meanwhile, and remember the confirmed_flush and restart_lsn we saved.
    2668              :      */
    2669         2513 :     SpinLockAcquire(&slot->mutex);
    2670         2513 :     if (!slot->just_dirtied)
    2671         2486 :         slot->dirty = false;
    2672         2513 :     slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2673         2513 :     slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
    2674         2513 :     SpinLockRelease(&slot->mutex);
    2675              : 
    2676         2513 :     LWLockRelease(&slot->io_in_progress_lock);
    2677              : }
    2678              : 
    2679              : /*
    2680              :  * Load a single slot from disk into memory.
    2681              :  */
    2682              : static void
    2683          126 : RestoreSlotFromDisk(const char *name)
    2684              : {
    2685              :     ReplicationSlotOnDisk cp;
    2686              :     int         i;
    2687              :     char        slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
    2688              :     char        path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
    2689              :     int         fd;
    2690          126 :     bool        restored = false;
    2691              :     int         readBytes;
    2692              :     pg_crc32c   checksum;
    2693          126 :     TimestampTz now = 0;
    2694              : 
    2695              :     /* no need to lock here, no concurrent access allowed yet */
    2696              : 
    2697              :     /* delete temp file if it exists */
    2698          126 :     sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
    2699          126 :     sprintf(path, "%s/state.tmp", slotdir);
    2700          126 :     if (unlink(path) < 0 && errno != ENOENT)
    2701            0 :         ereport(PANIC,
    2702              :                 (errcode_for_file_access(),
    2703              :                  errmsg("could not remove file \"%s\": %m", path)));
    2704              : 
    2705          126 :     sprintf(path, "%s/state", slotdir);
    2706              : 
    2707          126 :     elog(DEBUG1, "restoring replication slot from \"%s\"", path);
    2708              : 
    2709              :     /* on some operating systems fsyncing a file requires O_RDWR */
    2710          126 :     fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
    2711              : 
    2712              :     /*
    2713              :      * We do not need to handle this as we are rename()ing the directory into
    2714              :      * place only after we fsync()ed the state file.
    2715              :      */
    2716          126 :     if (fd < 0)
    2717            0 :         ereport(PANIC,
    2718              :                 (errcode_for_file_access(),
    2719              :                  errmsg("could not open file \"%s\": %m", path)));
    2720              : 
    2721              :     /*
    2722              :      * Sync state file before we're reading from it. We might have crashed
    2723              :      * while it wasn't synced yet and we shouldn't continue on that basis.
    2724              :      */
    2725          126 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
    2726          126 :     if (pg_fsync(fd) != 0)
    2727            0 :         ereport(PANIC,
    2728              :                 (errcode_for_file_access(),
    2729              :                  errmsg("could not fsync file \"%s\": %m",
    2730              :                         path)));
    2731          126 :     pgstat_report_wait_end();
    2732              : 
    2733              :     /* Also sync the parent directory */
    2734          126 :     START_CRIT_SECTION();
    2735          126 :     fsync_fname(slotdir, true);
    2736          126 :     END_CRIT_SECTION();
    2737              : 
    2738              :     /* read part of statefile that's guaranteed to be version independent */
    2739          126 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2740          126 :     readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
    2741          126 :     pgstat_report_wait_end();
    2742          126 :     if (readBytes != ReplicationSlotOnDiskConstantSize)
    2743              :     {
    2744            0 :         if (readBytes < 0)
    2745            0 :             ereport(PANIC,
    2746              :                     (errcode_for_file_access(),
    2747              :                      errmsg("could not read file \"%s\": %m", path)));
    2748              :         else
    2749            0 :             ereport(PANIC,
    2750              :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2751              :                      errmsg("could not read file \"%s\": read %d of %zu",
    2752              :                             path, readBytes,
    2753              :                             (Size) ReplicationSlotOnDiskConstantSize)));
    2754              :     }
    2755              : 
    2756              :     /* verify magic */
    2757          126 :     if (cp.magic != SLOT_MAGIC)
    2758            0 :         ereport(PANIC,
    2759              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2760              :                  errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
    2761              :                         path, cp.magic, SLOT_MAGIC)));
    2762              : 
    2763              :     /* verify version */
    2764          126 :     if (cp.version != SLOT_VERSION)
    2765            0 :         ereport(PANIC,
    2766              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2767              :                  errmsg("replication slot file \"%s\" has unsupported version %u",
    2768              :                         path, cp.version)));
    2769              : 
    2770              :     /* boundary check on length */
    2771          126 :     if (cp.length != ReplicationSlotOnDiskV2Size)
    2772            0 :         ereport(PANIC,
    2773              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2774              :                  errmsg("replication slot file \"%s\" has corrupted length %u",
    2775              :                         path, cp.length)));
    2776              : 
    2777              :     /* Now that we know the size, read the entire file */
    2778          126 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2779          252 :     readBytes = read(fd,
    2780              :                      (char *) &cp + ReplicationSlotOnDiskConstantSize,
    2781          126 :                      cp.length);
    2782          126 :     pgstat_report_wait_end();
    2783          126 :     if (readBytes != cp.length)
    2784              :     {
    2785            0 :         if (readBytes < 0)
    2786            0 :             ereport(PANIC,
    2787              :                     (errcode_for_file_access(),
    2788              :                      errmsg("could not read file \"%s\": %m", path)));
    2789              :         else
    2790            0 :             ereport(PANIC,
    2791              :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2792              :                      errmsg("could not read file \"%s\": read %d of %zu",
    2793              :                             path, readBytes, (Size) cp.length)));
    2794              :     }
    2795              : 
    2796          126 :     if (CloseTransientFile(fd) != 0)
    2797            0 :         ereport(PANIC,
    2798              :                 (errcode_for_file_access(),
    2799              :                  errmsg("could not close file \"%s\": %m", path)));
    2800              : 
    2801              :     /* now verify the CRC */
    2802          126 :     INIT_CRC32C(checksum);
    2803          126 :     COMP_CRC32C(checksum,
    2804              :                 (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
    2805              :                 ReplicationSlotOnDiskChecksummedSize);
    2806          126 :     FIN_CRC32C(checksum);
    2807              : 
    2808          126 :     if (!EQ_CRC32C(checksum, cp.checksum))
    2809            0 :         ereport(PANIC,
    2810              :                 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
    2811              :                         path, checksum, cp.checksum)));
    2812              : 
    2813              :     /*
    2814              :      * If we crashed with an ephemeral slot active, don't restore but delete
    2815              :      * it.
    2816              :      */
    2817          126 :     if (cp.slotdata.persistency != RS_PERSISTENT)
    2818              :     {
    2819            0 :         if (!rmtree(slotdir, true))
    2820              :         {
    2821            0 :             ereport(WARNING,
    2822              :                     (errmsg("could not remove directory \"%s\"",
    2823              :                             slotdir)));
    2824              :         }
    2825            0 :         fsync_fname(PG_REPLSLOT_DIR, true);
    2826            0 :         return;
    2827              :     }
    2828              : 
    2829              :     /*
    2830              :      * Verify that requirements for the specific slot type are met. That's
    2831              :      * important because if these aren't met we're not guaranteed to retain
    2832              :      * all the necessary resources for the slot.
    2833              :      *
    2834              :      * NB: We have to do so *after* the above checks for ephemeral slots,
    2835              :      * because otherwise a slot that shouldn't exist anymore could prevent
    2836              :      * restarts.
    2837              :      *
    2838              :      * NB: Changing the requirements here also requires adapting
    2839              :      * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
    2840              :      */
    2841          126 :     if (cp.slotdata.database != InvalidOid)
    2842              :     {
    2843           87 :         if (wal_level < WAL_LEVEL_REPLICA)
    2844            1 :             ereport(FATAL,
    2845              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2846              :                      errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
    2847              :                             NameStr(cp.slotdata.name)),
    2848              :                      errhint("Change \"wal_level\" to be \"replica\" or higher.")));
    2849              : 
    2850              :         /*
    2851              :          * In standby mode, hot standby must be enabled. This check is
    2852              :          * necessary to ensure logical slots are invalidated when they become
    2853              :          * incompatible due to insufficient wal_level. Otherwise, if the
    2854              :          * primary lowered effective_wal_level below logical while hot standby
    2855              :          * was disabled on this standby, its logical slots would not be
    2856              :          * invalidated and would remain valid even after promotion.
    2857              :          */
    2858           86 :         if (StandbyMode && !EnableHotStandby)
    2859            1 :             ereport(FATAL,
    2860              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2861              :                      errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
    2862              :                             NameStr(cp.slotdata.name)),
    2863              :                      errhint("Change \"hot_standby\" to be \"on\".")));
    2864              :     }
    2865           39 :     else if (wal_level < WAL_LEVEL_REPLICA)
    2866            0 :         ereport(FATAL,
    2867              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2868              :                  errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
    2869              :                         NameStr(cp.slotdata.name)),
    2870              :                  errhint("Change \"wal_level\" to be \"replica\" or higher.")));
    2871              : 
    2872              :     /*
    2873              :      * Nothing can be active yet, don't lock anything.  Note we iterate up to
    2874              :      * max_replication_slots instead of adding max_repack_replication_slots as
    2875              :      * in all other places, because we must enforce the GUC value in case
    2876              :      * there were more slots before the shutdown than the GUC currently
    2877              :      * allows.
    2878              :      */
    2879          182 :     for (i = 0; i < max_replication_slots; i++)
    2880              :     {
    2881              :         ReplicationSlot *slot;
    2882              : 
    2883          182 :         slot = &ReplicationSlotCtl->replication_slots[i];
    2884              : 
    2885          182 :         if (slot->in_use)
    2886           58 :             continue;
    2887              : 
    2888              :         /* restore the entire set of persistent data */
    2889          124 :         memcpy(&slot->data, &cp.slotdata,
    2890              :                sizeof(ReplicationSlotPersistentData));
    2891              : 
    2892              :         /* initialize in memory state */
    2893          124 :         slot->effective_xmin = cp.slotdata.xmin;
    2894          124 :         slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
    2895          124 :         slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2896          124 :         slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
    2897              : 
    2898          124 :         slot->candidate_catalog_xmin = InvalidTransactionId;
    2899          124 :         slot->candidate_xmin_lsn = InvalidXLogRecPtr;
    2900          124 :         slot->candidate_restart_lsn = InvalidXLogRecPtr;
    2901          124 :         slot->candidate_restart_valid = InvalidXLogRecPtr;
    2902              : 
    2903          124 :         slot->in_use = true;
    2904          124 :         slot->active_proc = INVALID_PROC_NUMBER;
    2905              : 
    2906              :         /*
    2907              :          * Set the time since the slot has become inactive after loading the
    2908              :          * slot from the disk into memory. Whoever acquires the slot i.e.
    2909              :          * makes the slot active will reset it. Use the same inactive_since
    2910              :          * time for all the slots.
    2911              :          */
    2912          124 :         if (now == 0)
    2913          124 :             now = GetCurrentTimestamp();
    2914              : 
    2915          124 :         ReplicationSlotSetInactiveSince(slot, now, false);
    2916              : 
    2917          124 :         restored = true;
    2918          124 :         break;
    2919              :     }
    2920              : 
    2921          124 :     if (!restored)
    2922            0 :         ereport(FATAL,
    2923              :                 (errmsg("too many replication slots active before shutdown"),
    2924              :                  errhint("Increase \"max_replication_slots\" and try again.")));
    2925              : }
    2926              : 
    2927              : /*
    2928              :  * Maps an invalidation reason for a replication slot to
    2929              :  * ReplicationSlotInvalidationCause.
    2930              :  */
    2931              : ReplicationSlotInvalidationCause
    2932            0 : GetSlotInvalidationCause(const char *cause_name)
    2933              : {
    2934              :     Assert(cause_name);
    2935              : 
    2936              :     /* Search lookup table for the cause having this name */
    2937            0 :     for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
    2938              :     {
    2939            0 :         if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
    2940            0 :             return SlotInvalidationCauses[i].cause;
    2941              :     }
    2942              : 
    2943              :     Assert(false);
    2944            0 :     return RS_INVAL_NONE;       /* to keep compiler quiet */
    2945              : }
    2946              : 
    2947              : /*
    2948              :  * Maps a ReplicationSlotInvalidationCause to the invalidation
    2949              :  * reason for a replication slot.
    2950              :  */
    2951              : const char *
    2952           45 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
    2953              : {
    2954              :     /* Search lookup table for the name of this cause */
    2955          146 :     for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
    2956              :     {
    2957          146 :         if (SlotInvalidationCauses[i].cause == cause)
    2958           45 :             return SlotInvalidationCauses[i].cause_name;
    2959              :     }
    2960              : 
    2961              :     Assert(false);
    2962            0 :     return "none";                /* to keep compiler quiet */
    2963              : }
    2964              : 
    2965              : /*
    2966              :  * A helper function to validate slots specified in GUC synchronized_standby_slots.
    2967              :  *
    2968              :  * The rawname will be parsed, and the result will be saved into *elemlist.
    2969              :  */
    2970              : static bool
    2971           28 : validate_sync_standby_slots(char *rawname, List **elemlist)
    2972              : {
    2973              :     /* Verify syntax and parse string into a list of identifiers */
    2974           28 :     if (!SplitIdentifierString(rawname, ',', elemlist))
    2975              :     {
    2976            0 :         GUC_check_errdetail("List syntax is invalid.");
    2977            0 :         return false;
    2978              :     }
    2979              : 
    2980              :     /* Iterate the list to validate each slot name */
    2981           80 :     foreach_ptr(char, name, *elemlist)
    2982              :     {
    2983              :         int         err_code;
    2984           28 :         char       *err_msg = NULL;
    2985           28 :         char       *err_hint = NULL;
    2986              : 
    2987           28 :         if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
    2988              :                                                  &err_msg, &err_hint))
    2989              :         {
    2990            2 :             GUC_check_errcode(err_code);
    2991            2 :             GUC_check_errdetail("%s", err_msg);
    2992            2 :             if (err_hint != NULL)
    2993            2 :                 GUC_check_errhint("%s", err_hint);
    2994            2 :             return false;
    2995              :         }
    2996              :     }
    2997              : 
    2998           26 :     return true;
    2999              : }
    3000              : 
    3001              : /*
    3002              :  * GUC check_hook for synchronized_standby_slots
    3003              :  */
    3004              : bool
    3005         1337 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
    3006              : {
    3007              :     char       *rawname;
    3008              :     char       *ptr;
    3009              :     List       *elemlist;
    3010              :     int         size;
    3011              :     bool        ok;
    3012              :     SyncStandbySlotsConfigData *config;
    3013              : 
    3014         1337 :     if ((*newval)[0] == '\0')
    3015         1309 :         return true;
    3016              : 
    3017              :     /* Need a modifiable copy of the GUC string */
    3018           28 :     rawname = pstrdup(*newval);
    3019              : 
    3020              :     /* Now verify if the specified slots exist and have correct type */
    3021           28 :     ok = validate_sync_standby_slots(rawname, &elemlist);
    3022              : 
    3023           28 :     if (!ok || elemlist == NIL)
    3024              :     {
    3025            2 :         pfree(rawname);
    3026            2 :         list_free(elemlist);
    3027            2 :         return ok;
    3028              :     }
    3029              : 
    3030              :     /* Compute the size required for the SyncStandbySlotsConfigData struct */
    3031           26 :     size = offsetof(SyncStandbySlotsConfigData, slot_names);
    3032           78 :     foreach_ptr(char, slot_name, elemlist)
    3033           26 :         size += strlen(slot_name) + 1;
    3034              : 
    3035              :     /* GUC extra value must be guc_malloc'd, not palloc'd */
    3036           26 :     config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
    3037           26 :     if (!config)
    3038            0 :         return false;
    3039              : 
    3040              :     /* Transform the data into SyncStandbySlotsConfigData */
    3041           26 :     config->nslotnames = list_length(elemlist);
    3042              : 
    3043           26 :     ptr = config->slot_names;
    3044           78 :     foreach_ptr(char, slot_name, elemlist)
    3045              :     {
    3046           26 :         strcpy(ptr, slot_name);
    3047           26 :         ptr += strlen(slot_name) + 1;
    3048              :     }
    3049              : 
    3050           26 :     *extra = config;
    3051              : 
    3052           26 :     pfree(rawname);
    3053           26 :     list_free(elemlist);
    3054           26 :     return true;
    3055              : }
    3056              : 
    3057              : /*
    3058              :  * GUC assign_hook for synchronized_standby_slots
    3059              :  */
    3060              : void
    3061         1334 : assign_synchronized_standby_slots(const char *newval, void *extra)
    3062              : {
    3063              :     /*
    3064              :      * The standby slots may have changed, so we must recompute the oldest
    3065              :      * LSN.
    3066              :      */
    3067         1334 :     ss_oldest_flush_lsn = InvalidXLogRecPtr;
    3068              : 
    3069         1334 :     synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
    3070         1334 : }
    3071              : 
    3072              : /*
    3073              :  * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
    3074              :  */
    3075              : bool
    3076        41147 : SlotExistsInSyncStandbySlots(const char *slot_name)
    3077              : {
    3078              :     const char *standby_slot_name;
    3079              : 
    3080              :     /* Return false if there is no value in synchronized_standby_slots */
    3081        41147 :     if (synchronized_standby_slots_config == NULL)
    3082        41130 :         return false;
    3083              : 
    3084              :     /*
    3085              :      * XXX: We are not expecting this list to be long so a linear search
    3086              :      * shouldn't hurt but if that turns out not to be true then we can cache
    3087              :      * this information for each WalSender as well.
    3088              :      */
    3089           17 :     standby_slot_name = synchronized_standby_slots_config->slot_names;
    3090           25 :     for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
    3091              :     {
    3092           17 :         if (strcmp(standby_slot_name, slot_name) == 0)
    3093            9 :             return true;
    3094              : 
    3095            8 :         standby_slot_name += strlen(standby_slot_name) + 1;
    3096              :     }
    3097              : 
    3098            8 :     return false;
    3099              : }
    3100              : 
    3101              : /*
    3102              :  * Return true if the slots specified in synchronized_standby_slots have caught up to
    3103              :  * the given WAL location, false otherwise.
    3104              :  *
    3105              :  * The elevel parameter specifies the error level used for logging messages
    3106              :  * related to slots that do not exist, are invalidated, or are inactive.
    3107              :  */
    3108              : bool
    3109         3535 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
    3110              : {
    3111              :     const char *name;
    3112         3535 :     int         caught_up_slot_num = 0;
    3113         3535 :     XLogRecPtr  min_restart_lsn = InvalidXLogRecPtr;
    3114              : 
    3115              :     /*
    3116              :      * Don't need to wait for the standbys to catch up if there is no value in
    3117              :      * synchronized_standby_slots.
    3118              :      */
    3119         3535 :     if (synchronized_standby_slots_config == NULL)
    3120         3501 :         return true;
    3121              : 
    3122              :     /*
    3123              :      * Don't need to wait for the standbys to catch up if we are on a standby
    3124              :      * server, since we do not support syncing slots to cascading standbys.
    3125              :      */
    3126           34 :     if (RecoveryInProgress())
    3127            0 :         return true;
    3128              : 
    3129              :     /*
    3130              :      * Don't need to wait for the standbys to catch up if they are already
    3131              :      * beyond the specified WAL location.
    3132              :      */
    3133           34 :     if (XLogRecPtrIsValid(ss_oldest_flush_lsn) &&
    3134           18 :         ss_oldest_flush_lsn >= wait_for_lsn)
    3135            9 :         return true;
    3136              : 
    3137              :     /*
    3138              :      * To prevent concurrent slot dropping and creation while filtering the
    3139              :      * slots, take the ReplicationSlotControlLock outside of the loop.
    3140              :      */
    3141           25 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    3142              : 
    3143           25 :     name = synchronized_standby_slots_config->slot_names;
    3144           34 :     for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
    3145              :     {
    3146              :         XLogRecPtr  restart_lsn;
    3147              :         bool        invalidated;
    3148              :         bool        inactive;
    3149              :         ReplicationSlot *slot;
    3150              : 
    3151           25 :         slot = SearchNamedReplicationSlot(name, false);
    3152              : 
    3153              :         /*
    3154              :          * If a slot name provided in synchronized_standby_slots does not
    3155              :          * exist, report a message and exit the loop.
    3156              :          */
    3157           25 :         if (!slot)
    3158              :         {
    3159            0 :             ereport(elevel,
    3160              :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3161              :                     errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
    3162              :                            name, "synchronized_standby_slots"),
    3163              :                     errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    3164              :                               name),
    3165              :                     errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
    3166              :                             name, "synchronized_standby_slots"));
    3167            0 :             break;
    3168              :         }
    3169              : 
    3170              :         /* Same as above: if a slot is not physical, exit the loop. */
    3171           25 :         if (SlotIsLogical(slot))
    3172              :         {
    3173            0 :             ereport(elevel,
    3174              :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3175              :                     errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
    3176              :                            name, "synchronized_standby_slots"),
    3177              :                     errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
    3178              :                               name),
    3179              :                     errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
    3180              :                             name, "synchronized_standby_slots"));
    3181            0 :             break;
    3182              :         }
    3183              : 
    3184           25 :         SpinLockAcquire(&slot->mutex);
    3185           25 :         restart_lsn = slot->data.restart_lsn;
    3186           25 :         invalidated = slot->data.invalidated != RS_INVAL_NONE;
    3187           25 :         inactive = slot->active_proc == INVALID_PROC_NUMBER;
    3188           25 :         SpinLockRelease(&slot->mutex);
    3189              : 
    3190           25 :         if (invalidated)
    3191              :         {
    3192              :             /* Specified physical slot has been invalidated */
    3193            0 :             ereport(elevel,
    3194              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3195              :                     errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
    3196              :                            name, "synchronized_standby_slots"),
    3197              :                     errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    3198              :                               name),
    3199              :                     errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
    3200              :                             name, "synchronized_standby_slots"));
    3201            0 :             break;
    3202              :         }
    3203              : 
    3204           25 :         if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
    3205              :         {
    3206              :             /* Log a message if no active_pid for this physical slot */
    3207           16 :             if (inactive)
    3208           11 :                 ereport(elevel,
    3209              :                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3210              :                         errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
    3211              :                                name, "synchronized_standby_slots"),
    3212              :                         errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    3213              :                                   name),
    3214              :                         errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
    3215              :                                 name, "synchronized_standby_slots"));
    3216              : 
    3217              :             /* Stop checking: the current slot hasn't caught up yet. */
    3218           16 :             break;
    3219              :         }
    3220              : 
    3221              :         Assert(restart_lsn >= wait_for_lsn);
    3222              : 
    3223            9 :         if (!XLogRecPtrIsValid(min_restart_lsn) ||
    3224              :             min_restart_lsn > restart_lsn)
    3225            9 :             min_restart_lsn = restart_lsn;
    3226              : 
    3227            9 :         caught_up_slot_num++;
    3228              : 
    3229            9 :         name += strlen(name) + 1;
    3230              :     }
    3231              : 
    3232           25 :     LWLockRelease(ReplicationSlotControlLock);
    3233              : 
    3234              :     /*
    3235              :      * Return false if not all the standbys have caught up to the specified
    3236              :      * WAL location.
    3237              :      */
    3238           25 :     if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
    3239           16 :         return false;
    3240              : 
    3241              :     /* The ss_oldest_flush_lsn must not retreat. */
    3242              :     Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
    3243              :            min_restart_lsn >= ss_oldest_flush_lsn);
    3244              : 
    3245            9 :     ss_oldest_flush_lsn = min_restart_lsn;
    3246              : 
    3247            9 :     return true;
    3248              : }
    3249              : 
    3250              : /*
    3251              :  * Wait for physical standbys to confirm receiving the given lsn.
    3252              :  *
    3253              :  * Used by logical decoding SQL functions. It waits for physical standbys
    3254              :  * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
    3255              :  */
    3256              : void
    3257          236 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
    3258              : {
    3259              :     /*
    3260              :      * Don't need to wait for the standby to catch up if the current acquired
    3261              :      * slot is not a logical failover slot, or there is no value in
    3262              :      * synchronized_standby_slots.
    3263              :      */
    3264          236 :     if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
    3265          235 :         return;
    3266              : 
    3267            1 :     ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
    3268              : 
    3269              :     for (;;)
    3270              :     {
    3271            2 :         CHECK_FOR_INTERRUPTS();
    3272              : 
    3273            2 :         if (ConfigReloadPending)
    3274              :         {
    3275            1 :             ConfigReloadPending = false;
    3276            1 :             ProcessConfigFile(PGC_SIGHUP);
    3277              :         }
    3278              : 
    3279              :         /* Exit if done waiting for every slot. */
    3280            2 :         if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
    3281            1 :             break;
    3282              : 
    3283              :         /*
    3284              :          * Wait for the slots in the synchronized_standby_slots to catch up,
    3285              :          * but use a timeout (1s) so we can also check if the
    3286              :          * synchronized_standby_slots has been changed.
    3287              :          */
    3288            1 :         ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
    3289              :                                     WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
    3290              :     }
    3291              : 
    3292            1 :     ConditionVariableCancelSleep();
    3293              : }
        

Generated by: LCOV version 2.0-1