LCOV - code coverage report
Current view: top level - src/include/replication - slot.h (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 100.0 % 8 8
Test Date: 2026-02-28 14:14:49 Functions: 100.0 % 1 1
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  * slot.h
       3              :  *     Replication slot management.
       4              :  *
       5              :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
       6              :  *
       7              :  *-------------------------------------------------------------------------
       8              :  */
       9              : #ifndef SLOT_H
      10              : #define SLOT_H
      11              : 
      12              : #include "access/xlog.h"
      13              : #include "access/xlogreader.h"
      14              : #include "storage/condition_variable.h"
      15              : #include "storage/lwlock.h"
      16              : #include "storage/shmem.h"
      17              : #include "storage/spin.h"
      18              : #include "replication/walreceiver.h"
      19              : 
       20              : /* directory to store replication slot data in */
       21              : #define PG_REPLSLOT_DIR     "pg_replslot"
       22              : 
       23              : /*
       24              :  * The reserved name for a replication slot used to retain dead tuples for
       25              :  * conflict detection in logical replication. See
       26              :  * maybe_advance_nonremovable_xid() for details.
       27              :  */
       28              : #define CONFLICT_DETECTION_SLOT "pg_conflict_detection"
      29              : 
       30              : /*
       31              :  * Behaviour of replication slots, upon release or crash.
       32              :  *
       33              :  * Slots marked as PERSISTENT are crash-safe and will not be dropped when
       34              :  * released. Slots marked as EPHEMERAL will be dropped when released or after
       35              :  * restarts.  Slots marked TEMPORARY will be dropped at the end of a session
       36              :  * or on error.
       37              :  *
       38              :  * EPHEMERAL is used as a not-quite-ready state when creating persistent
       39              :  * slots.  EPHEMERAL slots can be made PERSISTENT by calling
       40              :  * ReplicationSlotPersist().  For a slot that goes away at the end of a
       41              :  * session, TEMPORARY is the appropriate choice.
       42              :  */
       43              : typedef enum ReplicationSlotPersistency
       44              : {
       45              :     RS_PERSISTENT,              /* crash-safe; kept when released */
       46              :     RS_EPHEMERAL,               /* dropped on release or after restart */
       47              :     RS_TEMPORARY,               /* dropped at session end or on error */
       48              : } ReplicationSlotPersistency;
      49              : 
       50              : /*
       51              :  * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
       52              :  * 'invalidated' field is set to a value other than _NONE.
       53              :  *
       54              :  * When adding a new invalidation cause here, the value must be a power of 2
       55              :  * (e.g., 1, 2, 4...) for proper bitwise operations. Also, remember to update
       56              :  * RS_INVAL_MAX_CAUSES below, and SlotInvalidationCauses in slot.c.
       57              :  */
       58              : typedef enum ReplicationSlotInvalidationCause
       59              : {
       60              :     RS_INVAL_NONE = 0,          /* slot is not invalidated */
       61              :     /* required WAL has been removed */
       62              :     RS_INVAL_WAL_REMOVED = (1 << 0),
       63              :     /* required rows have been removed */
       64              :     RS_INVAL_HORIZON = (1 << 1),
       65              :     /* wal_level insufficient for slot */
       66              :     RS_INVAL_WAL_LEVEL = (1 << 2),
       67              :     /* idle slot timeout has occurred */
       68              :     RS_INVAL_IDLE_TIMEOUT = (1 << 3),
       69              : } ReplicationSlotInvalidationCause;
       70              : 
       71              : /* Maximum number of invalidation causes */
       72              : #define RS_INVAL_MAX_CAUSES 4
      73              : 
       74              : /*
       75              :  * When the slot synchronization worker is running, or when
       76              :  * pg_sync_replication_slots is executed, slot synchronization may be
       77              :  * skipped. This enum defines the possible reasons for skipping slot
       78              :  * synchronization.
       79              :  */
       80              : typedef enum SlotSyncSkipReason
       81              : {
       82              :     SS_SKIP_NONE,               /* No skip */
       83              :     SS_SKIP_WAL_NOT_FLUSHED,    /* Standby did not flush the WAL corresponding
       84              :                                  * to confirmed flush of remote slot */
       85              :     SS_SKIP_WAL_OR_ROWS_REMOVED,    /* Remote slot is behind; required WAL or
       86              :                                      * rows may be removed or at risk */
       87              :     SS_SKIP_NO_CONSISTENT_SNAPSHOT, /* Standby could not build a consistent
       88              :                                      * snapshot */
       89              :     SS_SKIP_INVALID             /* Local slot is invalid */
       90              : } SlotSyncSkipReason;
      91              : 
       92              : /*
       93              :  * On-disk data of a replication slot, preserved across restarts.
       94              :  */
       95              : typedef struct ReplicationSlotPersistentData
       96              : {
       97              :     /* The slot's identifier */
       98              :     NameData    name;
       99              : 
      100              :     /* database the slot is active on */
      101              :     Oid         database;
      102              : 
      103              :     /*
      104              :      * The slot's behaviour when being dropped (or restored after a crash).
      105              :      */
      106              :     ReplicationSlotPersistency persistency;
      107              : 
      108              :     /*
      109              :      * xmin horizon for data
      110              :      *
      111              :      * NB: This may represent a value that hasn't been written to disk yet;
      112              :      * see notes for effective_xmin, below.
      113              :      */
      114              :     TransactionId xmin;
      115              : 
      116              :     /*
      117              :      * xmin horizon for catalog tuples
      118              :      *
      119              :      * NB: This may represent a value that hasn't been written to disk yet;
      120              :      * see notes for effective_xmin, below.
      121              :      */
      122              :     TransactionId catalog_xmin;
      123              : 
      124              :     /* oldest LSN that might be required by this replication slot */
      125              :     XLogRecPtr  restart_lsn;
      126              : 
      127              :     /* RS_INVAL_NONE if valid, or the reason for having been invalidated */
      128              :     ReplicationSlotInvalidationCause invalidated;
      129              : 
      130              :     /*
      131              :      * Oldest LSN that the client has acked receipt for.  This is used as the
      132              :      * start_lsn point in case the client doesn't specify one, and also as a
      133              :      * safety measure to jump forwards in case the client specifies a
      134              :      * start_lsn that's further in the past than this value.
      135              :      */
      136              :     XLogRecPtr  confirmed_flush;
      137              : 
      138              :     /*
      139              :      * LSN at which we enabled two_phase commit for this slot or LSN at which
      140              :      * we found a consistent point at the time of slot creation.
      141              :      */
      142              :     XLogRecPtr  two_phase_at;
      143              : 
      144              :     /*
      145              :      * Allow decoding of prepared transactions?
      146              :      */
      147              :     bool        two_phase;
      148              : 
      149              :     /* plugin name */
      150              :     NameData    plugin;
      151              : 
      152              :     /*
      153              :      * Was this slot synchronized from the primary server?
      154              :      */
      155              :     bool        synced;
      156              : 
      157              :     /*
      158              :      * Is this a failover slot (sync candidate for standbys)? Only relevant
      159              :      * for logical slots on the primary server.
      160              :      */
      161              :     bool        failover;
      162              : } ReplicationSlotPersistentData;
     163              : 
      164              : /*
      165              :  * Shared memory state of a single replication slot.
      166              :  *
      167              :  * The in-memory data of replication slots follows a locking model based
      168              :  * on two linked concepts:
      169              :  * - A replication slot's in_use flag is switched when added or discarded using
      170              :  * the LWLock ReplicationSlotControlLock, which needs to be held in exclusive
      171              :  * mode when updating the flag by the backend owning the slot and doing the
      172              :  * operation, while readers (concurrent backends not owning the slot) need
      173              :  * to hold it in shared mode when looking at replication slot data.
      174              :  * - Individual fields are protected by mutex where only the backend owning
      175              :  * the slot is authorized to update the fields from its own slot.  The
      176              :  * backend owning the slot does not need to take this lock when reading its
      177              :  * own fields, while concurrent backends not owning this slot should take the
      178              :  * lock when reading this slot's data.
      179              :  */
      180              : typedef struct ReplicationSlot
      181              : {
      182              :     /* lock, on same cacheline as effective_xmin */
      183              :     slock_t     mutex;
      184              : 
      185              :     /* is this slot defined */
      186              :     bool        in_use;
      187              : 
      188              :     /*
      189              :      * Who is streaming out changes for this slot? INVALID_PROC_NUMBER in
      190              :      * unused slots.
      191              :      */
      192              :     ProcNumber  active_proc;
      193              : 
      194              :     /* any outstanding modifications? */
      195              :     bool        just_dirtied;
      196              :     bool        dirty;
      197              : 
      198              :     /*
      199              :      * For logical decoding, it's extremely important that we never remove any
      200              :      * data that's still needed for decoding purposes, even after a crash;
      201              :      * otherwise, decoding will produce wrong answers.  Ordinary streaming
      202              :      * replication also needs to prevent old row versions from being removed
      203              :      * too soon, but the worst consequence we might encounter there is
      204              :      * unwanted query cancellations on the standby.  Thus, for logical
      205              :      * decoding, this value represents the latest xmin that has actually been
      206              :      * written to disk, whereas for streaming replication, it's just the same
      207              :      * as the persistent value (data.xmin).
      208              :      */
      209              :     TransactionId effective_xmin;
      210              :     TransactionId effective_catalog_xmin;
      211              : 
      212              :     /* data surviving shutdowns and crashes */
      213              :     ReplicationSlotPersistentData data;
      214              : 
      215              :     /* is somebody performing io on this slot? */
      216              :     LWLock      io_in_progress_lock;
      217              : 
      218              :     /* Condition variable signaled when active_proc changes */
      219              :     ConditionVariable active_cv;
      220              : 
      221              :     /* all the remaining data is only used for logical slots */
      222              : 
      223              :     /*
      224              :      * When the client has confirmed flushes >= candidate_xmin_lsn we can
      225              :      * advance the catalog xmin.  When restart_valid has been passed,
      226              :      * restart_lsn can be increased.
      227              :      */
      228              :     TransactionId candidate_catalog_xmin;
      229              :     XLogRecPtr  candidate_xmin_lsn;
      230              :     XLogRecPtr  candidate_restart_valid;
      231              :     XLogRecPtr  candidate_restart_lsn;
      232              : 
      233              :     /*
      234              :      * This value tracks the last confirmed_flush LSN flushed which is used
      235              :      * during a shutdown checkpoint to decide if a logical slot's data should
      236              :      * be forcibly flushed or not.
      237              :      */
      238              :     XLogRecPtr  last_saved_confirmed_flush;
      239              : 
      240              :     /*
      241              :      * The time when the slot became inactive. For synced slots on a standby
      242              :      * server, it represents the time when slot synchronization was most
      243              :      * recently stopped.
      244              :      */
      245              :     TimestampTz inactive_since;
      246              : 
      247              :     /*
      248              :      * Latest restart_lsn that has been flushed to disk. For persistent slots
      249              :      * the flushed LSN should be taken into account when calculating the
      250              :      * oldest LSN for WAL segments removal.
      251              :      *
      252              :      * Do not assume that restart_lsn will always move forward, i.e., that the
      253              :      * previously flushed restart_lsn is always behind data.restart_lsn. In
      254              :      * streaming replication using a physical slot, the restart_lsn is updated
      255              :      * based on the flushed WAL position reported by the walreceiver.
      256              :      *
      257              :      * This replication mode allows duplicate WAL records to be received and
      258              :      * overwritten. If the walreceiver receives older WAL records and then
      259              :      * reports them as flushed to the walsender, the restart_lsn may appear to
      260              :      * move backward.
      261              :      *
      262              :      * This typically occurs at the beginning of replication. One reason is
      263              :      * that streaming replication starts at the beginning of a segment, so, if
      264              :      * restart_lsn is in the middle of a segment, it will be updated to an
      265              :      * earlier LSN, see RequestXLogStreaming. Another reason is that the
      266              :      * walreceiver chooses its startpoint based on the replayed LSN, so, if
      267              :      * some records have been received but not yet applied, they will be
      268              :      * received again and lead to updating the restart_lsn to an earlier
      269              :      * position.
      270              :      */
      271              :     XLogRecPtr  last_saved_restart_lsn;
      272              : 
      273              :     /*
      274              :      * Reason for the most recent slot synchronization skip.
      275              :      *
      276              :      * Slot sync skips can occur for both temporary and persistent replication
      277              :      * slots. They are more common for temporary slots, but persistent slots
      278              :      * may also skip synchronization in rare cases (e.g.,
      279              :      * SS_SKIP_WAL_NOT_FLUSHED or SS_SKIP_WAL_OR_ROWS_REMOVED).
      280              :      *
      281              :      * Since temporary slots are dropped after server restart, persisting
      282              :      * slotsync_skip_reason provides no practical benefit.
      283              :      */
      284              :     SlotSyncSkipReason slotsync_skip_reason;
      285              : } ReplicationSlot;
     286              : 
      287              : #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)  /* no database => physical */
      288              : #define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)   /* database set => logical */
      289              : 
      290              : /*
      291              :  * Shared memory control area for all of replication slots.
      292              :  */
      293              : typedef struct ReplicationSlotCtlData
      294              : {
      295              :     /*
      296              :      * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
      297              :      * reason you can't do that in an otherwise-empty struct.
      298              :      */
      299              :     ReplicationSlot replication_slots[1];    /* NOTE(review): presumably sized by max_replication_slots at shmem init -- confirm in slot.c */
      300              : } ReplicationSlotCtlData;
     301              : 
      302              : /*
      303              :  * Set slot's inactive_since property unless it was previously invalidated.
      304              :  */
      305              : static inline void
      306         3019 : ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts,
      307              :                                 bool acquire_lock)    /* if true, take s->mutex around the update */
      308              : {
      309         3019 :     if (acquire_lock)
      310          316 :         SpinLockAcquire(&s->mutex);
      311              : 
      312         3019 :     if (s->data.invalidated == RS_INVAL_NONE)    /* never update once invalidated */
      313         2972 :         s->inactive_since = ts;
      314              : 
      315         3019 :     if (acquire_lock)
      316          316 :         SpinLockRelease(&s->mutex);
      317         3019 : }
     318              : 
      319              : /*
      320              :  * Pointers to shared memory
      321              :  */
      322              : extern PGDLLIMPORT ReplicationSlotCtlData *ReplicationSlotCtl;
      323              : extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
      324              : 
      325              : /* GUCs */
      326              : extern PGDLLIMPORT int max_replication_slots;
      327              : extern PGDLLIMPORT char *synchronized_standby_slots;
      328              : extern PGDLLIMPORT int idle_replication_slot_timeout_secs;
      329              : 
      330              : /* shmem initialization functions */
      331              : extern Size ReplicationSlotsShmemSize(void);
      332              : extern void ReplicationSlotsShmemInit(void);
      333              : 
      334              : /* management of individual slots */
      335              : extern void ReplicationSlotCreate(const char *name, bool db_specific,
      336              :                                   ReplicationSlotPersistency persistency,
      337              :                                   bool two_phase, bool failover,
      338              :                                   bool synced);
      339              : extern void ReplicationSlotPersist(void);
      340              : extern void ReplicationSlotDrop(const char *name, bool nowait);
      341              : extern void ReplicationSlotDropAcquired(void);
      342              : extern void ReplicationSlotAlter(const char *name, const bool *failover,
      343              :                                  const bool *two_phase);
      344              : 
      345              : extern void ReplicationSlotAcquire(const char *name, bool nowait,
      346              :                                    bool error_if_invalid);
      347              : extern void ReplicationSlotRelease(void);
      348              : extern void ReplicationSlotCleanup(bool synced_only);
      349              : extern void ReplicationSlotSave(void);
      350              : extern void ReplicationSlotMarkDirty(void);
      351              : 
      352              : /* miscellaneous: validation, WAL reservation, and slot lookup helpers */
      353              : extern void ReplicationSlotInitialize(void);
      354              : extern bool ReplicationSlotValidateName(const char *name,
      355              :                                         bool allow_reserved_name,
      356              :                                         int elevel);
      357              : extern bool ReplicationSlotValidateNameInternal(const char *name,
      358              :                                                 bool allow_reserved_name,
      359              :                                                 int *err_code, char **err_msg, char **err_hint);
      360              : extern void ReplicationSlotReserveWal(void);
      361              : extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
      362              : extern void ReplicationSlotsComputeRequiredLSN(void);
      363              : extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
      364              : extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
      365              : extern bool CheckLogicalSlotExists(void);
      366              : extern void ReplicationSlotsDropDBSlots(Oid dboid);
      367              : extern bool InvalidateObsoleteReplicationSlots(uint32 possible_causes,
      368              :                                                XLogSegNo oldestSegno,
      369              :                                                Oid dboid,
      370              :                                                TransactionId snapshotConflictHorizon);
      371              : extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
      372              : extern int  ReplicationSlotIndex(ReplicationSlot *slot);
      373              : extern bool ReplicationSlotName(int index, Name name);
      374              : extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot);
      375              : extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
      376              : 
      377              : extern void StartupReplicationSlots(void);
      378              : extern void CheckPointReplicationSlots(bool is_shutdown);
      379              : 
      380              : extern void CheckSlotRequirements(void);
      381              : extern void CheckSlotPermissions(void);
      382              : extern ReplicationSlotInvalidationCause
      383              :             GetSlotInvalidationCause(const char *cause_name);
      384              : extern const char *GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause);
      385              : 
      386              : extern bool SlotExistsInSyncStandbySlots(const char *slot_name);
      387              : extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
      388              : extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn);
      389              : 
      390              : #endif                          /* SLOT_H */
        

Generated by: LCOV version 2.0-1