LCOV - code coverage report
Current view: top level - src/include/replication - slot.h (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 8 8 100.0 %
Date: 2025-02-22 07:14:56 Functions: 1 1 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  * slot.h
       3             :  *     Replication slot management.
       4             :  *
       5             :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
       6             :  *
       7             :  *-------------------------------------------------------------------------
       8             :  */
       9             : #ifndef SLOT_H
      10             : #define SLOT_H
      11             : 
      12             : #include "access/xlog.h"
      13             : #include "access/xlogreader.h"
      14             : #include "storage/condition_variable.h"
      15             : #include "storage/lwlock.h"
      16             : #include "storage/shmem.h"
      17             : #include "storage/spin.h"
      18             : #include "replication/walreceiver.h"
      19             : 
      20             : /* directory to store replication slot data in */
      21             : #define PG_REPLSLOT_DIR     "pg_replslot"
      22             : 
      23             : /*
      24             :  * Behaviour of replication slots, upon release or crash.
      25             :  *
      26             :  * Slots marked as PERSISTENT are crash-safe and will not be dropped when
      27             :  * released. Slots marked as EPHEMERAL will be dropped when released or after
      28             :  * restarts.  Slots marked TEMPORARY will be dropped at the end of a session
      29             :  * or on error.
      30             :  *
      31             :  * EPHEMERAL is used as a not-quite-ready state when creating persistent
      32             :  * slots.  EPHEMERAL slots can be made PERSISTENT by calling
      33             :  * ReplicationSlotPersist().  For a slot that goes away at the end of a
      34             :  * session, TEMPORARY is the appropriate choice.
      35             :  */
      36             : typedef enum ReplicationSlotPersistency
      37             : {
      38             :     RS_PERSISTENT,              /* crash-safe; not dropped when released */
      39             :     RS_EPHEMERAL,               /* dropped when released or after restarts */
      40             :     RS_TEMPORARY,               /* dropped at end of session or on error */
      41             : } ReplicationSlotPersistency;
      42             : 
      43             : /*
      44             :  * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
      45             :  * 'invalidated' field is set to a value other than _NONE.
      46             :  *
      47             :  * When adding a new invalidation cause here, its value must be a power of 2
      48             :  * (e.g., 1, 2, 4...) for proper bitwise operations. Also, remember to update
      49             :  * RS_INVAL_MAX_CAUSES below, and SlotInvalidationCauses in slot.c.
      50             :  */
      51             : typedef enum ReplicationSlotInvalidationCause
      52             : {
      53             :     RS_INVAL_NONE = 0,          /* slot is valid, i.e. not invalidated */
      54             :     /* required WAL has been removed */
      55             :     RS_INVAL_WAL_REMOVED = (1 << 0),
      56             :     /* required rows have been removed */
      57             :     RS_INVAL_HORIZON = (1 << 1),
      58             :     /* wal_level insufficient for slot */
      59             :     RS_INVAL_WAL_LEVEL = (1 << 2),
      60             :     /* idle slot timeout has occurred */
      61             :     RS_INVAL_IDLE_TIMEOUT = (1 << 3),
      62             : } ReplicationSlotInvalidationCause;
      63             : 
      64             : /* Maximum number of invalidation causes (RS_INVAL_NONE is not counted) */
      65             : #define RS_INVAL_MAX_CAUSES 4
      66             : 
      67             : /*
      68             :  * On-disk data of a replication slot, preserved across restarts.
      69             :  */
      70             : typedef struct ReplicationSlotPersistentData
      71             : {
      72             :     /* The slot's identifier */
      73             :     NameData    name;
      74             : 
      75             :     /* database the slot is active on */
      76             :     Oid         database;
      77             : 
      78             :     /*
      79             :      * The slot's behaviour when being dropped (or restored after a crash).
      80             :      */
      81             :     ReplicationSlotPersistency persistency;
      82             : 
      83             :     /*
      84             :      * xmin horizon for data
      85             :      *
      86             :      * NB: This may represent a value that hasn't been written to disk yet;
      87             :      * see notes for effective_xmin, below.
      88             :      */
      89             :     TransactionId xmin;
      90             : 
      91             :     /*
      92             :      * xmin horizon for catalog tuples
      93             :      *
      94             :      * NB: This may represent a value that hasn't been written to disk yet;
      95             :      * see notes for effective_xmin, below.
      96             :      */
      97             :     TransactionId catalog_xmin;
      98             : 
      99             :     /* oldest LSN that might be required by this replication slot */
     100             :     XLogRecPtr  restart_lsn;
     101             : 
     102             :     /* RS_INVAL_NONE if valid, or the reason for having been invalidated */
     103             :     ReplicationSlotInvalidationCause invalidated;
     104             : 
     105             :     /*
     106             :      * Oldest LSN that the client has acked receipt for.  This is used as the
     107             :      * start_lsn point in case the client doesn't specify one, and also as a
     108             :      * safety measure to jump forwards in case the client specifies a
     109             :      * start_lsn that's further in the past than this value.
     110             :      */
     111             :     XLogRecPtr  confirmed_flush;
     112             : 
     113             :     /*
     114             :      * LSN at which we enabled two_phase commit for this slot or LSN at which
     115             :      * we found a consistent point at the time of slot creation.
     116             :      */
     117             :     XLogRecPtr  two_phase_at;
     118             : 
     119             :     /*
     120             :      * Allow decoding of prepared transactions?
     121             :      */
     122             :     bool        two_phase;
     123             : 
     124             :     /* plugin name */
     125             :     NameData    plugin;
     126             : 
     127             :     /*
     128             :      * Was this slot synchronized from the primary server?
     129             :      */
     130             :     char        synced;         /* NOTE(review): char, while two_phase/failover are bool -- confirm intended */
     131             : 
     132             :     /*
     133             :      * Is this a failover slot (sync candidate for standbys)? Only relevant
     134             :      * for logical slots on the primary server.
     135             :      */
     136             :     bool        failover;
     137             : } ReplicationSlotPersistentData;
     138             : 
     139             : /*
     140             :  * Shared memory state of a single replication slot.
     141             :  *
     142             :  * The in-memory data of replication slots follows a locking model based
     143             :  * on two linked concepts:
     144             :  * - A replication slot's in_use flag is switched when added or discarded using
     145             :  * the LWLock ReplicationSlotControlLock, which needs to be held in exclusive
     146             :  * mode when updating the flag by the backend owning the slot and doing the
     147             :  * operation, while readers (concurrent backends not owning the slot) need
     148             :  * to hold it in shared mode when looking at replication slot data.
     149             :  * - Individual fields are protected by the mutex, where only the backend owning
     150             :  * the slot is authorized to update the fields from its own slot.  The
     151             :  * backend owning the slot does not need to take this lock when reading its
     152             :  * own fields, while concurrent backends not owning this slot should take the
     153             :  * lock when reading this slot's data.
     154             :  */
     155             : typedef struct ReplicationSlot
     156             : {
     157             :     /* lock, on same cacheline as effective_xmin */
     158             :     slock_t     mutex;
     159             : 
     160             :     /* is this slot defined */
     161             :     bool        in_use;
     162             : 
     163             :     /* Who is streaming out changes for this slot? 0 in unused slots. */
     164             :     pid_t       active_pid;
     165             : 
     166             :     /* any outstanding modifications? */
     167             :     bool        just_dirtied;
     168             :     bool        dirty;
     169             : 
     170             :     /*
     171             :      * For logical decoding, it's extremely important that we never remove any
     172             :      * data that's still needed for decoding purposes, even after a crash;
     173             :      * otherwise, decoding will produce wrong answers.  Ordinary streaming
     174             :      * replication also needs to prevent old row versions from being removed
     175             :      * too soon, but the worst consequence we might encounter there is
     176             :      * unwanted query cancellations on the standby.  Thus, for logical
     177             :      * decoding, this value represents the latest xmin that has actually been
     178             :      * written to disk, whereas for streaming replication, it's just the same
     179             :      * as the persistent value (data.xmin).
     180             :      */
     181             :     TransactionId effective_xmin;
     182             :     TransactionId effective_catalog_xmin;
     183             : 
     184             :     /* data surviving shutdowns and crashes */
     185             :     ReplicationSlotPersistentData data;
     186             : 
     187             :     /* is somebody performing io on this slot? */
     188             :     LWLock      io_in_progress_lock;
     189             : 
     190             :     /* Condition variable signaled when active_pid changes */
     191             :     ConditionVariable active_cv;
     192             : 
     193             :     /* all the remaining data is only used for logical slots */
     194             : 
     195             :     /*
     196             :      * When the client has confirmed flushes >= candidate_xmin_lsn we can
     197             :      * advance the catalog xmin.  When restart_valid has been passed,
     198             :      * restart_lsn can be increased.
     199             :      */
     200             :     TransactionId candidate_catalog_xmin;
     201             :     XLogRecPtr  candidate_xmin_lsn;
     202             :     XLogRecPtr  candidate_restart_valid;
     203             :     XLogRecPtr  candidate_restart_lsn;
     204             : 
     205             :     /*
     206             :      * This value tracks the last confirmed_flush LSN flushed, which is used
     207             :      * during a shutdown checkpoint to decide if a logical slot's data should be
     208             :      * forcibly flushed or not.
     209             :      */
     210             :     XLogRecPtr  last_saved_confirmed_flush;
     211             : 
     212             :     /*
     213             :      * The time when the slot became inactive. For synced slots on a standby
     214             :      * server, it represents the time when slot synchronization was most
     215             :      * recently stopped.
     216             :      */
     217             :     TimestampTz inactive_since;
     218             : } ReplicationSlot;
     219             : 
     220             : #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)  /* true when data.database is InvalidOid */
     221             : #define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)   /* true when data.database is set */
     222             : 
     223             : /*
     224             :  * Shared memory control area for all replication slots.
     225             :  */
     226             : typedef struct ReplicationSlotCtlData
     227             : {
     228             :     /*
     229             :      * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
     230             :      * reason you can't do that in an otherwise-empty struct.
     231             :      */
     232             :     ReplicationSlot replication_slots[1];
     233             : } ReplicationSlotCtlData;
     234             : 
     235             : /*
     236             :  * Set slot's inactive_since to ts, unless the slot has been invalidated.
     237             :  */
     238             : static inline void
     239        5286 : ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts,
     240             :                                 bool acquire_lock)
     241             : {
     242        5286 :     if (acquire_lock)           /* take the slot's spinlock only when asked to */
     243         568 :         SpinLockAcquire(&s->mutex);
     244             : 
     245        5286 :     if (s->data.invalidated == RS_INVAL_NONE)   /* invalidated slots keep their old value */
     246        5232 :         s->inactive_since = ts;
     247             : 
     248        5286 :     if (acquire_lock)           /* release only what was acquired above */
     249         568 :         SpinLockRelease(&s->mutex);
     250        5286 : }
     251             : 
     252             : /*
     253             :  * Pointers to shared memory
     254             :  */
     255             : extern PGDLLIMPORT ReplicationSlotCtlData *ReplicationSlotCtl;
     256             : extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
     257             : 
     258             : /* GUCs */
     259             : extern PGDLLIMPORT int max_replication_slots;
     260             : extern PGDLLIMPORT char *synchronized_standby_slots;
     261             : extern PGDLLIMPORT int idle_replication_slot_timeout_mins;
     262             : 
     263             : /* shmem initialization functions */
     264             : extern Size ReplicationSlotsShmemSize(void);
     265             : extern void ReplicationSlotsShmemInit(void);
     266             : 
     267             : /* management of individual slots */
     268             : extern void ReplicationSlotCreate(const char *name, bool db_specific,
     269             :                                   ReplicationSlotPersistency persistency,
     270             :                                   bool two_phase, bool failover,
     271             :                                   bool synced);
     272             : extern void ReplicationSlotPersist(void);
     273             : extern void ReplicationSlotDrop(const char *name, bool nowait);
     274             : extern void ReplicationSlotDropAcquired(void);
     275             : extern void ReplicationSlotAlter(const char *name, const bool *failover,
     276             :                                  const bool *two_phase);
     277             : 
     278             : extern void ReplicationSlotAcquire(const char *name, bool nowait,
     279             :                                    bool error_if_invalid);
     280             : extern void ReplicationSlotRelease(void);
     281             : extern void ReplicationSlotCleanup(bool synced_only);
     282             : extern void ReplicationSlotSave(void);
     283             : extern void ReplicationSlotMarkDirty(void);
     284             : 
     285             : /* misc stuff */
     286             : extern void ReplicationSlotInitialize(void);
     287             : extern bool ReplicationSlotValidateName(const char *name, int elevel);
     288             : extern void ReplicationSlotReserveWal(void);
     289             : extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
     290             : extern void ReplicationSlotsComputeRequiredLSN(void);
     291             : extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
     292             : extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
     293             : extern void ReplicationSlotsDropDBSlots(Oid dboid);
     294             : extern bool InvalidateObsoleteReplicationSlots(uint32 possible_causes,
     295             :                                                XLogSegNo oldestSegno,
     296             :                                                Oid dboid,
     297             :                                                TransactionId snapshotConflictHorizon);
     298             : extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
     299             : extern int  ReplicationSlotIndex(ReplicationSlot *slot);
     300             : extern bool ReplicationSlotName(int index, Name name);
     301             : extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot);
     302             : extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
     303             : 
     304             : extern void StartupReplicationSlots(void);
     305             : extern void CheckPointReplicationSlots(bool is_shutdown);
     306             : 
     307             : extern void CheckSlotRequirements(void);
     308             : extern void CheckSlotPermissions(void);
     309             : extern ReplicationSlotInvalidationCause
     310             :             GetSlotInvalidationCause(const char *invalidation_reason);
     311             : extern const char *GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause);
     312             : 
     313             : extern bool SlotExistsInSyncStandbySlots(const char *slot_name);
     314             : extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
     315             : extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn);
     316             : 
     317             : #endif                          /* SLOT_H */

Generated by: LCOV version 1.14