LCOV - code coverage report
Current view: top level - src/backend/replication - slot.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 424 502 84.5 %
Date: 2019-08-24 15:07:19 Functions: 25 25 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * slot.c
       4             :  *     Replication slot management.
       5             :  *
       6             :  *
       7             :  * Copyright (c) 2012-2019, PostgreSQL Global Development Group
       8             :  *
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *    src/backend/replication/slot.c
      12             :  *
      13             :  * NOTES
      14             :  *
      15             :  * Replication slots are used to keep state about replication streams
      16             :  * originating from this cluster.  Their primary purpose is to prevent the
      17             :  * premature removal of WAL or of old tuple versions in a manner that would
      18             :  * interfere with replication; they are also useful for monitoring purposes.
      19             :  * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
      20             :  * on standbys (to support cascading setups).  The requirement that slots be
      21             :  * usable on standbys precludes storing them in the system catalogs.
      22             :  *
      23             :  * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
      24             :  * directory. Inside that directory the state file will contain the slot's
      25             :  * own data. Additional data can be stored alongside that file if required.
      26             :  * While the server is running, the state data is also cached in memory for
      27             :  * efficiency.
      28             :  *
      29             :  * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
      30             :  * or free a slot. ReplicationSlotControlLock must be taken in shared mode
      31             :  * to iterate over the slots, and in exclusive mode to change the in_use flag
      32             :  * of a slot.  The remaining data in each slot is protected by its mutex.
      33             :  *
      34             :  *-------------------------------------------------------------------------
      35             :  */
      36             : 
      37             : #include "postgres.h"
      38             : 
      39             : #include <unistd.h>
      40             : #include <sys/stat.h>
      41             : 
      42             : #include "access/transam.h"
      43             : #include "access/xlog_internal.h"
      44             : #include "common/string.h"
      45             : #include "miscadmin.h"
      46             : #include "pgstat.h"
      47             : #include "replication/slot.h"
      48             : #include "storage/fd.h"
      49             : #include "storage/proc.h"
      50             : #include "storage/procarray.h"
      51             : #include "utils/builtins.h"
      52             : 
/*
 * Replication slot on-disk data structure.
 *
 * This is the layout of each slot's state file under
 * $PGDATA/pg_replslot/<slotname>/.  The leading fields must stay
 * version-independent so that future versions can still identify and
 * validate old files.
 */
typedef struct ReplicationSlotOnDisk
{
    /* first part of this struct needs to be version independent */

    /* data not covered by checksum */
    uint32      magic;          /* format identifier, see SLOT_MAGIC */
    pg_crc32c   checksum;       /* CRC of the checksummed part below */

    /* data covered by checksum */
    uint32      version;        /* file format version, see SLOT_VERSION */
    uint32      length;         /* length of the version-dependent part */

    /*
     * The actual data in the slot that follows can differ based on the above
     * 'version'.
     */

    ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
    offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
/*
 * NOTE(review): the two "SnapBuild..." names below describe
 * ReplicationSlotOnDisk, not snapbuild state; the prefix appears to be a
 * copy-paste from snapbuild.c.  Not renamed here since the macros may be
 * referenced elsewhere in this file.
 */
#define SnapBuildOnDiskNotChecksummedSize \
    offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define SnapBuildOnDiskChecksummedSize \
    sizeof(ReplicationSlotOnDisk) - SnapBuildOnDiskNotChecksummedSize
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
    sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize

#define SLOT_MAGIC      0x1051CA1   /* format identifier */
#define SLOT_VERSION    2       /* version for new files */

/* Control array for replication slot management */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/* My backend's replication slot in the shared memory array */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUCs */
int         max_replication_slots = 0;  /* the maximum number of replication
                                         * slots */

/* forward declarations: drop a slot owned by this backend / by pointer */
static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *path, int elevel);
     110             : /*
     111             :  * Report shared-memory space needed by ReplicationSlotsShmemInit.
     112             :  */
     113             : Size
     114        5578 : ReplicationSlotsShmemSize(void)
     115             : {
     116        5578 :     Size        size = 0;
     117             : 
     118        5578 :     if (max_replication_slots == 0)
     119           0 :         return size;
     120             : 
     121        5578 :     size = offsetof(ReplicationSlotCtlData, replication_slots);
     122        5578 :     size = add_size(size,
     123             :                     mul_size(max_replication_slots, sizeof(ReplicationSlot)));
     124             : 
     125        5578 :     return size;
     126             : }
     127             : 
/*
 * Allocate and initialize walsender-related shared memory.
 *
 * Called during shared-memory setup.  The first backend to get here (found
 * == false) zeroes the whole array and initializes each slot's mutex, IO
 * lock and condition variable; later attachers find the struct already set
 * up.
 */
