Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * slot.h
3 : * Replication slot management.
4 : *
5 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
6 : *
7 : *-------------------------------------------------------------------------
8 : */
9 : #ifndef SLOT_H
10 : #define SLOT_H
11 :
12 : #include "access/xlog.h"
13 : #include "access/xlogreader.h"
14 : #include "storage/condition_variable.h"
15 : #include "storage/lwlock.h"
16 : #include "storage/shmem.h"
17 : #include "storage/spin.h"
18 : #include "replication/walreceiver.h"
19 :
/* Name of the directory in which replication slot data is stored */
#define PG_REPLSLOT_DIR "pg_replslot"
22 :
/*
 * What happens to a replication slot when it is released, at the end of a
 * session, or across a crash/restart:
 *
 *	RS_PERSISTENT	crash-safe; not dropped when released.
 *	RS_EPHEMERAL	dropped when released, and also after a restart.  This is
 *					the not-quite-ready state used while creating a
 *					persistent slot; ReplicationSlotPersist() turns it into
 *					RS_PERSISTENT.
 *	RS_TEMPORARY	dropped at the end of a session or on error; the
 *					appropriate choice for a slot that should go away with
 *					its session.
 *
 * The numeric values are spelled out explicitly because this enum is part of
 * ReplicationSlotPersistentData, the on-disk representation of a slot.
 */
typedef enum ReplicationSlotPersistency
{
	RS_PERSISTENT = 0,
	RS_EPHEMERAL = 1,
	RS_TEMPORARY = 2,
} ReplicationSlotPersistency;
42 :
/*
 * Reasons a replication slot can be invalidated (e.g. due to
 * max_slot_wal_keep_size).  A slot that is still valid has its 'invalidated'
 * field set to RS_INVAL_NONE; any other value records why it was invalidated.
 *
 * Every cause other than RS_INVAL_NONE must be a distinct power of two
 * (1, 2, 4, ...) so that causes can be combined and tested with bitwise
 * operations.  When adding a cause, also update RS_INVAL_MAX_CAUSES below
 * and SlotInvalidationCauses in slot.c.
 */
typedef enum ReplicationSlotInvalidationCause
{
	RS_INVAL_NONE = 0,
	RS_INVAL_WAL_REMOVED = (1 << 0),	/* required WAL has been removed */
	RS_INVAL_HORIZON = (1 << 1),	/* required rows have been removed */
	RS_INVAL_WAL_LEVEL = (1 << 2),	/* wal_level insufficient for slot */
	RS_INVAL_IDLE_TIMEOUT = (1 << 3),	/* idle slot timeout has occurred */
} ReplicationSlotInvalidationCause;

/* Number of invalidation causes, not counting RS_INVAL_NONE */
#define RS_INVAL_MAX_CAUSES 4
66 :
/*
 * On-disk data of a replication slot, preserved across restarts.
 *
 * NOTE(review): because this struct is what gets persisted, changing its
 * layout (field order, types or sizes) presumably requires a matching
 * on-disk format/version change in slot.c -- verify before editing.
 */
