Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * slot.c
4 : * Replication slot management.
5 : *
6 : *
7 : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/replication/slot.c
12 : *
13 : * NOTES
14 : *
15 : * Replication slots are used to keep state about replication streams
16 : * originating from this cluster. Their primary purpose is to prevent the
17 : * premature removal of WAL or of old tuple versions in a manner that would
18 : * interfere with replication; they are also useful for monitoring purposes.
19 : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : * on standbys (to support cascading setups). The requirement that slots be
21 : * usable on standbys precludes storing them in the system catalogs.
22 : *
23 : * Each replication slot gets its own directory inside the directory
24 : * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 : * contain the slot's own data. Additional data can be stored alongside that
26 : * file if required. While the server is running, the state data is also
27 : * cached in memory for efficiency.
28 : *
29 : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : * of a slot. The remaining data in each slot is protected by its mutex.
33 : *
34 : *-------------------------------------------------------------------------
35 : */
36 :
37 : #include "postgres.h"
38 :
39 : #include <unistd.h>
40 : #include <sys/stat.h>
41 :
42 : #include "access/transam.h"
43 : #include "access/xlog_internal.h"
44 : #include "access/xlogrecovery.h"
45 : #include "common/file_utils.h"
46 : #include "common/string.h"
47 : #include "miscadmin.h"
48 : #include "pgstat.h"
49 : #include "postmaster/interrupt.h"
50 : #include "replication/logicallauncher.h"
51 : #include "replication/slotsync.h"
52 : #include "replication/slot.h"
53 : #include "replication/walsender_private.h"
54 : #include "storage/fd.h"
55 : #include "storage/ipc.h"
56 : #include "storage/proc.h"
57 : #include "storage/procarray.h"
58 : #include "storage/subsystems.h"
59 : #include "utils/builtins.h"
60 : #include "utils/guc_hooks.h"
61 : #include "utils/injection_point.h"
62 : #include "utils/varlena.h"
63 : #include "utils/wait_event.h"
64 :
/*
 * Replication slot on-disk data structure.
 *
 * This is the layout of each slot's state file under PG_REPLSLOT_DIR.  The
 * leading fields (magic, checksum, version, length) must stay layout-stable
 * across versions so that any server release can at least identify the file
 * format before deciding whether it can read the rest.
 */
typedef struct ReplicationSlotOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* format identifier (see SLOT_MAGIC) */
	pg_crc32c	checksum;		/* CRC over the checksummed part below */

	/* data covered by checksum */
	uint32		version;		/* on-disk format version of slotdata */
	uint32		length;			/* length of the version-dependent part */

	/*
	 * The actual data in the slot that follows can differ based on the above
	 * 'version'.
	 */

	ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;
87 :
/*
 * Struct for the configuration of synchronized_standby_slots.
 *
 * Note: this must be a flat representation that can be held in a single chunk
 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
 * synchronized_standby_slots GUC.
 */
typedef struct
{
	/* Number of slot names in the slot_names[] */
	int			nslotnames;

	/*
	 * slot_names contains 'nslotnames' consecutive null-terminated C strings.
	 */
	char		slot_names[FLEXIBLE_ARRAY_MEMBER];
} SyncStandbySlotsConfigData;
105 :
/*
 * Lookup table for slot invalidation causes.
 *
 * Maps each ReplicationSlotInvalidationCause enum value to the user-visible
 * cause name reported e.g. in error messages.
 */
typedef struct SlotInvalidationCauseMap
{
	ReplicationSlotInvalidationCause cause;
	const char *cause_name;
} SlotInvalidationCauseMap;

/*
 * Entries must appear in the same order as the enum values so that the enum
 * can be used directly as an array index.
 */
static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
	{RS_INVAL_NONE, "none"},
	{RS_INVAL_WAL_REMOVED, "wal_removed"},
	{RS_INVAL_HORIZON, "rows_removed"},
	{RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
	{RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
};

/*
 * Ensure that the lookup table is up-to-date with the enums defined in
 * ReplicationSlotInvalidationCause.
 */
StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
				 "array length mismatch");
129 :
130 : /* size of version independent data */
131 : #define ReplicationSlotOnDiskConstantSize \
132 : offsetof(ReplicationSlotOnDisk, slotdata)
133 : /* size of the part of the slot not covered by the checksum */
134 : #define ReplicationSlotOnDiskNotChecksummedSize \
135 : offsetof(ReplicationSlotOnDisk, version)
136 : /* size of the part covered by the checksum */
137 : #define ReplicationSlotOnDiskChecksummedSize \
138 : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
139 : /* size of the slot data that is version dependent */
140 : #define ReplicationSlotOnDiskV2Size \
141 : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
142 :
143 : #define SLOT_MAGIC 0x1051CA1 /* format identifier */
144 : #define SLOT_VERSION 5 /* version for new files */
145 :
146 : /* Control array for replication slot management */
147 : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
148 :
149 : static void ReplicationSlotsShmemRequest(void *arg);
150 : static void ReplicationSlotsShmemInit(void *arg);
151 :
152 : const ShmemCallbacks ReplicationSlotsShmemCallbacks = {
153 : .request_fn = ReplicationSlotsShmemRequest,
154 : .init_fn = ReplicationSlotsShmemInit,
155 : };
156 :
157 : /* My backend's replication slot in the shared memory array */
158 : ReplicationSlot *MyReplicationSlot = NULL;
159 :
160 : /* GUC variables */
161 : int max_replication_slots = 10; /* the maximum number of replication
162 : * slots */
163 : int max_repack_replication_slots = 5; /* the maximum number of slots
164 : * for REPACK */
165 :
166 : /*
167 : * Invalidate replication slots that have remained idle longer than this
168 : * duration; '0' disables it.
169 : */
170 : int idle_replication_slot_timeout_secs = 0;
171 :
172 : /*
173 : * This GUC lists streaming replication standby server slot names that
174 : * logical WAL sender processes will wait for.
175 : */
176 : char *synchronized_standby_slots;
177 :
178 : /* This is the parsed and cached configuration for synchronized_standby_slots */
179 : static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
180 :
181 : /*
182 : * Oldest LSN that has been confirmed to be flushed to the standbys
183 : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
184 : */
185 : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
186 :
187 : static void ReplicationSlotShmemExit(int code, Datum arg);
188 : static bool IsSlotForConflictCheck(const char *name);
189 : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
190 :
191 : /* internal persistency functions */
192 : static void RestoreSlotFromDisk(const char *name);
193 : static void CreateSlotOnDisk(ReplicationSlot *slot);
194 : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
195 :
/*
 * Register shared memory space needed for replication slots.
 *
 * Requests one ReplicationSlotCtlData header plus one ReplicationSlot entry
 * for each general-purpose and REPACK slot; the result is made available via
 * the ReplicationSlotCtl pointer.
 */
static void
ReplicationSlotsShmemRequest(void *arg)
{
	Size		size;

	/* nothing to request if slots are disabled entirely */
	if (max_replication_slots + max_repack_replication_slots == 0)
		return;

	/* add_size/mul_size check for overflow of the shmem size computation */
	size = offsetof(ReplicationSlotCtlData, replication_slots);
	size = add_size(size,
					mul_size(max_replication_slots + max_repack_replication_slots,
							 sizeof(ReplicationSlot)));
	ShmemRequestStruct(.name = "ReplicationSlot Ctl",
					   .size = size,
					   .ptr = (void **) &ReplicationSlotCtl,
		);
}
216 :
/*
 * Initialize shared memory for replication slots.
 *
 * Initializes the per-slot locks, mutexes, and condition variables for every
 * entry in the shared slot array (both general and REPACK slots).
 */
static void
ReplicationSlotsShmemInit(void *arg)
{
	/*
	 * NOTE(review): the "memset above" mentioned below presumably refers to
	 * the shared memory region being zero-initialized when allocated via the
	 * shmem-request machinery -- confirm against ShmemRequestStruct.
	 */
	for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
	{
		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];

		/* everything else is zeroed by the memset above */
		slot->active_proc = INVALID_PROC_NUMBER;
		SpinLockInit(&slot->mutex);
		LWLockInitialize(&slot->io_in_progress_lock,
						 LWTRANCHE_REPLICATION_SLOT_IO);
		ConditionVariableInit(&slot->active_cv);
	}
}
235 :
/*
 * Register the callback for replication slot cleanup and releasing.
 *
 * Called once per backend; ensures any slot held (or any temporary slots
 * created) by this backend are released/dropped before shared memory access
 * is lost at process exit.
 */
void
ReplicationSlotInitialize(void)
{
	before_shmem_exit(ReplicationSlotShmemExit, 0);
}
244 :
/*
 * Release and cleanup replication slots.
 *
 * before_shmem_exit() callback registered by ReplicationSlotInitialize();
 * 'code' and 'arg' follow the standard exit-callback signature and are
 * unused here.
 */
static void
ReplicationSlotShmemExit(int code, Datum arg)
{
	/* Make sure active replication slots are released */
	if (MyReplicationSlot != NULL)
		ReplicationSlotRelease();

	/* Also cleanup all the temporary slots. */
	ReplicationSlotCleanup(false);
}
258 :
/*
 * Check whether the passed slot name is valid and report errors at elevel.
 *
 * See comments for ReplicationSlotValidateNameInternal().
 *
 * Returns true if the name is valid.  On an invalid name, the error is
 * reported at 'elevel'; if elevel < ERROR, the function additionally frees
 * the message strings and returns false.
 */