void
ReplicationSlotsShmemInit(void)
{
    bool        found;

    /* Nothing to set up if replication slots are disabled. */
    if (max_replication_slots == 0)
        return;

    ReplicationSlotCtl = (ReplicationSlotCtlData *)
        ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
                        &found);

    /* Register the tranche name regardless of who initialized the struct. */
    LWLockRegisterTranche(LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS,
                          "replication_slot_io");

    if (!found)
    {
        int         i;

        /* First time through, so initialize */
        MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());

        for (i = 0; i < max_replication_slots; i++)
        {
            ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];

            /* everything else is zeroed by the memset above */
            SpinLockInit(&slot->mutex);
            LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
            ConditionVariableInit(&slot->active_cv);
        }
    }
}
     164             : 
     165             : /*
     166             :  * Check whether the passed slot name is valid and report errors at elevel.
     167             :  *
     168             :  * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
     169             :  * the name to be used as a directory name on every supported OS.
     170             :  *
     171             :  * Returns whether the directory name is valid or not if elevel < ERROR.
     172             :  */
     173             : bool
     174         426 : ReplicationSlotValidateName(const char *name, int elevel)
     175             : {
     176             :     const char *cp;
     177             : 
     178         426 :     if (strlen(name) == 0)
     179             :     {
     180           0 :         ereport(elevel,
     181             :                 (errcode(ERRCODE_INVALID_NAME),
     182             :                  errmsg("replication slot name \"%s\" is too short",
     183             :                         name)));
     184           0 :         return false;
     185             :     }
     186             : 
     187         426 :     if (strlen(name) >= NAMEDATALEN)
     188             :     {
     189           0 :         ereport(elevel,
     190             :                 (errcode(ERRCODE_NAME_TOO_LONG),
     191             :                  errmsg("replication slot name \"%s\" is too long",
     192             :                         name)));
     193           0 :         return false;
     194             :     }
     195             : 
     196        7062 :     for (cp = name; *cp; cp++)
     197             :     {
     198        7376 :         if (!((*cp >= 'a' && *cp <= 'z')
     199        1954 :               || (*cp >= '0' && *cp <= '9')
     200         738 :               || (*cp == '_')))
     201             :         {
     202           2 :             ereport(elevel,
     203             :                     (errcode(ERRCODE_INVALID_NAME),
     204             :                      errmsg("replication slot name \"%s\" contains invalid character",
     205             :                             name),
     206             :                      errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
     207           0 :             return false;
     208             :         }
     209             :     }
     210         424 :     return true;
     211             : }
     212             : 
/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db specific; if the slot is going to
 *     be used for that pass true, otherwise false.
 * persistency: whether the new slot is persistent, ephemeral, or temporary;
 *     stored in the slot's persistent data.
 *
 * Errors out (at ERROR) on an invalid name, a name collision, or when all
 * slots are in use.  On success the new slot is in_use, active for this
 * backend, and pointed to by MyReplicationSlot.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
                      ReplicationSlotPersistency persistency)
{
    ReplicationSlot *slot = NULL;
    int         i;

    /* Caller must not already own a slot. */
    Assert(MyReplicationSlot == NULL);

    ReplicationSlotValidateName(name, ERROR);

    /*
     * If some other backend ran this code concurrently with us, we'd likely
     * both allocate the same slot, and that would be bad.  We'd also be at
     * risk of missing a name collision.  Also, we don't want to try to create
     * a new slot while somebody's busy cleaning up an old one, because we
     * might both be monkeying with the same directory.
     */
    LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

    /*
     * Check for name collision, and identify an allocatable slot.  We need to
     * hold ReplicationSlotControlLock in shared mode for this, so that nobody
     * else can change the in_use flags while we're looking at them.
     */
    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    for (i = 0; i < max_replication_slots; i++)
    {
        ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

        if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
            ereport(ERROR,
                    (errcode(ERRCODE_DUPLICATE_OBJECT),
                     errmsg("replication slot \"%s\" already exists", name)));
        /* remember the first free slot, but keep scanning for collisions */
        if (!s->in_use && slot == NULL)
            slot = s;
    }
    LWLockRelease(ReplicationSlotControlLock);

    /* If all slots are in use, we're out of luck. */
    if (slot == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                 errmsg("all replication slots are in use"),
                 errhint("Free one or increase max_replication_slots.")));

    /*
     * Since this slot is not in use, nobody should be looking at any part of
     * it other than the in_use field unless they're trying to allocate it.
     * And since we hold ReplicationSlotAllocationLock, nobody except us can
     * be doing that.  So it's safe to initialize the slot.
     */
    Assert(!slot->in_use);
    Assert(slot->active_pid == 0);

    /* first initialize persistent data */
    memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
    StrNCpy(NameStr(slot->data.name), name, NAMEDATALEN);
    slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
    slot->data.persistency = persistency;

    /* and then data only present in shared memory */
    slot->just_dirtied = false;
    slot->dirty = false;
    slot->effective_xmin = InvalidTransactionId;
    slot->effective_catalog_xmin = InvalidTransactionId;
    slot->candidate_catalog_xmin = InvalidTransactionId;
    slot->candidate_xmin_lsn = InvalidXLogRecPtr;
    slot->candidate_restart_valid = InvalidXLogRecPtr;
    slot->candidate_restart_lsn = InvalidXLogRecPtr;

    /*
     * Create the slot on disk.  We haven't actually marked the slot allocated
     * yet, so no special cleanup is required if this errors out.
     */
    CreateSlotOnDisk(slot);

    /*
     * We need to briefly prevent any other backend from iterating over the
     * slots while we flip the in_use flag. We also need to set the active
     * flag while holding the ControlLock as otherwise a concurrent
     * ReplicationSlotAcquire() could acquire the slot as well.
     */
    LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

    slot->in_use = true;

    /* We can now mark the slot active, and that makes it our slot. */
    SpinLockAcquire(&slot->mutex);
    Assert(slot->active_pid == 0);
    slot->active_pid = MyProcPid;
    SpinLockRelease(&slot->mutex);
    MyReplicationSlot = slot;

    LWLockRelease(ReplicationSlotControlLock);

    /*
     * Now that the slot has been marked as in_use and active, it's safe to
     * let somebody else try to allocate a slot.
     */
    LWLockRelease(ReplicationSlotAllocationLock);

    /* Let everybody know we've modified this slot */
    ConditionVariableBroadcast(&slot->active_cv);
}
     325             : 
/*
 * Find a previously created slot and mark it as used by this backend.
 *
 * Errors out if the slot does not exist.  If the slot is active in another
 * backend, either errors out (nowait) or sleeps on the slot's condition
 * variable and retries until it becomes free.
 */
void
ReplicationSlotAcquire(const char *name, bool nowait)
{
    ReplicationSlot *slot;
    int         active_pid;
    int         i;

retry:
    Assert(MyReplicationSlot == NULL);

    /*
     * Search for the named slot and mark it active if we find it.  If the
     * slot is already active, we exit the loop with active_pid set to the PID
     * of the backend that owns it.
     */
    active_pid = 0;
    slot = NULL;
    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    for (i = 0; i < max_replication_slots; i++)
    {
        ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

        if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
        {
            /*
             * This is the slot we want; check if it's active under some other
             * process.  In single user mode, we don't need this check.
             */
            if (IsUnderPostmaster)
            {
                /*
                 * Get ready to sleep on it in case it is active.  (We may end
                 * up not sleeping, but we don't want to do this while holding
                 * the spinlock.)
                 */
                ConditionVariablePrepareToSleep(&s->active_cv);

                SpinLockAcquire(&s->mutex);

                /* claim the slot atomically if nobody else holds it */
                active_pid = s->active_pid;
                if (active_pid == 0)
                    active_pid = s->active_pid = MyProcPid;

                SpinLockRelease(&s->mutex);
            }
            else
                active_pid = MyProcPid;
            slot = s;

            break;
        }
    }
    LWLockRelease(ReplicationSlotControlLock);

    /* If we did not find the slot, error out. */
    if (slot == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_UNDEFINED_OBJECT),
                 errmsg("replication slot \"%s\" does not exist", name)));

    /*
     * If we found the slot but it's already active in another backend, we
     * either error out or retry after a short wait, as caller specified.
     */
    if (active_pid != MyProcPid)
    {
        if (nowait)
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_IN_USE),
                     errmsg("replication slot \"%s\" is active for PID %d",
                            name, active_pid)));

        /* Wait here until we get signaled, and then restart */
        ConditionVariableSleep(&slot->active_cv,
                               WAIT_EVENT_REPLICATION_SLOT_DROP);
        ConditionVariableCancelSleep();
        goto retry;
    }
    else
        ConditionVariableCancelSleep(); /* no sleep needed after all */

    /* Let everybody know we've modified this slot */
    ConditionVariableBroadcast(&slot->active_cv);

    /* We made this slot active, so it's ours now. */
    MyReplicationSlot = slot;
}
     416             : 
/*
 * Release the replication slot that this backend considers to own.
 *
 * This or another backend can re-acquire the slot later.
 * Resources this slot requires will be preserved.
 *
 * An RS_EPHEMERAL slot is dropped outright instead of being released;
 * RS_TEMPORARY slots stay active for this backend (they are cleaned up by
 * ReplicationSlotCleanup at session end).
 */
void
ReplicationSlotRelease(void)
{
    ReplicationSlot *slot = MyReplicationSlot;

    Assert(slot != NULL && slot->active_pid != 0);

    if (slot->data.persistency == RS_EPHEMERAL)
    {
        /*
         * Delete the slot. There is no !PANIC case where this is allowed to
         * fail, all that may happen is an incomplete cleanup of the on-disk
         * data.
         */
        ReplicationSlotDropAcquired();
    }

    /*
     * If slot needed to temporarily restrain both data and catalog xmin to
     * create the catalog snapshot, remove that temporary constraint.
     * Snapshots can only be exported while the initial snapshot is still
     * acquired.
     */
    if (!TransactionIdIsValid(slot->data.xmin) &&
        TransactionIdIsValid(slot->effective_xmin))
    {
        SpinLockAcquire(&slot->mutex);
        slot->effective_xmin = InvalidTransactionId;
        SpinLockRelease(&slot->mutex);
        ReplicationSlotsComputeRequiredXmin(false);
    }

    if (slot->data.persistency == RS_PERSISTENT)
    {
        /*
         * Mark persistent slot inactive.  We're not freeing it, just
         * disconnecting, but wake up others that may be waiting for it.
         */
        SpinLockAcquire(&slot->mutex);
        slot->active_pid = 0;
        SpinLockRelease(&slot->mutex);
        ConditionVariableBroadcast(&slot->active_cv);
    }

    MyReplicationSlot = NULL;

    /* might not have been set when we've been a plain slot */
    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
    MyPgXact->vacuumFlags &= ~PROC_IN_LOGICAL_DECODING;
    LWLockRelease(ProcArrayLock);
}
     474             : 
