LCOV - code coverage report
Current view: top level - src/backend/replication - slot.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 86.8 % 1003 871
Test Date: 2026-03-24 02:15:55 Functions: 97.9 % 48 47
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * slot.c
       4              :  *     Replication slot management.
       5              :  *
       6              :  *
       7              :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
       8              :  *
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *    src/backend/replication/slot.c
      12              :  *
      13              :  * NOTES
      14              :  *
      15              :  * Replication slots are used to keep state about replication streams
      16              :  * originating from this cluster.  Their primary purpose is to prevent the
      17              :  * premature removal of WAL or of old tuple versions in a manner that would
      18              :  * interfere with replication; they are also useful for monitoring purposes.
      19              :  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
      20              :  * on standbys (to support cascading setups).  The requirement that slots be
      21              :  * usable on standbys precludes storing them in the system catalogs.
      22              :  *
      23              :  * Each replication slot gets its own directory inside the directory
      24              :  * $PGDATA / PG_REPLSLOT_DIR.  Inside that directory the state file will
      25              :  * contain the slot's own data.  Additional data can be stored alongside that
      26              :  * file if required.  While the server is running, the state data is also
      27              :  * cached in memory for efficiency.
      28              :  *
      29              :  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
      30              :  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
      31              :  * to iterate over the slots, and in exclusive mode to change the in_use flag
      32              :  * of a slot.  The remaining data in each slot is protected by its mutex.
      33              :  *
      34              :  *-------------------------------------------------------------------------
      35              :  */
      36              : 
      37              : #include "postgres.h"
      38              : 
      39              : #include <unistd.h>
      40              : #include <sys/stat.h>
      41              : 
      42              : #include "access/transam.h"
      43              : #include "access/xlog_internal.h"
      44              : #include "access/xlogrecovery.h"
      45              : #include "common/file_utils.h"
      46              : #include "common/string.h"
      47              : #include "miscadmin.h"
      48              : #include "pgstat.h"
      49              : #include "postmaster/interrupt.h"
      50              : #include "replication/logicallauncher.h"
      51              : #include "replication/slotsync.h"
      52              : #include "replication/slot.h"
      53              : #include "replication/walsender_private.h"
      54              : #include "storage/fd.h"
      55              : #include "storage/ipc.h"
      56              : #include "storage/proc.h"
      57              : #include "storage/procarray.h"
      58              : #include "utils/builtins.h"
      59              : #include "utils/guc_hooks.h"
      60              : #include "utils/injection_point.h"
      61              : #include "utils/varlena.h"
      62              : #include "utils/wait_event.h"
      63              : 
      64              : /*
      65              :  * Replication slot on-disk data structure.
      66              :  */
      67              : typedef struct ReplicationSlotOnDisk
      68              : {
      69              :     /* first part of this struct needs to be version independent */
      70              : 
      71              :     /* data not covered by checksum */
      72              :     uint32      magic;
      73              :     pg_crc32c   checksum;
      74              : 
      75              :     /* data covered by checksum */
      76              :     uint32      version;
      77              :     uint32      length;
      78              : 
      79              :     /*
      80              :      * The actual data in the slot that follows can differ based on the above
      81              :      * 'version'.
      82              :      */
      83              : 
      84              :     ReplicationSlotPersistentData slotdata;
      85              : } ReplicationSlotOnDisk;
      86              : 
      87              : /*
      88              :  * Struct for the configuration of synchronized_standby_slots.
      89              :  *
      90              :  * Note: this must be a flat representation that can be held in a single chunk
      91              :  * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
      92              :  * synchronized_standby_slots GUC.
      93              :  */
      94              : typedef struct
      95              : {
      96              :     /* Number of slot names in the slot_names[] */
      97              :     int         nslotnames;
      98              : 
      99              :     /*
     100              :      * slot_names contains 'nslotnames' consecutive null-terminated C strings.
     101              :      */
     102              :     char        slot_names[FLEXIBLE_ARRAY_MEMBER];
     103              : } SyncStandbySlotsConfigData;
     104              : 
     105              : /*
     106              :  * Lookup table for slot invalidation causes.
     107              :  */
     108              : typedef struct SlotInvalidationCauseMap
     109              : {
     110              :     ReplicationSlotInvalidationCause cause;
     111              :     const char *cause_name;
     112              : } SlotInvalidationCauseMap;
     113              : 
     114              : static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
     115              :     {RS_INVAL_NONE, "none"},
     116              :     {RS_INVAL_WAL_REMOVED, "wal_removed"},
     117              :     {RS_INVAL_HORIZON, "rows_removed"},
     118              :     {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
     119              :     {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
     120              : };
     121              : 
     122              : /*
     123              :  * Ensure that the lookup table is up-to-date with the enums defined in
     124              :  * ReplicationSlotInvalidationCause.
     125              :  */
     126              : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
     127              :                  "array length mismatch");
     128              : 
     129              : /* size of version independent data */
     130              : #define ReplicationSlotOnDiskConstantSize \
     131              :     offsetof(ReplicationSlotOnDisk, slotdata)
     132              : /* size of the part of the slot not covered by the checksum */
     133              : #define ReplicationSlotOnDiskNotChecksummedSize  \
     134              :     offsetof(ReplicationSlotOnDisk, version)
     135              : /* size of the part covered by the checksum */
     136              : #define ReplicationSlotOnDiskChecksummedSize \
     137              :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
     138              : /* size of the slot data that is version dependent */
     139              : #define ReplicationSlotOnDiskV2Size \
     140              :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
     141              : 
     142              : #define SLOT_MAGIC      0x1051CA1   /* format identifier */
     143              : #define SLOT_VERSION    5       /* version for new files */
     144              : 
     145              : /* Control array for replication slot management */
     146              : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
     147              : 
     148              : /* My backend's replication slot in the shared memory array */
     149              : ReplicationSlot *MyReplicationSlot = NULL;
     150              : 
     151              : /* GUC variables */
     152              : int         max_replication_slots = 10; /* the maximum number of replication
     153              :                                          * slots */
     154              : 
     155              : /*
     156              :  * Invalidate replication slots that have remained idle longer than this
     157              :  * duration; '0' disables it.
     158              :  */
     159              : int         idle_replication_slot_timeout_secs = 0;
     160              : 
     161              : /*
     162              :  * This GUC lists streaming replication standby server slot names that
     163              :  * logical WAL sender processes will wait for.
     164              :  */
     165              : char       *synchronized_standby_slots;
     166              : 
     167              : /* This is the parsed and cached configuration for synchronized_standby_slots */
     168              : static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
     169              : 
     170              : /*
     171              :  * Oldest LSN that has been confirmed to be flushed to the standbys
     172              :  * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
     173              :  */
     174              : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
     175              : 
     176              : static void ReplicationSlotShmemExit(int code, Datum arg);
     177              : static bool IsSlotForConflictCheck(const char *name);
     178              : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
     179              : 
     180              : /* internal persistency functions */
     181              : static void RestoreSlotFromDisk(const char *name);
     182              : static void CreateSlotOnDisk(ReplicationSlot *slot);
     183              : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
     184              : 
     185              : /*
     186              :  * Report shared-memory space needed by ReplicationSlotsShmemInit.
     187              :  */
     188              : Size
     189         4565 : ReplicationSlotsShmemSize(void)
     190              : {
     191         4565 :     Size        size = 0;
     192              : 
     193         4565 :     if (max_replication_slots == 0)
     194            2 :         return size;
     195              : 
     196         4563 :     size = offsetof(ReplicationSlotCtlData, replication_slots);
     197         4563 :     size = add_size(size,
     198              :                     mul_size(max_replication_slots, sizeof(ReplicationSlot)));
     199              : 
     200         4563 :     return size;
     201              : }
     202              : 
     203              : /*
     204              :  * Allocate and initialize shared memory for replication slots.
     205              :  */
     206              : void
     207         1180 : ReplicationSlotsShmemInit(void)
     208              : {
     209              :     bool        found;
     210              : 
     211         1180 :     if (max_replication_slots == 0)
     212            1 :         return;
     213              : 
     214         1179 :     ReplicationSlotCtl = (ReplicationSlotCtlData *)
     215         1179 :         ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
     216              :                         &found);
     217              : 
     218         1179 :     if (!found)
     219              :     {
     220              :         int         i;
     221              : 
     222              :         /* First time through, so initialize */
     223         2289 :         MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
     224              : 
     225        12777 :         for (i = 0; i < max_replication_slots; i++)
     226              :         {
     227        11598 :             ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
     228              : 
     229              :             /* everything else is zeroed by the memset above */
     230        11598 :             slot->active_proc = INVALID_PROC_NUMBER;
     231        11598 :             SpinLockInit(&slot->mutex);
     232        11598 :             LWLockInitialize(&slot->io_in_progress_lock,
     233              :                              LWTRANCHE_REPLICATION_SLOT_IO);
     234        11598 :             ConditionVariableInit(&slot->active_cv);
     235              :         }
     236              :     }
     237              : }
     238              : 
     239              : /*
     240              :  * Register the callback for replication slot cleanup and releasing.
     241              :  */
     242              : void
     243        24243 : ReplicationSlotInitialize(void)
     244              : {
     245        24243 :     before_shmem_exit(ReplicationSlotShmemExit, 0);
     246        24243 : }
     247              : 
     248              : /*
     249              :  * Release and cleanup replication slots.
     250              :  */
     251              : static void
     252        24243 : ReplicationSlotShmemExit(int code, Datum arg)
     253              : {
     254              :     /* Make sure active replication slots are released */
     255        24243 :     if (MyReplicationSlot != NULL)
     256          298 :         ReplicationSlotRelease();
     257              : 
     258              :     /* Also cleanup all the temporary slots. */
     259        24243 :     ReplicationSlotCleanup(false);
     260        24243 : }
     261              : 
     262              : /*
     263              :  * Check whether the passed slot name is valid and report errors at elevel.
     264              :  *
     265              :  * See comments for ReplicationSlotValidateNameInternal().
     266              :  */
     267              : bool
     268          850 : ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
     269              :                             int elevel)
     270              : {
     271              :     int         err_code;
     272          850 :     char       *err_msg = NULL;
     273          850 :     char       *err_hint = NULL;
     274              : 
     275          850 :     if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
     276              :                                              &err_code, &err_msg, &err_hint))
     277              :     {
     278              :         /*
     279              :          * Use errmsg_internal() and errhint_internal() instead of errmsg()
     280              :          * and errhint(), since the messages from
     281              :          * ReplicationSlotValidateNameInternal() are already translated. This
     282              :          * avoids double translation.
     283              :          */
     284            5 :         ereport(elevel,
     285              :                 errcode(err_code),
     286              :                 errmsg_internal("%s", err_msg),
     287              :                 (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
     288              : 
     289            0 :         pfree(err_msg);
     290            0 :         if (err_hint != NULL)
     291            0 :             pfree(err_hint);
     292            0 :         return false;
     293              :     }
     294              : 
     295          845 :     return true;
     296              : }
     297              : 
     298              : /*
     299              :  * Check whether the passed slot name is valid.
     300              :  *
     301              :  * An error will be reported for a reserved replication slot name if
     302              :  * allow_reserved_name is set to false.
     303              :  *
     304              :  * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
     305              :  * the name to be used as a directory name on every supported OS.
     306              :  *
     307              :  * Returns true if the slot name is valid. Otherwise, returns false and stores
     308              :  * the error code, error message, and optional hint in err_code, err_msg, and
     309              :  * err_hint, respectively. The caller is responsible for freeing err_msg and
     310              :  * err_hint, which are palloc'd.
     311              :  */
     312              : bool
     313         1069 : ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
     314              :                                     int *err_code, char **err_msg, char **err_hint)
     315              : {
     316              :     const char *cp;
     317              : 
     318         1069 :     if (strlen(name) == 0)
     319              :     {
     320            4 :         *err_code = ERRCODE_INVALID_NAME;
     321            4 :         *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
     322            4 :         *err_hint = NULL;
     323            4 :         return false;
     324              :     }
     325              : 
     326         1065 :     if (strlen(name) >= NAMEDATALEN)
     327              :     {
     328            0 :         *err_code = ERRCODE_NAME_TOO_LONG;
     329            0 :         *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
     330            0 :         *err_hint = NULL;
     331            0 :         return false;
     332              :     }
     333              : 
     334        20675 :     for (cp = name; *cp; cp++)
     335              :     {
     336        19612 :         if (!((*cp >= 'a' && *cp <= 'z')
     337         9382 :               || (*cp >= '0' && *cp <= '9')
     338         1908 :               || (*cp == '_')))
     339              :         {
     340            2 :             *err_code = ERRCODE_INVALID_NAME;
     341            2 :             *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
     342            2 :             *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
     343            2 :             return false;
     344              :         }
     345              :     }
     346              : 
     347         1063 :     if (!allow_reserved_name && IsSlotForConflictCheck(name))
     348              :     {
     349            1 :         *err_code = ERRCODE_RESERVED_NAME;
     350            1 :         *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
     351            1 :         *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
     352              :                              CONFLICT_DETECTION_SLOT);
     353            1 :         return false;
     354              :     }
     355              : 
     356         1062 :     return true;
     357              : }
     358              : 
     359              : /*
     360              :  * Return true if the replication slot name is "pg_conflict_detection".
     361              :  */
     362              : static bool
     363         2324 : IsSlotForConflictCheck(const char *name)
     364              : {
     365         2324 :     return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
     366              : }
     367              : 
     368              : /*
     369              :  * Create a new replication slot and mark it as used by this backend.
     370              :  *
     371              :  * name: Name of the slot
     372              :  * db_specific: logical decoding is database-specific; pass true if the slot
     373              :  *     is going to be used for logical decoding, otherwise false.
     374              :  * two_phase: If enabled, allows decoding of prepared transactions.
     375              :  * failover: If enabled, allows the slot to be synced to standbys so
     376              :  *     that logical replication can be resumed after failover.
     377              :  * synced: True if the slot is synchronized from the primary server.
     378              :  */
     379              : void
     380          707 : ReplicationSlotCreate(const char *name, bool db_specific,
     381              :                       ReplicationSlotPersistency persistency,
     382              :                       bool two_phase, bool failover, bool synced)
     383              : {
     384          707 :     ReplicationSlot *slot = NULL;
     385              :     int         i;
     386              : 
     387              :     Assert(MyReplicationSlot == NULL);
     388              : 
     389              :     /*
     390              :      * The logical launcher or pg_upgrade may create or migrate an internal
     391              :      * slot, so using a reserved name is allowed in these cases.
     392              :      */
     393          707 :     ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
     394          707 :                                 ERROR);
     395              : 
     396          706 :     if (failover)
     397              :     {
     398              :         /*
     399              :          * Do not allow users to create the failover enabled slots on the
     400              :          * standby as we do not support sync to the cascading standby.
     401              :          *
     402              :          * However, failover enabled slots can be created during slot
     403              :          * synchronization because we need to retain the same values as the
     404              :          * remote slot.
     405              :          */
     406           28 :         if (RecoveryInProgress() && !IsSyncingReplicationSlots())
     407            0 :             ereport(ERROR,
     408              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     409              :                     errmsg("cannot enable failover for a replication slot created on the standby"));
     410              : 
     411              :         /*
     412              :          * Do not allow users to create failover enabled temporary slots,
     413              :          * because temporary slots will not be synced to the standby.
     414              :          *
     415              :          * However, failover enabled temporary slots can be created during
     416              :          * slot synchronization. See the comments atop slotsync.c for details.
     417              :          */
     418           28 :         if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
     419            1 :             ereport(ERROR,
     420              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     421              :                     errmsg("cannot enable failover for a temporary replication slot"));
     422              :     }
     423              : 
     424              :     /*
     425              :      * If some other backend ran this code concurrently with us, we'd likely
     426              :      * both allocate the same slot, and that would be bad.  We'd also be at
     427              :      * risk of missing a name collision.  Also, we don't want to try to create
     428              :      * a new slot while somebody's busy cleaning up an old one, because we
     429              :      * might both be monkeying with the same directory.
     430              :      */
     431          705 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     432              : 
     433              :     /*
     434              :      * Check for name collision, and identify an allocatable slot.  We need to
     435              :      * hold ReplicationSlotControlLock in shared mode for this, so that nobody
     436              :      * else can change the in_use flags while we're looking at them.
     437              :      */
     438          705 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     439         6847 :     for (i = 0; i < max_replication_slots; i++)
     440              :     {
     441         6145 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     442              : 
     443         6145 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     444            3 :             ereport(ERROR,
     445              :                     (errcode(ERRCODE_DUPLICATE_OBJECT),
     446              :                      errmsg("replication slot \"%s\" already exists", name)));
     447         6142 :         if (!s->in_use && slot == NULL)
     448          701 :             slot = s;
     449              :     }
     450          702 :     LWLockRelease(ReplicationSlotControlLock);
     451              : 
     452              :     /* If all slots are in use, we're out of luck. */
     453          702 :     if (slot == NULL)
     454            1 :         ereport(ERROR,
     455              :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     456              :                  errmsg("all replication slots are in use"),
     457              :                  errhint("Free one or increase \"max_replication_slots\".")));
     458              : 
     459              :     /*
     460              :      * Since this slot is not in use, nobody should be looking at any part of
     461              :      * it other than the in_use field unless they're trying to allocate it.
     462              :      * And since we hold ReplicationSlotAllocationLock, nobody except us can
     463              :      * be doing that.  So it's safe to initialize the slot.
     464              :      */
     465              :     Assert(!slot->in_use);
     466              :     Assert(slot->active_proc == INVALID_PROC_NUMBER);
     467              : 
     468              :     /* first initialize persistent data */
     469          701 :     memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
     470          701 :     namestrcpy(&slot->data.name, name);
     471          701 :     slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
     472          701 :     slot->data.persistency = persistency;
     473          701 :     slot->data.two_phase = two_phase;
     474          701 :     slot->data.two_phase_at = InvalidXLogRecPtr;
     475          701 :     slot->data.failover = failover;
     476          701 :     slot->data.synced = synced;
     477              : 
     478              :     /* and then data only present in shared memory */
     479          701 :     slot->just_dirtied = false;
     480          701 :     slot->dirty = false;
     481          701 :     slot->effective_xmin = InvalidTransactionId;
     482          701 :     slot->effective_catalog_xmin = InvalidTransactionId;
     483          701 :     slot->candidate_catalog_xmin = InvalidTransactionId;
     484          701 :     slot->candidate_xmin_lsn = InvalidXLogRecPtr;
     485          701 :     slot->candidate_restart_valid = InvalidXLogRecPtr;
     486          701 :     slot->candidate_restart_lsn = InvalidXLogRecPtr;
     487          701 :     slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
     488          701 :     slot->last_saved_restart_lsn = InvalidXLogRecPtr;
     489          701 :     slot->inactive_since = 0;
     490          701 :     slot->slotsync_skip_reason = SS_SKIP_NONE;
     491              : 
     492              :     /*
     493              :      * Create the slot on disk.  We haven't actually marked the slot allocated
     494              :      * yet, so no special cleanup is required if this errors out.
     495              :      */
     496          701 :     CreateSlotOnDisk(slot);
     497              : 
     498              :     /*
     499              :      * We need to briefly prevent any other backend from iterating over the
     500              :      * slots while we flip the in_use flag. We also need to set active_proc
     501              :      * while holding the ControlLock as otherwise a concurrent
     502              :      * ReplicationSlotAcquire() could acquire the slot as well.
     503              :      */
     504          701 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     505              : 
     506          701 :     slot->in_use = true;
     507              : 
     508              :     /* We can now mark the slot active, and that makes it our slot. */
     509          701 :     SpinLockAcquire(&slot->mutex);
     510              :     Assert(slot->active_proc == INVALID_PROC_NUMBER);
     511          701 :     slot->active_proc = MyProcNumber;
     512          701 :     SpinLockRelease(&slot->mutex);
     513          701 :     MyReplicationSlot = slot;
     514              : 
     515          701 :     LWLockRelease(ReplicationSlotControlLock);
     516              : 
     517              :     /*
     518              :      * Create statistics entry for the new logical slot. We don't collect any
     519              :      * stats for physical slots, so no need to create an entry for the same.
     520              :      * See ReplicationSlotDropPtr for why we need to do this before releasing
     521              :      * ReplicationSlotAllocationLock.
     522              :      */
     523          701 :     if (SlotIsLogical(slot))
     524          507 :         pgstat_create_replslot(slot);
     525              : 
     526              :     /*
     527              :      * Now that the slot has been marked as in_use and active, it's safe to
     528              :      * let somebody else try to allocate a slot.
     529              :      */
     530          701 :     LWLockRelease(ReplicationSlotAllocationLock);
     531              : 
     532              :     /* Let everybody know we've modified this slot */
     533          701 :     ConditionVariableBroadcast(&slot->active_cv);
     534          701 : }
     535              : 
     536              : /*
     537              :  * Search for the named replication slot among the in-use array entries.
     538              :  * If need_lock is true, ReplicationSlotControlLock is held (shared) for the scan.
     539              :  * Return the replication slot if found, otherwise NULL.
     540              :  */
     541              : ReplicationSlot *
     542         2088 : SearchNamedReplicationSlot(const char *name, bool need_lock)
     543              : {
     544              :     int         i;
     545         2088 :     ReplicationSlot *slot = NULL;   /* stays NULL if nothing matches */
     546              : 
     547         2088 :     if (need_lock)
     548          615 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     549              : 
     550         8106 :     for (i = 0; i < max_replication_slots; i++)
     551              :     {
     552         7598 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     553              : 
     554         7598 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     555              :         {
     556         1580 :             slot = s;           /* first match ends the scan */
     557         1580 :             break;
     558              :         }
     559              :     }
     560              : 
     561         2088 :     if (need_lock)
     562          615 :         LWLockRelease(ReplicationSlotControlLock);
     563              : 
     564         2088 :     return slot;
     565              : }
     566              : 
     567              : /*
     568              :  * Return the index of the replication slot in
     569              :  * ReplicationSlotCtl->replication_slots.
     570              :  * 'slot' must point into that array; this is verified only by the Assert.
     571              :  * This is mainly useful to have an efficient key for storing replication slot
     572              :  * stats.
     573              :  */
     574              : int
     575         8982 : ReplicationSlotIndex(ReplicationSlot *slot)
     576              : {
     577              :     Assert(slot >= ReplicationSlotCtl->replication_slots &&
     578              :            slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
     579              : 
     580         8982 :     return slot - ReplicationSlotCtl->replication_slots;    /* pointer difference, in elements */
     581              : }
     582              : 
     583              : /*
     584              :  * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
     585              :  * the slot's name and true is returned.  'name' is not written on false.
     586              :  * 'index' subscripts the slot array as-is; it is not range-checked here.
     587              :  * This is likely only useful for pgstat_replslot.c during shutdown; in other
     588              :  * cases there are obvious TOCTOU issues.
     589              :  */
     590              : bool
     591          107 : ReplicationSlotName(int index, Name name)
     592              : {
     593              :     ReplicationSlot *slot;
     594              :     bool        found;
     595              : 
     596          107 :     slot = &ReplicationSlotCtl->replication_slots[index];
     597              : 
     598              :     /*
     599              :      * Ensure that the slot cannot be dropped while we copy the name. Don't
     600              :      * need the spinlock as the name of an existing slot cannot change.
     601              :      */
     602          107 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     603          107 :     found = slot->in_use;       /* this is the value handed back to the caller */
     604          107 :     if (slot->in_use)
     605          107 :         namestrcpy(name, NameStr(slot->data.name));
     606          107 :     LWLockRelease(ReplicationSlotControlLock);
     607              : 
     608          107 :     return found;
     609              : }
     610              : 
     611              : /*
     612              :  * Find a previously created slot and mark it as used by this process.
     613              :  * On return MyReplicationSlot points to it.  Errors out if no such slot exists.
     614              :  * An error is raised if nowait is true and the slot is currently in use. If
     615              :  * nowait is false, we sleep until the slot is released by the owning process.
     616              :  * (We sleep on the slot's active_cv and then redo the lookup from scratch.)
     617              :  * An error is raised if error_if_invalid is true and the slot is found to
     618              :  * be invalid. It should always be set to true, except when we are temporarily
     619              :  * acquiring the slot and don't intend to change it.
     620              :  */
     621              : void
     622         1395 : ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
     623              : {
     624              :     ReplicationSlot *s;
     625              :     ProcNumber  active_proc;    /* owner observed (or installed by us) below */
     626              :     int         active_pid;     /* that owner's PID; only used in the error message */
     627              : 
     628              :     Assert(name != NULL);
     629              : 
     630         1395 : retry:
     631              :     Assert(MyReplicationSlot == NULL);
     632              : 
     633         1395 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     634              : 
     635              :     /* Check if the slot exists with the given name. */
     636         1395 :     s = SearchNamedReplicationSlot(name, false);    /* false: we already hold the control lock */
     637         1395 :     if (s == NULL || !s->in_use)
     638              :     {
     639            9 :         LWLockRelease(ReplicationSlotControlLock);
     640              : 
     641            9 :         ereport(ERROR,
     642              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     643              :                  errmsg("replication slot \"%s\" does not exist",
     644              :                         name)));
     645              :     }
     646              : 
     647              :     /*
     648              :      * Do not allow users to acquire the reserved slot. This scenario may
     649              :      * occur if the launcher that owns the slot has terminated unexpectedly
     650              :      * due to an error, and a backend process attempts to reuse the slot.
     651              :      */
     652         1386 :     if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
     653            0 :         ereport(ERROR,
     654              :                 errcode(ERRCODE_UNDEFINED_OBJECT),
     655              :                 errmsg("cannot acquire replication slot \"%s\"", name),
     656              :                 errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
     657              : 
     658              :     /*
     659              :      * This is the slot we want; check if it's active under some other
     660              :      * process.  In single user mode, we don't need this check.
     661              :      */
     662         1386 :     if (IsUnderPostmaster)
     663              :     {
     664              :         /*
     665              :          * Get ready to sleep on the slot in case it is active.  (We may end
     666              :          * up not sleeping, but we don't want to do this while holding the
     667              :          * spinlock.)
     668              :          */
     669         1381 :         if (!nowait)
     670          296 :             ConditionVariablePrepareToSleep(&s->active_cv);
     671              : 
     672              :         /*
     673              :          * It is important to reset the inactive_since under spinlock here to
     674              :          * avoid race conditions with slot invalidation. See comments related
     675              :          * to inactive_since in InvalidatePossiblyObsoleteSlot.
     676              :          */
     677         1381 :         SpinLockAcquire(&s->mutex);
     678         1381 :         if (s->active_proc == INVALID_PROC_NUMBER)
     679         1229 :             s->active_proc = MyProcNumber;  /* slot was free: claim it */
     680         1381 :         active_proc = s->active_proc;       /* now either us or the current owner */
     681         1381 :         ReplicationSlotSetInactiveSince(s, 0, false);
     682         1381 :         SpinLockRelease(&s->mutex);
     683              :     }
     684              :     else
     685              :     {
     686            5 :         s->active_proc = active_proc = MyProcNumber;    /* taken unconditionally, without the spinlock */
     687            5 :         ReplicationSlotSetInactiveSince(s, 0, true);
     688              :     }
     689         1386 :     active_pid = GetPGProcByNumber(active_proc)->pid;
     690         1386 :     LWLockRelease(ReplicationSlotControlLock);
     691              : 
     692              :     /*
     693              :      * If we found the slot but it's already active in another process, we
     694              :      * wait until the owning process signals us that it's been released, or
     695              :      * error out.
     696              :      */
     697         1386 :     if (active_proc != MyProcNumber)
     698              :     {
     699            0 :         if (!nowait)
     700              :         {
     701              :             /* Wait here until we get signaled, and then restart */
     702            0 :             ConditionVariableSleep(&s->active_cv,
     703              :                                    WAIT_EVENT_REPLICATION_SLOT_DROP);   /* NOTE(review): "_DROP" wait event on the acquire path - confirm intended */
     704            0 :             ConditionVariableCancelSleep();
     705            0 :             goto retry;
     706              :         }
     707              : 
     708            0 :         ereport(ERROR,
     709              :                 (errcode(ERRCODE_OBJECT_IN_USE),
     710              :                  errmsg("replication slot \"%s\" is active for PID %d",
     711              :                         NameStr(s->data.name), active_pid)));
     712              :     }
     713         1386 :     else if (!nowait)
     714          296 :         ConditionVariableCancelSleep(); /* no sleep needed after all */
     715              : 
     716              :     /* We made this slot active, so it's ours now. */
     717         1386 :     MyReplicationSlot = s;
     718              : 
     719              :     /*
     720              :      * We need to check for invalidation after making the slot ours to avoid
     721              :      * the possible race condition with the checkpointer that can otherwise
     722              :      * invalidate the slot immediately after the check.
     723              :      */
     724         1386 :     if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)  /* MyReplicationSlot is already set if this throws */
     725            8 :         ereport(ERROR,
     726              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     727              :                 errmsg("can no longer access replication slot \"%s\"",
     728              :                        NameStr(s->data.name)),
     729              :                 errdetail("This replication slot has been invalidated due to \"%s\".",
     730              :                           GetSlotInvalidationCauseName(s->data.invalidated)));
     731              : 
     732              :     /* Let everybody know we've modified this slot */
     733         1378 :     ConditionVariableBroadcast(&s->active_cv);
     734              : 
     735              :     /*
     736              :      * The call to pgstat_acquire_replslot() protects against stats for a
     737              :      * different slot, from before a restart or such, being present during
     738              :      * pgstat_report_replslot().
     739              :      */
     740         1378 :     if (SlotIsLogical(s))
     741         1158 :         pgstat_acquire_replslot(s);
     742              : 
     743              : 
     744         1378 :     if (am_walsender)   /* only walsenders log the acquisition */
     745              :     {
     746          949 :         ereport(log_replication_commands ? LOG : DEBUG1,
     747              :                 SlotIsLogical(s)
     748              :                 ? errmsg("acquired logical replication slot \"%s\"",
     749              :                          NameStr(s->data.name))
     750              :                 : errmsg("acquired physical replication slot \"%s\"",
     751              :                          NameStr(s->data.name)));
     752              :     }
     753         1378 : }
     754              : 
     755              : /*
     756              :  * Release the replication slot that this backend considers itself to own.
     757              :  * An ephemeral slot is dropped instead; MyReplicationSlot is NULL on return.
     758              :  * This or another backend can re-acquire the slot later.
     759              :  * Resources this slot requires will be preserved.
     760              :  */
     761              : void
     762         1664 : ReplicationSlotRelease(void)
     763              : {
     764         1664 :     ReplicationSlot *slot = MyReplicationSlot;
     765         1664 :     char       *slotname = NULL;    /* keep compiler quiet */
     766              :     bool        is_logical;
     767         1664 :     TimestampTz now = 0;
     768              : 
     769              :     Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
     770              : 
     771         1664 :     is_logical = SlotIsLogical(slot);
     772              : 
     773         1664 :     if (am_walsender)
     774         1168 :         slotname = pstrdup(NameStr(slot->data.name));   /* copied up front; used by the log message at the end */
     775              : 
     776         1664 :     if (slot->data.persistency == RS_EPHEMERAL)
     777              :     {
     778              :         /*
     779              :          * Delete the slot. There is no !PANIC case where this is allowed to
     780              :          * fail, all that may happen is an incomplete cleanup of the on-disk
     781              :          * data.
     782              :          */
     783            6 :         ReplicationSlotDropAcquired();
     784              : 
     785              :         /*
     786              :          * Request to disable logical decoding, even though this slot may not
     787              :          * have been the last logical slot. The checkpointer will verify if
     788              :          * logical decoding should actually be disabled.
     789              :          */
     790            6 :         if (is_logical)
     791            6 :             RequestDisableLogicalDecoding();
     792              :     }
     793              : 
     794              :     /*
     795              :      * If slot needed to temporarily restrain both data and catalog xmin to
     796              :      * create the catalog snapshot, remove that temporary constraint.
     797              :      * Snapshots can only be exported while the initial snapshot is still
     798              :      * acquired.
     799              :      */
     800         1664 :     if (!TransactionIdIsValid(slot->data.xmin) &&
     801         1634 :         TransactionIdIsValid(slot->effective_xmin))
     802              :     {
     803          215 :         SpinLockAcquire(&slot->mutex);
     804          215 :         slot->effective_xmin = InvalidTransactionId;
     805          215 :         SpinLockRelease(&slot->mutex);
     806          215 :         ReplicationSlotsComputeRequiredXmin(false);
     807              :     }
     808              : 
     809              :     /*
     810              :      * Set the time since the slot has become inactive. We get the current
     811              :      * time beforehand to avoid a system call while holding the spinlock.
     812              :      */
     813         1664 :     now = GetCurrentTimestamp();
     814              : 
     815         1664 :     if (slot->data.persistency == RS_PERSISTENT)
     816              :     {
     817              :         /*
     818              :          * Mark persistent slot inactive.  We're not freeing it, just
     819              :          * disconnecting, but wake up others that may be waiting for it.
     820              :          */
     821         1354 :         SpinLockAcquire(&slot->mutex);
     822         1354 :         slot->active_proc = INVALID_PROC_NUMBER;
     823         1354 :         ReplicationSlotSetInactiveSince(slot, now, false);
     824         1354 :         SpinLockRelease(&slot->mutex);
     825         1354 :         ConditionVariableBroadcast(&slot->active_cv);
     826              :     }
     827              :     else
     828          310 :         ReplicationSlotSetInactiveSince(slot, now, true);   /* NOTE(review): also reached for an ephemeral slot already dropped above - confirm this is safe */
     829              : 
     830         1664 :     MyReplicationSlot = NULL;
     831              : 
     832              :     /* might not have been set when we've been a plain slot */
     833         1664 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     834         1664 :     MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
     835         1664 :     ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;  /* keep the ProcGlobal copy in step */
     836         1664 :     LWLockRelease(ProcArrayLock);
     837              : 
     838         1664 :     if (am_walsender)
     839              :     {
     840         1168 :         ereport(log_replication_commands ? LOG : DEBUG1,
     841              :                 is_logical
     842              :                 ? errmsg("released logical replication slot \"%s\"",
     843              :                          slotname)
     844              :                 : errmsg("released physical replication slot \"%s\"",
     845              :                          slotname));
     846              : 
     847         1168 :         pfree(slotname);
     848              :     }
     849         1664 : }
     850              : 
     851              : /*
     852              :  * Cleanup temporary slots created in current session.
     853              :  *
     854              :  * Cleanup only synced temporary slots if 'synced_only' is true, else
     855              :  * cleanup all temporary slots.
     856              :  *
     857              :  * If a logical slot was dropped and the final scan sees no remaining
     858              :  * non-invalidated logical slot, requests to disable logical decoding.
     859              :  */
     860              : void
     861        54632 : ReplicationSlotCleanup(bool synced_only)
     862              : {
     863              :     int         i;
     864              :     bool        found_valid_logicalslot;
     865        54632 :     bool        dropped_logical = false;    /* set once, never reset across restarts */
     866              : 
     867              :     Assert(MyReplicationSlot == NULL);
     868              : 
     869        54784 : restart:
     870        54784 :     found_valid_logicalslot = false;    /* recomputed on every pass over the array */
     871        54784 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     872       595158 :     for (i = 0; i < max_replication_slots; i++)
     873              :     {
     874       540526 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     875              : 
     876       540526 :         if (!s->in_use)
     877       525370 :             continue;
     878              : 
     879        15156 :         SpinLockAcquire(&s->mutex);
     880              : 
     881        30312 :         found_valid_logicalslot |=
     882        15156 :             (SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);
     883              : 
     884        15156 :         if ((s->active_proc == MyProcNumber &&
     885          152 :              (!synced_only || s->data.synced)))
     886              :         {
     887              :             Assert(s->data.persistency == RS_TEMPORARY);
     888          152 :             SpinLockRelease(&s->mutex);
     889          152 :             LWLockRelease(ReplicationSlotControlLock);  /* avoid deadlock */
     890              : 
     891          152 :             if (SlotIsLogical(s))
     892           10 :                 dropped_logical = true;
     893              : 
     894          152 :             ReplicationSlotDropPtr(s);
     895              : 
     896          152 :             ConditionVariableBroadcast(&s->active_cv);
     897          152 :             goto restart;       /* the control lock was dropped: rescan from the top */
     898              :         }
     899              :         else
     900        15004 :             SpinLockRelease(&s->mutex);
     901              :     }
     902              : 
     903        54632 :     LWLockRelease(ReplicationSlotControlLock);
     904              : 
     905        54632 :     if (dropped_logical && !found_valid_logicalslot)    /* flag reflects the last, drop-free pass */
     906            3 :         RequestDisableLogicalDecoding();
     907        54632 : }
     908              : 
     909              : /*
     910              :  * Permanently drop the replication slot identified by the passed-in name; 'nowait' is handed to ReplicationSlotAcquire().
     911              :  */
     912              : void
     913          444 : ReplicationSlotDrop(const char *name, bool nowait)
     914              : {
     915              :     bool        is_logical;
     916              : 
     917              :     Assert(MyReplicationSlot == NULL);
     918              : 
     919          444 :     ReplicationSlotAcquire(name, nowait, false);    /* error_if_invalid = false: invalidated slots can be dropped too */
     920              : 
     921              :     /*
     922              :      * Do not allow users to drop the slots which are currently being synced
     923              :      * from the primary to the standby.
     924              :      */
     925          437 :     if (RecoveryInProgress() && MyReplicationSlot->data.synced)
     926            1 :         ereport(ERROR,
     927              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     928              :                 errmsg("cannot drop replication slot \"%s\"", name),
     929              :                 errdetail("This replication slot is being synchronized from the primary server."));
     930              : 
     931          436 :     is_logical = SlotIsLogical(MyReplicationSlot);  /* read now: the drop below resets MyReplicationSlot */
     932              : 
     933          436 :     ReplicationSlotDropAcquired();
     934              : 
     935          436 :     if (is_logical)
     936          416 :         RequestDisableLogicalDecoding();    /* issued for every dropped logical slot */
     937          436 : }
     938              : 
     939              : /*
     940              :  * Change the definition of the slot identified by the specified name.
     941              :  *
     942              :  * Altering the two_phase property of a slot requires caution on the
     943              :  * client-side. Enabling it at any random point during decoding has the
     944              :  * risk that transactions prepared before this change may be skipped by
     945              :  * the decoder, leading to missing prepare records on the client. So, we
     946              :  * enable it for subscription related slots only once the initial tablesync
     947              :  * is finished. See comments atop worker.c. Disabling it is safe only when
     948              :  * there are no pending prepared transactions; otherwise, the changes of
     949              :  * already prepared transactions can be replicated again along with their
     950              :  * corresponding commit, leading to duplicate data or errors.
     951              :  */
     952              : void
     953            7 : ReplicationSlotAlter(const char *name, const bool *failover,
     954              :                      const bool *two_phase)
     955              : {
     956            7 :     bool        update_slot = false;    /* true once any property actually changed */
     957              : 
     958              :     Assert(MyReplicationSlot == NULL);
     959              :     Assert(failover || two_phase);      /* a NULL pointer means "leave that property alone" */
     960              : 
     961            7 :     ReplicationSlotAcquire(name, false, true);  /* nowait = false, error_if_invalid = true */
     962              : 
     963            6 :     if (SlotIsPhysical(MyReplicationSlot))
     964            0 :         ereport(ERROR,
     965              :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     966              :                 errmsg("cannot use %s with a physical replication slot",
     967              :                        "ALTER_REPLICATION_SLOT"));
     968              : 
     969            6 :     if (RecoveryInProgress())
     970              :     {
     971              :         /*
     972              :          * Do not allow users to alter the slots which are currently being
     973              :          * synced from the primary to the standby.
     974              :          */
     975            1 :         if (MyReplicationSlot->data.synced)
     976            1 :             ereport(ERROR,
     977              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     978              :                     errmsg("cannot alter replication slot \"%s\"", name),
     979              :                     errdetail("This replication slot is being synchronized from the primary server."));
     980              : 
     981              :         /*
     982              :          * Do not allow users to enable failover on the standby as we do not
     983              :          * support sync to the cascading standby.
     984              :          */
     985            0 :         if (failover && *failover)
     986            0 :             ereport(ERROR,
     987              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     988              :                     errmsg("cannot enable failover for a replication slot"
     989              :                            " on the standby"));
     990              :     }
     991              : 
     992            5 :     if (failover)
     993              :     {
     994              :         /*
     995              :          * Do not allow users to enable failover for temporary slots as we do
     996              :          * not support syncing temporary slots to the standby.
     997              :          */
     998            4 :         if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
     999            0 :             ereport(ERROR,
    1000              :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1001              :                     errmsg("cannot enable failover for a temporary replication slot"));
    1002              : 
    1003            4 :         if (MyReplicationSlot->data.failover != *failover)
    1004              :         {
    1005            4 :             SpinLockAcquire(&MyReplicationSlot->mutex);
    1006            4 :             MyReplicationSlot->data.failover = *failover;
    1007            4 :             SpinLockRelease(&MyReplicationSlot->mutex);
    1008              : 
    1009            4 :             update_slot = true;
    1010              :         }
    1011              :     }
    1012              : 
    1013            5 :     if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
    1014              :     {
    1015            1 :         SpinLockAcquire(&MyReplicationSlot->mutex);
    1016            1 :         MyReplicationSlot->data.two_phase = *two_phase;
    1017            1 :         SpinLockRelease(&MyReplicationSlot->mutex);
    1018              : 
    1019            1 :         update_slot = true;
    1020              :     }
    1021              : 
    1022            5 :     if (update_slot)    /* skip the dirty-mark and save when nothing changed */
    1023              :     {
    1024            5 :         ReplicationSlotMarkDirty();
    1025            5 :         ReplicationSlotSave();
    1026              :     }
    1027              : 
    1028            5 :     ReplicationSlotRelease();
    1029            5 : }
    1030              : 
    1031              : /*
    1032              :  * Permanently drop the currently acquired replication slot (MyReplicationSlot, which must be set and is reset to NULL).
    1033              :  */
    1034              : void
    1035          450 : ReplicationSlotDropAcquired(void)
    1036              : {
    1037          450 :     ReplicationSlot *slot = MyReplicationSlot;  /* keep the pointer; the global is cleared below */
    1038              : 
    1039              :     Assert(MyReplicationSlot != NULL);
    1040              : 
    1041              :     /* slot isn't acquired anymore */
    1042          450 :     MyReplicationSlot = NULL;
    1043              : 
    1044          450 :     ReplicationSlotDropPtr(slot);
    1045          450 : }
    1046              : 
    1047              : /*
    1048              :  * Permanently drop the replication slot which will be released by the point
    1049              :  * this function returns.
    1050              :  */
    1051              : static void
    1052          602 : ReplicationSlotDropPtr(ReplicationSlot *slot)
    1053              : {
    1054              :     char        path[MAXPGPATH];
    1055              :     char        tmppath[MAXPGPATH];
    1056              : 
    1057              :     /*
    1058              :      * If some other backend ran this code concurrently with us, we might try
    1059              :      * to delete a slot with a certain name while someone else was trying to
    1060              :      * create a slot with the same name.
    1061              :      */
    1062          602 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
    1063              : 
    1064              :     /* Generate pathnames. */
    1065          602 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    1066          602 :     sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    1067              : 
    1068              :     /*
    1069              :      * Rename the slot directory on disk, so that we'll no longer recognize
    1070              :      * this as a valid slot.  Note that if this fails, we've got to mark the
    1071              :      * slot inactive before bailing out.  If we're dropping an ephemeral or a
    1072              :      * temporary slot, we better never fail hard as the caller won't expect
    1073              :      * the slot to survive and this might get called during error handling.
    1074              :      */
    1075          602 :     if (rename(path, tmppath) == 0)
    1076              :     {
    1077              :         /*
    1078              :          * We need to fsync() the directory we just renamed and its parent to
    1079              :          * make sure that our changes are on disk in a crash-safe fashion.  If
    1080              :          * fsync() fails, we can't be sure whether the changes are on disk or
    1081              :          * not.  For now, we handle that by panicking;
    1082              :          * StartupReplicationSlots() will try to straighten it out after
    1083              :          * restart.
    1084              :          */
    1085          602 :         START_CRIT_SECTION();
    1086          602 :         fsync_fname(tmppath, true);
    1087          602 :         fsync_fname(PG_REPLSLOT_DIR, true);
    1088          602 :         END_CRIT_SECTION();
    1089              :     }
    1090              :     else
    1091              :     {
    1092            0 :         bool        fail_softly = slot->data.persistency != RS_PERSISTENT;
    1093              : 
    1094            0 :         SpinLockAcquire(&slot->mutex);
    1095            0 :         slot->active_proc = INVALID_PROC_NUMBER;
    1096            0 :         SpinLockRelease(&slot->mutex);
    1097              : 
    1098              :         /* wake up anyone waiting on this slot */
    1099            0 :         ConditionVariableBroadcast(&slot->active_cv);
    1100              : 
    1101            0 :         ereport(fail_softly ? WARNING : ERROR,
    1102              :                 (errcode_for_file_access(),
    1103              :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1104              :                         path, tmppath)));
    1105              :     }
    1106              : 
    1107              :     /*
    1108              :      * The slot is definitely gone.  Lock out concurrent scans of the array
    1109              :      * long enough to kill it.  It's OK to clear active_proc here without
    1110              :      * grabbing the mutex because nobody else can be scanning the array here,
    1111              :      * and nobody can be attached to this slot and thus access it without
    1112              :      * scanning the array.
    1113              :      *
    1114              :      * Also wake up processes waiting for it.
    1115              :      */
    1116          602 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
    1117          602 :     slot->active_proc = INVALID_PROC_NUMBER;
    1118          602 :     slot->in_use = false;
    1119          602 :     LWLockRelease(ReplicationSlotControlLock);
    1120          602 :     ConditionVariableBroadcast(&slot->active_cv);
    1121              : 
    1122              :     /*
    1123              :      * Slot is dead and doesn't prevent resource removal anymore, recompute
    1124              :      * limits.
    1125              :      */
    1126          602 :     ReplicationSlotsComputeRequiredXmin(false);
    1127          602 :     ReplicationSlotsComputeRequiredLSN();
    1128              : 
    1129              :     /*
    1130              :      * If removing the directory fails, the worst thing that will happen is
    1131              :      * that the user won't be able to create a new slot with the same name
    1132              :      * until the next server restart.  We warn about it, but that's all.
    1133              :      */
    1134          602 :     if (!rmtree(tmppath, true))
    1135            0 :         ereport(WARNING,
    1136              :                 (errmsg("could not remove directory \"%s\"", tmppath)));
    1137              : 
    1138              :     /*
    1139              :      * Drop the statistics entry for the replication slot.  Do this while
    1140              :      * holding ReplicationSlotAllocationLock so that we don't drop a
    1141              :      * statistics entry for another slot with the same name just created in
    1142              :      * another session.
    1143              :      */
    1144          602 :     if (SlotIsLogical(slot))
    1145          439 :         pgstat_drop_replslot(slot);
    1146              : 
    1147              :     /*
    1148              :      * We release this at the very end, so that nobody starts trying to create
    1149              :      * a slot while we're still cleaning up the detritus of the old one.
    1150              :      */
    1151          602 :     LWLockRelease(ReplicationSlotAllocationLock);
    1152          602 : }
    1153              : 
    1154              : /*
    1155              :  * Serialize the currently acquired slot's state from memory to disk, thereby
    1156              :  * guaranteeing the current state will survive a crash.
    1157              :  */
    1158              : void
    1159         1480 : ReplicationSlotSave(void)
    1160              : {
    1161              :     char        path[MAXPGPATH];
    1162              : 
    1163              :     Assert(MyReplicationSlot != NULL);
    1164              : 
    1165         1480 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
    1166         1480 :     SaveSlotToPath(MyReplicationSlot, path, ERROR);
    1167         1480 : }
    1168              : 
    1169              : /*
    1170              :  * Signal that it would be useful if the currently acquired slot would be
    1171              :  * flushed out to disk.
    1172              :  *
    1173              :  * Note that the actual flush to disk can be delayed for a long time; if
    1174              :  * required for correctness, explicitly do a ReplicationSlotSave().
    1175              :  */
    1176              : void
    1177        39525 : ReplicationSlotMarkDirty(void)
    1178              : {
    1179        39525 :     ReplicationSlot *slot = MyReplicationSlot;
    1180              : 
    1181              :     Assert(MyReplicationSlot != NULL);
    1182              : 
    1183        39525 :     SpinLockAcquire(&slot->mutex);
    1184        39525 :     MyReplicationSlot->just_dirtied = true;
    1185        39525 :     MyReplicationSlot->dirty = true;
    1186        39525 :     SpinLockRelease(&slot->mutex);
    1187        39525 : }
    1188              : 
    1189              : /*
    1190              :  * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
    1191              :  * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
    1192              :  */
    1193              : void
    1194          491 : ReplicationSlotPersist(void)
    1195              : {
    1196          491 :     ReplicationSlot *slot = MyReplicationSlot;
    1197              : 
    1198              :     Assert(slot != NULL);
    1199              :     Assert(slot->data.persistency != RS_PERSISTENT);
    1200              : 
    1201          491 :     SpinLockAcquire(&slot->mutex);
    1202          491 :     slot->data.persistency = RS_PERSISTENT;
    1203          491 :     SpinLockRelease(&slot->mutex);
    1204              : 
    1205          491 :     ReplicationSlotMarkDirty();
    1206          491 :     ReplicationSlotSave();
    1207          491 : }
    1208              : 
    1209              : /*
    1210              :  * Compute the oldest xmin across all slots and store it in the ProcArray.
    1211              :  *
    1212              :  * If already_locked is true, both the ReplicationSlotControlLock and the
    1213              :  * ProcArrayLock have already been acquired exclusively. It is crucial that the
    1214              :  * caller first acquires the ReplicationSlotControlLock, followed by the
    1215              :  * ProcArrayLock, to prevent any undetectable deadlocks since this function
    1216              :  * acquires them in that order.
    1217              :  */
    1218              : void
    1219         2555 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
    1220              : {
    1221              :     int         i;
    1222         2555 :     TransactionId agg_xmin = InvalidTransactionId;
    1223         2555 :     TransactionId agg_catalog_xmin = InvalidTransactionId;
    1224              : 
    1225              :     Assert(ReplicationSlotCtl != NULL);
    1226              :     Assert(!already_locked ||
    1227              :            (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
    1228              :             LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
    1229              : 
    1230              :     /*
    1231              :      * Hold the ReplicationSlotControlLock until after updating the slot xmin
    1232              :      * values, so no backend updates the initial xmin for a newly created slot
    1233              :      * concurrently. A shared lock is used here to minimize lock contention,
    1234              :      * especially when many slots exist and advancements occur frequently.
    1235              :      * This is safe since an exclusive lock is taken during initial slot xmin
    1236              :      * update in slot creation.
    1237              :      *
    1238              :      * One might think that we can hold the ProcArrayLock exclusively and
    1239              :      * update the slot xmin values, but it could increase lock contention on
    1240              :      * the ProcArrayLock, which is not great since this function can be called
    1241              :      * at non-negligible frequency.
    1242              :      *
    1243              :      * Concurrent invocation of this function may cause the computed slot xmin
    1244              :      * to regress. However, this is harmless because tuples prior to the most
    1245              :      * recent xmin are no longer useful once advancement occurs (see
    1246              :      * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
    1247              :      * before updating the effective_xmin). Thus, such regression merely
    1248              :      * prevents VACUUM from prematurely removing tuples without causing the
    1249              :      * early deletion of required data.
    1250              :      */
    1251         2555 :     if (!already_locked)
    1252         2047 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1253              : 
    1254        26039 :     for (i = 0; i < max_replication_slots; i++)
    1255              :     {
    1256        23484 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1257              :         TransactionId effective_xmin;
    1258              :         TransactionId effective_catalog_xmin;
    1259              :         bool        invalidated;
    1260              : 
    1261        23484 :         if (!s->in_use)
    1262        21013 :             continue;
    1263              : 
    1264         2471 :         SpinLockAcquire(&s->mutex);
    1265         2471 :         effective_xmin = s->effective_xmin;
    1266         2471 :         effective_catalog_xmin = s->effective_catalog_xmin;
    1267         2471 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1268         2471 :         SpinLockRelease(&s->mutex);
    1269              : 
    1270              :         /* invalidated slots need not apply */
    1271         2471 :         if (invalidated)
    1272           26 :             continue;
    1273              : 
    1274              :         /* check the data xmin */
    1275         2445 :         if (TransactionIdIsValid(effective_xmin) &&
    1276            5 :             (!TransactionIdIsValid(agg_xmin) ||
    1277            5 :              TransactionIdPrecedes(effective_xmin, agg_xmin)))
    1278          344 :             agg_xmin = effective_xmin;
    1279              : 
    1280              :         /* check the catalog xmin */
    1281         2445 :         if (TransactionIdIsValid(effective_catalog_xmin) &&
    1282         1022 :             (!TransactionIdIsValid(agg_catalog_xmin) ||
    1283         1022 :              TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
    1284         1284 :             agg_catalog_xmin = effective_catalog_xmin;
    1285              :     }
    1286              : 
    1287         2555 :     ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
    1288              : 
    1289         2555 :     if (!already_locked)
    1290         2047 :         LWLockRelease(ReplicationSlotControlLock);
    1291         2555 : }
    1292              : 
    1293              : /*
    1294              :  * Compute the oldest restart LSN across all slots and inform xlog module.
    1295              :  *
    1296              :  * Note: while max_slot_wal_keep_size is theoretically relevant for this
    1297              :  * purpose, we don't try to account for that, because this module doesn't
    1298              :  * know what to compare against.
    1299              :  */
    1300              : void
    1301        40238 : ReplicationSlotsComputeRequiredLSN(void)
    1302              : {
    1303              :     int         i;
    1304        40238 :     XLogRecPtr  min_required = InvalidXLogRecPtr;
    1305              : 
    1306              :     Assert(ReplicationSlotCtl != NULL);
    1307              : 
    1308        40238 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1309       438475 :     for (i = 0; i < max_replication_slots; i++)
    1310              :     {
    1311       398237 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1312              :         XLogRecPtr  restart_lsn;
    1313              :         XLogRecPtr  last_saved_restart_lsn;
    1314              :         bool        invalidated;
    1315              :         ReplicationSlotPersistency persistency;
    1316              : 
    1317       398237 :         if (!s->in_use)
    1318       357811 :             continue;
    1319              : 
    1320        40426 :         SpinLockAcquire(&s->mutex);
    1321        40426 :         persistency = s->data.persistency;
    1322        40426 :         restart_lsn = s->data.restart_lsn;
    1323        40426 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1324        40426 :         last_saved_restart_lsn = s->last_saved_restart_lsn;
    1325        40426 :         SpinLockRelease(&s->mutex);
    1326              : 
    1327              :         /* invalidated slots need not apply */
    1328        40426 :         if (invalidated)
    1329           27 :             continue;
    1330              : 
    1331              :         /*
    1332              :          * For a persistent slot, use last_saved_restart_lsn to compute the
    1333              :          * oldest LSN for removal of WAL segments.  The segments between
    1334              :          * last_saved_restart_lsn and restart_lsn might be needed by a
    1335              :          * persistent slot in the case of database crash.  Non-persistent
    1336              :          * slots can't survive the database crash, so we don't care about
    1337              :          * last_saved_restart_lsn for them.
    1338              :          */
    1339        40399 :         if (persistency == RS_PERSISTENT)
    1340              :         {
    1341        39463 :             if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
    1342              :                 restart_lsn > last_saved_restart_lsn)
    1343              :             {
    1344        37063 :                 restart_lsn = last_saved_restart_lsn;
    1345              :             }
    1346              :         }
    1347              : 
    1348        40399 :         if (XLogRecPtrIsValid(restart_lsn) &&
    1349         1281 :             (!XLogRecPtrIsValid(min_required) ||
    1350              :              restart_lsn < min_required))
    1351        39238 :             min_required = restart_lsn;
    1352              :     }
    1353        40238 :     LWLockRelease(ReplicationSlotControlLock);
    1354              : 
    1355        40238 :     XLogSetReplicationSlotMinimumLSN(min_required);
    1356        40238 : }
    1357              : 
    1358              : /*
    1359              :  * Compute the oldest WAL LSN required by *logical* decoding slots.
    1360              :  *
    1361              :  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
    1362              :  * slots exist.
    1363              :  *
    1364              :  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
    1365              :  * ignores physical replication slots.
    1366              :  *
    1367              :  * The results aren't required frequently, so we don't maintain a precomputed
    1368              :  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
    1369              :  */
    1370              : XLogRecPtr
    1371         3670 : ReplicationSlotsComputeLogicalRestartLSN(void)
    1372              : {
    1373         3670 :     XLogRecPtr  result = InvalidXLogRecPtr;
    1374              :     int         i;
    1375              : 
    1376         3670 :     if (max_replication_slots <= 0)
    1377            2 :         return InvalidXLogRecPtr;
    1378              : 
    1379         3668 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1380              : 
    1381        39818 :     for (i = 0; i < max_replication_slots; i++)
    1382              :     {
    1383              :         ReplicationSlot *s;
    1384              :         XLogRecPtr  restart_lsn;
    1385              :         XLogRecPtr  last_saved_restart_lsn;
    1386              :         bool        invalidated;
    1387              :         ReplicationSlotPersistency persistency;
    1388              : 
    1389        36150 :         s = &ReplicationSlotCtl->replication_slots[i];
    1390              : 
    1391              :         /* cannot change while ReplicationSlotControlLock is held */
    1392        36150 :         if (!s->in_use)
    1393        35356 :             continue;
    1394              : 
    1395              :         /* we're only interested in logical slots */
    1396          794 :         if (!SlotIsLogical(s))
    1397          546 :             continue;
    1398              : 
    1399              :         /* read once, it's ok if it increases while we're checking */
    1400          248 :         SpinLockAcquire(&s->mutex);
    1401          248 :         persistency = s->data.persistency;
    1402          248 :         restart_lsn = s->data.restart_lsn;
    1403          248 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1404          248 :         last_saved_restart_lsn = s->last_saved_restart_lsn;
    1405          248 :         SpinLockRelease(&s->mutex);
    1406              : 
    1407              :         /* invalidated slots need not apply */
    1408          248 :         if (invalidated)
    1409           10 :             continue;
    1410              : 
    1411              :         /*
    1412              :          * For a persistent slot, use last_saved_restart_lsn to compute the
    1413              :          * oldest LSN for removal of WAL segments.  The segments between
    1414              :          * last_saved_restart_lsn and restart_lsn might be needed by a
    1415              :          * persistent slot in the case of database crash.  Non-persistent
    1416              :          * slots can't survive the database crash, so we don't care about
    1417              :          * last_saved_restart_lsn for them.
    1418              :          */
    1419          238 :         if (persistency == RS_PERSISTENT)
    1420              :         {
    1421          236 :             if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
    1422              :                 restart_lsn > last_saved_restart_lsn)
    1423              :             {
    1424            0 :                 restart_lsn = last_saved_restart_lsn;
    1425              :             }
    1426              :         }
    1427              : 
    1428          238 :         if (!XLogRecPtrIsValid(restart_lsn))
    1429            0 :             continue;
    1430              : 
    1431          238 :         if (!XLogRecPtrIsValid(result) ||
    1432              :             restart_lsn < result)
    1433          188 :             result = restart_lsn;
    1434              :     }
    1435              : 
    1436         3668 :     LWLockRelease(ReplicationSlotControlLock);
    1437              : 
    1438         3668 :     return result;
    1439              : }
    1440              : 
    1441              : /*
    1442              :  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
    1443              :  * passed database oid.
    1444              :  *
    1445              :  * Returns true if there are any slots referencing the database. *nslots will
    1446              :  * be set to the absolute number of slots in the database, *nactive to ones
    1447              :  * currently active.
    1448              :  */
    1449              : bool
    1450           53 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
    1451              : {
    1452              :     int         i;
    1453              : 
    1454           53 :     *nslots = *nactive = 0;
    1455              : 
    1456           53 :     if (max_replication_slots <= 0)
    1457            0 :         return false;
    1458              : 
    1459           53 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1460          556 :     for (i = 0; i < max_replication_slots; i++)
    1461              :     {
    1462              :         ReplicationSlot *s;
    1463              : 
    1464          503 :         s = &ReplicationSlotCtl->replication_slots[i];
    1465              : 
    1466              :         /* cannot change while ReplicationSlotControlLock is held */
    1467          503 :         if (!s->in_use)
    1468          482 :             continue;
    1469              : 
    1470              :         /* only logical slots are database specific, skip */
    1471           21 :         if (!SlotIsLogical(s))
    1472           10 :             continue;
    1473              : 
    1474              :         /* not our database, skip */
    1475           11 :         if (s->data.database != dboid)
    1476            8 :             continue;
    1477              : 
    1478              :         /* NB: intentionally counting invalidated slots */
    1479              : 
    1480              :         /* count slots with spinlock held */
    1481            3 :         SpinLockAcquire(&s->mutex);
    1482            3 :         (*nslots)++;
    1483            3 :         if (s->active_proc != INVALID_PROC_NUMBER)
    1484            1 :             (*nactive)++;
    1485            3 :         SpinLockRelease(&s->mutex);
    1486              :     }
    1487           53 :     LWLockRelease(ReplicationSlotControlLock);
    1488              : 
    1489           53 :     if (*nslots > 0)
    1490            3 :         return true;
    1491           50 :     return false;
    1492              : }
    1493              : 
    1494              : /*
    1495              :  * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
    1496              :  * passed database oid. The caller should hold an exclusive lock on the
    1497              :  * pg_database oid for the database to prevent creation of new slots on the db
    1498              :  * or replay from existing slots.
    1499              :  *
    1500              :  * Another session that concurrently acquires an existing slot on the target DB
    1501              :  * (most likely to drop it) may cause this function to ERROR. If that happens
    1502              :  * it may have dropped some but not all slots.
    1503              :  *
    1504              :  * This routine isn't as efficient as it could be - but we don't drop
    1505              :  * databases often, especially databases with lots of slots.
    1506              :  *
    1507              :  * If it drops the last logical slot in the cluster, it requests to disable
    1508              :  * logical decoding.
    1509              :  */
    1510              : void
    1511           66 : ReplicationSlotsDropDBSlots(Oid dboid)
    1512              : {
    1513              :     int         i;
    1514              :     bool        found_valid_logicalslot;
    1515           66 :     bool        dropped = false;
    1516              : 
    1517           66 :     if (max_replication_slots <= 0)
    1518            0 :         return;
    1519              : 
    1520           66 : restart:
    1521           71 :     found_valid_logicalslot = false;
    1522           71 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1523          696 :     for (i = 0; i < max_replication_slots; i++)
    1524              :     {
    1525              :         ReplicationSlot *s;
    1526              :         char       *slotname;
    1527              :         ProcNumber  active_proc;
    1528              : 
    1529          630 :         s = &ReplicationSlotCtl->replication_slots[i];
    1530              : 
    1531              :         /* cannot change while ReplicationSlotControlLock is held */
    1532          630 :         if (!s->in_use)
    1533          600 :             continue;
    1534              : 
    1535              :         /* only logical slots are database specific, skip */
    1536           30 :         if (!SlotIsLogical(s))
    1537           11 :             continue;
    1538              : 
    1539              :         /*
    1540              :          * Check logical slots on other databases too, so that we disable
    1541              :          * logical decoding only if the cluster has no valid logical slots left.
    1542              :          */
    1543           19 :         SpinLockAcquire(&s->mutex);
    1544           19 :         found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
    1545           19 :         SpinLockRelease(&s->mutex);
    1546              : 
    1547              :         /* not our database, skip */
    1548           19 :         if (s->data.database != dboid)
    1549           14 :             continue;
    1550              : 
    1551              :         /* NB: intentionally including invalidated slots to drop */
    1552              : 
    1553              :         /* acquire slot, so ReplicationSlotDropAcquired can be reused  */
    1554            5 :         SpinLockAcquire(&s->mutex);
    1555              :         /* can't change while ReplicationSlotControlLock is held */
    1556            5 :         slotname = NameStr(s->data.name);
    1557            5 :         active_proc = s->active_proc;
    1558            5 :         if (active_proc == INVALID_PROC_NUMBER)
    1559              :         {
    1560            5 :             MyReplicationSlot = s;
    1561            5 :             s->active_proc = MyProcNumber;
    1562              :         }
    1563            5 :         SpinLockRelease(&s->mutex);
    1564              : 
    1565              :         /*
    1566              :          * Even though we hold an exclusive lock on the database object a
    1567              :          * logical slot for that DB can still be active, e.g. if it's
    1568              :          * concurrently being dropped by a backend connected to another DB.
    1569              :          *
    1570              :          * That's fairly unlikely in practice, so we'll just bail out.
    1571              :          *
    1572              :          * The slot sync worker holds a shared lock on the database before
    1573              :          * operating on synced logical slots to avoid conflict with the drop
    1574              :          * happening here. The persistent synced slots are thus safe but there
    1575              :          * is a possibility that the slot sync worker has created a temporary
    1576              :          * slot (which stays active even on release) and we are trying to drop
    1577              :          * that here. In practice, the chances of hitting this scenario are
    1578              :          * low, because during slot synchronization the temporary slot is
    1579              :          * immediately converted to persistent and thus is safe due to the
    1580              :          * shared lock taken on the database. So, we'll just bail out in such
    1581              :          * a case.
    1582              :          *
    1583              :          * XXX: We can consider shutting down the slot sync worker before
    1584              :          * trying to drop synced temporary slots here.
    1585              :          */
    1586            5 :         if (active_proc != INVALID_PROC_NUMBER)
    1587            0 :             ereport(ERROR,
    1588              :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1589              :                      errmsg("replication slot \"%s\" is active for PID %d",
    1590              :                             slotname, GetPGProcByNumber(active_proc)->pid)));
    1591              : 
    1592              :         /*
    1593              :          * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
    1594              :          * holding ReplicationSlotControlLock over filesystem operations,
    1595              :          * release ReplicationSlotControlLock and use
    1596              :          * ReplicationSlotDropAcquired.
    1597              :          *
    1598              :          * As that means the set of slots could change, restart scan from the
    1599              :          * beginning each time we release the lock.
    1600              :          */
    1601            5 :         LWLockRelease(ReplicationSlotControlLock);
    1602            5 :         ReplicationSlotDropAcquired();
    1603            5 :         dropped = true;
    1604            5 :         goto restart;
    1605              :     }
    1606           66 :     LWLockRelease(ReplicationSlotControlLock);
    1607              : 
    1608           66 :     if (dropped && !found_valid_logicalslot)
    1609            0 :         RequestDisableLogicalDecoding();
    1610              : }
    1611              : 
    1612              : /*
    1613              :  * Returns true if there is at least one in-use valid logical replication slot.
    1614              :  */
    1615              : bool
    1616          471 : CheckLogicalSlotExists(void)
    1617              : {
    1618          471 :     bool        found = false;
    1619              : 
    1620          471 :     if (max_replication_slots <= 0)
    1621            1 :         return false;
    1622              : 
    1623          470 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1624         5086 :     for (int i = 0; i < max_replication_slots; i++)
    1625              :     {
    1626              :         ReplicationSlot *s;
    1627              :         bool        invalidated;
    1628              : 
    1629         4622 :         s = &ReplicationSlotCtl->replication_slots[i];
    1630              : 
    1631              :         /* cannot change while ReplicationSlotControlLock is held */
    1632         4622 :         if (!s->in_use)
    1633         4594 :             continue;
    1634              : 
    1635           28 :         if (SlotIsPhysical(s))
    1636           19 :             continue;
    1637              : 
    1638            9 :         SpinLockAcquire(&s->mutex);
    1639            9 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1640            9 :         SpinLockRelease(&s->mutex);
    1641              : 
    1642            9 :         if (invalidated)
    1643            3 :             continue;
    1644              : 
    1645            6 :         found = true;
    1646            6 :         break;
    1647              :     }
    1648          470 :     LWLockRelease(ReplicationSlotControlLock);
    1649              : 
    1650          470 :     return found;
    1651              : }
    1652              : 
    1653              : /*
    1654              :  * Check whether the server's configuration supports using replication
    1655              :  * slots.
    1656              :  */
    1657              : void
    1658         1886 : CheckSlotRequirements(void)
    1659              : {
    1660              :     /*
    1661              :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
    1662              :      * needs the same check.
    1663              :      */
    1664              : 
    1665         1886 :     if (max_replication_slots == 0)
    1666            0 :         ereport(ERROR,
    1667              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1668              :                  errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
    1669              : 
    1670         1886 :     if (wal_level < WAL_LEVEL_REPLICA)
    1671            0 :         ereport(ERROR,
    1672              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1673              :                  errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
    1674         1886 : }
    1675              : 
    1676              : /*
    1677              :  * Check whether the current user (GetUserId()) has privilege to use replication slots; raises an ERROR if has_rolreplication() reports that it does not.
    1678              :  */
    1679              : void
    1680          596 : CheckSlotPermissions(void)
    1681              : {
    1682          596 :     if (!has_rolreplication(GetUserId()))
    1683            5 :         ereport(ERROR,
    1684              :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1685              :                  errmsg("permission denied to use replication slots"),
    1686              :                  errdetail("Only roles with the %s attribute may use replication slots.",
    1687              :                            "REPLICATION")));
    1688          591 : }
    1689              : 
    1690              : /*
    1691              :  * Reserve WAL for the currently active slot (MyReplicationSlot).
    1692              :  *
    1693              :  * Compute and set restart_lsn in a manner that's appropriate for the type of
    1694              :  * the slot and concurrency safe.
    1695              :  */
    1696              : void
    1697          650 : ReplicationSlotReserveWal(void)
    1698              : {
    1699          650 :     ReplicationSlot *slot = MyReplicationSlot;
    1700              :     XLogSegNo   segno;
    1701              :     XLogRecPtr  restart_lsn;
    1702              : 
    1703              :     Assert(slot != NULL);
    1704              :     Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
    1705              :     Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn));
    1706              : 
    1707              :     /*
    1708              :      * The replication slot mechanism is used to prevent the removal of
    1709              :      * required WAL.
    1710              :      *
    1711              :      * Acquire an exclusive lock to prevent the checkpoint process from
    1712              :      * concurrently computing the minimum slot LSN (see
    1713              :      * CheckPointReplicationSlots). This ensures that the WAL reserved for
    1714              :      * replication cannot be removed during a checkpoint.
    1715              :      *
    1716              :      * The mechanism is reliable because if WAL reservation occurs first, the
    1717              :      * checkpoint must wait for the restart_lsn update before determining the
    1718              :      * minimum non-removable LSN. On the other hand, if the checkpoint happens
    1719              :      * first, subsequent WAL reservations will select positions at or beyond
    1720              :      * the redo pointer of that checkpoint.
    1721              :      */
    1722          650 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
    1723              : 
    1724              :     /*
    1725              :      * For logical slots log a standby snapshot and start logical decoding at
    1726              :      * exactly that position. That allows the slot to start up more quickly.
    1727              :      * But on a standby we cannot do WAL writes, so just use the replay
    1728              :      * pointer; effectively, an attempt to create a logical slot on standby
    1729              :      * will cause it to wait for an xl_running_xact record to be logged
    1730              :      * independently on the primary, so that a snapshot can be built using the
    1731              :      * record.
    1732              :      *
    1733              :      * None of this is needed (or indeed helpful) for physical slots as
    1734              :      * they'll start replay at the last logged checkpoint anyway. Instead,
    1735              :      * use the location of the last redo LSN, where a base backup has to
    1736              :      * start replay at.
    1737              :      */
    1738          650 :     if (SlotIsPhysical(slot))
    1739          160 :         restart_lsn = GetRedoRecPtr();
    1740          490 :     else if (RecoveryInProgress())
    1741           26 :         restart_lsn = GetXLogReplayRecPtr(NULL);
    1742              :     else
    1743          464 :         restart_lsn = GetXLogInsertRecPtr();
    1744              : 
    1745          650 :     SpinLockAcquire(&slot->mutex);
    1746          650 :     slot->data.restart_lsn = restart_lsn;
    1747          650 :     SpinLockRelease(&slot->mutex);
    1748              : 
    1749              :     /* prevent WAL removal as fast as possible */
    1750          650 :     ReplicationSlotsComputeRequiredLSN();
    1751              : 
    1752              :     /* A checkpoint shouldn't have removed the required WAL; error out if the segment holding restart_lsn is already gone. */
    1753          650 :     XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
    1754          650 :     if (XLogGetLastRemovedSegno() >= segno)
    1755            0 :         elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
    1756              :              NameStr(slot->data.name));
    1757              : 
    1758          650 :     LWLockRelease(ReplicationSlotAllocationLock);
    1759              : 
    1760          650 :     if (!RecoveryInProgress() && SlotIsLogical(slot))
    1761              :     {
    1762              :         XLogRecPtr  flushptr;
    1763              : 
    1764              :         /* make sure we have enough information to start */
    1765          464 :         flushptr = LogStandbySnapshot();
    1766              : 
    1767              :         /* and make sure it's fsynced to disk */
    1768          464 :         XLogFlush(flushptr);
    1769              :     }
    1770          650 : }
    1771              : 
    1772              : /*
    1773              :  * Report, at LOG level, that a replication slot is being invalidated or (if "terminating") that the process holding it is being terminated
    1774              :  */
    1775              : static void
    1776           23 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
    1777              :                        bool terminating,
    1778              :                        int pid,
    1779              :                        NameData slotname,
    1780              :                        XLogRecPtr restart_lsn,
    1781              :                        XLogRecPtr oldestLSN,
    1782              :                        TransactionId snapshotConflictHorizon,
    1783              :                        long slot_idle_seconds)
    1784              : {
    1785              :     StringInfoData err_detail;
    1786              :     StringInfoData err_hint;
    1787              : 
    1788           23 :     initStringInfo(&err_detail);
    1789           23 :     initStringInfo(&err_hint);
    1790              : 
    1791           23 :     switch (cause)
    1792              :     {
    1793            7 :         case RS_INVAL_WAL_REMOVED:
    1794              :             {
    1795            7 :                 uint64      ex = oldestLSN - restart_lsn;
    1796              : 
    1797            7 :                 appendStringInfo(&err_detail,
    1798            7 :                                  ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
    1799              :                                           "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
    1800              :                                           ex),
    1801            7 :                                  LSN_FORMAT_ARGS(restart_lsn),
    1802              :                                  ex);
    1803              :                 /* translator: %s is a GUC variable name */
    1804            7 :                 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
    1805              :                                  "max_slot_wal_keep_size");
    1806            7 :                 break;
    1807              :             }
    1808           12 :         case RS_INVAL_HORIZON:
    1809           12 :             appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
    1810              :                              snapshotConflictHorizon);
    1811           12 :             break;
    1812              : 
    1813            4 :         case RS_INVAL_WAL_LEVEL:
    1814            4 :             appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
    1815            4 :             break;
    1816              : 
    1817            0 :         case RS_INVAL_IDLE_TIMEOUT:
    1818              :             {
    1819              :                 /* translator: %s is a GUC variable name */
    1820            0 :                 appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
    1821              :                                  slot_idle_seconds, "idle_replication_slot_timeout",
    1822              :                                  idle_replication_slot_timeout_secs);
    1823              :                 /* translator: %s is a GUC variable name */
    1824            0 :                 appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
    1825              :                                  "idle_replication_slot_timeout");
    1826            0 :                 break;
    1827              :             }
    1828              :         case RS_INVAL_NONE:
    1829              :             pg_unreachable();
    1830              :     }
    1831              : 
    1832           23 :     ereport(LOG,
    1833              :             terminating ?
    1834              :             errmsg("terminating process %d to release replication slot \"%s\"",
    1835              :                    pid, NameStr(slotname)) :
    1836              :             errmsg("invalidating obsolete replication slot \"%s\"",
    1837              :                    NameStr(slotname)),
    1838              :             errdetail_internal("%s", err_detail.data),
    1839              :             err_hint.len ? errhint("%s", err_hint.data) : 0);
    1840              : 
    1841           23 :     pfree(err_detail.data);
    1842           23 :     pfree(err_hint.data);
    1843           23 : }
    1844              : 
    1845              : /*
    1846              :  * Can we invalidate an idle replication slot?
    1847              :  *
    1848              :  * Idle timeout invalidation is allowed only when:
    1849              :  *
    1850              :  * 1. Idle timeout is set
    1851              :  * 2. Slot has reserved WAL
    1852              :  * 3. Slot is inactive (its inactive_since is set)
    1853              :  * 4. The slot is not being synced from the primary while the server is in
    1854              :  *    recovery. This is because synced slots are always considered to be
    1855              :  *    inactive because they don't perform logical decoding to produce changes.
    1856              :  */
    1857              : static inline bool
    1858          374 : CanInvalidateIdleSlot(ReplicationSlot *s)
    1859              : {
    1860          374 :     return (idle_replication_slot_timeout_secs != 0 &&
    1861            0 :             XLogRecPtrIsValid(s->data.restart_lsn) &&
    1862          374 :             s->inactive_since > 0 &&
    1863            0 :             !(RecoveryInProgress() && s->data.synced));
    1864              : }
    1865              : 
    1866              : /*
    1867              :  * DetermineSlotInvalidationCause - Determine the cause for which a slot
    1868              :  * becomes invalid among the given possible causes.
    1869              :  *
    1870              :  * This function sequentially checks all possible invalidation causes and
    1871              :  * returns the first one for which the slot is eligible for invalidation, or RS_INVAL_NONE if there is none.  *inactive_since is written only when RS_INVAL_IDLE_TIMEOUT is returned.
    1872              :  */
    1873              : static ReplicationSlotInvalidationCause
    1874          409 : DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
    1875              :                                XLogRecPtr oldestLSN, Oid dboid,
    1876              :                                TransactionId snapshotConflictHorizon,
    1877              :                                TimestampTz *inactive_since, TimestampTz now)
    1878              : {
    1879              :     Assert(possible_causes != RS_INVAL_NONE);
    1880              : 
    1881          409 :     if (possible_causes & RS_INVAL_WAL_REMOVED)
    1882              :     {
    1883          381 :         XLogRecPtr  restart_lsn = s->data.restart_lsn;
    1884              : 
    1885          381 :         if (XLogRecPtrIsValid(restart_lsn) &&
    1886              :             restart_lsn < oldestLSN)
    1887            7 :             return RS_INVAL_WAL_REMOVED;
    1888              :     }
    1889              : 
    1890          402 :     if (possible_causes & RS_INVAL_HORIZON)
    1891              :     {
    1892              :         /* invalid DB oid signals a shared relation */
    1893           24 :         if (SlotIsLogical(s) &&
    1894           19 :             (dboid == InvalidOid || dboid == s->data.database))
    1895              :         {
    1896           24 :             TransactionId effective_xmin = s->effective_xmin;
    1897           24 :             TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
    1898              : 
    1899           24 :             if (TransactionIdIsValid(effective_xmin) &&
    1900            0 :                 TransactionIdPrecedesOrEquals(effective_xmin,
    1901              :                                               snapshotConflictHorizon))
    1902            0 :                 return RS_INVAL_HORIZON;
    1903           48 :             else if (TransactionIdIsValid(catalog_effective_xmin) &&
    1904           24 :                      TransactionIdPrecedesOrEquals(catalog_effective_xmin,
    1905              :                                                    snapshotConflictHorizon))
    1906           12 :                 return RS_INVAL_HORIZON;
    1907              :         }
    1908              :     }
    1909              : 
    1910          390 :     if (possible_causes & RS_INVAL_WAL_LEVEL)
    1911              :     {
    1912            4 :         if (SlotIsLogical(s))
    1913            4 :             return RS_INVAL_WAL_LEVEL;
    1914              :     }
    1915              : 
    1916          386 :     if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
    1917              :     {
    1918              :         Assert(now > 0);
    1919              : 
    1920          374 :         if (CanInvalidateIdleSlot(s))
    1921              :         {
    1922              :             /*
    1923              :              * Simulate the invalidation due to idle_timeout to test the
    1924              :              * timeout behavior promptly, without waiting for it to trigger
    1925              :              * naturally.
    1926              :              */
    1927              : #ifdef USE_INJECTION_POINTS
    1928            0 :             if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
    1929              :             {
    1930            0 :                 *inactive_since = 0;    /* since the beginning of time */
    1931            0 :                 return RS_INVAL_IDLE_TIMEOUT;
    1932              :             }
    1933              : #endif
    1934              : 
    1935              :             /*
    1936              :              * Check if the slot needs to be invalidated due to
    1937              :              * idle_replication_slot_timeout GUC.
    1938              :              */
    1939            0 :             if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
    1940              :                                                   idle_replication_slot_timeout_secs))
    1941              :             {
    1942            0 :                 *inactive_since = s->inactive_since;
    1943            0 :                 return RS_INVAL_IDLE_TIMEOUT;
    1944              :             }
    1945              :         }
    1946              :     }
    1947              : 
    1948          386 :     return RS_INVAL_NONE;
    1949              : }
    1950              : 
    1951              : /*
    1952              :  * Helper for InvalidateObsoleteReplicationSlots
    1953              :  *
    1954              :  * Acquires the given slot and marks it invalid, if necessary and possible.
    1955              :  *
    1956              :  * Returns true if the slot was invalidated.
    1957              :  *
    1958              :  * Set *released_lock_out if ReplicationSlotControlLock was released in the
    1959              :  * interim (and in that case we're not holding the lock at return, otherwise
    1960              :  * we are).
    1961              :  *
    1962              :  * This is inherently racy, because we release the LWLock
    1963              :  * for syscalls, so caller must restart if we return true.
    1964              :  */
    1965              : static bool
    1966          448 : InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
    1967              :                                ReplicationSlot *s,
    1968              :                                XLogRecPtr oldestLSN,
    1969              :                                Oid dboid, TransactionId snapshotConflictHorizon,
    1970              :                                bool *released_lock_out)
    1971              : {
    1972          448 :     int         last_signaled_pid = 0;
    1973          448 :     bool        released_lock = false;
    1974          448 :     bool        invalidated = false;
    1975          448 :     TimestampTz inactive_since = 0;
    1976              : 
    1977              :     for (;;)
    1978            7 :     {
    1979              :         XLogRecPtr  restart_lsn;
    1980              :         NameData    slotname;
    1981              :         ProcNumber  active_proc;
    1982          455 :         int         active_pid = 0;
    1983          455 :         ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
    1984          455 :         TimestampTz now = 0;
    1985          455 :         long        slot_idle_secs = 0;
    1986              : 
    1987              :         Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
    1988              : 
    1989          455 :         if (!s->in_use)
    1990              :         {
    1991            0 :             if (released_lock)
    1992            0 :                 LWLockRelease(ReplicationSlotControlLock);
    1993            0 :             break;
    1994              :         }
    1995              : 
    1996          455 :         if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
    1997              :         {
    1998              :             /*
    1999              :              * Assign the current time here to avoid system call overhead
    2000              :              * while holding the spinlock in subsequent code.
    2001              :              */
    2002          395 :             now = GetCurrentTimestamp();
    2003              :         }
    2004              : 
    2005              :         /*
    2006              :          * Check if the slot needs to be invalidated. If it needs to be
    2007              :          * invalidated, and is not currently acquired, acquire it and mark it
    2008              :          * as having been invalidated.  We do this with the spinlock held to
    2009              :          * avoid race conditions -- for example the restart_lsn could move
    2010              :          * forward, or the slot could be dropped.
    2011              :          */
    2012          455 :         SpinLockAcquire(&s->mutex);
    2013              : 
    2014          455 :         restart_lsn = s->data.restart_lsn;
    2015              : 
    2016              :         /* we do nothing if the slot is already invalid */
    2017          455 :         if (s->data.invalidated == RS_INVAL_NONE)
    2018          409 :             invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
    2019              :                                                                 s, oldestLSN,
    2020              :                                                                 dboid,
    2021              :                                                                 snapshotConflictHorizon,
    2022              :                                                                 &inactive_since,
    2023              :                                                                 now);
    2024              : 
    2025              :         /* if there's no invalidation, we're done */
    2026          455 :         if (invalidation_cause == RS_INVAL_NONE)
    2027              :         {
    2028          432 :             SpinLockRelease(&s->mutex);
    2029          432 :             if (released_lock)
    2030            0 :                 LWLockRelease(ReplicationSlotControlLock);
    2031          432 :             break;
    2032              :         }
    2033              : 
    2034           23 :         slotname = s->data.name;
    2035           23 :         active_proc = s->active_proc;
    2036              : 
    2037              :         /*
    2038              :          * If the slot can be acquired, do so and mark it invalidated
    2039              :          * immediately.  Otherwise we'll signal the owning process, below, and
    2040              :          * retry.
    2041              :          *
    2042              :          * Note: Unlike other slot attributes, slot's inactive_since can't be
    2043              :          * changed until the acquired slot is released or the owning process
    2044              :          * is terminated. So, the inactive slot can only be invalidated
    2045              :          * immediately without being terminated.
    2046              :          */
    2047           23 :         if (active_proc == INVALID_PROC_NUMBER)
    2048              :         {
    2049           16 :             MyReplicationSlot = s;
    2050           16 :             s->active_proc = MyProcNumber;
    2051           16 :             s->data.invalidated = invalidation_cause;
    2052              : 
    2053              :             /*
    2054              :              * XXX: We should consider not overwriting restart_lsn and instead
    2055              :              * just rely on .invalidated.
    2056              :              */
    2057           16 :             if (invalidation_cause == RS_INVAL_WAL_REMOVED)
    2058              :             {
    2059            5 :                 s->data.restart_lsn = InvalidXLogRecPtr;
    2060            5 :                 s->last_saved_restart_lsn = InvalidXLogRecPtr;
    2061              :             }
    2062              : 
    2063              :             /* Let caller know */
    2064           16 :             invalidated = true;
    2065              :         }
    2066              :         else
    2067              :         {
    2068            7 :             active_pid = GetPGProcByNumber(active_proc)->pid;
    2069              :             Assert(active_pid != 0);
    2070              :         }
    2071              : 
    2072           23 :         SpinLockRelease(&s->mutex);
    2073              : 
    2074              :         /*
    2075              :          * Calculate the idle time duration of the slot if the invalidation
    2076              :          * cause is RS_INVAL_IDLE_TIMEOUT.
    2077              :          */
    2078           23 :         if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
    2079              :         {
    2080              :             int         slot_idle_usecs;
    2081              : 
    2082            0 :             TimestampDifference(inactive_since, now, &slot_idle_secs,
    2083              :                                 &slot_idle_usecs);
    2084              :         }
    2085              : 
    2086           23 :         if (active_proc != INVALID_PROC_NUMBER)
    2087              :         {
    2088              :             /*
    2089              :              * Prepare the sleep on the slot's condition variable before
    2090              :              * releasing the lock, to close a possible race condition if the
    2091              :              * slot is released before the sleep below.
    2092              :              */
    2093            7 :             ConditionVariablePrepareToSleep(&s->active_cv);
    2094              : 
    2095            7 :             LWLockRelease(ReplicationSlotControlLock);
    2096            7 :             released_lock = true;
    2097              : 
    2098              :             /*
    2099              :              * Signal to terminate the process that owns the slot, if we
    2100              :              * haven't already signalled it.  (Avoidance of repeated
    2101              :              * signalling is the only reason for there to be a loop in this
    2102              :              * routine; otherwise we could rely on caller's restart loop.)
    2103              :              *
    2104              :              * There is a race condition in which another process may own the slot
    2105              :              * after its current owner process is terminated and before this
    2106              :              * process owns it. To handle that, we signal only if the PID of
    2107              :              * the owning process has changed from the previous time. (This
    2108              :              * logic assumes that the same PID is not reused very quickly.)
    2109              :              */
    2110            7 :             if (last_signaled_pid != active_pid)
    2111              :             {
    2112            7 :                 ReportSlotInvalidation(invalidation_cause, true, active_pid,
    2113              :                                        slotname, restart_lsn,
    2114              :                                        oldestLSN, snapshotConflictHorizon,
    2115              :                                        slot_idle_secs);
    2116              : 
    2117            7 :                 if (MyBackendType == B_STARTUP)
    2118            5 :                     (void) SignalRecoveryConflict(GetPGProcByNumber(active_proc),
    2119              :                                                   active_pid,
    2120              :                                                   RECOVERY_CONFLICT_LOGICALSLOT);
    2121              :                 else
    2122            2 :                     (void) kill(active_pid, SIGTERM);
    2123              : 
    2124            7 :                 last_signaled_pid = active_pid;
    2125              :             }
    2126              : 
    2127              :             /* Wait until the slot is released. */
    2128            7 :             ConditionVariableSleep(&s->active_cv,
    2129              :                                    WAIT_EVENT_REPLICATION_SLOT_DROP);
    2130              : 
    2131              :             /*
    2132              :              * Re-acquire lock and start over; we expect to invalidate the
    2133              :              * slot next time (unless another process acquires the slot in the
    2134              :              * meantime).
    2135              :              *
    2136              :              * Note: It is possible for a slot to advance its restart_lsn or
    2137              :              * xmin values sufficiently between when we release the mutex and
    2138              :              * when we recheck, moving from a conflicting state to a non
    2139              :              * conflicting state.  This is intentional and safe: if the slot
    2140              :              * has caught up while we're busy here, the resources we were
    2141              :              * concerned about (WAL segments or tuples) have not yet been
    2142              :              * removed, and there's no reason to invalidate the slot.
    2143              :              */
    2144            7 :             LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    2145            7 :             continue;
    2146              :         }
    2147              :         else
    2148              :         {
    2149              :             /*
    2150              :              * We hold the slot now and have already invalidated it; flush it
    2151              :              * to ensure that state persists.
    2152              :              *
    2153              :              * Don't want to hold ReplicationSlotControlLock across file
    2154              :              * system operations, so release it now but be sure to tell caller
    2155              :              * to restart from scratch.
    2156              :              */
    2157           16 :             LWLockRelease(ReplicationSlotControlLock);
    2158           16 :             released_lock = true;
    2159              : 
    2160              :             /* Make sure the invalidated state persists across server restart */
    2161           16 :             ReplicationSlotMarkDirty();
    2162           16 :             ReplicationSlotSave();
    2163           16 :             ReplicationSlotRelease();
    2164              : 
    2165           16 :             ReportSlotInvalidation(invalidation_cause, false, active_pid,
    2166              :                                    slotname, restart_lsn,
    2167              :                                    oldestLSN, snapshotConflictHorizon,
    2168              :                                    slot_idle_secs);
    2169              : 
    2170              :             /* done with this slot for now */
    2171           16 :             break;
    2172              :         }
    2173              :     }
    2174              : 
    2175              :     Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
    2176              : 
    2177          448 :     *released_lock_out = released_lock;
    2178          448 :     return invalidated;
    2179              : }
    2180              : 
    2181              : /*
    2182              :  * Invalidate slots that require resources about to be removed.
    2183              :  *
    2184              :  * Returns true if any slot has been invalidated.
    2185              :  *
    2186              :  * Whether a slot needs to be invalidated depends on the invalidation cause.
    2187              :  * A slot is invalidated if it:
    2188              :  * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
    2189              :  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
    2190              :  *   db; dboid may be InvalidOid for shared relations
    2191              :  * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
    2192              :  *   logical.
    2193              :  * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
    2194              :  *   "idle_replication_slot_timeout" duration.
    2195              :  *
    2196              :  * Note: This function attempts to invalidate the slot for multiple possible
    2197              :  * causes in a single pass, minimizing redundant iterations. The "cause"
    2198              :  * parameter can be a MASK representing one or more of the defined causes.
    2199              :  *
    2200              :  * If it invalidates the last logical slot in the cluster, it requests to
    2201              :  * disable logical decoding.
    2202              :  *
    2203              :  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
    2204              :  */
    2205              : bool
    2206         1861 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
    2207              :                                    XLogSegNo oldestSegno, Oid dboid,
    2208              :                                    TransactionId snapshotConflictHorizon)
    2209              : {
    2210              :     XLogRecPtr  oldestLSN;
    2211         1861 :     bool        invalidated = false;
    2212         1861 :     bool        invalidated_logical = false;
    2213              :     bool        found_valid_logicalslot;
    2214              : 
    2215              :     Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
    2216              :     Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
    2217              :     Assert(possible_causes != RS_INVAL_NONE);
    2218              : 
    2219         1861 :     if (max_replication_slots == 0)
    2220            1 :         return invalidated;
    2221              : 
    2222         1860 :     XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
    2223              : 
    2224         1876 : restart:
    2225         1876 :     found_valid_logicalslot = false;
    2226         1876 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    2227        20126 :     for (int i = 0; i < max_replication_slots; i++)
    2228              :     {
    2229        18266 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    2230        18266 :         bool        released_lock = false;
    2231              : 
    2232        18266 :         if (!s->in_use)
    2233        17818 :             continue;
    2234              : 
    2235              :         /* Prevent invalidation of logical slots during binary upgrade */
    2236          457 :         if (SlotIsLogical(s) && IsBinaryUpgrade)
    2237              :         {
    2238            9 :             SpinLockAcquire(&s->mutex);
    2239            9 :             found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
    2240            9 :             SpinLockRelease(&s->mutex);
    2241              : 
    2242            9 :             continue;
    2243              :         }
    2244              : 
    2245          448 :         if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
    2246              :                                            dboid, snapshotConflictHorizon,
    2247              :                                            &released_lock))
    2248              :         {
    2249              :             Assert(released_lock);
    2250              : 
    2251              :             /* Remember we have invalidated a physical or logical slot */
    2252           16 :             invalidated = true;
    2253              : 
    2254              :             /*
    2255              :              * Additionally, remember we have invalidated a logical slot as we
    2256              :              * can request disabling logical decoding later.
    2257              :              */
    2258           16 :             if (SlotIsLogical(s))
    2259           13 :                 invalidated_logical = true;
    2260              :         }
    2261              :         else
    2262              :         {
    2263              :             /*
    2264              :              * We need to check if the slot is invalidated here since
    2265              :              * InvalidatePossiblyObsoleteSlot() returns false also if the slot
    2266              :              * is already invalidated.
    2267              :              */
    2268          432 :             SpinLockAcquire(&s->mutex);
    2269          864 :             found_valid_logicalslot |=
    2270          432 :                 (SlotIsLogical(s) && (s->data.invalidated == RS_INVAL_NONE));
    2271          432 :             SpinLockRelease(&s->mutex);
    2272              :         }
    2273              : 
    2274              :         /* if the lock was released, start from scratch */
    2275          448 :         if (released_lock)
    2276           16 :             goto restart;
    2277              :     }
    2278         1860 :     LWLockRelease(ReplicationSlotControlLock);
    2279              : 
    2280              :     /*
    2281              :      * If any slots have been invalidated, recalculate the resource limits.
    2282              :      */
    2283         1860 :     if (invalidated)
    2284              :     {
    2285           11 :         ReplicationSlotsComputeRequiredXmin(false);
    2286           11 :         ReplicationSlotsComputeRequiredLSN();
    2287              :     }
    2288              : 
    2289              :     /*
    2290              :      * Request the checkpointer to disable logical decoding if no valid
    2291              :      * logical slots remain. If called by the checkpointer during a
    2292              :      * checkpoint, only the request is initiated; actual deactivation is
    2293              :      * deferred until after the checkpoint completes.
    2294              :      */
    2295         1860 :     if (invalidated_logical && !found_valid_logicalslot)
    2296            8 :         RequestDisableLogicalDecoding();
    2297              : 
    2298         1860 :     return invalidated;
    2299              : }
    2300              : 
    2301              : /*
    2302              :  * Flush all replication slots to disk.
    2303              :  *
    2304              :  * It is convenient to flush dirty replication slots at the time of checkpoint.
    2305              :  * Additionally, in case of a shutdown checkpoint, we also identify the slots
    2306              :  * for which the confirmed_flush LSN has been updated since the last time it
    2307              :  * was saved and flush them.
    2308              :  */
    2309              : void
    2310         1835 : CheckPointReplicationSlots(bool is_shutdown)
    2311              : {
    2312              :     int         i;
    2313         1835 :     bool        last_saved_restart_lsn_updated = false;
    2314              : 
    2315         1835 :     elog(DEBUG1, "performing replication slot checkpoint");
    2316              : 
    2317              :     /*
    2318              :      * Prevent any slot from being created/dropped while we're active. As we
    2319              :      * explicitly do *not* want to block iterating over replication_slots or
    2320              :      * acquiring a slot we cannot take the control lock - but that's OK,
    2321              :      * because holding ReplicationSlotAllocationLock is strictly stronger, and
    2322              :      * enough to guarantee that nobody can change the in_use bits on us.
    2323              :      *
    2324              :      * Additionally, acquiring the Allocation lock is necessary to serialize
    2325              :      * the slot flush process with concurrent slot WAL reservation. This
    2326              :      * ensures that the WAL position being reserved is either flushed to disk
    2327              :      * or is beyond or equal to the redo pointer of the current checkpoint
    2328              :      * (See ReplicationSlotReserveWal for details).
    2329              :      */
    2330         1835 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
    2331              : 
    2332        19910 :     for (i = 0; i < max_replication_slots; i++)
    2333              :     {
    2334        18075 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    2335              :         char        path[MAXPGPATH];
    2336              : 
    2337        18075 :         if (!s->in_use)
    2338        17678 :             continue;
    2339              : 
    2340              :         /* save the slot to disk, locking is handled in SaveSlotToPath() */
    2341          397 :         sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
    2342              : 
    2343              :         /*
    2344              :          * Slot's data is not flushed each time the confirmed_flush LSN is
    2345              :          * updated as that could lead to frequent writes.  However, we decide
    2346              :          * to force a flush of all logical slots' data at the time of shutdown
    2347              :          * if the confirmed_flush LSN is changed since we last flushed it to
    2348              :          * disk.  This helps in avoiding an unnecessary retreat of the
    2349              :          * confirmed_flush LSN after restart.
    2350              :          */
    2351          397 :         if (is_shutdown && SlotIsLogical(s))
    2352              :         {
    2353           89 :             SpinLockAcquire(&s->mutex);
    2354              : 
    2355           89 :             if (s->data.invalidated == RS_INVAL_NONE &&
    2356           88 :                 s->data.confirmed_flush > s->last_saved_confirmed_flush)
    2357              :             {
    2358           40 :                 s->just_dirtied = true;
    2359           40 :                 s->dirty = true;
    2360              :             }
    2361           89 :             SpinLockRelease(&s->mutex);
    2362              :         }
    2363              : 
    2364              :         /*
    2365              :          * Track if we're going to update slot's last_saved_restart_lsn. We
    2366              :          * need this to know if we need to recompute the required LSN.
    2367              :          */
    2368          397 :         if (s->last_saved_restart_lsn != s->data.restart_lsn)
    2369          207 :             last_saved_restart_lsn_updated = true;
    2370              : 
    2371          397 :         SaveSlotToPath(s, path, LOG);
    2372              :     }
    2373         1835 :     LWLockRelease(ReplicationSlotAllocationLock);
    2374              : 
    2375              :     /*
    2376              :      * Recompute the required LSN if SaveSlotToPath() updated
    2377              :      * last_saved_restart_lsn for any slot.
    2378              :      */
    2379         1835 :     if (last_saved_restart_lsn_updated)
    2380          207 :         ReplicationSlotsComputeRequiredLSN();
    2381         1835 : }
    2382              : 
    2383              : /*
    2384              :  * Load all replication slots from disk into memory at server startup. This
    2385              :  * needs to be run before we start crash recovery.
    2386              :  */
    2387              : void
    2388         1033 : StartupReplicationSlots(void)
    2389              : {
    2390              :     DIR        *replication_dir;
    2391              :     struct dirent *replication_de;
    2392              : 
    2393         1033 :     elog(DEBUG1, "starting up replication slots");
    2394              : 
    2395              :     /* restore all slots by iterating over all on-disk entries */
    2396         1033 :     replication_dir = AllocateDir(PG_REPLSLOT_DIR);
    2397         3214 :     while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
    2398              :     {
    2399              :         char        path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
    2400              :         PGFileType  de_type;
    2401              : 
    2402         2183 :         if (strcmp(replication_de->d_name, ".") == 0 ||
    2403         1152 :             strcmp(replication_de->d_name, "..") == 0)
    2404         2062 :             continue;
    2405              : 
    2406          121 :         snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
    2407          121 :         de_type = get_dirent_type(path, replication_de, false, DEBUG1);
    2408              : 
    2409              :         /* we're only creating directories here, skip if it's not ours */
    2410          121 :         if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
    2411            0 :             continue;
    2412              : 
    2413              :         /* we crashed while a slot was being set up or deleted, clean up */
    2414          121 :         if (pg_str_endswith(replication_de->d_name, ".tmp"))
    2415              :         {
    2416            0 :             if (!rmtree(path, true))
    2417              :             {
    2418            0 :                 ereport(WARNING,
    2419              :                         (errmsg("could not remove directory \"%s\"",
    2420              :                                 path)));
    2421            0 :                 continue;
    2422              :             }
    2423            0 :             fsync_fname(PG_REPLSLOT_DIR, true);
    2424            0 :             continue;
    2425              :         }
    2426              : 
    2427              :         /* looks like a slot in a normal state, restore */
    2428          121 :         RestoreSlotFromDisk(replication_de->d_name);
    2429              :     }
    2430         1031 :     FreeDir(replication_dir);
    2431              : 
    2432              :     /* currently no slots exist, we're done. */
    2433         1031 :     if (max_replication_slots <= 0)
    2434            1 :         return;
    2435              : 
    2436              :     /* Now that we have recovered all the data, compute replication xmin */
    2437         1030 :     ReplicationSlotsComputeRequiredXmin(false);
    2438         1030 :     ReplicationSlotsComputeRequiredLSN();
    2439              : }
    2440              : 
    2441              : /* ----
    2442              :  * Manipulation of on-disk state of replication slots
    2443              :  *
    2444              :  * NB: none of the routines below should take any notice whether a slot is the
    2445              :  * current one or not, that's all handled a layer above.
    2446              :  * ----
    2447              :  */
    2448              : static void
    2449          701 : CreateSlotOnDisk(ReplicationSlot *slot)
    2450              : {
    2451              :     char        tmppath[MAXPGPATH];
    2452              :     char        path[MAXPGPATH];
    2453              :     struct stat st;
    2454              : 
    2455              :     /*
    2456              :      * No need to take out the io_in_progress_lock, nobody else can see this
    2457              :      * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
    2458              :      * takes out the lock, if we'd take the lock here, we'd deadlock.
    2459              :      */
    2460              : 
    2461          701 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    2462          701 :     sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    2463              : 
    2464              :     /*
    2465              :      * It's just barely possible that some previous effort to create or drop a
    2466              :      * slot with this name left a temp directory lying around. If that seems
    2467              :      * to be the case, try to remove it.  If the rmtree() fails, we'll error
    2468              :      * out at the MakePGDirectory() below, so we don't bother checking
    2469              :      * success.
    2470              :      */
    2471          701 :     if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
    2472            0 :         rmtree(tmppath, true);
    2473              : 
    2474              :     /* Create and fsync the temporary slot directory. */
    2475          701 :     if (MakePGDirectory(tmppath) < 0)
    2476            0 :         ereport(ERROR,
    2477              :                 (errcode_for_file_access(),
    2478              :                  errmsg("could not create directory \"%s\": %m",
    2479              :                         tmppath)));
    2480          701 :     fsync_fname(tmppath, true);
    2481              : 
    2482              :     /* Write the actual state file. */
    2483          701 :     slot->dirty = true;          /* signal that we really need to write */
    2484          701 :     SaveSlotToPath(slot, tmppath, ERROR);
    2485              : 
    2486              :     /* Rename the directory into place. */
    2487          701 :     if (rename(tmppath, path) != 0)
    2488            0 :         ereport(ERROR,
    2489              :                 (errcode_for_file_access(),
    2490              :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    2491              :                         tmppath, path)));
    2492              : 
    2493              :     /*
    2494              :      * If we'd now fail - really unlikely - we wouldn't know whether this slot
    2495              :      * would persist after an OS crash or not - so, force a restart. The
    2496              :      * restart would try to fsync this again till it works.
    2497              :      */
    2498          701 :     START_CRIT_SECTION();
    2499              : 
    2500          701 :     fsync_fname(path, true);
    2501          701 :     fsync_fname(PG_REPLSLOT_DIR, true);
    2502              : 
    2503          701 :     END_CRIT_SECTION();
    2504          701 : }
    2505              : 
    2506              : /*
    2507              :  * Shared functionality between saving and creating a replication slot.
    2508              :  */
    2509              : static void
    2510         2578 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
    2511              : {
    2512              :     char        tmppath[MAXPGPATH];
    2513              :     char        path[MAXPGPATH];
    2514              :     int         fd;
    2515              :     ReplicationSlotOnDisk cp;
    2516              :     bool        was_dirty;
    2517              : 
    2518              :     /* first check whether there's something to write out */
    2519         2578 :     SpinLockAcquire(&slot->mutex);
    2520         2578 :     was_dirty = slot->dirty;
    2521         2578 :     slot->just_dirtied = false;
    2522         2578 :     SpinLockRelease(&slot->mutex);
    2523              : 
    2524              :     /* and don't do anything if there's nothing to write */
    2525         2578 :     if (!was_dirty)
    2526          139 :         return;
    2527              : 
    2528         2439 :     LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
    2529              : 
    2530              :     /* silence valgrind :( */
    2531         2439 :     memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
    2532              : 
    2533         2439 :     sprintf(tmppath, "%s/state.tmp", dir);
    2534         2439 :     sprintf(path, "%s/state", dir);
    2535              : 
    2536         2439 :     fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    2537         2439 :     if (fd < 0)
    2538              :     {
    2539              :         /*
    2540              :          * If not an ERROR, then release the lock before returning.  In case
    2541              :          * of an ERROR, the error recovery path automatically releases the
    2542              :          * lock, but no harm in explicitly releasing even in that case.  Note
    2543              :          * that LWLockRelease() could affect errno.
    2544              :          */
    2545            0 :         int         save_errno = errno;
    2546              : 
    2547            0 :         LWLockRelease(&slot->io_in_progress_lock);
    2548            0 :         errno = save_errno;
    2549            0 :         ereport(elevel,
    2550              :                 (errcode_for_file_access(),
    2551              :                  errmsg("could not create file \"%s\": %m",
    2552              :                         tmppath)));
    2553            0 :         return;
    2554              :     }
    2555              : 
    2556         2439 :     cp.magic = SLOT_MAGIC;
    2557         2439 :     INIT_CRC32C(cp.checksum);
    2558         2439 :     cp.version = SLOT_VERSION;
    2559         2439 :     cp.length = ReplicationSlotOnDiskV2Size;
    2560              : 
    2561         2439 :     SpinLockAcquire(&slot->mutex);
    2562              : 
    2563         2439 :     memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
    2564              : 
    2565         2439 :     SpinLockRelease(&slot->mutex);
    2566              : 
    2567         2439 :     COMP_CRC32C(cp.checksum,
    2568              :                 (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
    2569              :                 ReplicationSlotOnDiskChecksummedSize);
    2570         2439 :     FIN_CRC32C(cp.checksum);
    2571              : 
    2572         2439 :     errno = 0;
    2573         2439 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
    2574         2439 :     if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
    2575              :     {
    2576            0 :         int         save_errno = errno;
    2577              : 
    2578            0 :         pgstat_report_wait_end();
    2579            0 :         CloseTransientFile(fd);
    2580            0 :         unlink(tmppath);
    2581            0 :         LWLockRelease(&slot->io_in_progress_lock);
    2582              : 
    2583              :         /* if write didn't set errno, assume problem is no disk space */
    2584            0 :         errno = save_errno ? save_errno : ENOSPC;
    2585            0 :         ereport(elevel,
    2586              :                 (errcode_for_file_access(),
    2587              :                  errmsg("could not write to file \"%s\": %m",
    2588              :                         tmppath)));
    2589            0 :         return;
    2590              :     }
    2591         2439 :     pgstat_report_wait_end();
    2592              : 
    2593              :     /* fsync the temporary file */
    2594         2439 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
    2595         2439 :     if (pg_fsync(fd) != 0)
    2596              :     {
    2597            0 :         int         save_errno = errno;
    2598              : 
    2599            0 :         pgstat_report_wait_end();
    2600            0 :         CloseTransientFile(fd);
    2601            0 :         unlink(tmppath);
    2602            0 :         LWLockRelease(&slot->io_in_progress_lock);
    2603              : 
    2604            0 :         errno = save_errno;
    2605            0 :         ereport(elevel,
    2606              :                 (errcode_for_file_access(),
    2607              :                  errmsg("could not fsync file \"%s\": %m",
    2608              :                         tmppath)));
    2609            0 :         return;
    2610              :     }
    2611         2439 :     pgstat_report_wait_end();
    2612              : 
    2613         2439 :     if (CloseTransientFile(fd) != 0)
    2614              :     {
    2615            0 :         int         save_errno = errno;
    2616              : 
    2617            0 :         unlink(tmppath);
    2618            0 :         LWLockRelease(&slot->io_in_progress_lock);
    2619              : 
    2620            0 :         errno = save_errno;
    2621            0 :         ereport(elevel,
    2622              :                 (errcode_for_file_access(),
    2623              :                  errmsg("could not close file \"%s\": %m",
    2624              :                         tmppath)));
    2625            0 :         return;
    2626              :     }
    2627              : 
    2628              :     /* rename to permanent file, fsync file and directory */
    2629         2439 :     if (rename(tmppath, path) != 0)
    2630              :     {
    2631            0 :         int         save_errno = errno;
    2632              : 
    2633            0 :         unlink(tmppath);
    2634            0 :         LWLockRelease(&slot->io_in_progress_lock);
    2635              : 
    2636            0 :         errno = save_errno;
    2637            0 :         ereport(elevel,
    2638              :                 (errcode_for_file_access(),
    2639              :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    2640              :                         tmppath, path)));
    2641            0 :         return;
    2642              :     }
    2643              : 
    2644              :     /*
    2645              :      * Check CreateSlotOnDisk() for the reasoning of using a critical section.
    2646              :      */
    2647         2439 :     START_CRIT_SECTION();
    2648              : 
    2649         2439 :     fsync_fname(path, false);
    2650         2439 :     fsync_fname(dir, true);
    2651         2439 :     fsync_fname(PG_REPLSLOT_DIR, true);
    2652              : 
    2653         2439 :     END_CRIT_SECTION();
    2654              : 
    2655              :     /*
    2656              :      * Successfully wrote; unset the dirty bit unless somebody dirtied the slot
    2657              :      * again already, and remember the saved confirmed_flush and restart_lsn.
    2658              :      */
    2659         2439 :     SpinLockAcquire(&slot->mutex);
    2660         2439 :     if (!slot->just_dirtied)
    2661         2416 :         slot->dirty = false;
    2662         2439 :     slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2663         2439 :     slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
    2664         2439 :     SpinLockRelease(&slot->mutex);
    2665              : 
    2666         2439 :     LWLockRelease(&slot->io_in_progress_lock);
    2667              : }
    2668              : 
    2669              : /*
    2670              :  * Load a single slot from disk into memory.
    2671              :  */
    2672              : static void
    2673          121 : RestoreSlotFromDisk(const char *name)
    2674              : {
    2675              :     ReplicationSlotOnDisk cp;
    2676              :     int         i;
    2677              :     char        slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
    2678              :     char        path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
    2679              :     int         fd;
    2680          121 :     bool        restored = false;
    2681              :     int         readBytes;
    2682              :     pg_crc32c   checksum;
    2683          121 :     TimestampTz now = 0;
    2684              : 
    2685              :     /* no need to lock here, no concurrent access allowed yet */
    2686              : 
    2687              :     /* delete temp file if it exists */
    2688          121 :     sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
    2689          121 :     sprintf(path, "%s/state.tmp", slotdir);
    2690          121 :     if (unlink(path) < 0 && errno != ENOENT)
    2691            0 :         ereport(PANIC,
    2692              :                 (errcode_for_file_access(),
    2693              :                  errmsg("could not remove file \"%s\": %m", path)));
    2694              : 
    2695          121 :     sprintf(path, "%s/state", slotdir);
    2696              : 
    2697          121 :     elog(DEBUG1, "restoring replication slot from \"%s\"", path);
    2698              : 
    2699              :     /* on some operating systems fsyncing a file requires O_RDWR */
    2700          121 :     fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
    2701              : 
    2702              :     /*
    2703              :      * We do not need to handle this as we are rename()ing the directory into
    2704              :      * place only after we fsync()ed the state file.
    2705              :      */
    2706          121 :     if (fd < 0)
    2707            0 :         ereport(PANIC,
    2708              :                 (errcode_for_file_access(),
    2709              :                  errmsg("could not open file \"%s\": %m", path)));
    2710              : 
    2711              :     /*
    2712              :      * Sync state file before we're reading from it. We might have crashed
    2713              :      * while it wasn't synced yet and we shouldn't continue on that basis.
    2714              :      */
    2715          121 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
    2716          121 :     if (pg_fsync(fd) != 0)
    2717            0 :         ereport(PANIC,
    2718              :                 (errcode_for_file_access(),
    2719              :                  errmsg("could not fsync file \"%s\": %m",
    2720              :                         path)));
    2721          121 :     pgstat_report_wait_end();
    2722              : 
    2723              :     /* Also sync the parent directory */
    2724          121 :     START_CRIT_SECTION();
    2725          121 :     fsync_fname(slotdir, true);
    2726          121 :     END_CRIT_SECTION();
    2727              : 
    2728              :     /* read part of statefile that's guaranteed to be version independent */
    2729          121 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2730          121 :     readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
    2731          121 :     pgstat_report_wait_end();
    2732          121 :     if (readBytes != ReplicationSlotOnDiskConstantSize)
    2733              :     {
    2734            0 :         if (readBytes < 0)
    2735            0 :             ereport(PANIC,
    2736              :                     (errcode_for_file_access(),
    2737              :                      errmsg("could not read file \"%s\": %m", path)));
    2738              :         else
    2739            0 :             ereport(PANIC,
    2740              :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2741              :                      errmsg("could not read file \"%s\": read %d of %zu",
    2742              :                             path, readBytes,
    2743              :                             (Size) ReplicationSlotOnDiskConstantSize)));
    2744              :     }
    2745              : 
    2746              :     /* verify magic */
    2747          121 :     if (cp.magic != SLOT_MAGIC)
    2748            0 :         ereport(PANIC,
    2749              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2750              :                  errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
    2751              :                         path, cp.magic, SLOT_MAGIC)));
    2752              : 
    2753              :     /* verify version */
    2754          121 :     if (cp.version != SLOT_VERSION)
    2755            0 :         ereport(PANIC,
    2756              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2757              :                  errmsg("replication slot file \"%s\" has unsupported version %u",
    2758              :                         path, cp.version)));
    2759              : 
    2760              :     /* boundary check on length */
    2761          121 :     if (cp.length != ReplicationSlotOnDiskV2Size)
    2762            0 :         ereport(PANIC,
    2763              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2764              :                  errmsg("replication slot file \"%s\" has corrupted length %u",
    2765              :                         path, cp.length)));
    2766              : 
    2767              :     /* Now that we know the size, read the entire file */
    2768          121 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2769          242 :     readBytes = read(fd,
    2770              :                      (char *) &cp + ReplicationSlotOnDiskConstantSize,
    2771          121 :                      cp.length);
    2772          121 :     pgstat_report_wait_end();
    2773          121 :     if (readBytes != cp.length)
    2774              :     {
    2775            0 :         if (readBytes < 0)
    2776            0 :             ereport(PANIC,
    2777              :                     (errcode_for_file_access(),
    2778              :                      errmsg("could not read file \"%s\": %m", path)));
    2779              :         else
    2780            0 :             ereport(PANIC,
    2781              :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2782              :                      errmsg("could not read file \"%s\": read %d of %zu",
    2783              :                             path, readBytes, (Size) cp.length)));
    2784              :     }
    2785              : 
    2786          121 :     if (CloseTransientFile(fd) != 0)
    2787            0 :         ereport(PANIC,
    2788              :                 (errcode_for_file_access(),
    2789              :                  errmsg("could not close file \"%s\": %m", path)));
    2790              : 
    2791              :     /* now verify the CRC */
    2792          121 :     INIT_CRC32C(checksum);
    2793          121 :     COMP_CRC32C(checksum,
    2794              :                 (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
    2795              :                 ReplicationSlotOnDiskChecksummedSize);
    2796          121 :     FIN_CRC32C(checksum);
    2797              : 
    2798          121 :     if (!EQ_CRC32C(checksum, cp.checksum))
    2799            0 :         ereport(PANIC,
    2800              :                 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
    2801              :                         path, checksum, cp.checksum)));
    2802              : 
    2803              :     /*
    2804              :      * If we crashed with an ephemeral slot active, don't restore but delete
    2805              :      * it.
    2806              :      */
    2807          121 :     if (cp.slotdata.persistency != RS_PERSISTENT)
    2808              :     {
    2809            0 :         if (!rmtree(slotdir, true))
    2810              :         {
    2811            0 :             ereport(WARNING,
    2812              :                     (errmsg("could not remove directory \"%s\"",
    2813              :                             slotdir)));
    2814              :         }
    2815            0 :         fsync_fname(PG_REPLSLOT_DIR, true);
    2816            0 :         return;
    2817              :     }
    2818              : 
    2819              :     /*
    2820              :      * Verify that requirements for the specific slot type are met. That's
    2821              :      * important because if these aren't met we're not guaranteed to retain
    2822              :      * all the necessary resources for the slot.
    2823              :      *
    2824              :      * NB: We have to do so *after* the above checks for ephemeral slots,
    2825              :      * because otherwise a slot that shouldn't exist anymore could prevent
    2826              :      * restarts.
    2827              :      *
    2828              :      * NB: Changing the requirements here also requires adapting
    2829              :      * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
    2830              :      */
    2831          121 :     if (cp.slotdata.database != InvalidOid)
    2832              :     {
    2833           83 :         if (wal_level < WAL_LEVEL_REPLICA)
    2834            1 :             ereport(FATAL,
    2835              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2836              :                      errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
    2837              :                             NameStr(cp.slotdata.name)),
    2838              :                      errhint("Change \"wal_level\" to be \"replica\" or higher.")));
    2839              : 
    2840              :         /*
    2841              :          * In standby mode, hot standby must be enabled. This check is
    2842              :          * necessary to ensure logical slots are invalidated when they become
    2843              :          * incompatible due to insufficient wal_level. Otherwise, if the
    2844              :          * primary reduced effective_wal_level below logical (that is, the
    2845              :          * primary disabled logical decoding) while hot standby is disabled,
    2846              :          * logical slots would remain valid even after promotion.
    2847              :          */
    2848           82 :         if (StandbyMode && !EnableHotStandby)
    2849            1 :             ereport(FATAL,
    2850              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2851              :                      errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
    2852              :                             NameStr(cp.slotdata.name)),
    2853              :                      errhint("Change \"hot_standby\" to be \"on\".")));
    2854              :     }
    2855           38 :     else if (wal_level < WAL_LEVEL_REPLICA)
    2856            0 :         ereport(FATAL,
    2857              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2858              :                  errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
    2859              :                         NameStr(cp.slotdata.name)),
    2860              :                  errhint("Change \"wal_level\" to be \"replica\" or higher.")));
    2861              : 
    2862              :     /* nothing can be active yet, don't lock anything */
    2863          176 :     for (i = 0; i < max_replication_slots; i++)
    2864              :     {
    2865              :         ReplicationSlot *slot;
    2866              : 
    2867          176 :         slot = &ReplicationSlotCtl->replication_slots[i];
    2868              : 
    2869          176 :         if (slot->in_use)
    2870           57 :             continue;
    2871              : 
    2872              :         /* restore the entire set of persistent data */
    2873          119 :         memcpy(&slot->data, &cp.slotdata,
    2874              :                sizeof(ReplicationSlotPersistentData));
    2875              : 
    2876              :         /* initialize in memory state */
    2877          119 :         slot->effective_xmin = cp.slotdata.xmin;
    2878          119 :         slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
    2879          119 :         slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2880          119 :         slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
    2881              : 
    2882          119 :         slot->candidate_catalog_xmin = InvalidTransactionId;
    2883          119 :         slot->candidate_xmin_lsn = InvalidXLogRecPtr;
    2884          119 :         slot->candidate_restart_lsn = InvalidXLogRecPtr;
    2885          119 :         slot->candidate_restart_valid = InvalidXLogRecPtr;
    2886              : 
    2887          119 :         slot->in_use = true;
    2888          119 :         slot->active_proc = INVALID_PROC_NUMBER;
    2889              : 
    2890              :         /*
    2891              :          * Set the time since the slot has become inactive after loading the
    2892              :          * slot from the disk into memory. Whoever acquires the slot i.e.
    2893              :          * makes the slot active will reset it. Use the same inactive_since
    2894              :          * time for all the slots.
    2895              :          */
    2896          119 :         if (now == 0)
    2897          119 :             now = GetCurrentTimestamp();
    2898              : 
    2899          119 :         ReplicationSlotSetInactiveSince(slot, now, false);
    2900              : 
    2901          119 :         restored = true;
    2902          119 :         break;
    2903              :     }
    2904              : 
    2905          119 :     if (!restored)
    2906            0 :         ereport(FATAL,
    2907              :                 (errmsg("too many replication slots active before shutdown"),
    2908              :                  errhint("Increase \"max_replication_slots\" and try again.")));
    2909              : }
    2910              : 
    2911              : /*
    2912              :  * Maps an invalidation reason for a replication slot to
    2913              :  * ReplicationSlotInvalidationCause.
    2914              :  */
    2915              : ReplicationSlotInvalidationCause
    2916            0 : GetSlotInvalidationCause(const char *cause_name)
    2917              : {
    2918              :     Assert(cause_name);
    2919              : 
    2920              :     /* Search lookup table for the cause having this name */
    2921            0 :     for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
    2922              :     {
    2923            0 :         if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
    2924            0 :             return SlotInvalidationCauses[i].cause;
    2925              :     }
    2926              : 
    2927              :     Assert(false);
    2928            0 :     return RS_INVAL_NONE;       /* to keep compiler quiet */
    2929              : }
    2930              : 
    2931              : /*
    2932              :  * Maps a ReplicationSlotInvalidationCause to the invalidation
    2933              :  * reason for a replication slot.
    2934              :  */
    2935              : const char *
    2936           45 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
    2937              : {
    2938              :     /* Search lookup table for the name of this cause */
    2939          146 :     for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
    2940              :     {
    2941          146 :         if (SlotInvalidationCauses[i].cause == cause)
    2942           45 :             return SlotInvalidationCauses[i].cause_name;
    2943              :     }
    2944              : 
    2945              :     Assert(false);
    2946            0 :     return "none";                /* to keep compiler quiet */
    2947              : }
    2948              : 
    2949              : /*
    2950              :  * A helper function to validate slots specified in GUC synchronized_standby_slots.
    2951              :  *
    2952              :  * The rawname will be parsed, and the result will be saved into *elemlist.
    2953              :  */
    2954              : static bool
    2955           19 : validate_sync_standby_slots(char *rawname, List **elemlist)
    2956              : {
    2957              :     /* Verify syntax and parse string into a list of identifiers */
    2958           19 :     if (!SplitIdentifierString(rawname, ',', elemlist))
    2959              :     {
    2960            0 :         GUC_check_errdetail("List syntax is invalid.");
    2961            0 :         return false;
    2962              :     }
    2963              : 
    2964              :     /* Iterate the list to validate each slot name */
    2965           53 :     foreach_ptr(char, name, *elemlist)
    2966              :     {
    2967              :         int         err_code;
    2968           19 :         char       *err_msg = NULL;
    2969           19 :         char       *err_hint = NULL;
    2970              : 
    2971           19 :         if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
    2972              :                                                  &err_msg, &err_hint))
    2973              :         {
    2974            2 :             GUC_check_errcode(err_code);
    2975            2 :             GUC_check_errdetail("%s", err_msg);
    2976            2 :             if (err_hint != NULL)
    2977            2 :                 GUC_check_errhint("%s", err_hint);
    2978            2 :             return false;
    2979              :         }
    2980              :     }
    2981              : 
    2982           17 :     return true;
    2983              : }
    2984              : 
    2985              : /*
    2986              :  * GUC check_hook for synchronized_standby_slots
    2987              :  */
    2988              : bool
    2989         1254 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
    2990              : {
    2991              :     char       *rawname;
    2992              :     char       *ptr;
    2993              :     List       *elemlist;
    2994              :     int         size;
    2995              :     bool        ok;
    2996              :     SyncStandbySlotsConfigData *config;
    2997              : 
    2998         1254 :     if ((*newval)[0] == '\0')
    2999         1235 :         return true;
    3000              : 
    3001              :     /* Need a modifiable copy of the GUC string */
    3002           19 :     rawname = pstrdup(*newval);
    3003              : 
    3004              :     /* Parse the list and verify that each entry is a valid slot name */
    3005           19 :     ok = validate_sync_standby_slots(rawname, &elemlist);
    3006              : 
    3007           19 :     if (!ok || elemlist == NIL)
    3008              :     {
    3009            2 :         pfree(rawname);
    3010            2 :         list_free(elemlist);
    3011            2 :         return ok;
    3012              :     }
    3013              : 
    3014              :     /* Compute the size required for the SyncStandbySlotsConfigData struct */
    3015           17 :     size = offsetof(SyncStandbySlotsConfigData, slot_names);
    3016           51 :     foreach_ptr(char, slot_name, elemlist)
    3017           17 :         size += strlen(slot_name) + 1;
    3018              : 
    3019              :     /* GUC extra value must be guc_malloc'd, not palloc'd */
    3020           17 :     config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
    3021           17 :     if (!config)
    3022            0 :         return false;
    3023              : 
    3024              :     /* Transform the data into SyncStandbySlotsConfigData */
    3025           17 :     config->nslotnames = list_length(elemlist);
    3026              : 
    3027           17 :     ptr = config->slot_names;
    3028           51 :     foreach_ptr(char, slot_name, elemlist)
    3029              :     {
    3030           17 :         strcpy(ptr, slot_name);
    3031           17 :         ptr += strlen(slot_name) + 1;
    3032              :     }
    3033              : 
    3034           17 :     *extra = config;
    3035              : 
    3036           17 :     pfree(rawname);
    3037           17 :     list_free(elemlist);
    3038           17 :     return true;
    3039              : }
    3040              : 
    3041              : /*
    3042              :  * GUC assign_hook for synchronized_standby_slots
    3043              :  */
    3044              : void
    3045         1251 : assign_synchronized_standby_slots(const char *newval, void *extra)
    3046              : {
    3047              :     /*
    3048              :      * The standby slots may have changed, so we must recompute the oldest
    3049              :      * LSN.
    3050              :      */
    3051         1251 :     ss_oldest_flush_lsn = InvalidXLogRecPtr;
    3052              : 
    3053         1251 :     synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
    3054         1251 : }
    3055              : 
    3056              : /*
    3057              :  * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
    3058              :  */
    3059              : bool
    3060        37567 : SlotExistsInSyncStandbySlots(const char *slot_name)
    3061              : {
    3062              :     const char *standby_slot_name;
    3063              : 
    3064              :     /* Return false if there is no value in synchronized_standby_slots */
    3065        37567 :     if (synchronized_standby_slots_config == NULL)
    3066        37554 :         return false;
    3067              : 
    3068              :     /*
    3069              :      * XXX: We are not expecting this list to be long so a linear search
    3070              :      * shouldn't hurt but if that turns out not to be true then we can cache
    3071              :      * this information for each WalSender as well.
    3072              :      */
    3073           13 :     standby_slot_name = synchronized_standby_slots_config->slot_names;
    3074           21 :     for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
    3075              :     {
    3076           13 :         if (strcmp(standby_slot_name, slot_name) == 0)
    3077            5 :             return true;
    3078              : 
    3079            8 :         standby_slot_name += strlen(standby_slot_name) + 1;
    3080              :     }
    3081              : 
    3082            8 :     return false;
    3083              : }
    3084              : 
    3085              : /*
    3086              :  * Return true if the slots specified in synchronized_standby_slots have caught up to
    3087              :  * the given WAL location, false otherwise.
    3088              :  *
    3089              :  * The elevel parameter specifies the error level used for logging messages
    3090              :  * related to slots that do not exist, are invalidated, or are inactive.
    3091              :  */
    3092              : bool
    3093          690 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
    3094              : {
    3095              :     const char *name;
    3096          690 :     int         caught_up_slot_num = 0;
    3097          690 :     XLogRecPtr  min_restart_lsn = InvalidXLogRecPtr;
    3098              : 
    3099              :     /*
    3100              :      * Don't need to wait for the standbys to catch up if there is no value in
    3101              :      * synchronized_standby_slots.
    3102              :      */
    3103          690 :     if (synchronized_standby_slots_config == NULL)
    3104          667 :         return true;
    3105              : 
    3106              :     /*
    3107              :      * Don't need to wait for the standbys to catch up if we are on a standby
    3108              :      * server, since we do not support syncing slots to cascading standbys.
    3109              :      */
    3110           23 :     if (RecoveryInProgress())
    3111            0 :         return true;
    3112              : 
    3113              :     /*
    3114              :      * Don't need to wait for the standbys to catch up if they are already
    3115              :      * beyond the specified WAL location.
    3116              :      */
    3117           23 :     if (XLogRecPtrIsValid(ss_oldest_flush_lsn) &&
    3118           12 :         ss_oldest_flush_lsn >= wait_for_lsn)
    3119            7 :         return true;
    3120              : 
    3121              :     /*
    3122              :      * To prevent concurrent slot dropping and creation while filtering the
    3123              :      * slots, take the ReplicationSlotControlLock outside of the loop.
    3124              :      */
    3125           16 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    3126              : 
    3127           16 :     name = synchronized_standby_slots_config->slot_names;
    3128           22 :     for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
    3129              :     {
    3130              :         XLogRecPtr  restart_lsn;
    3131              :         bool        invalidated;
    3132              :         bool        inactive;
    3133              :         ReplicationSlot *slot;
    3134              : 
    3135           16 :         slot = SearchNamedReplicationSlot(name, false);
    3136              : 
    3137              :         /*
    3138              :          * If a slot name provided in synchronized_standby_slots does not
    3139              :          * exist, report a message and exit the loop.
    3140              :          */
    3141           16 :         if (!slot)
    3142              :         {
    3143            0 :             ereport(elevel,
    3144              :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3145              :                     errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
    3146              :                            name, "synchronized_standby_slots"),
    3147              :                     errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    3148              :                               name),
    3149              :                     errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
    3150              :                             name, "synchronized_standby_slots"));
    3151            0 :             break;
    3152              :         }
    3153              : 
    3154              :         /* Same as above: if a slot is not physical, exit the loop. */
    3155           16 :         if (SlotIsLogical(slot))
    3156              :         {
    3157            0 :             ereport(elevel,
    3158              :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3159              :                     errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
    3160              :                            name, "synchronized_standby_slots"),
    3161              :                     errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
    3162              :                               name),
    3163              :                     errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
    3164              :                             name, "synchronized_standby_slots"));
    3165            0 :             break;
    3166              :         }
    3167              : 
    3168           16 :         SpinLockAcquire(&slot->mutex);
    3169           16 :         restart_lsn = slot->data.restart_lsn;
    3170           16 :         invalidated = slot->data.invalidated != RS_INVAL_NONE;
    3171           16 :         inactive = slot->active_proc == INVALID_PROC_NUMBER;
    3172           16 :         SpinLockRelease(&slot->mutex);
    3173              : 
    3174           16 :         if (invalidated)
    3175              :         {
    3176              :             /* Specified physical slot has been invalidated */
    3177            0 :             ereport(elevel,
    3178              :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3179              :                     errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
    3180              :                            name, "synchronized_standby_slots"),
    3181              :                     errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    3182              :                               name),
    3183              :                     errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
    3184              :                             name, "synchronized_standby_slots"));
    3185            0 :             break;
    3186              :         }
    3187              : 
    3188           16 :         if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
    3189              :         {
    3190              :             /* Log a message if no active_pid for this physical slot */
    3191           10 :             if (inactive)
    3192            9 :                 ereport(elevel,
    3193              :                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    3194              :                         errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
    3195              :                                name, "synchronized_standby_slots"),
    3196              :                         errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    3197              :                                   name),
    3198              :                         errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
    3199              :                                 name, "synchronized_standby_slots"));
    3200              : 
    3201              :             /* Stop scanning: the current slot hasn't caught up. */
    3202           10 :             break;
    3203              :         }
    3204              : 
    3205              :         Assert(restart_lsn >= wait_for_lsn);
    3206              : 
    3207            6 :         if (!XLogRecPtrIsValid(min_restart_lsn) ||
    3208              :             min_restart_lsn > restart_lsn)
    3209            6 :             min_restart_lsn = restart_lsn;
    3210              : 
    3211            6 :         caught_up_slot_num++;
    3212              : 
    3213            6 :         name += strlen(name) + 1;
    3214              :     }
    3215              : 
    3216           16 :     LWLockRelease(ReplicationSlotControlLock);
    3217              : 
    3218              :     /*
    3219              :      * Return false if not all the standbys have caught up to the specified
    3220              :      * WAL location.
    3221              :      */
    3222           16 :     if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
    3223           10 :         return false;
    3224              : 
    3225              :     /* The ss_oldest_flush_lsn must not retreat. */
    3226              :     Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
    3227              :            min_restart_lsn >= ss_oldest_flush_lsn);
    3228              : 
    3229            6 :     ss_oldest_flush_lsn = min_restart_lsn;
    3230              : 
    3231            6 :     return true;
    3232              : }
    3233              : 
    3234              : /*
    3235              :  * Wait for physical standbys to confirm receiving the given lsn.
    3236              :  *
    3237              :  * Used by logical decoding SQL functions. It waits for physical standbys
    3238              :  * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
    3239              :  */
    3240              : void
    3241          231 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
    3242              : {
    3243              :     /*
    3244              :      * Don't need to wait for the standby to catch up if the current acquired
    3245              :      * slot is not a logical failover slot, or there is no value in
    3246              :      * synchronized_standby_slots.
    3247              :      */
    3248          231 :     if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
    3249          230 :         return;
    3250              : 
    3251            1 :     ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
    3252              : 
    3253              :     for (;;)
    3254              :     {
    3255            2 :         CHECK_FOR_INTERRUPTS();
    3256              : 
    3257            2 :         if (ConfigReloadPending)
    3258              :         {
    3259            1 :             ConfigReloadPending = false;
    3260            1 :             ProcessConfigFile(PGC_SIGHUP);
    3261              :         }
    3262              : 
    3263              :         /* Exit if done waiting for every slot. */
    3264            2 :         if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
    3265            1 :             break;
    3266              : 
    3267              :         /*
    3268              :          * Wait for the slots in the synchronized_standby_slots to catch up,
    3269              :          * but use a timeout (1s) so we can also check if the
    3270              :          * synchronized_standby_slots has been changed.
    3271              :          */
    3272            1 :         ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
    3273              :                                     WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
    3274              :     }
    3275              : 
    3276            1 :     ConditionVariableCancelSleep();
    3277              : }
        

Generated by: LCOV version 2.0-1