bool
ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
							int elevel)
{
	int			err_code;
	char	   *err_msg = NULL;
	char	   *err_hint = NULL;

	if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
											 &err_code, &err_msg, &err_hint))
	{
		/*
		 * Use errmsg_internal() and errhint_internal() instead of errmsg()
		 * and errhint(), since the messages from
		 * ReplicationSlotValidateNameInternal() are already translated. This
		 * avoids double translation.
		 */
		ereport(elevel,
				errcode(err_code),
				errmsg_internal("%s", err_msg),
				(err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);

		/*
		 * Only reached when elevel < ERROR, since ereport() does not return
		 * for ERROR and above.
		 */
		pfree(err_msg);
		if (err_hint != NULL)
			pfree(err_hint);
		return false;
	}

	return true;
}
294 :
295 : /*
296 : * Check whether the passed slot name is valid.
297 : *
298 : * An error will be reported for a reserved replication slot name if
299 : * allow_reserved_name is set to false.
300 : *
301 : * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
302 : * the name to be used as a directory name on every supported OS.
303 : *
304 : * Returns true if the slot name is valid. Otherwise, returns false and stores
305 : * the error code, error message, and optional hint in err_code, err_msg, and
306 : * err_hint, respectively. The caller is responsible for freeing err_msg and
307 : * err_hint, which are palloc'd.
308 : */
309 : bool
310 1074 : ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
311 : int *err_code, char **err_msg, char **err_hint)
312 : {
313 : const char *cp;
314 :
315 1074 : if (strlen(name) == 0)
316 : {
317 4 : *err_code = ERRCODE_INVALID_NAME;
318 4 : *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
319 4 : *err_hint = NULL;
320 4 : return false;
321 : }
322 :
323 1070 : if (strlen(name) >= NAMEDATALEN)
324 : {
325 0 : *err_code = ERRCODE_NAME_TOO_LONG;
326 0 : *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
327 0 : *err_hint = NULL;
328 0 : return false;
329 : }
330 :
331 20691 : for (cp = name; *cp; cp++)
332 : {
333 19623 : if (!((*cp >= 'a' && *cp <= 'z')
334 9375 : || (*cp >= '0' && *cp <= '9')
335 1911 : || (*cp == '_')))
336 : {
337 2 : *err_code = ERRCODE_INVALID_NAME;
338 2 : *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
339 2 : *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
340 2 : return false;
341 : }
342 : }
343 :
344 1068 : if (!allow_reserved_name && IsSlotForConflictCheck(name))
345 : {
346 1 : *err_code = ERRCODE_RESERVED_NAME;
347 1 : *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
348 1 : *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
349 : CONFLICT_DETECTION_SLOT);
350 1 : return false;
351 : }
352 :
353 1067 : return true;
354 : }
355 :
356 : /*
357 : * Return true if the replication slot name is "pg_conflict_detection".
358 : */
359 : static bool
360 2354 : IsSlotForConflictCheck(const char *name)
361 : {
362 2354 : return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
363 : }
364 :
/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db specific; if the slot is going to
 *     be used for that pass true, otherwise false.
 * two_phase: If enabled, allows decoding of prepared transactions.
 * repack: If true, use a slot from the pool for REPACK.
 * failover: If enabled, allows the slot to be synced to standbys so
 *     that logical replication can be resumed after failover.
 * synced: True if the slot is synchronized from the primary server.
 *
 * On success, MyReplicationSlot points at the newly created slot, which is
 * marked in_use and active for this backend.  Raises ERROR on name
 * validation failure, name collision, or when no free slot is available.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
					  ReplicationSlotPersistency persistency,
					  bool two_phase, bool repack, bool failover, bool synced)
{
	ReplicationSlot *slot = NULL;
	int			startpoint,
				endpoint;

	Assert(MyReplicationSlot == NULL);

	/*
	 * The logical launcher or pg_upgrade may create or migrate an internal
	 * slot, so using a reserved name is allowed in these cases.
	 */
	ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
								ERROR);

	if (failover)
	{
		/*
		 * Do not allow users to create the failover enabled slots on the
		 * standby as we do not support sync to the cascading standby.
		 *
		 * However, failover enabled slots can be created during slot
		 * synchronization because we need to retain the same values as the
		 * remote slot.
		 */
		if (RecoveryInProgress() && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a replication slot created on the standby"));

		/*
		 * Do not allow users to create failover enabled temporary slots,
		 * because temporary slots will not be synced to the standby.
		 *
		 * However, failover enabled temporary slots can be created during
		 * slot synchronization. See the comments atop slotsync.c for details.
		 */
		if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a temporary replication slot"));
	}

	/*
	 * If some other backend ran this code concurrently with us, we'd likely
	 * both allocate the same slot, and that would be bad. We'd also be at
	 * risk of missing a name collision. Also, we don't want to try to create
	 * a new slot while somebody's busy cleaning up an old one, because we
	 * might both be monkeying with the same directory.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * Check for name collision (across the whole array), and identify an
	 * allocatable slot (in the array slice specific to our current use case:
	 * either general, or REPACK only). We need to hold
	 * ReplicationSlotControlLock in shared mode for this, so that nobody else
	 * can change the in_use flags while we're looking at them.
	 *
	 * General-purpose slots live in indexes [0, max_replication_slots);
	 * REPACK slots occupy the indexes above that.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	startpoint = !repack ? 0 : max_replication_slots;
	endpoint = max_replication_slots + (repack ? max_repack_replication_slots : 0);
	for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
					 errmsg("replication slot \"%s\" already exists", name)));

		if (i >= startpoint && i < endpoint &&
			!s->in_use && slot == NULL)
			slot = s;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If all slots are in use, we're out of luck. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("all replication slots are in use"),
				 errhint("Free one or increase \"%s\".",
						 repack ? "max_repack_replication_slots" : "max_replication_slots")));

	/*
	 * Since this slot is not in use, nobody should be looking at any part of
	 * it other than the in_use field unless they're trying to allocate it.
	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
	 * be doing that. So it's safe to initialize the slot.
	 */
	Assert(!slot->in_use);
	Assert(slot->active_proc == INVALID_PROC_NUMBER);

	/* first initialize persistent data */
	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
	namestrcpy(&slot->data.name, name);
	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
	slot->data.persistency = persistency;
	slot->data.two_phase = two_phase;
	slot->data.two_phase_at = InvalidXLogRecPtr;
	slot->data.failover = failover;
	slot->data.synced = synced;

	/* and then data only present in shared memory */
	slot->just_dirtied = false;
	slot->dirty = false;
	slot->effective_xmin = InvalidTransactionId;
	slot->effective_catalog_xmin = InvalidTransactionId;
	slot->candidate_catalog_xmin = InvalidTransactionId;
	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
	slot->candidate_restart_valid = InvalidXLogRecPtr;
	slot->candidate_restart_lsn = InvalidXLogRecPtr;
	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
	slot->last_saved_restart_lsn = InvalidXLogRecPtr;
	slot->inactive_since = 0;
	slot->slotsync_skip_reason = SS_SKIP_NONE;

	/*
	 * Create the slot on disk. We haven't actually marked the slot allocated
	 * yet, so no special cleanup is required if this errors out.
	 */
	CreateSlotOnDisk(slot);

	/*
	 * We need to briefly prevent any other backend from iterating over the
	 * slots while we flip the in_use flag. We also need to set the active
	 * flag while holding the ControlLock as otherwise a concurrent
	 * ReplicationSlotAcquire() could acquire the slot as well.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

	slot->in_use = true;

	/* We can now mark the slot active, and that makes it our slot. */
	SpinLockAcquire(&slot->mutex);
	Assert(slot->active_proc == INVALID_PROC_NUMBER);
	slot->active_proc = MyProcNumber;
	SpinLockRelease(&slot->mutex);
	MyReplicationSlot = slot;

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Create statistics entry for the new logical slot. We don't collect any
	 * stats for physical slots, so no need to create an entry for the same.
	 * See ReplicationSlotDropPtr for why we need to do this before releasing
	 * ReplicationSlotAllocationLock.
	 */
	if (SlotIsLogical(slot))
		pgstat_create_replslot(slot);

	/*
	 * Now that the slot has been marked as in_use and active, it's safe to
	 * let somebody else try to allocate a slot.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);
}
541 :
542 : /*
543 : * Search for the named replication slot.
544 : *
545 : * Return the replication slot if found, otherwise NULL.
546 : */
547 : ReplicationSlot *
548 2161 : SearchNamedReplicationSlot(const char *name, bool need_lock)
549 : {
550 : int i;
551 2161 : ReplicationSlot *slot = NULL;
552 :
553 2161 : if (need_lock)
554 655 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
555 :
556 11189 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
557 : {
558 10652 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
559 :
560 10652 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
561 : {
562 1624 : slot = s;
563 1624 : break;
564 : }
565 : }
566 :
567 2161 : if (need_lock)
568 655 : LWLockRelease(ReplicationSlotControlLock);
569 :
570 2161 : return slot;
571 : }
572 :
573 : /*
574 : * Return the index of the replication slot in
575 : * ReplicationSlotCtl->replication_slots.
576 : *
577 : * This is mainly useful to have an efficient key for storing replication slot
578 : * stats.
579 : */
580 : int
581 8096 : ReplicationSlotIndex(ReplicationSlot *slot)
582 : {
583 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
584 : slot < ReplicationSlotCtl->replication_slots +
585 : (max_replication_slots + max_repack_replication_slots));
586 :
587 8096 : return slot - ReplicationSlotCtl->replication_slots;
588 : }
589 :
590 : /*
591 : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
592 : * the slot's name and true is returned.
593 : *
594 : * This likely is only useful for pgstat_replslot.c during shutdown, in other
595 : * cases there are obvious TOCTOU issues.
596 : */
597 : bool
598 111 : ReplicationSlotName(int index, Name name)
599 : {
600 : ReplicationSlot *slot;
601 : bool found;
602 :
603 111 : slot = &ReplicationSlotCtl->replication_slots[index];
604 :
605 : /*
606 : * Ensure that the slot cannot be dropped while we copy the name. Don't
607 : * need the spinlock as the name of an existing slot cannot change.
608 : */
609 111 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
610 111 : found = slot->in_use;
611 111 : if (slot->in_use)
612 111 : namestrcpy(name, NameStr(slot->data.name));
613 111 : LWLockRelease(ReplicationSlotControlLock);
614 :
615 111 : return found;
616 : }
617 :
/*
 * Find a previously created slot and mark it as used by this process.
 *
 * An error is raised if nowait is true and the slot is currently in use. If
 * nowait is false, we sleep until the slot is released by the owning process.
 *
 * An error is raised if error_if_invalid is true and the slot is found to
 * be invalid. It should always be set to true, except when we are temporarily
 * acquiring the slot and don't intend to change it.
 *
 * On success, MyReplicationSlot points at the acquired slot.
 */
void
ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
{
	ReplicationSlot *s;
	ProcNumber	active_proc;
	int			active_pid;

	Assert(name != NULL);

retry:
	Assert(MyReplicationSlot == NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	/* Check if the slot exists with the given name. */
	s = SearchNamedReplicationSlot(name, false);
	if (s == NULL || !s->in_use)
	{
		LWLockRelease(ReplicationSlotControlLock);

		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist",
						name)));
	}

	/*
	 * Do not allow users to acquire the reserved slot. This scenario may
	 * occur if the launcher that owns the slot has terminated unexpectedly
	 * due to an error, and a backend process attempts to reuse the slot.
	 */
	if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
		ereport(ERROR,
				errcode(ERRCODE_UNDEFINED_OBJECT),
				errmsg("cannot acquire replication slot \"%s\"", name),
				errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));

	/*
	 * This is the slot we want; check if it's active under some other
	 * process. In single user mode, we don't need this check.
	 */
	if (IsUnderPostmaster)
	{
		/*
		 * Get ready to sleep on the slot in case it is active. (We may end
		 * up not sleeping, but we don't want to do this while holding the
		 * spinlock.)
		 */
		if (!nowait)
			ConditionVariablePrepareToSleep(&s->active_cv);

		/*
		 * It is important to reset the inactive_since under spinlock here to
		 * avoid race conditions with slot invalidation. See comments related
		 * to inactive_since in InvalidatePossiblyObsoleteSlot.
		 */
		SpinLockAcquire(&s->mutex);
		if (s->active_proc == INVALID_PROC_NUMBER)
			s->active_proc = MyProcNumber;	/* claim the slot if free */
		active_proc = s->active_proc;
		ReplicationSlotSetInactiveSince(s, 0, false);
		SpinLockRelease(&s->mutex);
	}
	else
	{
		/* Single-user mode: no concurrency, claim unconditionally. */
		s->active_proc = active_proc = MyProcNumber;
		ReplicationSlotSetInactiveSince(s, 0, true);
	}
	active_pid = GetPGProcByNumber(active_proc)->pid;
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * If we found the slot but it's already active in another process, we
	 * wait until the owning process signals us that it's been released, or
	 * error out.
	 */
	if (active_proc != MyProcNumber)
	{
		if (!nowait)
		{
			/* Wait here until we get signaled, and then restart */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);
			ConditionVariableCancelSleep();
			goto retry;
		}

		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
				 errmsg("replication slot \"%s\" is active for PID %d",
						NameStr(s->data.name), active_pid)));
	}
	else if (!nowait)
		ConditionVariableCancelSleep(); /* no sleep needed after all */

	/* We made this slot active, so it's ours now. */
	MyReplicationSlot = s;

	/*
	 * We need to check for invalidation after making the slot ours to avoid
	 * the possible race condition with the checkpointer that can otherwise
	 * invalidate the slot immediately after the check.
	 */
	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
		ereport(ERROR,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg("can no longer access replication slot \"%s\"",
					   NameStr(s->data.name)),
				errdetail("This replication slot has been invalidated due to \"%s\".",
						  GetSlotInvalidationCauseName(s->data.invalidated)));

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&s->active_cv);

	/*
	 * The call to pgstat_acquire_replslot() protects against stats for a
	 * different slot, from before a restart or such, being present during
	 * pgstat_report_replslot().
	 */
	if (SlotIsLogical(s))
		pgstat_acquire_replslot(s);


	if (am_walsender)
	{
		ereport(log_replication_commands ? LOG : DEBUG1,
				SlotIsLogical(s)
				? errmsg("acquired logical replication slot \"%s\"",
						 NameStr(s->data.name))
				: errmsg("acquired physical replication slot \"%s\"",
						 NameStr(s->data.name)));
	}
}
761 :
/*
 * Release the replication slot that this backend considers to own.
 *
 * This or another backend can re-acquire the slot later.
 * Resources this slot requires will be preserved.
 *
 * Ephemeral slots are dropped instead of released; persistent slots are
 * merely marked inactive, waking any waiters on the slot's CV.
 */
void
ReplicationSlotRelease(void)
{
	ReplicationSlot *slot = MyReplicationSlot;
	char	   *slotname = NULL;	/* keep compiler quiet */
	bool		is_logical;
	TimestampTz now = 0;

	Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);

	is_logical = SlotIsLogical(slot);

	if (am_walsender)
		slotname = pstrdup(NameStr(slot->data.name));

	if (slot->data.persistency == RS_EPHEMERAL)
	{
		/*
		 * Delete the slot. There is no !PANIC case where this is allowed to
		 * fail, all that may happen is an incomplete cleanup of the on-disk
		 * data.
		 */
		ReplicationSlotDropAcquired();

		/*
		 * Request to disable logical decoding, even though this slot may not
		 * have been the last logical slot. The checkpointer will verify if
		 * logical decoding should actually be disabled.
		 */
		if (is_logical)
			RequestDisableLogicalDecoding();
	}

	/*
	 * If slot needed to temporarily restrain both data and catalog xmin to
	 * create the catalog snapshot, remove that temporary constraint.
	 * Snapshots can only be exported while the initial snapshot is still
	 * acquired.
	 */
	if (!TransactionIdIsValid(slot->data.xmin) &&
		TransactionIdIsValid(slot->effective_xmin))
	{
		SpinLockAcquire(&slot->mutex);
		slot->effective_xmin = InvalidTransactionId;
		SpinLockRelease(&slot->mutex);
		ReplicationSlotsComputeRequiredXmin(false);
	}

	/*
	 * Set the time since the slot has become inactive. We get the current
	 * time beforehand to avoid system call while holding the spinlock.
	 */
	now = GetCurrentTimestamp();

	if (slot->data.persistency == RS_PERSISTENT)
	{
		/*
		 * Mark persistent slot inactive. We're not freeing it, just
		 * disconnecting, but wake up others that may be waiting for it.
		 */
		SpinLockAcquire(&slot->mutex);
		slot->active_proc = INVALID_PROC_NUMBER;
		ReplicationSlotSetInactiveSince(slot, now, false);
		SpinLockRelease(&slot->mutex);
		ConditionVariableBroadcast(&slot->active_cv);
	}
	else
		ReplicationSlotSetInactiveSince(slot, now, true);

	MyReplicationSlot = NULL;

	/* might not have been set when we've been a plain slot */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
	ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
	LWLockRelease(ProcArrayLock);

	if (am_walsender)
	{
		ereport(log_replication_commands ? LOG : DEBUG1,
				is_logical
				? errmsg("released logical replication slot \"%s\"",
						 slotname)
				: errmsg("released physical replication slot \"%s\"",
						 slotname));

		pfree(slotname);
	}
}
857 :
/*
 * Cleanup temporary slots created in current session.
 *
 * Cleanup only synced temporary slots if 'synced_only' is true, else
 * cleanup all temporary slots.
 *
 * If it drops the last logical slot in the cluster, requests to disable
 * logical decoding.
 *
 * Each drop requires releasing ReplicationSlotControlLock, so the scan is
 * restarted from the top after every drop (the 'restart' label).
 */
