LCOV - code coverage report
Current view: top level - src/backend/replication - slot.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 748 872 85.8 %
Date: 2024-11-21 08:14:44 Functions: 41 42 97.6 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * slot.c
       4             :  *     Replication slot management.
       5             :  *
       6             :  *
       7             :  * Copyright (c) 2012-2024, PostgreSQL Global Development Group
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/replication/slot.c
      12             :  *
      13             :  * NOTES
      14             :  *
      15             :  * Replication slots are used to keep state about replication streams
      16             :  * originating from this cluster.  Their primary purpose is to prevent the
      17             :  * premature removal of WAL or of old tuple versions in a manner that would
      18             :  * interfere with replication; they are also useful for monitoring purposes.
      19             :  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
      20             :  * on standbys (to support cascading setups).  The requirement that slots be
      21             :  * usable on standbys precludes storing them in the system catalogs.
      22             :  *
      23             :  * Each replication slot gets its own directory inside the directory
      24             :  * $PGDATA / PG_REPLSLOT_DIR.  Inside that directory the state file will
      25             :  * contain the slot's own data.  Additional data can be stored alongside that
      26             :  * file if required.  While the server is running, the state data is also
      27             :  * cached in memory for efficiency.
      28             :  *
      29             :  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
      30             :  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
      31             :  * to iterate over the slots, and in exclusive mode to change the in_use flag
      32             :  * of a slot.  The remaining data in each slot is protected by its mutex.
      33             :  *
      34             :  *-------------------------------------------------------------------------
      35             :  */
      36             : 
      37             : #include "postgres.h"
      38             : 
      39             : #include <unistd.h>
      40             : #include <sys/stat.h>
      41             : 
      42             : #include "access/transam.h"
      43             : #include "access/xlog_internal.h"
      44             : #include "access/xlogrecovery.h"
      45             : #include "common/file_utils.h"
      46             : #include "common/string.h"
      47             : #include "miscadmin.h"
      48             : #include "pgstat.h"
      49             : #include "postmaster/interrupt.h"
      50             : #include "replication/slotsync.h"
      51             : #include "replication/slot.h"
      52             : #include "replication/walsender_private.h"
      53             : #include "storage/fd.h"
      54             : #include "storage/ipc.h"
      55             : #include "storage/proc.h"
      56             : #include "storage/procarray.h"
      57             : #include "utils/builtins.h"
      58             : #include "utils/guc_hooks.h"
      59             : #include "utils/varlena.h"
      60             : 
      61             : /*
      62             :  * Replication slot on-disk data structure.
      63             :  */
      64             : typedef struct ReplicationSlotOnDisk
      65             : {
      66             :     /* first part of this struct needs to be version independent */
      67             : 
      68             :     /* data not covered by checksum */
      69             :     uint32      magic;
      70             :     pg_crc32c   checksum;
      71             : 
      72             :     /* data covered by checksum */
      73             :     uint32      version;
      74             :     uint32      length;
      75             : 
      76             :     /*
      77             :      * The actual data in the slot that follows can differ based on the above
      78             :      * 'version'.
      79             :      */
      80             : 
      81             :     ReplicationSlotPersistentData slotdata;
      82             : } ReplicationSlotOnDisk;
      83             : 
      84             : /*
      85             :  * Struct for the configuration of synchronized_standby_slots.
      86             :  *
      87             :  * Note: this must be a flat representation that can be held in a single chunk
      88             :  * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
      89             :  * synchronized_standby_slots GUC.
      90             :  */
      91             : typedef struct
      92             : {
      93             :     /* Number of slot names in the slot_names[] */
      94             :     int         nslotnames;
      95             : 
      96             :     /*
      97             :      * slot_names contains 'nslotnames' consecutive null-terminated C strings.
      98             :      */
      99             :     char        slot_names[FLEXIBLE_ARRAY_MEMBER];
     100             : } SyncStandbySlotsConfigData;
     101             : 
     102             : /*
     103             :  * Lookup table for slot invalidation causes.
     104             :  */
     105             : const char *const SlotInvalidationCauses[] = {
     106             :     [RS_INVAL_NONE] = "none",
     107             :     [RS_INVAL_WAL_REMOVED] = "wal_removed",
     108             :     [RS_INVAL_HORIZON] = "rows_removed",
     109             :     [RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
     110             : };
     111             : 
     112             : /* Maximum number of invalidation causes */
     113             : #define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
     114             : 
     115             : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
     116             :                  "array length mismatch");
     117             : 
     118             : /* size of version independent data */
     119             : #define ReplicationSlotOnDiskConstantSize \
     120             :     offsetof(ReplicationSlotOnDisk, slotdata)
     121             : /* size of the part of the slot not covered by the checksum */
     122             : #define ReplicationSlotOnDiskNotChecksummedSize  \
     123             :     offsetof(ReplicationSlotOnDisk, version)
     124             : /* size of the part covered by the checksum */
     125             : #define ReplicationSlotOnDiskChecksummedSize \
     126             :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
     127             : /* size of the slot data that is version dependent */
     128             : #define ReplicationSlotOnDiskV2Size \
     129             :     sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
     130             : 
     131             : #define SLOT_MAGIC      0x1051CA1   /* format identifier */
     132             : #define SLOT_VERSION    5       /* version for new files */
     133             : 
     134             : /* Control array for replication slot management */
     135             : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
     136             : 
     137             : /* My backend's replication slot in the shared memory array */
     138             : ReplicationSlot *MyReplicationSlot = NULL;
     139             : 
     140             : /* GUC variables */
     141             : int         max_replication_slots = 10; /* the maximum number of replication
     142             :                                          * slots */
     143             : 
     144             : /*
     145             :  * This GUC lists streaming replication standby server slot names that
     146             :  * logical WAL sender processes will wait for.
     147             :  */
     148             : char       *synchronized_standby_slots;
     149             : 
     150             : /* This is the parsed and cached configuration for synchronized_standby_slots */
     151             : static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
     152             : 
     153             : /*
     154             :  * Oldest LSN that has been confirmed to be flushed to the standbys
     155             :  * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
     156             :  */
     157             : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
     158             : 
     159             : static void ReplicationSlotShmemExit(int code, Datum arg);
     160             : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
     161             : 
     162             : /* internal persistency functions */
     163             : static void RestoreSlotFromDisk(const char *name);
     164             : static void CreateSlotOnDisk(ReplicationSlot *slot);
     165             : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
     166             : 
     167             : /*
     168             :  * Report shared-memory space needed by ReplicationSlotsShmemInit.
     169             :  */
     170             : Size
     171        7334 : ReplicationSlotsShmemSize(void)
     172             : {
     173        7334 :     Size        size = 0;
     174             : 
     175        7334 :     if (max_replication_slots == 0)
     176           4 :         return size;
     177             : 
     178        7330 :     size = offsetof(ReplicationSlotCtlData, replication_slots);
     179        7330 :     size = add_size(size,
     180             :                     mul_size(max_replication_slots, sizeof(ReplicationSlot)));
     181             : 
     182        7330 :     return size;
     183             : }
     184             : 
     185             : /*
     186             :  * Allocate and initialize shared memory for replication slots.
     187             :  */
     188             : void
     189        1902 : ReplicationSlotsShmemInit(void)
     190             : {
     191             :     bool        found;
     192             : 
     193        1902 :     if (max_replication_slots == 0)
     194           2 :         return;
     195             : 
     196        1900 :     ReplicationSlotCtl = (ReplicationSlotCtlData *)
     197        1900 :         ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
     198             :                         &found);
     199             : 
     200        1900 :     if (!found)
     201             :     {
     202             :         int         i;
     203             : 
     204             :         /* First time through, so initialize */
     205        3580 :         MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
     206             : 
     207       20524 :         for (i = 0; i < max_replication_slots; i++)
     208             :         {
     209       18624 :             ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
     210             : 
     211             :             /* everything else is zeroed by the memset above */
     212       18624 :             SpinLockInit(&slot->mutex);
     213       18624 :             LWLockInitialize(&slot->io_in_progress_lock,
     214             :                              LWTRANCHE_REPLICATION_SLOT_IO);
     215       18624 :             ConditionVariableInit(&slot->active_cv);
     216             :         }
     217             :     }
     218             : }
     219             : 
     220             : /*
     221             :  * Register the callback for replication slot cleanup and releasing.
     222             :  */
     223             : void
     224       35550 : ReplicationSlotInitialize(void)
     225             : {
     226       35550 :     before_shmem_exit(ReplicationSlotShmemExit, 0);
     227       35550 : }
     228             : 
     229             : /*
     230             :  * Release and cleanup replication slots.
     231             :  */
     232             : static void
     233       35550 : ReplicationSlotShmemExit(int code, Datum arg)
     234             : {
     235             :     /* Make sure active replication slots are released */
     236       35550 :     if (MyReplicationSlot != NULL)
     237         410 :         ReplicationSlotRelease();
     238             : 
     239             :     /* Also cleanup all the temporary slots. */
     240       35550 :     ReplicationSlotCleanup(false);
     241       35550 : }
     242             : 
     243             : /*
     244             :  * Check whether the passed slot name is valid and report errors at elevel.
     245             :  *
     246             :  * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
     247             :  * the name to be used as a directory name on every supported OS.
     248             :  *
     249             :  * Returns whether the directory name is valid or not if elevel < ERROR.
     250             :  */
     251             : bool
     252        1632 : ReplicationSlotValidateName(const char *name, int elevel)
     253             : {
     254             :     const char *cp;
     255             : 
     256        1632 :     if (strlen(name) == 0)
     257             :     {
     258           6 :         ereport(elevel,
     259             :                 (errcode(ERRCODE_INVALID_NAME),
     260             :                  errmsg("replication slot name \"%s\" is too short",
     261             :                         name)));
     262           0 :         return false;
     263             :     }
     264             : 
     265        1626 :     if (strlen(name) >= NAMEDATALEN)
     266             :     {
     267           0 :         ereport(elevel,
     268             :                 (errcode(ERRCODE_NAME_TOO_LONG),
     269             :                  errmsg("replication slot name \"%s\" is too long",
     270             :                         name)));
     271           0 :         return false;
     272             :     }
     273             : 
     274       32552 :     for (cp = name; *cp; cp++)
     275             :     {
     276       30928 :         if (!((*cp >= 'a' && *cp <= 'z')
     277       15196 :               || (*cp >= '0' && *cp <= '9')
     278        2990 :               || (*cp == '_')))
     279             :         {
     280           2 :             ereport(elevel,
     281             :                     (errcode(ERRCODE_INVALID_NAME),
     282             :                      errmsg("replication slot name \"%s\" contains invalid character",
     283             :                             name),
     284             :                      errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
     285           0 :             return false;
     286             :         }
     287             :     }
     288        1624 :     return true;
     289             : }
     290             : 
     291             : /*
     292             :  * Create a new replication slot and mark it as used by this backend.
     293             :  *
     294             :  * name: Name of the slot
     295             :  * db_specific: logical decoding is db specific; if the slot is going to
     296             :  *     be used for that pass true, otherwise false.
     297             :  * two_phase: Allows decoding of prepared transactions. We allow this option
     298             :  *     to be enabled only at the slot creation time. If we allow this option
     299             :  *     to be changed during decoding then it is quite possible that we skip
     300             :  *     prepare first time because this option was not enabled. Now next time
     301             :  *     during getting changes, if the two_phase option is enabled it can skip
     302             :  *     prepare because by that time start decoding point has been moved. So the
     303             :  *     user will only get commit prepared.
     304             :  * failover: If enabled, allows the slot to be synced to standbys so
     305             :  *     that logical replication can be resumed after failover.
     306             :  * synced: True if the slot is synchronized from the primary server.
     307             :  */
     308             : void
     309        1176 : ReplicationSlotCreate(const char *name, bool db_specific,
     310             :                       ReplicationSlotPersistency persistency,
     311             :                       bool two_phase, bool failover, bool synced)
     312             : {
     313        1176 :     ReplicationSlot *slot = NULL;
     314             :     int         i;
     315             : 
     316             :     Assert(MyReplicationSlot == NULL);
     317             : 
     318        1176 :     ReplicationSlotValidateName(name, ERROR);
     319             : 
     320        1174 :     if (failover)
     321             :     {
     322             :         /*
     323             :          * Do not allow users to create the failover enabled slots on the
     324             :          * standby as we do not support sync to the cascading standby.
     325             :          *
     326             :          * However, failover enabled slots can be created during slot
     327             :          * synchronization because we need to retain the same values as the
     328             :          * remote slot.
     329             :          */
     330          42 :         if (RecoveryInProgress() && !IsSyncingReplicationSlots())
     331           0 :             ereport(ERROR,
     332             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     333             :                     errmsg("cannot enable failover for a replication slot created on the standby"));
     334             : 
     335             :         /*
     336             :          * Do not allow users to create failover enabled temporary slots,
     337             :          * because temporary slots will not be synced to the standby.
     338             :          *
     339             :          * However, failover enabled temporary slots can be created during
     340             :          * slot synchronization. See the comments atop slotsync.c for details.
     341             :          */
     342          42 :         if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
     343           2 :             ereport(ERROR,
     344             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     345             :                     errmsg("cannot enable failover for a temporary replication slot"));
     346             :     }
     347             : 
     348             :     /*
     349             :      * If some other backend ran this code concurrently with us, we'd likely
     350             :      * both allocate the same slot, and that would be bad.  We'd also be at
     351             :      * risk of missing a name collision.  Also, we don't want to try to create
     352             :      * a new slot while somebody's busy cleaning up an old one, because we
     353             :      * might both be monkeying with the same directory.
     354             :      */
     355        1172 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     356             : 
     357             :     /*
     358             :      * Check for name collision, and identify an allocatable slot.  We need to
     359             :      * hold ReplicationSlotControlLock in shared mode for this, so that nobody
     360             :      * else can change the in_use flags while we're looking at them.
     361             :      */
     362        1172 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     363       11188 :     for (i = 0; i < max_replication_slots; i++)
     364             :     {
     365       10022 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     366             : 
     367       10022 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     368           6 :             ereport(ERROR,
     369             :                     (errcode(ERRCODE_DUPLICATE_OBJECT),
     370             :                      errmsg("replication slot \"%s\" already exists", name)));
     371       10016 :         if (!s->in_use && slot == NULL)
     372        1164 :             slot = s;
     373             :     }
     374        1166 :     LWLockRelease(ReplicationSlotControlLock);
     375             : 
     376             :     /* If all slots are in use, we're out of luck. */
     377        1166 :     if (slot == NULL)
     378           2 :         ereport(ERROR,
     379             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     380             :                  errmsg("all replication slots are in use"),
     381             :                  errhint("Free one or increase \"max_replication_slots\".")));
     382             : 
     383             :     /*
     384             :      * Since this slot is not in use, nobody should be looking at any part of
     385             :      * it other than the in_use field unless they're trying to allocate it.
     386             :      * And since we hold ReplicationSlotAllocationLock, nobody except us can
     387             :      * be doing that.  So it's safe to initialize the slot.
     388             :      */
     389             :     Assert(!slot->in_use);
     390             :     Assert(slot->active_pid == 0);
     391             : 
     392             :     /* first initialize persistent data */
     393        1164 :     memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
     394        1164 :     namestrcpy(&slot->data.name, name);
     395        1164 :     slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
     396        1164 :     slot->data.persistency = persistency;
     397        1164 :     slot->data.two_phase = two_phase;
     398        1164 :     slot->data.two_phase_at = InvalidXLogRecPtr;
     399        1164 :     slot->data.failover = failover;
     400        1164 :     slot->data.synced = synced;
     401             : 
     402             :     /* and then data only present in shared memory */
     403        1164 :     slot->just_dirtied = false;
     404        1164 :     slot->dirty = false;
     405        1164 :     slot->effective_xmin = InvalidTransactionId;
     406        1164 :     slot->effective_catalog_xmin = InvalidTransactionId;
     407        1164 :     slot->candidate_catalog_xmin = InvalidTransactionId;
     408        1164 :     slot->candidate_xmin_lsn = InvalidXLogRecPtr;
     409        1164 :     slot->candidate_restart_valid = InvalidXLogRecPtr;
     410        1164 :     slot->candidate_restart_lsn = InvalidXLogRecPtr;
     411        1164 :     slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
     412        1164 :     slot->inactive_since = 0;
     413             : 
     414             :     /*
     415             :      * Create the slot on disk.  We haven't actually marked the slot allocated
     416             :      * yet, so no special cleanup is required if this errors out.
     417             :      */
     418        1164 :     CreateSlotOnDisk(slot);
     419             : 
     420             :     /*
     421             :      * We need to briefly prevent any other backend from iterating over the
     422             :      * slots while we flip the in_use flag. We also need to set the active
     423             :      * flag while holding the ControlLock as otherwise a concurrent
     424             :      * ReplicationSlotAcquire() could acquire the slot as well.
     425             :      */
     426        1164 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     427             : 
     428        1164 :     slot->in_use = true;
     429             : 
     430             :     /* We can now mark the slot active, and that makes it our slot. */
     431        1164 :     SpinLockAcquire(&slot->mutex);
     432             :     Assert(slot->active_pid == 0);
     433        1164 :     slot->active_pid = MyProcPid;
     434        1164 :     SpinLockRelease(&slot->mutex);
     435        1164 :     MyReplicationSlot = slot;
     436             : 
     437        1164 :     LWLockRelease(ReplicationSlotControlLock);
     438             : 
     439             :     /*
     440             :      * Create statistics entry for the new logical slot. We don't collect any
     441             :      * stats for physical slots, so no need to create an entry for the same.
     442             :      * See ReplicationSlotDropPtr for why we need to do this before releasing
     443             :      * ReplicationSlotAllocationLock.
     444             :      */
     445        1164 :     if (SlotIsLogical(slot))
     446         828 :         pgstat_create_replslot(slot);
     447             : 
     448             :     /*
     449             :      * Now that the slot has been marked as in_use and active, it's safe to
     450             :      * let somebody else try to allocate a slot.
     451             :      */
     452        1164 :     LWLockRelease(ReplicationSlotAllocationLock);
     453             : 
     454             :     /* Let everybody know we've modified this slot */
     455        1164 :     ConditionVariableBroadcast(&slot->active_cv);
     456        1164 : }
     457             : 
     458             : /*
     459             :  * Search for the named replication slot.
     460             :  *
     461             :  * Return the replication slot if found, otherwise NULL.
     462             :  */
     463             : ReplicationSlot *
     464        2558 : SearchNamedReplicationSlot(const char *name, bool need_lock)
     465             : {
     466             :     int         i;
     467        2558 :     ReplicationSlot *slot = NULL;
     468             : 
     469        2558 :     if (need_lock)
     470         156 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     471             : 
     472        4376 :     for (i = 0; i < max_replication_slots; i++)
     473             :     {
     474        4336 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     475             : 
     476        4336 :         if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
     477             :         {
     478        2518 :             slot = s;
     479        2518 :             break;
     480             :         }
     481             :     }
     482             : 
     483        2558 :     if (need_lock)
     484         156 :         LWLockRelease(ReplicationSlotControlLock);
     485             : 
     486        2558 :     return slot;
     487             : }
     488             : 
     489             : /*
     490             :  * Return the index of the replication slot in
     491             :  * ReplicationSlotCtl->replication_slots.
     492             :  *
     493             :  * This is mainly useful to have an efficient key for storing replication slot
     494             :  * stats.
     495             :  */
     496             : int
     497       15058 : ReplicationSlotIndex(ReplicationSlot *slot)
     498             : {
     499             :     Assert(slot >= ReplicationSlotCtl->replication_slots &&
     500             :            slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
     501             : 
     502       15058 :     return slot - ReplicationSlotCtl->replication_slots;
     503             : }
     504             : 
     505             : /*
     506             :  * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
     507             :  * the slot's name and true is returned.
     508             :  *
     509             :  * This likely is only useful for pgstat_replslot.c during shutdown, in other
     510             :  * cases there are obvious TOCTOU issues.
     511             :  */
     512             : bool
     513         150 : ReplicationSlotName(int index, Name name)
     514             : {
     515             :     ReplicationSlot *slot;
     516             :     bool        found;
     517             : 
     518         150 :     slot = &ReplicationSlotCtl->replication_slots[index];
     519             : 
     520             :     /*
     521             :      * Ensure that the slot cannot be dropped while we copy the name. Don't
     522             :      * need the spinlock as the name of an existing slot cannot change.
     523             :      */
     524         150 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     525         150 :     found = slot->in_use;
     526         150 :     if (slot->in_use)
     527         150 :         namestrcpy(name, NameStr(slot->data.name));
     528         150 :     LWLockRelease(ReplicationSlotControlLock);
     529             : 
     530         150 :     return found;
     531             : }
     532             : 
     533             : /*
     534             :  * Find a previously created slot and mark it as used by this process.
     535             :  *
     536             :  * An error is raised if nowait is true and the slot is currently in use. If
     537             :  * nowait is false, we sleep until the slot is released by the owning process.
     538             :  */
     539             : void
     540        2260 : ReplicationSlotAcquire(const char *name, bool nowait)
     541             : {
     542             :     ReplicationSlot *s;
     543             :     int         active_pid;
     544             : 
     545             :     Assert(name != NULL);
     546             : 
     547        2260 : retry:
     548             :     Assert(MyReplicationSlot == NULL);
     549             : 
     550        2260 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     551             : 
     552             :     /* Check if the slot exists with the given name. */
     553        2260 :     s = SearchNamedReplicationSlot(name, false);
     554        2260 :     if (s == NULL || !s->in_use)
     555             :     {
     556          16 :         LWLockRelease(ReplicationSlotControlLock);
     557             : 
     558          16 :         ereport(ERROR,
     559             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     560             :                  errmsg("replication slot \"%s\" does not exist",
     561             :                         name)));
     562             :     }
     563             : 
     564             :     /*
     565             :      * This is the slot we want; check if it's active under some other
     566             :      * process.  In single user mode, we don't need this check.
     567             :      */
     568        2244 :     if (IsUnderPostmaster)
     569             :     {
     570             :         /*
     571             :          * Get ready to sleep on the slot in case it is active.  (We may end
     572             :          * up not sleeping, but we don't want to do this while holding the
     573             :          * spinlock.)
     574             :          */
     575        2244 :         if (!nowait)
     576         468 :             ConditionVariablePrepareToSleep(&s->active_cv);
     577             : 
     578        2244 :         SpinLockAcquire(&s->mutex);
     579        2244 :         if (s->active_pid == 0)
     580        1972 :             s->active_pid = MyProcPid;
     581        2244 :         active_pid = s->active_pid;
     582        2244 :         SpinLockRelease(&s->mutex);
     583             :     }
     584             :     else
     585           0 :         active_pid = MyProcPid;
     586        2244 :     LWLockRelease(ReplicationSlotControlLock);
     587             : 
     588             :     /*
     589             :      * If we found the slot but it's already active in another process, we
     590             :      * wait until the owning process signals us that it's been released, or
     591             :      * error out.
     592             :      */
     593        2244 :     if (active_pid != MyProcPid)
     594             :     {
     595           0 :         if (!nowait)
     596             :         {
     597             :             /* Wait here until we get signaled, and then restart */
     598           0 :             ConditionVariableSleep(&s->active_cv,
     599             :                                    WAIT_EVENT_REPLICATION_SLOT_DROP);
     600           0 :             ConditionVariableCancelSleep();
     601           0 :             goto retry;
     602             :         }
     603             : 
     604           0 :         ereport(ERROR,
     605             :                 (errcode(ERRCODE_OBJECT_IN_USE),
     606             :                  errmsg("replication slot \"%s\" is active for PID %d",
     607             :                         NameStr(s->data.name), active_pid)));
     608             :     }
     609        2244 :     else if (!nowait)
     610         468 :         ConditionVariableCancelSleep(); /* no sleep needed after all */
     611             : 
     612             :     /* Let everybody know we've modified this slot */
     613        2244 :     ConditionVariableBroadcast(&s->active_cv);
     614             : 
     615             :     /* We made this slot active, so it's ours now. */
     616        2244 :     MyReplicationSlot = s;
     617             : 
     618             :     /*
     619             :      * The call to pgstat_acquire_replslot() protects against stats for a
     620             :      * different slot, from before a restart or such, being present during
     621             :      * pgstat_report_replslot().
     622             :      */
     623        2244 :     if (SlotIsLogical(s))
     624        1856 :         pgstat_acquire_replslot(s);
     625             : 
     626             :     /*
     627             :      * Reset the time since the slot has become inactive as the slot is active
     628             :      * now.
     629             :      */
     630        2244 :     SpinLockAcquire(&s->mutex);
     631        2244 :     s->inactive_since = 0;
     632        2244 :     SpinLockRelease(&s->mutex);
     633             : 
     634        2244 :     if (am_walsender)
     635             :     {
     636        1512 :         ereport(log_replication_commands ? LOG : DEBUG1,
     637             :                 SlotIsLogical(s)
     638             :                 ? errmsg("acquired logical replication slot \"%s\"",
     639             :                          NameStr(s->data.name))
     640             :                 : errmsg("acquired physical replication slot \"%s\"",
     641             :                          NameStr(s->data.name)));
     642             :     }
     643        2244 : }
     644             : 
     645             : /*
     646             :  * Release the replication slot that this backend considers itself to own.
     647             :  *
     648             :  * This or another backend can re-acquire the slot later.
     649             :  * Resources this slot requires will be preserved.
     650             :  */
     651             : void
     652        2734 : ReplicationSlotRelease(void)
     653             : {
     654        2734 :     ReplicationSlot *slot = MyReplicationSlot;
     655        2734 :     char       *slotname = NULL;    /* keep compiler quiet */
     656        2734 :     bool        is_logical = false; /* keep compiler quiet */
     657        2734 :     TimestampTz now = 0;
     658             : 
     659             :     Assert(slot != NULL && slot->active_pid != 0);
     660             : 
     661        2734 :     if (am_walsender)
     662             :     {
     663        1900 :         slotname = pstrdup(NameStr(slot->data.name));
     664        1900 :         is_logical = SlotIsLogical(slot);
     665             :     }
     666             : 
     667        2734 :     if (slot->data.persistency == RS_EPHEMERAL)
     668             :     {
     669             :         /*
     670             :          * Delete the slot. There is no !PANIC case where this is allowed to
     671             :          * fail, all that may happen is an incomplete cleanup of the on-disk
     672             :          * data.
     673             :          */
     674          10 :         ReplicationSlotDropAcquired();
     675             :     }
     676             : 
     677             :     /*
     678             :      * If slot needed to temporarily restrain both data and catalog xmin to
     679             :      * create the catalog snapshot, remove that temporary constraint.
     680             :      * Snapshots can only be exported while the initial snapshot is still
     681             :      * acquired.
     682             :      */
     683        2734 :     if (!TransactionIdIsValid(slot->data.xmin) &&
     684        2686 :         TransactionIdIsValid(slot->effective_xmin))
     685             :     {
     686         344 :         SpinLockAcquire(&slot->mutex);
     687         344 :         slot->effective_xmin = InvalidTransactionId;
     688         344 :         SpinLockRelease(&slot->mutex);
     689         344 :         ReplicationSlotsComputeRequiredXmin(false);
     690             :     }
     691             : 
     692             :     /*
     693             :      * Set the time since the slot has become inactive. We get the current
     694             :      * time beforehand to avoid a system call while holding the spinlock.
     695             :      */
     696        2734 :     now = GetCurrentTimestamp();
     697             : 
     698        2734 :     if (slot->data.persistency == RS_PERSISTENT)
     699             :     {
     700             :         /*
     701             :          * Mark persistent slot inactive.  We're not freeing it, just
     702             :          * disconnecting, but wake up others that may be waiting for it.
     703             :          */
     704        2178 :         SpinLockAcquire(&slot->mutex);
     705        2178 :         slot->active_pid = 0;
     706        2178 :         slot->inactive_since = now;
     707        2178 :         SpinLockRelease(&slot->mutex);
     708        2178 :         ConditionVariableBroadcast(&slot->active_cv);
     709             :     }
     710             :     else
     711             :     {
     712         556 :         SpinLockAcquire(&slot->mutex);
     713         556 :         slot->inactive_since = now;
     714         556 :         SpinLockRelease(&slot->mutex);
     715             :     }
     716             : 
     717        2734 :     MyReplicationSlot = NULL;
     718             : 
     719             :     /* might not have been set when we've been a plain slot */
     720        2734 :     LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
     721        2734 :     MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
     722        2734 :     ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
     723        2734 :     LWLockRelease(ProcArrayLock);
     724             : 
     725        2734 :     if (am_walsender)
     726             :     {
     727        1900 :         ereport(log_replication_commands ? LOG : DEBUG1,
     728             :                 is_logical
     729             :                 ? errmsg("released logical replication slot \"%s\"",
     730             :                          slotname)
     731             :                 : errmsg("released physical replication slot \"%s\"",
     732             :                          slotname));
     733             : 
     734        1900 :         pfree(slotname);
     735             :     }
     736        2734 : }
     737             : 
     738             : /*
     739             :  * Cleanup temporary slots created in current session.
     740             :  *
     741             :  * Cleanup only synced temporary slots if 'synced_only' is true, else
     742             :  * cleanup all temporary slots.
     743             :  */
     744             : void
     745       77100 : ReplicationSlotCleanup(bool synced_only)
     746             : {
     747             :     int         i;
     748             : 
     749             :     Assert(MyReplicationSlot == NULL);
     750             : 
     751       77100 : restart:
     752       77100 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     753      835436 :     for (i = 0; i < max_replication_slots; i++)
     754             :     {
     755      758610 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     756             : 
     757      758610 :         if (!s->in_use)
     758      734418 :             continue;
     759             : 
     760       24192 :         SpinLockAcquire(&s->mutex);
     761       24192 :         if ((s->active_pid == MyProcPid &&
     762         274 :              (!synced_only || s->data.synced)))
     763             :         {
     764             :             Assert(s->data.persistency == RS_TEMPORARY);
     765         274 :             SpinLockRelease(&s->mutex);
     766         274 :             LWLockRelease(ReplicationSlotControlLock);  /* avoid deadlock */
     767             : 
     768         274 :             ReplicationSlotDropPtr(s);
     769             : 
     770         274 :             ConditionVariableBroadcast(&s->active_cv);
     771         274 :             goto restart;
     772             :         }
     773             :         else
     774       23918 :             SpinLockRelease(&s->mutex);
     775             :     }
     776             : 
     777       76826 :     LWLockRelease(ReplicationSlotControlLock);
     778       76826 : }
     779             : 
     780             : /*
     781             :  * Permanently drop replication slot identified by the passed in name.
     782             :  */
     783             : void
     784         710 : ReplicationSlotDrop(const char *name, bool nowait)
     785             : {
     786             :     Assert(MyReplicationSlot == NULL);
     787             : 
     788         710 :     ReplicationSlotAcquire(name, nowait);
     789             : 
     790             :     /*
     791             :      * Do not allow users to drop the slots which are currently being synced
     792             :      * from the primary to the standby.
     793             :      */
     794         700 :     if (RecoveryInProgress() && MyReplicationSlot->data.synced)
     795           2 :         ereport(ERROR,
     796             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     797             :                 errmsg("cannot drop replication slot \"%s\"", name),
     798             :                 errdetail("This replication slot is being synchronized from the primary server."));
     799             : 
     800         698 :     ReplicationSlotDropAcquired();
     801         698 : }
     802             : 
     803             : /*
     804             :  * Change the definition of the slot identified by the specified name.
     805             :  */
     806             : void
     807          12 : ReplicationSlotAlter(const char *name, const bool *failover,
     808             :                      const bool *two_phase)
     809             : {
     810          12 :     bool        update_slot = false;
     811             : 
     812             :     Assert(MyReplicationSlot == NULL);
     813             :     Assert(failover || two_phase);
     814             : 
     815          12 :     ReplicationSlotAcquire(name, false);
     816             : 
     817          12 :     if (SlotIsPhysical(MyReplicationSlot))
     818           0 :         ereport(ERROR,
     819             :                 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     820             :                 errmsg("cannot use %s with a physical replication slot",
     821             :                        "ALTER_REPLICATION_SLOT"));
     822             : 
     823          12 :     if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
     824           2 :         ereport(ERROR,
     825             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     826             :                 errmsg("cannot alter invalid replication slot \"%s\"", name),
     827             :                 errdetail("This replication slot has been invalidated due to \"%s\".",
     828             :                           SlotInvalidationCauses[MyReplicationSlot->data.invalidated]));
     829             : 
     830          10 :     if (RecoveryInProgress())
     831             :     {
     832             :         /*
     833             :          * Do not allow users to alter the slots which are currently being
     834             :          * synced from the primary to the standby.
     835             :          */
     836           2 :         if (MyReplicationSlot->data.synced)
     837           2 :             ereport(ERROR,
     838             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     839             :                     errmsg("cannot alter replication slot \"%s\"", name),
     840             :                     errdetail("This replication slot is being synchronized from the primary server."));
     841             : 
     842             :         /*
     843             :          * Do not allow users to enable failover on the standby as we do not
     844             :          * support sync to the cascading standby.
     845             :          */
     846           0 :         if (failover && *failover)
     847           0 :             ereport(ERROR,
     848             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     849             :                     errmsg("cannot enable failover for a replication slot"
     850             :                            " on the standby"));
     851             :     }
     852             : 
     853           8 :     if (failover)
     854             :     {
     855             :         /*
     856             :          * Do not allow users to enable failover for temporary slots as we do
     857             :          * not support syncing temporary slots to the standby.
     858             :          */
     859           6 :         if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
     860           0 :             ereport(ERROR,
     861             :                     errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     862             :                     errmsg("cannot enable failover for a temporary replication slot"));
     863             : 
     864           6 :         if (MyReplicationSlot->data.failover != *failover)
     865             :         {
     866           6 :             SpinLockAcquire(&MyReplicationSlot->mutex);
     867           6 :             MyReplicationSlot->data.failover = *failover;
     868           6 :             SpinLockRelease(&MyReplicationSlot->mutex);
     869             : 
     870           6 :             update_slot = true;
     871             :         }
     872             :     }
     873             : 
     874           8 :     if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
     875             :     {
     876           2 :         SpinLockAcquire(&MyReplicationSlot->mutex);
     877           2 :         MyReplicationSlot->data.two_phase = *two_phase;
     878           2 :         SpinLockRelease(&MyReplicationSlot->mutex);
     879             : 
     880           2 :         update_slot = true;
     881             :     }
     882             : 
     883           8 :     if (update_slot)
     884             :     {
     885           8 :         ReplicationSlotMarkDirty();
     886           8 :         ReplicationSlotSave();
     887             :     }
     888             : 
     889           8 :     ReplicationSlotRelease();
     890           8 : }
     891             : 
     892             : /*
     893             :  * Permanently drop the currently acquired replication slot.
     894             :  */
     895             : void
     896         722 : ReplicationSlotDropAcquired(void)
     897             : {
     898         722 :     ReplicationSlot *slot = MyReplicationSlot;
     899             : 
     900             :     Assert(MyReplicationSlot != NULL);
     901             : 
     902             :     /* slot isn't acquired anymore */
     903         722 :     MyReplicationSlot = NULL;
     904             : 
     905         722 :     ReplicationSlotDropPtr(slot);
     906         722 : }
     907             : 
     908             : /*
     909             :  * Permanently drop the replication slot which will be released by the point
     910             :  * this function returns.
     911             :  */
     912             : static void
     913         996 : ReplicationSlotDropPtr(ReplicationSlot *slot)
     914             : {
     915             :     char        path[MAXPGPATH];
     916             :     char        tmppath[MAXPGPATH];
     917             : 
     918             :     /*
     919             :      * If some other backend ran this code concurrently with us, we might try
     920             :      * to delete a slot with a certain name while someone else was trying to
     921             :      * create a slot with the same name.
     922             :      */
     923         996 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     924             : 
     925             :     /* Generate pathnames. */
     926         996 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
     927         996 :     sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
     928             : 
     929             :     /*
     930             :      * Rename the slot directory on disk, so that we'll no longer recognize
     931             :      * this as a valid slot.  Note that if this fails, we've got to mark the
     932             :      * slot inactive before bailing out.  If we're dropping an ephemeral or a
     933             :      * temporary slot, we better never fail hard as the caller won't expect
     934             :      * the slot to survive and this might get called during error handling.
     935             :      */
     936         996 :     if (rename(path, tmppath) == 0)
     937             :     {
     938             :         /*
     939             :          * We need to fsync() the directory we just renamed and its parent to
     940             :          * make sure that our changes are on disk in a crash-safe fashion.  If
     941             :          * fsync() fails, we can't be sure whether the changes are on disk or
     942             :          * not.  For now, we handle that by panicking;
     943             :          * StartupReplicationSlots() will try to straighten it out after
     944             :          * restart.
     945             :          */
     946         996 :         START_CRIT_SECTION();
     947         996 :         fsync_fname(tmppath, true);
     948         996 :         fsync_fname(PG_REPLSLOT_DIR, true);
     949         996 :         END_CRIT_SECTION();
     950             :     }
     951             :     else
     952             :     {
     953           0 :         bool        fail_softly = slot->data.persistency != RS_PERSISTENT;
     954             : 
     955           0 :         SpinLockAcquire(&slot->mutex);
     956           0 :         slot->active_pid = 0;
     957           0 :         SpinLockRelease(&slot->mutex);
     958             : 
     959             :         /* wake up anyone waiting on this slot */
     960           0 :         ConditionVariableBroadcast(&slot->active_cv);
     961             : 
     962           0 :         ereport(fail_softly ? WARNING : ERROR,
     963             :                 (errcode_for_file_access(),
     964             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
     965             :                         path, tmppath)));
     966             :     }
     967             : 
     968             :     /*
     969             :      * The slot is definitely gone.  Lock out concurrent scans of the array
     970             :      * long enough to kill it.  It's OK to clear the active PID here without
     971             :      * grabbing the mutex because nobody else can be scanning the array here,
     972             :      * and nobody can be attached to this slot and thus access it without
     973             :      * scanning the array.
     974             :      *
     975             :      * Also wake up processes waiting for it.
     976             :      */
     977         996 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     978         996 :     slot->active_pid = 0;
     979         996 :     slot->in_use = false;
     980         996 :     LWLockRelease(ReplicationSlotControlLock);
     981         996 :     ConditionVariableBroadcast(&slot->active_cv);
     982             : 
     983             :     /*
     984             :      * Slot is dead and doesn't prevent resource removal anymore, recompute
     985             :      * limits.
     986             :      */
     987         996 :     ReplicationSlotsComputeRequiredXmin(false);
     988         996 :     ReplicationSlotsComputeRequiredLSN();
     989             : 
     990             :     /*
     991             :      * If removing the directory fails, the worst thing that will happen is
     992             :      * that the user won't be able to create a new slot with the same name
     993             :      * until the next server restart.  We warn about it, but that's all.
     994             :      */
     995         996 :     if (!rmtree(tmppath, true))
     996           0 :         ereport(WARNING,
     997             :                 (errmsg("could not remove directory \"%s\"", tmppath)));
     998             : 
     999             :     /*
    1000             :      * Drop the statistics entry for the replication slot.  Do this while
    1001             :      * holding ReplicationSlotAllocationLock so that we don't drop a
    1002             :      * statistics entry for another slot with the same name just created in
    1003             :      * another session.
    1004             :      */
    1005         996 :     if (SlotIsLogical(slot))
    1006         706 :         pgstat_drop_replslot(slot);
    1007             : 
    1008             :     /*
    1009             :      * We release this at the very end, so that nobody starts trying to create
    1010             :      * a slot while we're still cleaning up the detritus of the old one.
    1011             :      */
    1012         996 :     LWLockRelease(ReplicationSlotAllocationLock);
    1013         996 : }
    1014             : 
    1015             : /*
    1016             :  * Serialize the currently acquired slot's state from memory to disk, thereby
    1017             :  * guaranteeing the current state will survive a crash.
    1018             :  */
    1019             : void
    1020        2310 : ReplicationSlotSave(void)
    1021             : {
    1022             :     char        path[MAXPGPATH];
    1023             : 
    1024             :     Assert(MyReplicationSlot != NULL);
    1025             : 
    1026        2310 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
    1027        2310 :     SaveSlotToPath(MyReplicationSlot, path, ERROR);
    1028        2310 : }
    1029             : 
    1030             : /*
    1031             :  * Signal that it would be useful if the currently acquired slot would be
    1032             :  * flushed out to disk.
    1033             :  *
    1034             :  * Note that the actual flush to disk can be delayed for a long time; if
    1035             :  * required for correctness, explicitly do a ReplicationSlotSave().
    1036             :  */
    1037             : void
    1038       12824 : ReplicationSlotMarkDirty(void)
    1039             : {
    1040       12824 :     ReplicationSlot *slot = MyReplicationSlot;
    1041             : 
    1042             :     Assert(MyReplicationSlot != NULL);
    1043             : 
    1044       12824 :     SpinLockAcquire(&slot->mutex);
    1045       12824 :     MyReplicationSlot->just_dirtied = true;
    1046       12824 :     MyReplicationSlot->dirty = true;
    1047       12824 :     SpinLockRelease(&slot->mutex);
    1048       12824 : }
    1049             : 
    1050             : /*
    1051             :  * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
    1052             :  * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
    1053             :  */
    1054             : void
    1055         802 : ReplicationSlotPersist(void)
    1056             : {
    1057         802 :     ReplicationSlot *slot = MyReplicationSlot;
    1058             : 
    1059             :     Assert(slot != NULL);
    1060             :     Assert(slot->data.persistency != RS_PERSISTENT);
    1061             : 
    1062         802 :     SpinLockAcquire(&slot->mutex);
    1063         802 :     slot->data.persistency = RS_PERSISTENT;
    1064         802 :     SpinLockRelease(&slot->mutex);
    1065             : 
    1066         802 :     ReplicationSlotMarkDirty();
    1067         802 :     ReplicationSlotSave();
    1068         802 : }
    1069             : 
    1070             : /*
    1071             :  * Compute the oldest xmin across all slots and store it in the ProcArray.
    1072             :  *
    1073             :  * If already_locked is true, ProcArrayLock has already been acquired
    1074             :  * exclusively.
    1075             :  */
    1076             : void
    1077        4072 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
    1078             : {
    1079             :     int         i;
    1080        4072 :     TransactionId agg_xmin = InvalidTransactionId;
    1081        4072 :     TransactionId agg_catalog_xmin = InvalidTransactionId;
    1082             : 
    1083             :     Assert(ReplicationSlotCtl != NULL);
    1084             : 
    1085        4072 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1086             : 
    1087       40922 :     for (i = 0; i < max_replication_slots; i++)
    1088             :     {
    1089       36850 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1090             :         TransactionId effective_xmin;
    1091             :         TransactionId effective_catalog_xmin;
    1092             :         bool        invalidated;
    1093             : 
    1094       36850 :         if (!s->in_use)
    1095       33198 :             continue;
    1096             : 
    1097        3652 :         SpinLockAcquire(&s->mutex);
    1098        3652 :         effective_xmin = s->effective_xmin;
    1099        3652 :         effective_catalog_xmin = s->effective_catalog_xmin;
    1100        3652 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1101        3652 :         SpinLockRelease(&s->mutex);
    1102             : 
    1103             :         /* invalidated slots need not apply */
    1104        3652 :         if (invalidated)
    1105          44 :             continue;
    1106             : 
    1107             :         /* check the data xmin */
    1108        3608 :         if (TransactionIdIsValid(effective_xmin) &&
    1109           6 :             (!TransactionIdIsValid(agg_xmin) ||
    1110           6 :              TransactionIdPrecedes(effective_xmin, agg_xmin)))
    1111         518 :             agg_xmin = effective_xmin;
    1112             : 
    1113             :         /* check the catalog xmin */
    1114        3608 :         if (TransactionIdIsValid(effective_catalog_xmin) &&
    1115        1472 :             (!TransactionIdIsValid(agg_catalog_xmin) ||
    1116        1472 :              TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
    1117        1998 :             agg_catalog_xmin = effective_catalog_xmin;
    1118             :     }
    1119             : 
    1120        4072 :     LWLockRelease(ReplicationSlotControlLock);
    1121             : 
    1122        4072 :     ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
    1123        4072 : }
    1124             : 
    1125             : /*
    1126             :  * Compute the oldest restart LSN across all slots and inform xlog module.
    1127             :  *
    1128             :  * Note: while max_slot_wal_keep_size is theoretically relevant for this
    1129             :  * purpose, we don't try to account for that, because this module doesn't
    1130             :  * know what to compare against.
    1131             :  */
    1132             : void
    1133       13670 : ReplicationSlotsComputeRequiredLSN(void)
    1134             : {
    1135             :     int         i;
    1136       13670 :     XLogRecPtr  min_required = InvalidXLogRecPtr;
    1137             : 
    1138             :     Assert(ReplicationSlotCtl != NULL);
    1139             : 
    1140       13670 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1141      144066 :     for (i = 0; i < max_replication_slots; i++)
    1142             :     {
    1143      130396 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1144             :         XLogRecPtr  restart_lsn;
    1145             :         bool        invalidated;
    1146             : 
    1147      130396 :         if (!s->in_use)
    1148      117242 :             continue;
    1149             : 
    1150       13154 :         SpinLockAcquire(&s->mutex);
    1151       13154 :         restart_lsn = s->data.restart_lsn;
    1152       13154 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1153       13154 :         SpinLockRelease(&s->mutex);
    1154             : 
    1155             :         /* invalidated slots need not apply */
    1156       13154 :         if (invalidated)
    1157          46 :             continue;
    1158             : 
    1159       13108 :         if (restart_lsn != InvalidXLogRecPtr &&
    1160        1426 :             (min_required == InvalidXLogRecPtr ||
    1161             :              restart_lsn < min_required))
    1162       11744 :             min_required = restart_lsn;
    1163             :     }
    1164       13670 :     LWLockRelease(ReplicationSlotControlLock);
    1165             : 
    1166       13670 :     XLogSetReplicationSlotMinimumLSN(min_required);
    1167       13670 : }
    1168             : 
    1169             : /*
    1170             :  * Compute the oldest WAL LSN required by *logical* decoding slots.
    1171             :  *
    1172             :  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
    1173             :  * slots exist.
    1174             :  *
    1175             :  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
    1176             :  * ignores physical replication slots.
    1177             :  *
    1178             :  * The results aren't required frequently, so we don't maintain a precomputed
    1179             :  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
    1180             :  */
    1181             : XLogRecPtr
    1182        4952 : ReplicationSlotsComputeLogicalRestartLSN(void)
    1183             : {
    1184        4952 :     XLogRecPtr  result = InvalidXLogRecPtr;
    1185             :     int         i;
    1186             : 
    1187        4952 :     if (max_replication_slots <= 0)
    1188           4 :         return InvalidXLogRecPtr;
    1189             : 
    1190        4948 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1191             : 
    1192       53392 :     for (i = 0; i < max_replication_slots; i++)
    1193             :     {
    1194             :         ReplicationSlot *s;
    1195             :         XLogRecPtr  restart_lsn;
    1196             :         bool        invalidated;
    1197             : 
    1198       48444 :         s = &ReplicationSlotCtl->replication_slots[i];
    1199             : 
    1200             :         /* cannot change while ReplicationSlotControlLock is held */
    1201       48444 :         if (!s->in_use)
    1202       47148 :             continue;
    1203             : 
    1204             :         /* we're only interested in logical slots */
    1205        1296 :         if (!SlotIsLogical(s))
    1206         948 :             continue;
    1207             : 
    1208             :         /* read once, it's ok if it increases while we're checking */
    1209         348 :         SpinLockAcquire(&s->mutex);
    1210         348 :         restart_lsn = s->data.restart_lsn;
    1211         348 :         invalidated = s->data.invalidated != RS_INVAL_NONE;
    1212         348 :         SpinLockRelease(&s->mutex);
    1213             : 
    1214             :         /* invalidated slots need not apply */
    1215         348 :         if (invalidated)
    1216           8 :             continue;
    1217             : 
    1218         340 :         if (restart_lsn == InvalidXLogRecPtr)
    1219           0 :             continue;
    1220             : 
    1221         340 :         if (result == InvalidXLogRecPtr ||
    1222             :             restart_lsn < result)
    1223         280 :             result = restart_lsn;
    1224             :     }
    1225             : 
    1226        4948 :     LWLockRelease(ReplicationSlotControlLock);
    1227             : 
    1228        4948 :     return result;
    1229             : }
    1230             : 
    1231             : /*
    1232             :  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
    1233             :  * passed database oid.
    1234             :  *
    1235             :  * Returns true if there are any slots referencing the database. *nslots will
    1236             :  * be set to the absolute number of slots in the database, *nactive to ones
    1237             :  * currently active.
    1238             :  */
    1239             : bool
    1240          72 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
    1241             : {
    1242             :     int         i;
    1243             : 
    1244          72 :     *nslots = *nactive = 0;
    1245             : 
    1246          72 :     if (max_replication_slots <= 0)
    1247           0 :         return false;
    1248             : 
    1249          72 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1250         734 :     for (i = 0; i < max_replication_slots; i++)
    1251             :     {
    1252             :         ReplicationSlot *s;
    1253             : 
    1254         662 :         s = &ReplicationSlotCtl->replication_slots[i];
    1255             : 
    1256             :         /* cannot change while ReplicationSlotControlLock is held */
    1257         662 :         if (!s->in_use)
    1258         626 :             continue;
    1259             : 
    1260             :         /* only logical slots are database specific, skip */
    1261          36 :         if (!SlotIsLogical(s))
    1262          20 :             continue;
    1263             : 
    1264             :         /* not our database, skip */
    1265          16 :         if (s->data.database != dboid)
    1266          10 :             continue;
    1267             : 
    1268             :         /* NB: intentionally counting invalidated slots */
    1269             : 
    1270             :         /* count slots with spinlock held */
    1271           6 :         SpinLockAcquire(&s->mutex);
    1272           6 :         (*nslots)++;
    1273           6 :         if (s->active_pid != 0)
    1274           2 :             (*nactive)++;
    1275           6 :         SpinLockRelease(&s->mutex);
    1276             :     }
    1277          72 :     LWLockRelease(ReplicationSlotControlLock);
    1278             : 
    1279          72 :     if (*nslots > 0)
    1280           6 :         return true;
    1281          66 :     return false;
    1282             : }
    1283             : 
    1284             : /*
    1285             :  * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
    1286             :  * passed database oid. The caller should hold an exclusive lock on the
    1287             :  * pg_database oid for the database to prevent creation of new slots on the db
    1288             :  * or replay from existing slots.
    1289             :  *
    1290             :  * Another session that concurrently acquires an existing slot on the target DB
    1291             :  * (most likely to drop it) may cause this function to ERROR. If that happens
    1292             :  * it may have dropped some but not all slots.
    1293             :  *
    1294             :  * This routine isn't as efficient as it could be - but we don't drop
    1295             :  * databases often, especially databases with lots of slots.
    1296             :  */
    1297             : void
    1298          96 : ReplicationSlotsDropDBSlots(Oid dboid)
    1299             : {
    1300             :     int         i;
    1301             : 
    1302          96 :     if (max_replication_slots <= 0)
    1303           0 :         return;
    1304             : 
    1305          96 : restart:
    1306         106 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1307         992 :     for (i = 0; i < max_replication_slots; i++)
    1308             :     {
    1309             :         ReplicationSlot *s;
    1310             :         char       *slotname;
    1311             :         int         active_pid;
    1312             : 
    1313         896 :         s = &ReplicationSlotCtl->replication_slots[i];
    1314             : 
    1315             :         /* cannot change while ReplicationSlotControlLock is held */
    1316         896 :         if (!s->in_use)
    1317         842 :             continue;
    1318             : 
    1319             :         /* only logical slots are database specific, skip */
    1320          54 :         if (!SlotIsLogical(s))
    1321          22 :             continue;
    1322             : 
    1323             :         /* not our database, skip */
    1324          32 :         if (s->data.database != dboid)
    1325          22 :             continue;
    1326             : 
    1327             :         /* NB: intentionally including invalidated slots */
    1328             : 
    1329             :         /* acquire slot, so ReplicationSlotDropAcquired can be reused  */
    1330          10 :         SpinLockAcquire(&s->mutex);
    1331             :         /* can't change while ReplicationSlotControlLock is held */
    1332          10 :         slotname = NameStr(s->data.name);
    1333          10 :         active_pid = s->active_pid;
    1334          10 :         if (active_pid == 0)
    1335             :         {
    1336          10 :             MyReplicationSlot = s;
    1337          10 :             s->active_pid = MyProcPid;
    1338             :         }
    1339          10 :         SpinLockRelease(&s->mutex);
    1340             : 
    1341             :         /*
    1342             :          * Even though we hold an exclusive lock on the database object a
    1343             :          * logical slot for that DB can still be active, e.g. if it's
    1344             :          * concurrently being dropped by a backend connected to another DB.
    1345             :          *
    1346             :          * That's fairly unlikely in practice, so we'll just bail out.
    1347             :          *
    1348             :          * The slot sync worker holds a shared lock on the database before
    1349             :          * operating on synced logical slots to avoid conflict with the drop
    1350             :          * happening here. The persistent synced slots are thus safe but there
    1351             :          * is a possibility that the slot sync worker has created a temporary
    1352             :          * slot (which stays active even on release) and we are trying to drop
    1353             :          * that here. In practice, the chances of hitting this scenario are
    1354             :          * low because, during slot synchronization, the temporary slot is
    1355             :          * immediately converted to persistent and thus is safe due to the
    1356             :          * shared lock taken on the database. So, we'll just bail out in such
    1357             :          * a case.
    1358             :          *
    1359             :          * XXX: We can consider shutting down the slot sync worker before
    1360             :          * trying to drop synced temporary slots here.
    1361             :          */
    1362          10 :         if (active_pid)
    1363           0 :             ereport(ERROR,
    1364             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1365             :                      errmsg("replication slot \"%s\" is active for PID %d",
    1366             :                             slotname, active_pid)));
    1367             : 
    1368             :         /*
    1369             :          * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
    1370             :          * holding ReplicationSlotControlLock over filesystem operations,
    1371             :          * release ReplicationSlotControlLock and use
    1372             :          * ReplicationSlotDropAcquired.
    1373             :          *
    1374             :          * As that means the set of slots could change, restart scan from the
    1375             :          * beginning each time we release the lock.
    1376             :          */
    1377          10 :         LWLockRelease(ReplicationSlotControlLock);
    1378          10 :         ReplicationSlotDropAcquired();
    1379          10 :         goto restart;
    1380             :     }
    1381          96 :     LWLockRelease(ReplicationSlotControlLock);
    1382             : }
    1383             : 
    1384             : 
    1385             : /*
    1386             :  * Check whether the server's configuration supports using replication
    1387             :  * slots.
    1388             :  */
    1389             : void
    1390        3082 : CheckSlotRequirements(void)
    1391             : {
    1392             :     /*
    1393             :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
    1394             :      * needs the same check.
    1395             :      */
    1396             : 
    1397        3082 :     if (max_replication_slots == 0)
    1398           0 :         ereport(ERROR,
    1399             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1400             :                  errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
    1401             : 
    1402        3082 :     if (wal_level < WAL_LEVEL_REPLICA)
    1403           0 :         ereport(ERROR,
    1404             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1405             :                  errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
    1406        3082 : }
    1407             : 
    1408             : /*
    1409             :  * Check whether the user has privilege to use replication slots.
    1410             :  */
    1411             : void
    1412        1032 : CheckSlotPermissions(void)
    1413             : {
    1414        1032 :     if (!has_rolreplication(GetUserId()))
    1415          10 :         ereport(ERROR,
    1416             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
    1417             :                  errmsg("permission denied to use replication slots"),
    1418             :                  errdetail("Only roles with the %s attribute may use replication slots.",
    1419             :                            "REPLICATION")));
    1420        1022 : }
    1421             : 
    1422             : /*
    1423             :  * Reserve WAL for the currently active slot.
    1424             :  *
    1425             :  * Compute and set restart_lsn in a manner that's appropriate for the type of
    1426             :  * the slot and concurrency safe.
    1427             :  */
    1428             : void
    1429        1088 : ReplicationSlotReserveWal(void)
    1430             : {
    1431        1088 :     ReplicationSlot *slot = MyReplicationSlot;
    1432             : 
    1433             :     Assert(slot != NULL);
    1434             :     Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
    1435             : 
    1436             :     /*
    1437             :      * The replication slot mechanism is used to prevent removal of required
    1438             :      * WAL. As there is no interlock between this routine and checkpoints, WAL
    1439             :      * segments could concurrently be removed when a now stale return value of
    1440             :      * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
    1441             :      * this happens we'll just retry.
    1442             :      */
    1443             :     while (true)
    1444           0 :     {
    1445             :         XLogSegNo   segno;
    1446             :         XLogRecPtr  restart_lsn;
    1447             : 
    1448             :         /*
    1449             :          * For logical slots log a standby snapshot and start logical decoding
    1450             :          * at exactly that position. That allows the slot to start up more
    1451             :          * quickly. But on a standby we cannot do WAL writes, so just use the
    1452             :          * replay pointer; effectively, an attempt to create a logical slot on
    1453             :          * standby will cause it to wait for an xl_running_xact record to be
    1454             :          * logged independently on the primary, so that a snapshot can be
    1455             :          * built using the record.
    1456             :          *
    1457             :          * None of this is needed (or indeed helpful) for physical slots as
    1458             :          * they'll start replay at the last logged checkpoint anyway. Instead
    1459             :          * return the location of the last redo LSN. While that slightly
    1460             :          * increases the chance that we have to retry, it's where a base
    1461             :          * backup has to start replay at.
    1462             :          */
    1463        1088 :         if (SlotIsPhysical(slot))
    1464         286 :             restart_lsn = GetRedoRecPtr();
    1465         802 :         else if (RecoveryInProgress())
    1466          44 :             restart_lsn = GetXLogReplayRecPtr(NULL);
    1467             :         else
    1468         758 :             restart_lsn = GetXLogInsertRecPtr();
    1469             : 
    1470        1088 :         SpinLockAcquire(&slot->mutex);
    1471        1088 :         slot->data.restart_lsn = restart_lsn;
    1472        1088 :         SpinLockRelease(&slot->mutex);
    1473             : 
    1474             :         /* prevent WAL removal as fast as possible */
    1475        1088 :         ReplicationSlotsComputeRequiredLSN();
    1476             : 
    1477             :         /*
    1478             :          * If all required WAL is still there, great, otherwise retry. The
    1479             :          * slot should prevent further removal of WAL, unless there's a
    1480             :          * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
    1481             :          * the new restart_lsn above, so normally we should never need to loop
    1482             :          * more than twice.
    1483             :          */
    1484        1088 :         XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
    1485        1088 :         if (XLogGetLastRemovedSegno() < segno)
    1486        1088 :             break;
    1487             :     }
    1488             : 
    1489        1088 :     if (!RecoveryInProgress() && SlotIsLogical(slot))
    1490             :     {
    1491             :         XLogRecPtr  flushptr;
    1492             : 
    1493             :         /* make sure we have enough information to start */
    1494         758 :         flushptr = LogStandbySnapshot();
    1495             : 
    1496             :         /* and make sure it's fsynced to disk */
    1497         758 :         XLogFlush(flushptr);
    1498             :     }
    1499        1088 : }
    1500             : 
    1501             : /*
    1502             :  * Report that replication slot needs to be invalidated
    1503             :  */
    1504             : static void
    1505          42 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
    1506             :                        bool terminating,
    1507             :                        int pid,
    1508             :                        NameData slotname,
    1509             :                        XLogRecPtr restart_lsn,
    1510             :                        XLogRecPtr oldestLSN,
    1511             :                        TransactionId snapshotConflictHorizon)
    1512             : {
    1513             :     StringInfoData err_detail;
    1514          42 :     bool        hint = false;
    1515             : 
    1516          42 :     initStringInfo(&err_detail);
    1517             : 
    1518          42 :     switch (cause)
    1519             :     {
    1520          12 :         case RS_INVAL_WAL_REMOVED:
    1521             :             {
    1522          12 :                 unsigned long long ex = oldestLSN - restart_lsn;
    1523             : 
    1524          12 :                 hint = true;
    1525          12 :                 appendStringInfo(&err_detail,
    1526          12 :                                  ngettext("The slot's restart_lsn %X/%X exceeds the limit by %llu byte.",
    1527             :                                           "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
    1528             :                                           ex),
    1529          12 :                                  LSN_FORMAT_ARGS(restart_lsn),
    1530             :                                  ex);
    1531          12 :                 break;
    1532             :             }
    1533          24 :         case RS_INVAL_HORIZON:
    1534          24 :             appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
    1535             :                              snapshotConflictHorizon);
    1536          24 :             break;
    1537             : 
    1538           6 :         case RS_INVAL_WAL_LEVEL:
    1539           6 :             appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
    1540           6 :             break;
    1541             :         case RS_INVAL_NONE:
    1542             :             pg_unreachable();
    1543             :     }
    1544             : 
    1545          42 :     ereport(LOG,
    1546             :             terminating ?
    1547             :             errmsg("terminating process %d to release replication slot \"%s\"",
    1548             :                    pid, NameStr(slotname)) :
    1549             :             errmsg("invalidating obsolete replication slot \"%s\"",
    1550             :                    NameStr(slotname)),
    1551             :             errdetail_internal("%s", err_detail.data),
    1552             :             hint ? errhint("You might need to increase \"%s\".", "max_slot_wal_keep_size") : 0);
    1553             : 
    1554          42 :     pfree(err_detail.data);
    1555          42 : }
    1556             : 
    1557             : /*
    1558             :  * Helper for InvalidateObsoleteReplicationSlots
    1559             :  *
    1560             :  * Acquires the given slot and marks it invalid, if necessary and possible.
    1561             :  *
    1562             :  * Returns whether ReplicationSlotControlLock was released in the interim (and
    1563             :  * in that case we're not holding the lock at return, otherwise we are).
    1564             :  *
    1565             :  * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
    1566             :  *
    1567             :  * This is inherently racy, because we release the LWLock
    1568             :  * for syscalls, so caller must restart if we return true.
    1569             :  */
    1570             : static bool
    1571         750 : InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
    1572             :                                ReplicationSlot *s,
    1573             :                                XLogRecPtr oldestLSN,
    1574             :                                Oid dboid, TransactionId snapshotConflictHorizon,
    1575             :                                bool *invalidated)
    1576             : {
    1577         750 :     int         last_signaled_pid = 0;
    1578         750 :     bool        released_lock = false;
    1579         750 :     bool        terminated = false;
    1580         750 :     TransactionId initial_effective_xmin = InvalidTransactionId;
    1581         750 :     TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
    1582         750 :     XLogRecPtr  initial_restart_lsn = InvalidXLogRecPtr;
    1583         750 :     ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
    1584             : 
    1585             :     for (;;)
    1586          14 :     {
    1587             :         XLogRecPtr  restart_lsn;
    1588             :         NameData    slotname;
    1589         764 :         int         active_pid = 0;
    1590         764 :         ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
    1591             : 
    1592             :         Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
    1593             : 
    1594         764 :         if (!s->in_use)
    1595             :         {
    1596           0 :             if (released_lock)
    1597           0 :                 LWLockRelease(ReplicationSlotControlLock);
    1598           0 :             break;
    1599             :         }
    1600             : 
    1601             :         /*
    1602             :          * Check if the slot needs to be invalidated. If it needs to be
    1603             :          * invalidated, and is not currently acquired, acquire it and mark it
    1604             :          * as having been invalidated.  We do this with the spinlock held to
    1605             :          * avoid race conditions -- for example the restart_lsn could move
    1606             :          * forward, or the slot could be dropped.
    1607             :          */
    1608         764 :         SpinLockAcquire(&s->mutex);
    1609             : 
    1610         764 :         restart_lsn = s->data.restart_lsn;
    1611             : 
    1612             :         /* we do nothing if the slot is already invalid */
    1613         764 :         if (s->data.invalidated == RS_INVAL_NONE)
    1614             :         {
    1615             :             /*
    1616             :              * The slot's mutex will be released soon, and it is possible that
    1617             :              * those values change since the process holding the slot has been
    1618             :              * terminated (if any), so record them here to ensure that we
    1619             :              * would report the correct invalidation cause.
    1620             :              */
    1621         686 :             if (!terminated)
    1622             :             {
    1623         672 :                 initial_restart_lsn = s->data.restart_lsn;
    1624         672 :                 initial_effective_xmin = s->effective_xmin;
    1625         672 :                 initial_catalog_effective_xmin = s->effective_catalog_xmin;
    1626             :             }
    1627             : 
    1628         686 :             switch (cause)
    1629             :             {
    1630         636 :                 case RS_INVAL_WAL_REMOVED:
    1631         636 :                     if (initial_restart_lsn != InvalidXLogRecPtr &&
    1632             :                         initial_restart_lsn < oldestLSN)
    1633          12 :                         invalidation_cause = cause;
    1634         636 :                     break;
    1635          44 :                 case RS_INVAL_HORIZON:
    1636          44 :                     if (!SlotIsLogical(s))
    1637           0 :                         break;
    1638             :                     /* invalid DB oid signals a shared relation */
    1639          44 :                     if (dboid != InvalidOid && dboid != s->data.database)
    1640           0 :                         break;
    1641          44 :                     if (TransactionIdIsValid(initial_effective_xmin) &&
    1642           0 :                         TransactionIdPrecedesOrEquals(initial_effective_xmin,
    1643             :                                                       snapshotConflictHorizon))
    1644           0 :                         invalidation_cause = cause;
    1645          88 :                     else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
    1646          44 :                              TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
    1647             :                                                            snapshotConflictHorizon))
    1648          24 :                         invalidation_cause = cause;
    1649          44 :                     break;
    1650           6 :                 case RS_INVAL_WAL_LEVEL:
    1651           6 :                     if (SlotIsLogical(s))
    1652           6 :                         invalidation_cause = cause;
    1653           6 :                     break;
    1654             :                 case RS_INVAL_NONE:
    1655             :                     pg_unreachable();
    1656             :             }
    1657          78 :         }
    1658             : 
    1659             :         /*
    1660             :          * The invalidation cause recorded previously should not change while
    1661             :          * the process owning the slot (if any) has been terminated.
    1662             :          */
    1663             :         Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
    1664             :                  invalidation_cause_prev != invalidation_cause));
    1665             : 
    1666             :         /* if there's no invalidation, we're done */
    1667         764 :         if (invalidation_cause == RS_INVAL_NONE)
    1668             :         {
    1669         722 :             SpinLockRelease(&s->mutex);
    1670         722 :             if (released_lock)
    1671           0 :                 LWLockRelease(ReplicationSlotControlLock);
    1672         722 :             break;
    1673             :         }
    1674             : 
    1675          42 :         slotname = s->data.name;
    1676          42 :         active_pid = s->active_pid;
    1677             : 
    1678             :         /*
    1679             :          * If the slot can be acquired, do so and mark it invalidated
    1680             :          * immediately.  Otherwise we'll signal the owning process, below, and
    1681             :          * retry.
    1682             :          */
    1683          42 :         if (active_pid == 0)
    1684             :         {
    1685          28 :             MyReplicationSlot = s;
    1686          28 :             s->active_pid = MyProcPid;
    1687          28 :             s->data.invalidated = invalidation_cause;
    1688             : 
    1689             :             /*
    1690             :              * XXX: We should consider not overwriting restart_lsn and instead
    1691             :              * just rely on .invalidated.
    1692             :              */
    1693          28 :             if (invalidation_cause == RS_INVAL_WAL_REMOVED)
    1694           8 :                 s->data.restart_lsn = InvalidXLogRecPtr;
    1695             : 
    1696             :             /* Let caller know */
    1697          28 :             *invalidated = true;
    1698             :         }
    1699             : 
    1700          42 :         SpinLockRelease(&s->mutex);
    1701             : 
    1702             :         /*
    1703             :          * The logical replication slots shouldn't be invalidated as GUC
    1704             :          * max_slot_wal_keep_size is set to -1 during the binary upgrade. See
    1705             :          * check_old_cluster_for_valid_slots() where we ensure that no
    1706             :          * slots were invalidated before the upgrade.
    1707             :          */
    1708             :         Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));
    1709             : 
    1710          42 :         if (active_pid != 0)
    1711             :         {
    1712             :             /*
    1713             :              * Prepare the sleep on the slot's condition variable before
    1714             :              * releasing the lock, to close a possible race condition if the
    1715             :              * slot is released before the sleep below.
    1716             :              */
    1717          14 :             ConditionVariablePrepareToSleep(&s->active_cv);
    1718             : 
    1719          14 :             LWLockRelease(ReplicationSlotControlLock);
    1720          14 :             released_lock = true;
    1721             : 
    1722             :             /*
    1723             :              * Signal to terminate the process that owns the slot, if we
    1724             :              * haven't already signalled it.  (Avoidance of repeated
    1725             :              * signalling is the only reason for there to be a loop in this
    1726             :              * routine; otherwise we could rely on caller's restart loop.)
    1727             :              *
    1728             :              * There is a race condition in which another process may acquire
    1729             :              * the slot after its current owner process is terminated and
    1730             :              * before this process acquires it. To handle that, we signal only
    1731             :              * if the PID of the owning process has changed from the previous
    1732             :              * time. (This logic assumes the same PID is not reused very quickly.)
    1733             :              */
    1734          14 :             if (last_signaled_pid != active_pid)
    1735             :             {
    1736          14 :                 ReportSlotInvalidation(invalidation_cause, true, active_pid,
    1737             :                                        slotname, restart_lsn,
    1738             :                                        oldestLSN, snapshotConflictHorizon);
    1739             : 
    1740          14 :                 if (MyBackendType == B_STARTUP)
    1741          10 :                     (void) SendProcSignal(active_pid,
    1742             :                                           PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
    1743             :                                           INVALID_PROC_NUMBER);
    1744             :                 else
    1745           4 :                     (void) kill(active_pid, SIGTERM);
    1746             : 
    1747          14 :                 last_signaled_pid = active_pid;
    1748          14 :                 terminated = true;
    1749          14 :                 invalidation_cause_prev = invalidation_cause;
    1750             :             }
    1751             : 
    1752             :             /* Wait until the slot is released. */
    1753          14 :             ConditionVariableSleep(&s->active_cv,
    1754             :                                    WAIT_EVENT_REPLICATION_SLOT_DROP);
    1755             : 
    1756             :             /*
    1757             :              * Re-acquire lock and start over; we expect to invalidate the
    1758             :              * slot next time (unless another process acquires the slot in the
    1759             :              * meantime).
    1760             :              */
    1761          14 :             LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1762          14 :             continue;
    1763             :         }
    1764             :         else
    1765             :         {
    1766             :             /*
    1767             :              * We hold the slot now and have already invalidated it; flush it
    1768             :              * to ensure that state persists.
    1769             :              *
    1770             :              * Don't want to hold ReplicationSlotControlLock across file
    1771             :              * system operations, so release it now but be sure to tell caller
    1772             :              * to restart from scratch.
    1773             :              */
    1774          28 :             LWLockRelease(ReplicationSlotControlLock);
    1775          28 :             released_lock = true;
    1776             : 
    1777             :             /* Make sure the invalidated state persists across server restart */
    1778          28 :             ReplicationSlotMarkDirty();
    1779          28 :             ReplicationSlotSave();
    1780          28 :             ReplicationSlotRelease();
    1781             : 
    1782          28 :             ReportSlotInvalidation(invalidation_cause, false, active_pid,
    1783             :                                    slotname, restart_lsn,
    1784             :                                    oldestLSN, snapshotConflictHorizon);
    1785             : 
    1786             :             /* done with this slot for now */
    1787          28 :             break;
    1788             :         }
    1789             :     }
    1790             : 
    1791             :     Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
    1792             : 
    1793         750 :     return released_lock;
    1794             : }
    1795             : 
    1796             : /*
    1797             :  * Invalidate slots that require resources about to be removed.
    1798             :  *
    1799             :  * Returns true if any slot has been invalidated.
    1800             :  *
    1801             :  * Whether a slot needs to be invalidated depends on the cause. A slot is
    1802             :  * invalidated if it:
    1803             :  * - RS_INVAL_WAL_REMOVED: requires an LSN older than the given segment
    1804             :  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
    1805             :  *   db; dboid may be InvalidOid for shared relations
    1806             :  * - RS_INVAL_WAL_LEVEL: is logical
    1807             :  *
    1808             :  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
    1809             :  */
    1810             : bool
    1811        2516 : InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
    1812             :                                    XLogSegNo oldestSegno, Oid dboid,
    1813             :                                    TransactionId snapshotConflictHorizon)
    1814             : {
    1815             :     XLogRecPtr  oldestLSN;
    1816        2516 :     bool        invalidated = false;
    1817             : 
    1818             :     Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
    1819             :     Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
    1820             :     Assert(cause != RS_INVAL_NONE);
    1821             : 
    1822        2516 :     if (max_replication_slots == 0)
    1823           2 :         return invalidated;
    1824             : 
    1825        2514 :     XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
    1826             : 
    1827        2542 : restart:
    1828        2542 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    1829       26986 :     for (int i = 0; i < max_replication_slots; i++)
    1830             :     {
    1831       24472 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1832             : 
    1833       24472 :         if (!s->in_use)
    1834       23722 :             continue;
    1835             : 
    1836         750 :         if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
    1837             :                                            snapshotConflictHorizon,
    1838             :                                            &invalidated))
    1839             :         {
    1840             :             /* if the lock was released, start from scratch */
    1841          28 :             goto restart;
    1842             :         }
    1843             :     }
    1844        2514 :     LWLockRelease(ReplicationSlotControlLock);
    1845             : 
    1846             :     /*
    1847             :      * If any slots have been invalidated, recalculate the resource limits.
    1848             :      */
    1849        2514 :     if (invalidated)
    1850             :     {
    1851          18 :         ReplicationSlotsComputeRequiredXmin(false);
    1852          18 :         ReplicationSlotsComputeRequiredLSN();
    1853             :     }
    1854             : 
    1855        2514 :     return invalidated;
    1856             : }
    1857             : 
    1858             : /*
    1859             :  * Flush all replication slots to disk.
    1860             :  *
    1861             :  * It is convenient to flush dirty replication slots at the time of checkpoint.
    1862             :  * Additionally, in case of a shutdown checkpoint, we also identify the slots
    1863             :  * for which the confirmed_flush LSN has been updated since the last time it
    1864             :  * was saved and flush them.
    1865             :  */
    1866             : void
    1867        2476 : CheckPointReplicationSlots(bool is_shutdown)
    1868             : {
    1869             :     int         i;
    1870             : 
    1871        2476 :     elog(DEBUG1, "performing replication slot checkpoint");
    1872             : 
    1873             :     /*
    1874             :      * Prevent any slot from being created/dropped while we're active. As we
    1875             :      * explicitly do *not* want to block iterating over replication_slots or
    1876             :      * acquiring a slot we cannot take the control lock - but that's OK,
    1877             :      * because holding ReplicationSlotAllocationLock is strictly stronger, and
    1878             :      * enough to guarantee that nobody can change the in_use bits on us.
    1879             :      */
    1880        2476 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
    1881             : 
    1882       26698 :     for (i = 0; i < max_replication_slots; i++)
    1883             :     {
    1884       24222 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1885             :         char        path[MAXPGPATH];
    1886             : 
    1887       24222 :         if (!s->in_use)
    1888       23574 :             continue;
    1889             : 
    1890             :         /* save the slot to disk, locking is handled in SaveSlotToPath() */
    1891         648 :         sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
    1892             : 
    1893             :         /*
    1894             :          * Slot's data is not flushed each time the confirmed_flush LSN is
    1895             :          * updated as that could lead to frequent writes.  However, we decide
    1896             :          * to force a flush of all logical slots' data at the time of shutdown
    1897             :          * if the confirmed_flush LSN is changed since we last flushed it to
    1898             :          * disk.  This helps in avoiding an unnecessary retreat of the
    1899             :          * confirmed_flush LSN after restart.
    1900             :          */
    1901         648 :         if (is_shutdown && SlotIsLogical(s))
    1902             :         {
    1903         122 :             SpinLockAcquire(&s->mutex);
    1904             : 
    1905         122 :             if (s->data.invalidated == RS_INVAL_NONE &&
    1906         122 :                 s->data.confirmed_flush > s->last_saved_confirmed_flush)
    1907             :             {
    1908          70 :                 s->just_dirtied = true;
    1909          70 :                 s->dirty = true;
    1910             :             }
    1911         122 :             SpinLockRelease(&s->mutex);
    1912             :         }
    1913             : 
    1914         648 :         SaveSlotToPath(s, path, LOG);
    1915             :     }
    1916        2476 :     LWLockRelease(ReplicationSlotAllocationLock);
    1917        2476 : }
    1918             : 
    1919             : /*
    1920             :  * Load all replication slots from disk into memory at server startup. This
    1921             :  * needs to be run before we start crash recovery.
    1922             :  */
    1923             : void
    1924        1634 : StartupReplicationSlots(void)
    1925             : {
    1926             :     DIR        *replication_dir;
    1927             :     struct dirent *replication_de;
    1928             : 
    1929        1634 :     elog(DEBUG1, "starting up replication slots");
    1930             : 
    1931             :     /* restore all slots by iterating over all on-disk entries */
    1932        1634 :     replication_dir = AllocateDir(PG_REPLSLOT_DIR);
    1933        5038 :     while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
    1934             :     {
    1935             :         char        path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
    1936             :         PGFileType  de_type;
    1937             : 
    1938        3404 :         if (strcmp(replication_de->d_name, ".") == 0 ||
    1939        1770 :             strcmp(replication_de->d_name, "..") == 0)
    1940        3268 :             continue;
    1941             : 
    1942         136 :         snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
    1943         136 :         de_type = get_dirent_type(path, replication_de, false, DEBUG1);
    1944             : 
    1945             :         /* we're only creating directories here, skip if it's not ours */
    1946         136 :         if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
    1947           0 :             continue;
    1948             : 
    1949             :         /* we crashed while a slot was being set up or deleted, clean up */
    1950         136 :         if (pg_str_endswith(replication_de->d_name, ".tmp"))
    1951             :         {
    1952           0 :             if (!rmtree(path, true))
    1953             :             {
    1954           0 :                 ereport(WARNING,
    1955             :                         (errmsg("could not remove directory \"%s\"",
    1956             :                                 path)));
    1957           0 :                 continue;
    1958             :             }
    1959           0 :             fsync_fname(PG_REPLSLOT_DIR, true);
    1960           0 :             continue;
    1961             :         }
    1962             : 
    1963             :         /* looks like a slot in a normal state, restore */
    1964         136 :         RestoreSlotFromDisk(replication_de->d_name);
    1965             :     }
    1966        1634 :     FreeDir(replication_dir);
    1967             : 
    1968             :     /* currently no slots exist, we're done. */
    1969        1634 :     if (max_replication_slots <= 0)
    1970           2 :         return;
    1971             : 
    1972             :     /* Now that we have recovered all the data, compute required xmin and LSN */
    1973        1632 :     ReplicationSlotsComputeRequiredXmin(false);
    1974        1632 :     ReplicationSlotsComputeRequiredLSN();
    1975             : }
    1976             : 
    1977             : /* ----
    1978             :  * Manipulation of on-disk state of replication slots
    1979             :  *
    1980             :  * NB: none of the routines below should take any notice whether a slot is the
    1981             :  * current one or not, that's all handled a layer above.
    1982             :  * ----
    1983             :  */
    1984             : static void
    1985        1164 : CreateSlotOnDisk(ReplicationSlot *slot)
    1986             : {
    1987             :     char        tmppath[MAXPGPATH];
    1988             :     char        path[MAXPGPATH];
    1989             :     struct stat st;
    1990             : 
    1991             :     /*
    1992             :      * No need to take out the io_in_progress_lock: nobody else can see this
    1993             :      * slot yet, so nobody else will write.  We're reusing SaveSlotToPath(),
    1994             :      * which takes the lock itself; taking it here as well would deadlock.
    1995             :      */
    1996             : 
    1997        1164 :     sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    1998        1164 :     sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
    1999             : 
    2000             :     /*
    2001             :      * It's just barely possible that some previous effort to create or drop a
    2002             :      * slot with this name left a temp directory lying around. If that seems
    2003             :      * to be the case, try to remove it.  If the rmtree() fails, we'll error
    2004             :      * out at the MakePGDirectory() below, so we don't bother checking
    2005             :      * success.
    2006             :      */
    2007        1164 :     if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
    2008           0 :         rmtree(tmppath, true);
    2009             : 
    2010             :     /* Create and fsync the temporary slot directory. */
    2011        1164 :     if (MakePGDirectory(tmppath) < 0)
    2012           0 :         ereport(ERROR,
    2013             :                 (errcode_for_file_access(),
    2014             :                  errmsg("could not create directory \"%s\": %m",
    2015             :                         tmppath)));
    2016        1164 :     fsync_fname(tmppath, true);
    2017             : 
    2018             :     /* Write the actual state file. */
    2019        1164 :     slot->dirty = true;          /* signal that we really need to write */
    2020        1164 :     SaveSlotToPath(slot, tmppath, ERROR);
    2021             : 
    2022             :     /* Rename the directory into place. */
    2023        1164 :     if (rename(tmppath, path) != 0)
    2024           0 :         ereport(ERROR,
    2025             :                 (errcode_for_file_access(),
    2026             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    2027             :                         tmppath, path)));
    2028             : 
    2029             :     /*
    2030             :      * If we'd now fail - really unlikely - we wouldn't know whether this slot
    2031             :      * would persist after an OS crash or not - so, force a restart. The
    2032             :      * restart would try to fsync this again till it works.
    2033             :      */
    2034        1164 :     START_CRIT_SECTION();
    2035             : 
    2036        1164 :     fsync_fname(path, true);
    2037        1164 :     fsync_fname(PG_REPLSLOT_DIR, true);
    2038             : 
    2039        1164 :     END_CRIT_SECTION();
    2040        1164 : }
    2041             : 
    2042             : /*
    2043             :  * Shared functionality between saving and creating a replication slot.
    2044             :  */
    2045             : static void
    2046        4122 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
    2047             : {
    2048             :     char        tmppath[MAXPGPATH];
    2049             :     char        path[MAXPGPATH];
    2050             :     int         fd;
    2051             :     ReplicationSlotOnDisk cp;
    2052             :     bool        was_dirty;
    2053             : 
    2054             :     /* first check whether there's something to write out */
    2055        4122 :     SpinLockAcquire(&slot->mutex);
    2056        4122 :     was_dirty = slot->dirty;
    2057        4122 :     slot->just_dirtied = false;
    2058        4122 :     SpinLockRelease(&slot->mutex);
    2059             : 
    2060             :     /* and don't do anything if there's nothing to write */
    2061        4122 :     if (!was_dirty)
    2062         188 :         return;
    2063             : 
    2064        3934 :     LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
    2065             : 
    2066             :     /* silence valgrind :( */
    2067        3934 :     memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
    2068             : 
    2069        3934 :     sprintf(tmppath, "%s/state.tmp", dir);
    2070        3934 :     sprintf(path, "%s/state", dir);
    2071             : 
    2072        3934 :     fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    2073        3934 :     if (fd < 0)
    2074             :     {
    2075             :         /*
    2076             :          * If not an ERROR, then release the lock before returning.  In case
    2077             :          * of an ERROR, the error recovery path automatically releases the
    2078             :          * lock, but no harm in explicitly releasing even in that case.  Note
    2079             :          * that LWLockRelease() could affect errno.
    2080             :          */
    2081           0 :         int         save_errno = errno;
    2082             : 
    2083           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2084           0 :         errno = save_errno;
    2085           0 :         ereport(elevel,
    2086             :                 (errcode_for_file_access(),
    2087             :                  errmsg("could not create file \"%s\": %m",
    2088             :                         tmppath)));
    2089           0 :         return;
    2090             :     }
    2091             : 
    2092        3934 :     cp.magic = SLOT_MAGIC;
    2093        3934 :     INIT_CRC32C(cp.checksum);
    2094        3934 :     cp.version = SLOT_VERSION;
    2095        3934 :     cp.length = ReplicationSlotOnDiskV2Size;
    2096             : 
    2097        3934 :     SpinLockAcquire(&slot->mutex);
    2098             : 
    2099        3934 :     memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
    2100             : 
    2101        3934 :     SpinLockRelease(&slot->mutex);
    2102             : 
    2103        3934 :     COMP_CRC32C(cp.checksum,
    2104             :                 (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
    2105             :                 ReplicationSlotOnDiskChecksummedSize);
    2106        3934 :     FIN_CRC32C(cp.checksum);
    2107             : 
    2108        3934 :     errno = 0;
    2109        3934 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
    2110        3934 :     if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
    2111             :     {
    2112           0 :         int         save_errno = errno;
    2113             : 
    2114           0 :         pgstat_report_wait_end();
    2115           0 :         CloseTransientFile(fd);
    2116           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2117             : 
    2118             :         /* if write didn't set errno, assume problem is no disk space */
    2119           0 :         errno = save_errno ? save_errno : ENOSPC;
    2120           0 :         ereport(elevel,
    2121             :                 (errcode_for_file_access(),
    2122             :                  errmsg("could not write to file \"%s\": %m",
    2123             :                         tmppath)));
    2124           0 :         return;
    2125             :     }
    2126        3934 :     pgstat_report_wait_end();
    2127             : 
    2128             :     /* fsync the temporary file */
    2129        3934 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
    2130        3934 :     if (pg_fsync(fd) != 0)
    2131             :     {
    2132           0 :         int         save_errno = errno;
    2133             : 
    2134           0 :         pgstat_report_wait_end();
    2135           0 :         CloseTransientFile(fd);
    2136           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2137           0 :         errno = save_errno;
    2138           0 :         ereport(elevel,
    2139             :                 (errcode_for_file_access(),
    2140             :                  errmsg("could not fsync file \"%s\": %m",
    2141             :                         tmppath)));
    2142           0 :         return;
    2143             :     }
    2144        3934 :     pgstat_report_wait_end();
    2145             : 
    2146        3934 :     if (CloseTransientFile(fd) != 0)
    2147             :     {
    2148           0 :         int         save_errno = errno;
    2149             : 
    2150           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2151           0 :         errno = save_errno;
    2152           0 :         ereport(elevel,
    2153             :                 (errcode_for_file_access(),
    2154             :                  errmsg("could not close file \"%s\": %m",
    2155             :                         tmppath)));
    2156           0 :         return;
    2157             :     }
    2158             : 
    2159             :     /* rename to permanent file, fsync file and directory */
    2160        3934 :     if (rename(tmppath, path) != 0)
    2161             :     {
    2162           0 :         int         save_errno = errno;
    2163             : 
    2164           0 :         LWLockRelease(&slot->io_in_progress_lock);
    2165           0 :         errno = save_errno;
    2166           0 :         ereport(elevel,
    2167             :                 (errcode_for_file_access(),
    2168             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    2169             :                         tmppath, path)));
    2170           0 :         return;
    2171             :     }
    2172             : 
    2173             :     /*
    2174             :      * Check CreateSlotOnDisk() for the reasoning of using a critical section.
    2175             :      */
    2176        3934 :     START_CRIT_SECTION();
    2177             : 
    2178        3934 :     fsync_fname(path, false);
    2179        3934 :     fsync_fname(dir, true);
    2180        3934 :     fsync_fname(PG_REPLSLOT_DIR, true);
    2181             : 
    2182        3934 :     END_CRIT_SECTION();
    2183             : 
    2184             :     /*
    2185             :      * Successfully wrote: clear the dirty bit unless somebody dirtied the slot
    2186             :      * again already, and always remember the confirmed_flush LSN just saved.
    2187             :      */
    2188        3934 :     SpinLockAcquire(&slot->mutex);
    2189        3934 :     if (!slot->just_dirtied)
    2190        3926 :         slot->dirty = false;
    2191        3934 :     slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2192        3934 :     SpinLockRelease(&slot->mutex);
    2193             : 
    2194        3934 :     LWLockRelease(&slot->io_in_progress_lock);
    2195             : }
    2196             : 
    2197             : /*
    2198             :  * Load a single slot from disk into memory.
    2199             :  */
    2200             : static void
    2201         136 : RestoreSlotFromDisk(const char *name)
    2202             : {
    2203             :     ReplicationSlotOnDisk cp;
    2204             :     int         i;
    2205             :     char        slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
    2206             :     char        path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
    2207             :     int         fd;
    2208         136 :     bool        restored = false;
    2209             :     int         readBytes;
    2210             :     pg_crc32c   checksum;
    2211             : 
    2212             :     /* no need to lock here, no concurrent access allowed yet */
    2213             : 
    2214             :     /* delete temp file if it exists */
    2215         136 :     sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
    2216         136 :     sprintf(path, "%s/state.tmp", slotdir);
    2217         136 :     if (unlink(path) < 0 && errno != ENOENT)
    2218           0 :         ereport(PANIC,
    2219             :                 (errcode_for_file_access(),
    2220             :                  errmsg("could not remove file \"%s\": %m", path)));
    2221             : 
    2222         136 :     sprintf(path, "%s/state", slotdir);
    2223             : 
    2224         136 :     elog(DEBUG1, "restoring replication slot from \"%s\"", path);
    2225             : 
    2226             :     /* on some operating systems fsyncing a file requires O_RDWR */
    2227         136 :     fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
    2228             : 
    2229             :     /*
    2230             :      * We do not need to handle this as we are rename()ing the directory into
    2231             :      * place only after we fsync()ed the state file.
    2232             :      */
    2233         136 :     if (fd < 0)
    2234           0 :         ereport(PANIC,
    2235             :                 (errcode_for_file_access(),
    2236             :                  errmsg("could not open file \"%s\": %m", path)));
    2237             : 
    2238             :     /*
    2239             :      * Sync state file before we're reading from it. We might have crashed
    2240             :      * while it wasn't synced yet and we shouldn't continue on that basis.
    2241             :      */
    2242         136 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
    2243         136 :     if (pg_fsync(fd) != 0)
    2244           0 :         ereport(PANIC,
    2245             :                 (errcode_for_file_access(),
    2246             :                  errmsg("could not fsync file \"%s\": %m",
    2247             :                         path)));
    2248         136 :     pgstat_report_wait_end();
    2249             : 
    2250             :     /* Also sync the parent directory */
    2251         136 :     START_CRIT_SECTION();
    2252         136 :     fsync_fname(slotdir, true);
    2253         136 :     END_CRIT_SECTION();
    2254             : 
    2255             :     /* read part of statefile that's guaranteed to be version independent */
    2256         136 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2257         136 :     readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
    2258         136 :     pgstat_report_wait_end();
    2259         136 :     if (readBytes != ReplicationSlotOnDiskConstantSize)
    2260             :     {
    2261           0 :         if (readBytes < 0)
    2262           0 :             ereport(PANIC,
    2263             :                     (errcode_for_file_access(),
    2264             :                      errmsg("could not read file \"%s\": %m", path)));
    2265             :         else
    2266           0 :             ereport(PANIC,
    2267             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2268             :                      errmsg("could not read file \"%s\": read %d of %zu",
    2269             :                             path, readBytes,
    2270             :                             (Size) ReplicationSlotOnDiskConstantSize)));
    2271             :     }
    2272             : 
    2273             :     /* verify magic */
    2274         136 :     if (cp.magic != SLOT_MAGIC)
    2275           0 :         ereport(PANIC,
    2276             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2277             :                  errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
    2278             :                         path, cp.magic, SLOT_MAGIC)));
    2279             : 
    2280             :     /* verify version */
    2281         136 :     if (cp.version != SLOT_VERSION)
    2282           0 :         ereport(PANIC,
    2283             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2284             :                  errmsg("replication slot file \"%s\" has unsupported version %u",
    2285             :                         path, cp.version)));
    2286             : 
    2287             :     /* boundary check on length */
    2288         136 :     if (cp.length != ReplicationSlotOnDiskV2Size)
    2289           0 :         ereport(PANIC,
    2290             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    2291             :                  errmsg("replication slot file \"%s\" has corrupted length %u",
    2292             :                         path, cp.length)));
    2293             : 
    2294             :     /* Now that we know the size, read the entire file */
    2295         136 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    2296         272 :     readBytes = read(fd,
    2297             :                      (char *) &cp + ReplicationSlotOnDiskConstantSize,
    2298         136 :                      cp.length);
    2299         136 :     pgstat_report_wait_end();
    2300         136 :     if (readBytes != cp.length)
    2301             :     {
    2302           0 :         if (readBytes < 0)
    2303           0 :             ereport(PANIC,
    2304             :                     (errcode_for_file_access(),
    2305             :                      errmsg("could not read file \"%s\": %m", path)));
    2306             :         else
    2307           0 :             ereport(PANIC,
    2308             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2309             :                      errmsg("could not read file \"%s\": read %d of %zu",
    2310             :                             path, readBytes, (Size) cp.length)));
    2311             :     }
    2312             : 
    2313         136 :     if (CloseTransientFile(fd) != 0)
    2314           0 :         ereport(PANIC,
    2315             :                 (errcode_for_file_access(),
    2316             :                  errmsg("could not close file \"%s\": %m", path)));
    2317             : 
    2318             :     /* now verify the CRC */
    2319         136 :     INIT_CRC32C(checksum);
    2320         136 :     COMP_CRC32C(checksum,
    2321             :                 (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
    2322             :                 ReplicationSlotOnDiskChecksummedSize);
    2323         136 :     FIN_CRC32C(checksum);
    2324             : 
    2325         136 :     if (!EQ_CRC32C(checksum, cp.checksum))
    2326           0 :         ereport(PANIC,
    2327             :                 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
    2328             :                         path, checksum, cp.checksum)));
    2329             : 
    2330             :     /*
    2331             :      * If we crashed with an ephemeral slot active, don't restore but delete
    2332             :      * it.
    2333             :      */
    2334         136 :     if (cp.slotdata.persistency != RS_PERSISTENT)
    2335             :     {
    2336           0 :         if (!rmtree(slotdir, true))
    2337             :         {
    2338           0 :             ereport(WARNING,
    2339             :                     (errmsg("could not remove directory \"%s\"",
    2340             :                             slotdir)));
    2341             :         }
    2342           0 :         fsync_fname(PG_REPLSLOT_DIR, true);
    2343           0 :         return;
    2344             :     }
    2345             : 
    2346             :     /*
    2347             :      * Verify that requirements for the specific slot type are met. That's
    2348             :      * important because if these aren't met we're not guaranteed to retain
    2349             :      * all the necessary resources for the slot.
    2350             :      *
    2351             :      * NB: We have to do so *after* the above checks for ephemeral slots,
    2352             :      * because otherwise a slot that shouldn't exist anymore could prevent
    2353             :      * restarts.
    2354             :      *
    2355             :      * NB: Changing the requirements here also requires adapting
    2356             :      * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
    2357             :      */
    2358         136 :     if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
    2359           0 :         ereport(FATAL,
    2360             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2361             :                  errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
    2362             :                         NameStr(cp.slotdata.name)),
    2363             :                  errhint("Change \"wal_level\" to be \"logical\" or higher.")));
    2364         136 :     else if (wal_level < WAL_LEVEL_REPLICA)
    2365           0 :         ereport(FATAL,
    2366             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2367             :                  errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
    2368             :                         NameStr(cp.slotdata.name)),
    2369             :                  errhint("Change \"wal_level\" to be \"replica\" or higher.")));
    2370             : 
    2371             :     /* nothing can be active yet, don't lock anything */
    2372         188 :     for (i = 0; i < max_replication_slots; i++)
    2373             :     {
    2374             :         ReplicationSlot *slot;
    2375             : 
    2376         188 :         slot = &ReplicationSlotCtl->replication_slots[i];
    2377             : 
    2378         188 :         if (slot->in_use)
    2379          52 :             continue;
    2380             : 
    2381             :         /* restore the entire set of persistent data */
    2382         136 :         memcpy(&slot->data, &cp.slotdata,
    2383             :                sizeof(ReplicationSlotPersistentData));
    2384             : 
    2385             :         /* initialize in memory state */
    2386         136 :         slot->effective_xmin = cp.slotdata.xmin;
    2387         136 :         slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
    2388         136 :         slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
    2389             : 
    2390         136 :         slot->candidate_catalog_xmin = InvalidTransactionId;
    2391         136 :         slot->candidate_xmin_lsn = InvalidXLogRecPtr;
    2392         136 :         slot->candidate_restart_lsn = InvalidXLogRecPtr;
    2393         136 :         slot->candidate_restart_valid = InvalidXLogRecPtr;
    2394             : 
    2395         136 :         slot->in_use = true;
    2396         136 :         slot->active_pid = 0;
    2397             : 
    2398             :         /*
    2399             :          * Set the time since the slot has become inactive after loading the
    2400             :          * slot from the disk into memory. Whoever acquires the slot i.e.
    2401             :          * makes the slot active will reset it.
    2402             :          */
    2403         136 :         slot->inactive_since = GetCurrentTimestamp();
    2404             : 
    2405         136 :         restored = true;
    2406         136 :         break;
    2407             :     }
    2408             : 
    2409         136 :     if (!restored)
    2410           0 :         ereport(FATAL,
    2411             :                 (errmsg("too many replication slots active before shutdown"),
    2412             :                  errhint("Increase \"max_replication_slots\" and try again.")));
    2413             : }
    2414             : 
    2415             : /*
    2416             :  * Maps an invalidation reason for a replication slot to
    2417             :  * ReplicationSlotInvalidationCause.
    2418             :  */
    2419             : ReplicationSlotInvalidationCause
    2420           0 : GetSlotInvalidationCause(const char *invalidation_reason)
    2421             : {
    2422             :     ReplicationSlotInvalidationCause cause;
    2423           0 :     ReplicationSlotInvalidationCause result = RS_INVAL_NONE;
    2424           0 :     bool        found PG_USED_FOR_ASSERTS_ONLY = false;
    2425             : 
    2426             :     Assert(invalidation_reason);
    2427             : 
    2428           0 :     for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++)
    2429             :     {
    2430           0 :         if (strcmp(SlotInvalidationCauses[cause], invalidation_reason) == 0)
    2431             :         {
    2432           0 :             found = true;
    2433           0 :             result = cause;
    2434           0 :             break;
    2435             :         }
    2436             :     }
    2437             : 
    2438             :     Assert(found);
    2439           0 :     return result;
    2440             : }
    2441             : 
    2442             : /*
    2443             :  * A helper function to validate slots specified in GUC synchronized_standby_slots.
    2444             :  *
    2445             :  * The rawname will be parsed, and the result will be saved into *elemlist.
    2446             :  */
    2447             : static bool
    2448          12 : validate_sync_standby_slots(char *rawname, List **elemlist)
    2449             : {
    2450             :     bool        ok;
    2451             : 
    2452             :     /* Verify syntax and parse string into a list of identifiers */
    2453          12 :     ok = SplitIdentifierString(rawname, ',', elemlist);
    2454             : 
    2455          12 :     if (!ok)
    2456             :     {
    2457           0 :         GUC_check_errdetail("List syntax is invalid.");
    2458             :     }
    2459          12 :     else if (!ReplicationSlotCtl)
    2460             :     {
    2461             :         /*
    2462             :          * We cannot validate the replication slot if the replication slots'
    2463             :          * data has not been initialized. This is ok as we will anyway
    2464             :          * validate the specified slot when waiting for them to catch up. See
    2465             :          * StandbySlotsHaveCaughtup() for details.
    2466             :          */
    2467             :     }
    2468             :     else
    2469             :     {
    2470             :         /* Check that the specified slots exist and are logical slots */
    2471          12 :         LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    2472             : 
    2473          36 :         foreach_ptr(char, name, *elemlist)
    2474             :         {
    2475             :             ReplicationSlot *slot;
    2476             : 
    2477          12 :             slot = SearchNamedReplicationSlot(name, false);
    2478             : 
    2479          12 :             if (!slot)
    2480             :             {
    2481           0 :                 GUC_check_errdetail("replication slot \"%s\" does not exist",
    2482             :                                     name);
    2483           0 :                 ok = false;
    2484           0 :                 break;
    2485             :             }
    2486             : 
    2487          12 :             if (!SlotIsPhysical(slot))
    2488             :             {
    2489           0 :                 GUC_check_errdetail("\"%s\" is not a physical replication slot",
    2490             :                                     name);
    2491           0 :                 ok = false;
    2492           0 :                 break;
    2493             :             }
    2494             :         }
    2495             : 
    2496          12 :         LWLockRelease(ReplicationSlotControlLock);
    2497             :     }
    2498             : 
    2499          12 :     return ok;
    2500             : }
    2501             : 
    2502             : /*
    2503             :  * GUC check_hook for synchronized_standby_slots
    2504             :  */
    2505             : bool
    2506        1986 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
    2507             : {
    2508             :     char       *rawname;
    2509             :     char       *ptr;
    2510             :     List       *elemlist;
    2511             :     int         size;
    2512             :     bool        ok;
    2513             :     SyncStandbySlotsConfigData *config;
    2514             : 
    2515        1986 :     if ((*newval)[0] == '\0')
    2516        1974 :         return true;
    2517             : 
    2518             :     /* Need a modifiable copy of the GUC string */
    2519          12 :     rawname = pstrdup(*newval);
    2520             : 
    2521             :     /* Now verify if the specified slots exist and have correct type */
    2522          12 :     ok = validate_sync_standby_slots(rawname, &elemlist);
    2523             : 
    2524          12 :     if (!ok || elemlist == NIL)
    2525             :     {
    2526           0 :         pfree(rawname);
    2527           0 :         list_free(elemlist);
    2528           0 :         return ok;
    2529             :     }
    2530             : 
    2531             :     /* Compute the size required for the SyncStandbySlotsConfigData struct */
    2532          12 :     size = offsetof(SyncStandbySlotsConfigData, slot_names);
    2533          36 :     foreach_ptr(char, slot_name, elemlist)
    2534          12 :         size += strlen(slot_name) + 1;
    2535             : 
    2536             :     /* GUC extra value must be guc_malloc'd, not palloc'd */
    2537          12 :     config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
    2538             : 
    2539             :     /* Transform the data into SyncStandbySlotsConfigData */
    2540          12 :     config->nslotnames = list_length(elemlist);
    2541             : 
    2542          12 :     ptr = config->slot_names;
    2543          36 :     foreach_ptr(char, slot_name, elemlist)
    2544             :     {
    2545          12 :         strcpy(ptr, slot_name);
    2546          12 :         ptr += strlen(slot_name) + 1;
    2547             :     }
    2548             : 
    2549          12 :     *extra = (void *) config;
    2550             : 
    2551          12 :     pfree(rawname);
    2552          12 :     list_free(elemlist);
    2553          12 :     return true;
    2554             : }
    2555             : 
    2556             : /*
    2557             :  * GUC assign_hook for synchronized_standby_slots
    2558             :  */
    2559             : void
    2560        1986 : assign_synchronized_standby_slots(const char *newval, void *extra)
    2561             : {
    2562             :     /*
    2563             :      * The standby slots may have changed, so we must recompute the oldest
    2564             :      * LSN.
    2565             :      */
    2566        1986 :     ss_oldest_flush_lsn = InvalidXLogRecPtr;
    2567             : 
    2568        1986 :     synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
    2569        1986 : }
    2570             : 
    2571             : /*
    2572             :  * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
    2573             :  */
    2574             : bool
    2575        9660 : SlotExistsInSyncStandbySlots(const char *slot_name)
    2576             : {
    2577             :     const char *standby_slot_name;
    2578             : 
    2579             :     /* Return false if there is no value in synchronized_standby_slots */
    2580        9660 :     if (synchronized_standby_slots_config == NULL)
    2581        9650 :         return false;
    2582             : 
    2583             :     /*
    2584             :      * XXX: We are not expecting this list to be long so a linear search
    2585             :      * shouldn't hurt but if that turns out not to be true then we can cache
    2586             :      * this information for each WalSender as well.
    2587             :      */
    2588          10 :     standby_slot_name = synchronized_standby_slots_config->slot_names;
    2589          10 :     for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
    2590             :     {
    2591          10 :         if (strcmp(standby_slot_name, slot_name) == 0)
    2592          10 :             return true;
    2593             : 
    2594           0 :         standby_slot_name += strlen(standby_slot_name) + 1;
    2595             :     }
    2596             : 
    2597           0 :     return false;
    2598             : }
    2599             : 
    2600             : /*
    2601             :  * Return true if the slots specified in synchronized_standby_slots have caught up to
    2602             :  * the given WAL location, false otherwise.
    2603             :  *
    2604             :  * The elevel parameter specifies the error level used for logging messages
    2605             :  * related to slots that do not exist, are invalidated, or are inactive.
    2606             :  */
    2607             : bool
    2608        1220 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
    2609             : {
    2610             :     const char *name;
    2611        1220 :     int         caught_up_slot_num = 0;
    2612        1220 :     XLogRecPtr  min_restart_lsn = InvalidXLogRecPtr;
    2613             : 
    2614             :     /*
    2615             :      * Don't need to wait for the standbys to catch up if there is no value in
    2616             :      * synchronized_standby_slots.
    2617             :      */
    2618        1220 :     if (synchronized_standby_slots_config == NULL)
    2619        1194 :         return true;
    2620             : 
    2621             :     /*
    2622             :      * Don't need to wait for the standbys to catch up if we are on a standby
    2623             :      * server, since we do not support syncing slots to cascading standbys.
    2624             :      */
    2625          26 :     if (RecoveryInProgress())
    2626           0 :         return true;
    2627             : 
    2628             :     /*
    2629             :      * Don't need to wait for the standbys to catch up if they are already
    2630             :      * beyond the specified WAL location.
    2631             :      */
    2632          26 :     if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
    2633          18 :         ss_oldest_flush_lsn >= wait_for_lsn)
    2634          10 :         return true;
    2635             : 
    2636             :     /*
    2637             :      * To prevent concurrent slot dropping and creation while filtering the
    2638             :      * slots, take the ReplicationSlotControlLock outside of the loop.
    2639             :      */
    2640          16 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    2641             : 
    2642          16 :     name = synchronized_standby_slots_config->slot_names;
    2643          22 :     for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
    2644             :     {
    2645             :         XLogRecPtr  restart_lsn;
    2646             :         bool        invalidated;
    2647             :         bool        inactive;
    2648             :         ReplicationSlot *slot;
    2649             : 
    2650          16 :         slot = SearchNamedReplicationSlot(name, false);
    2651             : 
    2652          16 :         if (!slot)
    2653             :         {
    2654             :             /*
    2655             :              * If a slot name provided in synchronized_standby_slots does not
    2656             :              * exist, report a message and exit the loop. A user can specify a
    2657             :              * slot name that does not exist just before the server startup.
    2658             :              * The GUC check_hook(validate_sync_standby_slots) cannot validate
    2659             :              * such a slot during startup as the ReplicationSlotCtl shared
    2660             :              * memory is not initialized at that time. It is also possible for
    2661             :              * a user to drop the slot in synchronized_standby_slots
    2662             :              * afterwards.
    2663             :              */
    2664           0 :             ereport(elevel,
    2665             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2666             :                     errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
    2667             :                            name, "synchronized_standby_slots"),
    2668             :                     errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    2669             :                               name),
    2670             :                     errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
    2671             :                             name, "synchronized_standby_slots"));
    2672           0 :             break;
    2673             :         }
    2674             : 
    2675          16 :         if (SlotIsLogical(slot))
    2676             :         {
    2677             :             /*
    2678             :              * If a logical slot name is provided in
    2679             :              * synchronized_standby_slots, report a message and exit the loop.
    2680             :              * Similar to the non-existent case, a user can specify a logical
    2681             :              * slot name in synchronized_standby_slots before the server
    2682             :              * startup, or drop an existing physical slot and recreate a
    2683             :              * logical slot with the same name.
    2684             :              */
    2685           0 :             ereport(elevel,
    2686             :                     errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2687             :                     errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
    2688             :                            name, "synchronized_standby_slots"),
    2689             :                     errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
    2690             :                               name),
    2691             :                     errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
    2692             :                             name, "synchronized_standby_slots"));
    2693           0 :             break;
    2694             :         }
    2695             : 
    2696          16 :         SpinLockAcquire(&slot->mutex);
    2697          16 :         restart_lsn = slot->data.restart_lsn;
    2698          16 :         invalidated = slot->data.invalidated != RS_INVAL_NONE;
    2699          16 :         inactive = slot->active_pid == 0;
    2700          16 :         SpinLockRelease(&slot->mutex);
    2701             : 
    2702          16 :         if (invalidated)
    2703             :         {
    2704             :             /* Specified physical slot has been invalidated */
    2705           0 :             ereport(elevel,
    2706             :                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2707             :                     errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
    2708             :                            name, "synchronized_standby_slots"),
    2709             :                     errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    2710             :                               name),
    2711             :                     errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
    2712             :                             name, "synchronized_standby_slots"));
    2713           0 :             break;
    2714             :         }
    2715             : 
    2716          16 :         if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
    2717             :         {
    2718             :             /* Log a message if no active_pid for this physical slot */
    2719          10 :             if (inactive)
    2720           8 :                 ereport(elevel,
    2721             :                         errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    2722             :                         errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
    2723             :                                name, "synchronized_standby_slots"),
    2724             :                         errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
    2725             :                                   name),
    2726             :                         errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
    2727             :                                 name, "synchronized_standby_slots"));
    2728             : 
    2729             :             /* Continue if the current slot hasn't caught up. */
    2730          10 :             break;
    2731             :         }
    2732             : 
    2733             :         Assert(restart_lsn >= wait_for_lsn);
    2734             : 
    2735           6 :         if (XLogRecPtrIsInvalid(min_restart_lsn) ||
    2736             :             min_restart_lsn > restart_lsn)
    2737           6 :             min_restart_lsn = restart_lsn;
    2738             : 
    2739           6 :         caught_up_slot_num++;
    2740             : 
    2741           6 :         name += strlen(name) + 1;
    2742             :     }
    2743             : 
    2744          16 :     LWLockRelease(ReplicationSlotControlLock);
    2745             : 
    2746             :     /*
    2747             :      * Return false if not all the standbys have caught up to the specified
    2748             :      * WAL location.
    2749             :      */
    2750          16 :     if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
    2751          10 :         return false;
    2752             : 
    2753             :     /* The ss_oldest_flush_lsn must not retreat. */
    2754             :     Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
    2755             :            min_restart_lsn >= ss_oldest_flush_lsn);
    2756             : 
    2757           6 :     ss_oldest_flush_lsn = min_restart_lsn;
    2758             : 
    2759           6 :     return true;
    2760             : }
    2761             : 
    2762             : /*
    2763             :  * Wait for physical standbys to confirm receiving the given lsn.
    2764             :  *
    2765             :  * Used by logical decoding SQL functions. It waits for physical standbys
    2766             :  * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
    2767             :  */
    2768             : void
    2769         426 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
    2770             : {
    2771             :     /*
    2772             :      * Don't need to wait for the standby to catch up if the current acquired
    2773             :      * slot is not a logical failover slot, or there is no value in
    2774             :      * synchronized_standby_slots.
    2775             :      */
    2776         426 :     if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
    2777         424 :         return;
    2778             : 
    2779           2 :     ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
    2780             : 
    2781             :     for (;;)
    2782             :     {
    2783           4 :         CHECK_FOR_INTERRUPTS();
    2784             : 
    2785           4 :         if (ConfigReloadPending)
    2786             :         {
    2787           2 :             ConfigReloadPending = false;
    2788           2 :             ProcessConfigFile(PGC_SIGHUP);
    2789             :         }
    2790             : 
    2791             :         /* Exit if done waiting for every slot. */
    2792           4 :         if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
    2793           2 :             break;
    2794             : 
    2795             :         /*
    2796             :          * Wait for the slots in the synchronized_standby_slots to catch up,
    2797             :          * but use a timeout (1s) so we can also check if the
    2798             :          * synchronized_standby_slots has been changed.
    2799             :          */
    2800           2 :         ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
    2801             :                                     WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
    2802             :     }
    2803             : 
    2804           2 :     ConditionVariableCancelSleep();
    2805             : }

Generated by: LCOV version 1.14