LCOV - code coverage report
Current view: top level - src/include/replication - slot.h (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 8 8 100.0 %
Date: 2025-07-29 04:18:44 Functions: 1 1 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * slot.h
       3             :  *     Replication slot management.
       4             :  *
       5             :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  *-------------------------------------------------------------------------
       8             :  */
       9             : #ifndef SLOT_H
      10             : #define SLOT_H
      11             : 
      12             : #include "access/xlog.h"
      13             : #include "access/xlogreader.h"
      14             : #include "storage/condition_variable.h"
      15             : #include "storage/lwlock.h"
      16             : #include "storage/shmem.h"
      17             : #include "storage/spin.h"
      18             : #include "replication/walreceiver.h"
      19             : 
/* Directory (relative to the data directory) where replication slot state is stored */
#define PG_REPLSLOT_DIR     "pg_replslot"
      22             : 
/*
 * The reserved name for a replication slot used to retain dead tuples for
 * conflict detection in logical replication.  See
 * maybe_advance_nonremovable_xid() for details.
 */
#define CONFLICT_DETECTION_SLOT "pg_conflict_detection"
      29             : 
      30             : /*
      31             :  * Behaviour of replication slots, upon release or crash.
      32             :  *
      33             :  * Slots marked as PERSISTENT are crash-safe and will not be dropped when
      34             :  * released. Slots marked as EPHEMERAL will be dropped when released or after
      35             :  * restarts.  Slots marked TEMPORARY will be dropped at the end of a session
      36             :  * or on error.
      37             :  *
      38             :  * EPHEMERAL is used as a not-quite-ready state when creating persistent
      39             :  * slots.  EPHEMERAL slots can be made PERSISTENT by calling
      40             :  * ReplicationSlotPersist().  For a slot that goes away at the end of a
      41             :  * session, TEMPORARY is the appropriate choice.
      42             :  */
typedef enum ReplicationSlotPersistency
{
    RS_PERSISTENT,              /* crash-safe; survives release and restart */
    RS_EPHEMERAL,               /* dropped on release or after a restart */
    RS_TEMPORARY,               /* dropped at session end or on error */
} ReplicationSlotPersistency;
      49             : 
      50             : /*
      51             :  * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
      52             :  * 'invalidated' field is set to a value other than _NONE.
      53             :  *
      54             :  * When adding a new invalidation cause here, the value must be powers of 2
      55             :  * (e.g., 1, 2, 4...) for proper bitwise operations. Also, remember to update
      56             :  * RS_INVAL_MAX_CAUSES below, and SlotInvalidationCauses in slot.c.
      57             :  */
typedef enum ReplicationSlotInvalidationCause
{
    RS_INVAL_NONE = 0,
    /* required WAL has been removed */
    RS_INVAL_WAL_REMOVED = (1 << 0),
    /* required rows have been removed */
    RS_INVAL_HORIZON = (1 << 1),
    /* wal_level insufficient for slot */
    RS_INVAL_WAL_LEVEL = (1 << 2),
    /* idle slot timeout has occurred */
    RS_INVAL_IDLE_TIMEOUT = (1 << 3),
} ReplicationSlotInvalidationCause;

/*
 * Maximum number of invalidation causes, i.e. the number of distinct bits
 * used by the enum above (RS_INVAL_NONE is not counted).  Keep this in sync
 * when adding a new cause.
 */
#define RS_INVAL_MAX_CAUSES 4
      73             : 
      74             : /*
      75             :  * On-Disk data of a replication slot, preserved across restarts.
      76             :  */
      77             : typedef struct ReplicationSlotPersistentData
      78             : {
      79             :     /* The slot's identifier */
      80             :     NameData    name;
      81             : 
      82             :     /* database the slot is active on */
      83             :     Oid         database;
      84             : 
      85             :     /*
      86             :      * The slot's behaviour when being dropped (or restored after a crash).
      87             :      */
      88             :     ReplicationSlotPersistency persistency;
      89             : 
      90             :     /*
      91             :      * xmin horizon for data
      92             :      *
      93             :      * NB: This may represent a value that hasn't been written to disk yet;
      94             :      * see notes for effective_xmin, below.
      95             :      */
      96             :     TransactionId xmin;
      97             : 
      98             :     /*
      99             :      * xmin horizon for catalog tuples
     100             :      *
     101             :      * NB: This may represent a value that hasn't been written to disk yet;
     102             :      * see notes for effective_xmin, below.
     103             :      */
     104             :     TransactionId catalog_xmin;
     105             : 
     106             :     /* oldest LSN that might be required by this replication slot */
     107             :     XLogRecPtr  restart_lsn;
     108             : 
     109             :     /* RS_INVAL_NONE if valid, or the reason for having been invalidated */
     110             :     ReplicationSlotInvalidationCause invalidated;
     111             : 
     112             :     /*
     113             :      * Oldest LSN that the client has acked receipt for.  This is used as the
     114             :      * start_lsn point in case the client doesn't specify one, and also as a
     115             :      * safety measure to jump forwards in case the client specifies a
     116             :      * start_lsn that's further in the past than this value.
     117             :      */
     118             :     XLogRecPtr  confirmed_flush;
     119             : 
     120             :     /*
     121             :      * LSN at which we enabled two_phase commit for this slot or LSN at which
     122             :      * we found a consistent point at the time of slot creation.
     123             :      */
     124             :     XLogRecPtr  two_phase_at;
     125             : 
     126             :     /*
     127             :      * Allow decoding of prepared transactions?
     128             :      */
     129             :     bool        two_phase;
     130             : 
     131             :     /* plugin name */
     132             :     NameData    plugin;
     133             : 
     134             :     /*
     135             :      * Was this slot synchronized from the primary server?
     136             :      */
     137             :     char        synced;
     138             : 
     139             :     /*
     140             :      * Is this a failover slot (sync candidate for standbys)? Only relevant
     141             :      * for logical slots on the primary server.
     142             :      */
     143             :     bool        failover;
     144             : } ReplicationSlotPersistentData;
     145             : 
     146             : /*
     147             :  * Shared memory state of a single replication slot.
     148             :  *
     149             :  * The in-memory data of replication slots follows a locking model based
     150             :  * on two linked concepts:
     151             :  * - A replication slot's in_use flag is switched when added or discarded using
 * the LWLock ReplicationSlotControlLock, which needs to be held in exclusive
     153             :  * mode when updating the flag by the backend owning the slot and doing the
     154             :  * operation, while readers (concurrent backends not owning the slot) need
     155             :  * to hold it in shared mode when looking at replication slot data.
     156             :  * - Individual fields are protected by mutex where only the backend owning
     157             :  * the slot is authorized to update the fields from its own slot.  The
     158             :  * backend owning the slot does not need to take this lock when reading its
     159             :  * own fields, while concurrent backends not owning this slot should take the
     160             :  * lock when reading this slot's data.
     161             :  */
typedef struct ReplicationSlot
{
    /* lock protecting individual fields; on same cacheline as effective_xmin */
    slock_t     mutex;

    /* is this slot defined */
    bool        in_use;

    /* Who is streaming out changes for this slot? 0 in unused slots. */
    pid_t       active_pid;

    /* any outstanding modifications? */
    bool        just_dirtied;
    bool        dirty;

    /*
     * For logical decoding, it's extremely important that we never remove any
     * data that's still needed for decoding purposes, even after a crash;
     * otherwise, decoding will produce wrong answers.  Ordinary streaming
     * replication also needs to prevent old row versions from being removed
     * too soon, but the worst consequence we might encounter there is
     * unwanted query cancellations on the standby.  Thus, for logical
     * decoding, this value represents the latest xmin that has actually been
     * written to disk, whereas for streaming replication, it's just the same
     * as the persistent value (data.xmin).
     */
    TransactionId effective_xmin;
    TransactionId effective_catalog_xmin;

    /* data surviving shutdowns and crashes */
    ReplicationSlotPersistentData data;

    /* is somebody performing io on this slot? */
    LWLock      io_in_progress_lock;

    /* Condition variable signaled when active_pid changes */
    ConditionVariable active_cv;

    /* all the remaining data is only used for logical slots */

    /*
     * When the client has confirmed flushes >= candidate_xmin_lsn we can
     * advance the catalog xmin.  When restart_valid has been passed,
     * restart_lsn can be increased.
     */
    TransactionId candidate_catalog_xmin;
    XLogRecPtr  candidate_xmin_lsn;
    XLogRecPtr  candidate_restart_valid;
    XLogRecPtr  candidate_restart_lsn;

    /*
     * This value tracks the last confirmed_flush LSN flushed, which is used
     * during a shutdown checkpoint to decide whether the logical slot's data
     * should be forcibly flushed or not.
     */
    XLogRecPtr  last_saved_confirmed_flush;

    /*
     * The time when the slot became inactive.  For synced slots on a standby
     * server, it represents the time when slot synchronization was most
     * recently stopped.
     */
    TimestampTz inactive_since;

    /*
     * Latest restart_lsn that has been flushed to disk.  For persistent slots
     * the flushed LSN should be taken into account when calculating the
     * oldest LSN for WAL segments removal.
     *
     * Do not assume that restart_lsn will always move forward, i.e., that the
     * previously flushed restart_lsn is always behind data.restart_lsn.  In
     * streaming replication using a physical slot, the restart_lsn is updated
     * based on the flushed WAL position reported by the walreceiver.
     *
     * This replication mode allows duplicate WAL records to be received and
     * overwritten.  If the walreceiver receives older WAL records and then
     * reports them as flushed to the walsender, the restart_lsn may appear to
     * move backward.
     *
     * This typically occurs at the beginning of replication.  One reason is
     * that streaming replication starts at the beginning of a segment, so, if
     * restart_lsn is in the middle of a segment, it will be updated to an
     * earlier LSN, see RequestXLogStreaming.  Another reason is that the
     * walreceiver chooses its startpoint based on the replayed LSN, so, if
     * some records have been received but not yet applied, they will be
     * received again, leading to the restart_lsn being updated to an earlier
     * position.
     */
    XLogRecPtr  last_saved_restart_lsn;

} ReplicationSlot;
     253             : 
/* A slot is "physical" when not tied to a database, "logical" otherwise. */
#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
#define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
     256             : 
     257             : /*
     258             :  * Shared memory control area for all of replication slots.
     259             :  */
typedef struct ReplicationSlotCtlData
{
    /*
     * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
     * reason you can't do that in an otherwise-empty struct.
     *
     * NOTE(review): presumably allocated with room for max_replication_slots
     * entries at shmem-init time (see ReplicationSlotsShmemSize()) — the
     * sizing code lives in slot.c, not visible here.
     */
    ReplicationSlot replication_slots[1];
} ReplicationSlotCtlData;
     268             : 
     269             : /*
     270             :  * Set slot's inactive_since property unless it was previously invalidated.
     271             :  */
     272             : static inline void
     273        5666 : ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts,
     274             :                                 bool acquire_lock)
     275             : {
     276        5666 :     if (acquire_lock)
     277         576 :         SpinLockAcquire(&s->mutex);
     278             : 
     279        5666 :     if (s->data.invalidated == RS_INVAL_NONE)
     280        5584 :         s->inactive_since = ts;
     281             : 
     282        5666 :     if (acquire_lock)
     283         576 :         SpinLockRelease(&s->mutex);
     284        5666 : }
     285             : 
     286             : /*
     287             :  * Pointers to shared memory
     288             :  */
     289             : extern PGDLLIMPORT ReplicationSlotCtlData *ReplicationSlotCtl;
     290             : extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
     291             : 
     292             : /* GUCs */
     293             : extern PGDLLIMPORT int max_replication_slots;
     294             : extern PGDLLIMPORT char *synchronized_standby_slots;
     295             : extern PGDLLIMPORT int idle_replication_slot_timeout_secs;
     296             : 
     297             : /* shmem initialization functions */
     298             : extern Size ReplicationSlotsShmemSize(void);
     299             : extern void ReplicationSlotsShmemInit(void);
     300             : 
     301             : /* management of individual slots */
     302             : extern void ReplicationSlotCreate(const char *name, bool db_specific,
     303             :                                   ReplicationSlotPersistency persistency,
     304             :                                   bool two_phase, bool failover,
     305             :                                   bool synced);
     306             : extern void ReplicationSlotPersist(void);
     307             : extern void ReplicationSlotDrop(const char *name, bool nowait);
     308             : extern void ReplicationSlotDropAcquired(void);
     309             : extern void ReplicationSlotAlter(const char *name, const bool *failover,
     310             :                                  const bool *two_phase);
     311             : 
     312             : extern void ReplicationSlotAcquire(const char *name, bool nowait,
     313             :                                    bool error_if_invalid);
     314             : extern void ReplicationSlotRelease(void);
     315             : extern void ReplicationSlotCleanup(bool synced_only);
     316             : extern void ReplicationSlotSave(void);
     317             : extern void ReplicationSlotMarkDirty(void);
     318             : 
     319             : /* misc stuff */
     320             : extern void ReplicationSlotInitialize(void);
     321             : extern bool ReplicationSlotValidateName(const char *name,
     322             :                                         bool allow_reserved_name,
     323             :                                         int elevel);
     324             : extern void ReplicationSlotReserveWal(void);
     325             : extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
     326             : extern void ReplicationSlotsComputeRequiredLSN(void);
     327             : extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
     328             : extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
     329             : extern void ReplicationSlotsDropDBSlots(Oid dboid);
     330             : extern bool InvalidateObsoleteReplicationSlots(uint32 possible_causes,
     331             :                                                XLogSegNo oldestSegno,
     332             :                                                Oid dboid,
     333             :                                                TransactionId snapshotConflictHorizon);
     334             : extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
     335             : extern int  ReplicationSlotIndex(ReplicationSlot *slot);
     336             : extern bool ReplicationSlotName(int index, Name name);
     337             : extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot);
     338             : extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
     339             : 
     340             : extern void StartupReplicationSlots(void);
     341             : extern void CheckPointReplicationSlots(bool is_shutdown);
     342             : 
     343             : extern void CheckSlotRequirements(void);
     344             : extern void CheckSlotPermissions(void);
     345             : extern ReplicationSlotInvalidationCause
     346             :             GetSlotInvalidationCause(const char *cause_name);
     347             : extern const char *GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause);
     348             : 
     349             : extern bool SlotExistsInSyncStandbySlots(const char *slot_name);
     350             : extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
     351             : extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn);
     352             : 
     353             : #endif                          /* SLOT_H */

Generated by: LCOV version 1.16