LCOV - code coverage report
Current view: top level - src/backend/replication - slot.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 86.2 % 1003 865
Test Date: 2026-05-02 01:16:35 Functions: 97.9 % 48 47
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * slot.c
       4              :  *     Replication slot management.
       5              :  *
       6              :  *
       7              :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
       8              :  *
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *    src/backend/replication/slot.c
      12              :  *
      13              :  * NOTES
      14              :  *
      15              :  * Replication slots are used to keep state about replication streams
      16              :  * originating from this cluster.  Their primary purpose is to prevent the
      17              :  * premature removal of WAL or of old tuple versions in a manner that would
      18              :  * interfere with replication; they are also useful for monitoring purposes.
      19              :  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
      20              :  * on standbys (to support cascading setups).  The requirement that slots be
      21              :  * usable on standbys precludes storing them in the system catalogs.
      22              :  *
      23              :  * Each replication slot gets its own directory inside the directory
      24              :  * $PGDATA / PG_REPLSLOT_DIR.  Inside that directory the state file will
      25              :  * contain the slot's own data.  Additional data can be stored alongside that
      26              :  * file if required.  While the server is running, the state data is also
      27              :  * cached in memory for efficiency.
      28              :  *
      29              :  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
      30              :  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
      31              :  * to iterate over the slots, and in exclusive mode to change the in_use flag
      32              :  * of a slot.  The remaining data in each slot is protected by its mutex.
      33              :  *
      34              :  *-------------------------------------------------------------------------
      35              :  */
      36              : 
      37              : #include "postgres.h"
      38              : 
      39              : #include <unistd.h>
      40              : #include <sys/stat.h>
      41              : 
      42              : #include "access/transam.h"
      43              : #include "access/xlog_internal.h"
      44              : #include "access/xlogrecovery.h"
      45              : #include "common/file_utils.h"
      46              : #include "common/string.h"
      47              : #include "miscadmin.h"
      48              : #include "pgstat.h"
      49              : #include "postmaster/interrupt.h"
      50              : #include "replication/logicallauncher.h"
      51              : #include "replication/slotsync.h"
      52              : #include "replication/slot.h"
      53              : #include "replication/walsender_private.h"
      54              : #include "storage/fd.h"
      55              : #include "storage/ipc.h"
      56              : #include "storage/proc.h"
      57              : #include "storage/procarray.h"
      58              : #include "storage/subsystems.h"
      59              : #include "utils/builtins.h"
      60              : #include "utils/guc_hooks.h"
      61              : #include "utils/injection_point.h"
      62              : #include "utils/varlena.h"
      63              : #include "utils/wait_event.h"
      64              : 
/*
 * Replication slot on-disk data structure.
 *
 * The struct consists of three consecutive parts:
 *   1. 'magic' and 'checksum', which are not covered by the checksum;
 *   2. 'version' and 'length', which are covered by the checksum and, like
 *      part 1, must stay version independent;
 *   3. 'slotdata', whose contents may differ between versions.
 *
 * The offsetof()-based size macros defined for this struct in this file
 * depend on exactly this field order, so fields must not be rearranged.
 */