typedef struct ReplicationSlotPersistentData
{
	/* The slot's identifier */
	NameData	name;

	/*
	 * Database the slot is active on.  InvalidOid marks a physical slot; any
	 * other value marks a logical slot (see SlotIsPhysical/SlotIsLogical).
	 */
	Oid			database;

	/*
	 * The slot's behaviour when being dropped (or restored after a crash);
	 * see ReplicationSlotPersistency.
	 */
	ReplicationSlotPersistency persistency;

	/*
	 * xmin horizon for data
	 *
	 * NB: This may represent a value that hasn't been written to disk yet;
	 * see notes for effective_xmin in ReplicationSlot.
	 */
	TransactionId xmin;

	/*
	 * xmin horizon for catalog tuples
	 *
	 * NB: This may represent a value that hasn't been written to disk yet;
	 * see notes for effective_xmin in ReplicationSlot.
	 */
	TransactionId catalog_xmin;

	/* oldest LSN that might be required by this replication slot */
	XLogRecPtr	restart_lsn;

	/* RS_INVAL_NONE if valid, or the reason for having been invalidated */
	ReplicationSlotInvalidationCause invalidated;

	/*
	 * Oldest LSN that the client has acked receipt for.  This is used as the
	 * start_lsn point in case the client doesn't specify one, and also as a
	 * safety measure to jump forwards in case the client specifies a
	 * start_lsn that's further in the past than this value.
	 */
	XLogRecPtr	confirmed_flush;

	/*
	 * LSN at which we enabled two_phase commit for this slot or LSN at which
	 * we found a consistent point at the time of slot creation.
	 */
	XLogRecPtr	two_phase_at;

	/*
	 * Allow decoding of prepared transactions?
	 */
	bool		two_phase;

	/* plugin name */
	NameData	plugin;

	/*
	 * Was this slot synchronized from the primary server?
	 *
	 * NOTE(review): declared as char although it reads as a yes/no flag.
	 * Since this struct is stored on disk, do not change the type without
	 * checking the on-disk format.
	 */
	char		synced;

	/*
	 * Is this a failover slot (sync candidate for standbys)?  Only relevant
	 * for logical slots on the primary server.
	 */
	bool		failover;
} ReplicationSlotPersistentData;
138 :
/*
 * Shared memory state of a single replication slot.
 *
 * The in-memory data of replication slots follows a locking model based
 * on two linked concepts:
 * - A replication slot's in_use flag is switched when the slot is added or
 *	 discarded, using the LWLock ReplicationSlotControlLock.  The backend
 *	 owning the slot and doing the operation must hold that lock in exclusive
 *	 mode while updating the flag, while readers (concurrent backends not
 *	 owning the slot) must hold it in shared mode when looking at replication
 *	 slot data.
 * - Individual fields are protected by 'mutex', and only the backend owning
 *	 the slot is authorized to update the fields of its own slot.  The
 *	 backend owning the slot does not need to take this lock when reading its
 *	 own fields, while concurrent backends not owning this slot should take
 *	 the lock when reading this slot's data.
 */
typedef struct ReplicationSlot
{
	/* lock, on same cacheline as effective_xmin */
	slock_t		mutex;

	/* is this slot defined (guarded by ReplicationSlotControlLock, above) */
	bool		in_use;

	/*
	 * Who is streaming out changes for this slot?  0 in unused slots.
	 * active_cv below is signaled when this changes.
	 */
	pid_t		active_pid;

	/* any outstanding modifications? */
	bool		just_dirtied;
	bool		dirty;

	/*
	 * For logical decoding, it's extremely important that we never remove any
	 * data that's still needed for decoding purposes, even after a crash;
	 * otherwise, decoding will produce wrong answers.  Ordinary streaming
	 * replication also needs to prevent old row versions from being removed
	 * too soon, but the worst consequence we might encounter there is
	 * unwanted query cancellations on the standby.  Thus, for logical
	 * decoding, this value represents the latest xmin that has actually been
	 * written to disk, whereas for streaming replication, it's just the same
	 * as the persistent value (data.xmin).
	 */
	TransactionId effective_xmin;
	TransactionId effective_catalog_xmin;

	/* data surviving shutdowns and crashes */
	ReplicationSlotPersistentData data;

	/* is somebody performing io on this slot? */
	LWLock		io_in_progress_lock;

	/* Condition variable signaled when active_pid changes */
	ConditionVariable active_cv;

	/* all the remaining data is only used for logical slots */

	/*
	 * When the client has confirmed flushes >= candidate_xmin_lsn we can
	 * advance the catalog xmin.  When restart_valid has been passed,
	 * restart_lsn can be increased.
	 */
	TransactionId candidate_catalog_xmin;
	XLogRecPtr	candidate_xmin_lsn;
	XLogRecPtr	candidate_restart_valid;
	XLogRecPtr	candidate_restart_lsn;

	/*
	 * This value tracks the last confirmed_flush LSN that was flushed.  It is
	 * used during a shutdown checkpoint to decide whether a logical slot's
	 * data should be forcibly flushed.
	 */
	XLogRecPtr	last_saved_confirmed_flush;

	/*
	 * The time when the slot became inactive.  For synced slots on a standby
	 * server, it represents the time when slot synchronization was most
	 * recently stopped.  ReplicationSlotSetInactiveSince() only updates it
	 * while the slot has not been invalidated.
	 */
	TimestampTz inactive_since;
} ReplicationSlot;
219 :
/*
 * A slot not tied to any database (data.database is InvalidOid) is a
 * physical slot; every other slot is a logical slot.  Both macros evaluate
 * their argument exactly once.
 */
#define SlotIsPhysical(slot) (InvalidOid == (slot)->data.database)
#define SlotIsLogical(slot) (!SlotIsPhysical(slot))
222 :
/*
 * Shared memory control area for all replication slots.
 */