/*
 * Cleanup all temporary slots created in current session.
 *
 * Drops every in_use slot whose active_pid is this backend's PID (which, as
 * asserted below, can only be RS_TEMPORARY slots at this point).  Because a
 * drop requires releasing ReplicationSlotControlLock, the scan restarts from
 * the top after each drop.
 */
void
ReplicationSlotCleanup(void)
{
    int         i;

    Assert(MyReplicationSlot == NULL);

restart:
    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    for (i = 0; i < max_replication_slots; i++)
    {
        ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

        if (!s->in_use)
            continue;

        SpinLockAcquire(&s->mutex);
        if (s->active_pid == MyProcPid)
        {
            Assert(s->data.persistency == RS_TEMPORARY);
            SpinLockRelease(&s->mutex);
            LWLockRelease(ReplicationSlotControlLock);  /* avoid deadlock */

            ReplicationSlotDropPtr(s);

            /* wake anyone waiting on the now-dropped slot */
            ConditionVariableBroadcast(&s->active_cv);
            goto restart;
        }
        else
            SpinLockRelease(&s->mutex);
    }

    LWLockRelease(ReplicationSlotControlLock);
}
     512             : 
/*
 * Permanently drop replication slot identified by the passed in name.
 *
 * Errors out if the slot does not exist or (with nowait) is active in
 * another backend; otherwise acquires the slot and then drops it.
 */