void
ReplicationSlotCleanup(bool synced_only)
{
	int			i;
	bool		found_valid_logicalslot;
	bool		dropped_logical = false;

	Assert(MyReplicationSlot == NULL);

restart:
	found_valid_logicalslot = false;
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (!s->in_use)
			continue;

		SpinLockAcquire(&s->mutex);

		/* track whether any valid logical slot survives the cleanup */
		found_valid_logicalslot |=
			(SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);

		if ((s->active_proc == MyProcNumber &&
			 (!synced_only || s->data.synced)))
		{
			Assert(s->data.persistency == RS_TEMPORARY);
			SpinLockRelease(&s->mutex);
			LWLockRelease(ReplicationSlotControlLock);	/* avoid deadlock */

			if (SlotIsLogical(s))
				dropped_logical = true;

			ReplicationSlotDropPtr(s);

			ConditionVariableBroadcast(&s->active_cv);
			goto restart;
		}
		else
			SpinLockRelease(&s->mutex);
	}

	LWLockRelease(ReplicationSlotControlLock);

	/* only disable logical decoding if no valid logical slot remains */
	if (dropped_logical && !found_valid_logicalslot)
		RequestDisableLogicalDecoding();
}
915 :
916 : /*
917 : * Permanently drop replication slot identified by the passed in name.
918 : */
919 : void
920 444 : ReplicationSlotDrop(const char *name, bool nowait)
921 : {
922 : bool is_logical;
923 :
924 : Assert(MyReplicationSlot == NULL);
925 :
926 444 : ReplicationSlotAcquire(name, nowait, false);
927 :
928 : /*
929 : * Do not allow users to drop the slots which are currently being synced
930 : * from the primary to the standby.
931 : */
932 437 : if (RecoveryInProgress() && MyReplicationSlot->data.synced)
933 1 : ereport(ERROR,
934 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
935 : errmsg("cannot drop replication slot \"%s\"", name),
936 : errdetail("This replication slot is being synchronized from the primary server."));
937 :
938 436 : is_logical = SlotIsLogical(MyReplicationSlot);
939 :
940 436 : ReplicationSlotDropAcquired();
941 :
942 436 : if (is_logical)
943 416 : RequestDisableLogicalDecoding();
944 436 : }
945 :
946 : /*
947 : * Change the definition of the slot identified by the specified name.
948 : *
949 : * Altering the two_phase property of a slot requires caution on the
950 : * client-side. Enabling it at any random point during decoding has the
951 : * risk that transactions prepared before this change may be skipped by
952 : * the decoder, leading to missing prepare records on the client. So, we
953 : * enable it for subscription related slots only once the initial tablesync
954 : * is finished. See comments atop worker.c. Disabling it is safe only when
955 : * there are no pending prepared transaction, otherwise, the changes of
956 : * already prepared transactions can be replicated again along with their
957 : * corresponding commit leading to duplicate data or errors.
958 : */
void
ReplicationSlotAlter(const char *name, const bool *failover,
					 const bool *two_phase)
{
	bool		update_slot = false;

	Assert(MyReplicationSlot == NULL);
	/* A NULL pointer means "leave that property unchanged". */
	Assert(failover || two_phase);

	/*
	 * NOTE(review): third argument presumably makes the acquire error out on
	 * an invalidated slot -- confirm against ReplicationSlotAcquire().
	 */
	ReplicationSlotAcquire(name, false, true);

	if (SlotIsPhysical(MyReplicationSlot))
		ereport(ERROR,
				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				errmsg("cannot use %s with a physical replication slot",
					   "ALTER_REPLICATION_SLOT"));

	if (RecoveryInProgress())
	{
		/*
		 * Do not allow users to alter the slots which are currently being
		 * synced from the primary to the standby.
		 */
		if (MyReplicationSlot->data.synced)
			ereport(ERROR,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("cannot alter replication slot \"%s\"", name),
					errdetail("This replication slot is being synchronized from the primary server."));

		/*
		 * Do not allow users to enable failover on the standby as we do not
		 * support sync to the cascading standby.
		 */
		if (failover && *failover)
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a replication slot"
						   " on the standby"));
	}

	if (failover)
	{
		/*
		 * Do not allow users to enable failover for temporary slots as we do
		 * not support syncing temporary slots to the standby.
		 */
		if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a temporary replication slot"));

		/* Only touch the slot (and write it out below) on actual change. */
		if (MyReplicationSlot->data.failover != *failover)
		{
			SpinLockAcquire(&MyReplicationSlot->mutex);
			MyReplicationSlot->data.failover = *failover;
			SpinLockRelease(&MyReplicationSlot->mutex);

			update_slot = true;
		}
	}

	if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
	{
		SpinLockAcquire(&MyReplicationSlot->mutex);
		MyReplicationSlot->data.two_phase = *two_phase;
		SpinLockRelease(&MyReplicationSlot->mutex);

		update_slot = true;
	}

	/* Persist any property change so it survives a restart. */
	if (update_slot)
	{
		ReplicationSlotMarkDirty();
		ReplicationSlotSave();
	}

	ReplicationSlotRelease();
}
1037 :
1038 : /*
1039 : * Permanently drop the currently acquired replication slot.
1040 : */
1041 : void
1042 453 : ReplicationSlotDropAcquired(void)
1043 : {
1044 453 : ReplicationSlot *slot = MyReplicationSlot;
1045 :
1046 : Assert(MyReplicationSlot != NULL);
1047 :
1048 : /* slot isn't acquired anymore */
1049 453 : MyReplicationSlot = NULL;
1050 :
1051 453 : ReplicationSlotDropPtr(slot);
1052 453 : }
1053 :
1054 : /*
1055 : * Permanently drop the replication slot which will be released by the point
1056 : * this function returns.
1057 : */
1058 : static void
1059 606 : ReplicationSlotDropPtr(ReplicationSlot *slot)
1060 : {
1061 : char path[MAXPGPATH];
1062 : char tmppath[MAXPGPATH];
1063 :
1064 : /*
1065 : * If some other backend ran this code concurrently with us, we might try
1066 : * to delete a slot with a certain name while someone else was trying to
1067 : * create a slot with the same name.
1068 : */
1069 606 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1070 :
1071 : /* Generate pathnames. */
1072 606 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1073 606 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1074 :
1075 : /*
1076 : * Rename the slot directory on disk, so that we'll no longer recognize
1077 : * this as a valid slot. Note that if this fails, we've got to mark the
1078 : * slot inactive before bailing out. If we're dropping an ephemeral or a
1079 : * temporary slot, we better never fail hard as the caller won't expect
1080 : * the slot to survive and this might get called during error handling.
1081 : */
1082 606 : if (rename(path, tmppath) == 0)
1083 : {
1084 : /*
1085 : * We need to fsync() the directory we just renamed and its parent to
1086 : * make sure that our changes are on disk in a crash-safe fashion. If
1087 : * fsync() fails, we can't be sure whether the changes are on disk or
1088 : * not. For now, we handle that by panicking;
1089 : * StartupReplicationSlots() will try to straighten it out after
1090 : * restart.
1091 : */
1092 606 : START_CRIT_SECTION();
1093 606 : fsync_fname(tmppath, true);
1094 606 : fsync_fname(PG_REPLSLOT_DIR, true);
1095 606 : END_CRIT_SECTION();
1096 : }
1097 : else
1098 : {
1099 0 : bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1100 :
1101 0 : SpinLockAcquire(&slot->mutex);
1102 0 : slot->active_proc = INVALID_PROC_NUMBER;
1103 0 : SpinLockRelease(&slot->mutex);
1104 :
1105 : /* wake up anyone waiting on this slot */
1106 0 : ConditionVariableBroadcast(&slot->active_cv);
1107 :
1108 0 : ereport(fail_softly ? WARNING : ERROR,
1109 : (errcode_for_file_access(),
1110 : errmsg("could not rename file \"%s\" to \"%s\": %m",
1111 : path, tmppath)));
1112 : }
1113 :
1114 : /*
1115 : * The slot is definitely gone. Lock out concurrent scans of the array
1116 : * long enough to kill it. It's OK to clear the active PID here without
1117 : * grabbing the mutex because nobody else can be scanning the array here,
1118 : * and nobody can be attached to this slot and thus access it without
1119 : * scanning the array.
1120 : *
1121 : * Also wake up processes waiting for it.
1122 : */
1123 606 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
1124 606 : slot->active_proc = INVALID_PROC_NUMBER;
1125 606 : slot->in_use = false;
1126 606 : LWLockRelease(ReplicationSlotControlLock);
1127 606 : ConditionVariableBroadcast(&slot->active_cv);
1128 :
1129 : /*
1130 : * Slot is dead and doesn't prevent resource removal anymore, recompute
1131 : * limits.
1132 : */
1133 606 : ReplicationSlotsComputeRequiredXmin(false);
1134 606 : ReplicationSlotsComputeRequiredLSN();
1135 :
1136 : /*
1137 : * If removing the directory fails, the worst thing that will happen is
1138 : * that the user won't be able to create a new slot with the same name
1139 : * until the next server restart. We warn about it, but that's all.
1140 : */
1141 606 : if (!rmtree(tmppath, true))
1142 0 : ereport(WARNING,
1143 : (errmsg("could not remove directory \"%s\"", tmppath)));
1144 :
1145 : /*
1146 : * Drop the statistics entry for the replication slot. Do this while
1147 : * holding ReplicationSlotAllocationLock so that we don't drop a
1148 : * statistics entry for another slot with the same name just created in
1149 : * another session.
1150 : */
1151 606 : if (SlotIsLogical(slot))
1152 442 : pgstat_drop_replslot(slot);
1153 :
1154 : /*
1155 : * We release this at the very end, so that nobody starts trying to create
1156 : * a slot while we're still cleaning up the detritus of the old one.
1157 : */
1158 606 : LWLockRelease(ReplicationSlotAllocationLock);
1159 606 : }
1160 :
1161 : /*
1162 : * Serialize the currently acquired slot's state from memory to disk, thereby
1163 : * guaranteeing the current state will survive a crash.
1164 : */
1165 : void
1166 1508 : ReplicationSlotSave(void)
1167 : {
1168 : char path[MAXPGPATH];
1169 :
1170 : Assert(MyReplicationSlot != NULL);
1171 :
1172 1508 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
1173 1508 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
1174 1508 : }
1175 :
1176 : /*
1177 : * Signal that it would be useful if the currently acquired slot would be
1178 : * flushed out to disk.
1179 : *
1180 : * Note that the actual flush to disk can be delayed for a long time, if
1181 : * required for correctness explicitly do a ReplicationSlotSave().
1182 : */
1183 : void
1184 41459 : ReplicationSlotMarkDirty(void)
1185 : {
1186 41459 : ReplicationSlot *slot = MyReplicationSlot;
1187 :
1188 : Assert(MyReplicationSlot != NULL);
1189 :
1190 41459 : SpinLockAcquire(&slot->mutex);
1191 41459 : MyReplicationSlot->just_dirtied = true;
1192 41459 : MyReplicationSlot->dirty = true;
1193 41459 : SpinLockRelease(&slot->mutex);
1194 41459 : }
1195 :
1196 : /*
1197 : * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1198 : * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1199 : */
1200 : void
1201 492 : ReplicationSlotPersist(void)
1202 : {
1203 492 : ReplicationSlot *slot = MyReplicationSlot;
1204 :
1205 : Assert(slot != NULL);
1206 : Assert(slot->data.persistency != RS_PERSISTENT);
1207 :
1208 492 : SpinLockAcquire(&slot->mutex);
1209 492 : slot->data.persistency = RS_PERSISTENT;
1210 492 : SpinLockRelease(&slot->mutex);
1211 :
1212 492 : ReplicationSlotMarkDirty();
1213 492 : ReplicationSlotSave();
1214 492 : }
1215 :
1216 : /*
1217 : * Compute the oldest xmin across all slots and store it in the ProcArray.
1218 : *
1219 : * If already_locked is true, both the ReplicationSlotControlLock and the
1220 : * ProcArrayLock have already been acquired exclusively. It is crucial that the
1221 : * caller first acquires the ReplicationSlotControlLock, followed by the
1222 : * ProcArrayLock, to prevent any undetectable deadlocks since this function
1223 : * acquires them in that order.
1224 : */
void
ReplicationSlotsComputeRequiredXmin(bool already_locked)
{
	int			i;
	TransactionId agg_xmin = InvalidTransactionId;
	TransactionId agg_catalog_xmin = InvalidTransactionId;

	Assert(ReplicationSlotCtl != NULL);
	Assert(!already_locked ||
		   (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
			LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));

	/*
	 * Hold the ReplicationSlotControlLock until after updating the slot xmin
	 * values, so no backend updates the initial xmin for newly created slot
	 * concurrently. A shared lock is used here to minimize lock contention,
	 * especially when many slots exist and advancements occur frequently.
	 * This is safe since an exclusive lock is taken during initial slot xmin
	 * update in slot creation.
	 *
	 * One might think that we can hold the ProcArrayLock exclusively and
	 * update the slot xmin values, but it could increase lock contention on
	 * the ProcArrayLock, which is not great since this function can be called
	 * at non-negligible frequency.
	 *
	 * Concurrent invocation of this function may cause the computed slot xmin
	 * to regress. However, this is harmless because tuples prior to the most
	 * recent xmin are no longer useful once advancement occurs (see
	 * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
	 * before updating the effective_xmin). Thus, such regression merely
	 * prevents VACUUM from prematurely removing tuples without causing the
	 * early deletion of required data.
	 */
	if (!already_locked)
		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	/* Scan every slot, accumulating the oldest data and catalog xmins. */
	for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		TransactionId effective_xmin;
		TransactionId effective_catalog_xmin;
		bool		invalidated;

		if (!s->in_use)
			continue;

		/* Copy the fields we need under the slot's spinlock. */
		SpinLockAcquire(&s->mutex);
		effective_xmin = s->effective_xmin;
		effective_catalog_xmin = s->effective_catalog_xmin;
		invalidated = s->data.invalidated != RS_INVAL_NONE;
		SpinLockRelease(&s->mutex);

		/* invalidated slots need not apply */
		if (invalidated)
			continue;

		/* check the data xmin */
		if (TransactionIdIsValid(effective_xmin) &&
			(!TransactionIdIsValid(agg_xmin) ||
			 TransactionIdPrecedes(effective_xmin, agg_xmin)))
			agg_xmin = effective_xmin;

		/* check the catalog xmin */
		if (TransactionIdIsValid(effective_catalog_xmin) &&
			(!TransactionIdIsValid(agg_catalog_xmin) ||
			 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
			agg_catalog_xmin = effective_catalog_xmin;
	}

	/* Publish the computed minima via the ProcArray. */
	ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);

	if (!already_locked)
		LWLockRelease(ReplicationSlotControlLock);
}
1299 :
1300 : /*
1301 : * Compute the oldest restart LSN across all slots and inform xlog module.
1302 : *
1303 : * Note: while max_slot_wal_keep_size is theoretically relevant for this
1304 : * purpose, we don't try to account for that, because this module doesn't
1305 : * know what to compare against.
1306 : */
1307 : void
1308 42201 : ReplicationSlotsComputeRequiredLSN(void)
1309 : {
1310 : int i;
1311 42201 : XLogRecPtr min_required = InvalidXLogRecPtr;
1312 :
1313 : Assert(ReplicationSlotCtl != NULL);
1314 :
1315 42201 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1316 671255 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
1317 : {
1318 629054 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1319 : XLogRecPtr restart_lsn;
1320 : XLogRecPtr last_saved_restart_lsn;
1321 : bool invalidated;
1322 : ReplicationSlotPersistency persistency;
1323 :
1324 629054 : if (!s->in_use)
1325 586721 : continue;
1326 :
1327 42333 : SpinLockAcquire(&s->mutex);
1328 42333 : persistency = s->data.persistency;
1329 42333 : restart_lsn = s->data.restart_lsn;
1330 42333 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1331 42333 : last_saved_restart_lsn = s->last_saved_restart_lsn;
1332 42333 : SpinLockRelease(&s->mutex);
1333 :
1334 : /* invalidated slots need not apply */
1335 42333 : if (invalidated)
1336 27 : continue;
1337 :
1338 : /*
1339 : * For persistent slot use last_saved_restart_lsn to compute the
1340 : * oldest LSN for removal of WAL segments. The segments between
1341 : * last_saved_restart_lsn and restart_lsn might be needed by a
1342 : * persistent slot in the case of database crash. Non-persistent
1343 : * slots can't survive the database crash, so we don't care about
1344 : * last_saved_restart_lsn for them.
1345 : */
1346 42306 : if (persistency == RS_PERSISTENT)
1347 : {
1348 41414 : if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1349 : restart_lsn > last_saved_restart_lsn)
1350 : {
1351 38874 : restart_lsn = last_saved_restart_lsn;
1352 : }
1353 : }
1354 :
1355 42306 : if (XLogRecPtrIsValid(restart_lsn) &&
1356 1261 : (!XLogRecPtrIsValid(min_required) ||
1357 : restart_lsn < min_required))
1358 41159 : min_required = restart_lsn;
1359 : }
1360 42201 : LWLockRelease(ReplicationSlotControlLock);
1361 :
1362 42201 : XLogSetReplicationSlotMinimumLSN(min_required);
1363 42201 : }
1364 :
1365 : /*
1366 : * Compute the oldest WAL LSN required by *logical* decoding slots..
1367 : *
1368 : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1369 : * slots exist.
1370 : *
1371 : * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1372 : * ignores physical replication slots.
1373 : *
1374 : * The results aren't required frequently, so we don't maintain a precomputed
1375 : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1376 : */
1377 : XLogRecPtr
1378 3866 : ReplicationSlotsComputeLogicalRestartLSN(void)
1379 : {
1380 3866 : XLogRecPtr result = InvalidXLogRecPtr;
1381 : int i;
1382 :
1383 3866 : if (max_replication_slots + max_repack_replication_slots <= 0)
1384 0 : return InvalidXLogRecPtr;
1385 :
1386 3866 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1387 :
1388 61306 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
1389 : {
1390 : ReplicationSlot *s;
1391 : XLogRecPtr restart_lsn;
1392 : XLogRecPtr last_saved_restart_lsn;
1393 : bool invalidated;
1394 : ReplicationSlotPersistency persistency;
1395 :
1396 57440 : s = &ReplicationSlotCtl->replication_slots[i];
1397 :
1398 : /* cannot change while ReplicationSlotCtlLock is held */
1399 57440 : if (!s->in_use)
1400 56622 : continue;
1401 :
1402 : /* we're only interested in logical slots */
1403 818 : if (!SlotIsLogical(s))
1404 562 : continue;
1405 :
1406 : /* read once, it's ok if it increases while we're checking */
1407 256 : SpinLockAcquire(&s->mutex);
1408 256 : persistency = s->data.persistency;
1409 256 : restart_lsn = s->data.restart_lsn;
1410 256 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1411 256 : last_saved_restart_lsn = s->last_saved_restart_lsn;
1412 256 : SpinLockRelease(&s->mutex);
1413 :
1414 : /* invalidated slots need not apply */
1415 256 : if (invalidated)
1416 10 : continue;
1417 :
1418 : /*
1419 : * For persistent slot use last_saved_restart_lsn to compute the
1420 : * oldest LSN for removal of WAL segments. The segments between
1421 : * last_saved_restart_lsn and restart_lsn might be needed by a
1422 : * persistent slot in the case of database crash. Non-persistent
1423 : * slots can't survive the database crash, so we don't care about
1424 : * last_saved_restart_lsn for them.
1425 : */
1426 246 : if (persistency == RS_PERSISTENT)
1427 : {
1428 244 : if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1429 : restart_lsn > last_saved_restart_lsn)
1430 : {
1431 0 : restart_lsn = last_saved_restart_lsn;
1432 : }
1433 : }
1434 :
1435 246 : if (!XLogRecPtrIsValid(restart_lsn))
1436 0 : continue;
1437 :
1438 246 : if (!XLogRecPtrIsValid(result) ||
1439 : restart_lsn < result)
1440 196 : result = restart_lsn;
1441 : }
1442 :
1443 3866 : LWLockRelease(ReplicationSlotControlLock);
1444 :
1445 3866 : return result;
1446 : }
1447 :
1448 : /*
1449 : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1450 : * passed database oid.
1451 : *
1452 : * Returns true if there are any slots referencing the database. *nslots will
1453 : * be set to the absolute number of slots in the database, *nactive to ones
1454 : * currently active.
1455 : */
1456 : bool
1457 51 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1458 : {
1459 : int i;
1460 :
1461 51 : *nslots = *nactive = 0;
1462 :
1463 51 : if (max_replication_slots + max_repack_replication_slots <= 0)
1464 0 : return false;
1465 :
1466 51 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1467 789 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
1468 : {
1469 : ReplicationSlot *s;
1470 :
1471 738 : s = &ReplicationSlotCtl->replication_slots[i];
1472 :
1473 : /* cannot change while ReplicationSlotCtlLock is held */
1474 738 : if (!s->in_use)
1475 717 : continue;
1476 :
1477 : /* only logical slots are database specific, skip */
1478 21 : if (!SlotIsLogical(s))
1479 10 : continue;
1480 :
1481 : /* not our database, skip */
1482 11 : if (s->data.database != dboid)
1483 8 : continue;
1484 :
1485 : /* NB: intentionally counting invalidated slots */
1486 :
1487 : /* count slots with spinlock held */
1488 3 : SpinLockAcquire(&s->mutex);
1489 3 : (*nslots)++;
1490 3 : if (s->active_proc != INVALID_PROC_NUMBER)
1491 1 : (*nactive)++;
1492 3 : SpinLockRelease(&s->mutex);
1493 : }
1494 51 : LWLockRelease(ReplicationSlotControlLock);
1495 :
1496 51 : if (*nslots > 0)
1497 3 : return true;
1498 48 : return false;
1499 : }
1500 :
1501 : /*
1502 : * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1503 : * passed database oid. The caller should hold an exclusive lock on the
1504 : * pg_database oid for the database to prevent creation of new slots on the db
1505 : * or replay from existing slots.
1506 : *
1507 : * Another session that concurrently acquires an existing slot on the target DB
1508 : * (most likely to drop it) may cause this function to ERROR. If that happens
1509 : * it may have dropped some but not all slots.
1510 : *
1511 : * This routine isn't as efficient as it could be - but we don't drop
1512 : * databases often, especially databases with lots of slots.
1513 : *
1514 : * If it drops the last logical slot in the cluster, it requests to disable
1515 : * logical decoding.
1516 : */
void
ReplicationSlotsDropDBSlots(Oid dboid)
{
	int			i;
	bool		found_valid_logicalslot;
	bool		dropped = false;

	if (max_replication_slots + max_repack_replication_slots <= 0)
		return;

restart:
	/* recomputed on each pass, since dropping can change the slot array */
	found_valid_logicalslot = false;
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
	{
		ReplicationSlot *s;
		char	   *slotname;
		ProcNumber	active_proc;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotCtlLock is held */
		if (!s->in_use)
			continue;

		/* only logical slots are database specific, skip */
		if (!SlotIsLogical(s))
			continue;

		/*
		 * Check logical slots on other databases too so we can disable
		 * logical decoding only if no slots in the cluster.
		 */
		SpinLockAcquire(&s->mutex);
		found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
		SpinLockRelease(&s->mutex);

		/* not our database, skip */
		if (s->data.database != dboid)
			continue;

		/* NB: intentionally including invalidated slots to drop */

		/* acquire slot, so ReplicationSlotDropAcquired can be reused */
		SpinLockAcquire(&s->mutex);
		/* can't change while ReplicationSlotControlLock is held */
		slotname = NameStr(s->data.name);
		active_proc = s->active_proc;
		if (active_proc == INVALID_PROC_NUMBER)
		{
			/* claim the slot for ourselves, as if we'd acquired it normally */
			MyReplicationSlot = s;
			s->active_proc = MyProcNumber;
		}
		SpinLockRelease(&s->mutex);

		/*
		 * Even though we hold an exclusive lock on the database object a
		 * logical slot for that DB can still be active, e.g. if it's
		 * concurrently being dropped by a backend connected to another DB.
		 *
		 * That's fairly unlikely in practice, so we'll just bail out.
		 *
		 * The slot sync worker holds a shared lock on the database before
		 * operating on synced logical slots to avoid conflict with the drop
		 * happening here. The persistent synced slots are thus safe but there
		 * is a possibility that the slot sync worker has created a temporary
		 * slot (which stays active even on release) and we are trying to drop
		 * that here. In practice, the chances of hitting this scenario are
		 * less as during slot synchronization, the temporary slot is
		 * immediately converted to persistent and thus is safe due to the
		 * shared lock taken on the database. So, we'll just bail out in such
		 * a case.
		 *
		 * XXX: We can consider shutting down the slot sync worker before
		 * trying to drop synced temporary slots here.
		 */
		if (active_proc != INVALID_PROC_NUMBER)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							slotname, GetPGProcByNumber(active_proc)->pid)));

		/*
		 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
		 * holding ReplicationSlotControlLock over filesystem operations,
		 * release ReplicationSlotControlLock and use
		 * ReplicationSlotDropAcquired.
		 *
		 * As that means the set of slots could change, restart scan from the
		 * beginning each time we release the lock.
		 */
		LWLockRelease(ReplicationSlotControlLock);
		ReplicationSlotDropAcquired();
		dropped = true;
		goto restart;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If we removed the last valid logical slot, logical decoding can stop. */
	if (dropped && !found_valid_logicalslot)
		RequestDisableLogicalDecoding();
}
1618 :
1619 : /*
1620 : * Returns true if there is at least one in-use valid logical replication slot.
1621 : */
1622 : bool
1623 489 : CheckLogicalSlotExists(void)
1624 : {
1625 489 : bool found = false;
1626 :
1627 489 : if (max_replication_slots + max_repack_replication_slots <= 0)
1628 0 : return false;
1629 :
1630 489 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1631 7700 : for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
1632 : {
1633 : ReplicationSlot *s;
1634 : bool invalidated;
1635 :
1636 7217 : s = &ReplicationSlotCtl->replication_slots[i];
1637 :
1638 : /* cannot change while ReplicationSlotCtlLock is held */
1639 7217 : if (!s->in_use)
1640 7189 : continue;
1641 :
1642 28 : if (SlotIsPhysical(s))
1643 19 : continue;
1644 :
1645 9 : SpinLockAcquire(&s->mutex);
1646 9 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1647 9 : SpinLockRelease(&s->mutex);
1648 :
1649 9 : if (invalidated)
1650 3 : continue;
1651 :
1652 6 : found = true;
1653 6 : break;
1654 : }
1655 489 : LWLockRelease(ReplicationSlotControlLock);
1656 :
1657 489 : return found;
1658 : }
1659 :
1660 : /*
1661 : * Check whether the server's configuration supports using replication
1662 : * slots.
1663 : */
1664 : void
1665 1906 : CheckSlotRequirements(bool repack)
1666 : {
1667 : /*
1668 : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1669 : * needs the same check.
1670 : */
1671 :
1672 1906 : if (!repack && max_replication_slots == 0)
1673 0 : ereport(ERROR,
1674 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1675 : errmsg("replication slots can only be used if \"%s\" > 0",
1676 : "max_replication_slots"));
1677 :
1678 1906 : if (repack && max_repack_replication_slots == 0)
1679 0 : ereport(ERROR,
1680 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1681 : errmsg("REPACK can only be used if \"%s\" > 0",
1682 : "max_repack_replication_slots"));
1683 :
1684 1906 : if (wal_level < WAL_LEVEL_REPLICA)
1685 0 : ereport(ERROR,
1686 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1687 : errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1688 1906 : }
1689 :
1690 : /*
1691 : * Check whether the user has privilege to use replication slots.
1692 : */
1693 : void
1694 600 : CheckSlotPermissions(void)
1695 : {
1696 600 : if (!has_rolreplication(GetUserId()))
1697 5 : ereport(ERROR,
1698 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1699 : errmsg("permission denied to use replication slots"),
1700 : errdetail("Only roles with the %s attribute may use replication slots.",
1701 : "REPLICATION")));
1702 595 : }
1703 :
1704 : /*
1705 : * Reserve WAL for the currently active slot.
1706 : *
1707 : * Compute and set restart_lsn in a manner that's appropriate for the type of
1708 : * the slot and concurrency safe.
1709 : */
void
ReplicationSlotReserveWal(void)
{
	ReplicationSlot *slot = MyReplicationSlot;
	XLogSegNo	segno;
	XLogRecPtr	restart_lsn;

	Assert(slot != NULL);
	/* the slot must not have reserved any WAL yet */
	Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
	Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn));

	/*
	 * The replication slot mechanism is used to prevent the removal of
	 * required WAL.
	 *
	 * Acquire an exclusive lock to prevent the checkpoint process from
	 * concurrently computing the minimum slot LSN (see
	 * CheckPointReplicationSlots). This ensures that the WAL reserved for
	 * replication cannot be removed during a checkpoint.
	 *
	 * The mechanism is reliable because if WAL reservation occurs first, the
	 * checkpoint must wait for the restart_lsn update before determining the
	 * minimum non-removable LSN. On the other hand, if the checkpoint happens
	 * first, subsequent WAL reservations will select positions at or beyond
	 * the redo pointer of that checkpoint.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * For logical slots log a standby snapshot and start logical decoding at
	 * exactly that position. That allows the slot to start up more quickly.
	 * But on a standby we cannot do WAL writes, so just use the replay
	 * pointer; effectively, an attempt to create a logical slot on standby
	 * will cause it to wait for an xl_running_xact record to be logged
	 * independently on the primary, so that a snapshot can be built using the
	 * record.
	 *
	 * None of this is needed (or indeed helpful) for physical slots as
	 * they'll start replay at the last logged checkpoint anyway. Instead,
	 * return the location of the last redo LSN, where a base backup has to
	 * start replay at.
	 */
	if (SlotIsPhysical(slot))
		restart_lsn = GetRedoRecPtr();
	else if (RecoveryInProgress())
		restart_lsn = GetXLogReplayRecPtr(NULL);
	else
		restart_lsn = GetXLogInsertRecPtr();

	/* publish the chosen position under the slot's spinlock */
	SpinLockAcquire(&slot->mutex);
	slot->data.restart_lsn = restart_lsn;
	SpinLockRelease(&slot->mutex);

	/* prevent WAL removal as fast as possible */
	ReplicationSlotsComputeRequiredLSN();

	/* Checkpoint shouldn't remove the required WAL. */
	XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
	if (XLogGetLastRemovedSegno() >= segno)
		elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
			 NameStr(slot->data.name));

	LWLockRelease(ReplicationSlotAllocationLock);

	/* on a primary, give a new logical slot a snapshot to decode from */
	if (!RecoveryInProgress() && SlotIsLogical(slot))
	{
		XLogRecPtr	flushptr;

		/* make sure we have enough information to start */
		flushptr = LogStandbySnapshot(InvalidOid);

		/* and make sure it's fsynced to disk */
		XLogFlush(flushptr);
	}
}
1785 :
1786 : /*
1787 : * Report that replication slot needs to be invalidated
 : *
 : * Emits a LOG message explaining why the slot named "slotname" is being
 : * invalidated or, when "terminating" is true, why its owning process
 : * "pid" is being terminated so the slot can be invalidated. The detail
 : * and hint texts are chosen from "cause"; the remaining arguments carry
 : * cause-specific data: restart_lsn/oldestLSN for RS_INVAL_WAL_REMOVED,
 : * snapshotConflictHorizon for RS_INVAL_HORIZON, and slot_idle_seconds
 : * for RS_INVAL_IDLE_TIMEOUT.
1788 : */
1789 : static void
1790 23 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1791 : bool terminating,
1792 : int pid,
1793 : NameData slotname,
1794 : XLogRecPtr restart_lsn,
1795 : XLogRecPtr oldestLSN,
1796 : TransactionId snapshotConflictHorizon,
1797 : long slot_idle_seconds)
1798 : {
1799 : StringInfoData err_detail;
1800 : StringInfoData err_hint;
1801 :
1802 23 : initStringInfo(&err_detail);
1803 23 : initStringInfo(&err_hint);
1804 :
 : /* Build the cause-specific detail (and, for some causes, hint) text. */
1805 23 : switch (cause)
1806 : {
1807 7 : case RS_INVAL_WAL_REMOVED:
1808 : {
1809 7 : uint64 ex = oldestLSN - restart_lsn;
1810 :
1811 7 : appendStringInfo(&err_detail,
1812 7 : ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1813 : "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1814 : ex),
1815 7 : LSN_FORMAT_ARGS(restart_lsn),
1816 : ex);
1817 : /* translator: %s is a GUC variable name */
1818 7 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1819 : "max_slot_wal_keep_size");
1820 7 : break;
1821 : }
1822 12 : case RS_INVAL_HORIZON:
1823 12 : appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1824 : snapshotConflictHorizon);
1825 12 : break;
1826 :
1827 4 : case RS_INVAL_WAL_LEVEL:
1828 4 : appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
1829 4 : break;
1830 :
1831 0 : case RS_INVAL_IDLE_TIMEOUT:
1832 : {
1833 : /* translator: %s is a GUC variable name */
1834 0 : appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1835 : slot_idle_seconds, "idle_replication_slot_timeout",
1836 : idle_replication_slot_timeout_secs);
1837 : /* translator: %s is a GUC variable name */
1838 0 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1839 : "idle_replication_slot_timeout");
1840 0 : break;
1841 : }
1842 : case RS_INVAL_NONE:
1843 : pg_unreachable();
1844 : }
1845 :
 : /* The hint is attached only for the causes that populated err_hint. */
