Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * slot.h
3 : * Replication slot management.
4 : *
5 : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
6 : *
7 : *-------------------------------------------------------------------------
8 : */
9 : #ifndef SLOT_H
10 : #define SLOT_H
11 :
12 : #include "access/xlog.h"
13 : #include "access/xlogreader.h"
14 : #include "storage/condition_variable.h"
15 : #include "storage/lwlock.h"
16 : #include "storage/shmem.h"
17 : #include "storage/spin.h"
18 : #include "replication/walreceiver.h"
19 :
20 : /* directory to store replication slot data in */
21 : #define PG_REPLSLOT_DIR "pg_replslot"
22 :
23 : /*
24 : * The reserved name for a replication slot used to retain dead tuples for
25 : * conflict detection in logical replication. See
26 : * maybe_advance_nonremovable_xid() for detail.
27 : */
28 : #define CONFLICT_DETECTION_SLOT "pg_conflict_detection"
29 :
30 : /*
31 : * Behaviour of replication slots, upon release or crash.
32 : *
33 : * Slots marked as PERSISTENT are crash-safe and will not be dropped when
34 : * released. Slots marked as EPHEMERAL will be dropped when released or after
35 : * restarts. Slots marked TEMPORARY will be dropped at the end of a session
36 : * or on error.
37 : *
38 : * EPHEMERAL is used as a not-quite-ready state when creating persistent
39 : * slots. EPHEMERAL slots can be made PERSISTENT by calling
40 : * ReplicationSlotPersist(). For a slot that goes away at the end of a
41 : * session, TEMPORARY is the appropriate choice.
42 : */
typedef enum ReplicationSlotPersistency
{
	RS_PERSISTENT,				/* crash-safe; survives release and restart */
	RS_EPHEMERAL,				/* dropped on release or after a restart */
	RS_TEMPORARY,				/* dropped at session end or on error */
} ReplicationSlotPersistency;
49 :
50 : /*
51 : * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
52 : * 'invalidated' field is set to a value other than _NONE.
53 : *
 * When adding a new invalidation cause here, each value must be a power of 2
 * (e.g., 1, 2, 4...) for proper bitwise operations.  Also, remember to update
56 : * RS_INVAL_MAX_CAUSES below, and SlotInvalidationCauses in slot.c.
57 : */
typedef enum ReplicationSlotInvalidationCause
{
	RS_INVAL_NONE = 0,
	/* required WAL has been removed */
	RS_INVAL_WAL_REMOVED = (1 << 0),
	/* required rows have been removed */
	RS_INVAL_HORIZON = (1 << 1),
	/* wal_level insufficient for slot */
	RS_INVAL_WAL_LEVEL = (1 << 2),
	/* idle slot timeout has occurred */
	RS_INVAL_IDLE_TIMEOUT = (1 << 3),

	/*
	 * Non-NONE values are distinct bits so that several causes can be OR'ed
	 * together into a bitmask (e.g. the possible_causes argument of
	 * InvalidateObsoleteReplicationSlots()).
	 */
} ReplicationSlotInvalidationCause;
70 :
71 : /* Maximum number of invalidation causes */
72 : #define RS_INVAL_MAX_CAUSES 4
73 :
74 : /*
75 : * When the slot synchronization worker is running, or when
76 : * pg_sync_replication_slots is executed, slot synchronization may be
77 : * skipped. This enum defines the possible reasons for skipping slot
78 : * synchronization.
79 : */
typedef enum SlotSyncSkipReason
{
	SS_SKIP_NONE,				/* No skip */
	SS_SKIP_WAL_NOT_FLUSHED,	/* Standby did not flush the WAL corresponding
								 * to confirmed flush of remote slot */
	SS_SKIP_WAL_OR_ROWS_REMOVED,	/* Remote slot is behind; required WAL or
									 * rows may be removed or at risk */
	SS_SKIP_NO_CONSISTENT_SNAPSHOT, /* Standby could not build a consistent
									 * snapshot */
	SS_SKIP_INVALID				/* Local slot is invalid */
} SlotSyncSkipReason;
91 :
/*
 * On-Disk data of a replication slot, preserved across restarts.
 *
 * NOTE(review): since this struct is serialized to disk, changing field
 * order, size, or alignment presumably breaks slot-file compatibility —
 * verify against the slot file versioning in slot.c before modifying.
 */