void
ReplicationSlotDrop(const char *name, bool nowait)
{
    Assert(MyReplicationSlot == NULL);

    ReplicationSlotAcquire(name, nowait);

    ReplicationSlotDropAcquired();
}
     525             : 
     526             : /*
     527             :  * Permanently drop the currently acquired replication slot.
     528             :  */
     529             : static void
     530         156 : ReplicationSlotDropAcquired(void)
     531             : {
     532         156 :     ReplicationSlot *slot = MyReplicationSlot;
     533             : 
     534             :     Assert(MyReplicationSlot != NULL);
     535             : 
     536             :     /* slot isn't acquired anymore */
     537         156 :     MyReplicationSlot = NULL;
     538             : 
     539         156 :     ReplicationSlotDropPtr(slot);
     540         156 : }
     541             : 
     542             : /*
     543             :  * Permanently drop the replication slot which will be released by the point
     544             :  * this function returns.
     545             :  */
     546             : static void
     547         326 : ReplicationSlotDropPtr(ReplicationSlot *slot)
     548             : {
     549             :     char        path[MAXPGPATH];
     550             :     char        tmppath[MAXPGPATH];
     551             : 
     552             :     /*
     553             :      * If some other backend ran this code concurrently with us, we might try
     554             :      * to delete a slot with a certain name while someone else was trying to
     555             :      * create a slot with the same name.
     556             :      */
     557         326 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
     558             : 
     559             :     /* Generate pathnames. */
     560         326 :     sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
     561         326 :     sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
     562             : 
     563             :     /*
     564             :      * Rename the slot directory on disk, so that we'll no longer recognize
     565             :      * this as a valid slot.  Note that if this fails, we've got to mark the
     566             :      * slot inactive before bailing out.  If we're dropping an ephemeral or a
     567             :      * temporary slot, we better never fail hard as the caller won't expect
     568             :      * the slot to survive and this might get called during error handling.
     569             :      */
     570         326 :     if (rename(path, tmppath) == 0)
     571             :     {
     572             :         /*
     573             :          * We need to fsync() the directory we just renamed and its parent to
     574             :          * make sure that our changes are on disk in a crash-safe fashion.  If
     575             :          * fsync() fails, we can't be sure whether the changes are on disk or
     576             :          * not.  For now, we handle that by panicking;
     577             :          * StartupReplicationSlots() will try to straighten it out after
     578             :          * restart.
     579             :          */
     580         326 :         START_CRIT_SECTION();
     581         326 :         fsync_fname(tmppath, true);
     582         326 :         fsync_fname("pg_replslot", true);
     583         326 :         END_CRIT_SECTION();
     584             :     }
     585             :     else
     586             :     {
     587           0 :         bool        fail_softly = slot->data.persistency != RS_PERSISTENT;
     588             : 
     589           0 :         SpinLockAcquire(&slot->mutex);
     590           0 :         slot->active_pid = 0;
     591           0 :         SpinLockRelease(&slot->mutex);
     592             : 
     593             :         /* wake up anyone waiting on this slot */
     594           0 :         ConditionVariableBroadcast(&slot->active_cv);
     595             : 
     596           0 :         ereport(fail_softly ? WARNING : ERROR,
     597             :                 (errcode_for_file_access(),
     598             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
     599             :                         path, tmppath)));
     600             :     }
     601             : 
     602             :     /*
     603             :      * The slot is definitely gone.  Lock out concurrent scans of the array
     604             :      * long enough to kill it.  It's OK to clear the active PID here without
     605             :      * grabbing the mutex because nobody else can be scanning the array here,
     606             :      * and nobody can be attached to this slot and thus access it without
     607             :      * scanning the array.
     608             :      *
     609             :      * Also wake up processes waiting for it.
     610             :      */
     611         326 :     LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
     612         326 :     slot->active_pid = 0;
     613         326 :     slot->in_use = false;
     614         326 :     LWLockRelease(ReplicationSlotControlLock);
     615         326 :     ConditionVariableBroadcast(&slot->active_cv);
     616             : 
     617             :     /*
     618             :      * Slot is dead and doesn't prevent resource removal anymore, recompute
     619             :      * limits.
     620             :      */
     621         326 :     ReplicationSlotsComputeRequiredXmin(false);
     622         326 :     ReplicationSlotsComputeRequiredLSN();
     623             : 
     624             :     /*
     625             :      * If removing the directory fails, the worst thing that will happen is
     626             :      * that the user won't be able to create a new slot with the same name
     627             :      * until the next server restart.  We warn about it, but that's all.
     628             :      */
     629         326 :     if (!rmtree(tmppath, true))
     630           0 :         ereport(WARNING,
     631             :                 (errmsg("could not remove directory \"%s\"", tmppath)));
     632             : 
     633             :     /*
     634             :      * We release this at the very end, so that nobody starts trying to create
     635             :      * a slot while we're still cleaning up the detritus of the old one.
     636             :      */
     637         326 :     LWLockRelease(ReplicationSlotAllocationLock);
     638         326 : }
     639             : 
     640             : /*
     641             :  * Serialize the currently acquired slot's state from memory to disk, thereby
     642             :  * guaranteeing the current state will survive a crash.
     643             :  */
     644             : void
     645         544 : ReplicationSlotSave(void)
     646             : {
     647             :     char        path[MAXPGPATH];
     648             : 
     649             :     Assert(MyReplicationSlot != NULL);
     650             : 
     651         544 :     sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
     652         544 :     SaveSlotToPath(MyReplicationSlot, path, ERROR);
     653         544 : }
     654             : 
     655             : /*
     656             :  * Signal that it would be useful if the currently acquired slot would be
     657             :  * flushed out to disk.
     658             :  *
     659             :  * Note that the actual flush to disk can be delayed for a long time, if
     660             :  * required for correctness explicitly do a ReplicationSlotSave().
     661             :  */
     662             : void
     663         934 : ReplicationSlotMarkDirty(void)
     664             : {
     665         934 :     ReplicationSlot *slot = MyReplicationSlot;
     666             : 
     667             :     Assert(MyReplicationSlot != NULL);
     668             : 
     669         934 :     SpinLockAcquire(&slot->mutex);
     670         934 :     MyReplicationSlot->just_dirtied = true;
     671         934 :     MyReplicationSlot->dirty = true;
     672         934 :     SpinLockRelease(&slot->mutex);
     673         934 : }
     674             : 
     675             : /*
     676             :  * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
     677             :  * guaranteeing it will be there after an eventual crash.
     678             :  */
     679             : void
     680         162 : ReplicationSlotPersist(void)
     681             : {
     682         162 :     ReplicationSlot *slot = MyReplicationSlot;
     683             : 
     684             :     Assert(slot != NULL);
     685             :     Assert(slot->data.persistency != RS_PERSISTENT);
     686             : 
     687         162 :     SpinLockAcquire(&slot->mutex);
     688         162 :     slot->data.persistency = RS_PERSISTENT;
     689         162 :     SpinLockRelease(&slot->mutex);
     690             : 
     691         162 :     ReplicationSlotMarkDirty();
     692         162 :     ReplicationSlotSave();
     693         162 : }
     694             : 
     695             : /*
     696             :  * Compute the oldest xmin across all slots and store it in the ProcArray.
     697             :  *
     698             :  * If already_locked is true, ProcArrayLock has already been acquired
     699             :  * exclusively.
     700             :  */
     701             : void
     702        1848 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
     703             : {
     704             :     int         i;
     705        1848 :     TransactionId agg_xmin = InvalidTransactionId;
     706        1848 :     TransactionId agg_catalog_xmin = InvalidTransactionId;
     707             : 
     708             :     Assert(ReplicationSlotCtl != NULL);
     709             : 
     710        1848 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     711             : 
     712       15538 :     for (i = 0; i < max_replication_slots; i++)
     713             :     {
     714       13690 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     715             :         TransactionId effective_xmin;
     716             :         TransactionId effective_catalog_xmin;
     717             : 
     718       13690 :         if (!s->in_use)
     719       12858 :             continue;
     720             : 
     721         832 :         SpinLockAcquire(&s->mutex);
     722         832 :         effective_xmin = s->effective_xmin;
     723         832 :         effective_catalog_xmin = s->effective_catalog_xmin;
     724         832 :         SpinLockRelease(&s->mutex);
     725             : 
     726             :         /* check the data xmin */
     727         832 :         if (TransactionIdIsValid(effective_xmin) &&
     728           0 :             (!TransactionIdIsValid(agg_xmin) ||
     729           0 :              TransactionIdPrecedes(effective_xmin, agg_xmin)))
     730          76 :             agg_xmin = effective_xmin;
     731             : 
     732             :         /* check the catalog xmin */
     733         832 :         if (TransactionIdIsValid(effective_catalog_xmin) &&
     734         298 :             (!TransactionIdIsValid(agg_catalog_xmin) ||
     735         298 :              TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
     736         464 :             agg_catalog_xmin = effective_catalog_xmin;
     737             :     }
     738             : 
     739        1848 :     LWLockRelease(ReplicationSlotControlLock);
     740             : 
     741        1848 :     ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
     742        1848 : }
     743             : 
     744             : /*
     745             :  * Compute the oldest restart LSN across all slots and inform xlog module.
     746             :  */
     747             : void
     748        1936 : ReplicationSlotsComputeRequiredLSN(void)
     749             : {
     750             :     int         i;
     751        1936 :     XLogRecPtr  min_required = InvalidXLogRecPtr;
     752             : 
     753             :     Assert(ReplicationSlotCtl != NULL);
     754             : 
     755        1936 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     756       16414 :     for (i = 0; i < max_replication_slots; i++)
     757             :     {
     758       14478 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     759             :         XLogRecPtr  restart_lsn;
     760             : 
     761       14478 :         if (!s->in_use)
     762       13634 :             continue;
     763             : 
     764         844 :         SpinLockAcquire(&s->mutex);
     765         844 :         restart_lsn = s->data.restart_lsn;
     766         844 :         SpinLockRelease(&s->mutex);
     767             : 
     768         844 :         if (restart_lsn != InvalidXLogRecPtr &&
     769         252 :             (min_required == InvalidXLogRecPtr ||
     770             :              restart_lsn < min_required))
     771         594 :             min_required = restart_lsn;
     772             :     }
     773        1936 :     LWLockRelease(ReplicationSlotControlLock);
     774             : 
     775        1936 :     XLogSetReplicationSlotMinimumLSN(min_required);
     776        1936 : }
     777             : 
     778             : /*
     779             :  * Compute the oldest WAL LSN required by *logical* decoding slots..
     780             :  *
     781             :  * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
     782             :  * slots exist.
     783             :  *
     784             :  * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
     785             :  * ignores physical replication slots.
     786             :  *
     787             :  * The results aren't required frequently, so we don't maintain a precomputed
     788             :  * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
     789             :  */
     790             : XLogRecPtr
     791        5680 : ReplicationSlotsComputeLogicalRestartLSN(void)
     792             : {
     793        5680 :     XLogRecPtr  result = InvalidXLogRecPtr;
     794             :     int         i;
     795             : 
     796        5680 :     if (max_replication_slots <= 0)
     797           0 :         return InvalidXLogRecPtr;
     798             : 
     799        5680 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     800             : 
     801       59644 :     for (i = 0; i < max_replication_slots; i++)
     802             :     {
     803             :         ReplicationSlot *s;
     804             :         XLogRecPtr  restart_lsn;
     805             : 
     806       53964 :         s = &ReplicationSlotCtl->replication_slots[i];
     807             : 
     808             :         /* cannot change while ReplicationSlotCtlLock is held */
     809       53964 :         if (!s->in_use)
     810       53832 :             continue;
     811             : 
     812             :         /* we're only interested in logical slots */
     813         132 :         if (!SlotIsLogical(s))
     814          60 :             continue;
     815             : 
     816             :         /* read once, it's ok if it increases while we're checking */
     817          72 :         SpinLockAcquire(&s->mutex);
     818          72 :         restart_lsn = s->data.restart_lsn;
     819          72 :         SpinLockRelease(&s->mutex);
     820             : 
     821          72 :         if (result == InvalidXLogRecPtr ||
     822             :             restart_lsn < result)
     823          72 :             result = restart_lsn;
     824             :     }
     825             : 
     826        5680 :     LWLockRelease(ReplicationSlotControlLock);
     827             : 
     828        5680 :     return result;
     829             : }
     830             : 
     831             : /*
     832             :  * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
     833             :  * passed database oid.
     834             :  *
     835             :  * Returns true if there are any slots referencing the database. *nslots will
     836             :  * be set to the absolute number of slots in the database, *nactive to ones
     837             :  * currently active.
     838             :  */
     839             : bool
     840          18 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
     841             : {
     842             :     int         i;
     843             : 
     844          18 :     *nslots = *nactive = 0;
     845             : 
     846          18 :     if (max_replication_slots <= 0)
     847           0 :         return false;
     848             : 
     849          18 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     850         164 :     for (i = 0; i < max_replication_slots; i++)
     851             :     {
     852             :         ReplicationSlot *s;
     853             : 
     854         146 :         s = &ReplicationSlotCtl->replication_slots[i];
     855             : 
     856             :         /* cannot change while ReplicationSlotCtlLock is held */
     857         146 :         if (!s->in_use)
     858         132 :             continue;
     859             : 
     860             :         /* only logical slots are database specific, skip */
     861          14 :         if (!SlotIsLogical(s))
     862           2 :             continue;
     863             : 
     864             :         /* not our database, skip */
     865          12 :         if (s->data.database != dboid)
     866           6 :             continue;
     867             : 
     868             :         /* count slots with spinlock held */
     869           6 :         SpinLockAcquire(&s->mutex);
     870           6 :         (*nslots)++;
     871           6 :         if (s->active_pid != 0)
     872           2 :             (*nactive)++;
     873           6 :         SpinLockRelease(&s->mutex);
     874             :     }
     875          18 :     LWLockRelease(ReplicationSlotControlLock);
     876             : 
     877          18 :     if (*nslots > 0)
     878           6 :         return true;
     879          12 :     return false;
     880             : }
     881             : 
     882             : /*
     883             :  * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
     884             :  * passed database oid. The caller should hold an exclusive lock on the
     885             :  * pg_database oid for the database to prevent creation of new slots on the db
     886             :  * or replay from existing slots.
     887             :  *
     888             :  * Another session that concurrently acquires an existing slot on the target DB
     889             :  * (most likely to drop it) may cause this function to ERROR. If that happens
     890             :  * it may have dropped some but not all slots.
     891             :  *
     892             :  * This routine isn't as efficient as it could be - but we don't drop
     893             :  * databases often, especially databases with lots of slots.
     894             :  */
     895             : void
     896          18 : ReplicationSlotsDropDBSlots(Oid dboid)
     897             : {
     898             :     int         i;
     899             : 
     900          18 :     if (max_replication_slots <= 0)
     901           0 :         return;
     902             : 
     903             : restart:
     904          24 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     905         172 :     for (i = 0; i < max_replication_slots; i++)
     906             :     {
     907             :         ReplicationSlot *s;
     908             :         char       *slotname;
     909             :         int         active_pid;
     910             : 
     911         154 :         s = &ReplicationSlotCtl->replication_slots[i];
     912             : 
     913             :         /* cannot change while ReplicationSlotCtlLock is held */
     914         154 :         if (!s->in_use)
     915         134 :             continue;
     916             : 
     917             :         /* only logical slots are database specific, skip */
     918          20 :         if (!SlotIsLogical(s))
     919           2 :             continue;
     920             : 
     921             :         /* not our database, skip */
     922          18 :         if (s->data.database != dboid)
     923          12 :             continue;
     924             : 
     925             :         /* acquire slot, so ReplicationSlotDropAcquired can be reused  */
     926           6 :         SpinLockAcquire(&s->mutex);
     927             :         /* can't change while ReplicationSlotControlLock is held */
     928           6 :         slotname = NameStr(s->data.name);
     929           6 :         active_pid = s->active_pid;
     930           6 :         if (active_pid == 0)
     931             :         {
     932           6 :             MyReplicationSlot = s;
     933           6 :             s->active_pid = MyProcPid;
     934             :         }
     935           6 :         SpinLockRelease(&s->mutex);
     936             : 
     937             :         /*
     938             :          * Even though we hold an exclusive lock on the database object a
     939             :          * logical slot for that DB can still be active, e.g. if it's
     940             :          * concurrently being dropped by a backend connected to another DB.
     941             :          *
     942             :          * That's fairly unlikely in practice, so we'll just bail out.
     943             :          */
     944           6 :         if (active_pid)
     945           0 :             ereport(ERROR,
     946             :                     (errcode(ERRCODE_OBJECT_IN_USE),
     947             :                      errmsg("replication slot \"%s\" is active for PID %d",
     948             :                             slotname, active_pid)));
     949             : 
     950             :         /*
     951             :          * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
     952             :          * holding ReplicationSlotControlLock over filesystem operations,
     953             :          * release ReplicationSlotControlLock and use
     954             :          * ReplicationSlotDropAcquired.
     955             :          *
     956             :          * As that means the set of slots could change, restart scan from the
     957             :          * beginning each time we release the lock.
     958             :          */
     959           6 :         LWLockRelease(ReplicationSlotControlLock);
     960           6 :         ReplicationSlotDropAcquired();
     961           6 :         goto restart;
     962             :     }
     963          18 :     LWLockRelease(ReplicationSlotControlLock);
     964             : }
     965             : 
     966             : 
     967             : /*
     968             :  * Check whether the server's configuration supports using replication
     969             :  * slots.
     970             :  */
     971             : void
     972         732 : CheckSlotRequirements(void)
     973             : {
     974             :     /*
     975             :      * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
     976             :      * needs the same check.
     977             :      */
     978             : 
     979         732 :     if (max_replication_slots == 0)
     980           0 :         ereport(ERROR,
     981             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     982             :                  (errmsg("replication slots can only be used if max_replication_slots > 0"))));
     983             : 
     984         732 :     if (wal_level < WAL_LEVEL_REPLICA)
     985           0 :         ereport(ERROR,
     986             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     987             :                  errmsg("replication slots can only be used if wal_level >= replica")));
     988         732 : }
     989             : 
     990             : /*
     991             :  * Reserve WAL for the currently active slot.
     992             :  *
     993             :  * Compute and set restart_lsn in a manner that's appropriate for the type of
     994             :  * the slot and concurrency safe.
     995             :  */
     996             : void
     997         326 : ReplicationSlotReserveWal(void)
     998             : {
     999         326 :     ReplicationSlot *slot = MyReplicationSlot;
    1000             : 
    1001             :     Assert(slot != NULL);
    1002             :     Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
    1003             : 
    1004             :     /*
    1005             :      * The replication slot mechanism is used to prevent removal of required
    1006             :      * WAL. As there is no interlock between this routine and checkpoints, WAL
    1007             :      * segments could concurrently be removed when a now stale return value of
    1008             :      * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
    1009             :      * this happens we'll just retry.
    1010             :      */
    1011             :     while (true)
    1012           0 :     {
    1013             :         XLogSegNo   segno;
    1014             :         XLogRecPtr  restart_lsn;
    1015             : 
    1016             :         /*
    1017             :          * For logical slots log a standby snapshot and start logical decoding
    1018             :          * at exactly that position. That allows the slot to start up more
    1019             :          * quickly.
    1020             :          *
    1021             :          * That's not needed (or indeed helpful) for physical slots as they'll
    1022             :          * start replay at the last logged checkpoint anyway. Instead return
    1023             :          * the location of the last redo LSN. While that slightly increases
    1024             :          * the chance that we have to retry, it's where a base backup has to
    1025             :          * start replay at.
    1026             :          */
    1027         326 :         if (!RecoveryInProgress() && SlotIsLogical(slot))
    1028         240 :         {
    1029             :             XLogRecPtr  flushptr;
    1030             : 
    1031             :             /* start at current insert position */
    1032         240 :             restart_lsn = GetXLogInsertRecPtr();
    1033         240 :             SpinLockAcquire(&slot->mutex);
    1034         240 :             slot->data.restart_lsn = restart_lsn;
    1035         240 :             SpinLockRelease(&slot->mutex);
    1036             : 
    1037             :             /* make sure we have enough information to start */
    1038         240 :             flushptr = LogStandbySnapshot();
    1039             : 
    1040             :             /* and make sure it's fsynced to disk */
    1041         240 :             XLogFlush(flushptr);
    1042             :         }
    1043             :         else
    1044             :         {
    1045          86 :             restart_lsn = GetRedoRecPtr();
    1046          86 :             SpinLockAcquire(&slot->mutex);
    1047          86 :             slot->data.restart_lsn = restart_lsn;
    1048          86 :             SpinLockRelease(&slot->mutex);
    1049             :         }
    1050             : 
    1051             :         /* prevent WAL removal as fast as possible */
    1052         326 :         ReplicationSlotsComputeRequiredLSN();
    1053             : 
    1054             :         /*
    1055             :          * If all required WAL is still there, great, otherwise retry. The
    1056             :          * slot should prevent further removal of WAL, unless there's a
    1057             :          * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
    1058             :          * the new restart_lsn above, so normally we should never need to loop
    1059             :          * more than twice.
    1060             :          */
    1061         326 :         XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
    1062         326 :         if (XLogGetLastRemovedSegno() < segno)
    1063         326 :             break;
    1064             :     }
    1065         326 : }
    1066             : 
    1067             : /*
    1068             :  * Flush all replication slots to disk.
    1069             :  *
    1070             :  * This needn't actually be part of a checkpoint, but it's a convenient
    1071             :  * location.
    1072             :  */
    1073             : void
    1074        2840 : CheckPointReplicationSlots(void)
    1075             : {
    1076             :     int         i;
    1077             : 
    1078        2840 :     elog(DEBUG1, "performing replication slot checkpoint");
    1079             : 
    1080             :     /*
    1081             :      * Prevent any slot from being created/dropped while we're active. As we
    1082             :      * explicitly do *not* want to block iterating over replication_slots or
    1083             :      * acquiring a slot we cannot take the control lock - but that's OK,
    1084             :      * because holding ReplicationSlotAllocationLock is strictly stronger, and
    1085             :      * enough to guarantee that nobody can change the in_use bits on us.
    1086             :      */
    1087        2840 :     LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
    1088             : 
    1089       29822 :     for (i = 0; i < max_replication_slots; i++)
    1090             :     {
    1091       26982 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
    1092             :         char        path[MAXPGPATH];
    1093             : 
    1094       26982 :         if (!s->in_use)
    1095       26916 :             continue;
    1096             : 
    1097             :         /* save the slot to disk, locking is handled in SaveSlotToPath() */
    1098          66 :         sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
    1099          66 :         SaveSlotToPath(s, path, LOG);
    1100             :     }
    1101        2840 :     LWLockRelease(ReplicationSlotAllocationLock);
    1102        2840 : }
    1103             : 
    1104             : /*
    1105             :  * Load all replication slots from disk into memory at server startup. This
    1106             :  * needs to be run before we start crash recovery.
    1107             :  */
    1108             : void
    1109        1162 : StartupReplicationSlots(void)
    1110             : {
    1111             :     DIR        *replication_dir;
    1112             :     struct dirent *replication_de;
    1113             : 
    1114        1162 :     elog(DEBUG1, "starting up replication slots");
    1115             : 
    1116             :     /* restore all slots by iterating over all on-disk entries */
    1117        1162 :     replication_dir = AllocateDir("pg_replslot");
    1118        4668 :     while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
    1119             :     {
    1120             :         struct stat statbuf;
    1121             :         char        path[MAXPGPATH + 12];
    1122             : 
    1123        3526 :         if (strcmp(replication_de->d_name, ".") == 0 ||
    1124        1182 :             strcmp(replication_de->d_name, "..") == 0)
    1125        4648 :             continue;
    1126             : 
    1127          20 :         snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
    1128             : 
    1129             :         /* we're only creating directories here, skip if it's not our's */
    1130          20 :         if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
    1131           0 :             continue;
    1132             : 
    1133             :         /* we crashed while a slot was being setup or deleted, clean up */
    1134          20 :         if (pg_str_endswith(replication_de->d_name, ".tmp"))
    1135             :         {
    1136           0 :             if (!rmtree(path, true))
    1137             :             {
    1138           0 :                 ereport(WARNING,
    1139             :                         (errmsg("could not remove directory \"%s\"",
    1140             :                                 path)));
    1141           0 :                 continue;
    1142             :             }
    1143           0 :             fsync_fname("pg_replslot", true);
    1144           0 :             continue;
    1145             :         }
    1146             : 
    1147             :         /* looks like a slot in a normal state, restore */
    1148          20 :         RestoreSlotFromDisk(replication_de->d_name);
    1149             :     }
    1150        1162 :     FreeDir(replication_dir);
    1151             : 
    1152             :     /* currently no slots exist, we're done. */
    1153        1162 :     if (max_replication_slots <= 0)
    1154           0 :         return;
    1155             : 
    1156             :     /* Now that we have recovered all the data, compute replication xmin */
    1157        1162 :     ReplicationSlotsComputeRequiredXmin(false);
    1158        1162 :     ReplicationSlotsComputeRequiredLSN();
    1159             : }
    1160             : 
    1161             : /* ----
    1162             :  * Manipulation of on-disk state of replication slots
    1163             :  *
    1164             :  * NB: none of the routines below should take any notice whether a slot is the
    1165             :  * current one or not, that's all handled a layer above.
    1166             :  * ----
    1167             :  */
    1168             : static void
    1169         366 : CreateSlotOnDisk(ReplicationSlot *slot)
    1170             : {
    1171             :     char        tmppath[MAXPGPATH];
    1172             :     char        path[MAXPGPATH];
    1173             :     struct stat st;
    1174             : 
    1175             :     /*
    1176             :      * No need to take out the io_in_progress_lock, nobody else can see this
    1177             :      * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
    1178             :      * takes out the lock, if we'd take the lock here, we'd deadlock.
    1179             :      */
    1180             : 
    1181         366 :     sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
    1182         366 :     sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
    1183             : 
    1184             :     /*
    1185             :      * It's just barely possible that some previous effort to create or drop a
    1186             :      * slot with this name left a temp directory lying around. If that seems
    1187             :      * to be the case, try to remove it.  If the rmtree() fails, we'll error
    1188             :      * out at the MakePGDirectory() below, so we don't bother checking
    1189             :      * success.
    1190             :      */
    1191         366 :     if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
    1192           0 :         rmtree(tmppath, true);
    1193             : 
    1194             :     /* Create and fsync the temporary slot directory. */
    1195         366 :     if (MakePGDirectory(tmppath) < 0)
    1196           0 :         ereport(ERROR,
    1197             :                 (errcode_for_file_access(),
    1198             :                  errmsg("could not create directory \"%s\": %m",
    1199             :                         tmppath)));
    1200         366 :     fsync_fname(tmppath, true);
    1201             : 
    1202             :     /* Write the actual state file. */
    1203         366 :     slot->dirty = true;          /* signal that we really need to write */
    1204         366 :     SaveSlotToPath(slot, tmppath, ERROR);
    1205             : 
    1206             :     /* Rename the directory into place. */
    1207         366 :     if (rename(tmppath, path) != 0)
    1208           0 :         ereport(ERROR,
    1209             :                 (errcode_for_file_access(),
    1210             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1211             :                         tmppath, path)));
    1212             : 
    1213             :     /*
    1214             :      * If we'd now fail - really unlikely - we wouldn't know whether this slot
    1215             :      * would persist after an OS crash or not - so, force a restart. The
    1216             :      * restart would try to fsync this again till it works.
    1217             :      */
    1218         366 :     START_CRIT_SECTION();
    1219             : 
    1220         366 :     fsync_fname(path, true);
    1221         366 :     fsync_fname("pg_replslot", true);
    1222             : 
    1223         366 :     END_CRIT_SECTION();
    1224         366 : }
    1225             : 
    1226             : /*
    1227             :  * Shared functionality between saving and creating a replication slot.
    1228             :  */
    1229             : static void
    1230         976 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
    1231             : {
    1232             :     char        tmppath[MAXPGPATH];
    1233             :     char        path[MAXPGPATH];
    1234             :     int         fd;
    1235             :     ReplicationSlotOnDisk cp;
    1236             :     bool        was_dirty;
    1237             : 
    1238             :     /* first check whether there's something to write out */
    1239         976 :     SpinLockAcquire(&slot->mutex);
    1240         976 :     was_dirty = slot->dirty;
    1241         976 :     slot->just_dirtied = false;
    1242         976 :     SpinLockRelease(&slot->mutex);
    1243             : 
    1244             :     /* and don't do anything if there's nothing to write */
    1245         976 :     if (!was_dirty)
    1246         104 :         return;
    1247             : 
    1248         924 :     LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
    1249             : 
    1250             :     /* silence valgrind :( */
    1251         924 :     memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
    1252             : 
    1253         924 :     sprintf(tmppath, "%s/state.tmp", dir);
    1254         924 :     sprintf(path, "%s/state", dir);
    1255             : 
    1256         924 :     fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    1257         924 :     if (fd < 0)
    1258             :     {
    1259           0 :         ereport(elevel,
    1260             :                 (errcode_for_file_access(),
    1261             :                  errmsg("could not create file \"%s\": %m",
    1262             :                         tmppath)));
    1263           0 :         return;
    1264             :     }
    1265             : 
    1266         924 :     cp.magic = SLOT_MAGIC;
    1267         924 :     INIT_CRC32C(cp.checksum);
    1268         924 :     cp.version = SLOT_VERSION;
    1269         924 :     cp.length = ReplicationSlotOnDiskV2Size;
    1270             : 
    1271         924 :     SpinLockAcquire(&slot->mutex);
    1272             : 
    1273         924 :     memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
    1274             : 
    1275         924 :     SpinLockRelease(&slot->mutex);
    1276             : 
    1277         924 :     COMP_CRC32C(cp.checksum,
    1278             :                 (char *) (&cp) + SnapBuildOnDiskNotChecksummedSize,
    1279             :                 SnapBuildOnDiskChecksummedSize);
    1280         924 :     FIN_CRC32C(cp.checksum);
    1281             : 
    1282         924 :     errno = 0;
    1283         924 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
    1284         924 :     if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
    1285             :     {
    1286           0 :         int         save_errno = errno;
    1287             : 
    1288           0 :         pgstat_report_wait_end();
    1289           0 :         CloseTransientFile(fd);
    1290             : 
    1291             :         /* if write didn't set errno, assume problem is no disk space */
    1292           0 :         errno = save_errno ? save_errno : ENOSPC;
    1293           0 :         ereport(elevel,
    1294             :                 (errcode_for_file_access(),
    1295             :                  errmsg("could not write to file \"%s\": %m",
    1296             :                         tmppath)));
    1297           0 :         return;
    1298             :     }
    1299         924 :     pgstat_report_wait_end();
    1300             : 
    1301             :     /* fsync the temporary file */
    1302         924 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
    1303         924 :     if (pg_fsync(fd) != 0)
    1304             :     {
    1305           0 :         int         save_errno = errno;
    1306             : 
    1307           0 :         pgstat_report_wait_end();
    1308           0 :         CloseTransientFile(fd);
    1309           0 :         errno = save_errno;
    1310           0 :         ereport(elevel,
    1311             :                 (errcode_for_file_access(),
    1312             :                  errmsg("could not fsync file \"%s\": %m",
    1313             :                         tmppath)));
    1314           0 :         return;
    1315             :     }
    1316         924 :     pgstat_report_wait_end();
    1317             : 
    1318         924 :     if (CloseTransientFile(fd) != 0)
    1319             :     {
    1320           0 :         ereport(elevel,
    1321             :                 (errcode_for_file_access(),
    1322             :                  errmsg("could not close file \"%s\": %m",
    1323             :                         tmppath)));
    1324           0 :         return;
    1325             :     }
    1326             : 
    1327             :     /* rename to permanent file, fsync file and directory */
    1328         924 :     if (rename(tmppath, path) != 0)
    1329             :     {
    1330           0 :         ereport(elevel,
    1331             :                 (errcode_for_file_access(),
    1332             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1333             :                         tmppath, path)));
    1334           0 :         return;
    1335             :     }
    1336             : 
    1337             :     /*
    1338             :      * Check CreateSlotOnDisk() for the reasoning of using a critical section.
    1339             :      */
    1340         924 :     START_CRIT_SECTION();
    1341             : 
    1342         924 :     fsync_fname(path, false);
    1343         924 :     fsync_fname(dir, true);
    1344         924 :     fsync_fname("pg_replslot", true);
    1345             : 
    1346         924 :     END_CRIT_SECTION();
    1347             : 
    1348             :     /*
    1349             :      * Successfully wrote, unset dirty bit, unless somebody dirtied again
    1350             :      * already.
    1351             :      */
    1352         924 :     SpinLockAcquire(&slot->mutex);
    1353         924 :     if (!slot->just_dirtied)
    1354         924 :         slot->dirty = false;
    1355         924 :     SpinLockRelease(&slot->mutex);
    1356             : 
    1357         924 :     LWLockRelease(&slot->io_in_progress_lock);
    1358             : }
    1359             : 
    1360             : /*
    1361             :  * Load a single slot from disk into memory.
    1362             :  */
    1363             : static void
    1364          20 : RestoreSlotFromDisk(const char *name)
    1365             : {
    1366             :     ReplicationSlotOnDisk cp;
    1367             :     int         i;
    1368             :     char        slotdir[MAXPGPATH + 12];
    1369             :     char        path[MAXPGPATH + 22];
    1370             :     int         fd;
    1371          20 :     bool        restored = false;
    1372             :     int         readBytes;
    1373             :     pg_crc32c   checksum;
    1374             : 
    1375             :     /* no need to lock here, no concurrent access allowed yet */
    1376             : 
    1377             :     /* delete temp file if it exists */
    1378          20 :     sprintf(slotdir, "pg_replslot/%s", name);
    1379          20 :     sprintf(path, "%s/state.tmp", slotdir);
    1380          20 :     if (unlink(path) < 0 && errno != ENOENT)
    1381           0 :         ereport(PANIC,
    1382             :                 (errcode_for_file_access(),
    1383             :                  errmsg("could not remove file \"%s\": %m", path)));
    1384             : 
    1385          20 :     sprintf(path, "%s/state", slotdir);
    1386             : 
    1387          20 :     elog(DEBUG1, "restoring replication slot from \"%s\"", path);
    1388             : 
    1389          20 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
    1390             : 
    1391             :     /*
    1392             :      * We do not need to handle this as we are rename()ing the directory into
    1393             :      * place only after we fsync()ed the state file.
    1394             :      */
    1395          20 :     if (fd < 0)
    1396           0 :         ereport(PANIC,
    1397             :                 (errcode_for_file_access(),
    1398             :                  errmsg("could not open file \"%s\": %m", path)));
    1399             : 
    1400             :     /*
    1401             :      * Sync state file before we're reading from it. We might have crashed
    1402             :      * while it wasn't synced yet and we shouldn't continue on that basis.
    1403             :      */
    1404          20 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
    1405          20 :     if (pg_fsync(fd) != 0)
    1406           0 :         ereport(PANIC,
    1407             :                 (errcode_for_file_access(),
    1408             :                  errmsg("could not fsync file \"%s\": %m",
    1409             :                         path)));
    1410          20 :     pgstat_report_wait_end();
    1411             : 
    1412             :     /* Also sync the parent directory */
    1413          20 :     START_CRIT_SECTION();
    1414          20 :     fsync_fname(slotdir, true);
    1415          20 :     END_CRIT_SECTION();
    1416             : 
    1417             :     /* read part of statefile that's guaranteed to be version independent */
    1418          20 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    1419          20 :     readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
    1420          20 :     pgstat_report_wait_end();
    1421          20 :     if (readBytes != ReplicationSlotOnDiskConstantSize)
    1422             :     {
    1423           0 :         if (readBytes < 0)
    1424           0 :             ereport(PANIC,
    1425             :                     (errcode_for_file_access(),
    1426             :                      errmsg("could not read file \"%s\": %m", path)));
    1427             :         else
    1428           0 :             ereport(PANIC,
    1429             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    1430             :                      errmsg("could not read file \"%s\": read %d of %zu",
    1431             :                             path, readBytes,
    1432             :                             (Size) ReplicationSlotOnDiskConstantSize)));
    1433             :     }
    1434             : 
    1435             :     /* verify magic */
    1436          20 :     if (cp.magic != SLOT_MAGIC)
    1437           0 :         ereport(PANIC,
    1438             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1439             :                  errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
    1440             :                         path, cp.magic, SLOT_MAGIC)));
    1441             : 
    1442             :     /* verify version */
    1443          20 :     if (cp.version != SLOT_VERSION)
    1444           0 :         ereport(PANIC,
    1445             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1446             :                  errmsg("replication slot file \"%s\" has unsupported version %u",
    1447             :                         path, cp.version)));
    1448             : 
    1449             :     /* boundary check on length */
    1450          20 :     if (cp.length != ReplicationSlotOnDiskV2Size)
    1451           0 :         ereport(PANIC,
    1452             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1453             :                  errmsg("replication slot file \"%s\" has corrupted length %u",
    1454             :                         path, cp.length)));
    1455             : 
    1456             :     /* Now that we know the size, read the entire file */
    1457          20 :     pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
    1458          20 :     readBytes = read(fd,
    1459             :                      (char *) &cp + ReplicationSlotOnDiskConstantSize,
    1460          20 :                      cp.length);
    1461          20 :     pgstat_report_wait_end();
    1462          20 :     if (readBytes != cp.length)
    1463             :     {
    1464           0 :         if (readBytes < 0)
    1465           0 :             ereport(PANIC,
    1466             :                     (errcode_for_file_access(),
    1467             :                      errmsg("could not read file \"%s\": %m", path)));
    1468             :         else
    1469           0 :             ereport(PANIC,
    1470             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    1471             :                      errmsg("could not read file \"%s\": read %d of %zu",
    1472             :                             path, readBytes, (Size) cp.length)));
    1473             :     }
    1474             : 
    1475          20 :     if (CloseTransientFile(fd) != 0)
    1476           0 :         ereport(PANIC,
    1477             :                 (errcode_for_file_access(),
    1478             :                  errmsg("could not close file \"%s\": %m", path)));
    1479             : 
    1480             :     /* now verify the CRC */
    1481          20 :     INIT_CRC32C(checksum);
    1482          20 :     COMP_CRC32C(checksum,
    1483             :                 (char *) &cp + SnapBuildOnDiskNotChecksummedSize,
    1484             :                 SnapBuildOnDiskChecksummedSize);
    1485          20 :     FIN_CRC32C(checksum);
    1486             : 
    1487          20 :     if (!EQ_CRC32C(checksum, cp.checksum))
    1488           0 :         ereport(PANIC,
    1489             :                 (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
    1490             :                         path, checksum, cp.checksum)));
    1491             : 
    1492             :     /*
    1493             :      * If we crashed with an ephemeral slot active, don't restore but delete
    1494             :      * it.
    1495             :      */
    1496          20 :     if (cp.slotdata.persistency != RS_PERSISTENT)
    1497             :     {
    1498           0 :         if (!rmtree(slotdir, true))
    1499             :         {
    1500           0 :             ereport(WARNING,
    1501             :                     (errmsg("could not remove directory \"%s\"",
    1502             :                             slotdir)));
    1503             :         }
    1504           0 :         fsync_fname("pg_replslot", true);
    1505           0 :         return;
    1506             :     }
    1507             : 
    1508             :     /*
    1509             :      * Verify that requirements for the specific slot type are met. That's
    1510             :      * important because if these aren't met we're not guaranteed to retain
    1511             :      * all the necessary resources for the slot.
    1512             :      *
    1513             :      * NB: We have to do so *after* the above checks for ephemeral slots,
    1514             :      * because otherwise a slot that shouldn't exist anymore could prevent
    1515             :      * restarts.
    1516             :      *
    1517             :      * NB: Changing the requirements here also requires adapting
    1518             :      * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
    1519             :      */
    1520          20 :     if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
    1521           0 :         ereport(FATAL,
    1522             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1523             :                  errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
    1524             :                         NameStr(cp.slotdata.name)),
    1525             :                  errhint("Change wal_level to be logical or higher.")));
    1526          20 :     else if (wal_level < WAL_LEVEL_REPLICA)
    1527           0 :         ereport(FATAL,
    1528             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1529             :                  errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
    1530             :                         NameStr(cp.slotdata.name)),
    1531             :                  errhint("Change wal_level to be replica or higher.")));
    1532             : 
    1533             :     /* nothing can be active yet, don't lock anything */
    1534          56 :     for (i = 0; i < max_replication_slots; i++)
    1535             :     {
    1536             :         ReplicationSlot *slot;
    1537             : 
    1538          28 :         slot = &ReplicationSlotCtl->replication_slots[i];
    1539             : 
    1540          28 :         if (slot->in_use)
    1541           8 :             continue;
    1542             : 
    1543             :         /* restore the entire set of persistent data */
    1544          20 :         memcpy(&slot->data, &cp.slotdata,
    1545             :                sizeof(ReplicationSlotPersistentData));
    1546             : 
    1547             :         /* initialize in memory state */
    1548          20 :         slot->effective_xmin = cp.slotdata.xmin;
    1549          20 :         slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
    1550             : 
    1551          20 :         slot->candidate_catalog_xmin = InvalidTransactionId;
    1552          20 :         slot->candidate_xmin_lsn = InvalidXLogRecPtr;
    1553          20 :         slot->candidate_restart_lsn = InvalidXLogRecPtr;
    1554          20 :         slot->candidate_restart_valid = InvalidXLogRecPtr;
    1555             : 
    1556          20 :         slot->in_use = true;
    1557          20 :         slot->active_pid = 0;
    1558             : 
    1559          20 :         restored = true;
    1560          20 :         break;
    1561             :     }
    1562             : 
    1563          20 :     if (!restored)
    1564           0 :         ereport(FATAL,
    1565             :                 (errmsg("too many replication slots active before shutdown"),
    1566             :                  errhint("Increase max_replication_slots and try again.")));
    1567             : }

Generated by: LCOV version 1.13