1846 23 : ereport(LOG,
1847 : terminating ?
1848 : errmsg("terminating process %d to release replication slot \"%s\"",
1849 : pid, NameStr(slotname)) :
1850 : errmsg("invalidating obsolete replication slot \"%s\"",
1851 : NameStr(slotname)),
1852 : errdetail_internal("%s", err_detail.data),
1853 : err_hint.len ? errhint("%s", err_hint.data) : 0);
1854 :
1855 23 : pfree(err_detail.data);
1856 23 : pfree(err_hint.data);
1857 23 : }
1858 :
1859 : /*
1860 : * Can we invalidate an idle replication slot?
1861 : *
1862 : * Idle timeout invalidation is allowed only when:
1863 : *
1864 : * 1. Idle timeout is set
1865 : * 2. Slot has reserved WAL
1866 : * 3. Slot is inactive
1867 : * 4. The slot is not being synced from the primary while the server is in
1868 : * recovery. This is because synced slots are always considered to be
1869 : * inactive because they don't perform logical decoding to produce changes.
1870 : */
1871 : static inline bool
1872 386 : CanInvalidateIdleSlot(ReplicationSlot *s)
1873 : {
 : /* The conjuncts below correspond to items 1-4 in the header comment. */
1874 386 : return (idle_replication_slot_timeout_secs != 0 &&
1875 0 : XLogRecPtrIsValid(s->data.restart_lsn) &&
1876 386 : s->inactive_since > 0 &&
1877 0 : !(RecoveryInProgress() && s->data.synced));
1878 : }
1879 :
1880 : /*
1881 : * DetermineSlotInvalidationCause - Determine the cause for which a slot
1882 : * becomes invalid among the given possible causes.
1883 : *
1884 : * This function sequentially checks all possible invalidation causes and
1885 : * returns the first one for which the slot is eligible for invalidation.
 : *
 : * possible_causes is a bitmask of RS_INVAL_* values to test; they are
 : * checked in a fixed priority order: WAL removal, xid horizon, wal_level,
 : * idle timeout. Returns RS_INVAL_NONE if the slot is eligible for none of
 : * them. *inactive_since is set only when RS_INVAL_IDLE_TIMEOUT is
 : * returned; it is left untouched otherwise.
 : *
 : * The caller holds the slot's spinlock (see InvalidatePossiblyObsoleteSlot),
 : * so the slot fields read here cannot change concurrently.
1886 : */
1887 : static ReplicationSlotInvalidationCause
1888 419 : DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
1889 : XLogRecPtr oldestLSN, Oid dboid,
1890 : TransactionId snapshotConflictHorizon,
1891 : TimestampTz *inactive_since, TimestampTz now)
1892 : {
1893 : Assert(possible_causes != RS_INVAL_NONE);
1894 :
1895 419 : if (possible_causes & RS_INVAL_WAL_REMOVED)
1896 : {
1897 393 : XLogRecPtr restart_lsn = s->data.restart_lsn;
1898 :
 : /* Slot needs WAL that starts before the oldest surviving segment. */
1899 393 : if (XLogRecPtrIsValid(restart_lsn) &&
1900 : restart_lsn < oldestLSN)
1901 7 : return RS_INVAL_WAL_REMOVED;
1902 : }
1903 :
1904 412 : if (possible_causes & RS_INVAL_HORIZON)
1905 : {
1906 : /* invalid DB oid signals a shared relation */
1907 22 : if (SlotIsLogical(s) &&
1908 17 : (dboid == InvalidOid || dboid == s->data.database))
1909 : {
1910 22 : TransactionId effective_xmin = s->effective_xmin;
1911 22 : TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
1912 :
 : /* Either xmin at or below the conflict horizon invalidates. */
1913 22 : if (TransactionIdIsValid(effective_xmin) &&
1914 0 : TransactionIdPrecedesOrEquals(effective_xmin,
1915 : snapshotConflictHorizon))
1916 0 : return RS_INVAL_HORIZON;
1917 44 : else if (TransactionIdIsValid(catalog_effective_xmin) &&
1918 22 : TransactionIdPrecedesOrEquals(catalog_effective_xmin,
1919 : snapshotConflictHorizon))
1920 12 : return RS_INVAL_HORIZON;
1921 : }
1922 : }
1923 :
1924 400 : if (possible_causes & RS_INVAL_WAL_LEVEL)
1925 : {
1926 4 : if (SlotIsLogical(s))
1927 4 : return RS_INVAL_WAL_LEVEL;
1928 : }
1929 :
1930 396 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1931 : {
1932 : Assert(now > 0);
1933 :
1934 386 : if (CanInvalidateIdleSlot(s))
1935 : {
1936 : /*
1937 : * Simulate the invalidation due to idle_timeout to test the
1938 : * timeout behavior promptly, without waiting for it to trigger
1939 : * naturally.
1940 : */
1941 : #ifdef USE_INJECTION_POINTS
1942 0 : if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1943 : {
1944 0 : *inactive_since = 0; /* since the beginning of time */
1945 0 : return RS_INVAL_IDLE_TIMEOUT;
1946 : }
1947 : #endif
1948 :
1949 : /*
1950 : * Check if the slot needs to be invalidated due to
1951 : * idle_replication_slot_timeout GUC.
1952 : */
1953 0 : if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
1954 : idle_replication_slot_timeout_secs))
1955 : {
1956 0 : *inactive_since = s->inactive_since;
1957 0 : return RS_INVAL_IDLE_TIMEOUT;
1958 : }
1959 : }
1960 : }
1961 :
1962 396 : return RS_INVAL_NONE;
1963 : }
1964 :
1965 : /*
1966 : * Helper for InvalidateObsoleteReplicationSlots
1967 : *
1968 : * Acquires the given slot and marks it invalid, if necessary and possible.
1969 : *
1970 : * Returns true if the slot was invalidated.
1971 : *
1972 : * Sets *released_lock_out to true if ReplicationSlotControlLock was released
1973 : * in the interim (and in that case we're not holding the lock at return,
1974 : * otherwise we are).
1975 : *
1976 : * This is inherently racy, because we release the LWLock
1977 : * for syscalls, so caller must restart if we return true.
1978 : */
1979 : static bool
1980 458 : InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
1981 : ReplicationSlot *s,
1982 : XLogRecPtr oldestLSN,
1983 : Oid dboid, TransactionId snapshotConflictHorizon,
1984 : bool *released_lock_out)
1985 : {
1986 458 : int last_signaled_pid = 0;
1987 458 : bool released_lock = false;
1988 458 : bool invalidated = false;
1989 458 : TimestampTz inactive_since = 0;
1990 :
 : /* Loop until the slot is invalidated, dropped, or found non-conflicting. */
1991 : for (;;)
1992 7 : {
1993 : XLogRecPtr restart_lsn;
1994 : NameData slotname;
1995 : ProcNumber active_proc;
1996 465 : int active_pid = 0;
1997 465 : ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
1998 465 : TimestampTz now = 0;
1999 465 : long slot_idle_secs = 0;
2000 :
2001 : Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
2002 :
 : /* Slot may have been dropped while we weren't holding the lock. */
2003 465 : if (!s->in_use)
2004 : {
2005 0 : if (released_lock)
2006 0 : LWLockRelease(ReplicationSlotControlLock);
2007 0 : break;
2008 : }
2009 :
2010 465 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
2011 : {
2012 : /*
2013 : * Assign the current time here to avoid system call overhead
2014 : * while holding the spinlock in subsequent code.
2015 : */
2016 407 : now = GetCurrentTimestamp();
2017 : }
2018 :
2019 : /*
2020 : * Check if the slot needs to be invalidated. If it needs to be
2021 : * invalidated, and is not currently acquired, acquire it and mark it
2022 : * as having been invalidated. We do this with the spinlock held to
2023 : * avoid race conditions -- for example the restart_lsn could move
2024 : * forward, or the slot could be dropped.
2025 : */
2026 465 : SpinLockAcquire(&s->mutex);
2027 :
2028 465 : restart_lsn = s->data.restart_lsn;
2029 :
2030 : /* we do nothing if the slot is already invalid */
2031 465 : if (s->data.invalidated == RS_INVAL_NONE)
2032 419 : invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
2033 : s, oldestLSN,
2034 : dboid,
2035 : snapshotConflictHorizon,
2036 : &inactive_since,
2037 : now);
2038 :
2039 : /* if there's no invalidation, we're done */
2040 465 : if (invalidation_cause == RS_INVAL_NONE)
2041 : {
2042 442 : SpinLockRelease(&s->mutex);
2043 442 : if (released_lock)
2044 0 : LWLockRelease(ReplicationSlotControlLock);
2045 442 : break;
2046 : }
2047 :
 : /* Copy values out while still holding the spinlock. */
2048 23 : slotname = s->data.name;
2049 23 : active_proc = s->active_proc;
2050 :
2051 : /*
2052 : * If the slot can be acquired, do so and mark it invalidated
2053 : * immediately. Otherwise we'll signal the owning process, below, and
2054 : * retry.
2055 : *
2056 : * Note: Unlike other slot attributes, slot's inactive_since can't be
2057 : * changed until the acquired slot is released or the owning process
2058 : * is terminated. So, the inactive slot can only be invalidated
2059 : * immediately without being terminated.
2060 : */
2061 23 : if (active_proc == INVALID_PROC_NUMBER)
2062 : {
2063 16 : MyReplicationSlot = s;
2064 16 : s->active_proc = MyProcNumber;
2065 16 : s->data.invalidated = invalidation_cause;
2066 :
2067 : /*
2068 : * XXX: We should consider not overwriting restart_lsn and instead
2069 : * just rely on .invalidated.
2070 : */
2071 16 : if (invalidation_cause == RS_INVAL_WAL_REMOVED)
2072 : {
2073 5 : s->data.restart_lsn = InvalidXLogRecPtr;
2074 5 : s->last_saved_restart_lsn = InvalidXLogRecPtr;
2075 : }
2076 :
2077 : /* Let caller know */
2078 16 : invalidated = true;
2079 : }
2080 : else
2081 : {
2082 7 : active_pid = GetPGProcByNumber(active_proc)->pid;
2083 : Assert(active_pid != 0);
2084 : }
2085 :
2086 23 : SpinLockRelease(&s->mutex);
2087 :
2088 : /*
2089 : * Calculate the idle time duration of the slot if slot is marked
2090 : * invalidated with RS_INVAL_IDLE_TIMEOUT.
2091 : */
2092 23 : if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
2093 : {
2094 : int slot_idle_usecs;
2095 :
2096 0 : TimestampDifference(inactive_since, now, &slot_idle_secs,
2097 : &slot_idle_usecs);
2098 : }
2099 :
2100 23 : if (active_proc != INVALID_PROC_NUMBER)
2101 : {
2102 : /*
2103 : * Prepare the sleep on the slot's condition variable before
2104 : * releasing the lock, to close a possible race condition if the
2105 : * slot is released before the sleep below.
2106 : */
2107 7 : ConditionVariablePrepareToSleep(&s->active_cv);
2108 :
2109 7 : LWLockRelease(ReplicationSlotControlLock);
2110 7 : released_lock = true;
2111 :
2112 : /*
2113 : * Signal to terminate the process that owns the slot, if we
2114 : * haven't already signalled it. (Avoidance of repeated
2115 : * signalling is the only reason for there to be a loop in this
2116 : * routine; otherwise we could rely on caller's restart loop.)
2117 : *
2118 : * There is the race condition that other process may own the slot
2119 : * after its current owner process is terminated and before this
2120 : * process owns it. To handle that, we signal only if the PID of
2121 : * the owning process has changed from the previous time. (This
2122 : * logic assumes that the same PID is not reused very quickly.)
2123 : */
2124 7 : if (last_signaled_pid != active_pid)
2125 : {
2126 7 : ReportSlotInvalidation(invalidation_cause, true, active_pid,
2127 : slotname, restart_lsn,
2128 : oldestLSN, snapshotConflictHorizon,
2129 : slot_idle_secs);
2130 :
2131 7 : if (MyBackendType == B_STARTUP)
2132 5 : (void) SignalRecoveryConflict(GetPGProcByNumber(active_proc),
2133 : active_pid,
2134 : RECOVERY_CONFLICT_LOGICALSLOT);
2135 : else
2136 2 : (void) kill(active_pid, SIGTERM);
2137 :
2138 7 : last_signaled_pid = active_pid;
2139 : }
2140 :
2141 : /* Wait until the slot is released. */
2142 7 : ConditionVariableSleep(&s->active_cv,
2143 : WAIT_EVENT_REPLICATION_SLOT_DROP);
2144 :
2145 : /*
2146 : * Re-acquire lock and start over; we expect to invalidate the
2147 : * slot next time (unless another process acquires the slot in the
2148 : * meantime).
2149 : *
2150 : * Note: It is possible for a slot to advance its restart_lsn or
2151 : * xmin values sufficiently between when we release the mutex and
2152 : * when we recheck, moving from a conflicting state to a non
2153 : * conflicting state. This is intentional and safe: if the slot
2154 : * has caught up while we're busy here, the resources we were
2155 : * concerned about (WAL segments or tuples) have not yet been
2156 : * removed, and there's no reason to invalidate the slot.
2157 : */
2158 7 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2159 7 : continue;
2160 : }
2161 : else
2162 : {
2163 : /*
2164 : * We hold the slot now and have already invalidated it; flush it
2165 : * to ensure that state persists.
2166 : *
2167 : * Don't want to hold ReplicationSlotControlLock across file
2168 : * system operations, so release it now but be sure to tell caller
2169 : * to restart from scratch.
2170 : */
2171 16 : LWLockRelease(ReplicationSlotControlLock);
2172 16 : released_lock = true;
2173 :
2174 : /* Make sure the invalidated state persists across server restart */
2175 16 : ReplicationSlotMarkDirty();
2176 16 : ReplicationSlotSave();
2177 16 : ReplicationSlotRelease();
2178 :
2179 16 : ReportSlotInvalidation(invalidation_cause, false, active_pid,
2180 : slotname, restart_lsn,
2181 : oldestLSN, snapshotConflictHorizon,
2182 : slot_idle_secs);
2183 :
2184 : /* done with this slot for now */
2185 16 : break;
2186 : }
2187 : }
2188 :
2189 : Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
2190 :
2191 458 : *released_lock_out = released_lock;
2192 458 : return invalidated;
2193 : }
2194 :
2195 : /*
2196 : * Invalidate slots that require resources about to be removed.
2197 : *
2198 : * Returns true when any slot has been invalidated.
2199 : *
2200 : * Whether a slot needs to be invalidated depends on the invalidation cause.
2201 : * A slot is invalidated if it:
2202 : * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
2203 : * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2204 : * db; dboid may be InvalidOid for shared relations
2205 : * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
2206 : * logical.
2207 : * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2208 : * "idle_replication_slot_timeout" duration.
2209 : *
2210 : * Note: This function attempts to invalidate the slot for multiple possible
2211 : * causes in a single pass, minimizing redundant iterations. The
2212 : * "possible_causes" parameter can be a MASK representing one or more of the
 : * defined causes.
2213 : *
2214 : * If it invalidates the last logical slot in the cluster, it requests to
2215 : * disable logical decoding.
2216 : *
2217 : * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2218 : */
2219 : bool
2220 1958 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
2221 : XLogSegNo oldestSegno, Oid dboid,
2222 : TransactionId snapshotConflictHorizon)
2223 : {
2224 : XLogRecPtr oldestLSN;
2225 1958 : bool invalidated = false;
2226 1958 : bool invalidated_logical = false;
2227 : bool found_valid_logicalslot;
2228 :
2229 : Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2230 : Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
2231 : Assert(possible_causes != RS_INVAL_NONE);
2232 :
 : /* Nothing to do if no slots can exist at all. */
2233 1958 : if (max_replication_slots == 0 && max_repack_replication_slots == 0)
2234 0 : return invalidated;
2235 :
2236 1958 : XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
2237 :
2238 1974 : restart:
2239 1974 : found_valid_logicalslot = false;
2240 1974 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2241 30989 : for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
2242 : {
2243 29031 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2244 29031 : bool released_lock = false;
2245 :
2246 29031 : if (!s->in_use)
2247 28573 : continue;
2248 :
2249 : /* Prevent invalidation of logical slots during binary upgrade */
2250 467 : if (SlotIsLogical(s) && IsBinaryUpgrade)
2251 : {
2252 9 : SpinLockAcquire(&s->mutex);
2253 9 : found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
2254 9 : SpinLockRelease(&s->mutex);
2255 :
2256 9 : continue;
2257 : }
2258 :
2259 458 : if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
2260 : dboid, snapshotConflictHorizon,
2261 : &released_lock))
2262 : {
2263 : Assert(released_lock);
2264 :
2265 : /* Remember we have invalidated a physical or logical slot */
2266 16 : invalidated = true;
2267 :
2268 : /*
2269 : * Additionally, remember we have invalidated a logical slot as we
2270 : * can request disabling logical decoding later.
2271 : */
2272 16 : if (SlotIsLogical(s))
2273 13 : invalidated_logical = true;
2274 : }
2275 : else
2276 : {
2277 : /*
2278 : * We need to check if the slot is invalidated here since
2279 : * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2280 : * is already invalidated.
2281 : */
2282 442 : SpinLockAcquire(&s->mutex);
2283 884 : found_valid_logicalslot |=
2284 442 : (SlotIsLogical(s) && (s->data.invalidated == RS_INVAL_NONE));
2285 442 : SpinLockRelease(&s->mutex);
2286 : }
2287 :
2288 : /* if the lock was released, start from scratch */
2289 458 : if (released_lock)
2290 16 : goto restart;
2291 : }
2292 1958 : LWLockRelease(ReplicationSlotControlLock);
2293 :
2294 : /*
2295 : * If any slots have been invalidated, recalculate the resource limits.
2296 : */
2297 1958 : if (invalidated)
2298 : {
2299 11 : ReplicationSlotsComputeRequiredXmin(false);
2300 11 : ReplicationSlotsComputeRequiredLSN();
2301 : }
2302 :
2303 : /*
2304 : * Request the checkpointer to disable logical decoding if no valid
2305 : * logical slots remain. If called by the checkpointer during a
2306 : * checkpoint, only the request is initiated; actual deactivation is
2307 : * deferred until after the checkpoint completes.
2308 : */
2309 1958 : if (invalidated_logical && !found_valid_logicalslot)
2310 8 : RequestDisableLogicalDecoding();
2311 :
2312 1958 : return invalidated;
2313 : }
2314 :
2315 : /*
2316 : * Flush all replication slots to disk.
2317 : *
2318 : * It is convenient to flush dirty replication slots at the time of checkpoint.
2319 : * Additionally, in case of a shutdown checkpoint, we also identify the slots
2320 : * for which the confirmed_flush LSN has been updated since the last time it
2321 : * was saved and flush them.
 : *
 : * is_shutdown: true when called for a shutdown checkpoint, which forces
 : * logical slots with an advanced confirmed_flush LSN to be flushed too.
2322 : */
2323 : void
2324 1933 : CheckPointReplicationSlots(bool is_shutdown)
2325 : {
2326 : int i;
2327 1933 : bool last_saved_restart_lsn_updated = false;
2328 :
2329 1933 : elog(DEBUG1, "performing replication slot checkpoint");
2330 :
2331 : /*
2332 : * Prevent any slot from being created/dropped while we're active. As we
2333 : * explicitly do *not* want to block iterating over replication_slots or
2334 : * acquiring a slot we cannot take the control lock - but that's OK,
2335 : * because holding ReplicationSlotAllocationLock is strictly stronger, and
2336 : * enough to guarantee that nobody can change the in_use bits on us.
2337 : *
2338 : * Additionally, acquiring the Allocation lock is necessary to serialize
2339 : * the slot flush process with concurrent slot WAL reservation. This
2340 : * ensures that the WAL position being reserved is either flushed to disk
2341 : * or is beyond or equal to the redo pointer of the current checkpoint
2342 : * (See ReplicationSlotReserveWal for details).
2343 : */
2344 1933 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2345 :
2346 30653 : for (i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
2347 : {
2348 28720 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2349 : char path[MAXPGPATH];
2350 :
2351 28720 : if (!s->in_use)
2352 28311 : continue;
2353 :
2354 : /* save the slot to disk, locking is handled in SaveSlotToPath() */
2355 409 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2356 :
2357 : /*
2358 : * Slot's data is not flushed each time the confirmed_flush LSN is
2359 : * updated as that could lead to frequent writes. However, we decide
2360 : * to force a flush of all logical slot's data at the time of shutdown
2361 : * if the confirmed_flush LSN is changed since we last flushed it to
2362 : * disk. This helps in avoiding an unnecessary retreat of the
2363 : * confirmed_flush LSN after restart.
2364 : */
2365 409 : if (is_shutdown && SlotIsLogical(s))
2366 : {
2367 92 : SpinLockAcquire(&s->mutex);
2368 :
2369 92 : if (s->data.invalidated == RS_INVAL_NONE &&
2370 91 : s->data.confirmed_flush > s->last_saved_confirmed_flush)
2371 : {
2372 41 : s->just_dirtied = true;
2373 41 : s->dirty = true;
2374 : }
2375 92 : SpinLockRelease(&s->mutex);
2376 : }
2377 :
2378 : /*
2379 : * Track if we're going to update slot's last_saved_restart_lsn. We
2380 : * need this to know if we need to recompute the required LSN.
2381 : */
2382 409 : if (s->last_saved_restart_lsn != s->data.restart_lsn)
2383 213 : last_saved_restart_lsn_updated = true;
2384 :
2385 409 : SaveSlotToPath(s, path, LOG);
2386 : }
2387 1933 : LWLockRelease(ReplicationSlotAllocationLock);
2388 :
2389 : /*
2390 : * Recompute the required LSN if SaveSlotToPath() updated
2391 : * last_saved_restart_lsn for any slot.
2392 : */
2393 1933 : if (last_saved_restart_lsn_updated)
2394 213 : ReplicationSlotsComputeRequiredLSN();
2395 1933 : }
2396 :
2397 : /*
2398 : * Load all replication slots from disk into memory at server startup. This
2399 : * needs to be run before we start crash recovery.
 : *
 : * Also cleans up leftover "*.tmp" directories from crashed slot
 : * creation/deletion, then recomputes the global xmin and required LSN
 : * from the restored slots.
2400 : */
2401 : void
2402 1077 : StartupReplicationSlots(void)
2403 : {
2404 : DIR *replication_dir;
2405 : struct dirent *replication_de;
2406 :
2407 1077 : elog(DEBUG1, "starting up replication slots");
2408 :
2409 : /* restore all slots by iterating over all on-disk entries */
2410 1077 : replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2411 3348 : while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2412 : {
2413 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2414 : PGFileType de_type;
2415 :
2416 2273 : if (strcmp(replication_de->d_name, ".") == 0 ||
2417 1198 : strcmp(replication_de->d_name, "..") == 0)
2418 2150 : continue;
2419 :
2420 123 : snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2421 123 : de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2422 :
2423 : /* we're only creating directories here, skip if it's not ours */
2424 123 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
2425 0 : continue;
2426 :
2427 : /* we crashed while a slot was being setup or deleted, clean up */
2428 123 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
2429 : {
2430 0 : if (!rmtree(path, true))
2431 : {
2432 0 : ereport(WARNING,
2433 : (errmsg("could not remove directory \"%s\"",
2434 : path)));
2435 0 : continue;
2436 : }
2437 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2438 0 : continue;
2439 : }
2440 :
2441 : /* looks like a slot in a normal state, restore */
2442 123 : RestoreSlotFromDisk(replication_de->d_name);
2443 : }
2444 1075 : FreeDir(replication_dir);
2445 :
2446 : /* slots are disabled by configuration, so none can exist; we're done. */
2447 1075 : if (max_replication_slots + max_repack_replication_slots <= 0)
2448 0 : return;
2449 :
2450 : /* Now that we have recovered all the data, compute replication xmin */
2451 1075 : ReplicationSlotsComputeRequiredXmin(false);
2452 1075 : ReplicationSlotsComputeRequiredLSN();
2453 : }
2454 :
2455 : /* ----
2456 : * Manipulation of on-disk state of replication slots
2457 : *
2458 : * NB: none of the routines below should take any notice whether a slot is the
2459 : * current one or not, that's all handled a layer above.
2460 : * ----
2461 : */
 :
 : /*
 : * Create the on-disk directory and initial state file for a new slot.
 : *
 : * The directory is built under a ".tmp" name, populated and fsynced, and
 : * only then renamed into place, so a crash at any point leaves either no
 : * slot or a complete one (the .tmp leftovers are cleaned up at startup).
 : */
2462 : static void
2463 708 : CreateSlotOnDisk(ReplicationSlot *slot)
2464 : {
2465 : char tmppath[MAXPGPATH];
2466 : char path[MAXPGPATH];
2467 : struct stat st;
2468 :
2469 : /*
2470 : * No need to take out the io_in_progress_lock, nobody else can see this
2471 : * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2472 : * takes out the lock, if we'd take the lock here, we'd deadlock.
2473 : */
2474 :
2475 708 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2476 708 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2477 :
2478 : /*
2479 : * It's just barely possible that some previous effort to create or drop a
2480 : * slot with this name left a temp directory lying around. If that seems
2481 : * to be the case, try to remove it. If the rmtree() fails, we'll error
2482 : * out at the MakePGDirectory() below, so we don't bother checking
2483 : * success.
2484 : */
2485 708 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2486 0 : rmtree(tmppath, true);
2487 :
2488 : /* Create and fsync the temporary slot directory. */
2489 708 : if (MakePGDirectory(tmppath) < 0)
2490 0 : ereport(ERROR,
2491 : (errcode_for_file_access(),
2492 : errmsg("could not create directory \"%s\": %m",
2493 : tmppath)));
2494 708 : fsync_fname(tmppath, true);
2495 :
2496 : /* Write the actual state file. */
2497 708 : slot->dirty = true; /* signal that we really need to write */
2498 708 : SaveSlotToPath(slot, tmppath, ERROR);
2499 :
2500 : /* Rename the directory into place. */
2501 708 : if (rename(tmppath, path) != 0)
2502 0 : ereport(ERROR,
2503 : (errcode_for_file_access(),
2504 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2505 : tmppath, path)));
2506 :
2507 : /*
2508 : * If we'd now fail - really unlikely - we wouldn't know whether this slot
2509 : * would persist after an OS crash or not - so, force a restart. The
2510 : * restart would try to fsync this again till it works.
2511 : */
2512 708 : START_CRIT_SECTION();
2513 :
2514 708 : fsync_fname(path, true);
2515 708 : fsync_fname(PG_REPLSLOT_DIR, true);
2516 :
2517 708 : END_CRIT_SECTION();
2518 708 : }
2519 :
2520 : /*
2521 : * Shared functionality between saving and creating a replication slot.
 : *
 : * Writes the slot's persistent data as "state" inside directory "dir",
 : * using a write-to-temp-then-rename-and-fsync sequence for crash safety.
 : * Does nothing if the slot is not dirty. I/O failures are reported at
 : * "elevel"; when elevel is below ERROR the function releases the slot's
 : * io_in_progress_lock and returns after reporting, leaving the old state
 : * file intact.
2522 : */
2523 : static void
2524 2625 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2525 : {
2526 : char tmppath[MAXPGPATH];
2527 : char path[MAXPGPATH];
2528 : int fd;
2529 : ReplicationSlotOnDisk cp;
2530 : bool was_dirty;
2531 :
2532 : /* first check whether there's something to write out */
2533 2625 : SpinLockAcquire(&slot->mutex);
2534 2625 : was_dirty = slot->dirty;
2535 2625 : slot->just_dirtied = false;
2536 2625 : SpinLockRelease(&slot->mutex);
2537 :
2538 : /* and don't do anything if there's nothing to write */
2539 2625 : if (!was_dirty)
2540 144 : return;
2541 :
 : /* Serialize concurrent savers of this slot. */
2542 2481 : LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
2543 :
2544 : /* silence valgrind :( */
2545 2481 : memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2546 :
2547 2481 : sprintf(tmppath, "%s/state.tmp", dir);
2548 2481 : sprintf(path, "%s/state", dir);
2549 :
2550 2481 : fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2551 2481 : if (fd < 0)
2552 : {
2553 : /*
2554 : * If not an ERROR, then release the lock before returning. In case
2555 : * of an ERROR, the error recovery path automatically releases the
2556 : * lock, but no harm in explicitly releasing even in that case. Note
2557 : * that LWLockRelease() could affect errno.
2558 : */
2559 0 : int save_errno = errno;
2560 :
2561 0 : LWLockRelease(&slot->io_in_progress_lock);
2562 0 : errno = save_errno;
2563 0 : ereport(elevel,
2564 : (errcode_for_file_access(),
2565 : errmsg("could not create file \"%s\": %m",
2566 : tmppath)));
2567 0 : return;
2568 : }
2569 :
 : /* Fill in the fixed on-disk header fields. */
2570 2481 : cp.magic = SLOT_MAGIC;
2571 2481 : INIT_CRC32C(cp.checksum);
2572 2481 : cp.version = SLOT_VERSION;
2573 2481 : cp.length = ReplicationSlotOnDiskV2Size;
2574 :
 : /* Snapshot the persistent slot data under the spinlock. */
2575 2481 : SpinLockAcquire(&slot->mutex);
2576 :
2577 2481 : memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2578 :
2579 2481 : SpinLockRelease(&slot->mutex);
2580 :
2581 2481 : COMP_CRC32C(cp.checksum,
2582 : (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
2583 : ReplicationSlotOnDiskChecksummedSize);
2584 2481 : FIN_CRC32C(cp.checksum);
2585 :
2586 2481 : errno = 0;
2587 2481 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2588 2481 : if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2589 : {
2590 0 : int save_errno = errno;
2591 :
2592 0 : pgstat_report_wait_end();
2593 0 : CloseTransientFile(fd);
2594 0 : unlink(tmppath);
2595 0 : LWLockRelease(&slot->io_in_progress_lock);
2596 :
2597 : /* if write didn't set errno, assume problem is no disk space */
2598 0 : errno = save_errno ? save_errno : ENOSPC;
2599 0 : ereport(elevel,
2600 : (errcode_for_file_access(),
2601 : errmsg("could not write to file \"%s\": %m",
2602 : tmppath)));
2603 0 : return;
2604 : }
2605 2481 : pgstat_report_wait_end();
2606 :
2607 : /* fsync the temporary file */
2608 2481 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2609 2481 : if (pg_fsync(fd) != 0)
2610 : {
2611 0 : int save_errno = errno;
2612 :
2613 0 : pgstat_report_wait_end();
2614 0 : CloseTransientFile(fd);
2615 0 : unlink(tmppath);
2616 0 : LWLockRelease(&slot->io_in_progress_lock);
2617 :
2618 0 : errno = save_errno;
2619 0 : ereport(elevel,
2620 : (errcode_for_file_access(),
2621 : errmsg("could not fsync file \"%s\": %m",
2622 : tmppath)));
2623 0 : return;
2624 : }
2625 2481 : pgstat_report_wait_end();
2626 :
2627 2481 : if (CloseTransientFile(fd) != 0)
2628 : {
2629 0 : int save_errno = errno;
2630 :
2631 0 : unlink(tmppath);
2632 0 : LWLockRelease(&slot->io_in_progress_lock);
2633 :
2634 0 : errno = save_errno;
2635 0 : ereport(elevel,
2636 : (errcode_for_file_access(),
2637 : errmsg("could not close file \"%s\": %m",
2638 : tmppath)));
2639 0 : return;
2640 : }
2641 :
2642 : /* rename to permanent file, fsync file and directory */
2643 2481 : if (rename(tmppath, path) != 0)
2644 : {
2645 0 : int save_errno = errno;
2646 :
2647 0 : unlink(tmppath);
2648 0 : LWLockRelease(&slot->io_in_progress_lock);
2649 :
2650 0 : errno = save_errno;
2651 0 : ereport(elevel,
2652 : (errcode_for_file_access(),
2653 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2654 : tmppath, path)));
2655 0 : return;
2656 : }
2657 :
2658 : /*
2659 : * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2660 : */
2661 2481 : START_CRIT_SECTION();
2662 :
2663 2481 : fsync_fname(path, false);
2664 2481 : fsync_fname(dir, true);
2665 2481 : fsync_fname(PG_REPLSLOT_DIR, true);
2666 :
2667 2481 : END_CRIT_SECTION();
2668 :
2669 : /*
2670 : * Successfully wrote, unset dirty bit, unless somebody dirtied again
2671 : * already and remember the confirmed_flush LSN value.
2672 : */
2673 2481 : SpinLockAcquire(&slot->mutex);
2674 2481 : if (!slot->just_dirtied)
2675 2461 : slot->dirty = false;
2676 2481 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2677 2481 : slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2678 2481 : SpinLockRelease(&slot->mutex);
2679 :
2680 2481 : LWLockRelease(&slot->io_in_progress_lock);
2681 : }
2682 :
2683 : /*
2684 : * Load a single slot from disk into memory.
2685 : */
2686 : static void
2687 123 : RestoreSlotFromDisk(const char *name)
2688 : {
2689 : ReplicationSlotOnDisk cp;
2690 : int i;
2691 : char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2692 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2693 : int fd;
2694 123 : bool restored = false;
2695 : int readBytes;
2696 : pg_crc32c checksum;
2697 123 : TimestampTz now = 0;
2698 :
2699 : /* no need to lock here, no concurrent access allowed yet */
2700 :
2701 : /* delete temp file if it exists */
2702 123 : sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2703 123 : sprintf(path, "%s/state.tmp", slotdir);
2704 123 : if (unlink(path) < 0 && errno != ENOENT)
2705 0 : ereport(PANIC,
2706 : (errcode_for_file_access(),
2707 : errmsg("could not remove file \"%s\": %m", path)));
2708 :
2709 123 : sprintf(path, "%s/state", slotdir);
2710 :
2711 123 : elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2712 :
2713 : /* on some operating systems fsyncing a file requires O_RDWR */
2714 123 : fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2715 :
2716 : /*
2717 : * We do not need to handle this as we are rename()ing the directory into
2718 : * place only after we fsync()ed the state file.
2719 : */
2720 123 : if (fd < 0)
2721 0 : ereport(PANIC,
2722 : (errcode_for_file_access(),
2723 : errmsg("could not open file \"%s\": %m", path)));
2724 :
2725 : /*
2726 : * Sync state file before we're reading from it. We might have crashed
2727 : * while it wasn't synced yet and we shouldn't continue on that basis.
2728 : */
2729 123 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2730 123 : if (pg_fsync(fd) != 0)
2731 0 : ereport(PANIC,
2732 : (errcode_for_file_access(),
2733 : errmsg("could not fsync file \"%s\": %m",
2734 : path)));
2735 123 : pgstat_report_wait_end();
2736 :
2737 : /* Also sync the parent directory */
2738 123 : START_CRIT_SECTION();
2739 123 : fsync_fname(slotdir, true);
2740 123 : END_CRIT_SECTION();
2741 :
2742 : /* read part of statefile that's guaranteed to be version independent */
2743 123 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2744 123 : readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2745 123 : pgstat_report_wait_end();
2746 123 : if (readBytes != ReplicationSlotOnDiskConstantSize)
2747 : {
2748 0 : if (readBytes < 0)
2749 0 : ereport(PANIC,
2750 : (errcode_for_file_access(),
2751 : errmsg("could not read file \"%s\": %m", path)));
2752 : else
2753 0 : ereport(PANIC,
2754 : (errcode(ERRCODE_DATA_CORRUPTED),
2755 : errmsg("could not read file \"%s\": read %d of %zu",
2756 : path, readBytes,
2757 : (Size) ReplicationSlotOnDiskConstantSize)));
2758 : }
2759 :
2760 : /* verify magic */
2761 123 : if (cp.magic != SLOT_MAGIC)
2762 0 : ereport(PANIC,
2763 : (errcode(ERRCODE_DATA_CORRUPTED),
2764 : errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2765 : path, cp.magic, SLOT_MAGIC)));
2766 :
2767 : /* verify version */
2768 123 : if (cp.version != SLOT_VERSION)
2769 0 : ereport(PANIC,
2770 : (errcode(ERRCODE_DATA_CORRUPTED),
2771 : errmsg("replication slot file \"%s\" has unsupported version %u",
2772 : path, cp.version)));
2773 :
2774 : /* boundary check on length */
2775 123 : if (cp.length != ReplicationSlotOnDiskV2Size)
2776 0 : ereport(PANIC,
2777 : (errcode(ERRCODE_DATA_CORRUPTED),
2778 : errmsg("replication slot file \"%s\" has corrupted length %u",
2779 : path, cp.length)));
2780 :
2781 : /* Now that we know the size, read the entire file */
2782 123 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2783 246 : readBytes = read(fd,
2784 : (char *) &cp + ReplicationSlotOnDiskConstantSize,
2785 123 : cp.length);
2786 123 : pgstat_report_wait_end();
2787 123 : if (readBytes != cp.length)
2788 : {
2789 0 : if (readBytes < 0)
2790 0 : ereport(PANIC,
2791 : (errcode_for_file_access(),
2792 : errmsg("could not read file \"%s\": %m", path)));
2793 : else
2794 0 : ereport(PANIC,
2795 : (errcode(ERRCODE_DATA_CORRUPTED),
2796 : errmsg("could not read file \"%s\": read %d of %zu",
2797 : path, readBytes, (Size) cp.length)));
2798 : }
2799 :
2800 123 : if (CloseTransientFile(fd) != 0)
2801 0 : ereport(PANIC,
2802 : (errcode_for_file_access(),
2803 : errmsg("could not close file \"%s\": %m", path)));
2804 :
2805 : /* now verify the CRC */
2806 123 : INIT_CRC32C(checksum);
2807 123 : COMP_CRC32C(checksum,
2808 : (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
2809 : ReplicationSlotOnDiskChecksummedSize);
2810 123 : FIN_CRC32C(checksum);
2811 :
2812 123 : if (!EQ_CRC32C(checksum, cp.checksum))
2813 0 : ereport(PANIC,
2814 : (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2815 : path, checksum, cp.checksum)));
2816 :
2817 : /*
2818 : * If we crashed with an ephemeral slot active, don't restore but delete
2819 : * it.
2820 : */
2821 123 : if (cp.slotdata.persistency != RS_PERSISTENT)
2822 : {
2823 0 : if (!rmtree(slotdir, true))
2824 : {
2825 0 : ereport(WARNING,
2826 : (errmsg("could not remove directory \"%s\"",
2827 : slotdir)));
2828 : }
2829 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2830 0 : return;
2831 : }
2832 :
2833 : /*
2834 : * Verify that requirements for the specific slot type are met. That's
2835 : * important because if these aren't met we're not guaranteed to retain
2836 : * all the necessary resources for the slot.
2837 : *
2838 : * NB: We have to do so *after* the above checks for ephemeral slots,
2839 : * because otherwise a slot that shouldn't exist anymore could prevent
2840 : * restarts.
2841 : *
2842 : * NB: Changing the requirements here also requires adapting
2843 : * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2844 : */
2845 123 : if (cp.slotdata.database != InvalidOid)
2846 : {
2847 85 : if (wal_level < WAL_LEVEL_REPLICA)
2848 1 : ereport(FATAL,
2849 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2850 : errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2851 : NameStr(cp.slotdata.name)),
2852 : errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2853 :
2854 : /*
2855 : * In standby mode, the hot standby must be enabled. This check is
2856 : * necessary to ensure logical slots are invalidated when they become
2857 : * incompatible due to insufficient wal_level. Otherwise, if the
2858 : * primary reduces effective_wal_level < logical while hot standby is
2859 : * disabled, primary disable logical decoding while hot standby is
2860 : * disabled, logical slots would remain valid even after promotion.
2861 : */
2862 84 : if (StandbyMode && !EnableHotStandby)
2863 1 : ereport(FATAL,
2864 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2865 : errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2866 : NameStr(cp.slotdata.name)),
2867 : errhint("Change \"hot_standby\" to be \"on\".")));
2868 : }
2869 38 : else if (wal_level < WAL_LEVEL_REPLICA)
2870 0 : ereport(FATAL,
2871 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2872 : errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2873 : NameStr(cp.slotdata.name)),
2874 : errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2875 :
2876 : /*
2877 : * Nothing can be active yet, don't lock anything. Note we iterate up to
2878 : * max_replication_slots instead of adding max_repack_replication_slots as
2879 : * in all other places, because we must enforce the GUC value in case
2880 : * there were more slots before the shutdown than what it is set up to
2881 : * now.
2882 : */
2883 178 : for (i = 0; i < max_replication_slots; i++)
2884 : {
2885 : ReplicationSlot *slot;
2886 :
2887 178 : slot = &ReplicationSlotCtl->replication_slots[i];
2888 :
2889 178 : if (slot->in_use)
2890 57 : continue;
2891 :
2892 : /* restore the entire set of persistent data */
2893 121 : memcpy(&slot->data, &cp.slotdata,
2894 : sizeof(ReplicationSlotPersistentData));
2895 :
2896 : /* initialize in memory state */
2897 121 : slot->effective_xmin = cp.slotdata.xmin;
2898 121 : slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2899 121 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2900 121 : slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2901 :
2902 121 : slot->candidate_catalog_xmin = InvalidTransactionId;
2903 121 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
2904 121 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
2905 121 : slot->candidate_restart_valid = InvalidXLogRecPtr;
2906 :
2907 121 : slot->in_use = true;
2908 121 : slot->active_proc = INVALID_PROC_NUMBER;
2909 :
2910 : /*
2911 : * Set the time since the slot has become inactive after loading the
2912 : * slot from the disk into memory. Whoever acquires the slot i.e.
2913 : * makes the slot active will reset it. Use the same inactive_since
2914 : * time for all the slots.
2915 : */
2916 121 : if (now == 0)
2917 121 : now = GetCurrentTimestamp();
2918 :
2919 121 : ReplicationSlotSetInactiveSince(slot, now, false);
2920 :
2921 121 : restored = true;
2922 121 : break;
2923 : }
2924 :
2925 121 : if (!restored)
2926 0 : ereport(FATAL,
2927 : (errmsg("too many replication slots active before shutdown"),
2928 : errhint("Increase \"max_replication_slots\" and try again.")));
2929 : }
2930 :
2931 : /*
2932 : * Maps an invalidation reason for a replication slot to
2933 : * ReplicationSlotInvalidationCause.
2934 : */
2935 : ReplicationSlotInvalidationCause
2936 0 : GetSlotInvalidationCause(const char *cause_name)
2937 : {
2938 : Assert(cause_name);
2939 :
2940 : /* Search lookup table for the cause having this name */
2941 0 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2942 : {
2943 0 : if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2944 0 : return SlotInvalidationCauses[i].cause;
2945 : }
2946 :
2947 : Assert(false);
2948 0 : return RS_INVAL_NONE; /* to keep compiler quiet */
2949 : }
2950 :
2951 : /*
2952 : * Maps a ReplicationSlotInvalidationCause to the invalidation
2953 : * reason for a replication slot.
2954 : */
2955 : const char *
2956 45 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
2957 : {
2958 : /* Search lookup table for the name of this cause */
2959 146 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2960 : {
2961 146 : if (SlotInvalidationCauses[i].cause == cause)
2962 45 : return SlotInvalidationCauses[i].cause_name;
2963 : }
2964 :
2965 : Assert(false);
2966 0 : return "none"; /* to keep compiler quiet */
2967 : }
2968 :
2969 : /*
2970 : * A helper function to validate slots specified in GUC synchronized_standby_slots.
2971 : *
2972 : * The rawname will be parsed, and the result will be saved into *elemlist.
2973 : */
2974 : static bool
2975 28 : validate_sync_standby_slots(char *rawname, List **elemlist)
2976 : {
2977 : /* Verify syntax and parse string into a list of identifiers */
2978 28 : if (!SplitIdentifierString(rawname, ',', elemlist))
2979 : {
2980 0 : GUC_check_errdetail("List syntax is invalid.");
2981 0 : return false;
2982 : }
2983 :
2984 : /* Iterate the list to validate each slot name */
2985 80 : foreach_ptr(char, name, *elemlist)
2986 : {
2987 : int err_code;
2988 28 : char *err_msg = NULL;
2989 28 : char *err_hint = NULL;
2990 :
2991 28 : if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
2992 : &err_msg, &err_hint))
2993 : {
2994 2 : GUC_check_errcode(err_code);
2995 2 : GUC_check_errdetail("%s", err_msg);
2996 2 : if (err_hint != NULL)
2997 2 : GUC_check_errhint("%s", err_hint);
2998 2 : return false;
2999 : }
3000 : }
3001 :
3002 26 : return true;
3003 : }
3004 :
3005 : /*
3006 : * GUC check_hook for synchronized_standby_slots
3007 : */
3008 : bool
3009 1321 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
3010 : {
3011 : char *rawname;
3012 : char *ptr;
3013 : List *elemlist;
3014 : int size;
3015 : bool ok;
3016 : SyncStandbySlotsConfigData *config;
3017 :
3018 1321 : if ((*newval)[0] == '\0')
3019 1293 : return true;
3020 :
3021 : /* Need a modifiable copy of the GUC string */
3022 28 : rawname = pstrdup(*newval);
3023 :
3024 : /* Now verify if the specified slots exist and have correct type */
3025 28 : ok = validate_sync_standby_slots(rawname, &elemlist);
3026 :
3027 28 : if (!ok || elemlist == NIL)
3028 : {
3029 2 : pfree(rawname);
3030 2 : list_free(elemlist);
3031 2 : return ok;
3032 : }
3033 :
3034 : /* Compute the size required for the SyncStandbySlotsConfigData struct */
3035 26 : size = offsetof(SyncStandbySlotsConfigData, slot_names);
3036 78 : foreach_ptr(char, slot_name, elemlist)
3037 26 : size += strlen(slot_name) + 1;
3038 :
3039 : /* GUC extra value must be guc_malloc'd, not palloc'd */
3040 26 : config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
3041 26 : if (!config)
3042 0 : return false;
3043 :
3044 : /* Transform the data into SyncStandbySlotsConfigData */
3045 26 : config->nslotnames = list_length(elemlist);
3046 :
3047 26 : ptr = config->slot_names;
3048 78 : foreach_ptr(char, slot_name, elemlist)
3049 : {
3050 26 : strcpy(ptr, slot_name);
3051 26 : ptr += strlen(slot_name) + 1;
3052 : }
3053 :
3054 26 : *extra = config;
3055 :
3056 26 : pfree(rawname);
3057 26 : list_free(elemlist);
3058 26 : return true;
3059 : }
3060 :
3061 : /*
3062 : * GUC assign_hook for synchronized_standby_slots
3063 : */
3064 : void
3065 1318 : assign_synchronized_standby_slots(const char *newval, void *extra)
3066 : {
3067 : /*
3068 : * The standby slots may have changed, so we must recompute the oldest
3069 : * LSN.
3070 : */
3071 1318 : ss_oldest_flush_lsn = InvalidXLogRecPtr;
3072 :
3073 1318 : synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
3074 1318 : }
3075 :
3076 : /*
3077 : * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
3078 : */
3079 : bool
3080 39460 : SlotExistsInSyncStandbySlots(const char *slot_name)
3081 : {
3082 : const char *standby_slot_name;
3083 :
3084 : /* Return false if there is no value in synchronized_standby_slots */
3085 39460 : if (synchronized_standby_slots_config == NULL)
3086 39443 : return false;
3087 :
3088 : /*
3089 : * XXX: We are not expecting this list to be long so a linear search
3090 : * shouldn't hurt but if that turns out not to be true then we can cache
3091 : * this information for each WalSender as well.
3092 : */
3093 17 : standby_slot_name = synchronized_standby_slots_config->slot_names;
3094 25 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3095 : {
3096 17 : if (strcmp(standby_slot_name, slot_name) == 0)
3097 9 : return true;
3098 :
3099 8 : standby_slot_name += strlen(standby_slot_name) + 1;
3100 : }
3101 :
3102 8 : return false;
3103 : }
3104 :
3105 : /*
3106 : * Return true if the slots specified in synchronized_standby_slots have caught up to
3107 : * the given WAL location, false otherwise.
3108 : *
3109 : * The elevel parameter specifies the error level used for logging messages
3110 : * related to slots that do not exist, are invalidated, or are inactive.
3111 : */
3112 : bool
3113 3888 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
3114 : {
3115 : const char *name;
3116 3888 : int caught_up_slot_num = 0;
3117 3888 : XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
3118 :
3119 : /*
3120 : * Don't need to wait for the standbys to catch up if there is no value in
3121 : * synchronized_standby_slots.
3122 : */
3123 3888 : if (synchronized_standby_slots_config == NULL)
3124 3858 : return true;
3125 :
3126 : /*
3127 : * Don't need to wait for the standbys to catch up if we are on a standby
3128 : * server, since we do not support syncing slots to cascading standbys.
3129 : */
3130 30 : if (RecoveryInProgress())
3131 0 : return true;
3132 :
3133 : /*
3134 : * Don't need to wait for the standbys to catch up if they are already
3135 : * beyond the specified WAL location.
3136 : */
3137 30 : if (XLogRecPtrIsValid(ss_oldest_flush_lsn) &&
3138 17 : ss_oldest_flush_lsn >= wait_for_lsn)
3139 8 : return true;
3140 :
3141 : /*
3142 : * To prevent concurrent slot dropping and creation while filtering the
3143 : * slots, take the ReplicationSlotControlLock outside of the loop.
3144 : */
3145 22 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
3146 :
3147 22 : name = synchronized_standby_slots_config->slot_names;
3148 30 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3149 : {
3150 : XLogRecPtr restart_lsn;
3151 : bool invalidated;
3152 : bool inactive;
3153 : ReplicationSlot *slot;
3154 :
3155 22 : slot = SearchNamedReplicationSlot(name, false);
3156 :
3157 : /*
3158 : * If a slot name provided in synchronized_standby_slots does not
3159 : * exist, report a message and exit the loop.
3160 : */
3161 22 : if (!slot)
3162 : {
3163 0 : ereport(elevel,
3164 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3165 : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
3166 : name, "synchronized_standby_slots"),
3167 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3168 : name),
3169 : errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
3170 : name, "synchronized_standby_slots"));
3171 0 : break;
3172 : }
3173 :
3174 : /* Same as above: if a slot is not physical, exit the loop. */
3175 22 : if (SlotIsLogical(slot))
3176 : {
3177 0 : ereport(elevel,
3178 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3179 : errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
3180 : name, "synchronized_standby_slots"),
3181 : errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
3182 : name),
3183 : errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
3184 : name, "synchronized_standby_slots"));
3185 0 : break;
3186 : }
3187 :
3188 22 : SpinLockAcquire(&slot->mutex);
3189 22 : restart_lsn = slot->data.restart_lsn;
3190 22 : invalidated = slot->data.invalidated != RS_INVAL_NONE;
3191 22 : inactive = slot->active_proc == INVALID_PROC_NUMBER;
3192 22 : SpinLockRelease(&slot->mutex);
3193 :
3194 22 : if (invalidated)
3195 : {
3196 : /* Specified physical slot has been invalidated */
3197 0 : ereport(elevel,
3198 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3199 : errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
3200 : name, "synchronized_standby_slots"),
3201 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3202 : name),
3203 : errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
3204 : name, "synchronized_standby_slots"));
3205 0 : break;
3206 : }
3207 :
3208 22 : if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
3209 : {
3210 : /* Log a message if no active_pid for this physical slot */
3211 14 : if (inactive)
3212 7 : ereport(elevel,
3213 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3214 : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3215 : name, "synchronized_standby_slots"),
3216 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3217 : name),
3218 : errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3219 : name, "synchronized_standby_slots"));
3220 :
3221 : /* Continue if the current slot hasn't caught up. */
3222 14 : break;
3223 : }
3224 :
3225 : Assert(restart_lsn >= wait_for_lsn);
3226 :
3227 8 : if (!XLogRecPtrIsValid(min_restart_lsn) ||
3228 : min_restart_lsn > restart_lsn)
3229 8 : min_restart_lsn = restart_lsn;
3230 :
3231 8 : caught_up_slot_num++;
3232 :
3233 8 : name += strlen(name) + 1;
3234 : }
3235 :
3236 22 : LWLockRelease(ReplicationSlotControlLock);
3237 :
3238 : /*
3239 : * Return false if not all the standbys have caught up to the specified
3240 : * WAL location.
3241 : */
3242 22 : if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
3243 14 : return false;
3244 :
3245 : /* The ss_oldest_flush_lsn must not retreat. */
3246 : Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
3247 : min_restart_lsn >= ss_oldest_flush_lsn);
3248 :
3249 8 : ss_oldest_flush_lsn = min_restart_lsn;
3250 :
3251 8 : return true;
3252 : }
3253 :
3254 : /*
3255 : * Wait for physical standbys to confirm receiving the given lsn.
3256 : *
3257 : * Used by logical decoding SQL functions. It waits for physical standbys
3258 : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
3259 : */
3260 : void
3261 236 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
3262 : {
3263 : /*
3264 : * Don't need to wait for the standby to catch up if the current acquired
3265 : * slot is not a logical failover slot, or there is no value in
3266 : * synchronized_standby_slots.
3267 : */
3268 236 : if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
3269 235 : return;
3270 :
3271 1 : ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
3272 :
3273 : for (;;)
3274 : {
3275 2 : CHECK_FOR_INTERRUPTS();
3276 :
3277 2 : if (ConfigReloadPending)
3278 : {
3279 1 : ConfigReloadPending = false;
3280 1 : ProcessConfigFile(PGC_SIGHUP);
3281 : }
3282 :
3283 : /* Exit if done waiting for every slot. */
3284 2 : if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
3285 1 : break;
3286 :
3287 : /*
3288 : * Wait for the slots in the synchronized_standby_slots to catch up,
3289 : * but use a timeout (1s) so we can also check if the
3290 : * synchronized_standby_slots has been changed.
3291 : */
3292 1 : ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
3293 : WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
3294 : }
3295 :
3296 1 : ConditionVariableCancelSleep();
3297 : }
|