typedef struct ReplicationSlotOnDisk
{
    /* first part of this struct needs to be version independent */

    /* data not covered by checksum */
    uint32      magic;
    pg_crc32c   checksum;

    /* data covered by checksum */
    uint32      version;
    uint32      length;

    /*
     * The actual data in the slot that follows can differ based on the above
     * 'version'.
     */

    ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;
      87              : 
/*
 * Struct for the configuration of synchronized_standby_slots.
 *
 * Note: this must be a flat representation that can be held in a single chunk
 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
 * synchronized_standby_slots GUC.
 *
 * Because the names are packed back to back with no index, a reader reaches
 * the next name by stepping past the terminating NUL of the current one.
 */
typedef struct
{
    /* Number of slot names in the slot_names[] */
    int         nslotnames;

    /*
     * slot_names contains 'nslotnames' consecutive null-terminated C strings.
     */
    char        slot_names[FLEXIBLE_ARRAY_MEMBER];
} SyncStandbySlotsConfigData;
     105              : 
/*
 * Lookup table for slot invalidation causes.
 *
 * Each entry pairs one ReplicationSlotInvalidationCause value with the text
 * name used for it.
 */
typedef struct SlotInvalidationCauseMap
{
    ReplicationSlotInvalidationCause cause; /* the enum value */
    const char *cause_name;     /* its text name */
} SlotInvalidationCauseMap;
     114              : 
     115              : static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
     116              :     {RS_INVAL_NONE, "none"},
     117              :     {RS_INVAL_WAL_REMOVED, "wal_removed"},
     118              :     {RS_INVAL_HORIZON, "rows_removed"},
     119              :     {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
     120              :     {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
     121              : };
     122              : 
     123              : /*
     124              :  * Ensure that the lookup table is up-to-date with the enums defined in
     125              :  * ReplicationSlotInvalidationCause.
     126              :  */
     127              : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
     128              :                  "array length mismatch");
     129              : 
     130              : /* size of version independent data */
     131              : #define ReplicationSlotOnDiskConstantSize \
     132              :     offsetof(ReplicationSlotOnDisk, slotdata)
     133              : /* size of the part of the slot not covered by the checksum */
     134              : #define ReplicationSlotOnDiskNotChecksummedSize  \
     135              :     offsetof(ReplicationSlotOnDisk, version)
     136              : /* size of the part covered by the checksum */
     137              : #define ReplicationSlotOnDiskChecksummedSize \
     138              :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
     139              : /* size of the slot data that is version dependent */
     140              : #define ReplicationSlotOnDiskV2Size \
     141              :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
     142              : 
     143              : #define SLOT_MAGIC      0x1051CA1   /* format identifier */
     144              : #define SLOT_VERSION    5       /* version for new files */
     145              : 
/*
 * Control array for replication slot management.
 *
 * Its address is handed to ShmemRequestStruct() by
 * ReplicationSlotsShmemRequest(); that function requests nothing when no
 * slots are configured, in which case this presumably stays NULL.
 */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/* Forward declarations of the shared memory callbacks defined below */
static void ReplicationSlotsShmemRequest(void *arg);
static void ReplicationSlotsShmemInit(void *arg);

/*
 * Shared memory callbacks for replication slots: request_fn registers the
 * needed space, init_fn initializes it.
 */
const ShmemCallbacks ReplicationSlotsShmemCallbacks = {
    .request_fn = ReplicationSlotsShmemRequest,
    .init_fn = ReplicationSlotsShmemInit,
};

/* My backend's replication slot in the shared memory array */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUC variables */
int         max_replication_slots = 10; /* the maximum number of replication
                                         * slots */
int         max_repack_replication_slots = 5;   /* the maximum number of slots
                                                 * for REPACK */

/*
 * Invalidate replication slots that have remained idle longer than this
 * duration; '0' disables it.
 */
int         idle_replication_slot_timeout_secs = 0;

/*
 * This GUC lists streaming replication standby server slot names that
 * logical WAL sender processes will wait for.
 */
char       *synchronized_standby_slots;

/* This is the parsed and cached configuration for synchronized_standby_slots */
static SyncStandbySlotsConfigData *synchronized_standby_slots_config;

/*
 * Oldest LSN that has been confirmed to be flushed to the standbys
 * corresponding to the physical slots specified in the
 * synchronized_standby_slots GUC.
 */
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;

/* Forward declarations of file-local helpers */
static void ReplicationSlotShmemExit(int code, Datum arg);
static bool IsSlotForConflictCheck(const char *name);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
     195              : 
     196              : /*
     197              :  * Register shared memory space needed for replication slots.
     198              :  */
     199              : static void
     200         1239 : ReplicationSlotsShmemRequest(void *arg)
     201              : {
     202              :     Size        size;
     203              : 
     204         1239 :     if (max_replication_slots + max_repack_replication_slots == 0)
     205            0 :         return;
     206              : 
     207         1239 :     size = offsetof(ReplicationSlotCtlData, replication_slots);
     208         1239 :     size = add_size(size,
     209         1239 :                     mul_size(max_replication_slots + max_repack_replication_slots,
     210              :                              sizeof(ReplicationSlot)));
     211         1239 :     ShmemRequestStruct(.name = "ReplicationSlot Ctl",
     212              :                        .size = size,
     213              :                        .ptr = (void **) &ReplicationSlotCtl,
     214              :         );
     215              : }
     216              : 
     217              : /*
     218              :  * Initialize shared memory for replication slots.
     219              :  */
     220              : static void
     221         1236 : ReplicationSlotsShmemInit(void *arg)
     222              : {
     223        19574 :     for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
     224              :     {
     225        18338 :         ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
     226              : 
     227              :         /* everything else is zeroed by the memset above */
     228        18338 :         slot->active_proc = INVALID_PROC_NUMBER;
     229        18338 :         SpinLockInit(&slot->mutex);
     230        18338 :         LWLockInitialize(&slot->io_in_progress_lock,
     231              :                          LWTRANCHE_REPLICATION_SLOT_IO);
     232        18338 :         ConditionVariableInit(&slot->active_cv);
     233              :     }
     234         1236 : }
     235              : 
     236              : /*
     237              :  * Register the callback for replication slot cleanup and releasing.
     238              :  */
     239              : void
     240        24222 : ReplicationSlotInitialize(void)
     241              : {
     242        24222 :     before_shmem_exit(ReplicationSlotShmemExit, 0);
     243        24222 : }
     244              : 
     245              : /*
     246              :  * Release and cleanup replication slots.
     247              :  */
     248              : static void
     249        24222 : ReplicationSlotShmemExit(int code, Datum arg)
     250              : {
     251              :     /* Make sure active replication slots are released */
     252        24222 :     if (MyReplicationSlot != NULL)
     253          309 :         ReplicationSlotRelease();
     254              : 
     255              :     /* Also cleanup all the temporary slots. */
     256        24222 :     ReplicationSlotCleanup(false);
     257        24222 : }
     258              : 
     259              : /*
     260              :  * Check whether the passed slot name is valid and report errors at elevel.
     261              :  *
     262              :  * See comments for ReplicationSlotValidateNameInternal().
     263              :  */
     264              : bool
     265          859 : ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
     266              :                             int elevel)
     267              : {
     268              :     int         err_code;
     269          859 :     char       *err_msg = NULL;
     270          859 :     char       *err_hint = NULL;
     271              : 
     272          859 :     if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
     273              :                                              &err_code, &err_msg, &err_hint))
     274              :     {
     275              :         /*
     276              :          * Use errmsg_internal() and errhint_internal() instead of errmsg()
     277              :          * and errhint(), since the messages from
     278              :          * ReplicationSlotValidateNameInternal() are already translated. This
     279              :          * avoids double translation.
     280              :          */
     281            5 :         ereport(elevel,
     282              :                 errcode(err_code),
     283              :                 errmsg_internal("%s", err_msg),
     284              :                 (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
     285              : 
     286            0 :         pfree(err_msg);
     287            0 :         if (err_hint != NULL)
     288            0 :             pfree(err_hint);
     289            0 :         return false;
     290              :     }
     291              : 
     292          854 :     return true;
     293              : }
     294              : 
     295              : /*
     296              :  * Check whether the passed slot name is valid.
     297              :  *
     298              :  * An error will be reported for a reserved replication slot name if
     299              :  * allow_reserved_name is set to false.
     300              :  *
     301              :  * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
     302              :  * the name to be used as a directory name on every supported OS.
     303              :  *
     304              :  * Returns true if the slot name is valid. Otherwise, returns false and stores
     305              :  * the error code, error message, and optional hint in err_code, err_msg, and
     306              :  * err_hint, respectively. The caller is responsible for freeing err_msg and
     307              :  * err_hint, which are palloc'd.
     308              :  */
     309              : bool
     310         1074 : ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
     311              :                                     int *err_code, char **err_msg, char **err_hint)
     312              : {
     313              :     const char *cp;
     314              : 
     315         1074 :     if (strlen(name) == 0)
     316              :     {
     317            4 :         *err_code = ERRCODE_INVALID_NAME;
     318            4 :         *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
     319            4 :         *err_hint = NULL;
     320            4 :         return false;
     321              :     }
     322              : 
     323         1070 :     if (strlen(name) >= NAMEDATALEN)
     324              :     {
     325            0 :         *err_code = ERRCODE_NAME_TOO_LONG;
     326            0 :         *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
     327            0 :         *err_hint = NULL;
     328            0 :         return false;
     329              :     }
     330              : 
     331        20691 :     for (cp = name; *cp; cp++)
     332              :     {
     333        19623 :         if (!((*cp >= 'a' && *cp <= 'z')
     334         9375 :               || (*cp >= '0' && *cp <= '9')
     335         1911 :               || (*cp == '_')))
     336              :         {
     337            2 :             *err_code = ERRCODE_INVALID_NAME;
     338            2 :             *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
     339            2 :             *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
     340            2 :             return false;
     341              :         }
     342              :     }
     343              : 
     344         1068 :     if (!allow_reserved_name && IsSlotForConflictCheck(name))
     345              :     {
     346            1 :         *err_code = ERRCODE_RESERVED_NAME;
     347            1 :         *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
     348            1 :         *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
     349              :                              CONFLICT_DETECTION_SLOT);
     350            1 :         return false;
     351              :     }
     352              : 
     353         1067 :     return true;
     354              : }
     355              : 
     356              : /*
     357              :  * Return true if the replication slot name is "pg_conflict_detection".
     358              :  */
     359              : static bool
     360         2354 : IsSlotForConflictCheck(const char *name)
     361              : {
     362         2354 :     return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
     363              : }
     364              : 
     365              : /*
     366              :  * Create a new replication slot and mark it as used by this backend.
     367              :  *
     368              :  * name: Name of the slot
     369              :  * db_specific: logical decoding is db specific; if the slot is going to
     370              :  *     be used for that pass true, otherwise false.
     371              :  * two_phase: If enabled, allows decoding of prepared transactions.
     372              :  * repack: If true, use a slot from the pool for REPACK.
     373              :  * failover: If enabled, allows the slot to be synced to standbys so
     374              :  *     that logical replication can be resumed after failover.
     375              :  * synced: True if the slot is synchronized from the primary server.
     376              :  */
     377              : void
     378          714 : ReplicationSlotCreate(const char *name, bool db_specific,
     379              :                       ReplicationSlotPersistency persistency,
     380              :                       bool two_phase, bool repack, bool failover, bool synced)
     381              : {
     382          714 :     ReplicationSlot *slot = NULL;
     383              :     int         startpoint,
     384              :                 endpoint;
     385              : 
     386              :     Assert(MyReplicationSlot == NULL);
     387              : 
     388              :     /*
     389              :      * The logical launcher or pg_upgrade may create or migrate an internal
     390              :      * slot, so using a reserved name is allowed in these cases.
     391              :      */
     392          714 :     ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
     393          714 :                                 ERROR);
     394              : 
     395          713 :     if (failover)
     396              :     {
     397              :         /*
     398              :          * Do not allow users to create the failover enabled slots on the
     399              :          * standby as we do not support sync to the cascading standby.
     400              :          *
     401              :          * However, failover enabled slots can be created during slot
     402              :          * synchronization because we need to retain the same values as the
     403              :          * remote slot.
     404              :          */
     405           31 :         if (RecoveryInProgress() && !IsSyncingReplicationSlots())
     406            0 :             ereport(ERROR,
     407              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     408              :                     errmsg("cannot enable failover for a replication slot created on the standby"));
     409              : 
     410              :         /*
     411              :          * Do not allow users to create failover enabled temporary slots,
     412              :          * because temporary slots will not be synced to the standby.
     413              :          *
     414              :          * However, failover enabled temporary slots can be created during
     415              :          * slot synchronization. See the comments atop slotsync.c for details.
     416              :          */
     417           31 :         if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
     418            1 :             ereport(ERROR,
     419              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     420              :                     errmsg("cannot enable failover for a temporary replication slot"));
     421              :     }
     422              : 
     423              :     /*
     424              :      * If some other backend ran this code concurrently with us, we'd likely
     425              :      * both allocate the same slot, and that would be bad.  We'd also be at
     426              :      * risk of missing a name collision.  Also, we don't want to try to create
     427              :      * a new slot while somebody's busy cleaning up an old one, because we
     428              :      * might both be monkeying with the same directory.
     429              :      */
     430          712 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     431              : 
     432              :     /*
     433              :      * Check for name collision (across the whole array), and identify an
     434              :      * allocatable slot (in the array slice specific to our current use case:
     435              :      * either general, or REPACK only).  We need to hold
     436              :      * ReplicationSlotControlLock in shared mode for this, so that nobody else
     437              :      * can change the in_use flags while we're looking at them.
     438              :      */
     439          712 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     440          712 :     startpoint = !repack ? 0 : max_replication_slots;
     441          712 :     endpoint = max_replication_slots + (repack ? max_repack_replication_slots : 0);
     442        10463 :     for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
     443              :     {
     444         9754 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     445              : 
     446         9754 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     447            3 :             ereport(ERROR,
     448              :                     (errcode(ERRCODE_DUPLICATE_OBJECT),
     449              :                      errmsg("replication slot \"%s\" already exists", name)));
     450              : 
     451         9751 :         if (i >= startpoint && i < endpoint &&
     452         6197 :             !s->in_use && slot == NULL)
     453          708 :             slot = s;
     454              :     }
     455          709 :     LWLockRelease(ReplicationSlotControlLock);
     456              : 
     457              :     /* If all slots are in use, we're out of luck. */
     458          709 :     if (slot == NULL)
     459            1 :         ereport(ERROR,
     460              :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     461              :                  errmsg("all replication slots are in use"),
     462              :                  errhint("Free one or increase \"%s\".",
     463              :                          repack ? "max_repack_replication_slots" : "max_replication_slots")));
     464              : 
     465              :     /*
     466              :      * Since this slot is not in use, nobody should be looking at any part of
     467              :      * it other than the in_use field unless they're trying to allocate it.
     468              :      * And since we hold ReplicationSlotAllocationLock, nobody except us can
     469              :      * be doing that.  So it's safe to initialize the slot.
     470              :      */
     471              :     Assert(!slot->in_use);
     472              :     Assert(slot->active_proc == INVALID_PROC_NUMBER);
     473              : 
     474              :     /* first initialize persistent data */
     475          708 :     memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
     476          708 :     namestrcpy(&slot->data.name, name);
     477          708 :     slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
     478          708 :     slot->data.persistency = persistency;
     479          708 :     slot->data.two_phase = two_phase;
     480          708 :     slot->data.two_phase_at = InvalidXLogRecPtr;
     481          708 :     slot->data.failover = failover;
     482          708 :     slot->data.synced = synced;
     483              : 
     484              :     /* and then data only present in shared memory */
     485          708 :     slot->just_dirtied = false;
     486          708 :     slot->dirty = false;
     487          708 :     slot->effective_xmin = InvalidTransactionId;
     488          708 :     slot->effective_catalog_xmin = InvalidTransactionId;
     489          708 :     slot->candidate_catalog_xmin = InvalidTransactionId;
     490          708 :     slot->candidate_xmin_lsn = InvalidXLogRecPtr;
     491          708 :     slot->candidate_restart_valid = InvalidXLogRecPtr;
     492          708 :     slot->candidate_restart_lsn = InvalidXLogRecPtr;
     493          708 :     slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
     494          708 :     slot->last_saved_restart_lsn = InvalidXLogRecPtr;
     495          708 :     slot->inactive_since = 0;
     496          708 :     slot->slotsync_skip_reason = SS_SKIP_NONE;
     497              : 
     498              :     /*
     499              :      * Create the slot on disk.  We haven't actually marked the slot allocated
     500              :      * yet, so no special cleanup is required if this errors out.
     501              :      */
     502          708 :     CreateSlotOnDisk(slot);
     503              : 
     504              :     /*
     505              :      * We need to briefly prevent any other backend from iterating over the
     506              :      * slots while we flip the in_use flag. We also need to set the active
     507              :      * flag while holding the ControlLock as otherwise a concurrent
     508              :      * ReplicationSlotAcquire() could acquire the slot as well.
     509              :      */
     510          708 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     511              : 
     512          708 :     slot->in_use = true;
     513              : 
     514              :     /* We can now mark the slot active, and that makes it our slot. */
     515          708 :     SpinLockAcquire(&slot->mutex);
     516              :     Assert(slot->active_proc == INVALID_PROC_NUMBER);
     517          708 :     slot->active_proc = MyProcNumber;
     518          708 :     SpinLockRelease(&slot->mutex);
     519          708 :     MyReplicationSlot = slot;
     520              : 
     521          708 :     LWLockRelease(ReplicationSlotControlLock);
     522              : 
     523              :     /*
     524              :      * Create statistics entry for the new logical slot. We don't collect any
     525              :      * stats for physical slots, so no need to create an entry for the same.
     526              :      * See ReplicationSlotDropPtr for why we need to do this before releasing
     527              :      * ReplicationSlotAllocationLock.
     528              :      */
     529          708 :     if (SlotIsLogical(slot))
     530          511 :         pgstat_create_replslot(slot);
     531              : 
     532              :     /*
     533              :      * Now that the slot has been marked as in_use and active, it's safe to
     534              :      * let somebody else try to allocate a slot.
     535              :      */
     536          708 :     LWLockRelease(ReplicationSlotAllocationLock);
     537              : 
     538              :     /* Let everybody know we've modified this slot */
     539          708 :     ConditionVariableBroadcast(&slot->active_cv);
     540          708 : }
     541              : 
     542              : /*
     543              :  * Search the slot array for an in-use slot whose name matches 'name'.
     544              :  * If need_lock is true, ReplicationSlotControlLock is held in shared mode
     545              :  * for the scan.  Return the replication slot if found, otherwise NULL.
     546              :  */
     547              : ReplicationSlot *
     548         2161 : SearchNamedReplicationSlot(const char *name, bool need_lock)
     549              : {
     550              :     int         i;
     551         2161 :     ReplicationSlot *slot = NULL;
     552              : 
     553         2161 :     if (need_lock)
     554          655 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     555              : 
     556        11189 :     for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
     557              :     {
     558        10652 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     559              : 
     560        10652 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     561              :         {
     562         1624 :             slot = s;
     563         1624 :             break;          /* stop at the first match */
     564              :         }
     565              :     }
     566              : 
     567         2161 :     if (need_lock)
     568          655 :         LWLockRelease(ReplicationSlotControlLock);  /* before returning */
     569              : 
     570         2161 :     return slot;
     571              : }
     572              : 
     573              : /*
     574              :  * Return the zero-based index of 'slot' within
     575              :  * ReplicationSlotCtl->replication_slots, computed by pointer subtraction.
     576              :  * 'slot' must point into that array; this is only checked by an Assert.
     577              :  * This is mainly useful to have an efficient key for storing replication slot
     578              :  * stats.
     579              :  */
     580              : int
     581         8096 : ReplicationSlotIndex(ReplicationSlot *slot)
     582              : {
     583              :     Assert(slot >= ReplicationSlotCtl->replication_slots &&
     584              :            slot < ReplicationSlotCtl->replication_slots +
     585              :            (max_replication_slots + max_repack_replication_slots));
     586              : 
     587         8096 :     return slot - ReplicationSlotCtl->replication_slots;
     588              : }
     589              : 
     590              : /*
     591              :  * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
     592              :  * the slot's name and true is returned.
     593              :  * 'index' subscripts the slot array directly; it is not range-checked here.
     594              :  * This likely is only useful for pgstat_replslot.c during shutdown; elsewhere
     595              :  * the result can be stale once the lock is released (TOCTOU).
     596              :  */
     597              : bool
     598          111 : ReplicationSlotName(int index, Name name)
     599              : {
     600              :     ReplicationSlot *slot;
     601              :     bool        found;
     602              : 
     603          111 :     slot = &ReplicationSlotCtl->replication_slots[index];
     604              : 
     605              :     /*
     606              :      * Ensure that the slot cannot be dropped while we copy the name. Don't
     607              :      * need the spinlock as the name of an existing slot cannot change.
     608              :      */
     609          111 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     610          111 :     found = slot->in_use;
     611          111 :     if (slot->in_use)
     612          111 :         namestrcpy(name, NameStr(slot->data.name));
     613          111 :     LWLockRelease(ReplicationSlotControlLock);
     614              : 
     615          111 :     return found;
     616              : }
     617              : 
     618              : /*
     619              :  * Find a previously created slot and mark it as used by this process, setting
     620              :  * MyReplicationSlot.  Errors out if no such slot exists or it is reserved.
     621              :  * An error is raised if nowait is true and the slot is currently in use. If
     622              :  * nowait is false, we sleep until the slot is released by the owning process.
     623              :  *
     624              :  * An error is raised if error_if_invalid is true and the slot is found to
     625              :  * be invalid. It should always be set to true, except when we are temporarily
     626              :  * acquiring the slot and don't intend to change it.
     627              :  */
     628              : void
     629         1422 : ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
     630              : {
     631              :     ReplicationSlot *s;
     632              :     ProcNumber  active_proc;
     633              :     int         active_pid;
     634              : 
     635              :     Assert(name != NULL);
     636              : 
     637         1422 : retry:
     638              :     Assert(MyReplicationSlot == NULL);
     639              : 
     640         1422 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     641              : 
     642              :     /* Check if the slot exists with the given name. */
     643         1422 :     s = SearchNamedReplicationSlot(name, false);
     644         1422 :     if (s == NULL || !s->in_use)
     645              :     {
     646            9 :         LWLockRelease(ReplicationSlotControlLock);
     647              : 
     648            9 :         ereport(ERROR,
     649              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     650              :                  errmsg("replication slot \"%s\" does not exist",
     651              :                         name)));
     652              :     }
     653              : 
     654              :     /*
     655              :      * Do not allow users to acquire the reserved slot. This scenario may
     656              :      * occur if the launcher that owns the slot has terminated unexpectedly
     657              :      * due to an error, and a backend process attempts to reuse the slot.
     658              :      */
     659         1413 :     if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
     660            0 :         ereport(ERROR,
     661              :                 errcode(ERRCODE_UNDEFINED_OBJECT),
     662              :                 errmsg("cannot acquire replication slot \"%s\"", name),
     663              :                 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
     664              : 
     665              :     /*
     666              :      * This is the slot we want; check if it's active under some other
     667              :      * process.  In single user mode, we don't need this check.
     668              :      */
     669         1413 :     if (IsUnderPostmaster)
     670              :     {
     671              :         /*
     672              :          * Get ready to sleep on the slot in case it is active.  (We may end
     673              :          * up not sleeping, but we don't want to do this while holding the
     674              :          * spinlock.)
     675              :          */
     676         1408 :         if (!nowait)
     677          296 :             ConditionVariablePrepareToSleep(&s->active_cv);
     678              : 
     679              :         /*
     680              :          * It is important to reset the inactive_since under spinlock here to
     681              :          * avoid race conditions with slot invalidation. See comments related
     682              :          * to inactive_since in InvalidatePossiblyObsoleteSlot.
     683              :          */
     684         1408 :         SpinLockAcquire(&s->mutex);
     685         1408 :         if (s->active_proc == INVALID_PROC_NUMBER)
     686         1256 :             s->active_proc = MyProcNumber;
     687         1408 :         active_proc = s->active_proc;
     688         1408 :         ReplicationSlotSetInactiveSince(s, 0, false);
     689         1408 :         SpinLockRelease(&s->mutex);
     690              :     }
     691              :     else                        /* !IsUnderPostmaster: take the slot unconditionally */
     692              :     {
     693            5 :         s->active_proc = active_proc = MyProcNumber;
     694            5 :         ReplicationSlotSetInactiveSince(s, 0, true);
     695              :     }
     696         1413 :     active_pid = GetPGProcByNumber(active_proc)->pid; /* for the error below */
     697         1413 :     LWLockRelease(ReplicationSlotControlLock);
     698              : 
     699              :     /*
     700              :      * If we found the slot but it's already active in another process, we
     701              :      * wait until the owning process signals us that it's been released, or
     702              :      * error out.
     703              :      */
     704         1413 :     if (active_proc != MyProcNumber)
     705              :     {
     706            0 :         if (!nowait)
     707              :         {
     708              :             /* Wait until signaled, then retry (reuses the _DROP wait event) */
     709            0 :             ConditionVariableSleep(&s->active_cv,
     710              :                                    WAIT_EVENT_REPLICATION_SLOT_DROP);
     711            0 :             ConditionVariableCancelSleep();
     712            0 :             goto retry;
     713              :         }
     714              : 
     715            0 :         ereport(ERROR,
     716              :                 (errcode(ERRCODE_OBJECT_IN_USE),
     717              :                  errmsg("replication slot \"%s\" is active for PID %d",
     718              :                         NameStr(s->data.name), active_pid)));
     719              :     }
     720         1413 :     else if (!nowait)
     721          296 :         ConditionVariableCancelSleep(); /* no sleep needed after all */
     722              : 
     723              :     /* We made this slot active, so it's ours now; it stays set if we error out below. */
     724         1413 :     MyReplicationSlot = s;
     725              : 
     726              :     /*
     727              :      * We need to check for invalidation after making the slot ours to avoid
     728              :      * the possible race condition with the checkpointer that can otherwise
     729              :      * invalidate the slot immediately after the check.
     730              :      */
     731         1413 :     if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
     732            8 :         ereport(ERROR,
     733              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     734              :                 errmsg("can no longer access replication slot \"%s\"",
     735              :                        NameStr(s->data.name)),
     736              :                 errdetail("This replication slot has been invalidated due to \"%s\".",
     737              :                           GetSlotInvalidationCauseName(s->data.invalidated)));
     738              : 
     739              :     /* Let everybody know we've modified this slot */
     740         1405 :     ConditionVariableBroadcast(&s->active_cv);
     741              : 
     742              :     /*
     743              :      * The call to pgstat_acquire_replslot() protects against stats for a
     744              :      * different slot, from before a restart or such, being present during
     745              :      * pgstat_report_replslot().
     746              :      */
     747         1405 :     if (SlotIsLogical(s))
     748         1180 :         pgstat_acquire_replslot(s);
     749              : 
     750              : 
     751         1405 :     if (am_walsender)           /* log level depends on log_replication_commands */
     752              :     {
     753          964 :         ereport(log_replication_commands ? LOG : DEBUG1,
     754              :                 SlotIsLogical(s)
     755              :                 ? errmsg("acquired logical replication slot \"%s\"",
     756              :                          NameStr(s->data.name))
     757              :                 : errmsg("acquired physical replication slot \"%s\"",
     758              :                          NameStr(s->data.name)));
     759              :     }
     760         1405 : }
     761              : 
     762              : /*
     763              :  * Release the replication slot that this backend currently owns, i.e.
     764              :  * MyReplicationSlot, which must be set.  Ephemeral slots are dropped here.
     765              :  * Otherwise, this or another backend can re-acquire the slot later, and
     766              :  * resources this slot requires will be preserved.
     767              :  */
     768              : void
     769         1695 : ReplicationSlotRelease(void)
     770              : {
     771         1695 :     ReplicationSlot *slot = MyReplicationSlot;
     772         1695 :     char       *slotname = NULL;    /* keep compiler quiet */
     773              :     bool        is_logical;
     774         1695 :     TimestampTz now = 0;
     775              : 
     776              :     Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
     777              : 
     778         1695 :     is_logical = SlotIsLogical(slot);
     779              : 
     780         1695 :     if (am_walsender)
     781         1185 :         slotname = pstrdup(NameStr(slot->data.name)); /* for the log message below */
     782              : 
     783         1695 :     if (slot->data.persistency == RS_EPHEMERAL)
     784              :     {
     785              :         /*
     786              :          * Delete the slot. There is no !PANIC case where this is allowed to
     787              :          * fail, all that may happen is an incomplete cleanup of the on-disk
     788              :          * data.  NOTE(review): 'slot' is still dereferenced below; confirm OK.
     789              :          */
     790            6 :         ReplicationSlotDropAcquired();
     791              : 
     792              :         /*
     793              :          * Request to disable logical decoding, even though this slot may not
     794              :          * have been the last logical slot. The checkpointer will verify if
     795              :          * logical decoding should actually be disabled.
     796              :          */
     797            6 :         if (is_logical)
     798            6 :             RequestDisableLogicalDecoding();
     799              :     }
     800              : 
     801              :     /*
     802              :      * If slot needed to temporarily restrain both data and catalog xmin to
     803              :      * create the catalog snapshot, remove that temporary constraint.
     804              :      * Snapshots can only be exported while the initial snapshot is still
     805              :      * acquired.
     806              :      */
     807         1695 :     if (!TransactionIdIsValid(slot->data.xmin) &&
     808         1662 :         TransactionIdIsValid(slot->effective_xmin))
     809              :     {
     810          214 :         SpinLockAcquire(&slot->mutex);
     811          214 :         slot->effective_xmin = InvalidTransactionId;
     812          214 :         SpinLockRelease(&slot->mutex);
     813          214 :         ReplicationSlotsComputeRequiredXmin(false);
     814              :     }
     815              : 
     816              :     /*
     817              :      * Set the time since the slot has become inactive. We get the current
     818              :      * time beforehand to avoid system call while holding the spinlock.
     819              :      */
     820         1695 :     now = GetCurrentTimestamp();
     821              : 
     822         1695 :     if (slot->data.persistency == RS_PERSISTENT)
     823              :     {
     824              :         /*
     825              :          * Mark persistent slot inactive.  We're not freeing it, just
     826              :          * disconnecting, but wake up others that may be waiting for it.
     827              :          */
     828         1384 :         SpinLockAcquire(&slot->mutex);
     829         1384 :         slot->active_proc = INVALID_PROC_NUMBER;
     830         1384 :         ReplicationSlotSetInactiveSince(slot, now, false);
     831         1384 :         SpinLockRelease(&slot->mutex);
     832         1384 :         ConditionVariableBroadcast(&slot->active_cv);
     833              :     }
     834              :     else                        /* not persistent: active_proc is left untouched */
     835          311 :         ReplicationSlotSetInactiveSince(slot, now, true);
     836              : 
     837         1695 :     MyReplicationSlot = NULL;
     838              : 
     839              :     /* flag might not have been set when we've been a plain slot; clearing is a no-op then */
     840         1695 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     841         1695 :     MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
     842         1695 :     ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
     843         1695 :     LWLockRelease(ProcArrayLock);
     844              : 
     845         1695 :     if (am_walsender)
     846              :     {
     847         1185 :         ereport(log_replication_commands ? LOG : DEBUG1,
     848              :                 is_logical
     849              :                 ? errmsg("released logical replication slot \"%s\"",
     850              :                          slotname)
     851              :                 : errmsg("released physical replication slot \"%s\"",
     852              :                          slotname));
     853              : 
     854         1185 :         pfree(slotname);
     855              :     }
     856         1695 : }
     857              : 
     858              : /*
     859              :  * Clean up temporary slots created in the current session, i.e. in-use
     860              :  * slots whose active_proc is MyProcNumber.
     861              :  * Clean up only synced temporary slots if 'synced_only' is true, else
     862              :  * clean up all temporary slots.  The scan restarts after every drop.
     863              :  *
     864              :  * If a logical slot was dropped and the final scan finds no valid logical
     865              :  * slot left, requests to disable logical decoding.
     866              :  */
     867              : void
     868        55390 : ReplicationSlotCleanup(bool synced_only)
     869              : {
     870              :     int         i;
     871              :     bool        found_valid_logicalslot;
     872        55390 :     bool        dropped_logical = false;
     873              : 
     874              :     Assert(MyReplicationSlot == NULL);
     875              : 
     876        55543 : restart:
     877        55543 :     found_valid_logicalslot = false;    /* recomputed on every pass */
     878        55543 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     879       880625 :     for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
     880              :     {
     881       825235 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     882              : 
     883       825235 :         if (!s->in_use)
     884       809916 :             continue;
     885              : 
     886        15319 :         SpinLockAcquire(&s->mutex);
     887              : 
     888        30638 :         found_valid_logicalslot |=
     889        15319 :             (SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);
     890              : 
     891        15319 :         if ((s->active_proc == MyProcNumber &&
     892          153 :              (!synced_only || s->data.synced)))
     893              :         {
     894              :             Assert(s->data.persistency == RS_TEMPORARY);
     895          153 :             SpinLockRelease(&s->mutex);
     896          153 :             LWLockRelease(ReplicationSlotControlLock);  /* avoid deadlock */
     897              : 
     898          153 :             if (SlotIsLogical(s))
     899           10 :                 dropped_logical = true;
     900              : 
     901          153 :             ReplicationSlotDropPtr(s);
     902              : 
     903          153 :             ConditionVariableBroadcast(&s->active_cv);
     904          153 :             goto restart;
     905              :         }
     906              :         else
     907        15166 :             SpinLockRelease(&s->mutex);
     908              :     }
     909              : 
     910        55390 :     LWLockRelease(ReplicationSlotControlLock);
     911              : 
     912        55390 :     if (dropped_logical && !found_valid_logicalslot)
     913            3 :         RequestDisableLogicalDecoding();
     914        55390 : }
     915              : 
     916              : /*
     917              :  * Permanently drop the replication slot identified by the passed-in name.
     918              :  */
     919              : void
     920          444 : ReplicationSlotDrop(const char *name, bool nowait)
     921              : {
     922              :     bool        is_logical;
     923              : 
     924              :     Assert(MyReplicationSlot == NULL);
     925              : 
     926          444 :     ReplicationSlotAcquire(name, nowait, false);    /* invalidated slots may be dropped too */
     927              : 
     928              :     /*
     929              :      * Do not allow users to drop the slots which are currently being synced
     930              :      * from the primary to the standby.
     931              :      */
     932          437 :     if (RecoveryInProgress() && MyReplicationSlot->data.synced)
     933            1 :         ereport(ERROR,
     934              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     935              :                 errmsg("cannot drop replication slot \"%s\"", name),
     936              :                 errdetail("This replication slot is being synchronized from the primary server."));
     937              : 
     938          436 :     is_logical = SlotIsLogical(MyReplicationSlot);
     939              : 
     940          436 :     ReplicationSlotDropAcquired();
     941              : 
     942          436 :     if (is_logical)
     943          416 :         RequestDisableLogicalDecoding();    /* a request only, as in ReplicationSlotRelease() */
     944          436 : }
     945              : 
     946              : /*
     947              :  * Change the definition of the slot identified by the specified name.
     948              :  * A NULL 'failover' or 'two_phase' leaves that property alone (not both NULL).
     949              :  * Altering the two_phase property of a slot requires caution on the
     950              :  * client-side. Enabling it at any random point during decoding has the
     951              :  * risk that transactions prepared before this change may be skipped by
     952              :  * the decoder, leading to missing prepare records on the client. So, we
     953              :  * enable it for subscription related slots only once the initial tablesync
     954              :  * is finished. See comments atop worker.c. Disabling it is safe only when
     955              :  * there are no pending prepared transactions, otherwise, the changes of
     956              :  * already prepared transactions can be replicated again along with their
     957              :  * corresponding commit leading to duplicate data or errors.
     958              :  */
     959              : void
     960            7 : ReplicationSlotAlter(const char *name, const bool *failover,
     961              :                      const bool *two_phase)
     962              : {
     963            7 :     bool        update_slot = false;
     964              : 
     965              :     Assert(MyReplicationSlot == NULL);
     966              :     Assert(failover || two_phase);
     967              : 
     968            7 :     ReplicationSlotAcquire(name, false, true);
     969              : 
     970            6 :     if (SlotIsPhysical(MyReplicationSlot))
     971            0 :         ereport(ERROR,
     972              :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     973              :                 errmsg("cannot use %s with a physical replication slot",
     974              :                        "ALTER_REPLICATION_SLOT"));
     975              : 
     976            6 :     if (RecoveryInProgress())
     977              :     {
     978              :         /*
     979              :          * Do not allow users to alter the slots which are currently being
     980              :          * synced from the primary to the standby.
     981              :          */
     982            1 :         if (MyReplicationSlot->data.synced)
     983            1 :             ereport(ERROR,
     984              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     985              :                     errmsg("cannot alter replication slot \"%s\"", name),
     986              :                     errdetail("This replication slot is being synchronized from the primary server."));
     987              : 
     988              :         /*
     989              :          * Do not allow users to enable failover on the standby as we do not
     990              :          * support sync to the cascading standby.
     991              :          */
     992            0 :         if (failover && *failover)
     993            0 :             ereport(ERROR,
     994              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     995              :                     errmsg("cannot enable failover for a replication slot"
     996              :                            " on the standby"));
     997              :     }
     998              : 
     999            5 :     if (failover)
    1000              :     {
    1001              :         /*
    1002              :          * Do not allow users to enable failover for temporary slots as we do
    1003              :          * not support syncing temporary slots to the standby.
    1004              :          */
    1005            4 :         if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
    1006            0 :             ereport(ERROR,
    1007              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1008              :                     errmsg("cannot enable failover for a temporary replication slot"));
    1009              : 
    1010            4 :         if (MyReplicationSlot->data.failover != *failover)
    1011              :         {
    1012            4 :             SpinLockAcquire(&MyReplicationSlot->mutex);
    1013            4 :             MyReplicationSlot->data.failover = *failover;
    1014            4 :             SpinLockRelease(&MyReplicationSlot->mutex);
    1015              : 
    1016            4 :             update_slot = true;
    1017              :         }
    1018              :     }
    1019              : 
    1020            5 :     if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
    1021              :     {
    1022            1 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1023            1 :         MyReplicationSlot->data.two_phase = *two_phase;
    1024            1 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1025              : 
    1026            1 :         update_slot = true;
    1027              :     }
    1028              : 
    1029            5 :     if (update_slot)            /* only write the slot out if something changed */
    1030              :     {
    1031            5 :         ReplicationSlotMarkDirty();
    1032            5 :         ReplicationSlotSave();
    1033              :     }
    1034              : 
    1035            5 :     ReplicationSlotRelease();
    1036            5 : }
    1037              : 
    1038              : /*
    1039              :  * Permanently drop the currently acquired replication slot (MyReplicationSlot).
    1040              :  */
    1041              : void
    1042          453 : ReplicationSlotDropAcquired(void)
    1043              : {
    1044          453 :     ReplicationSlot *slot = MyReplicationSlot;
    1045              : 
    1046              :     Assert(MyReplicationSlot != NULL);
    1047              : 
    1048              :     /* slot isn't acquired anymore; cleared before the drop is attempted */
    1049          453 :     MyReplicationSlot = NULL;
    1050              : 
    1051          453 :     ReplicationSlotDropPtr(slot);
    1052          453 : }
    1053              : 
    1054              : /*
    1055              :  * Permanently drop the replication slot which will be released by the point
    1056              :  * this function returns.
    1057              :  */
    1058              : static void
    1059          606 : ReplicationSlotDropPtr(ReplicationSlot *slot)
    1060              : {
    1061              :     char        path[MAXPGPATH];
    1062              :     char        tmppath[MAXPGPATH];
    1063              : 
    1064              :     /*
    1065              :      * If some other backend ran this code concurrently with us, we might try
    1066              :      * to delete a slot with a certain name while someone else was trying to
    1067              :      * create a slot with the same name.
    1068              :      */
    1069          606 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
    1070              : 
    1071              :     /* Generate pathnames. */
    1072          606 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    1073          606 :     sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    1074              : 
    1075              :     /*
    1076              :      * Rename the slot directory on disk, so that we'll no longer recognize
    1077              :      * this as a valid slot.  Note that if this fails, we've got to mark the
    1078              :      * slot inactive before bailing out.  If we're dropping an ephemeral or a
    1079              :      * temporary slot, we better never fail hard as the caller won't expect
    1080              :      * the slot to survive and this might get called during error handling.
    1081              :      */
    1082          606 :     if (rename(path, tmppath) == 0)
    1083              :     {
    1084              :         /*
    1085              :          * We need to fsync() the directory we just renamed and its parent to
    1086              :          * make sure that our changes are on disk in a crash-safe fashion.  If
    1087              :          * fsync() fails, we can't be sure whether the changes are on disk or
    1088              :          * not.  For now, we handle that by panicking;
    1089              :          * StartupReplicationSlots() will try to straighten it out after
    1090              :          * restart.
    1091              :          */
    1092          606 :         START_CRIT_SECTION();
    1093          606 :         fsync_fname(tmppath, true);
    1094          606 :         fsync_fname(PG_REPLSLOT_DIR, true);
    1095          606 :         END_CRIT_SECTION();
    1096              :     }
    1097              :     else
    1098              :     {
    1099            0 :         bool        fail_softly = slot->data.persistency != RS_PERSISTENT;
    1100              : 
    1101            0 :         SpinLockAcquire(&slot->mutex);
    1102            0 :         slot->active_proc = INVALID_PROC_NUMBER;
    1103            0 :         SpinLockRelease(&slot->mutex);
    1104              : 
    1105              :         /* wake up anyone waiting on this slot */
    1106            0 :         ConditionVariableBroadcast(&slot->active_cv);
    1107              : 
    1108            0 :         ereport(fail_softly ? WARNING : ERROR,
    1109              :                 (errcode_for_file_access(),
    1110              :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1111              :                         path, tmppath)));
    1112              :     }
    1113              : 
    1114              :     /*
    1115              :      * The slot is definitely gone.  Lock out concurrent scans of the array
    1116              :      * long enough to kill it.  It's OK to clear the active PID here without
    1117              :      * grabbing the mutex because nobody else can be scanning the array here,
    1118              :      * and nobody can be attached to this slot and thus access it without
    1119              :      * scanning the array.
    1120              :      *
    1121              :      * Also wake up processes waiting for it.
    1122              :      */
    1123          606 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
    1124          606 :     slot->active_proc = INVALID_PROC_NUMBER;
    1125          606 :     slot->in_use = false;
    1126          606 :     LWLockRelease(ReplicationSlotControlLock);
    1127          606 :     ConditionVariableBroadcast(&slot->active_cv);
    1128              : 
    1129              :     /*
    1130              :      * Slot is dead and doesn't prevent resource removal anymore, recompute
    1131              :      * limits.
    1132              :      */
    1133          606 :     ReplicationSlotsComputeRequiredXmin(false);
    1134          606 :     ReplicationSlotsComputeRequiredLSN();
    1135              : 
    1136              :     /*
    1137              :      * If removing the directory fails, the worst thing that will happen is
    1138              :      * that the user won't be able to create a new slot with the same name
    1139              :      * until the next server restart.  We warn about it, but that's all.
    1140              :      */
    1141          606 :     if (!rmtree(tmppath, true))
    1142            0 :         ereport(WARNING,
    1143              :                 (errmsg("could not remove directory \"%s\"", tmppath)));
    1144              : 
    1145              :     /*
    1146              :      * Drop the statistics entry for the replication slot.  Do this while
    1147              :      * holding ReplicationSlotAllocationLock so that we don't drop a
    1148              :      * statistics entry for another slot with the same name just created in
    1149              :      * another session.
    1150              :      */
    1151          606 :     if (SlotIsLogical(slot))
    1152          442 :         pgstat_drop_replslot(slot);
    1153              : 
    1154              :     /*
    1155              :      * We release this at the very end, so that nobody starts trying to create
    1156              :      * a slot while we're still cleaning up the detritus of the old one.
    1157              :      */
    1158          606 :     LWLockRelease(ReplicationSlotAllocationLock);
    1159          606 : }
    1160              : 
    1161              : /*
    1162              :  * Serialize the currently acquired slot's state from memory to disk, under
    1163              :  * PG_REPLSLOT_DIR/<slot name>, so that the current state survives a crash.
    1164              :  */
    1165              : void
    1166         1508 : ReplicationSlotSave(void)
    1167              : {
    1168              :     char        path[MAXPGPATH];
    1169              : 
    1170              :     Assert(MyReplicationSlot != NULL);
    1171              : 
    1172         1508 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
    1173         1508 :     SaveSlotToPath(MyReplicationSlot, path, ERROR);
    1174         1508 : }
    1175              : 
    1176              : /*
    1177              :  * Signal that it would be useful if the currently acquired slot would be
    1178              :  * flushed out to disk.
    1179              :  *
    1180              :  * Note that the actual flush to disk can be delayed for a long time; if a
    1181              :  * flush is required for correctness, explicitly call ReplicationSlotSave().
    1182              :  */
    1183              : void
    1184        41459 : ReplicationSlotMarkDirty(void)
    1185              : {
    1186        41459 :     ReplicationSlot *slot = MyReplicationSlot;
    1187              : 
    1188              :     Assert(MyReplicationSlot != NULL);
    1189              : 
    1190        41459 :     SpinLockAcquire(&slot->mutex);
    1191        41459 :     MyReplicationSlot->just_dirtied = true;
    1192        41459 :     MyReplicationSlot->dirty = true;
    1193        41459 :     SpinLockRelease(&slot->mutex);
    1194        41459 : }
    1195              : 
    1196              : /*
    1197              :  * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to an
    1198              :  * RS_PERSISTENT slot, guaranteeing it will still be there after a crash.
    1199              :  */
    1200              : void
    1201          492 : ReplicationSlotPersist(void)
    1202              : {
    1203          492 :     ReplicationSlot *slot = MyReplicationSlot;
    1204              : 
    1205              :     Assert(slot != NULL);
    1206              :     Assert(slot->data.persistency != RS_PERSISTENT);
    1207              : 
    1208          492 :     SpinLockAcquire(&slot->mutex);
    1209          492 :     slot->data.persistency = RS_PERSISTENT;
    1210          492 :     SpinLockRelease(&slot->mutex);
    1211              : 
    1212          492 :     ReplicationSlotMarkDirty();
    1213          492 :     ReplicationSlotSave();
    1214          492 : }
    1215              : 
    1216              : /*
    1217              :  * Compute the oldest xmin across all slots and store it in the ProcArray.
    1218              :  *
    1219              :  * If already_locked is true, both the ReplicationSlotControlLock and the
    1220              :  * ProcArrayLock have already been acquired exclusively. It is crucial that the
    1221              :  * caller first acquires the ReplicationSlotControlLock, followed by the
    1222              :  * ProcArrayLock, to prevent any undetectable deadlocks since this function
    1223              :  * acquires them in that order.
    1224              :  */
    1225              : void
    1226         2631 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
    1227              : {
    1228              :     int         i;
    1229         2631 :     TransactionId agg_xmin = InvalidTransactionId;
    1230         2631 :     TransactionId agg_catalog_xmin = InvalidTransactionId;
    1231              : 
    1232              :     Assert(ReplicationSlotCtl != NULL);
    1233              :     Assert(!already_locked ||
    1234              :            (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
    1235              :             LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
    1236              : 
    1237              :     /*
    1238              :      * Hold the ReplicationSlotControlLock until after updating the slot xmin
    1239              :      * values, so no backend updates the initial xmin for a newly created slot
    1240              :      * concurrently. A shared lock is used here to minimize lock contention,
    1241              :      * especially when many slots exist and advancements occur frequently.
    1242              :      * This is safe since an exclusive lock is taken during initial slot xmin
    1243              :      * update in slot creation.
    1244              :      *
    1245              :      * One might think that we can hold the ProcArrayLock exclusively and
    1246              :      * update the slot xmin values, but it could increase lock contention on
    1247              :      * the ProcArrayLock, which is not great since this function can be called
    1248              :      * at non-negligible frequency.
    1249              :      *
    1250              :      * Concurrent invocation of this function may cause the computed slot xmin
    1251              :      * to regress. However, this is harmless because tuples prior to the most
    1252              :      * recent xmin are no longer useful once advancement occurs (see
    1253              :      * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
    1254              :      * before updating the effective_xmin). Thus, such regression merely
    1255              :      * prevents VACUUM from prematurely removing tuples without causing the
    1256              :      * early deletion of required data.
    1257              :      */
    1258         2631 :     if (!already_locked)
    1259         2119 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1260              : 
    1261        40008 :     for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
    1262              :     {
    1263        37377 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1264              :         TransactionId effective_xmin;
    1265              :         TransactionId effective_catalog_xmin;
    1266              :         bool        invalidated;
    1267              : 
    1268        37377 :         if (!s->in_use)
    1269        34849 :             continue;
    1270              : 
    1271         2528 :         SpinLockAcquire(&s->mutex);
    1272         2528 :         effective_xmin = s->effective_xmin;
    1273         2528 :         effective_catalog_xmin = s->effective_catalog_xmin;
    1274         2528 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1275         2528 :         SpinLockRelease(&s->mutex);
    1276              : 
    1277              :         /* invalidated slots need not apply */
    1278         2528 :         if (invalidated)
    1279           26 :             continue;
    1280              : 
    1281              :         /* check the data xmin */
    1282         2502 :         if (TransactionIdIsValid(effective_xmin) &&
    1283            7 :             (!TransactionIdIsValid(agg_xmin) ||
    1284            7 :              TransactionIdPrecedes(effective_xmin, agg_xmin)))
    1285          363 :             agg_xmin = effective_xmin;
    1286              : 
    1287              :         /* check the catalog xmin */
    1288         2502 :         if (TransactionIdIsValid(effective_catalog_xmin) &&
    1289         1038 :             (!TransactionIdIsValid(agg_catalog_xmin) ||
    1290         1038 :              TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
    1291         1303 :             agg_catalog_xmin = effective_catalog_xmin;
    1292              :     }
    1293              : 
    1294         2631 :     ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
    1295              : 
    1296         2631 :     if (!already_locked)
    1297         2119 :         LWLockRelease(ReplicationSlotControlLock);
    1298         2631 : }
    1299              : 
    1300              : /*
    1301              :  * Compute the oldest restart LSN across all slots and inform xlog module.
    1302              :  *
    1303              :  * Note: while max_slot_wal_keep_size is theoretically relevant for this
    1304              :  * purpose, we don't try to account for that, because this module doesn't
    1305              :  * know what to compare against.
    1306              :  */
    1307              : void
    1308        42201 : ReplicationSlotsComputeRequiredLSN(void)
    1309              : {
    1310              :     int         i;
    1311        42201 :     XLogRecPtr  min_required = InvalidXLogRecPtr;
    1312              : 
    1313              :     Assert(ReplicationSlotCtl != NULL);
    1314              : 
    1315        42201 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1316       671255 :     for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
    1317              :     {
    1318       629054 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1319              :         XLogRecPtr  restart_lsn;
    1320              :         XLogRecPtr  last_saved_restart_lsn;
    1321              :         bool        invalidated;
    1322              :         ReplicationSlotPersistency persistency;
    1323              : 
    1324       629054 :         if (!s->in_use)
    1325       586721 :             continue;
    1326              : 
    1327        42333 :         SpinLockAcquire(&s->mutex);
    1328        42333 :         persistency = s->data.persistency;
    1329        42333 :         restart_lsn = s->data.restart_lsn;
    1330        42333 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1331        42333 :         last_saved_restart_lsn = s->last_saved_restart_lsn;
    1332        42333 :         SpinLockRelease(&s->mutex);
    1333              : 
    1334              :         /* invalidated slots need not apply */
    1335        42333 :         if (invalidated)
    1336           27 :             continue;
    1337              : 
    1338              :         /*
    1339              :          * For a persistent slot, use last_saved_restart_lsn to compute the
    1340              :          * oldest LSN for removal of WAL segments.  The segments between
    1341              :          * last_saved_restart_lsn and restart_lsn might be needed by a
    1342              :          * persistent slot in case of a database crash.  Non-persistent
    1343              :          * slots can't survive a database crash, so we don't care about
    1344              :          * last_saved_restart_lsn for them.
    1345              :          */
    1346        42306 :         if (persistency == RS_PERSISTENT)
    1347              :         {
    1348        41414 :             if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
    1349              :                 restart_lsn > last_saved_restart_lsn)
    1350              :             {
    1351        38874 :                 restart_lsn = last_saved_restart_lsn;
    1352              :             }
    1353              :         }
    1354              : 
    1355        42306 :         if (XLogRecPtrIsValid(restart_lsn) &&
    1356         1261 :             (!XLogRecPtrIsValid(min_required) ||
    1357              :              restart_lsn < min_required))
    1358        41159 :             min_required = restart_lsn;
    1359              :     }
    1360        42201 :     LWLockRelease(ReplicationSlotControlLock);
    1361              : 
    1362        42201 :     XLogSetReplicationSlotMinimumLSN(min_required);
    1363        42201 : }
    1364              : 
    1365              : /*
    1366              :  * Compute the oldest WAL LSN required by *logical* decoding slots.
    1367              :  *
    1368              :  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
    1369              :  * slots exist.
    1370              :  *
    1371              :  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
    1372              :  * ignores physical replication slots.
    1373              :  *
    1374              :  * The results aren't required frequently, so we don't maintain a precomputed
    1375              :  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
    1376              :  */
    1377              : XLogRecPtr
    1378         3866 : ReplicationSlotsComputeLogicalRestartLSN(void)
    1379              : {
    1380         3866 :     XLogRecPtr  result = InvalidXLogRecPtr;
    1381              :     int         i;
    1382              : 
    1383         3866 :     if (max_replication_slots + max_repack_replication_slots <= 0)
    1384            0 :         return InvalidXLogRecPtr;
    1385              : 
    1386         3866 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1387              : 
    1388        61306 :     for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
    1389              :     {
    1390              :         ReplicationSlot *s;
    1391              :         XLogRecPtr  restart_lsn;
    1392              :         XLogRecPtr  last_saved_restart_lsn;
    1393              :         bool        invalidated;
    1394              :         ReplicationSlotPersistency persistency;
    1395              : 
    1396        57440 :         s = &ReplicationSlotCtl->replication_slots[i];
    1397              : 
    1398              :         /* cannot change while ReplicationSlotControlLock is held */
    1399        57440 :         if (!s->in_use)
    1400        56622 :             continue;
    1401              : 
    1402              :         /* we're only interested in logical slots */
    1403          818 :         if (!SlotIsLogical(s))
    1404          562 :             continue;
    1405              : 
    1406              :         /* read once, it's ok if it increases while we're checking */
    1407          256 :         SpinLockAcquire(&s->mutex);
    1408          256 :         persistency = s->data.persistency;
    1409          256 :         restart_lsn = s->data.restart_lsn;
    1410          256 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1411          256 :         last_saved_restart_lsn = s->last_saved_restart_lsn;
    1412          256 :         SpinLockRelease(&s->mutex);
    1413              : 
    1414              :         /* invalidated slots need not apply */
    1415          256 :         if (invalidated)
    1416           10 :             continue;
    1417              : 
    1418              :         /*
    1419              :          * For a persistent slot, use last_saved_restart_lsn to compute the
    1420              :          * oldest LSN for removal of WAL segments.  The segments between
    1421              :          * last_saved_restart_lsn and restart_lsn might be needed by a
    1422              :          * persistent slot in case of a database crash.  Non-persistent
    1423              :          * slots can't survive a database crash, so we don't care about
    1424              :          * last_saved_restart_lsn for them.
    1425              :          */
    1426          246 :         if (persistency == RS_PERSISTENT)
    1427              :         {
    1428          244 :             if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
    1429              :                 restart_lsn > last_saved_restart_lsn)
    1430              :             {
    1431            0 :                 restart_lsn = last_saved_restart_lsn;
    1432              :             }
    1433              :         }
    1434              : 
    1435          246 :         if (!XLogRecPtrIsValid(restart_lsn))
    1436            0 :             continue;
    1437              : 
    1438          246 :         if (!XLogRecPtrIsValid(result) ||
    1439              :             restart_lsn < result)
    1440          196 :             result = restart_lsn;
    1441              :     }
    1442              : 
    1443         3866 :     LWLockRelease(ReplicationSlotControlLock);
    1444              : 
    1445         3866 :     return result;
    1446              : }
    1447              : 
    1448              : /*
    1449              :  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
    1450              :  * passed database oid.
    1451              :  *
    1452              :  * Returns true if there are any slots referencing the database. *nslots will
    1453              :  * be set to the absolute number of slots in the database, *nactive to ones
    1454              :  * currently active.
    1455              :  */
    1456              : bool
    1457           51 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
    1458              : {
    1459              :     int         i;
    1460              : 
    1461           51 :     *nslots = *nactive = 0;
    1462              : 
    1463           51 :     if (max_replication_slots + max_repack_replication_slots <= 0)
    1464            0 :         return false;
    1465              : 
    1466           51 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1467          789 :     for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
    1468              :     {
    1469              :         ReplicationSlot *s;
    1470              : 
    1471          738 :         s = &ReplicationSlotCtl->replication_slots[i];
    1472              : 
    1473              :         /* cannot change while ReplicationSlotControlLock is held */
    1474          738 :         if (!s->in_use)
    1475          717 :             continue;
    1476              : 
    1477              :         /* only logical slots are database specific, skip physical ones */
    1478           21 :         if (!SlotIsLogical(s))
    1479           10 :             continue;
    1480              : 
    1481              :         /* not our database, skip */
    1482           11 :         if (s->data.database != dboid)
    1483            8 :             continue;
    1484              : 
    1485              :         /* NB: intentionally counting invalidated slots */
    1486              : 
    1487              :         /* count slots with spinlock held */
    1488            3 :         SpinLockAcquire(&s->mutex);
    1489            3 :         (*nslots)++;
    1490            3 :         if (s->active_proc != INVALID_PROC_NUMBER)
    1491            1 :             (*nactive)++;
    1492            3 :         SpinLockRelease(&s->mutex);
    1493              :     }
    1494           51 :     LWLockRelease(ReplicationSlotControlLock);
    1495              : 
    1496           51 :     if (*nslots > 0)
    1497            3 :         return true;
    1498           48 :     return false;
    1499              : }
    1500              : 
    1501              : /*
    1502              :  * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
    1503              :  * passed database oid. The caller should hold an exclusive lock on the
    1504              :  * pg_database oid for the database to prevent creation of new slots on the db
    1505              :  * or replay from existing slots.
    1506              :  *
    1507              :  * Another session that concurrently acquires an existing slot on the target DB
    1508              :  * (most likely to drop it) may cause this function to ERROR. If that happens
    1509              :  * it may have dropped some but not all slots.
    1510              :  *
    1511              :  * This routine isn't as efficient as it could be - but we don't drop
    1512              :  * databases often, especially databases with lots of slots.
    1513              :  *
    1514              :  * If it drops the last logical slot in the cluster, it requests to disable
    1515              :  * logical decoding.
    1516              :  */
    1517              : void
    1518           64 : ReplicationSlotsDropDBSlots(Oid dboid)
    1519              : {
    1520              :     int         i;
    1521              :     bool        found_valid_logicalslot;
    1522           64 :     bool        dropped = false;
    1523              : 
    1524           64 :     if (max_replication_slots + max_repack_replication_slots <= 0)
    1525            0 :         return;
    1526              : 
    1527           64 : restart:
    1528           69 :     found_valid_logicalslot = false;
    1529           69 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1530          994 :     for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
    1531              :     {
    1532              :         ReplicationSlot *s;
    1533              :         char       *slotname;
    1534              :         ProcNumber  active_proc;
    1535              : 
    1536          930 :         s = &ReplicationSlotCtl->replication_slots[i];
    1537              : 
    1538              :         /* cannot change while ReplicationSlotControlLock is held */
    1539          930 :         if (!s->in_use)
    1540          900 :             continue;
    1541              : 
    1542              :         /* only logical slots are database specific, skip physical ones */
    1543           30 :         if (!SlotIsLogical(s))
    1544           11 :             continue;
    1545              : 
    1546              :         /*
    1547              :          * Check logical slots on other databases too, so we request disabling
    1548              :          * logical decoding only if no valid logical slots remain in the cluster.
    1549              :          */
    1550           19 :         SpinLockAcquire(&s->mutex);
    1551           19 :         found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
    1552           19 :         SpinLockRelease(&s->mutex);
    1553              : 
    1554              :         /* not our database, skip */
    1555           19 :         if (s->data.database != dboid)
    1556           14 :             continue;
    1557              : 
    1558              :         /* NB: intentionally including invalidated slots to drop */
    1559              : 
    1560              :         /* acquire slot, so ReplicationSlotDropAcquired can be reused */
    1561            5 :         SpinLockAcquire(&s->mutex);
    1562              :         /* can't change while ReplicationSlotControlLock is held */
    1563            5 :         slotname = NameStr(s->data.name);
    1564            5 :         active_proc = s->active_proc;
    1565            5 :         if (active_proc == INVALID_PROC_NUMBER)
    1566              :         {
    1567            5 :             MyReplicationSlot = s;
    1568            5 :             s->active_proc = MyProcNumber;
    1569              :         }
    1570            5 :         SpinLockRelease(&s->mutex);
    1571              : 
    1572              :         /*
    1573              :          * Even though we hold an exclusive lock on the database object a
    1574              :          * logical slot for that DB can still be active, e.g. if it's
    1575              :          * concurrently being dropped by a backend connected to another DB.
    1576              :          *
    1577              :          * That's fairly unlikely in practice, so we'll just bail out.
    1578              :          *
    1579              :          * The slot sync worker holds a shared lock on the database before
    1580              :          * operating on synced logical slots to avoid conflict with the drop
    1581              :          * happening here. The persistent synced slots are thus safe but there
    1582              :          * is a possibility that the slot sync worker has created a temporary
    1583              :          * slot (which stays active even on release) and we are trying to drop
    1584              :          * that here. In practice, the chances of hitting this scenario are
    1585              :          * low because, during slot synchronization, the temporary slot is
    1586              :          * immediately converted to persistent and thus is safe due to the
    1587              :          * shared lock taken on the database. So, we'll just bail out in such
    1588              :          * a case.
    1589              :          *
    1590              :          * XXX: We can consider shutting down the slot sync worker before
    1591              :          * trying to drop synced temporary slots here.
    1592              :          */
    1593            5 :         if (active_proc != INVALID_PROC_NUMBER)
    1594            0 :             ereport(ERROR,
    1595              :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1596              :                      errmsg("replication slot \"%s\" is active for PID %d",
    1597              :                             slotname, GetPGProcByNumber(active_proc)->pid)));
    1598              : 
    1599              :         /*
    1600              :          * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
    1601              :          * holding ReplicationSlotControlLock over filesystem operations,
    1602              :          * release ReplicationSlotControlLock and use
    1603              :          * ReplicationSlotDropAcquired.
    1604              :          *
    1605              :          * As that means the set of slots could change, restart scan from the
    1606              :          * beginning each time we release the lock.
    1607              :          */
    1608            5 :         LWLockRelease(ReplicationSlotControlLock);
    1609            5 :         ReplicationSlotDropAcquired();
    1610            5 :         dropped = true;
    1611            5 :         goto restart;
    1612              :     }
    1613           64 :     LWLockRelease(ReplicationSlotControlLock);
    1614              : 
    1615           64 :     if (dropped && !found_valid_logicalslot)
    1616            0 :         RequestDisableLogicalDecoding();
    1617              : }
    1618              : 
    1619              : /*
    1620              :  * Returns true if there is at least one in-use valid logical replication slot.
    1621              :  */
    1622              : bool
    1623          489 : CheckLogicalSlotExists(void)
    1624              : {
    1625          489 :     bool        found = false;
    1626              : 
    1627          489 :     if (max_replication_slots + max_repack_replication_slots <= 0)
    1628            0 :         return false;
    1629              : 
    1630          489 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1631         7700 :     for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
    1632              :     {
    1633              :         ReplicationSlot *s;
    1634              :         bool        invalidated;
    1635              : 
    1636         7217 :         s = &ReplicationSlotCtl->replication_slots[i];
    1637              : 
    1638              :         /* cannot change while ReplicationSlotControlLock is held */
    1639         7217 :         if (!s->in_use)
    1640         7189 :             continue;
    1641              : 
    1642           28 :         if (SlotIsPhysical(s))
    1643           19 :             continue;
    1644              : 
    1645            9 :         SpinLockAcquire(&s->mutex);
    1646            9 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1647            9 :         SpinLockRelease(&s->mutex);
    1648              : 
    1649            9 :         if (invalidated)
    1650            3 :             continue;
    1651              : 
    1652            6 :         found = true;
    1653            6 :         break;
    1654              :     }
    1655          489 :     LWLockRelease(ReplicationSlotControlLock);
    1656              : 
    1657          489 :     return found;
    1658              : }
    1659              : 
    1660              : /*
    1661              :  * Check whether the server's configuration supports using replication
    1662              :  * slots.
    1663              :  */
    1664              : void
    1665         1906 : CheckSlotRequirements(bool repack)
    1666              : {
    1667              :     /*
    1668              :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
    1669              :      * needs the same check.
    1670              :      */
    1671              : 
    1672         1906 :     if (!repack && max_replication_slots == 0)
    1673            0 :         ereport(ERROR,
    1674              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1675              :                 errmsg("replication slots can only be used if \"%s\" > 0",
    1676              :                        "max_replication_slots"));
    1677              : 
    1678         1906 :     if (repack && max_repack_replication_slots == 0)
    1679            0 :         ereport(ERROR,
    1680              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1681              :                 errmsg("REPACK can only be used if \"%s\" > 0",
    1682              :                        "max_repack_replication_slots"));
    1683              : 
    1684         1906 :     if (wal_level < WAL_LEVEL_REPLICA)
    1685            0 :         ereport(ERROR,
    1686              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1687              :                  errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
    1688         1906 : }
    1689              : 
    1690              : /*
    1691              :  * Check whether the user has privilege to use replication slots.
    1692              :  */
    1693              : void
    1694          600 : CheckSlotPermissions(void)
    1695              : {
    1696          600 :     if (!has_rolreplication(GetUserId()))
    1697            5 :         ereport(ERROR,
    1698              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1699              :                  errmsg("permission denied to use replication slots"),
    1700              :                  errdetail("Only roles with the %s attribute may use replication slots.",
    1701              :                            "REPLICATION")));
    1702          595 : }
    1703              : 
    1704              : /*
    1705              :  * Reserve WAL for the currently active slot.
    1706              :  *
    1707              :  * Compute and set restart_lsn in a manner that's appropriate for the type of
    1708              :  * the slot and concurrency safe.
    1709              :  */
    1710              : void
    1711          655 : ReplicationSlotReserveWal(void)
    1712              : {
    1713          655 :     ReplicationSlot *slot = MyReplicationSlot;
    1714              :     XLogSegNo   segno;
    1715              :     XLogRecPtr  restart_lsn;
    1716              : 
    1717              :     Assert(slot != NULL);
    1718              :     Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
    1719              :     Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn));
    1720              : 
    1721              :     /*
    1722              :      * The replication slot mechanism is used to prevent the removal of
    1723              :      * required WAL.
    1724              :      *
    1725              :      * Acquire an exclusive lock to prevent the checkpoint process from
    1726              :      * concurrently computing the minimum slot LSN (see
    1727              :      * CheckPointReplicationSlots). This ensures that the WAL reserved for
    1728              :      * replication cannot be removed during a checkpoint.
    1729              :      *
    1730              :      * The mechanism is reliable because if WAL reservation occurs first, the
    1731              :      * checkpoint must wait for the restart_lsn update before determining the
    1732              :      * minimum non-removable LSN. On the other hand, if the checkpoint happens
    1733              :      * first, subsequent WAL reservations will select positions at or beyond
    1734              :      * the redo pointer of that checkpoint.
    1735              :      */
    1736          655 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
    1737              : 
    1738              :     /*
    1739              :      * For logical slots log a standby snapshot and start logical decoding at
    1740              :      * exactly that position. That allows the slot to start up more quickly.
    1741              :      * But on a standby we cannot do WAL writes, so just use the replay
    1742              :      * pointer; effectively, an attempt to create a logical slot on standby
    1743              :      * will cause it to wait for an xl_running_xacts record to be logged
    1744              :      * independently on the primary, so that a snapshot can be built using the
    1745              :      * record.
    1746              :      *
    1747              :      * None of this is needed (or indeed helpful) for physical slots as
    1748              :      * they'll start replay at the last logged checkpoint anyway. Instead,
    1749              :      * return the location of the last redo LSN, where a base backup has to
    1750              :      * start replay at.
    1751              :      */
    1752          655 :     if (SlotIsPhysical(slot))
    1753          162 :         restart_lsn = GetRedoRecPtr();
    1754          493 :     else if (RecoveryInProgress())
    1755           26 :         restart_lsn = GetXLogReplayRecPtr(NULL);
    1756              :     else
    1757          467 :         restart_lsn = GetXLogInsertRecPtr();
    1758              : 
    1759          655 :     SpinLockAcquire(&slot->mutex);
    1760          655 :     slot->data.restart_lsn = restart_lsn;
    1761          655 :     SpinLockRelease(&slot->mutex);
    1762              : 
    1763              :     /* prevent WAL removal as fast as possible */
    1764          655 :     ReplicationSlotsComputeRequiredLSN();
    1765              : 
    1766              :     /* Checkpoint shouldn't remove the required WAL. */
    1767          655 :     XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
    1768          655 :     if (XLogGetLastRemovedSegno() >= segno)
    1769            0 :         elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
    1770              :              NameStr(slot->data.name));
    1771              : 
    1772          655 :     LWLockRelease(ReplicationSlotAllocationLock);
    1773              : 
    1774          655 :     if (!RecoveryInProgress() && SlotIsLogical(slot))
    1775              :     {
    1776              :         XLogRecPtr  flushptr;
    1777              : 
    1778              :         /* make sure we have enough information to start */
    1779          467 :         flushptr = LogStandbySnapshot(InvalidOid);
    1780              : 
    1781              :         /* and make sure it's fsynced to disk */
    1782          467 :         XLogFlush(flushptr);
    1783              :     }
    1784          655 : }
    1785              : 
    1786              : /*
    1787              :  * Report that replication slot needs to be invalidated
    1788              :  */
    1789              : static void
    1790           23 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
    1791              :                        bool terminating,
    1792              :                        int pid,
    1793              :                        NameData slotname,
    1794              :                        XLogRecPtr restart_lsn,
    1795              :                        XLogRecPtr oldestLSN,
    1796              :                        TransactionId snapshotConflictHorizon,
    1797              :                        long slot_idle_seconds)
    1798              : {
    1799              :     StringInfoData err_detail;
    1800              :     StringInfoData err_hint;
    1801              : 
    1802           23 :     initStringInfo(&err_detail);
    1803           23 :     initStringInfo(&err_hint);
    1804              : 
    1805           23 :     switch (cause)
    1806              :     {
    1807            7 :         case RS_INVAL_WAL_REMOVED:
    1808              :             {
    1809            7 :                 uint64      ex = oldestLSN - restart_lsn;
    1810              : 
    1811            7 :                 appendStringInfo(&err_detail,
    1812            7 :                                  ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
    1813              :                                           "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
    1814              :                                           ex),
    1815            7 :                                  LSN_FORMAT_ARGS(restart_lsn),
    1816              :                                  ex);
    1817              :                 /* translator: %s is a GUC variable name */
    1818            7 :                 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
    1819              :                                  "max_slot_wal_keep_size");
    1820            7 :                 break;
    1821              :             }
    1822           12 :         case RS_INVAL_HORIZON:
    1823           12 :             appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
    1824              :                              snapshotConflictHorizon);
    1825           12 :             break;
    1826              : 
    1827            4 :         case RS_INVAL_WAL_LEVEL:
    1828            4 :             appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
    1829            4 :             break;
    1830              : 
    1831            0 :         case RS_INVAL_IDLE_TIMEOUT:
    1832              :             {
    1833              :                 /* translator: %s is a GUC variable name */
    1834            0 :                 appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
    1835              :                                  slot_idle_seconds, "idle_replication_slot_timeout",
    1836              :                                  idle_replication_slot_timeout_secs);
    1837              :                 /* translator: %s is a GUC variable name */
    1838            0 :                 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
    1839              :                                  "idle_replication_slot_timeout");
    1840            0 :                 break;
    1841              :             }
    1842              :         case RS_INVAL_NONE:
    1843              :             pg_unreachable();
    1844              :     }
    1845              : 
    1846           23 :     ereport(LOG,
    1847              :             terminating ?
    1848              :             errmsg("terminating process %d to release replication slot \"%s\"",
    1849              :                    pid, NameStr(slotname)) :
    1850              :             errmsg("invalidating obsolete replication slot \"%s\"",
    1851              :                    NameStr(slotname)),
    1852              :             errdetail_internal("%s", err_detail.data),
    1853              :             err_hint.len ? errhint("%s", err_hint.data) : 0);
    1854              : 
    1855           23 :     pfree(err_detail.data);
    1856           23 :     pfree(err_hint.data);
    1857           23 : }
    1858              : 
    1859              : /*
    1860              :  * Can we invalidate an idle replication slot?
    1861              :  *
    1862              :  * Idle timeout invalidation is allowed only when:
    1863              :  *
    1864              :  * 1. Idle timeout is set
    1865              :  * 2. Slot has reserved WAL
    1866              :  * 3. Slot is inactive
    1867              :  * 4. The slot is not being synced from the primary while the server is in
    1868              :  *    recovery. This is because synced slots are always considered to be
    1869              :  *    inactive because they don't perform logical decoding to produce changes.
    1870              :  */
    1871              : static inline bool
    1872          386 : CanInvalidateIdleSlot(ReplicationSlot *s)
    1873              : {
    1874          386 :     return (idle_replication_slot_timeout_secs != 0 &&
    1875            0 :             XLogRecPtrIsValid(s->data.restart_lsn) &&
    1876          386 :             s->inactive_since > 0 &&
    1877            0 :             !(RecoveryInProgress() && s->data.synced));
    1878              : }
    1879              : 
    1880              : /*
    1881              :  * DetermineSlotInvalidationCause - Determine the cause for which a slot
    1882              :  * becomes invalid among the given possible causes.
    1883              :  *
    1884              :  * This function sequentially checks all possible invalidation causes and
    1885              :  * returns the first one for which the slot is eligible for invalidation.
    1886              :  */
    1887              : static ReplicationSlotInvalidationCause
    1888          419 : DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
    1889              :                                XLogRecPtr oldestLSN, Oid dboid,
    1890              :                                TransactionId snapshotConflictHorizon,
    1891              :                                TimestampTz *inactive_since, TimestampTz now)
    1892              : {
    1893              :     Assert(possible_causes != RS_INVAL_NONE);
    1894              : 
    1895          419 :     if (possible_causes & RS_INVAL_WAL_REMOVED)
    1896              :     {
    1897          393 :         XLogRecPtr  restart_lsn = s->data.restart_lsn;
    1898              : 
    1899          393 :         if (XLogRecPtrIsValid(restart_lsn) &&
    1900              :             restart_lsn < oldestLSN)
    1901            7 :             return RS_INVAL_WAL_REMOVED;
    1902              :     }
    1903              : 
    1904          412 :     if (possible_causes & RS_INVAL_HORIZON)
    1905              :     {
    1906              :         /* invalid DB oid signals a shared relation */
    1907           22 :         if (SlotIsLogical(s) &&
    1908           17 :             (dboid == InvalidOid || dboid == s->data.database))
    1909              :         {
    1910           22 :             TransactionId effective_xmin = s->effective_xmin;
    1911           22 :             TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
    1912              : 
    1913           22 :             if (TransactionIdIsValid(effective_xmin) &&
    1914            0 :                 TransactionIdPrecedesOrEquals(effective_xmin,
    1915              :                                               snapshotConflictHorizon))
    1916            0 :                 return RS_INVAL_HORIZON;
    1917           44 :             else if (TransactionIdIsValid(catalog_effective_xmin) &&
    1918           22 :                      TransactionIdPrecedesOrEquals(catalog_effective_xmin,
    1919              :                                                    snapshotConflictHorizon))
    1920           12 :                 return RS_INVAL_HORIZON;
    1921              :         }
    1922              :     }
    1923              : 
    1924          400 :     if (possible_causes & RS_INVAL_WAL_LEVEL)
    1925              :     {
    1926            4 :         if (SlotIsLogical(s))
    1927            4 :             return RS_INVAL_WAL_LEVEL;
    1928              :     }
    1929              : 
    1930          396 :     if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
    1931              :     {
    1932              :         Assert(now > 0);
    1933              : 
    1934          386 :         if (CanInvalidateIdleSlot(s))
    1935              :         {
    1936              :             /*
    1937              :              * Simulate the invalidation due to idle_timeout to test the
    1938              :              * timeout behavior promptly, without waiting for it to trigger
    1939              :              * naturally.
    1940              :              */
    1941              : #ifdef USE_INJECTION_POINTS
    1942            0 :             if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
    1943              :             {
    1944            0 :                 *inactive_since = 0;    /* since the beginning of time */
    1945            0 :                 return RS_INVAL_IDLE_TIMEOUT;
    1946              :             }
    1947              : #endif
    1948              : 
    1949              :             /*
    1950              :              * Check if the slot needs to be invalidated due to
    1951              :              * idle_replication_slot_timeout GUC.
    1952              :              */
    1953            0 :             if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
    1954              :                                                   idle_replication_slot_timeout_secs))
    1955              :             {
    1956            0 :                 *inactive_since = s->inactive_since;
    1957            0 :                 return RS_INVAL_IDLE_TIMEOUT;
    1958              :             }
    1959              :         }
    1960              :     }
    1961              : 
    1962          396 :     return RS_INVAL_NONE;
    1963              : }
    1964              : 
    1965              : /*
    1966              :  * Helper for InvalidateObsoleteReplicationSlots
    1967              :  *
    1968              :  * Acquires the given slot and marks it invalid, if necessary and possible.
    1969              :  *
    1970              :  * Returns true if the slot was invalidated.
    1971              :  *
    1972              :  * Set *released_lock_out if ReplicationSlotControlLock was released in the
    1973              :  * interim (and in that case we're not holding the lock at return, otherwise
    1974              :  * we are).
    1975              :  *
    1976              :  * This is inherently racy, because we release the LWLock
    1977              :  * for syscalls, so caller must restart if we return true.
    1978              :  */
    1979              : static bool
    1980          458 : InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
    1981              :                                ReplicationSlot *s,
    1982              :                                XLogRecPtr oldestLSN,
    1983              :                                Oid dboid, TransactionId snapshotConflictHorizon,
    1984              :                                bool *released_lock_out)
    1985              : {
    1986          458 :     int         last_signaled_pid = 0;
    1987          458 :     bool        released_lock = false;
    1988          458 :     bool        invalidated = false;
    1989          458 :     TimestampTz inactive_since = 0;
    1990              : 
    1991              :     for (;;)
    1992            7 :     {
    1993              :         XLogRecPtr  restart_lsn;
    1994              :         NameData    slotname;
    1995              :         ProcNumber  active_proc;
    1996          465 :         int         active_pid = 0;
    1997          465 :         ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
    1998          465 :         TimestampTz now = 0;
    1999          465 :         long        slot_idle_secs = 0;
    2000              : 
    2001              :         Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
    2002              : 
    2003          465 :         if (!s->in_use)
    2004              :         {
    2005            0 :             if (released_lock)
    2006            0 :                 LWLockRelease(ReplicationSlotControlLock);
    2007            0 :             break;
    2008              :         }
    2009              : 
    2010          465 :         if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
    2011              :         {
    2012              :             /*
    2013              :              * Assign the current time here to avoid system call overhead
    2014              :              * while holding the spinlock in subsequent code.
    2015              :              */
    2016          407 :             now = GetCurrentTimestamp();
    2017              :         }
    2018              : 
    2019              :         /*
    2020              :          * Check if the slot needs to be invalidated. If it needs to be
    2021              :          * invalidated, and is not currently acquired, acquire it and mark it
    2022              :          * as having been invalidated.  We do this with the spinlock held to
    2023              :          * avoid race conditions -- for example the restart_lsn could move
    2024              :          * forward, or the slot could be dropped.
    2025              :          */
    2026          465 :         SpinLockAcquire(&s->mutex);
    2027              : 
    2028          465 :         restart_lsn = s->data.restart_lsn;
    2029              : 
    2030              :         /* we do nothing if the slot is already invalid */
    2031          465 :         if (s->data.invalidated == RS_INVAL_NONE)
    2032          419 :             invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
    2033              :                                                                 s, oldestLSN,
    2034              :                                                                 dboid,
    2035              :                                                                 snapshotConflictHorizon,
    2036              :                                                                 &inactive_since,
    2037              :                                                                 now);
    2038              : 
    2039              :         /* if there's no invalidation, we're done */
    2040          465 :         if (invalidation_cause == RS_INVAL_NONE)
    2041              :         {
    2042          442 :             SpinLockRelease(&s->mutex);
    2043          442 :             if (released_lock)
    2044            0 :                 LWLockRelease(ReplicationSlotControlLock);
    2045          442 :             break;
    2046              :         }
    2047              : 
    2048           23 :         slotname = s->data.name;
    2049           23 :         active_proc = s->active_proc;
    2050              : 
    2051              :         /*
    2052              :          * If the slot can be acquired, do so and mark it invalidated
    2053              :          * immediately.  Otherwise we'll signal the owning process, below, and
    2054              :          * retry.
    2055              :          *
    2056              :          * Note: Unlike other slot attributes, slot's inactive_since can't be
    2057              :          * changed until the acquired slot is released or the owning process
    2058              :          * is terminated. So, the inactive slot can only be invalidated
    2059              :          * immediately without being terminated.
    2060              :          */
    2061           23 :         if (active_proc == INVALID_PROC_NUMBER)
    2062              :         {
    2063           16 :             MyReplicationSlot = s;
    2064           16 :             s->active_proc = MyProcNumber;
    2065           16 :             s->data.invalidated = invalidation_cause;
    2066              : 
    2067              :             /*
    2068              :              * XXX: We should consider not overwriting restart_lsn and instead
    2069              :              * just rely on .invalidated.
    2070              :              */
    2071           16 :             if (invalidation_cause == RS_INVAL_WAL_REMOVED)
    2072              :             {
    2073            5 :                 s->data.restart_lsn = InvalidXLogRecPtr;
    2074            5 :                 s->last_saved_restart_lsn = InvalidXLogRecPtr;
    2075              :             }
    2076              : 
    2077              :             /* Let caller know */
    2078           16 :             invalidated = true;
    2079              :         }
    2080              :         else
    2081              :         {
    2082            7 :             active_pid = GetPGProcByNumber(active_proc)->pid;
    2083              :             Assert(active_pid != 0);
    2084              :         }
    2085              : 
    2086           23 :         SpinLockRelease(&s->mutex);
    2087              : 
    2088              :         /*
    2089              :          * Calculate the idle time duration of the slot if slot is marked
    2090              :          * invalidated with RS_INVAL_IDLE_TIMEOUT.
    2091              :          */
    2092           23 :         if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
    2093              :         {
    2094              :             int         slot_idle_usecs;
    2095              : 
    2096            0 :             TimestampDifference(inactive_since, now, &slot_idle_secs,
    2097              :                                 &slot_idle_usecs);
    2098              :         }
    2099              : 
    2100           23 :         if (active_proc != INVALID_PROC_NUMBER)
    2101              :         {
    2102              :             /*
    2103              :              * Prepare the sleep on the slot's condition variable before
    2104              :              * releasing the lock, to close a possible race condition if the
    2105              :              * slot is released before the sleep below.
    2106              :              */
    2107            7 :             ConditionVariablePrepareToSleep(&s->active_cv);
    2108              : 
    2109            7 :             LWLockRelease(ReplicationSlotControlLock);
    2110            7 :             released_lock = true;
    2111              : 
    2112              :             /*
    2113              :              * Signal to terminate the process that owns the slot, if we
    2114              :              * haven't already signalled it.  (Avoidance of repeated
    2115              :              * signalling is the only reason for there to be a loop in this
    2116              :              * routine; otherwise we could rely on caller's restart loop.)
    2117              :              *
    2118              :              * There is a race condition where another process may own the slot
    2119              :              * after its current owner process is terminated and before this
    2120              :              * process owns it. To handle that, we signal only if the PID of
    2121              :              * the owning process has changed from the previous time. (This
    2122              :              * logic assumes that the same PID is not reused very quickly.)
    2123              :              */
    2124            7 :             if (last_signaled_pid != active_pid)
    2125              :             {
    2126            7 :                 ReportSlotInvalidation(invalidation_cause, true, active_pid,
    2127              :                                        slotname, restart_lsn,
    2128              :                                        oldestLSN, snapshotConflictHorizon,
    2129              :                                        slot_idle_secs);
    2130              : 
    2131            7 :                 if (MyBackendType == B_STARTUP)
    2132            5 :                     (void) SignalRecoveryConflict(GetPGProcByNumber(active_proc),
    2133              :                                                   active_pid,
    2134              :                                                   RECOVERY_CONFLICT_LOGICALSLOT);
    2135              :                 else
    2136            2 :                     (void) kill(active_pid, SIGTERM);
    2137              : 
    2138            7 :                 last_signaled_pid = active_pid;
    2139              :             }
    2140              : 
    2141              :             /* Wait until the slot is released. */
    2142            7 :             ConditionVariableSleep(&s->active_cv,
    2143              :                                    WAIT_EVENT_REPLICATION_SLOT_DROP);
    2144              : 
    2145              :             /*
    2146              :              * Re-acquire lock and start over; we expect to invalidate the
    2147              :              * slot next time (unless another process acquires the slot in the
    2148              :              * meantime).
    2149              :              *
    2150              :              * Note: It is possible for a slot to advance its restart_lsn or
    2151              :              * xmin values sufficiently between when we release the mutex and
    2152              :              * when we recheck, moving from a conflicting state to a
    2153              :              * non-conflicting state.  This is intentional and safe: if the slot
    2154              :              * has caught up while we're busy here, the resources we were
    2155              :              * concerned about (WAL segments or tuples) have not yet been
    2156              :              * removed, and there's no reason to invalidate the slot.
    2157              :              */
    2158            7 :             LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    2159            7 :             continue;
    2160              :         }
    2161              :         else
    2162              :         {
    2163              :             /*
    2164              :              * We hold the slot now and have already invalidated it; flush it
    2165              :              * to ensure that state persists.
    2166              :              *
    2167              :              * Don't want to hold ReplicationSlotControlLock across file
    2168              :              * system operations, so release it now but be sure to tell caller
    2169              :              * to restart from scratch.
    2170              :              */
    2171           16 :             LWLockRelease(ReplicationSlotControlLock);
    2172           16 :             released_lock = true;
    2173              : 
    2174              :             /* Make sure the invalidated state persists across server restart */
    2175           16 :             ReplicationSlotMarkDirty();
    2176           16 :             ReplicationSlotSave();
    2177           16 :             ReplicationSlotRelease();
    2178              : 
    2179           16 :             ReportSlotInvalidation(invalidation_cause, false, active_pid,
    2180              :                                    slotname, restart_lsn,
    2181              :                                    oldestLSN, snapshotConflictHorizon,
    2182              :                                    slot_idle_secs);
    2183              : 
    2184              :             /* done with this slot for now */
    2185           16 :             break;
    2186              :         }
    2187              :     }
    2188              : 
    2189              :     Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
    2190              : 
    2191          458 :     *released_lock_out = released_lock;
    2192          458 :     return invalidated;
    2193              : }
    2194              : 
    2195              : /*
    2196              :  * Invalidate slots that require resources about to be removed.
    2197              :  *
    2198              :  * Returns true if any slot was invalidated.
    2199              :  *
    2200              :  * Whether a slot needs to be invalidated depends on the invalidation cause.
    2201              :  * A slot is invalidated if it:
    2202              :  * - RS_INVAL_WAL_REMOVED: requires an LSN older than the given segment
    2203              :  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
    2204              :  *   db; dboid may be InvalidOid for shared relations
    2205              :  * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
    2206              :  *   logical.
    2207              :  * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
    2208              :  *   "idle_replication_slot_timeout" duration.
    2209              :  *
    2210              :  * Note: This function attempts to invalidate the slot for multiple possible
    2211              :  * causes in a single pass, minimizing redundant iterations. The "cause"
    2212              :  * parameter can be a MASK representing one or more of the defined causes.
    2213              :  *
    2214              :  * If it invalidates the last logical slot in the cluster, it requests to
    2215              :  * disable logical decoding.
    2216              :  *
    2217              :  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
    2218              :  */
    2219              : bool
    2220         1958 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
    2221              :                                    XLogSegNo oldestSegno, Oid dboid,
    2222              :                                    TransactionId snapshotConflictHorizon)
    2223              : {
    2224              :     XLogRecPtr  oldestLSN;
    2225         1958 :     bool        invalidated = false;
    2226         1958 :     bool        invalidated_logical = false;
    2227              :     bool        found_valid_logicalslot;
    2228              : 
    2229              :     Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
    2230              :     Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
    2231              :     Assert(possible_causes != RS_INVAL_NONE);
    2232              : 
    2233         1958 :     if (max_replication_slots == 0 && max_repack_replication_slots == 0)
    2234            0 :         return invalidated;
    2235              : 
    2236         1958 :     XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
    2237              : 
    2238         1974 : restart:
    2239         1974 :     found_valid_logicalslot = false;
    2240         1974 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    2241        30989 :     for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
    2242              :     {
    2243        29031 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    2244        29031 :         bool        released_lock = false;
    2245              : 
    2246        29031 :         if (!s->in_use)
    2247        28573 :             continue;
    2248              : 
    2249              :         /* Prevent invalidation of logical slots during binary upgrade */
    2250          467 :         if (SlotIsLogical(s) && IsBinaryUpgrade)
    2251              :         {
    2252            9 :             SpinLockAcquire(&s->mutex);
    2253            9 :             found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
    2254            9 :             SpinLockRelease(&s->mutex);
    2255              : 
    2256            9 :             continue;
    2257              :         }
    2258              : 
    2259          458 :         if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
    2260              :                                            dboid, snapshotConflictHorizon,
    2261              :                                            &released_lock))
    2262              :         {
    2263              :             Assert(released_lock);
    2264              : 
    2265              :             /* Remember we have invalidated a physical or logical slot */
    2266           16 :             invalidated = true;
    2267              : 
    2268              :             /*
    2269              :              * Additionally, remember we have invalidated a logical slot as we
    2270              :              * can request disabling logical decoding later.
    2271              :              */
    2272           16 :             if (SlotIsLogical(s))
    2273           13 :                 invalidated_logical = true;
    2274              :         }
    2275              :         else
    2276              :         {
    2277              :             /*
    2278              :              * We need to check if the slot is invalidated here since
    2279              :              * InvalidatePossiblyObsoleteSlot() returns false also if the slot
    2280              :              * is already invalidated.
    2281              :              */
    2282          442 :             SpinLockAcquire(&s->mutex);
    2283          884 :             found_valid_logicalslot |=
    2284          442 :                 (SlotIsLogical(s) && (s->data.invalidated == RS_INVAL_NONE));
    2285          442 :             SpinLockRelease(&s->mutex);
    2286              :         }
    2287              : 
    2288              :         /* if the lock was released, start from scratch */
    2289          458 :         if (released_lock)
    2290           16 :             goto restart;
    2291              :     }
    2292         1958 :     LWLockRelease(ReplicationSlotControlLock);
    2293              : 
    2294              :     /*
    2295              :      * If any slots have been invalidated, recalculate the resource limits.
    2296              :      */
    2297         1958 :     if (invalidated)
    2298              :     {
    2299           11 :         ReplicationSlotsComputeRequiredXmin(false);
    2300           11 :         ReplicationSlotsComputeRequiredLSN();
    2301              :     }
    2302              : 
    2303              :     /*
    2304              :      * Request the checkpointer to disable logical decoding if no valid
    2305              :      * logical slots remain. If called by the checkpointer during a
    2306              :      * checkpoint, only the request is initiated; actual deactivation is
    2307              :      * deferred until after the checkpoint completes.
    2308              :      */
    2309         1958 :     if (invalidated_logical && !found_valid_logicalslot)
    2310            8 :         RequestDisableLogicalDecoding();
    2311              : 
    2312         1958 :     return invalidated;
    2313              : }
    2314              : 
    2315              : /*
    2316              :  * Flush all replication slots to disk.
    2317              :  *
    2318              :  * It is convenient to flush dirty replication slots at the time of checkpoint.
    2319              :  * Additionally, in case of a shutdown checkpoint, we also identify the slots
    2320              :  * for which the confirmed_flush LSN has been updated since the last time it
    2321              :  * was saved and flush them.
    2322              :  */
    2323              : void
    2324         1933 : CheckPointReplicationSlots(bool is_shutdown)
    2325              : {
    2326              :     int         i;
    2327         1933 :     bool        last_saved_restart_lsn_updated = false;
    2328              : 
    2329         1933 :     elog(DEBUG1, "performing replication slot checkpoint");
    2330              : 
    2331              :     /*
    2332              :      * Prevent any slot from being created/dropped while we're active. As we
    2333              :      * explicitly do *not* want to block iterating over replication_slots or
    2334              :      * acquiring a slot we cannot take the control lock - but that's OK,
    2335              :      * because holding ReplicationSlotAllocationLock is strictly stronger, and
    2336              :      * enough to guarantee that nobody can change the in_use bits on us.
    2337              :      *
    2338              :      * Additionally, acquiring the Allocation lock is necessary to serialize
    2339              :      * the slot flush process with concurrent slot WAL reservation. This
    2340              :      * ensures that the WAL position being reserved is either flushed to disk
    2341              :      * or is beyond or equal to the redo pointer of the current checkpoint
    2342              :      * (See ReplicationSlotReserveWal for details).
    2343              :      */
    2344         1933 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
    2345              : 
    2346        30653 :     for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
    2347              :     {
    2348        28720 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    2349              :         char        path[MAXPGPATH];
    2350              : 
    2351        28720 :         if (!s->in_use)
    2352        28311 :             continue;
    2353              : 
    2354              :         /* save the slot to disk, locking is handled in SaveSlotToPath() */
    2355          409 :         sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
    2356              : 
    2357              :         /*
    2358              :          * Slot's data is not flushed each time the confirmed_flush LSN is
    2359              :          * updated as that could lead to frequent writes.  However, we decide
    2360              :          * to force a flush of all logical slot's data at the time of shutdown
    2361              :          * if the confirmed_flush LSN is changed since we last flushed it to
    2362              :          * disk.  This helps in avoiding an unnecessary retreat of the
    2363              :          * confirmed_flush LSN after restart.
    2364              :          */
    2365          409 :         if (is_shutdown && SlotIsLogical(s))
    2366              :         {
    2367           92 :             SpinLockAcquire(&s->mutex);
    2368              : 
    2369           92 :             if (s->data.invalidated == RS_INVAL_NONE &&
    2370           91 :                 s->data.confirmed_flush > s->last_saved_confirmed_flush)
    2371              :             {
    2372           41 :                 s->just_dirtied = true;
    2373           41 :                 s->dirty = true;
    2374              :             }
    2375           92 :             SpinLockRelease(&s->mutex);
    2376              :         }
    2377              : 
    2378              :         /*
    2379              :          * Track if we're going to update slot's last_saved_restart_lsn. We
    2380              :          * need this to know if we need to recompute the required LSN.
    2381              :          */
    2382          409 :         if (s->last_saved_restart_lsn != s->data.restart_lsn)
    2383          213 :             last_saved_restart_lsn_updated = true;
    2384              : 
    2385          409 :         SaveSlotToPath(s, path, LOG);
    2386              :     }
    2387         1933 :     LWLockRelease(ReplicationSlotAllocationLock);
    2388              : 
    2389              :     /*
    2390              :      * Recompute the required LSN if SaveSlotToPath() updated
    2391              :      * last_saved_restart_lsn for any slot.
    2392              :      */
    2393         1933 :     if (last_saved_restart_lsn_updated)
    2394          213 :         ReplicationSlotsComputeRequiredLSN();
    2395         1933 : }
    2396              : 
    2397              : /*
    2398              :  * Load all replication slots from disk into memory at server startup. This
    2399              :  * needs to be run before we start crash recovery.
    2400              :  */
    2401              : void
    2402         1077 : StartupReplicationSlots(void)
    2403              : {
    2404              :     DIR        *replication_dir;
    2405              :     struct dirent *replication_de;
    2406              : 
    2407         1077 :     elog(DEBUG1, "starting up replication slots");
    2408              : 
    2409              :     /* restore all slots by iterating over all on-disk entries */
    2410         1077 :     replication_dir = AllocateDir(PG_REPLSLOT_DIR);
    2411         3348 :     while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
    2412              :     {
    2413              :         char        path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
    2414              :         PGFileType  de_type;
    2415              : 
    2416         2273 :         if (strcmp(replication_de->d_name, ".") == 0 ||
    2417         1198 :             strcmp(replication_de->d_name, "..") == 0)
    2418         2150 :             continue;
    2419              : 
    2420          123 :         snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
    2421          123 :         de_type = get_dirent_type(path, replication_de, false, DEBUG1);
    2422              : 
    2423              :         /* we're only creating directories here, skip if it's not ours */
    2424          123 :         if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
    2425            0 :             continue;
    2426              : 
    2427              :         /* we crashed while a slot was being set up or deleted, clean up */
    2428          123 :         if (pg_str_endswith(replication_de->d_name, ".tmp"))
    2429              :         {
    2430            0 :             if (!rmtree(path, true))
    2431              :             {
    2432            0 :                 ereport(WARNING,
    2433              :                         (errmsg("could not remove directory \"%s\"",
    2434              :                                 path)));
    2435            0 :                 continue;
    2436              :             }
    2437            0 :             fsync_fname(PG_REPLSLOT_DIR, true);
    2438            0 :             continue;
    2439              :         }
    2440              : 
    2441              :         /* looks like a slot in a normal state, restore */
    2442          123 :         RestoreSlotFromDisk(replication_de->d_name);
    2443              :     }
    2444         1075 :     FreeDir(replication_dir);
    2445              : 
    2446              :     /* currently no slots exist, we're done. */
    2447         1075 :     if (max_replication_slots + max_repack_replication_slots <= 0)
    2448            0 :         return;
    2449              : 
    2450              :     /* Now that we have recovered all the data, compute replication xmin */
    2451         1075 :     ReplicationSlotsComputeRequiredXmin(false);
    2452         1075 :     ReplicationSlotsComputeRequiredLSN();
    2453              : }
    2454              : 
    2455              : /* ----
    2456              :  * Manipulation of on-disk state of replication slots
    2457              :  *
    2458              :  * NB: none of the routines below should take any notice whether a slot is the
    2459              :  * current one or not, that's all handled a layer above.
    2460              :  * ----
    2461              :  */
    2462              : static void
    2463          708 : CreateSlotOnDisk(ReplicationSlot *slot)
    2464              : {
    2465              :     char        tmppath[MAXPGPATH];
    2466              :     char        path[MAXPGPATH];
    2467              :     struct stat st;
    2468              : 
    2469              :     /*
    2470              :      * No need to take out the io_in_progress_lock, nobody else can see this
    2471              :      * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
    2472              :      * takes out the lock, if we'd take the lock here, we'd deadlock.
    2473              :      */
    2474              : 
    2475          708 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    2476          708 :     sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    2477              : 
    2478              :     /*
    2479              :      * It's just barely possible that some previous effort to create or drop a
    2480              :      * slot with this name left a temp directory lying around. If that seems
    2481              :      * to be the case, try to remove it.  If the rmtree() fails, we'll error
    2482              :      * out at the MakePGDirectory() below, so we don't bother checking
    2483              :      * success.
    2484              :      */
    2485          708 :     if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
    2486            0 :         rmtree(tmppath, true);
    2487              : 
    2488              :     /* Create and fsync the temporary slot directory. */
    2489          708 :     if (MakePGDirectory(tmppath) < 0)
    2490            0 :         ereport(ERROR,
    2491              :                 (errcode_for_file_access(),
    2492              :                  errmsg("could not create directory \"%s\": %m",
    2493              :                         tmppath)));
    2494          708 :     fsync_fname(tmppath, true);
    2495              : 
    2496              :     /* Write the actual state file. */
    2497          708 :     slot->dirty = true;          /* signal that we really need to write */
    2498          708 :     SaveSlotToPath(slot, tmppath, ERROR);
    2499              : 
    2500              :     /* Rename the directory into place. */
    2501          708 :     if (rename(tmppath, path) != 0)
    2502            0 :         ereport(ERROR,
    2503              :                 (errcode_for_file_access(),
    2504              :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    2505              :                         tmppath, path)));
    2506              : 
    2507              :     /*
    2508              :      * If we'd now fail - really unlikely - we wouldn't know whether this slot
    2509              :      * would persist after an OS crash or not - so, force a restart. The
    2510              :      * restart would try to fsync this again till it works.
    2511              :      */
    2512          708 :     START_CRIT_SECTION();
    2513              : 
    2514          708 :     fsync_fname(path, true);
    2515          708 :     fsync_fname(PG_REPLSLOT_DIR, true);
    2516              : 
    2517          708 :     END_CRIT_SECTION();
    2518          708 : }
    2519              : 
    2520              : /*
    2521              :  * Shared functionality between saving and creating a replication slot.
    2522              :  */
    2523              : static void
    2524         2625 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
    2525              : {
    2526              :     char        tmppath[MAXPGPATH];
    2527              :     char        path[MAXPGPATH];
    2528              :     int         fd;
    2529              :     ReplicationSlotOnDisk cp;
    2530              :     bool        was_dirty;
    2531              : 
    2532              :     /* first check whether there's something to write out */
    2533         2625 :     SpinLockAcquire(&slot->mutex);
    2534         2625 :     was_dirty = slot->dirty;
    2535         2625 :     slot->just_dirtied = false;
    2536         2625 :     SpinLockRelease(&slot->mutex);
    2537              : 
    2538              :     /* and don't do anything if there's nothing to write */
    2539         2625 :     if (!was_dirty)
    2540          144 :         return;
    2541              : 
    2542         2481 :     LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
    2543              : 
    2544              :     /* silence valgrind :( */
    2545         2481 :     memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
    2546              : 
    2547         2481 :     sprintf(tmppath, "%s/state.tmp", dir);
    2548         2481 :     sprintf(path, "%s/state", dir);
    2549              : 
    2550         2481 :     fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    2551         2481 :     if (fd < 0)
    2552              :     {
    2553              :         /*
    2554              :          * If not an ERROR, then release the lock before returning.  In case
    2555              :          * of an ERROR, the error recovery path automatically releases the
    2556              :          * lock, but no harm in explicitly releasing even in that case.  Note
    2557              :          * that LWLockRelease() could affect errno.
    2558              :          */
    2559            0 :         int         save_errno = errno;
    2560              : 
    2561            0 :         LWLockRelease(&slot->io_in_progress_lock);
    2562            0 :         errno = save_errno;
    2563            0 :         ereport(elevel,
    2564              :                 (errcode_for_file_access(),
    2565              :                  errmsg("could not create file \"%s\": %m",
    2566              :                         tmppath)));
    2567            0 :         return;
    2568              :     }
    2569              : 
    2570         2481 :     cp.magic = SLOT_MAGIC;
    2571         2481 :     INIT_CRC32C(cp.checksum);
    2572         2481 :     cp.version = SLOT_VERSION;
    2573         2481 :     cp.length = ReplicationSlotOnDiskV2Size;
    2574              : 
    2575         2481 :     SpinLockAcquire(&slot->mutex);
    2576              : 
    2577         2481 :     memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
    2578              : 
    2579         2481 :     SpinLockRelease(&slot->mutex);
    2580              : 
    2581         2481 :     COMP_CRC32C(cp.checksum,
    2582              :                 (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
    2583              :                 ReplicationSlotOnDiskChecksummedSize);
    2584         2481 :     FIN_CRC32C(cp.checksum);
    2585              : 
    2586         2481 :     errno = 0;
    2587         2481 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
    2588         2481 :     if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
    2589              :     {
    2590            0 :         int         save_errno = errno;
    2591              : 
    2592            0 :         pgstat_report_wait_end();
    2593            0 :         CloseTransientFile(fd);
    2594            0 :         unlink(tmppath);
    2595            0 :         LWLockRelease(&slot->io_in_progress_lock);
    2596              : 
    2597              :         /* if write didn't set errno, assume problem is no disk space */
    2598            0 :         errno = save_errno ? save_errno : ENOSPC;
    2599            0 :         ereport(elevel,
    2600              :                 (errcode_for_file_access(),
    2601              :                  errmsg("could not write to file \"%s\": %m",
    2602              :                         tmppath)));
    2603            0 :         return;
    2604              :     }
    2605         2481 :     pgstat_report_wait_end();
    2606              : 
    2607              :     /* fsync the temporary file */
    2608         2481 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
    2609         2481 :     if (pg_fsync(fd) != 0)
    2610              :     {
    2611            0 :         int         save_errno = errno;
    2612              : 
    2613            0 :         pgstat_report_wait_end();
    2614            0 :         CloseTransientFile(fd);
    2615            0 :         unlink(tmppath);
    2616            0 :         LWLockRelease(&slot->io_in_progress_lock);
    2617              : 
    2618            0 :         errno = save_errno;
    2619            0 :         ereport(elevel,
    2620              :                 (errcode_for_file_access(),
    2621              :                  errmsg("could not fsync file \"%s\": %m",
    2622              :                         tmppath)));
    2623            0 :         return;
    2624              :     }
    2625         2481 :     pgstat_report_wait_end();
    2626              : 
    2627         2481 :     if (CloseTransientFile(fd) != 0)
    2628              :     {
    2629            0 :         int         save_errno = errno;
    2630              : 
    2631            0 :         unlink(tmppath);
    2632            0 :         LWLockRelease(&slot->io_in_progress_lock);
    2633              : 
    2634            0 :         errno = save_errno;
    2635            0 :         ereport(elevel,
    2636              :                 (errcode_for_file_access(),
    2637              :                  errmsg("could not close file \"%s\": %m",
    2638              :                         tmppath)));
    2639            0 :         return;
    2640              :     }
    2641              : 
    2642              :     /* rename to permanent file, fsync file and directory */
    2643         2481 :     if (rename(tmppath, path) != 0)
    2644              :     {
    2645            0 :         int         save_errno = errno;
    2646              : 
    2647            0 :         unlink(tmppath);
    2648            0 :         LWLockRelease(&slot->io_in_progress_lock);
    2649              : 
    2650            0 :         errno = save_errno;
    2651            0 :         ereport(elevel,
    2652              :                 (errcode_for_file_access(),
    2653              :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    2654              :                         tmppath, path)));
    2655            0 :         return;
    2656              :     }
    2657              : 
    2658              :     /*
    2659              :      * Check CreateSlotOnDisk() for the reasoning of using a critical section.
    2660              :      */
    2661         2481 :     START_CRIT_SECTION();
    2662              : 
    2663         2481 :     fsync_fname(path, false);
    2664         2481 :     fsync_fname(dir, true);
    2665         2481 :     fsync_fname(PG_REPLSLOT_DIR, true);
    2666              : 
    2667         2481 :     END_CRIT_SECTION();
    2668              : 
    2669              :     /*
    2670              :      * Successfully wrote; unset the dirty bit unless somebody dirtied the slot
    2671              :      * again already, and remember the saved confirmed_flush and restart_lsn.
    2672              :      */
    2673         2481 :     SpinLockAcquire(&slot->mutex);
    2674         2481 :     if (!slot->just_dirtied)
    2675         2461 :         slot->dirty = false;
    2676         2481 :     slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2677         2481 :     slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
    2678         2481 :     SpinLockRelease(&slot->mutex);
    2679              : 
    2680         2481 :     LWLockRelease(&slot->io_in_progress_lock);
    2681              : }
    2682              : 
    2683              : /*
    2684              :  * Load a single slot from disk into memory.
    2685              :  */
    2686              : static void
    2687          123 : RestoreSlotFromDisk(const char *name)
    2688              : {
    2689              :     ReplicationSlotOnDisk cp;
    2690              :     int         i;
    2691              :     char        slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
    2692              :     char        path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
    2693              :     int         fd;
    2694          123 :     bool        restored = false;
    2695              :     int         readBytes;
    2696              :     pg_crc32c   checksum;
    2697          123 :     TimestampTz now = 0;
    2698              : 
    2699              :     /* no need to lock here, no concurrent access allowed yet */
    2700              : 
    2701              :     /* delete temp file if it exists */
    2702          123 :     sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
    2703          123 :     sprintf(path, "%s/state.tmp", slotdir);
    2704          123 :     if (unlink(path) < 0 && errno != ENOENT)
    2705            0 :         ereport(PANIC,
    2706              :                 (errcode_for_file_access(),
    2707              :                  errmsg("could not remove file \"%s\": %m", path)));
    2708              : 
    2709          123 :     sprintf(path, "%s/state", slotdir);
    2710              : 
    2711          123 :     elog(DEBUG1, "restoring replication slot from \"%s\"", path);
    2712              : 
    2713              :     /* on some operating systems fsyncing a file requires O_RDWR */
    2714          123 :     fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
    2715              : 
    2716              :     /*
    2717              :      * We do not need to handle this as we are rename()ing the directory into
    2718              :      * place only after we fsync()ed the state file.
    2719              :      */
    2720          123 :     if (fd < 0)
    2721            0 :         ereport(PANIC,
    2722              :                 (errcode_for_file_access(),
    2723              :                  errmsg("could not open file \"%s\": %m", path)));
    2724              : 
    2725              :     /*
    2726              :      * Sync state file before we're reading from it. We might have crashed
    2727              :      * while it wasn't synced yet and we shouldn't continue on that basis.
    2728              :      */
    2729          123 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
    2730          123 :     if (pg_fsync(fd) != 0)
    2731            0 :         ereport(PANIC,
    2732              :                 (errcode_for_file_access(),
    2733              :                  errmsg("could not fsync file \"%s\": %m",
    2734              :                         path)));
    2735          123 :     pgstat_report_wait_end();
    2736              : 
    2737              :     /* Also sync the parent directory */
    2738          123 :     START_CRIT_SECTION();
    2739          123 :     fsync_fname(slotdir, true);
    2740          123 :     END_CRIT_SECTION();
    2741              : 
    2742              :     /* read part of statefile that's guaranteed to be version independent */
    2743          123 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2744          123 :     readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
    2745          123 :     pgstat_report_wait_end();
    2746          123 :     if (readBytes != ReplicationSlotOnDiskConstantSize)
    2747              :     {
    2748            0 :         if (readBytes < 0)
    2749            0 :             ereport(PANIC,
    2750              :                     (errcode_for_file_access(),
    2751              :                      errmsg("could not read file \"%s\": %m", path)));
    2752              :         else
    2753            0 :             ereport(PANIC,
    2754              :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2755              :                      errmsg("could not read file \"%s\": read %d of %zu",
    2756              :                             path, readBytes,
    2757              :                             (Size) ReplicationSlotOnDiskConstantSize)));
    2758              :     }
    2759              : 
    2760              :     /* verify magic */
    2761          123 :     if (cp.magic != SLOT_MAGIC)
    2762            0 :         ereport(PANIC,
    2763              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2764              :                  errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
    2765              :                         path, cp.magic, SLOT_MAGIC)));
    2766              : 
    2767              :     /* verify version */
    2768          123 :     if (cp.version != SLOT_VERSION)
    2769            0 :         ereport(PANIC,
    2770              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2771              :                  errmsg("replication slot file \"%s\" has unsupported version %u",
    2772              :                         path, cp.version)));
    2773              : 
    2774              :     /* boundary check on length */
    2775          123 :     if (cp.length != ReplicationSlotOnDiskV2Size)
    2776            0 :         ereport(PANIC,
    2777              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2778              :                  errmsg("replication slot file \"%s\" has corrupted length %u",
    2779              :                         path, cp.length)));
    2780              : 
    2781              :     /* Now that we know the size, read the entire file */
    2782          123 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2783          246 :     readBytes = read(fd,
    2784              :                      (char *) &cp + ReplicationSlotOnDiskConstantSize,
    2785          123 :                      cp.length);
    2786          123 :     pgstat_report_wait_end();
    2787          123 :     if (readBytes != cp.length)
    2788              :     {
    2789            0 :         if (readBytes < 0)
    2790            0 :             ereport(PANIC,
    2791              :                     (errcode_for_file_access(),
    2792              :                      errmsg("could not read file \"%s\": %m", path)));
    2793              :         else
    2794            0 :             ereport(PANIC,
    2795              :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2796              :                      errmsg("could not read file \"%s\": read %d of %zu",
    2797              :                             path, readBytes, (Size) cp.length)));
    2798              :     }
    2799              : 
    2800          123 :     if (CloseTransientFile(fd) != 0)
    2801            0 :         ereport(PANIC,
    2802              :                 (errcode_for_file_access(),
    2803              :                  errmsg("could not close file \"%s\": %m", path)));
    2804              : 
    2805              :     /* now verify the CRC */
    2806          123 :     INIT_CRC32C(checksum);
    2807          123 :     COMP_CRC32C(checksum,
    2808              :                 (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
    2809              :                 ReplicationSlotOnDiskChecksummedSize);
    2810          123 :     FIN_CRC32C(checksum);
    2811              : 
    2812          123 :     if (!EQ_CRC32C(checksum, cp.checksum))
    2813            0 :         ereport(PANIC,
    2814              :                 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
    2815              :                         path, checksum, cp.checksum)));
    2816              : 
    2817              :     /*
    2818              :      * If we crashed with an ephemeral slot active, don't restore but delete
    2819              :      * it.
    2820              :      */
    2821          123 :     if (cp.slotdata.persistency != RS_PERSISTENT)
    2822              :     {
    2823            0 :         if (!rmtree(slotdir, true))
    2824              :         {
    2825            0 :             ereport(WARNING,
    2826              :                     (errmsg("could not remove directory \"%s\"",
    2827              :                             slotdir)));
    2828              :         }
    2829            0 :         fsync_fname(PG_REPLSLOT_DIR, true);
    2830            0 :         return;
    2831              :     }
    2832              : 
    2833              :     /*
    2834              :      * Verify that requirements for the specific slot type are met. That's
    2835              :      * important because if these aren't met we're not guaranteed to retain
    2836              :      * all the necessary resources for the slot.
    2837              :      *
    2838              :      * NB: We have to do so *after* the above checks for ephemeral slots,
    2839              :      * because otherwise a slot that shouldn't exist anymore could prevent
    2840              :      * restarts.
    2841              :      *
    2842              :      * NB: Changing the requirements here also requires adapting
    2843              :      * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
    2844              :      */
    2845          123 :     if (cp.slotdata.database != InvalidOid)
    2846              :     {
    2847           85 :         if (wal_level < WAL_LEVEL_REPLICA)
    2848            1 :             ereport(FATAL,
    2849              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2850              :                      errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
    2851              :                             NameStr(cp.slotdata.name)),
    2852              :                      errhint("Change \"wal_level\" to be \"replica\" or higher.")));
    2853              : 
    2854              :         /*
    2855              :          * In standby mode, the hot standby must be enabled. This check is
    2856              :          * necessary to ensure logical slots are invalidated when they become
    2857              :          * incompatible due to insufficient wal_level. Otherwise, if the
    2858              :          * primary disables logical decoding (reducing effective_wal_level
    2859              :          * below logical) while hot standby is disabled, logical slots would
    2860              :          * remain valid even after promotion.
    2861              :          */
    2862           84 :         if (StandbyMode && !EnableHotStandby)
    2863            1 :             ereport(FATAL,
    2864              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2865              :                      errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
    2866              :                             NameStr(cp.slotdata.name)),
    2867              :                      errhint("Change \"hot_standby\" to be \"on\".")));
    2868              :     }
    2869           38 :     else if (wal_level < WAL_LEVEL_REPLICA)
    2870            0 :         ereport(FATAL,
    2871              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2872              :                  errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
    2873              :                         NameStr(cp.slotdata.name)),
    2874              :                  errhint("Change \"wal_level\" to be \"replica\" or higher.")));
    2875              : 
    2876              :     /*
    2877              :      * Nothing can be active yet, don't lock anything.  Note we iterate up to
    2878              :      * max_replication_slots instead of adding max_repack_replication_slots as
    2879              :      * in all other places, because we must enforce the GUC value in case
    2880              :      * there were more slots before the shutdown than the GUC currently
    2881              :      * allows.
    2882              :      */
    2883          178 :     for (i = 0; i < max_replication_slots; i++)
    2884              :     {
    2885              :         ReplicationSlot *slot;
    2886              : 
    2887          178 :         slot = &ReplicationSlotCtl->replication_slots[i];
    2888              : 
    2889          178 :         if (slot->in_use)
    2890           57 :             continue;
    2891              : 
    2892              :         /* restore the entire set of persistent data */
    2893          121 :         memcpy(&slot->data, &cp.slotdata,
    2894              :                sizeof(ReplicationSlotPersistentData));
    2895              : 
    2896              :         /* initialize in memory state */
    2897          121 :         slot->effective_xmin = cp.slotdata.xmin;
    2898          121 :         slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
    2899          121 :         slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2900          121 :         slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
    2901              : 
    2902          121 :         slot->candidate_catalog_xmin = InvalidTransactionId;
    2903          121 :         slot->candidate_xmin_lsn = InvalidXLogRecPtr;
    2904          121 :         slot->candidate_restart_lsn = InvalidXLogRecPtr;
    2905          121 :         slot->candidate_restart_valid = InvalidXLogRecPtr;
    2906              : 
    2907          121 :         slot->in_use = true;
    2908          121 :         slot->active_proc = INVALID_PROC_NUMBER;
    2909              : 
    2910              :         /*
    2911              :          * Set the time since the slot has become inactive after loading the
    2912              :          * slot from the disk into memory. Whoever acquires the slot i.e.
    2913              :          * makes the slot active will reset it. Use the same inactive_since
    2914              :          * time for all the slots.
    2915              :          */
    2916          121 :         if (now == 0)
    2917          121 :             now = GetCurrentTimestamp();
    2918              : 
    2919          121 :         ReplicationSlotSetInactiveSince(slot, now, false);
    2920              : 
    2921          121 :         restored = true;
    2922          121 :         break;
    2923              :     }
    2924              : 
    2925          121 :     if (!restored)
    2926            0 :         ereport(FATAL,
    2927              :                 (errmsg("too many replication slots active before shutdown"),
    2928              :                  errhint("Increase \"max_replication_slots\" and try again.")));
    2929              : }
    2930              : 
    2931              : /*
    2932              :  * Maps an invalidation reason for a replication slot to
    2933              :  * ReplicationSlotInvalidationCause.
    2934              :  */
    2935              : ReplicationSlotInvalidationCause
    2936            0 : GetSlotInvalidationCause(const char *cause_name)
    2937              : {
    2938              :     Assert(cause_name);
    2939              : 
    2940              :     /* Search lookup table for the cause having this name */
    2941            0 :     for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
    2942              :     {
    2943            0 :         if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
    2944            0 :             return SlotInvalidationCauses[i].cause;
    2945              :     }
    2946              : 
    2947              :     Assert(false);
    2948            0 :     return RS_INVAL_NONE;       /* to keep compiler quiet */
    2949              : }
    2950              : 
    2951              : /*
    2952              :  * Maps a ReplicationSlotInvalidationCause to the invalidation
    2953              :  * reason for a replication slot.
    2954              :  */
    2955              : const char *
    2956           45 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
    2957              : {
    2958              :     /* Search lookup table for the name of this cause */
    2959          146 :     for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
    2960              :     {
    2961          146 :         if (SlotInvalidationCauses[i].cause == cause)
    2962           45 :             return SlotInvalidationCauses[i].cause_name;
    2963              :     }
    2964              : 
    2965              :     Assert(false);
    2966            0 :     return "none";                /* to keep compiler quiet */
    2967              : }
    2968              : 
    2969              : /*
    2970              :  * A helper function to validate slots specified in GUC synchronized_standby_slots.
    2971              :  *
    2972              :  * The rawname will be parsed, and the result will be saved into *elemlist.
    2973              :  */
    2974              : static bool
    2975           28 : validate_sync_standby_slots(char *rawname, List **elemlist)
    2976              : {
    2977              :     /* Verify syntax and parse string into a list of identifiers */
    2978           28 :     if (!SplitIdentifierString(rawname, ',', elemlist))
    2979              :     {
    2980            0 :         GUC_check_errdetail("List syntax is invalid.");
    2981            0 :         return false;
    2982              :     }
    2983              : 
    2984              :     /* Iterate the list to validate each slot name */
    2985           80 :     foreach_ptr(char, name, *elemlist)
    2986              :     {
    2987              :         int         err_code;
    2988           28 :         char       *err_msg = NULL;
    2989           28 :         char       *err_hint = NULL;
    2990              : 
    2991           28 :         if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
    2992              :                                                  &err_msg, &err_hint))
    2993              :         {
    2994            2 :             GUC_check_errcode(err_code);
    2995            2 :             GUC_check_errdetail("%s", err_msg);
    2996            2 :             if (err_hint != NULL)
    2997            2 :                 GUC_check_errhint("%s", err_hint);
    2998            2 :             return false;
    2999              :         }
    3000              :     }
    3001              : 
    3002           26 :     return true;
    3003              : }
    3004              : 
    3005              : /*
    3006              :  * GUC check_hook for synchronized_standby_slots
    3007              :  */
    3008              : bool
    3009         1321 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
    3010              : {
    3011              :     char       *rawname;
    3012              :     char       *ptr;
    3013              :     List       *elemlist;
    3014              :     int         size;
    3015              :     bool        ok;
    3016              :     SyncStandbySlotsConfigData *config;
    3017              : 
    3018         1321 :     if ((*newval)[0] == '\0')
    3019         1293 :         return true;
    3020              : 
    3021              :     /* Need a modifiable copy of the GUC string */
    3022           28 :     rawname = pstrdup(*newval);
    3023              : 
    3024              :     /* Parse the list and verify that each slot name is valid */
    3025           28 :     ok = validate_sync_standby_slots(rawname, &elemlist);
    3026              : 
    3027           28 :     if (!ok || elemlist == NIL)
    3028              :     {
    3029            2 :         pfree(rawname);
    3030            2 :         list_free(elemlist);
    3031            2 :         return ok;
    3032              :     }
    3033              : 
    3034              :     /* Compute the size required for the SyncStandbySlotsConfigData struct */
    3035           26 :     size = offsetof(SyncStandbySlotsConfigData, slot_names);
    3036           78 :     foreach_ptr(char, slot_name, elemlist)
    3037           26 :         size += strlen(slot_name) + 1;
    3038              : 
    3039              :     /* GUC extra value must be guc_malloc'd, not palloc'd */
    3040           26 :     config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
    3041           26 :     if (!config)
    3042            0 :         return false;
    3043              : 
    3044              :     /* Transform the data into SyncStandbySlotsConfigData */
    3045           26 :     config->nslotnames = list_length(elemlist);
    3046              : 
    3047           26 :     ptr = config->slot_names;
    3048           78 :     foreach_ptr(char, slot_name, elemlist)
    3049              :     {
    3050           26 :         strcpy(ptr, slot_name);
    3051           26 :         ptr += strlen(slot_name) + 1;
    3052              :     }
    3053              : 
    3054           26 :     *extra = config;
    3055              : 
    3056           26 :     pfree(rawname);
    3057           26 :     list_free(elemlist);
    3058           26 :     return true;
    3059              : }
    3060              : 
    3061              : /*
    3062              :  * GUC assign_hook for synchronized_standby_slots
    3063              :  */
    3064              : void
    3065         1318 : assign_synchronized_standby_slots(const char *newval, void *extra)
    3066              : {
    3067              :     /*
    3068              :      * The standby slots may have changed, so we must recompute the oldest
    3069              :      * LSN.
    3070              :      */
    3071         1318 :     ss_oldest_flush_lsn = InvalidXLogRecPtr;
    3072              : 
    3073         1318 :     synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
    3074         1318 : }
    3075              : 
    3076              : /*
    3077              :  * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
    3078              :  */
    3079              : bool
    3080        39460 : SlotExistsInSyncStandbySlots(const char *slot_name)
    3081              : {
    3082              :     const char *standby_slot_name;
    3083              : 
    3084              :     /* Return false if there is no value in synchronized_standby_slots */
    3085        39460 :     if (synchronized_standby_slots_config == NULL)
    3086        39443 :         return false;
    3087              : 
    3088              :     /*
    3089              :      * XXX: We are not expecting this list to be long so a linear search
    3090              :      * shouldn't hurt but if that turns out not to be true then we can cache
    3091              :      * this information for each WalSender as well.
    3092              :      */
    3093           17 :     standby_slot_name = synchronized_standby_slots_config->slot_names;
    3094           25 :     for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
    3095              :     {
    3096           17 :         if (strcmp(standby_slot_name, slot_name) == 0)
    3097            9 :             return true;
    3098              : 
    3099            8 :         standby_slot_name += strlen(standby_slot_name) + 1;
    3100              :     }
    3101              : 
    3102            8 :     return false;
    3103              : }
    3104              : 
    3105              : /*
    3106              :  * Return true if the slots specified in synchronized_standby_slots have caught up to
    3107              :  * the given WAL location, false otherwise.
    3108              :  *
    3109              :  * The elevel parameter specifies the error level used for logging messages
    3110              :  * related to slots that do not exist, are invalidated, or are inactive.
    3111              :  */
    3112              : bool
    3113         3888 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
    3114              : {
    3115              :     const char *name;
    3116         3888 :     int         caught_up_slot_num = 0;
    3117         3888 :     XLogRecPtr  min_restart_lsn = InvalidXLogRecPtr;
    3118              : 
    3119              :     /*
    3120              :      * Don't need to wait for the standbys to catch up if there is no value in
    3121              :      * synchronized_standby_slots.
    3122              :      */
    3123         3888 :     if (synchronized_standby_slots_config == NULL)
    3124         3858 :         return true;
    3125              : 
    3126              :     /*
    3127              :      * Don't need to wait for the standbys to catch up if we are on a standby
    3128              :      * server, since we do not support syncing slots to cascading standbys.
    3129              :      */
    3130           30 :     if (RecoveryInProgress())
    3131            0 :         return true;
    3132              : 
    3133              :     /*
    3134              :      * Don't need to wait for the standbys to catch up if they are already
    3135              :      * beyond the specified WAL location.
    3136              :      */
    3137           30 :     if (XLogRecPtrIsValid(ss_oldest_flush_lsn) &&
    3138           17 :         ss_oldest_flush_lsn >= wait_for_lsn)
    3139            8 :         return true;
    3140              : 
    3141              :     /*
    3142              :      * To prevent concurrent slot dropping and creation while filtering the
    3143              :      * slots, take the ReplicationSlotControlLock outside of the loop.
    3144              :      */
    3145           22 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    3146              : 
    3147           22 :     name = synchronized_standby_slots_config->slot_names;
    3148           30 :     for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
    3149              :     {
    3150              :         XLogRecPtr  restart_lsn;
    3151              :         bool        invalidated;
    3152              :         bool        inactive;
    3153              :         ReplicationSlot *slot;
    3154              : 
    3155           22 :         slot = SearchNamedReplicationSlot(name, false);
    3156              : 
    3157              :         /*
    3158              :          * If a slot name provided in synchronized_standby_slots does not
    3159              :          * exist, report a message and exit the loop.
    3160              :          */
    3161           22 :         if (!slot)
    3162              :         {
    3163            0 :             ereport(elevel,
    3164              :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3165              :                     errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
    3166              :                            name, "synchronized_standby_slots"),
    3167              :                     errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    3168              :                               name),
    3169              :                     errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
    3170              :                             name, "synchronized_standby_slots"));
    3171            0 :             break;
    3172              :         }
    3173              : 
    3174              :         /* Same as above: if a slot is not physical, exit the loop. */
    3175           22 :         if (SlotIsLogical(slot))
    3176              :         {
    3177            0 :             ereport(elevel,
    3178              :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3179              :                     errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
    3180              :                            name, "synchronized_standby_slots"),
    3181              :                     errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
    3182              :                               name),
    3183              :                     errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
    3184              :                             name, "synchronized_standby_slots"));
    3185            0 :             break;
    3186              :         }
    3187              : 
    3188           22 :         SpinLockAcquire(&slot->mutex);
    3189           22 :         restart_lsn = slot->data.restart_lsn;
    3190           22 :         invalidated = slot->data.invalidated != RS_INVAL_NONE;
    3191           22 :         inactive = slot->active_proc == INVALID_PROC_NUMBER;
    3192           22 :         SpinLockRelease(&slot->mutex);
    3193              : 
    3194           22 :         if (invalidated)
    3195              :         {
    3196              :             /* Specified physical slot has been invalidated */
    3197            0 :             ereport(elevel,
    3198              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3199              :                     errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
    3200              :                            name, "synchronized_standby_slots"),
    3201              :                     errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    3202              :                               name),
    3203              :                     errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
    3204              :                             name, "synchronized_standby_slots"));
    3205            0 :             break;
    3206              :         }
    3207              : 
    3208           22 :         if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
    3209              :         {
    3210              :             /* Log a message if no active_pid for this physical slot */
    3211           14 :             if (inactive)
    3212            7 :                 ereport(elevel,
    3213              :                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3214              :                         errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
    3215              :                                name, "synchronized_standby_slots"),
    3216              :                         errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    3217              :                                   name),
    3218              :                         errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
    3219              :                                 name, "synchronized_standby_slots"));
    3220              : 
    3221              :             /* Stop checking; the current slot hasn't caught up. */
    3222           14 :             break;
    3223              :         }
    3224              : 
    3225              :         Assert(restart_lsn >= wait_for_lsn);
    3226              : 
    3227            8 :         if (!XLogRecPtrIsValid(min_restart_lsn) ||
    3228              :             min_restart_lsn > restart_lsn)
    3229            8 :             min_restart_lsn = restart_lsn;
    3230              : 
    3231            8 :         caught_up_slot_num++;
    3232              : 
    3233            8 :         name += strlen(name) + 1;
    3234              :     }
    3235              : 
    3236           22 :     LWLockRelease(ReplicationSlotControlLock);
    3237              : 
    3238              :     /*
    3239              :      * Return false if not all the standbys have caught up to the specified
    3240              :      * WAL location.
    3241              :      */
    3242           22 :     if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
    3243           14 :         return false;
    3244              : 
    3245              :     /* The ss_oldest_flush_lsn must not retreat. */
    3246              :     Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
    3247              :            min_restart_lsn >= ss_oldest_flush_lsn);
    3248              : 
    3249            8 :     ss_oldest_flush_lsn = min_restart_lsn;
    3250              : 
    3251            8 :     return true;
    3252              : }
    3253              : 
    3254              : /*
    3255              :  * Wait for physical standbys to confirm receiving the given lsn.
    3256              :  *
    3257              :  * Used by logical decoding SQL functions. It waits for physical standbys
    3258              :  * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
    3259              :  */
    3260              : void
    3261          236 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
    3262              : {
    3263              :     /*
    3264              :      * Don't need to wait for the standby to catch up if the currently acquired
    3265              :      * slot is not a logical failover slot, or there is no value in
    3266              :      * synchronized_standby_slots.
    3267              :      */
    3268          236 :     if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
    3269          235 :         return;
    3270              : 
    3271            1 :     ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
    3272              : 
    3273              :     for (;;)
    3274              :     {
    3275            2 :         CHECK_FOR_INTERRUPTS();
    3276              : 
    3277            2 :         if (ConfigReloadPending)
    3278              :         {
    3279            1 :             ConfigReloadPending = false;
    3280            1 :             ProcessConfigFile(PGC_SIGHUP);
    3281              :         }
    3282              : 
    3283              :         /* Exit if done waiting for every slot. */
    3284            2 :         if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
    3285            1 :             break;
    3286              : 
    3287              :         /*
    3288              :          * Wait for the slots in the synchronized_standby_slots to catch up,
    3289              :          * but use a timeout (1s) so we can also check if the
    3290              :          * synchronized_standby_slots has been changed.
    3291              :          */
    3292            1 :         ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
    3293              :                                     WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
    3294              :     }
    3295              : 
    3296            1 :     ConditionVariableCancelSleep();
    3297              : }
        

Generated by: LCOV version 2.0-1