Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * slot.c
4 : * Replication slot management.
5 : *
6 : *
7 : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/replication/slot.c
12 : *
13 : * NOTES
14 : *
15 : * Replication slots are used to keep state about replication streams
16 : * originating from this cluster. Their primary purpose is to prevent the
17 : * premature removal of WAL or of old tuple versions in a manner that would
18 : * interfere with replication; they are also useful for monitoring purposes.
19 : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : * on standbys (to support cascading setups). The requirement that slots be
21 : * usable on standbys precludes storing them in the system catalogs.
22 : *
23 : * Each replication slot gets its own directory inside the directory
24 : * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 : * contain the slot's own data. Additional data can be stored alongside that
26 : * file if required. While the server is running, the state data is also
27 : * cached in memory for efficiency.
28 : *
29 : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : * of a slot. The remaining data in each slot is protected by its mutex.
33 : *
34 : *-------------------------------------------------------------------------
35 : */
36 :
37 : #include "postgres.h"
38 :
39 : #include <unistd.h>
40 : #include <sys/stat.h>
41 :
42 : #include "access/transam.h"
43 : #include "access/xlog_internal.h"
44 : #include "access/xlogrecovery.h"
45 : #include "common/file_utils.h"
46 : #include "common/string.h"
47 : #include "miscadmin.h"
48 : #include "pgstat.h"
49 : #include "postmaster/interrupt.h"
50 : #include "replication/logicallauncher.h"
51 : #include "replication/slotsync.h"
52 : #include "replication/slot.h"
53 : #include "replication/walsender_private.h"
54 : #include "storage/fd.h"
55 : #include "storage/ipc.h"
56 : #include "storage/proc.h"
57 : #include "storage/procarray.h"
58 : #include "storage/subsystems.h"
59 : #include "utils/builtins.h"
60 : #include "utils/guc_hooks.h"
61 : #include "utils/injection_point.h"
62 : #include "utils/varlena.h"
63 : #include "utils/wait_event.h"
64 :
65 : /*
66 : * Replication slot on-disk data structure.
67 : */
68 : typedef struct ReplicationSlotOnDisk
69 : {
70 : /* first part of this struct needs to be version independent */
71 :
72 : /* data not covered by checksum */
73 : uint32 magic;
74 : pg_crc32c checksum;
75 :
76 : /* data covered by checksum */
77 : uint32 version;
78 : uint32 length;
79 :
80 : /*
81 : * The actual data in the slot that follows can differ based on the above
82 : * 'version'.
83 : */
84 :
85 : ReplicationSlotPersistentData slotdata;
86 : } ReplicationSlotOnDisk;
87 :
88 : /*
89 : * Struct for the configuration of synchronized_standby_slots.
90 : *
91 : * Note: this must be a flat representation that can be held in a single chunk
92 : * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
93 : * synchronized_standby_slots GUC.
94 : */
95 : typedef struct
96 : {
97 : /* Number of slot names in the slot_names[] */
98 : int nslotnames;
99 :
100 : /*
101 : * slot_names contains 'nslotnames' consecutive null-terminated C strings.
102 : */
103 : char slot_names[FLEXIBLE_ARRAY_MEMBER];
104 : } SyncStandbySlotsConfigData;
105 :
106 : /*
107 : * Lookup table for slot invalidation causes.
108 : */
109 : typedef struct SlotInvalidationCauseMap
110 : {
111 : ReplicationSlotInvalidationCause cause;
112 : const char *cause_name;
113 : } SlotInvalidationCauseMap;
114 :
115 : static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
116 : {RS_INVAL_NONE, "none"},
117 : {RS_INVAL_WAL_REMOVED, "wal_removed"},
118 : {RS_INVAL_HORIZON, "rows_removed"},
119 : {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
120 : {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
121 : };
122 :
123 : /*
124 : * Ensure that the lookup table is up-to-date with the enums defined in
125 : * ReplicationSlotInvalidationCause.
126 : */
127 : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
128 : "array length mismatch");
129 :
130 : /* size of version independent data */
131 : #define ReplicationSlotOnDiskConstantSize \
132 : offsetof(ReplicationSlotOnDisk, slotdata)
133 : /* size of the part of the slot not covered by the checksum */
134 : #define ReplicationSlotOnDiskNotChecksummedSize \
135 : offsetof(ReplicationSlotOnDisk, version)
136 : /* size of the part covered by the checksum */
137 : #define ReplicationSlotOnDiskChecksummedSize \
138 : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
139 : /* size of the slot data that is version dependent */
140 : #define ReplicationSlotOnDiskV2Size \
141 : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
142 :
143 : #define SLOT_MAGIC 0x1051CA1 /* format identifier */
144 : #define SLOT_VERSION 5 /* version for new files */
145 :
146 : /* Control array for replication slot management */
147 : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
148 :
149 : static void ReplicationSlotsShmemRequest(void *arg);
150 : static void ReplicationSlotsShmemInit(void *arg);
151 :
152 : const ShmemCallbacks ReplicationSlotsShmemCallbacks = {
153 : .request_fn = ReplicationSlotsShmemRequest,
154 : .init_fn = ReplicationSlotsShmemInit,
155 : };
156 :
157 : /* My backend's replication slot in the shared memory array */
158 : ReplicationSlot *MyReplicationSlot = NULL;
159 :
160 : /* GUC variables */
161 : int max_replication_slots = 10; /* the maximum number of replication
162 : * slots */
163 : int max_repack_replication_slots = 5; /* the maximum number of slots
164 : * for REPACK */
165 :
166 : /*
167 : * Invalidate replication slots that have remained idle longer than this
168 : * duration; '0' disables it.
169 : */
170 : int idle_replication_slot_timeout_secs = 0;
171 :
172 : /*
173 : * This GUC lists streaming replication standby server slot names that
174 : * logical WAL sender processes will wait for.
175 : */
176 : char *synchronized_standby_slots;
177 :
178 : /* This is the parsed and cached configuration for synchronized_standby_slots */
179 : static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
180 :
181 : /*
182 : * Oldest LSN that has been confirmed to be flushed to the standbys
183 : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
184 : */
185 : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
186 :
187 : static void ReplicationSlotShmemExit(int code, Datum arg);
188 : static bool IsSlotForConflictCheck(const char *name);
189 : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
190 :
191 : /* internal persistency functions */
192 : static void RestoreSlotFromDisk(const char *name);
193 : static void CreateSlotOnDisk(ReplicationSlot *slot);
194 : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
195 :
196 : /*
197 : * Register shared memory space needed for replication slots.
198 : */
199 : static void
200 1255 : ReplicationSlotsShmemRequest(void *arg)
201 : {
202 : Size size;
203 :
204 1255 : if (max_replication_slots + max_repack_replication_slots == 0)
205 0 : return;
206 :
207 1255 : size = offsetof(ReplicationSlotCtlData, replication_slots);
208 1255 : size = add_size(size,
209 1255 : mul_size(max_replication_slots + max_repack_replication_slots,
210 : sizeof(ReplicationSlot)));
211 1255 : ShmemRequestStruct(.name = "ReplicationSlot Ctl",
212 : .size = size,
213 : .ptr = (void **) &ReplicationSlotCtl,
214 : );
215 : }
216 :
217 : /*
218 : * Initialize shared memory for replication slots.
219 : */
220 : static void
221 1252 : ReplicationSlotsShmemInit(void *arg)
222 : {
223 19824 : for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
224 : {
225 18572 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
226 :
227 : /* everything else is zeroed by the memset above */
228 18572 : slot->active_proc = INVALID_PROC_NUMBER;
229 18572 : SpinLockInit(&slot->mutex);
230 18572 : LWLockInitialize(&slot->io_in_progress_lock,
231 : LWTRANCHE_REPLICATION_SLOT_IO);
232 18572 : ConditionVariableInit(&slot->active_cv);
233 : }
234 1252 : }
235 :
236 : /*
237 : * Register the callback for replication slot cleanup and releasing.
238 : */
239 : void
240 24570 : ReplicationSlotInitialize(void)
241 : {
242 24570 : before_shmem_exit(ReplicationSlotShmemExit, 0);
243 24570 : }
244 :
245 : /*
246 : * Release and cleanup replication slots.
247 : */
248 : static void
249 24570 : ReplicationSlotShmemExit(int code, Datum arg)
250 : {
251 : /* Make sure active replication slots are released */
252 24570 : if (MyReplicationSlot != NULL)
253 305 : ReplicationSlotRelease();
254 :
255 : /* Also cleanup all the temporary slots. */
256 24570 : ReplicationSlotCleanup(false);
257 24570 : }
258 :
259 : /*
260 : * Check whether the passed slot name is valid and report errors at elevel.
261 : *
262 : * See comments for ReplicationSlotValidateNameInternal().
263 : */
264 : bool
265 879 : ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
266 : int elevel)
267 : {
268 : int err_code;
269 879 : char *err_msg = NULL;
270 879 : char *err_hint = NULL;
271 :
272 879 : if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
273 : &err_code, &err_msg, &err_hint))
274 : {
275 : /*
276 : * Use errmsg_internal() and errhint_internal() instead of errmsg()
277 : * and errhint(), since the messages from
278 : * ReplicationSlotValidateNameInternal() are already translated. This
279 : * avoids double translation.
280 : */
281 5 : ereport(elevel,
282 : errcode(err_code),
283 : errmsg_internal("%s", err_msg),
284 : (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
285 :
286 0 : pfree(err_msg);
287 0 : if (err_hint != NULL)
288 0 : pfree(err_hint);
289 0 : return false;
290 : }
291 :
292 874 : return true;
293 : }
294 :
295 : /*
296 : * Check whether the passed slot name is valid.
297 : *
298 : * An error will be reported for a reserved replication slot name if
299 : * allow_reserved_name is set to false.
300 : *
301 : * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
302 : * the name to be used as a directory name on every supported OS.
303 : *
304 : * Returns true if the slot name is valid. Otherwise, returns false and stores
305 : * the error code, error message, and optional hint in err_code, err_msg, and
306 : * err_hint, respectively. The caller is responsible for freeing err_msg and
307 : * err_hint, which are palloc'd.
308 : */
309 : bool
310 1097 : ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
311 : int *err_code, char **err_msg, char **err_hint)
312 : {
313 : const char *cp;
314 :
315 1097 : if (strlen(name) == 0)
316 : {
317 4 : *err_code = ERRCODE_INVALID_NAME;
318 4 : *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
319 4 : *err_hint = NULL;
320 4 : return false;
321 : }
322 :
323 1093 : if (strlen(name) >= NAMEDATALEN)
324 : {
325 0 : *err_code = ERRCODE_NAME_TOO_LONG;
326 0 : *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
327 0 : *err_hint = NULL;
328 0 : return false;
329 : }
330 :
331 21091 : for (cp = name; *cp; cp++)
332 : {
333 20000 : if (!((*cp >= 'a' && *cp <= 'z')
334 9525 : || (*cp >= '0' && *cp <= '9')
335 1951 : || (*cp == '_')))
336 : {
337 2 : *err_code = ERRCODE_INVALID_NAME;
338 2 : *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
339 2 : *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
340 2 : return false;
341 : }
342 : }
343 :
344 1091 : if (!allow_reserved_name && IsSlotForConflictCheck(name))
345 : {
346 1 : *err_code = ERRCODE_RESERVED_NAME;
347 1 : *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
348 1 : *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
349 : CONFLICT_DETECTION_SLOT);
350 1 : return false;
351 : }
352 :
353 1090 : return true;
354 : }
355 :
356 : /*
357 : * Return true if the replication slot name is "pg_conflict_detection".
358 : */
359 : static bool
360 2384 : IsSlotForConflictCheck(const char *name)
361 : {
362 2384 : return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
363 : }
364 :
365 : /*
366 : * Create a new replication slot and mark it as used by this backend.
367 : *
368 : * name: Name of the slot
369 : * db_specific: logical decoding is db specific; if the slot is going to
370 : * be used for that pass true, otherwise false.
371 : * two_phase: If enabled, allows decoding of prepared transactions.
372 : * repack: If true, use a slot from the pool for REPACK.
373 : * failover: If enabled, allows the slot to be synced to standbys so
374 : * that logical replication can be resumed after failover.
375 : * synced: True if the slot is synchronized from the primary server.
376 : */
377 : void
378 731 : ReplicationSlotCreate(const char *name, bool db_specific,
379 : ReplicationSlotPersistency persistency,
380 : bool two_phase, bool repack, bool failover, bool synced)
381 : {
382 731 : ReplicationSlot *slot = NULL;
383 : int startpoint,
384 : endpoint;
385 :
386 : Assert(MyReplicationSlot == NULL);
387 :
388 : /*
389 : * The logical launcher or pg_upgrade may create or migrate an internal
390 : * slot, so using a reserved name is allowed in these cases.
391 : */
392 731 : ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
393 731 : ERROR);
394 :
395 730 : if (failover)
396 : {
397 : /*
398 : * Do not allow users to create the failover enabled slots on the
399 : * standby as we do not support sync to the cascading standby.
400 : *
401 : * However, failover enabled slots can be created during slot
402 : * synchronization because we need to retain the same values as the
403 : * remote slot.
404 : */
405 31 : if (RecoveryInProgress() && !IsSyncingReplicationSlots())
406 0 : ereport(ERROR,
407 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
408 : errmsg("cannot enable failover for a replication slot created on the standby"));
409 :
410 : /*
411 : * Do not allow users to create failover enabled temporary slots,
412 : * because temporary slots will not be synced to the standby.
413 : *
414 : * However, failover enabled temporary slots can be created during
415 : * slot synchronization. See the comments atop slotsync.c for details.
416 : */
417 31 : if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
418 1 : ereport(ERROR,
419 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
420 : errmsg("cannot enable failover for a temporary replication slot"));
421 : }
422 :
423 : /*
424 : * If some other backend ran this code concurrently with us, we'd likely
425 : * both allocate the same slot, and that would be bad. We'd also be at
426 : * risk of missing a name collision. Also, we don't want to try to create
427 : * a new slot while somebody's busy cleaning up an old one, because we
428 : * might both be monkeying with the same directory.
429 : */
430 729 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
431 :
432 : /*
433 : * Check for name collision (across the whole array), and identify an
434 : * allocatable slot (in the array slice specific to our current use case:
435 : * either general, or REPACK only). We need to hold
436 : * ReplicationSlotControlLock in shared mode for this, so that nobody else
437 : * can change the in_use flags while we're looking at them.
438 : */
439 729 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
440 729 : startpoint = !repack ? 0 : max_replication_slots;
441 729 : endpoint = max_replication_slots + (repack ? max_repack_replication_slots : 0);
442 10713 : for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
443 : {
444 9987 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
445 :
446 9987 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
447 3 : ereport(ERROR,
448 : (errcode(ERRCODE_DUPLICATE_OBJECT),
449 : errmsg("replication slot \"%s\" already exists", name)));
450 :
451 9984 : if (i >= startpoint && i < endpoint &&
452 6331 : !s->in_use && slot == NULL)
453 725 : slot = s;
454 : }
455 726 : LWLockRelease(ReplicationSlotControlLock);
456 :
457 : /* If all slots are in use, we're out of luck. */
458 726 : if (slot == NULL)
459 1 : ereport(ERROR,
460 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
461 : errmsg("all replication slots are in use"),
462 : errhint("Free one or increase \"%s\".",
463 : repack ? "max_repack_replication_slots" : "max_replication_slots")));
464 :
465 : /*
466 : * Since this slot is not in use, nobody should be looking at any part of
467 : * it other than the in_use field unless they're trying to allocate it.
468 : * And since we hold ReplicationSlotAllocationLock, nobody except us can
469 : * be doing that. So it's safe to initialize the slot.
470 : */
471 : Assert(!slot->in_use);
472 : Assert(slot->active_proc == INVALID_PROC_NUMBER);
473 :
474 : /* first initialize persistent data */
475 725 : memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
476 725 : namestrcpy(&slot->data.name, name);
477 725 : slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
478 725 : slot->data.persistency = persistency;
479 725 : slot->data.two_phase = two_phase;
480 725 : slot->data.two_phase_at = InvalidXLogRecPtr;
481 725 : slot->data.failover = failover;
482 725 : slot->data.synced = synced;
483 :
484 : /* and then data only present in shared memory */
485 725 : slot->just_dirtied = false;
486 725 : slot->dirty = false;
487 725 : slot->effective_xmin = InvalidTransactionId;
488 725 : slot->effective_catalog_xmin = InvalidTransactionId;
489 725 : slot->candidate_catalog_xmin = InvalidTransactionId;
490 725 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
491 725 : slot->candidate_restart_valid = InvalidXLogRecPtr;
492 725 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
493 725 : slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
494 725 : slot->last_saved_restart_lsn = InvalidXLogRecPtr;
495 725 : slot->inactive_since = 0;
496 725 : slot->slotsync_skip_reason = SS_SKIP_NONE;
497 :
498 : /*
499 : * Create the slot on disk. We haven't actually marked the slot allocated
500 : * yet, so no special cleanup is required if this errors out.
501 : */
502 725 : CreateSlotOnDisk(slot);
503 :
504 : /*
505 : * We need to briefly prevent any other backend from iterating over the
506 : * slots while we flip the in_use flag. We also need to set the active
507 : * flag while holding the ControlLock as otherwise a concurrent
508 : * ReplicationSlotAcquire() could acquire the slot as well.
509 : */
510 725 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
511 :
512 725 : slot->in_use = true;
513 :
514 : /* We can now mark the slot active, and that makes it our slot. */
515 725 : SpinLockAcquire(&slot->mutex);
516 : Assert(slot->active_proc == INVALID_PROC_NUMBER);
517 725 : slot->active_proc = MyProcNumber;
518 725 : SpinLockRelease(&slot->mutex);
519 725 : MyReplicationSlot = slot;
520 :
521 725 : LWLockRelease(ReplicationSlotControlLock);
522 :
523 : /*
524 : * Create statistics entry for the new logical slot. We don't collect any
525 : * stats for physical slots, so no need to create an entry for the same.
526 : * See ReplicationSlotDropPtr for why we need to do this before releasing
527 : * ReplicationSlotAllocationLock.
528 : */
529 725 : if (SlotIsLogical(slot))
530 521 : pgstat_create_replslot(slot);
531 :
532 : /*
533 : * Now that the slot has been marked as in_use and active, it's safe to
534 : * let somebody else try to allocate a slot.
535 : */
536 725 : LWLockRelease(ReplicationSlotAllocationLock);
537 :
538 : /* Let everybody know we've modified this slot */
539 725 : ConditionVariableBroadcast(&slot->active_cv);
540 725 : }
541 :
542 : /*
543 : * Search for the named replication slot.
544 : *
545 : * Return the replication slot if found, otherwise NULL.
546 : */
547 : ReplicationSlot *
548 2192 : SearchNamedReplicationSlot(const char *name, bool need_lock)
549 : {
550 : int i;
551 2192 : ReplicationSlot *slot = NULL;
552 :
553 2192 : if (need_lock)
554 672 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
555 :
556 11412 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
557 : {
558 10863 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
559 :
560 10863 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
561 : {
562 1643 : slot = s;
563 1643 : break;
564 : }
565 : }
566 :
567 2192 : if (need_lock)
568 672 : LWLockRelease(ReplicationSlotControlLock);
569 :
570 2192 : return slot;
571 : }
572 :
573 : /*
574 : * Return the index of the replication slot in
575 : * ReplicationSlotCtl->replication_slots.
576 : *
577 : * This is mainly useful to have an efficient key for storing replication slot
578 : * stats.
579 : */
580 : int
581 8525 : ReplicationSlotIndex(ReplicationSlot *slot)
582 : {
583 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
584 : slot < ReplicationSlotCtl->replication_slots +
585 : (max_replication_slots + max_repack_replication_slots));
586 :
587 8525 : return slot - ReplicationSlotCtl->replication_slots;
588 : }
589 :
590 : /*
591 : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
592 : * the slot's name and true is returned.
593 : *
594 : * This likely is only useful for pgstat_replslot.c during shutdown, in other
595 : * cases there are obvious TOCTOU issues.
596 : */
597 : bool
598 114 : ReplicationSlotName(int index, Name name)
599 : {
600 : ReplicationSlot *slot;
601 : bool found;
602 :
603 114 : slot = &ReplicationSlotCtl->replication_slots[index];
604 :
605 : /*
606 : * Ensure that the slot cannot be dropped while we copy the name. Don't
607 : * need the spinlock as the name of an existing slot cannot change.
608 : */
609 114 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
610 114 : found = slot->in_use;
611 114 : if (slot->in_use)
612 114 : namestrcpy(name, NameStr(slot->data.name));
613 114 : LWLockRelease(ReplicationSlotControlLock);
614 :
615 114 : return found;
616 : }
617 :
618 : /*
619 : * Find a previously created slot and mark it as used by this process.
620 : *
621 : * An error is raised if nowait is true and the slot is currently in use. If
622 : * nowait is false, we sleep until the slot is released by the owning process.
623 : *
624 : * An error is raised if error_if_invalid is true and the slot is found to
625 : * be invalid. It should always be set to true, except when we are temporarily
626 : * acquiring the slot and don't intend to change it.
627 : */
628 : void
629 1433 : ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
630 : {
631 : ReplicationSlot *s;
632 : ProcNumber active_proc;
633 : int active_pid;
634 :
635 : Assert(name != NULL);
636 :
637 1433 : retry:
638 : Assert(MyReplicationSlot == NULL);
639 :
640 1433 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
641 :
642 : /* Check if the slot exists with the given name. */
643 1433 : s = SearchNamedReplicationSlot(name, false);
644 1433 : if (s == NULL || !s->in_use)
645 : {
646 10 : LWLockRelease(ReplicationSlotControlLock);
647 :
648 10 : ereport(ERROR,
649 : (errcode(ERRCODE_UNDEFINED_OBJECT),
650 : errmsg("replication slot \"%s\" does not exist",
651 : name)));
652 : }
653 :
654 : /*
655 : * Do not allow users to acquire the reserved slot. This scenario may
656 : * occur if the launcher that owns the slot has terminated unexpectedly
657 : * due to an error, and a backend process attempts to reuse the slot.
658 : */
659 1423 : if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
660 0 : ereport(ERROR,
661 : errcode(ERRCODE_UNDEFINED_OBJECT),
662 : errmsg("cannot acquire replication slot \"%s\"", name),
663 : errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
664 :
665 : /*
666 : * This is the slot we want; check if it's active under some other
667 : * process. In single user mode, we don't need this check.
668 : */
669 1423 : if (IsUnderPostmaster)
670 : {
671 : /*
672 : * Get ready to sleep on the slot in case it is active. (We may end
673 : * up not sleeping, but we don't want to do this while holding the
674 : * spinlock.)
675 : */
676 1418 : if (!nowait)
677 297 : ConditionVariablePrepareToSleep(&s->active_cv);
678 :
679 : /*
680 : * It is important to reset the inactive_since under spinlock here to
681 : * avoid race conditions with slot invalidation. See comments related
682 : * to inactive_since in InvalidatePossiblyObsoleteSlot.
683 : */
684 1418 : SpinLockAcquire(&s->mutex);
685 1418 : if (s->active_proc == INVALID_PROC_NUMBER)
686 1261 : s->active_proc = MyProcNumber;
687 1418 : active_proc = s->active_proc;
688 1418 : ReplicationSlotSetInactiveSince(s, 0, false);
689 1418 : SpinLockRelease(&s->mutex);
690 : }
691 : else
692 : {
693 5 : s->active_proc = active_proc = MyProcNumber;
694 5 : ReplicationSlotSetInactiveSince(s, 0, true);
695 : }
696 1423 : active_pid = GetPGProcByNumber(active_proc)->pid;
697 1423 : LWLockRelease(ReplicationSlotControlLock);
698 :
699 : /*
700 : * If we found the slot but it's already active in another process, we
701 : * wait until the owning process signals us that it's been released, or
702 : * error out.
703 : */
704 1423 : if (active_proc != MyProcNumber)
705 : {
706 0 : if (!nowait)
707 : {
708 : /* Wait here until we get signaled, and then restart */
709 0 : ConditionVariableSleep(&s->active_cv,
710 : WAIT_EVENT_REPLICATION_SLOT_DROP);
711 0 : ConditionVariableCancelSleep();
712 0 : goto retry;
713 : }
714 :
715 0 : ereport(ERROR,
716 : (errcode(ERRCODE_OBJECT_IN_USE),
717 : errmsg("replication slot \"%s\" is active for PID %d",
718 : NameStr(s->data.name), active_pid)));
719 : }
720 1423 : else if (!nowait)
721 297 : ConditionVariableCancelSleep(); /* no sleep needed after all */
722 :
723 : /* We made this slot active, so it's ours now. */
724 1423 : MyReplicationSlot = s;
725 :
726 : /*
727 : * We need to check for invalidation after making the slot ours to avoid
728 : * the possible race condition with the checkpointer that can otherwise
729 : * invalidate the slot immediately after the check.
730 : */
731 1423 : if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
732 8 : ereport(ERROR,
733 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
734 : errmsg("can no longer access replication slot \"%s\"",
735 : NameStr(s->data.name)),
736 : errdetail("This replication slot has been invalidated due to \"%s\".",
737 : GetSlotInvalidationCauseName(s->data.invalidated)));
738 :
739 : /* Let everybody know we've modified this slot */
740 1415 : ConditionVariableBroadcast(&s->active_cv);
741 :
742 : /*
743 : * The call to pgstat_acquire_replslot() protects against stats for a
744 : * different slot, from before a restart or such, being present during
745 : * pgstat_report_replslot().
746 : */
747 1415 : if (SlotIsLogical(s))
748 1182 : pgstat_acquire_replslot(s);
749 :
750 :
751 1415 : if (am_walsender)
752 : {
753 970 : ereport(log_replication_commands ? LOG : DEBUG1,
754 : SlotIsLogical(s)
755 : ? errmsg("acquired logical replication slot \"%s\"",
756 : NameStr(s->data.name))
757 : : errmsg("acquired physical replication slot \"%s\"",
758 : NameStr(s->data.name)));
759 : }
760 1415 : }
761 :
762 : /*
763 : * Release the replication slot that this backend considers to own.
764 : *
765 : * This or another backend can re-acquire the slot later.
766 : * Resources this slot requires will be preserved.
767 : */
768 : void
769 1718 : ReplicationSlotRelease(void)
770 : {
771 1718 : ReplicationSlot *slot = MyReplicationSlot;
772 1718 : char *slotname = NULL; /* keep compiler quiet */
773 : bool is_logical;
774 1718 : TimestampTz now = 0;
775 :
776 : Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);
777 :
778 1718 : is_logical = SlotIsLogical(slot);
779 :
780 1718 : if (am_walsender)
781 1201 : slotname = pstrdup(NameStr(slot->data.name));
782 :
783 1718 : if (slot->data.persistency == RS_EPHEMERAL)
784 : {
785 : /*
786 : * If slot is ephemeral, we drop it upon release, and request logical
787 : * decoding be disabled.
788 : */
789 8 : ReplicationSlotDropAcquired(is_logical);
790 : }
791 : else
792 : {
793 : /*
794 : * If slot needed to temporarily restrain both data and catalog xmin
795 : * to create the catalog snapshot, remove that temporary constraint.
796 : * Snapshots can only be exported while the initial snapshot is still
797 : * acquired.
798 : */
799 1710 : if (!TransactionIdIsValid(slot->data.xmin) &&
800 1675 : TransactionIdIsValid(slot->effective_xmin))
801 : {
802 215 : SpinLockAcquire(&slot->mutex);
803 215 : slot->effective_xmin = InvalidTransactionId;
804 215 : SpinLockRelease(&slot->mutex);
805 215 : ReplicationSlotsComputeRequiredXmin(false);
806 : }
807 :
808 : /*
809 : * Set the time since the slot has become inactive. We get the current
810 : * time beforehand to avoid system call while holding the spinlock.
811 : */
812 1710 : now = GetCurrentTimestamp();
813 :
814 1710 : if (slot->data.persistency == RS_PERSISTENT)
815 : {
816 : /*
817 : * Mark persistent slot inactive. We're not freeing it, just
818 : * disconnecting, but wake up others that may be waiting for it.
819 : */
820 1393 : SpinLockAcquire(&slot->mutex);
821 1393 : slot->active_proc = INVALID_PROC_NUMBER;
822 1393 : ReplicationSlotSetInactiveSince(slot, now, false);
823 1393 : SpinLockRelease(&slot->mutex);
824 1393 : ConditionVariableBroadcast(&slot->active_cv);
825 : }
826 : else
827 317 : ReplicationSlotSetInactiveSince(slot, now, true);
828 :
829 1710 : MyReplicationSlot = NULL;
830 : }
831 :
832 : /* might not have been set when we've been a plain slot */
833 1718 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
834 1718 : MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
835 1718 : ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
836 1718 : LWLockRelease(ProcArrayLock);
837 :
838 1718 : if (am_walsender)
839 : {
840 1201 : ereport(log_replication_commands ? LOG : DEBUG1,
841 : is_logical
842 : ? errmsg("released logical replication slot \"%s\"",
843 : slotname)
844 : : errmsg("released physical replication slot \"%s\"",
845 : slotname));
846 :
847 1201 : pfree(slotname);
848 : }
849 1718 : }
850 :
851 : /*
852 : * Cleanup temporary slots created in current session.
853 : *
854 : * Cleanup only synced temporary slots if 'synced_only' is true, else
855 : * cleanup all temporary slots.
856 : *
857 : * If it drops the last logical slot in the cluster, requests to disable
858 : * logical decoding.
859 : */
860 : void
861 55915 : ReplicationSlotCleanup(bool synced_only)
862 : {
863 : int i;
864 : bool found_valid_logicalslot;
865 55915 : bool dropped_logical = false;
866 :
867 : Assert(MyReplicationSlot == NULL);
868 :
869 56075 : restart:
870 56075 : found_valid_logicalslot = false;
871 56075 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
872 888879 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
873 : {
874 832964 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
875 :
876 832964 : if (!s->in_use)
877 817513 : continue;
878 :
879 15451 : SpinLockAcquire(&s->mutex);
880 :
881 30902 : found_valid_logicalslot |=
882 15451 : (SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);
883 :
884 15451 : if ((s->active_proc == MyProcNumber &&
885 160 : (!synced_only || s->data.synced)))
886 : {
887 : Assert(s->data.persistency == RS_TEMPORARY);
888 160 : SpinLockRelease(&s->mutex);
889 160 : LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
890 :
891 160 : if (SlotIsLogical(s))
892 10 : dropped_logical = true;
893 :
894 160 : ReplicationSlotDropPtr(s);
895 :
896 160 : ConditionVariableBroadcast(&s->active_cv);
897 160 : goto restart;
898 : }
899 : else
900 15291 : SpinLockRelease(&s->mutex);
901 : }
902 :
903 55915 : LWLockRelease(ReplicationSlotControlLock);
904 :
905 55915 : if (dropped_logical && !found_valid_logicalslot)
906 3 : RequestDisableLogicalDecoding();
907 55915 : }
908 :
909 : /*
910 : * Permanently drop the replication slot identified by the passed-in name.
911 : *
912 : * If this is a logical slot, request that logical decoding be disabled.
913 : */
914 : void
915 444 : ReplicationSlotDrop(const char *name, bool nowait)
916 : {
917 444 : ReplicationSlotAcquire(name, nowait, false);
918 :
919 : /*
920 : * Do not allow users to drop the slots which are currently being synced
921 : * from the primary to the standby.
922 : */
923 437 : if (RecoveryInProgress() && MyReplicationSlot->data.synced)
924 1 : ereport(ERROR,
925 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
926 : errmsg("cannot drop replication slot \"%s\"", name),
927 : errdetail("This replication slot is being synchronized from the primary server."));
928 :
929 436 : ReplicationSlotDropAcquired(SlotIsLogical(MyReplicationSlot));
930 436 : }
931 :
932 : /*
933 : * Change the definition of the slot identified by the specified name.
934 : *
935 : * Altering the two_phase property of a slot requires caution on the
936 : * client-side. Enabling it at any random point during decoding has the
937 : * risk that transactions prepared before this change may be skipped by
938 : * the decoder, leading to missing prepare records on the client. So, we
939 : * enable it for subscription related slots only once the initial tablesync
940 : * is finished. See comments atop worker.c. Disabling it is safe only when
941 : * there are no pending prepared transaction, otherwise, the changes of
942 : * already prepared transactions can be replicated again along with their
943 : * corresponding commit leading to duplicate data or errors.
944 : */
945 : void
946 7 : ReplicationSlotAlter(const char *name, const bool *failover,
947 : const bool *two_phase)
948 : {
949 7 : bool update_slot = false;
950 :
951 : Assert(MyReplicationSlot == NULL);
952 : Assert(failover || two_phase);
953 :
954 7 : ReplicationSlotAcquire(name, false, true);
955 :
956 6 : if (SlotIsPhysical(MyReplicationSlot))
957 0 : ereport(ERROR,
958 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
959 : errmsg("cannot use %s with a physical replication slot",
960 : "ALTER_REPLICATION_SLOT"));
961 :
962 6 : if (RecoveryInProgress())
963 : {
964 : /*
965 : * Do not allow users to alter the slots which are currently being
966 : * synced from the primary to the standby.
967 : */
968 1 : if (MyReplicationSlot->data.synced)
969 1 : ereport(ERROR,
970 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
971 : errmsg("cannot alter replication slot \"%s\"", name),
972 : errdetail("This replication slot is being synchronized from the primary server."));
973 :
974 : /*
975 : * Do not allow users to enable failover on the standby as we do not
976 : * support sync to the cascading standby.
977 : */
978 0 : if (failover && *failover)
979 0 : ereport(ERROR,
980 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
981 : errmsg("cannot enable failover for a replication slot"
982 : " on the standby"));
983 : }
984 :
985 5 : if (failover)
986 : {
987 : /*
988 : * Do not allow users to enable failover for temporary slots as we do
989 : * not support syncing temporary slots to the standby.
990 : */
991 4 : if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
992 0 : ereport(ERROR,
993 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
994 : errmsg("cannot enable failover for a temporary replication slot"));
995 :
996 4 : if (MyReplicationSlot->data.failover != *failover)
997 : {
998 4 : SpinLockAcquire(&MyReplicationSlot->mutex);
999 4 : MyReplicationSlot->data.failover = *failover;
1000 4 : SpinLockRelease(&MyReplicationSlot->mutex);
1001 :
1002 4 : update_slot = true;
1003 : }
1004 : }
1005 :
1006 5 : if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
1007 : {
1008 1 : SpinLockAcquire(&MyReplicationSlot->mutex);
1009 1 : MyReplicationSlot->data.two_phase = *two_phase;
1010 1 : SpinLockRelease(&MyReplicationSlot->mutex);
1011 :
1012 1 : update_slot = true;
1013 : }
1014 :
1015 5 : if (update_slot)
1016 : {
1017 5 : ReplicationSlotMarkDirty();
1018 5 : ReplicationSlotSave();
1019 : }
1020 :
1021 5 : ReplicationSlotRelease();
1022 5 : }
1023 :
1024 : /*
1025 : * Permanently drop the currently acquired replication slot.
1026 : *
1027 : * If caller requests it, have checkpointer attempt to disable logical
1028 : * decoding. Obviously, this should only be done if the slot is logical.
1029 : */
1030 : void
1031 459 : ReplicationSlotDropAcquired(bool try_disable)
1032 : {
1033 : ReplicationSlot *slot;
1034 :
1035 : Assert(MyReplicationSlot != NULL);
1036 459 : slot = MyReplicationSlot;
1037 :
1038 : /* Can only disable logical decoding if slot is logical */
1039 : Assert(!try_disable || SlotIsLogical(slot));
1040 :
1041 : /* slot isn't acquired anymore */
1042 459 : MyReplicationSlot = NULL;
1043 :
1044 459 : ReplicationSlotDropPtr(slot);
1045 :
1046 459 : if (try_disable)
1047 431 : RequestDisableLogicalDecoding();
1048 459 : }
1049 :
/*
 * Permanently drop the replication slot which will be released by the point
 * this function returns.
 *
 * The on-disk directory is first renamed to "<name>.tmp" and fsynced, then
 * the in-memory slot is marked unused, the xmin/LSN horizons are recomputed,
 * the renamed directory is removed, and (for logical slots) the statistics
 * entry is dropped.  ReplicationSlotAllocationLock is held exclusively for
 * the whole operation.
 */
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
	char		path[MAXPGPATH];
	char		tmppath[MAXPGPATH];

	/*
	 * If some other backend ran this code concurrently with us, we might try
	 * to delete a slot with a certain name while someone else was trying to
	 * create a slot with the same name.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/* Generate pathnames. */
	sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
	sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));

	/*
	 * Rename the slot directory on disk, so that we'll no longer recognize
	 * this as a valid slot. Note that if this fails, we've got to mark the
	 * slot inactive before bailing out. If we're dropping an ephemeral or a
	 * temporary slot, we better never fail hard as the caller won't expect
	 * the slot to survive and this might get called during error handling.
	 */
	if (rename(path, tmppath) == 0)
	{
		/*
		 * We need to fsync() the directory we just renamed and its parent to
		 * make sure that our changes are on disk in a crash-safe fashion. If
		 * fsync() fails, we can't be sure whether the changes are on disk or
		 * not. For now, we handle that by panicking (any error raised inside
		 * the critical section escalates);
		 * StartupReplicationSlots() will try to straighten it out after
		 * restart.
		 */
		START_CRIT_SECTION();
		fsync_fname(tmppath, true);
		fsync_fname(PG_REPLSLOT_DIR, true);
		END_CRIT_SECTION();
	}
	else
	{
		/* non-persistent slots only WARN, and execution continues below */
		bool		fail_softly = slot->data.persistency != RS_PERSISTENT;

		SpinLockAcquire(&slot->mutex);
		slot->active_proc = INVALID_PROC_NUMBER;
		SpinLockRelease(&slot->mutex);

		/* wake up anyone waiting on this slot */
		ConditionVariableBroadcast(&slot->active_cv);

		/*
		 * NOTE(review): in the WARNING case we fall through and later call
		 * rmtree(tmppath), a path the failed rename presumably never
		 * created, so a second "could not remove directory" warning is
		 * likely; the original directory at 'path' stays on disk.
		 */
		ereport(fail_softly ? WARNING : ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						path, tmppath)));
	}

	/*
	 * The slot is definitely gone. Lock out concurrent scans of the array
	 * long enough to kill it. It's OK to clear active_proc here without
	 * grabbing the mutex because nobody else can be scanning the array here,
	 * and nobody can be attached to this slot and thus access it without
	 * scanning the array.
	 *
	 * Also wake up processes waiting for it.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	slot->active_proc = INVALID_PROC_NUMBER;
	slot->in_use = false;
	LWLockRelease(ReplicationSlotControlLock);
	ConditionVariableBroadcast(&slot->active_cv);

	/*
	 * Slot is dead and doesn't prevent resource removal anymore, recompute
	 * limits.
	 */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * If removing the directory fails, the worst thing that will happen is
	 * that the user won't be able to create a new slot with the same name
	 * until the next server restart. We warn about it, but that's all.
	 */
	if (!rmtree(tmppath, true))
		ereport(WARNING,
				(errmsg("could not remove directory \"%s\"", tmppath)));

	/*
	 * Drop the statistics entry for the replication slot. Do this while
	 * holding ReplicationSlotAllocationLock so that we don't drop a
	 * statistics entry for another slot with the same name just created in
	 * another session.
	 */
	if (SlotIsLogical(slot))
		pgstat_drop_replslot(slot);

	/*
	 * We release this at the very end, so that nobody starts trying to create
	 * a slot while we're still cleaning up the detritus of the old one.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);
}
1156 :
1157 : /*
1158 : * Serialize the currently acquired slot's state from memory to disk, thereby
1159 : * guaranteeing the current state will survive a crash.
1160 : */
1161 : void
1162 1520 : ReplicationSlotSave(void)
1163 : {
1164 : char path[MAXPGPATH];
1165 :
1166 : Assert(MyReplicationSlot != NULL);
1167 :
1168 1520 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
1169 1520 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
1170 1520 : }
1171 :
1172 : /*
1173 : * Signal that it would be useful if the currently acquired slot would be
1174 : * flushed out to disk.
1175 : *
1176 : * Note that the actual flush to disk can be delayed for a long time, if
1177 : * required for correctness explicitly do a ReplicationSlotSave().
1178 : */
1179 : void
1180 43163 : ReplicationSlotMarkDirty(void)
1181 : {
1182 43163 : ReplicationSlot *slot = MyReplicationSlot;
1183 :
1184 : Assert(MyReplicationSlot != NULL);
1185 :
1186 43163 : SpinLockAcquire(&slot->mutex);
1187 43163 : MyReplicationSlot->just_dirtied = true;
1188 43163 : MyReplicationSlot->dirty = true;
1189 43163 : SpinLockRelease(&slot->mutex);
1190 43163 : }
1191 :
1192 : /*
1193 : * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1194 : * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1195 : */
1196 : void
1197 496 : ReplicationSlotPersist(void)
1198 : {
1199 496 : ReplicationSlot *slot = MyReplicationSlot;
1200 :
1201 : Assert(slot != NULL);
1202 : Assert(slot->data.persistency != RS_PERSISTENT);
1203 :
1204 496 : SpinLockAcquire(&slot->mutex);
1205 496 : slot->data.persistency = RS_PERSISTENT;
1206 496 : SpinLockRelease(&slot->mutex);
1207 :
1208 496 : ReplicationSlotMarkDirty();
1209 496 : ReplicationSlotSave();
1210 496 : }
1211 :
1212 : /*
1213 : * Compute the oldest xmin across all slots and store it in the ProcArray.
1214 : *
1215 : * If already_locked is true, both the ReplicationSlotControlLock and the
1216 : * ProcArrayLock have already been acquired exclusively. It is crucial that the
1217 : * caller first acquires the ReplicationSlotControlLock, followed by the
1218 : * ProcArrayLock, to prevent any undetectable deadlocks since this function
1219 : * acquires them in that order.
1220 : */
1221 : void
1222 2664 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
1223 : {
1224 : int i;
1225 2664 : TransactionId agg_xmin = InvalidTransactionId;
1226 2664 : TransactionId agg_catalog_xmin = InvalidTransactionId;
1227 :
1228 : Assert(ReplicationSlotCtl != NULL);
1229 : Assert(!already_locked ||
1230 : (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
1231 : LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
1232 :
1233 : /*
1234 : * Hold the ReplicationSlotControlLock until after updating the slot xmin
1235 : * values, so no backend updates the initial xmin for newly created slot
1236 : * concurrently. A shared lock is used here to minimize lock contention,
1237 : * especially when many slots exist and advancements occur frequently.
1238 : * This is safe since an exclusive lock is taken during initial slot xmin
1239 : * update in slot creation.
1240 : *
1241 : * One might think that we can hold the ProcArrayLock exclusively and
1242 : * update the slot xmin values, but it could increase lock contention on
1243 : * the ProcArrayLock, which is not great since this function can be called
1244 : * at non-negligible frequency.
1245 : *
1246 : * Concurrent invocation of this function may cause the computed slot xmin
1247 : * to regress. However, this is harmless because tuples prior to the most
1248 : * recent xmin are no longer useful once advancement occurs (see
1249 : * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
1250 : * before updating the effective_xmin). Thus, such regression merely
1251 : * prevents VACUUM from prematurely removing tuples without causing the
1252 : * early deletion of required data.
1253 : */
1254 2664 : if (!already_locked)
1255 2143 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1256 :
1257 40487 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
1258 : {
1259 37823 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1260 : TransactionId effective_xmin;
1261 : TransactionId effective_catalog_xmin;
1262 : bool invalidated;
1263 :
1264 37823 : if (!s->in_use)
1265 35271 : continue;
1266 :
1267 2552 : SpinLockAcquire(&s->mutex);
1268 2552 : effective_xmin = s->effective_xmin;
1269 2552 : effective_catalog_xmin = s->effective_catalog_xmin;
1270 2552 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1271 2552 : SpinLockRelease(&s->mutex);
1272 :
1273 : /* invalidated slots need not apply */
1274 2552 : if (invalidated)
1275 26 : continue;
1276 :
1277 : /* check the data xmin */
1278 2526 : if (TransactionIdIsValid(effective_xmin) &&
1279 7 : (!TransactionIdIsValid(agg_xmin) ||
1280 7 : TransactionIdPrecedes(effective_xmin, agg_xmin)))
1281 355 : agg_xmin = effective_xmin;
1282 :
1283 : /* check the catalog xmin */
1284 2526 : if (TransactionIdIsValid(effective_catalog_xmin) &&
1285 1058 : (!TransactionIdIsValid(agg_catalog_xmin) ||
1286 1058 : TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1287 1312 : agg_catalog_xmin = effective_catalog_xmin;
1288 : }
1289 :
1290 2664 : ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1291 :
1292 2664 : if (!already_locked)
1293 2143 : LWLockRelease(ReplicationSlotControlLock);
1294 2664 : }
1295 :
1296 : /*
1297 : * Compute the oldest restart LSN across all slots and inform xlog module.
1298 : *
1299 : * Note: while max_slot_wal_keep_size is theoretically relevant for this
1300 : * purpose, we don't try to account for that, because this module doesn't
1301 : * know what to compare against.
1302 : */
1303 : void
1304 44218 : ReplicationSlotsComputeRequiredLSN(void)
1305 : {
1306 : int i;
1307 44218 : XLogRecPtr min_required = InvalidXLogRecPtr;
1308 :
1309 : Assert(ReplicationSlotCtl != NULL);
1310 :
1311 44218 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1312 703221 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
1313 : {
1314 659003 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1315 : XLogRecPtr restart_lsn;
1316 : XLogRecPtr last_saved_restart_lsn;
1317 : bool invalidated;
1318 : ReplicationSlotPersistency persistency;
1319 :
1320 659003 : if (!s->in_use)
1321 614231 : continue;
1322 :
1323 44772 : SpinLockAcquire(&s->mutex);
1324 44772 : persistency = s->data.persistency;
1325 44772 : restart_lsn = s->data.restart_lsn;
1326 44772 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1327 44772 : last_saved_restart_lsn = s->last_saved_restart_lsn;
1328 44772 : SpinLockRelease(&s->mutex);
1329 :
1330 : /* invalidated slots need not apply */
1331 44772 : if (invalidated)
1332 27 : continue;
1333 :
1334 : /*
1335 : * For persistent slot use last_saved_restart_lsn to compute the
1336 : * oldest LSN for removal of WAL segments. The segments between
1337 : * last_saved_restart_lsn and restart_lsn might be needed by a
1338 : * persistent slot in the case of database crash. Non-persistent
1339 : * slots can't survive the database crash, so we don't care about
1340 : * last_saved_restart_lsn for them.
1341 : */
1342 44745 : if (persistency == RS_PERSISTENT)
1343 : {
1344 43745 : if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1345 : restart_lsn > last_saved_restart_lsn)
1346 : {
1347 40620 : restart_lsn = last_saved_restart_lsn;
1348 : }
1349 : }
1350 :
1351 44745 : if (XLogRecPtrIsValid(restart_lsn) &&
1352 1718 : (!XLogRecPtrIsValid(min_required) ||
1353 : restart_lsn < min_required))
1354 43212 : min_required = restart_lsn;
1355 : }
1356 44218 : LWLockRelease(ReplicationSlotControlLock);
1357 :
1358 44218 : XLogSetReplicationSlotMinimumLSN(min_required);
1359 44218 : }
1360 :
1361 : /*
1362 : * Compute the oldest WAL LSN required by *logical* decoding slots..
1363 : *
1364 : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1365 : * slots exist.
1366 : *
1367 : * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1368 : * ignores physical replication slots.
1369 : *
1370 : * The results aren't required frequently, so we don't maintain a precomputed
1371 : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1372 : */
1373 : XLogRecPtr
1374 3896 : ReplicationSlotsComputeLogicalRestartLSN(void)
1375 : {
1376 3896 : XLogRecPtr result = InvalidXLogRecPtr;
1377 : int i;
1378 :
1379 3896 : if (max_replication_slots + max_repack_replication_slots <= 0)
1380 0 : return InvalidXLogRecPtr;
1381 :
1382 3896 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1383 :
1384 61774 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
1385 : {
1386 : ReplicationSlot *s;
1387 : XLogRecPtr restart_lsn;
1388 : XLogRecPtr last_saved_restart_lsn;
1389 : bool invalidated;
1390 : ReplicationSlotPersistency persistency;
1391 :
1392 57878 : s = &ReplicationSlotCtl->replication_slots[i];
1393 :
1394 : /* cannot change while ReplicationSlotCtlLock is held */
1395 57878 : if (!s->in_use)
1396 57052 : continue;
1397 :
1398 : /* we're only interested in logical slots */
1399 826 : if (!SlotIsLogical(s))
1400 564 : continue;
1401 :
1402 : /* read once, it's ok if it increases while we're checking */
1403 262 : SpinLockAcquire(&s->mutex);
1404 262 : persistency = s->data.persistency;
1405 262 : restart_lsn = s->data.restart_lsn;
1406 262 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1407 262 : last_saved_restart_lsn = s->last_saved_restart_lsn;
1408 262 : SpinLockRelease(&s->mutex);
1409 :
1410 : /* invalidated slots need not apply */
1411 262 : if (invalidated)
1412 10 : continue;
1413 :
1414 : /*
1415 : * For persistent slot use last_saved_restart_lsn to compute the
1416 : * oldest LSN for removal of WAL segments. The segments between
1417 : * last_saved_restart_lsn and restart_lsn might be needed by a
1418 : * persistent slot in the case of database crash. Non-persistent
1419 : * slots can't survive the database crash, so we don't care about
1420 : * last_saved_restart_lsn for them.
1421 : */
1422 252 : if (persistency == RS_PERSISTENT)
1423 : {
1424 250 : if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1425 : restart_lsn > last_saved_restart_lsn)
1426 : {
1427 0 : restart_lsn = last_saved_restart_lsn;
1428 : }
1429 : }
1430 :
1431 252 : if (!XLogRecPtrIsValid(restart_lsn))
1432 0 : continue;
1433 :
1434 252 : if (!XLogRecPtrIsValid(result) ||
1435 : restart_lsn < result)
1436 200 : result = restart_lsn;
1437 : }
1438 :
1439 3896 : LWLockRelease(ReplicationSlotControlLock);
1440 :
1441 3896 : return result;
1442 : }
1443 :
1444 : /*
1445 : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1446 : * passed database oid.
1447 : *
1448 : * Returns true if there are any slots referencing the database. *nslots will
1449 : * be set to the absolute number of slots in the database, *nactive to ones
1450 : * currently active.
1451 : */
1452 : bool
1453 52 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1454 : {
1455 : int i;
1456 :
1457 52 : *nslots = *nactive = 0;
1458 :
1459 52 : if (max_replication_slots + max_repack_replication_slots <= 0)
1460 0 : return false;
1461 :
1462 52 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1463 805 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
1464 : {
1465 : ReplicationSlot *s;
1466 :
1467 753 : s = &ReplicationSlotCtl->replication_slots[i];
1468 :
1469 : /* cannot change while ReplicationSlotCtlLock is held */
1470 753 : if (!s->in_use)
1471 732 : continue;
1472 :
1473 : /* only logical slots are database specific, skip */
1474 21 : if (!SlotIsLogical(s))
1475 10 : continue;
1476 :
1477 : /* not our database, skip */
1478 11 : if (s->data.database != dboid)
1479 8 : continue;
1480 :
1481 : /* NB: intentionally counting invalidated slots */
1482 :
1483 : /* count slots with spinlock held */
1484 3 : SpinLockAcquire(&s->mutex);
1485 3 : (*nslots)++;
1486 3 : if (s->active_proc != INVALID_PROC_NUMBER)
1487 1 : (*nactive)++;
1488 3 : SpinLockRelease(&s->mutex);
1489 : }
1490 52 : LWLockRelease(ReplicationSlotControlLock);
1491 :
1492 52 : if (*nslots > 0)
1493 3 : return true;
1494 49 : return false;
1495 : }
1496 :
/*
 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
 * passed database oid. The caller should hold an exclusive lock on the
 * pg_database oid for the database to prevent creation of new slots on the db
 * or replay from existing slots.
 *
 * Another session that concurrently acquires an existing slot on the target DB
 * (most likely to drop it) may cause this function to ERROR. If that happens
 * it may have dropped some but not all slots.
 *
 * This routine isn't as efficient as it could be - but we don't drop
 * databases often, especially databases with lots of slots.
 *
 * If the last valid logical slot in the cluster is dropped, request that
 * logical decoding be disabled.
 */
void
ReplicationSlotsDropDBSlots(Oid dboid)
{
	int			i;
	bool		found_valid_logicalslot;
	bool		dropped = false;

	if (max_replication_slots + max_repack_replication_slots <= 0)
		return;

restart:
	/*
	 * Reset on every pass; only the final pass (which drops nothing) decides
	 * whether any valid logical slot remains in the cluster.
	 */
	found_valid_logicalslot = false;
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
	{
		ReplicationSlot *s;
		char	   *slotname;
		ProcNumber	active_proc;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotControlLock is held */
		if (!s->in_use)
			continue;

		/* only logical slots are database specific, skip */
		if (!SlotIsLogical(s))
			continue;

		/*
		 * Check logical slots on other databases too so we can disable
		 * logical decoding only if no slots in the cluster.
		 */
		SpinLockAcquire(&s->mutex);
		found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
		SpinLockRelease(&s->mutex);

		/* not our database, skip */
		if (s->data.database != dboid)
			continue;

		/* NB: intentionally including invalidated slots to drop */

		/* acquire slot, so ReplicationSlotDropAcquired can be reused */
		SpinLockAcquire(&s->mutex);
		/* can't change while ReplicationSlotControlLock is held */
		slotname = NameStr(s->data.name);
		active_proc = s->active_proc;
		if (active_proc == INVALID_PROC_NUMBER)
		{
			MyReplicationSlot = s;
			s->active_proc = MyProcNumber;
		}
		SpinLockRelease(&s->mutex);

		/*
		 * Even though we hold an exclusive lock on the database object a
		 * logical slot for that DB can still be active, e.g. if it's
		 * concurrently being dropped by a backend connected to another DB.
		 *
		 * That's fairly unlikely in practice, so we'll just bail out.
		 *
		 * The slot sync worker holds a shared lock on the database before
		 * operating on synced logical slots to avoid conflict with the drop
		 * happening here. The persistent synced slots are thus safe but there
		 * is a possibility that the slot sync worker has created a temporary
		 * slot (which stays active even on release) and we are trying to drop
		 * that here. In practice, the chances of hitting this scenario are
		 * less as during slot synchronization, the temporary slot is
		 * immediately converted to persistent and thus is safe due to the
		 * shared lock taken on the database. So, we'll just bail out in such
		 * a case.
		 *
		 * XXX: We can consider shutting down the slot sync worker before
		 * trying to drop synced temporary slots here.
		 */
		if (active_proc != INVALID_PROC_NUMBER)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							slotname, GetPGProcByNumber(active_proc)->pid)));

		/*
		 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
		 * holding ReplicationSlotControlLock over filesystem operations,
		 * release ReplicationSlotControlLock and use
		 * ReplicationSlotDropAcquired.
		 *
		 * try_disable is false here: whether logical decoding can be
		 * disabled is decided once, after the final scan below.
		 *
		 * As that means the set of slots could change, restart scan from the
		 * beginning each time we release the lock.
		 */
		LWLockRelease(ReplicationSlotControlLock);
		ReplicationSlotDropAcquired(false);
		dropped = true;
		goto restart;
	}
	LWLockRelease(ReplicationSlotControlLock);

	if (dropped && !found_valid_logicalslot)
		RequestDisableLogicalDecoding();
}
1614 :
1615 : /*
1616 : * Returns true if there is at least one in-use valid logical replication slot.
1617 : */
1618 : bool
1619 505 : CheckLogicalSlotExists(void)
1620 : {
1621 505 : bool found = false;
1622 :
1623 505 : if (max_replication_slots + max_repack_replication_slots <= 0)
1624 0 : return false;
1625 :
1626 505 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1627 7942 : for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
1628 : {
1629 : ReplicationSlot *s;
1630 : bool invalidated;
1631 :
1632 7444 : s = &ReplicationSlotCtl->replication_slots[i];
1633 :
1634 : /* cannot change while ReplicationSlotCtlLock is held */
1635 7444 : if (!s->in_use)
1636 7414 : continue;
1637 :
1638 30 : if (SlotIsPhysical(s))
1639 20 : continue;
1640 :
1641 10 : SpinLockAcquire(&s->mutex);
1642 10 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1643 10 : SpinLockRelease(&s->mutex);
1644 :
1645 10 : if (invalidated)
1646 3 : continue;
1647 :
1648 7 : found = true;
1649 7 : break;
1650 : }
1651 505 : LWLockRelease(ReplicationSlotControlLock);
1652 :
1653 505 : return found;
1654 : }
1655 :
1656 : /*
1657 : * Check whether the server's configuration supports using replication
1658 : * slots.
1659 : */
1660 : void
1661 1925 : CheckSlotRequirements(bool repack)
1662 : {
1663 : /*
1664 : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1665 : * needs the same check.
1666 : */
1667 :
1668 1925 : if (!repack && max_replication_slots == 0)
1669 0 : ereport(ERROR,
1670 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1671 : errmsg("replication slots can only be used if \"%s\" > 0",
1672 : "max_replication_slots"));
1673 :
1674 1925 : if (repack && max_repack_replication_slots == 0)
1675 0 : ereport(ERROR,
1676 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1677 : errmsg("REPACK can only be used if \"%s\" > 0",
1678 : "max_repack_replication_slots"));
1679 :
1680 1925 : if (wal_level < WAL_LEVEL_REPLICA)
1681 0 : ereport(ERROR,
1682 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1683 : errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1684 1925 : }
1685 :
1686 : /*
1687 : * Check whether the user has privilege to use replication slots.
1688 : */
1689 : void
1690 605 : CheckSlotPermissions(void)
1691 : {
1692 605 : if (!has_rolreplication(GetUserId()))
1693 5 : ereport(ERROR,
1694 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1695 : errmsg("permission denied to use replication slots"),
1696 : errdetail("Only roles with the %s attribute may use replication slots.",
1697 : "REPLICATION")));
1698 600 : }
1699 :
/*
 * Reserve WAL for the currently active slot.
 *
 * Compute and set restart_lsn in a manner that's appropriate for the type of
 * the slot and concurrency safe.  The slot must not have a restart_lsn or a
 * last_saved_restart_lsn yet (asserted below).  Raises ERROR if the WAL
 * segment containing the chosen restart_lsn has already been removed.  For
 * logical slots outside recovery, a standby snapshot is also logged and
 * flushed.
 */