typedef struct ReplicationSlotPersistentData
{
	/* The slot's identifier */
	NameData	name;

	/* database the slot is active on; InvalidOid for physical slots */
	Oid			database;

	/*
	 * The slot's behaviour when being dropped (or restored after a crash).
	 */
	ReplicationSlotPersistency persistency;

	/*
	 * xmin horizon for data
	 *
	 * NB: This may represent a value that hasn't been written to disk yet;
	 * see notes for effective_xmin, below.
	 */
	TransactionId xmin;

	/*
	 * xmin horizon for catalog tuples
	 *
	 * NB: This may represent a value that hasn't been written to disk yet;
	 * see notes for effective_xmin, below.
	 */
	TransactionId catalog_xmin;

	/* oldest LSN that might be required by this replication slot */
	XLogRecPtr	restart_lsn;

	/* RS_INVAL_NONE if valid, or the reason for having been invalidated */
	ReplicationSlotInvalidationCause invalidated;

	/*
	 * Oldest LSN that the client has acked receipt for.  This is used as the
	 * start_lsn point in case the client doesn't specify one, and also as a
	 * safety measure to jump forwards in case the client specifies a
	 * start_lsn that's further in the past than this value.
	 */
	XLogRecPtr	confirmed_flush;

	/*
	 * LSN at which we enabled two_phase commit for this slot or LSN at which
	 * we found a consistent point at the time of slot creation.
	 */
	XLogRecPtr	two_phase_at;

	/*
	 * Allow decoding of prepared transactions?
	 */
	bool		two_phase;

	/* plugin name (logical slots only) */
	NameData	plugin;

	/*
	 * Was this slot synchronized from the primary server?
	 */
	bool		synced;

	/*
	 * Is this a failover slot (sync candidate for standbys)? Only relevant
	 * for logical slots on the primary server.
	 */
	bool		failover;
} ReplicationSlotPersistentData;
163 :
/*
 * Shared memory state of a single replication slot.
 *
 * The in-memory data of replication slots follows a locking model based
 * on two linked concepts:
 * - A replication slot's in_use flag is switched when added or discarded using
 * the LWLock ReplicationSlotControlLock, which needs to be held in exclusive
 * mode when updating the flag by the backend owning the slot and doing the
 * operation, while readers (concurrent backends not owning the slot) need
 * to hold it in shared mode when looking at replication slot data.
 * - Individual fields are protected by mutex where only the backend owning
 * the slot is authorized to update the fields from its own slot.  The
 * backend owning the slot does not need to take this lock when reading its
 * own fields, while concurrent backends not owning this slot should take the
 * lock when reading this slot's data.
 */