typedef struct ReplicationSlotCtlData
{
	/*
	 * Array of all slots.  This would ideally be declared
	 * [FLEXIBLE_ARRAY_MEMBER], but C only permits a flexible array member in
	 * a struct that has at least one other named member, so it is declared
	 * with length 1 instead.
	 *
	 * NOTE(review): the real number of entries is presumably
	 * max_replication_slots, with the shared memory sized accordingly by
	 * ReplicationSlotsShmemSize() -- confirm in slot.c.
	 */
	ReplicationSlot replication_slots[1];
} ReplicationSlotCtlData;
234 :
235 : /*
236 : * Set slot's inactive_since property unless it was previously invalidated.
237 : */
238 : static inline void
239 5286 : ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts,
240 : bool acquire_lock)
241 : {
242 5286 : if (acquire_lock)
243 568 : SpinLockAcquire(&s->mutex);
244 :
245 5286 : if (s->data.invalidated == RS_INVAL_NONE)
246 5232 : s->inactive_since = ts;
247 :
248 5286 : if (acquire_lock)
249 568 : SpinLockRelease(&s->mutex);
250 5286 : }
251 :
/*
 * Pointers to shared memory
 *
 * ReplicationSlotCtl points at the shared control area that holds all
 * replication slots (see ReplicationSlotCtlData).
 * NOTE(review): MyReplicationSlot presumably points at the slot currently
 * acquired by this backend, or is NULL when there is none -- confirm in
 * slot.c.
 */
extern PGDLLIMPORT ReplicationSlotCtlData *ReplicationSlotCtl;
extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;

/*
 * GUCs
 *
 * NOTE(review): idle_replication_slot_timeout_mins is, per its name, a
 * timeout in minutes; it presumably drives RS_INVAL_IDLE_TIMEOUT
 * invalidation -- verify in slot.c.
 */
extern PGDLLIMPORT int max_replication_slots;
extern PGDLLIMPORT char *synchronized_standby_slots;
extern PGDLLIMPORT int idle_replication_slot_timeout_mins;
262 :
263 : /* shmem initialization functions */
264 : extern Size ReplicationSlotsShmemSize(void);
265 : extern void ReplicationSlotsShmemInit(void);
266 :
267 : /* management of individual slots */
268 : extern void ReplicationSlotCreate(const char *name, bool db_specific,
269 : ReplicationSlotPersistency persistency,
270 : bool two_phase, bool failover,
271 : bool synced);
272 : extern void ReplicationSlotPersist(void);
273 : extern void ReplicationSlotDrop(const char *name, bool nowait);
274 : extern void ReplicationSlotDropAcquired(void);
275 : extern void ReplicationSlotAlter(const char *name, const bool *failover,
276 : const bool *two_phase);
277 :
278 : extern void ReplicationSlotAcquire(const char *name, bool nowait,
279 : bool error_if_invalid);
280 : extern void ReplicationSlotRelease(void);
281 : extern void ReplicationSlotCleanup(bool synced_only);
282 : extern void ReplicationSlotSave(void);
283 : extern void ReplicationSlotMarkDirty(void);
284 :
285 : /* misc stuff */
286 : extern void ReplicationSlotInitialize(void);
287 : extern bool ReplicationSlotValidateName(const char *name, int elevel);
288 : extern void ReplicationSlotReserveWal(void);
289 : extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
290 : extern void ReplicationSlotsComputeRequiredLSN(void);
291 : extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
292 : extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
293 : extern void ReplicationSlotsDropDBSlots(Oid dboid);
294 : extern bool InvalidateObsoleteReplicationSlots(uint32 possible_causes,
295 : XLogSegNo oldestSegno,
296 : Oid dboid,
297 : TransactionId snapshotConflictHorizon);
298 : extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
299 : extern int ReplicationSlotIndex(ReplicationSlot *slot);
300 : extern bool ReplicationSlotName(int index, Name name);
301 : extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot);
302 : extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
303 :
304 : extern void StartupReplicationSlots(void);
305 : extern void CheckPointReplicationSlots(bool is_shutdown);
306 :
307 : extern void CheckSlotRequirements(void);
308 : extern void CheckSlotPermissions(void);
309 : extern ReplicationSlotInvalidationCause
310 : GetSlotInvalidationCause(const char *invalidation_reason);
311 : extern const char *GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause);
312 :
313 : extern bool SlotExistsInSyncStandbySlots(const char *slot_name);
314 : extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
315 : extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn);
316 :
317 : #endif /* SLOT_H */
|