void
ReplicationSlotReserveWal(void)
{
	ReplicationSlot *slot = MyReplicationSlot;
	XLogSegNo	segno;
	XLogRecPtr	restart_lsn;

	Assert(slot != NULL);
	Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
	Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn));

	/*
	 * The replication slot mechanism is used to prevent the removal of
	 * required WAL.
	 *
	 * Acquire an exclusive lock to prevent the checkpoint process from
	 * concurrently computing the minimum slot LSN (see
	 * CheckPointReplicationSlots). This ensures that the WAL reserved for
	 * replication cannot be removed during a checkpoint.
	 *
	 * The mechanism is reliable because if WAL reservation occurs first, the
	 * checkpoint must wait for the restart_lsn update before determining the
	 * minimum non-removable LSN. On the other hand, if the checkpoint happens
	 * first, subsequent WAL reservations will select positions at or beyond
	 * the redo pointer of that checkpoint.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * For logical slots log a standby snapshot and start logical decoding at
	 * exactly that position. That allows the slot to start up more quickly.
	 * But on a standby we cannot do WAL writes, so just use the replay
	 * pointer; effectively, an attempt to create a logical slot on standby
	 * will cause it to wait for an xl_running_xact record to be logged
	 * independently on the primary, so that a snapshot can be built using the
	 * record.
	 *
	 * None of this is needed (or indeed helpful) for physical slots as
	 * they'll start replay at the last logged checkpoint anyway. Instead,
	 * return the location of the last redo LSN, where a base backup has to
	 * start replay at.
	 */
	if (SlotIsPhysical(slot))
		restart_lsn = GetRedoRecPtr();
	else if (RecoveryInProgress())
		restart_lsn = GetXLogReplayRecPtr(NULL);
	else
		restart_lsn = GetXLogInsertRecPtr();

	/* restart_lsn is read by other backends under the slot mutex */
	SpinLockAcquire(&slot->mutex);
	slot->data.restart_lsn = restart_lsn;
	SpinLockRelease(&slot->mutex);

	/* prevent WAL removal as fast as possible */
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * Checkpoint shouldn't remove the required WAL.  Double-check that the
	 * segment holding restart_lsn was not already removed before the new
	 * minimum became visible; if it was, the reservation is useless.
	 */
	XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
	if (XLogGetLastRemovedSegno() >= segno)
		elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
			 NameStr(slot->data.name));

	LWLockRelease(ReplicationSlotAllocationLock);

	/*
	 * Done after releasing ReplicationSlotAllocationLock: the reservation
	 * above already protects the WAL from removal.
	 */
	if (!RecoveryInProgress() && SlotIsLogical(slot))
	{
		XLogRecPtr	flushptr;

		/* make sure we have enough information to start */
		flushptr = LogStandbySnapshot();

		/* and make sure it's fsynced to disk */
		XLogFlush(flushptr);
	}
}
1781 :
1782 : /*
1783 : * Report that replication slot needs to be invalidated
1784 : */
1785 : static void
1786 23 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1787 : bool terminating,
1788 : int pid,
1789 : NameData slotname,
1790 : XLogRecPtr restart_lsn,
1791 : XLogRecPtr oldestLSN,
1792 : TransactionId snapshotConflictHorizon,
1793 : long slot_idle_seconds)
1794 : {
1795 : StringInfoData err_detail;
1796 : StringInfoData err_hint;
1797 :
1798 23 : initStringInfo(&err_detail);
1799 23 : initStringInfo(&err_hint);
1800 :
1801 23 : switch (cause)
1802 : {
1803 7 : case RS_INVAL_WAL_REMOVED:
1804 : {
1805 7 : uint64 ex = oldestLSN - restart_lsn;
1806 :
1807 7 : appendStringInfo(&err_detail,
1808 7 : ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1809 : "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1810 : ex),
1811 7 : LSN_FORMAT_ARGS(restart_lsn),
1812 : ex);
1813 : /* translator: %s is a GUC variable name */
1814 7 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1815 : "max_slot_wal_keep_size");
1816 7 : break;
1817 : }
1818 12 : case RS_INVAL_HORIZON:
1819 12 : appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1820 : snapshotConflictHorizon);
1821 12 : break;
1822 :
1823 4 : case RS_INVAL_WAL_LEVEL:
1824 4 : appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
1825 4 : break;
1826 :
1827 0 : case RS_INVAL_IDLE_TIMEOUT:
1828 : {
1829 : /* translator: %s is a GUC variable name */
1830 0 : appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1831 : slot_idle_seconds, "idle_replication_slot_timeout",
1832 : idle_replication_slot_timeout_secs);
1833 : /* translator: %s is a GUC variable name */
1834 0 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1835 : "idle_replication_slot_timeout");
1836 0 : break;
1837 : }
1838 : case RS_INVAL_NONE:
1839 : pg_unreachable();
1840 : }
1841 :
1842 23 : ereport(LOG,
1843 : terminating ?
1844 : errmsg("terminating process %d to release replication slot \"%s\"",
1845 : pid, NameStr(slotname)) :
1846 : errmsg("invalidating obsolete replication slot \"%s\"",
1847 : NameStr(slotname)),
1848 : errdetail_internal("%s", err_detail.data),
1849 : err_hint.len ? errhint("%s", err_hint.data) : 0);
1850 :
1851 23 : pfree(err_detail.data);
1852 23 : pfree(err_hint.data);
1853 23 : }
1854 :
1855 : /*
1856 : * Can we invalidate an idle replication slot?
1857 : *
1858 : * Idle timeout invalidation is allowed only when:
1859 : *
1860 : * 1. Idle timeout is set
1861 : * 2. Slot has reserved WAL
1862 : * 3. Slot is inactive
1863 : * 4. The slot is not being synced from the primary while the server is in
1864 : * recovery. This is because synced slots are always considered to be
1865 : * inactive because they don't perform logical decoding to produce changes.
1866 : */
1867 : static inline bool
1868 390 : CanInvalidateIdleSlot(ReplicationSlot *s)
1869 : {
1870 390 : return (idle_replication_slot_timeout_secs != 0 &&
1871 0 : XLogRecPtrIsValid(s->data.restart_lsn) &&
1872 390 : s->inactive_since > 0 &&
1873 0 : !(RecoveryInProgress() && s->data.synced));
1874 : }
1875 :
1876 : /*
1877 : * DetermineSlotInvalidationCause - Determine the cause for which a slot
1878 : * becomes invalid among the given possible causes.
1879 : *
1880 : * This function sequentially checks all possible invalidation causes and
1881 : * returns the first one for which the slot is eligible for invalidation.
1882 : */
1883 : static ReplicationSlotInvalidationCause
1884 423 : DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
1885 : XLogRecPtr oldestLSN, Oid dboid,
1886 : TransactionId snapshotConflictHorizon,
1887 : TimestampTz *inactive_since, TimestampTz now)
1888 : {
1889 : Assert(possible_causes != RS_INVAL_NONE);
1890 :
1891 423 : if (possible_causes & RS_INVAL_WAL_REMOVED)
1892 : {
1893 397 : XLogRecPtr restart_lsn = s->data.restart_lsn;
1894 :
1895 397 : if (XLogRecPtrIsValid(restart_lsn) &&
1896 : restart_lsn < oldestLSN)
1897 7 : return RS_INVAL_WAL_REMOVED;
1898 : }
1899 :
1900 416 : if (possible_causes & RS_INVAL_HORIZON)
1901 : {
1902 : /* invalid DB oid signals a shared relation */
1903 22 : if (SlotIsLogical(s) &&
1904 17 : (dboid == InvalidOid || dboid == s->data.database))
1905 : {
1906 22 : TransactionId effective_xmin = s->effective_xmin;
1907 22 : TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
1908 :
1909 22 : if (TransactionIdIsValid(effective_xmin) &&
1910 0 : TransactionIdPrecedesOrEquals(effective_xmin,
1911 : snapshotConflictHorizon))
1912 0 : return RS_INVAL_HORIZON;
1913 44 : else if (TransactionIdIsValid(catalog_effective_xmin) &&
1914 22 : TransactionIdPrecedesOrEquals(catalog_effective_xmin,
1915 : snapshotConflictHorizon))
1916 12 : return RS_INVAL_HORIZON;
1917 : }
1918 : }
1919 :
1920 404 : if (possible_causes & RS_INVAL_WAL_LEVEL)
1921 : {
1922 4 : if (SlotIsLogical(s))
1923 4 : return RS_INVAL_WAL_LEVEL;
1924 : }
1925 :
1926 400 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1927 : {
1928 : Assert(now > 0);
1929 :
1930 390 : if (CanInvalidateIdleSlot(s))
1931 : {
1932 : /*
1933 : * Simulate the invalidation due to idle_timeout to test the
1934 : * timeout behavior promptly, without waiting for it to trigger
1935 : * naturally.
1936 : */
1937 : #ifdef USE_INJECTION_POINTS
1938 0 : if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1939 : {
1940 0 : *inactive_since = 0; /* since the beginning of time */
1941 0 : return RS_INVAL_IDLE_TIMEOUT;
1942 : }
1943 : #endif
1944 :
1945 : /*
1946 : * Check if the slot needs to be invalidated due to
1947 : * idle_replication_slot_timeout GUC.
1948 : */
1949 0 : if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
1950 : idle_replication_slot_timeout_secs))
1951 : {
1952 0 : *inactive_since = s->inactive_since;
1953 0 : return RS_INVAL_IDLE_TIMEOUT;
1954 : }
1955 : }
1956 : }
1957 :
1958 400 : return RS_INVAL_NONE;
1959 : }
1960 :
1961 : /*
1962 : * Helper for InvalidateObsoleteReplicationSlots
1963 : *
1964 : * Acquires the given slot and mark it invalid, if necessary and possible.
1965 : *
1966 : * Returns true if the slot was invalidated.
1967 : *
1968 : * Set *released_lock_out if ReplicationSlotControlLock was released in the
1969 : * interim (and in that case we're not holding the lock at return, otherwise
1970 : * we are).
1971 : *
1972 : * This is inherently racy, because we release the LWLock
1973 : * for syscalls, so caller must restart if we return true.
1974 : */
1975 : static bool
1976 462 : InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
1977 : ReplicationSlot *s,
1978 : XLogRecPtr oldestLSN,
1979 : Oid dboid, TransactionId snapshotConflictHorizon,
1980 : bool *released_lock_out)
1981 : {
1982 462 : int last_signaled_pid = 0;
1983 462 : bool released_lock = false;
1984 462 : bool invalidated = false;
1985 462 : TimestampTz inactive_since = 0;
1986 :
1987 : for (;;)
1988 7 : {
1989 : XLogRecPtr restart_lsn;
1990 : NameData slotname;
1991 : ProcNumber active_proc;
1992 469 : int active_pid = 0;
1993 469 : ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
1994 469 : TimestampTz now = 0;
1995 469 : long slot_idle_secs = 0;
1996 :
1997 : Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1998 :
1999 469 : if (!s->in_use)
2000 : {
2001 0 : if (released_lock)
2002 0 : LWLockRelease(ReplicationSlotControlLock);
2003 0 : break;
2004 : }
2005 :
2006 469 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
2007 : {
2008 : /*
2009 : * Assign the current time here to avoid system call overhead
2010 : * while holding the spinlock in subsequent code.
2011 : */
2012 411 : now = GetCurrentTimestamp();
2013 : }
2014 :
2015 : /*
2016 : * Check if the slot needs to be invalidated. If it needs to be
2017 : * invalidated, and is not currently acquired, acquire it and mark it
2018 : * as having been invalidated. We do this with the spinlock held to
2019 : * avoid race conditions -- for example the restart_lsn could move
2020 : * forward, or the slot could be dropped.
2021 : */
2022 469 : SpinLockAcquire(&s->mutex);
2023 :
2024 469 : restart_lsn = s->data.restart_lsn;
2025 :
2026 : /* we do nothing if the slot is already invalid */
2027 469 : if (s->data.invalidated == RS_INVAL_NONE)
2028 423 : invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
2029 : s, oldestLSN,
2030 : dboid,
2031 : snapshotConflictHorizon,
2032 : &inactive_since,
2033 : now);
2034 :
2035 : /* if there's no invalidation, we're done */
2036 469 : if (invalidation_cause == RS_INVAL_NONE)
2037 : {
2038 446 : SpinLockRelease(&s->mutex);
2039 446 : if (released_lock)
2040 0 : LWLockRelease(ReplicationSlotControlLock);
2041 446 : break;
2042 : }
2043 :
2044 23 : slotname = s->data.name;
2045 23 : active_proc = s->active_proc;
2046 :
2047 : /*
2048 : * If the slot can be acquired, do so and mark it invalidated
2049 : * immediately. Otherwise we'll signal the owning process, below, and
2050 : * retry.
2051 : *
2052 : * Note: Unlike other slot attributes, slot's inactive_since can't be
2053 : * changed until the acquired slot is released or the owning process
2054 : * is terminated. So, the inactive slot can only be invalidated
2055 : * immediately without being terminated.
2056 : */
2057 23 : if (active_proc == INVALID_PROC_NUMBER)
2058 : {
2059 16 : MyReplicationSlot = s;
2060 16 : s->active_proc = MyProcNumber;
2061 16 : s->data.invalidated = invalidation_cause;
2062 :
2063 : /*
2064 : * XXX: We should consider not overwriting restart_lsn and instead
2065 : * just rely on .invalidated.
2066 : */
2067 16 : if (invalidation_cause == RS_INVAL_WAL_REMOVED)
2068 : {
2069 5 : s->data.restart_lsn = InvalidXLogRecPtr;
2070 5 : s->last_saved_restart_lsn = InvalidXLogRecPtr;
2071 : }
2072 :
2073 : /* Let caller know */
2074 16 : invalidated = true;
2075 : }
2076 : else
2077 : {
2078 7 : active_pid = GetPGProcByNumber(active_proc)->pid;
2079 : Assert(active_pid != 0);
2080 : }
2081 :
2082 23 : SpinLockRelease(&s->mutex);
2083 :
2084 : /*
2085 : * Calculate the idle time duration of the slot if slot is marked
2086 : * invalidated with RS_INVAL_IDLE_TIMEOUT.
2087 : */
2088 23 : if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
2089 : {
2090 : int slot_idle_usecs;
2091 :
2092 0 : TimestampDifference(inactive_since, now, &slot_idle_secs,
2093 : &slot_idle_usecs);
2094 : }
2095 :
2096 23 : if (active_proc != INVALID_PROC_NUMBER)
2097 : {
2098 : /*
2099 : * Prepare the sleep on the slot's condition variable before
2100 : * releasing the lock, to close a possible race condition if the
2101 : * slot is released before the sleep below.
2102 : */
2103 7 : ConditionVariablePrepareToSleep(&s->active_cv);
2104 :
2105 7 : LWLockRelease(ReplicationSlotControlLock);
2106 7 : released_lock = true;
2107 :
2108 : /*
2109 : * Signal to terminate the process that owns the slot, if we
2110 : * haven't already signalled it. (Avoidance of repeated
2111 : * signalling is the only reason for there to be a loop in this
2112 : * routine; otherwise we could rely on caller's restart loop.)
2113 : *
2114 : * There is the race condition that other process may own the slot
2115 : * after its current owner process is terminated and before this
2116 : * process owns it. To handle that, we signal only if the PID of
2117 : * the owning process has changed from the previous time. (This
2118 : * logic assumes that the same PID is not reused very quickly.)
2119 : */
2120 7 : if (last_signaled_pid != active_pid)
2121 : {
2122 7 : ReportSlotInvalidation(invalidation_cause, true, active_pid,
2123 : slotname, restart_lsn,
2124 : oldestLSN, snapshotConflictHorizon,
2125 : slot_idle_secs);
2126 :
2127 7 : if (MyBackendType == B_STARTUP)
2128 5 : (void) SignalRecoveryConflict(GetPGProcByNumber(active_proc),
2129 : active_pid,
2130 : RECOVERY_CONFLICT_LOGICALSLOT);
2131 : else
2132 2 : (void) kill(active_pid, SIGTERM);
2133 :
2134 7 : last_signaled_pid = active_pid;
2135 : }
2136 :
2137 : /* Wait until the slot is released. */
2138 7 : ConditionVariableSleep(&s->active_cv,
2139 : WAIT_EVENT_REPLICATION_SLOT_DROP);
2140 :
2141 : /*
2142 : * Re-acquire lock and start over; we expect to invalidate the
2143 : * slot next time (unless another process acquires the slot in the
2144 : * meantime).
2145 : *
2146 : * Note: It is possible for a slot to advance its restart_lsn or
2147 : * xmin values sufficiently between when we release the mutex and
2148 : * when we recheck, moving from a conflicting state to a non
2149 : * conflicting state. This is intentional and safe: if the slot
2150 : * has caught up while we're busy here, the resources we were
2151 : * concerned about (WAL segments or tuples) have not yet been
2152 : * removed, and there's no reason to invalidate the slot.
2153 : */
2154 7 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2155 7 : continue;
2156 : }
2157 : else
2158 : {
2159 : /*
2160 : * We hold the slot now and have already invalidated it; flush it
2161 : * to ensure that state persists.
2162 : *
2163 : * Don't want to hold ReplicationSlotControlLock across file
2164 : * system operations, so release it now but be sure to tell caller
2165 : * to restart from scratch.
2166 : */
2167 16 : LWLockRelease(ReplicationSlotControlLock);
2168 16 : released_lock = true;
2169 :
2170 : /* Make sure the invalidated state persists across server restart */
2171 16 : ReplicationSlotMarkDirty();
2172 16 : ReplicationSlotSave();
2173 16 : ReplicationSlotRelease();
2174 :
2175 16 : ReportSlotInvalidation(invalidation_cause, false, active_pid,
2176 : slotname, restart_lsn,
2177 : oldestLSN, snapshotConflictHorizon,
2178 : slot_idle_secs);
2179 :
2180 : /* done with this slot for now */
2181 16 : break;
2182 : }
2183 : }
2184 :
2185 : Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
2186 :
2187 462 : *released_lock_out = released_lock;
2188 462 : return invalidated;
2189 : }
2190 :
2191 : /*
2192 : * Invalidate slots that require resources about to be removed.
2193 : *
2194 : * Returns true when any slot have got invalidated.
2195 : *
2196 : * Whether a slot needs to be invalidated depends on the invalidation cause.
2197 : * A slot is invalidated if it:
2198 : * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
2199 : * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2200 : * db; dboid may be InvalidOid for shared relations
2201 : * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
2202 : * logical.
2203 : * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2204 : * "idle_replication_slot_timeout" duration.
2205 : *
2206 : * Note: This function attempts to invalidate the slot for multiple possible
2207 : * causes in a single pass, minimizing redundant iterations. The "cause"
2208 : * parameter can be a MASK representing one or more of the defined causes.
2209 : *
2210 : * If it invalidates the last logical slot in the cluster, it requests to
2211 : * disable logical decoding.
2212 : *
2213 : * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2214 : */
2215 : bool
2216 1973 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
2217 : XLogSegNo oldestSegno, Oid dboid,
2218 : TransactionId snapshotConflictHorizon)
2219 : {
2220 : XLogRecPtr oldestLSN;
2221 1973 : bool invalidated = false;
2222 1973 : bool invalidated_logical = false;
2223 : bool found_valid_logicalslot;
2224 :
2225 : Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2226 : Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
2227 : Assert(possible_causes != RS_INVAL_NONE);
2228 :
2229 1973 : if (max_replication_slots == 0 && max_repack_replication_slots == 0)
2230 0 : return invalidated;
2231 :
2232 1973 : XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
2233 :
2234 1989 : restart:
2235 1989 : found_valid_logicalslot = false;
2236 1989 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2237 31223 : for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
2238 : {
2239 29250 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2240 29250 : bool released_lock = false;
2241 :
2242 29250 : if (!s->in_use)
2243 28788 : continue;
2244 :
2245 : /* Prevent invalidation of logical slots during binary upgrade */
2246 471 : if (SlotIsLogical(s) && IsBinaryUpgrade)
2247 : {
2248 9 : SpinLockAcquire(&s->mutex);
2249 9 : found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
2250 9 : SpinLockRelease(&s->mutex);
2251 :
2252 9 : continue;
2253 : }
2254 :
2255 462 : if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
2256 : dboid, snapshotConflictHorizon,
2257 : &released_lock))
2258 : {
2259 : Assert(released_lock);
2260 :
2261 : /* Remember we have invalidated a physical or logical slot */
2262 16 : invalidated = true;
2263 :
2264 : /*
2265 : * Additionally, remember we have invalidated a logical slot as we
2266 : * can request disabling logical decoding later.
2267 : */
2268 16 : if (SlotIsLogical(s))
2269 13 : invalidated_logical = true;
2270 : }
2271 : else
2272 : {
2273 : /*
2274 : * We need to check if the slot is invalidated here since
2275 : * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2276 : * is already invalidated.
2277 : */
2278 446 : SpinLockAcquire(&s->mutex);
2279 892 : found_valid_logicalslot |=
2280 446 : (SlotIsLogical(s) && (s->data.invalidated == RS_INVAL_NONE));
2281 446 : SpinLockRelease(&s->mutex);
2282 : }
2283 :
2284 : /* if the lock was released, start from scratch */
2285 462 : if (released_lock)
2286 16 : goto restart;
2287 : }
2288 1973 : LWLockRelease(ReplicationSlotControlLock);
2289 :
2290 : /*
2291 : * If any slots have been invalidated, recalculate the resource limits.
2292 : */
2293 1973 : if (invalidated)
2294 : {
2295 11 : ReplicationSlotsComputeRequiredXmin(false);
2296 11 : ReplicationSlotsComputeRequiredLSN();
2297 : }
2298 :
2299 : /*
2300 : * Request the checkpointer to disable logical decoding if no valid
2301 : * logical slots remain. If called by the checkpointer during a
2302 : * checkpoint, only the request is initiated; actual deactivation is
2303 : * deferred until after the checkpoint completes.
2304 : */
2305 1973 : if (invalidated_logical && !found_valid_logicalslot)
2306 8 : RequestDisableLogicalDecoding();
2307 :
2308 1973 : return invalidated;
2309 : }
2310 :
2311 : /*
2312 : * Flush all replication slots to disk.
2313 : *
2314 : * It is convenient to flush dirty replication slots at the time of checkpoint.
2315 : * Additionally, in case of a shutdown checkpoint, we also identify the slots
2316 : * for which the confirmed_flush LSN has been updated since the last time it
2317 : * was saved and flush them.
2318 : */
2319 : void
2320 1948 : CheckPointReplicationSlots(bool is_shutdown)
2321 : {
2322 : int i;
2323 1948 : bool last_saved_restart_lsn_updated = false;
2324 :
2325 1948 : elog(DEBUG1, "performing replication slot checkpoint");
2326 :
2327 : /*
2328 : * Prevent any slot from being created/dropped while we're active. As we
2329 : * explicitly do *not* want to block iterating over replication_slots or
2330 : * acquiring a slot we cannot take the control lock - but that's OK,
2331 : * because holding ReplicationSlotAllocationLock is strictly stronger, and
2332 : * enough to guarantee that nobody can change the in_use bits on us.
2333 : *
2334 : * Additionally, acquiring the Allocation lock is necessary to serialize
2335 : * the slot flush process with concurrent slot WAL reservation. This
2336 : * ensures that the WAL position being reserved is either flushed to disk
2337 : * or is beyond or equal to the redo pointer of the current checkpoint
2338 : * (See ReplicationSlotReserveWal for details).
2339 : */
2340 1948 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2341 :
2342 30887 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
2343 : {
2344 28939 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2345 : char path[MAXPGPATH];
2346 :
2347 28939 : if (!s->in_use)
2348 28526 : continue;
2349 :
2350 : /* save the slot to disk, locking is handled in SaveSlotToPath() */
2351 413 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2352 :
2353 : /*
2354 : * Slot's data is not flushed each time the confirmed_flush LSN is
2355 : * updated as that could lead to frequent writes. However, we decide
2356 : * to force a flush of all logical slot's data at the time of shutdown
2357 : * if the confirmed_flush LSN is changed since we last flushed it to
2358 : * disk. This helps in avoiding an unnecessary retreat of the
2359 : * confirmed_flush LSN after restart.
2360 : */
2361 413 : if (is_shutdown && SlotIsLogical(s))
2362 : {
2363 95 : SpinLockAcquire(&s->mutex);
2364 :
2365 95 : if (s->data.invalidated == RS_INVAL_NONE &&
2366 94 : s->data.confirmed_flush > s->last_saved_confirmed_flush)
2367 : {
2368 42 : s->just_dirtied = true;
2369 42 : s->dirty = true;
2370 : }
2371 95 : SpinLockRelease(&s->mutex);
2372 : }
2373 :
2374 : /*
2375 : * Track if we're going to update slot's last_saved_restart_lsn. We
2376 : * need this to know if we need to recompute the required LSN.
2377 : */
2378 413 : if (s->last_saved_restart_lsn != s->data.restart_lsn)
2379 215 : last_saved_restart_lsn_updated = true;
2380 :
2381 413 : SaveSlotToPath(s, path, LOG);
2382 : }
2383 1948 : LWLockRelease(ReplicationSlotAllocationLock);
2384 :
2385 : /*
2386 : * Recompute the required LSN if SaveSlotToPath() updated
2387 : * last_saved_restart_lsn for any slot.
2388 : */
2389 1948 : if (last_saved_restart_lsn_updated)
2390 215 : ReplicationSlotsComputeRequiredLSN();
2391 1948 : }
2392 :
2393 : /*
2394 : * Load all replication slots from disk into memory at server startup. This
2395 : * needs to be run before we start crash recovery.
2396 : */
2397 : void
2398 1093 : StartupReplicationSlots(void)
2399 : {
2400 : DIR *replication_dir;
2401 : struct dirent *replication_de;
2402 :
2403 1093 : elog(DEBUG1, "starting up replication slots");
2404 :
2405 : /* restore all slots by iterating over all on-disk entries */
2406 1093 : replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2407 3399 : while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2408 : {
2409 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2410 : PGFileType de_type;
2411 :
2412 2308 : if (strcmp(replication_de->d_name, ".") == 0 ||
2413 1217 : strcmp(replication_de->d_name, "..") == 0)
2414 2182 : continue;
2415 :
2416 126 : snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2417 126 : de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2418 :
2419 : /* we're only creating directories here, skip if it's not our's */
2420 126 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
2421 0 : continue;
2422 :
2423 : /* we crashed while a slot was being setup or deleted, clean up */
2424 126 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
2425 : {
2426 0 : if (!rmtree(path, true))
2427 : {
2428 0 : ereport(WARNING,
2429 : (errmsg("could not remove directory \"%s\"",
2430 : path)));
2431 0 : continue;
2432 : }
2433 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2434 0 : continue;
2435 : }
2436 :
2437 : /* looks like a slot in a normal state, restore */
2438 126 : RestoreSlotFromDisk(replication_de->d_name);
2439 : }
2440 1091 : FreeDir(replication_dir);
2441 :
2442 : /* currently no slots exist, we're done. */
2443 1091 : if (max_replication_slots + max_repack_replication_slots <= 0)
2444 0 : return;
2445 :
2446 : /* Now that we have recovered all the data, compute replication xmin */
2447 1091 : ReplicationSlotsComputeRequiredXmin(false);
2448 1091 : ReplicationSlotsComputeRequiredLSN();
2449 : }
2450 :
2451 : /* ----
2452 : * Manipulation of on-disk state of replication slots
2453 : *
2454 : * NB: none of the routines below should take any notice whether a slot is the
2455 : * current one or not, that's all handled a layer above.
2456 : * ----
2457 : */
2458 : static void
2459 725 : CreateSlotOnDisk(ReplicationSlot *slot)
2460 : {
2461 : char tmppath[MAXPGPATH];
2462 : char path[MAXPGPATH];
2463 : struct stat st;
2464 :
2465 : /*
2466 : * No need to take out the io_in_progress_lock, nobody else can see this
2467 : * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2468 : * takes out the lock, if we'd take the lock here, we'd deadlock.
2469 : */
2470 :
2471 725 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2472 725 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2473 :
2474 : /*
2475 : * It's just barely possible that some previous effort to create or drop a
2476 : * slot with this name left a temp directory lying around. If that seems
2477 : * to be the case, try to remove it. If the rmtree() fails, we'll error
2478 : * out at the MakePGDirectory() below, so we don't bother checking
2479 : * success.
2480 : */
2481 725 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2482 0 : rmtree(tmppath, true);
2483 :
2484 : /* Create and fsync the temporary slot directory. */
2485 725 : if (MakePGDirectory(tmppath) < 0)
2486 0 : ereport(ERROR,
2487 : (errcode_for_file_access(),
2488 : errmsg("could not create directory \"%s\": %m",
2489 : tmppath)));
2490 725 : fsync_fname(tmppath, true);
2491 :
2492 : /* Write the actual state file. */
2493 725 : slot->dirty = true; /* signal that we really need to write */
2494 725 : SaveSlotToPath(slot, tmppath, ERROR);
2495 :
2496 : /* Rename the directory into place. */
2497 725 : if (rename(tmppath, path) != 0)
2498 0 : ereport(ERROR,
2499 : (errcode_for_file_access(),
2500 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2501 : tmppath, path)));
2502 :
2503 : /*
2504 : * If we'd now fail - really unlikely - we wouldn't know whether this slot
2505 : * would persist after an OS crash or not - so, force a restart. The
2506 : * restart would try to fsync this again till it works.
2507 : */
2508 725 : START_CRIT_SECTION();
2509 :
2510 725 : fsync_fname(path, true);
2511 725 : fsync_fname(PG_REPLSLOT_DIR, true);
2512 :
2513 725 : END_CRIT_SECTION();
2514 725 : }
2515 :
2516 : /*
2517 : * Shared functionality between saving and creating a replication slot.
2518 : */
2519 : static void
2520 2658 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2521 : {
2522 : char tmppath[MAXPGPATH];
2523 : char path[MAXPGPATH];
2524 : int fd;
2525 : ReplicationSlotOnDisk cp;
2526 : bool was_dirty;
2527 :
2528 : /* first check whether there's something to write out */
2529 2658 : SpinLockAcquire(&slot->mutex);
2530 2658 : was_dirty = slot->dirty;
2531 2658 : slot->just_dirtied = false;
2532 2658 : SpinLockRelease(&slot->mutex);
2533 :
2534 : /* and don't do anything if there's nothing to write */
2535 2658 : if (!was_dirty)
2536 145 : return;
2537 :
2538 2513 : LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
2539 :
2540 : /* silence valgrind :( */
2541 2513 : memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2542 :
2543 2513 : sprintf(tmppath, "%s/state.tmp", dir);
2544 2513 : sprintf(path, "%s/state", dir);
2545 :
2546 2513 : fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2547 2513 : if (fd < 0)
2548 : {
2549 : /*
2550 : * If not an ERROR, then release the lock before returning. In case
2551 : * of an ERROR, the error recovery path automatically releases the
2552 : * lock, but no harm in explicitly releasing even in that case. Note
2553 : * that LWLockRelease() could affect errno.
2554 : */
2555 0 : int save_errno = errno;
2556 :
2557 0 : LWLockRelease(&slot->io_in_progress_lock);
2558 0 : errno = save_errno;
2559 0 : ereport(elevel,
2560 : (errcode_for_file_access(),
2561 : errmsg("could not create file \"%s\": %m",
2562 : tmppath)));
2563 0 : return;
2564 : }
2565 :
2566 2513 : cp.magic = SLOT_MAGIC;
2567 2513 : INIT_CRC32C(cp.checksum);
2568 2513 : cp.version = SLOT_VERSION;
2569 2513 : cp.length = ReplicationSlotOnDiskV2Size;
2570 :
2571 2513 : SpinLockAcquire(&slot->mutex);
2572 :
2573 2513 : memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2574 :
2575 2513 : SpinLockRelease(&slot->mutex);
2576 :
2577 2513 : COMP_CRC32C(cp.checksum,
2578 : (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
2579 : ReplicationSlotOnDiskChecksummedSize);
2580 2513 : FIN_CRC32C(cp.checksum);
2581 :
2582 2513 : errno = 0;
2583 2513 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2584 2513 : if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2585 : {
2586 0 : int save_errno = errno;
2587 :
2588 0 : pgstat_report_wait_end();
2589 0 : CloseTransientFile(fd);
2590 0 : unlink(tmppath);
2591 0 : LWLockRelease(&slot->io_in_progress_lock);
2592 :
2593 : /* if write didn't set errno, assume problem is no disk space */
2594 0 : errno = save_errno ? save_errno : ENOSPC;
2595 0 : ereport(elevel,
2596 : (errcode_for_file_access(),
2597 : errmsg("could not write to file \"%s\": %m",
2598 : tmppath)));
2599 0 : return;
2600 : }
2601 2513 : pgstat_report_wait_end();
2602 :
2603 : /* fsync the temporary file */
2604 2513 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2605 2513 : if (pg_fsync(fd) != 0)
2606 : {
2607 0 : int save_errno = errno;
2608 :
2609 0 : pgstat_report_wait_end();
2610 0 : CloseTransientFile(fd);
2611 0 : unlink(tmppath);
2612 0 : LWLockRelease(&slot->io_in_progress_lock);
2613 :
2614 0 : errno = save_errno;
2615 0 : ereport(elevel,
2616 : (errcode_for_file_access(),
2617 : errmsg("could not fsync file \"%s\": %m",
2618 : tmppath)));
2619 0 : return;
2620 : }
2621 2513 : pgstat_report_wait_end();
2622 :
2623 2513 : if (CloseTransientFile(fd) != 0)
2624 : {
2625 0 : int save_errno = errno;
2626 :
2627 0 : unlink(tmppath);
2628 0 : LWLockRelease(&slot->io_in_progress_lock);
2629 :
2630 0 : errno = save_errno;
2631 0 : ereport(elevel,
2632 : (errcode_for_file_access(),
2633 : errmsg("could not close file \"%s\": %m",
2634 : tmppath)));
2635 0 : return;
2636 : }
2637 :
2638 : /* rename to permanent file, fsync file and directory */
2639 2513 : if (rename(tmppath, path) != 0)
2640 : {
2641 0 : int save_errno = errno;
2642 :
2643 0 : unlink(tmppath);
2644 0 : LWLockRelease(&slot->io_in_progress_lock);
2645 :
2646 0 : errno = save_errno;
2647 0 : ereport(elevel,
2648 : (errcode_for_file_access(),
2649 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2650 : tmppath, path)));
2651 0 : return;
2652 : }
2653 :
2654 : /*
2655 : * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2656 : */
2657 2513 : START_CRIT_SECTION();
2658 :
2659 2513 : fsync_fname(path, false);
2660 2513 : fsync_fname(dir, true);
2661 2513 : fsync_fname(PG_REPLSLOT_DIR, true);
2662 :
2663 2513 : END_CRIT_SECTION();
2664 :
2665 : /*
2666 : * Successfully wrote, unset dirty bit, unless somebody dirtied again
2667 : * already and remember the confirmed_flush LSN value.
2668 : */
2669 2513 : SpinLockAcquire(&slot->mutex);
2670 2513 : if (!slot->just_dirtied)
2671 2486 : slot->dirty = false;
2672 2513 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2673 2513 : slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2674 2513 : SpinLockRelease(&slot->mutex);
2675 :
2676 2513 : LWLockRelease(&slot->io_in_progress_lock);
2677 : }
2678 :
2679 : /*
2680 : * Load a single slot from disk into memory.
2681 : */
2682 : static void
2683 126 : RestoreSlotFromDisk(const char *name)
2684 : {
2685 : ReplicationSlotOnDisk cp;
2686 : int i;
2687 : char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2688 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2689 : int fd;
2690 126 : bool restored = false;
2691 : int readBytes;
2692 : pg_crc32c checksum;
2693 126 : TimestampTz now = 0;
2694 :
2695 : /* no need to lock here, no concurrent access allowed yet */
2696 :
2697 : /* delete temp file if it exists */
2698 126 : sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2699 126 : sprintf(path, "%s/state.tmp", slotdir);
2700 126 : if (unlink(path) < 0 && errno != ENOENT)
2701 0 : ereport(PANIC,
2702 : (errcode_for_file_access(),
2703 : errmsg("could not remove file \"%s\": %m", path)));
2704 :
2705 126 : sprintf(path, "%s/state", slotdir);
2706 :
2707 126 : elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2708 :
2709 : /* on some operating systems fsyncing a file requires O_RDWR */
2710 126 : fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2711 :
2712 : /*
2713 : * We do not need to handle this as we are rename()ing the directory into
2714 : * place only after we fsync()ed the state file.
2715 : */
2716 126 : if (fd < 0)
2717 0 : ereport(PANIC,
2718 : (errcode_for_file_access(),
2719 : errmsg("could not open file \"%s\": %m", path)));
2720 :
2721 : /*
2722 : * Sync state file before we're reading from it. We might have crashed
2723 : * while it wasn't synced yet and we shouldn't continue on that basis.
2724 : */
2725 126 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2726 126 : if (pg_fsync(fd) != 0)
2727 0 : ereport(PANIC,
2728 : (errcode_for_file_access(),
2729 : errmsg("could not fsync file \"%s\": %m",
2730 : path)));
2731 126 : pgstat_report_wait_end();
2732 :
2733 : /* Also sync the parent directory */
2734 126 : START_CRIT_SECTION();
2735 126 : fsync_fname(slotdir, true);
2736 126 : END_CRIT_SECTION();
2737 :
2738 : /* read part of statefile that's guaranteed to be version independent */
2739 126 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2740 126 : readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2741 126 : pgstat_report_wait_end();
2742 126 : if (readBytes != ReplicationSlotOnDiskConstantSize)
2743 : {
2744 0 : if (readBytes < 0)
2745 0 : ereport(PANIC,
2746 : (errcode_for_file_access(),
2747 : errmsg("could not read file \"%s\": %m", path)));
2748 : else
2749 0 : ereport(PANIC,
2750 : (errcode(ERRCODE_DATA_CORRUPTED),
2751 : errmsg("could not read file \"%s\": read %d of %zu",
2752 : path, readBytes,
2753 : (Size) ReplicationSlotOnDiskConstantSize)));
2754 : }
2755 :
2756 : /* verify magic */
2757 126 : if (cp.magic != SLOT_MAGIC)
2758 0 : ereport(PANIC,
2759 : (errcode(ERRCODE_DATA_CORRUPTED),
2760 : errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2761 : path, cp.magic, SLOT_MAGIC)));
2762 :
2763 : /* verify version */
2764 126 : if (cp.version != SLOT_VERSION)
2765 0 : ereport(PANIC,
2766 : (errcode(ERRCODE_DATA_CORRUPTED),
2767 : errmsg("replication slot file \"%s\" has unsupported version %u",
2768 : path, cp.version)));
2769 :
2770 : /* boundary check on length */
2771 126 : if (cp.length != ReplicationSlotOnDiskV2Size)
2772 0 : ereport(PANIC,
2773 : (errcode(ERRCODE_DATA_CORRUPTED),
2774 : errmsg("replication slot file \"%s\" has corrupted length %u",
2775 : path, cp.length)));
2776 :
2777 : /* Now that we know the size, read the entire file */
2778 126 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2779 252 : readBytes = read(fd,
2780 : (char *) &cp + ReplicationSlotOnDiskConstantSize,
2781 126 : cp.length);
2782 126 : pgstat_report_wait_end();
2783 126 : if (readBytes != cp.length)
2784 : {
2785 0 : if (readBytes < 0)
2786 0 : ereport(PANIC,
2787 : (errcode_for_file_access(),
2788 : errmsg("could not read file \"%s\": %m", path)));
2789 : else
2790 0 : ereport(PANIC,
2791 : (errcode(ERRCODE_DATA_CORRUPTED),
2792 : errmsg("could not read file \"%s\": read %d of %zu",
2793 : path, readBytes, (Size) cp.length)));
2794 : }
2795 :
2796 126 : if (CloseTransientFile(fd) != 0)
2797 0 : ereport(PANIC,
2798 : (errcode_for_file_access(),
2799 : errmsg("could not close file \"%s\": %m", path)));
2800 :
2801 : /* now verify the CRC */
2802 126 : INIT_CRC32C(checksum);
2803 126 : COMP_CRC32C(checksum,
2804 : (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
2805 : ReplicationSlotOnDiskChecksummedSize);
2806 126 : FIN_CRC32C(checksum);
2807 :
2808 126 : if (!EQ_CRC32C(checksum, cp.checksum))
2809 0 : ereport(PANIC,
2810 : (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2811 : path, checksum, cp.checksum)));
2812 :
2813 : /*
2814 : * If we crashed with an ephemeral slot active, don't restore but delete
2815 : * it.
2816 : */
2817 126 : if (cp.slotdata.persistency != RS_PERSISTENT)
2818 : {
2819 0 : if (!rmtree(slotdir, true))
2820 : {
2821 0 : ereport(WARNING,
2822 : (errmsg("could not remove directory \"%s\"",
2823 : slotdir)));
2824 : }
2825 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2826 0 : return;
2827 : }
2828 :
2829 : /*
2830 : * Verify that requirements for the specific slot type are met. That's
2831 : * important because if these aren't met we're not guaranteed to retain
2832 : * all the necessary resources for the slot.
2833 : *
2834 : * NB: We have to do so *after* the above checks for ephemeral slots,
2835 : * because otherwise a slot that shouldn't exist anymore could prevent
2836 : * restarts.
2837 : *
2838 : * NB: Changing the requirements here also requires adapting
2839 : * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2840 : */
2841 126 : if (cp.slotdata.database != InvalidOid)
2842 : {
2843 87 : if (wal_level < WAL_LEVEL_REPLICA)
2844 1 : ereport(FATAL,
2845 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2846 : errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2847 : NameStr(cp.slotdata.name)),
2848 : errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2849 :
2850 : /*
2851 : * In standby mode, the hot standby must be enabled. This check is
2852 : * necessary to ensure logical slots are invalidated when they become
2853 : * incompatible due to insufficient wal_level. Otherwise, if the
2854 : * primary reduces effective_wal_level < logical while hot standby is
2855 : * disabled, primary disable logical decoding while hot standby is
2856 : * disabled, logical slots would remain valid even after promotion.
2857 : */
2858 86 : if (StandbyMode && !EnableHotStandby)
2859 1 : ereport(FATAL,
2860 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2861 : errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2862 : NameStr(cp.slotdata.name)),
2863 : errhint("Change \"hot_standby\" to be \"on\".")));
2864 : }
2865 39 : else if (wal_level < WAL_LEVEL_REPLICA)
2866 0 : ereport(FATAL,
2867 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2868 : errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2869 : NameStr(cp.slotdata.name)),
2870 : errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2871 :
2872 : /*
2873 : * Nothing can be active yet, don't lock anything. Note we iterate up to
2874 : * max_replication_slots instead of adding max_repack_replication_slots as
2875 : * in all other places, because we must enforce the GUC value in case
2876 : * there were more slots before the shutdown than what it is set up to
2877 : * now.
2878 : */
2879 182 : for (i = 0; i < max_replication_slots; i++)
2880 : {
2881 : ReplicationSlot *slot;
2882 :
2883 182 : slot = &ReplicationSlotCtl->replication_slots[i];
2884 :
2885 182 : if (slot->in_use)
2886 58 : continue;
2887 :
2888 : /* restore the entire set of persistent data */
2889 124 : memcpy(&slot->data, &cp.slotdata,
2890 : sizeof(ReplicationSlotPersistentData));
2891 :
2892 : /* initialize in memory state */
2893 124 : slot->effective_xmin = cp.slotdata.xmin;
2894 124 : slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2895 124 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2896 124 : slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2897 :
2898 124 : slot->candidate_catalog_xmin = InvalidTransactionId;
2899 124 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
2900 124 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
2901 124 : slot->candidate_restart_valid = InvalidXLogRecPtr;
2902 :
2903 124 : slot->in_use = true;
2904 124 : slot->active_proc = INVALID_PROC_NUMBER;
2905 :
2906 : /*
2907 : * Set the time since the slot has become inactive after loading the
2908 : * slot from the disk into memory. Whoever acquires the slot i.e.
2909 : * makes the slot active will reset it. Use the same inactive_since
2910 : * time for all the slots.
2911 : */
2912 124 : if (now == 0)
2913 124 : now = GetCurrentTimestamp();
2914 :
2915 124 : ReplicationSlotSetInactiveSince(slot, now, false);
2916 :
2917 124 : restored = true;
2918 124 : break;
2919 : }
2920 :
2921 124 : if (!restored)
2922 0 : ereport(FATAL,
2923 : (errmsg("too many replication slots active before shutdown"),
2924 : errhint("Increase \"max_replication_slots\" and try again.")));
2925 : }
2926 :
2927 : /*
2928 : * Maps an invalidation reason for a replication slot to
2929 : * ReplicationSlotInvalidationCause.
2930 : */
2931 : ReplicationSlotInvalidationCause
2932 0 : GetSlotInvalidationCause(const char *cause_name)
2933 : {
2934 : Assert(cause_name);
2935 :
2936 : /* Search lookup table for the cause having this name */
2937 0 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2938 : {
2939 0 : if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2940 0 : return SlotInvalidationCauses[i].cause;
2941 : }
2942 :
2943 : Assert(false);
2944 0 : return RS_INVAL_NONE; /* to keep compiler quiet */
2945 : }
2946 :
2947 : /*
2948 : * Maps a ReplicationSlotInvalidationCause to the invalidation
2949 : * reason for a replication slot.
2950 : */
2951 : const char *
2952 45 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
2953 : {
2954 : /* Search lookup table for the name of this cause */
2955 146 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2956 : {
2957 146 : if (SlotInvalidationCauses[i].cause == cause)
2958 45 : return SlotInvalidationCauses[i].cause_name;
2959 : }
2960 :
2961 : Assert(false);
2962 0 : return "none"; /* to keep compiler quiet */
2963 : }
2964 :
2965 : /*
2966 : * A helper function to validate slots specified in GUC synchronized_standby_slots.
2967 : *
2968 : * The rawname will be parsed, and the result will be saved into *elemlist.
2969 : */
2970 : static bool
2971 28 : validate_sync_standby_slots(char *rawname, List **elemlist)
2972 : {
2973 : /* Verify syntax and parse string into a list of identifiers */
2974 28 : if (!SplitIdentifierString(rawname, ',', elemlist))
2975 : {
2976 0 : GUC_check_errdetail("List syntax is invalid.");
2977 0 : return false;
2978 : }
2979 :
2980 : /* Iterate the list to validate each slot name */
2981 80 : foreach_ptr(char, name, *elemlist)
2982 : {
2983 : int err_code;
2984 28 : char *err_msg = NULL;
2985 28 : char *err_hint = NULL;
2986 :
2987 28 : if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
2988 : &err_msg, &err_hint))
2989 : {
2990 2 : GUC_check_errcode(err_code);
2991 2 : GUC_check_errdetail("%s", err_msg);
2992 2 : if (err_hint != NULL)
2993 2 : GUC_check_errhint("%s", err_hint);
2994 2 : return false;
2995 : }
2996 : }
2997 :
2998 26 : return true;
2999 : }
3000 :
3001 : /*
3002 : * GUC check_hook for synchronized_standby_slots
3003 : */
3004 : bool
3005 1337 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
3006 : {
3007 : char *rawname;
3008 : char *ptr;
3009 : List *elemlist;
3010 : int size;
3011 : bool ok;
3012 : SyncStandbySlotsConfigData *config;
3013 :
3014 1337 : if ((*newval)[0] == '\0')
3015 1309 : return true;
3016 :
3017 : /* Need a modifiable copy of the GUC string */
3018 28 : rawname = pstrdup(*newval);
3019 :
3020 : /* Now verify if the specified slots exist and have correct type */
3021 28 : ok = validate_sync_standby_slots(rawname, &elemlist);
3022 :
3023 28 : if (!ok || elemlist == NIL)
3024 : {
3025 2 : pfree(rawname);
3026 2 : list_free(elemlist);
3027 2 : return ok;
3028 : }
3029 :
3030 : /* Compute the size required for the SyncStandbySlotsConfigData struct */
3031 26 : size = offsetof(SyncStandbySlotsConfigData, slot_names);
3032 78 : foreach_ptr(char, slot_name, elemlist)
3033 26 : size += strlen(slot_name) + 1;
3034 :
3035 : /* GUC extra value must be guc_malloc'd, not palloc'd */
3036 26 : config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
3037 26 : if (!config)
3038 0 : return false;
3039 :
3040 : /* Transform the data into SyncStandbySlotsConfigData */
3041 26 : config->nslotnames = list_length(elemlist);
3042 :
3043 26 : ptr = config->slot_names;
3044 78 : foreach_ptr(char, slot_name, elemlist)
3045 : {
3046 26 : strcpy(ptr, slot_name);
3047 26 : ptr += strlen(slot_name) + 1;
3048 : }
3049 :
3050 26 : *extra = config;
3051 :
3052 26 : pfree(rawname);
3053 26 : list_free(elemlist);
3054 26 : return true;
3055 : }
3056 :
3057 : /*
3058 : * GUC assign_hook for synchronized_standby_slots
3059 : */
3060 : void
3061 1334 : assign_synchronized_standby_slots(const char *newval, void *extra)
3062 : {
3063 : /*
3064 : * The standby slots may have changed, so we must recompute the oldest
3065 : * LSN.
3066 : */
3067 1334 : ss_oldest_flush_lsn = InvalidXLogRecPtr;
3068 :
3069 1334 : synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
3070 1334 : }
3071 :
3072 : /*
3073 : * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
3074 : */
3075 : bool
3076 41147 : SlotExistsInSyncStandbySlots(const char *slot_name)
3077 : {
3078 : const char *standby_slot_name;
3079 :
3080 : /* Return false if there is no value in synchronized_standby_slots */
3081 41147 : if (synchronized_standby_slots_config == NULL)
3082 41130 : return false;
3083 :
3084 : /*
3085 : * XXX: We are not expecting this list to be long so a linear search
3086 : * shouldn't hurt but if that turns out not to be true then we can cache
3087 : * this information for each WalSender as well.
3088 : */
3089 17 : standby_slot_name = synchronized_standby_slots_config->slot_names;
3090 25 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3091 : {
3092 17 : if (strcmp(standby_slot_name, slot_name) == 0)
3093 9 : return true;
3094 :
3095 8 : standby_slot_name += strlen(standby_slot_name) + 1;
3096 : }
3097 :
3098 8 : return false;
3099 : }
3100 :
3101 : /*
3102 : * Return true if the slots specified in synchronized_standby_slots have caught up to
3103 : * the given WAL location, false otherwise.
3104 : *
3105 : * The elevel parameter specifies the error level used for logging messages
3106 : * related to slots that do not exist, are invalidated, or are inactive.
3107 : */
3108 : bool
3109 3535 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
3110 : {
3111 : const char *name;
3112 3535 : int caught_up_slot_num = 0;
3113 3535 : XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
3114 :
3115 : /*
3116 : * Don't need to wait for the standbys to catch up if there is no value in
3117 : * synchronized_standby_slots.
3118 : */
3119 3535 : if (synchronized_standby_slots_config == NULL)
3120 3501 : return true;
3121 :
3122 : /*
3123 : * Don't need to wait for the standbys to catch up if we are on a standby
3124 : * server, since we do not support syncing slots to cascading standbys.
3125 : */
3126 34 : if (RecoveryInProgress())
3127 0 : return true;
3128 :
3129 : /*
3130 : * Don't need to wait for the standbys to catch up if they are already
3131 : * beyond the specified WAL location.
3132 : */
3133 34 : if (XLogRecPtrIsValid(ss_oldest_flush_lsn) &&
3134 18 : ss_oldest_flush_lsn >= wait_for_lsn)
3135 9 : return true;
3136 :
3137 : /*
3138 : * To prevent concurrent slot dropping and creation while filtering the
3139 : * slots, take the ReplicationSlotControlLock outside of the loop.
3140 : */
3141 25 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
3142 :
3143 25 : name = synchronized_standby_slots_config->slot_names;
3144 34 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3145 : {
3146 : XLogRecPtr restart_lsn;
3147 : bool invalidated;
3148 : bool inactive;
3149 : ReplicationSlot *slot;
3150 :
3151 25 : slot = SearchNamedReplicationSlot(name, false);
3152 :
3153 : /*
3154 : * If a slot name provided in synchronized_standby_slots does not
3155 : * exist, report a message and exit the loop.
3156 : */
3157 25 : if (!slot)
3158 : {
3159 0 : ereport(elevel,
3160 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3161 : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
3162 : name, "synchronized_standby_slots"),
3163 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3164 : name),
3165 : errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
3166 : name, "synchronized_standby_slots"));
3167 0 : break;
3168 : }
3169 :
3170 : /* Same as above: if a slot is not physical, exit the loop. */
3171 25 : if (SlotIsLogical(slot))
3172 : {
3173 0 : ereport(elevel,
3174 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3175 : errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3176 : name, "synchronized_standby_slots"),
3177 : errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
3178 : name),
3179 : errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
3180 : name, "synchronized_standby_slots"));
3181 0 : break;
3182 : }
3183 :
3184 25 : SpinLockAcquire(&slot->mutex);
3185 25 : restart_lsn = slot->data.restart_lsn;
3186 25 : invalidated = slot->data.invalidated != RS_INVAL_NONE;
3187 25 : inactive = slot->active_proc == INVALID_PROC_NUMBER;
3188 25 : SpinLockRelease(&slot->mutex);
3189 :
3190 25 : if (invalidated)
3191 : {
3192 : /* Specified physical slot has been invalidated */
3193 0 : ereport(elevel,
3194 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3195 : errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3196 : name, "synchronized_standby_slots"),
3197 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3198 : name),
3199 : errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3200 : name, "synchronized_standby_slots"));
3201 0 : break;
3202 : }
3203 :
3204 25 : if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
3205 : {
3206 : /* Log a message if no active_pid for this physical slot */
3207 16 : if (inactive)
3208 11 : ereport(elevel,
3209 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3210 : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3211 : name, "synchronized_standby_slots"),
3212 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3213 : name),
3214 : errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3215 : name, "synchronized_standby_slots"));
3216 :
3217 : /* Continue if the current slot hasn't caught up. */
3218 16 : break;
3219 : }
3220 :
3221 : Assert(restart_lsn >= wait_for_lsn);
3222 :
3223 9 : if (!XLogRecPtrIsValid(min_restart_lsn) ||
3224 : min_restart_lsn > restart_lsn)
3225 9 : min_restart_lsn = restart_lsn;
3226 :
3227 9 : caught_up_slot_num++;
3228 :
3229 9 : name += strlen(name) + 1;
3230 : }
3231 :
3232 25 : LWLockRelease(ReplicationSlotControlLock);
3233 :
3234 : /*
3235 : * Return false if not all the standbys have caught up to the specified
3236 : * WAL location.
3237 : */
3238 25 : if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
3239 16 : return false;
3240 :
3241 : /* The ss_oldest_flush_lsn must not retreat. */
3242 : Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
3243 : min_restart_lsn >= ss_oldest_flush_lsn);
3244 :
3245 9 : ss_oldest_flush_lsn = min_restart_lsn;
3246 :
3247 9 : return true;
3248 : }
3249 :
3250 : /*
3251 : * Wait for physical standbys to confirm receiving the given lsn.
3252 : *
3253 : * Used by logical decoding SQL functions. It waits for physical standbys
3254 : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
3255 : */
3256 : void
3257 236 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
3258 : {
3259 : /*
3260 : * Don't need to wait for the standby to catch up if the current acquired
3261 : * slot is not a logical failover slot, or there is no value in
3262 : * synchronized_standby_slots.
3263 : */
3264 236 : if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
3265 235 : return;
3266 :
3267 1 : ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
3268 :
3269 : for (;;)
3270 : {
3271 2 : CHECK_FOR_INTERRUPTS();
3272 :
3273 2 : if (ConfigReloadPending)
3274 : {
3275 1 : ConfigReloadPending = false;
3276 1 : ProcessConfigFile(PGC_SIGHUP);
3277 : }
3278 :
3279 : /* Exit if done waiting for every slot. */
3280 2 : if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
3281 1 : break;
3282 :
3283 : /*
3284 : * Wait for the slots in the synchronized_standby_slots to catch up,
3285 : * but use a timeout (1s) so we can also check if the
3286 : * synchronized_standby_slots has been changed.
3287 : */
3288 1 : ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
3289 : WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
3290 : }
3291 :
3292 1 : ConditionVariableCancelSleep();
3293 : }
|