typedef struct ReplicationSlot
{
	/* lock, on same cacheline as effective_xmin */
	slock_t		mutex;

	/* is this slot defined */
	bool		in_use;

	/*
	 * Who is streaming out changes for this slot? INVALID_PROC_NUMBER in
	 * unused slots.
	 */
	ProcNumber	active_proc;

	/* any outstanding modifications? */
	bool		just_dirtied;
	bool		dirty;

	/*
	 * For logical decoding, it's extremely important that we never remove any
	 * data that's still needed for decoding purposes, even after a crash;
	 * otherwise, decoding will produce wrong answers.  Ordinary streaming
	 * replication also needs to prevent old row versions from being removed
	 * too soon, but the worst consequence we might encounter there is
	 * unwanted query cancellations on the standby.  Thus, for logical
	 * decoding, this value represents the latest xmin that has actually been
	 * written to disk, whereas for streaming replication, it's just the same
	 * as the persistent value (data.xmin).
	 */
	TransactionId effective_xmin;
	TransactionId effective_catalog_xmin;

	/* data surviving shutdowns and crashes */
	ReplicationSlotPersistentData data;

	/* is somebody performing io on this slot? */
	LWLock		io_in_progress_lock;

	/* Condition variable signaled when active_proc changes */
	ConditionVariable active_cv;

	/* all the remaining data is only used for logical slots */

	/*
	 * When the client has confirmed flushes >= candidate_xmin_lsn we can
	 * advance the catalog xmin.  When restart_valid has been passed,
	 * restart_lsn can be increased.
	 */
	TransactionId candidate_catalog_xmin;
	XLogRecPtr	candidate_xmin_lsn;
	XLogRecPtr	candidate_restart_valid;
	XLogRecPtr	candidate_restart_lsn;

	/*
	 * This value tracks the last confirmed_flush LSN flushed, which is used
	 * during a shutdown checkpoint to decide if a logical slot's data should
	 * be forcibly flushed or not.
	 */
	XLogRecPtr	last_saved_confirmed_flush;

	/*
	 * The time when the slot became inactive.  For synced slots on a standby
	 * server, it represents the time when slot synchronization was most
	 * recently stopped.
	 */
	TimestampTz inactive_since;

	/*
	 * Latest restart_lsn that has been flushed to disk.  For persistent slots
	 * the flushed LSN should be taken into account when calculating the
	 * oldest LSN for WAL segments removal.
	 *
	 * Do not assume that restart_lsn will always move forward, i.e., that the
	 * previously flushed restart_lsn is always behind data.restart_lsn.  In
	 * streaming replication using a physical slot, the restart_lsn is updated
	 * based on the flushed WAL position reported by the walreceiver.
	 *
	 * This replication mode allows duplicate WAL records to be received and
	 * overwritten.  If the walreceiver receives older WAL records and then
	 * reports them as flushed to the walsender, the restart_lsn may appear to
	 * move backward.
	 *
	 * This typically occurs at the beginning of replication.  One reason is
	 * that streaming replication starts at the beginning of a segment, so, if
	 * restart_lsn is in the middle of a segment, it will be updated to an
	 * earlier LSN, see RequestXLogStreaming.  Another reason is that the
	 * walreceiver chooses its startpoint based on the replayed LSN, so, if
	 * some records have been received but not yet applied, they will be
	 * received again, which leads to updating the restart_lsn to an earlier
	 * position.
	 */
	XLogRecPtr	last_saved_restart_lsn;

	/*
	 * Reason for the most recent slot synchronization skip.
	 *
	 * Slot sync skips can occur for both temporary and persistent replication
	 * slots.  They are more common for temporary slots, but persistent slots
	 * may also skip synchronization in rare cases (e.g.,
	 * SS_SKIP_WAL_NOT_FLUSHED or SS_SKIP_WAL_OR_ROWS_REMOVED).
	 *
	 * Since temporary slots are dropped after server restart, persisting
	 * slotsync_skip_reason provides no practical benefit.
	 */
	SlotSyncSkipReason slotsync_skip_reason;
} ReplicationSlot;
286 :
287 : #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
288 : #define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
289 :
290 : /*
291 : * Shared memory control area for all of replication slots.
292 : */
typedef struct ReplicationSlotCtlData
{
	/*
	 * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
	 * reason you can't do that in an otherwise-empty struct.  The array is
	 * actually sized for max_replication_slots entries; see
	 * ReplicationSlotsShmemSize/ReplicationSlotsShmemInit.
	 */
	ReplicationSlot replication_slots[1];
} ReplicationSlotCtlData;
301 :
302 : /*
303 : * Set slot's inactive_since property unless it was previously invalidated.
304 : */
305 : static inline void
306 3019 : ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts,
307 : bool acquire_lock)
308 : {
309 3019 : if (acquire_lock)
310 316 : SpinLockAcquire(&s->mutex);
311 :
312 3019 : if (s->data.invalidated == RS_INVAL_NONE)
313 2972 : s->inactive_since = ts;
314 :
315 3019 : if (acquire_lock)
316 316 : SpinLockRelease(&s->mutex);
317 3019 : }
318 :
319 : /*
320 : * Pointers to shared memory
321 : */
322 : extern PGDLLIMPORT ReplicationSlotCtlData *ReplicationSlotCtl;
323 : extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
324 :
325 : /* GUCs */
326 : extern PGDLLIMPORT int max_replication_slots;
327 : extern PGDLLIMPORT char *synchronized_standby_slots;
328 : extern PGDLLIMPORT int idle_replication_slot_timeout_secs;
329 :
330 : /* shmem initialization functions */
331 : extern Size ReplicationSlotsShmemSize(void);
332 : extern void ReplicationSlotsShmemInit(void);
333 :
334 : /* management of individual slots */
335 : extern void ReplicationSlotCreate(const char *name, bool db_specific,
336 : ReplicationSlotPersistency persistency,
337 : bool two_phase, bool failover,
338 : bool synced);
339 : extern void ReplicationSlotPersist(void);
340 : extern void ReplicationSlotDrop(const char *name, bool nowait);
341 : extern void ReplicationSlotDropAcquired(void);
342 : extern void ReplicationSlotAlter(const char *name, const bool *failover,
343 : const bool *two_phase);
344 :
345 : extern void ReplicationSlotAcquire(const char *name, bool nowait,
346 : bool error_if_invalid);
347 : extern void ReplicationSlotRelease(void);
348 : extern void ReplicationSlotCleanup(bool synced_only);
349 : extern void ReplicationSlotSave(void);
350 : extern void ReplicationSlotMarkDirty(void);
351 :
352 : /* misc stuff */
353 : extern void ReplicationSlotInitialize(void);
354 : extern bool ReplicationSlotValidateName(const char *name,
355 : bool allow_reserved_name,
356 : int elevel);
357 : extern bool ReplicationSlotValidateNameInternal(const char *name,
358 : bool allow_reserved_name,
359 : int *err_code, char **err_msg, char **err_hint);
360 : extern void ReplicationSlotReserveWal(void);
361 : extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
362 : extern void ReplicationSlotsComputeRequiredLSN(void);
363 : extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
364 : extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
365 : extern bool CheckLogicalSlotExists(void);
366 : extern void ReplicationSlotsDropDBSlots(Oid dboid);
367 : extern bool InvalidateObsoleteReplicationSlots(uint32 possible_causes,
368 : XLogSegNo oldestSegno,
369 : Oid dboid,
370 : TransactionId snapshotConflictHorizon);
371 : extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
372 : extern int ReplicationSlotIndex(ReplicationSlot *slot);
373 : extern bool ReplicationSlotName(int index, Name name);
374 : extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot);
375 : extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
376 :
377 : extern void StartupReplicationSlots(void);
378 : extern void CheckPointReplicationSlots(bool is_shutdown);
379 :
380 : extern void CheckSlotRequirements(void);
381 : extern void CheckSlotPermissions(void);
382 : extern ReplicationSlotInvalidationCause
383 : GetSlotInvalidationCause(const char *cause_name);
384 : extern const char *GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause);
385 :
386 : extern bool SlotExistsInSyncStandbySlots(const char *slot_name);
387 : extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
388 : extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn);
389 :
390 : #endif /* SLOT_H */
|