Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * slot.c
4 : * Replication slot management.
5 : *
6 : *
7 : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/replication/slot.c
12 : *
13 : * NOTES
14 : *
15 : * Replication slots are used to keep state about replication streams
16 : * originating from this cluster. Their primary purpose is to prevent the
17 : * premature removal of WAL or of old tuple versions in a manner that would
18 : * interfere with replication; they are also useful for monitoring purposes.
19 : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : * on standbys (to support cascading setups). The requirement that slots be
21 : * usable on standbys precludes storing them in the system catalogs.
22 : *
23 : * Each replication slot gets its own directory inside the directory
24 : * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 : * contain the slot's own data. Additional data can be stored alongside that
26 : * file if required. While the server is running, the state data is also
27 : * cached in memory for efficiency.
28 : *
29 : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : * of a slot. The remaining data in each slot is protected by its mutex.
33 : *
34 : *-------------------------------------------------------------------------
35 : */
36 :
37 : #include "postgres.h"
38 :
39 : #include <unistd.h>
40 : #include <sys/stat.h>
41 :
42 : #include "access/transam.h"
43 : #include "access/xlog_internal.h"
44 : #include "access/xlogrecovery.h"
45 : #include "common/file_utils.h"
46 : #include "common/string.h"
47 : #include "miscadmin.h"
48 : #include "pgstat.h"
49 : #include "postmaster/interrupt.h"
50 : #include "replication/logicallauncher.h"
51 : #include "replication/slotsync.h"
52 : #include "replication/slot.h"
53 : #include "replication/walsender_private.h"
54 : #include "storage/fd.h"
55 : #include "storage/ipc.h"
56 : #include "storage/proc.h"
57 : #include "storage/procarray.h"
58 : #include "utils/builtins.h"
59 : #include "utils/guc_hooks.h"
60 : #include "utils/injection_point.h"
61 : #include "utils/varlena.h"
62 :
/*
 * Replication slot on-disk data structure.
 *
 * This is the layout of a slot's state file.  The leading fields up to
 * 'length' must stay version independent so that any server binary can at
 * least identify a slot file and its format version.
 */
typedef struct ReplicationSlotOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* format identifier, see SLOT_MAGIC */
	pg_crc32c	checksum;		/* CRC of the checksummed part below */

	/* data covered by checksum */
	uint32		version;		/* on-disk format version, see SLOT_VERSION */
	uint32		length;			/* length of the version-dependent part that
								 * follows -- presumably
								 * ReplicationSlotOnDiskV2Size; confirm in
								 * SaveSlotToPath/RestoreSlotFromDisk */

	/*
	 * The actual data in the slot that follows can differ based on the above
	 * 'version'.
	 */

	ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;
85 :
/*
 * Struct for the configuration of synchronized_standby_slots.
 *
 * Note: this must be a flat representation that can be held in a single chunk
 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
 * synchronized_standby_slots GUC.
 */
typedef struct
{
	/* Number of slot names in the slot_names[] */
	int			nslotnames;

	/*
	 * slot_names contains 'nslotnames' consecutive null-terminated C strings
	 * packed back to back (not an array of pointers, to keep the struct
	 * flat).
	 */
	char		slot_names[FLEXIBLE_ARRAY_MEMBER];
} SyncStandbySlotsConfigData;
103 :
/*
 * Lookup table for slot invalidation causes.
 *
 * Maps each ReplicationSlotInvalidationCause enum value to the user-facing
 * name reported in error detail messages (see ReplicationSlotAcquire).
 */
typedef struct SlotInvalidationCauseMap
{
	ReplicationSlotInvalidationCause cause;
	const char *cause_name;		/* user-facing name for the cause */
} SlotInvalidationCauseMap;

static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
	{RS_INVAL_NONE, "none"},
	{RS_INVAL_WAL_REMOVED, "wal_removed"},
	{RS_INVAL_HORIZON, "rows_removed"},
	{RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
	{RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
};

/*
 * Ensure that the lookup table is up-to-date with the enums defined in
 * ReplicationSlotInvalidationCause.
 */
StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
				 "array length mismatch");
127 :
/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)

/*
 * The two size macros below expand to a subtraction, so parenthesize the
 * whole expansion; otherwise embedding them in a larger expression (e.g.
 * "x * ReplicationSlotOnDiskChecksummedSize") would evaluate incorrectly
 * due to operator precedence.
 */
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC	0x1051CA1	/* format identifier */
#define SLOT_VERSION 5			/* version for new files */
143 :
/* Control array for replication slot management (lives in shared memory) */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/* My backend's replication slot in the shared memory array */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUC variables */
int			max_replication_slots = 10; /* the maximum number of replication
										 * slots */

/*
 * Invalidate replication slots that have remained idle longer than this
 * duration; '0' disables it.
 */
int			idle_replication_slot_timeout_secs = 0;

/*
 * This GUC lists streaming replication standby server slot names that
 * logical WAL sender processes will wait for.
 */
char	   *synchronized_standby_slots;

/* This is the parsed and cached configuration for synchronized_standby_slots */
static SyncStandbySlotsConfigData *synchronized_standby_slots_config;

/*
 * Oldest LSN that has been confirmed to be flushed to the standbys
 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
 */
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;

/* before_shmem_exit callback: releases owned slot, drops temporary slots */
static void ReplicationSlotShmemExit(int code, Datum arg);
static bool IsSlotForConflictCheck(const char *name);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
183 :
184 : /*
185 : * Report shared-memory space needed by ReplicationSlotsShmemInit.
186 : */
187 : Size
188 8810 : ReplicationSlotsShmemSize(void)
189 : {
190 8810 : Size size = 0;
191 :
192 8810 : if (max_replication_slots == 0)
193 4 : return size;
194 :
195 8806 : size = offsetof(ReplicationSlotCtlData, replication_slots);
196 8806 : size = add_size(size,
197 : mul_size(max_replication_slots, sizeof(ReplicationSlot)));
198 :
199 8806 : return size;
200 : }
201 :
202 : /*
203 : * Allocate and initialize shared memory for replication slots.
204 : */
205 : void
206 2280 : ReplicationSlotsShmemInit(void)
207 : {
208 : bool found;
209 :
210 2280 : if (max_replication_slots == 0)
211 2 : return;
212 :
213 2278 : ReplicationSlotCtl = (ReplicationSlotCtlData *)
214 2278 : ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
215 : &found);
216 :
217 2278 : if (!found)
218 : {
219 : int i;
220 :
221 : /* First time through, so initialize */
222 4498 : MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
223 :
224 24674 : for (i = 0; i < max_replication_slots; i++)
225 : {
226 22396 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
227 :
228 : /* everything else is zeroed by the memset above */
229 22396 : slot->active_proc = INVALID_PROC_NUMBER;
230 22396 : SpinLockInit(&slot->mutex);
231 22396 : LWLockInitialize(&slot->io_in_progress_lock,
232 : LWTRANCHE_REPLICATION_SLOT_IO);
233 22396 : ConditionVariableInit(&slot->active_cv);
234 : }
235 : }
236 : }
237 :
/*
 * Register the callback for replication slot cleanup and releasing.
 *
 * The callback runs via before_shmem_exit(), i.e. while this process is
 * still attached to shared memory, so it can safely release any slot the
 * process holds and drop its temporary slots.
 */
void
ReplicationSlotInitialize(void)
{
	before_shmem_exit(ReplicationSlotShmemExit, 0);
}
246 :
/*
 * Release and cleanup replication slots.
 *
 * before_shmem_exit() callback registered by ReplicationSlotInitialize().
 * The release must happen before the temporary-slot cleanup, since
 * ReplicationSlotCleanup() asserts that MyReplicationSlot is NULL.
 */
static void
ReplicationSlotShmemExit(int code, Datum arg)
{
	/* Make sure active replication slots are released */
	if (MyReplicationSlot != NULL)
		ReplicationSlotRelease();

	/* Also cleanup all the temporary slots. */
	ReplicationSlotCleanup(false);
}
260 :
/*
 * Check whether the passed slot name is valid and report errors at elevel.
 *
 * See comments for ReplicationSlotValidateNameInternal().
 *
 * Returns true if the name is valid.  Otherwise an error is reported at the
 * given elevel; if elevel is below ERROR (so ereport returns), the palloc'd
 * message and hint are freed here and false is returned.
 */
bool
ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
							int elevel)
{
	int			err_code;
	char	   *err_msg = NULL;
	char	   *err_hint = NULL;

	if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
											 &err_code, &err_msg, &err_hint))
	{
		/*
		 * Use errmsg_internal() and errhint_internal() instead of errmsg()
		 * and errhint(), since the messages from
		 * ReplicationSlotValidateNameInternal() are already translated. This
		 * avoids double translation.
		 */
		ereport(elevel,
				errcode(err_code),
				errmsg_internal("%s", err_msg),
				(err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);

		/* Only reached when elevel < ERROR, i.e. ereport() returned. */
		pfree(err_msg);
		if (err_hint != NULL)
			pfree(err_hint);
		return false;
	}

	return true;
}
296 :
297 : /*
298 : * Check whether the passed slot name is valid.
299 : *
300 : * An error will be reported for a reserved replication slot name if
301 : * allow_reserved_name is set to false.
302 : *
303 : * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
304 : * the name to be used as a directory name on every supported OS.
305 : *
306 : * Returns true if the slot name is valid. Otherwise, returns false and stores
307 : * the error code, error message, and optional hint in err_code, err_msg, and
308 : * err_hint, respectively. The caller is responsible for freeing err_msg and
309 : * err_hint, which are palloc'd.
310 : */
311 : bool
312 2082 : ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
313 : int *err_code, char **err_msg, char **err_hint)
314 : {
315 : const char *cp;
316 :
317 2082 : if (strlen(name) == 0)
318 : {
319 6 : *err_code = ERRCODE_INVALID_NAME;
320 6 : *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
321 6 : *err_hint = NULL;
322 6 : return false;
323 : }
324 :
325 2076 : if (strlen(name) >= NAMEDATALEN)
326 : {
327 0 : *err_code = ERRCODE_NAME_TOO_LONG;
328 0 : *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
329 0 : *err_hint = NULL;
330 0 : return false;
331 : }
332 :
333 39696 : for (cp = name; *cp; cp++)
334 : {
335 37624 : if (!((*cp >= 'a' && *cp <= 'z')
336 17526 : || (*cp >= '0' && *cp <= '9')
337 3682 : || (*cp == '_')))
338 : {
339 4 : *err_code = ERRCODE_INVALID_NAME;
340 4 : *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
341 4 : *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
342 4 : return false;
343 : }
344 : }
345 :
346 2072 : if (!allow_reserved_name && IsSlotForConflictCheck(name))
347 : {
348 2 : *err_code = ERRCODE_RESERVED_NAME;
349 2 : *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
350 2 : *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
351 : CONFLICT_DETECTION_SLOT);
352 2 : return false;
353 : }
354 :
355 2070 : return true;
356 : }
357 :
/*
 * Return true if the replication slot name is "pg_conflict_detection".
 *
 * That name (CONFLICT_DETECTION_SLOT) is reserved for the internal slot
 * used for conflict detection, which only the logical replication launcher
 * may acquire; see ReplicationSlotAcquire().
 */
static bool
IsSlotForConflictCheck(const char *name)
{
	return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
}
366 :
/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db specific; if the slot is going to
 *     be used for that pass true, otherwise false.
 * persistency: persistence level of the slot (e.g. RS_PERSISTENT,
 *     RS_EPHEMERAL, RS_TEMPORARY); stored verbatim in the slot's data.
 * two_phase: If enabled, allows decoding of prepared transactions.
 * failover: If enabled, allows the slot to be synced to standbys so
 *     that logical replication can be resumed after failover.
 * synced: True if the slot is synchronized from the primary server.
 *
 * On success the new slot is acquired: MyReplicationSlot points at it.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
					  ReplicationSlotPersistency persistency,
					  bool two_phase, bool failover, bool synced)
{
	ReplicationSlot *slot = NULL;
	int			i;

	Assert(MyReplicationSlot == NULL);

	/*
	 * The logical launcher or pg_upgrade may create or migrate an internal
	 * slot, so using a reserved name is allowed in these cases.
	 */
	ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
								ERROR);

	if (failover)
	{
		/*
		 * Do not allow users to create the failover enabled slots on the
		 * standby as we do not support sync to the cascading standby.
		 *
		 * However, failover enabled slots can be created during slot
		 * synchronization because we need to retain the same values as the
		 * remote slot.
		 */
		if (RecoveryInProgress() && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a replication slot created on the standby"));

		/*
		 * Do not allow users to create failover enabled temporary slots,
		 * because temporary slots will not be synced to the standby.
		 *
		 * However, failover enabled temporary slots can be created during
		 * slot synchronization. See the comments atop slotsync.c for details.
		 */
		if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a temporary replication slot"));
	}

	/*
	 * If some other backend ran this code concurrently with us, we'd likely
	 * both allocate the same slot, and that would be bad. We'd also be at
	 * risk of missing a name collision. Also, we don't want to try to create
	 * a new slot while somebody's busy cleaning up an old one, because we
	 * might both be monkeying with the same directory.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * Check for name collision, and identify an allocatable slot. We need to
	 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
	 * else can change the in_use flags while we're looking at them.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
					 errmsg("replication slot \"%s\" already exists", name)));
		if (!s->in_use && slot == NULL)
			slot = s;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If all slots are in use, we're out of luck. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("all replication slots are in use"),
				 errhint("Free one or increase \"max_replication_slots\".")));

	/*
	 * Since this slot is not in use, nobody should be looking at any part of
	 * it other than the in_use field unless they're trying to allocate it.
	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
	 * be doing that. So it's safe to initialize the slot.
	 */
	Assert(!slot->in_use);
	Assert(slot->active_proc == INVALID_PROC_NUMBER);

	/* first initialize persistent data */
	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
	namestrcpy(&slot->data.name, name);
	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
	slot->data.persistency = persistency;
	slot->data.two_phase = two_phase;
	slot->data.two_phase_at = InvalidXLogRecPtr;
	slot->data.failover = failover;
	slot->data.synced = synced;

	/* and then data only present in shared memory */
	slot->just_dirtied = false;
	slot->dirty = false;
	slot->effective_xmin = InvalidTransactionId;
	slot->effective_catalog_xmin = InvalidTransactionId;
	slot->candidate_catalog_xmin = InvalidTransactionId;
	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
	slot->candidate_restart_valid = InvalidXLogRecPtr;
	slot->candidate_restart_lsn = InvalidXLogRecPtr;
	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
	slot->last_saved_restart_lsn = InvalidXLogRecPtr;
	slot->inactive_since = 0;
	slot->slotsync_skip_reason = SS_SKIP_NONE;

	/*
	 * Create the slot on disk. We haven't actually marked the slot allocated
	 * yet, so no special cleanup is required if this errors out.
	 */
	CreateSlotOnDisk(slot);

	/*
	 * We need to briefly prevent any other backend from iterating over the
	 * slots while we flip the in_use flag. We also need to set the active
	 * flag while holding the ControlLock as otherwise a concurrent
	 * ReplicationSlotAcquire() could acquire the slot as well.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

	slot->in_use = true;

	/* We can now mark the slot active, and that makes it our slot. */
	SpinLockAcquire(&slot->mutex);
	Assert(slot->active_proc == INVALID_PROC_NUMBER);
	slot->active_proc = MyProcNumber;
	SpinLockRelease(&slot->mutex);
	MyReplicationSlot = slot;

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Create statistics entry for the new logical slot. We don't collect any
	 * stats for physical slots, so no need to create an entry for the same.
	 * See ReplicationSlotDropPtr for why we need to do this before releasing
	 * ReplicationSlotAllocationLock.
	 */
	if (SlotIsLogical(slot))
		pgstat_create_replslot(slot);

	/*
	 * Now that the slot has been marked as in_use and active, it's safe to
	 * let somebody else try to allocate a slot.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);
}
534 :
535 : /*
536 : * Search for the named replication slot.
537 : *
538 : * Return the replication slot if found, otherwise NULL.
539 : */
540 : ReplicationSlot *
541 3922 : SearchNamedReplicationSlot(const char *name, bool need_lock)
542 : {
543 : int i;
544 3922 : ReplicationSlot *slot = NULL;
545 :
546 3922 : if (need_lock)
547 1146 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
548 :
549 15080 : for (i = 0; i < max_replication_slots; i++)
550 : {
551 14136 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
552 :
553 14136 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
554 : {
555 2978 : slot = s;
556 2978 : break;
557 : }
558 : }
559 :
560 3922 : if (need_lock)
561 1146 : LWLockRelease(ReplicationSlotControlLock);
562 :
563 3922 : return slot;
564 : }
565 :
566 : /*
567 : * Return the index of the replication slot in
568 : * ReplicationSlotCtl->replication_slots.
569 : *
570 : * This is mainly useful to have an efficient key for storing replication slot
571 : * stats.
572 : */
573 : int
574 16646 : ReplicationSlotIndex(ReplicationSlot *slot)
575 : {
576 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
577 : slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
578 :
579 16646 : return slot - ReplicationSlotCtl->replication_slots;
580 : }
581 :
582 : /*
583 : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
584 : * the slot's name and true is returned.
585 : *
586 : * This likely is only useful for pgstat_replslot.c during shutdown, in other
587 : * cases there are obvious TOCTOU issues.
588 : */
589 : bool
590 212 : ReplicationSlotName(int index, Name name)
591 : {
592 : ReplicationSlot *slot;
593 : bool found;
594 :
595 212 : slot = &ReplicationSlotCtl->replication_slots[index];
596 :
597 : /*
598 : * Ensure that the slot cannot be dropped while we copy the name. Don't
599 : * need the spinlock as the name of an existing slot cannot change.
600 : */
601 212 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
602 212 : found = slot->in_use;
603 212 : if (slot->in_use)
604 212 : namestrcpy(name, NameStr(slot->data.name));
605 212 : LWLockRelease(ReplicationSlotControlLock);
606 :
607 212 : return found;
608 : }
609 :
/*
 * Find a previously created slot and mark it as used by this process.
 *
 * An error is raised if nowait is true and the slot is currently in use. If
 * nowait is false, we sleep until the slot is released by the owning process.
 *
 * An error is raised if error_if_invalid is true and the slot is found to
 * be invalid. It should always be set to true, except when we are temporarily
 * acquiring the slot and don't intend to change it.
 *
 * On success, MyReplicationSlot points at the acquired slot.
 */
void
ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
{
	ReplicationSlot *s;
	ProcNumber	active_proc;
	int			active_pid;

	Assert(name != NULL);

retry:
	Assert(MyReplicationSlot == NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	/* Check if the slot exists with the given name. */
	s = SearchNamedReplicationSlot(name, false);
	if (s == NULL || !s->in_use)
	{
		LWLockRelease(ReplicationSlotControlLock);

		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist",
						name)));
	}

	/*
	 * Do not allow users to acquire the reserved slot. This scenario may
	 * occur if the launcher that owns the slot has terminated unexpectedly
	 * due to an error, and a backend process attempts to reuse the slot.
	 */
	if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
		ereport(ERROR,
				errcode(ERRCODE_UNDEFINED_OBJECT),
				errmsg("cannot acquire replication slot \"%s\"", name),
				errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));

	/*
	 * This is the slot we want; check if it's active under some other
	 * process. In single user mode, we don't need this check.
	 */
	if (IsUnderPostmaster)
	{
		/*
		 * Get ready to sleep on the slot in case it is active. (We may end
		 * up not sleeping, but we don't want to do this while holding the
		 * spinlock.)
		 */
		if (!nowait)
			ConditionVariablePrepareToSleep(&s->active_cv);

		/*
		 * It is important to reset the inactive_since under spinlock here to
		 * avoid race conditions with slot invalidation. See comments related
		 * to inactive_since in InvalidatePossiblyObsoleteSlot.
		 */
		SpinLockAcquire(&s->mutex);
		if (s->active_proc == INVALID_PROC_NUMBER)
			s->active_proc = MyProcNumber;
		active_proc = s->active_proc;
		ReplicationSlotSetInactiveSince(s, 0, false);
		SpinLockRelease(&s->mutex);
	}
	else
	{
		s->active_proc = active_proc = MyProcNumber;
		ReplicationSlotSetInactiveSince(s, 0, true);
	}
	/* active_pid is only needed for the "active for PID" error below */
	active_pid = GetPGProcByNumber(active_proc)->pid;
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * If we found the slot but it's already active in another process, we
	 * wait until the owning process signals us that it's been released, or
	 * error out.
	 */
	if (active_proc != MyProcNumber)
	{
		if (!nowait)
		{
			/* Wait here until we get signaled, and then restart */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);
			ConditionVariableCancelSleep();
			goto retry;
		}

		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
				 errmsg("replication slot \"%s\" is active for PID %d",
						NameStr(s->data.name), active_pid)));
	}
	else if (!nowait)
		ConditionVariableCancelSleep(); /* no sleep needed after all */

	/* We made this slot active, so it's ours now. */
	MyReplicationSlot = s;

	/*
	 * We need to check for invalidation after making the slot ours to avoid
	 * the possible race condition with the checkpointer that can otherwise
	 * invalidate the slot immediately after the check.
	 */
	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
		ereport(ERROR,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg("can no longer access replication slot \"%s\"",
					   NameStr(s->data.name)),
				errdetail("This replication slot has been invalidated due to \"%s\".",
						  GetSlotInvalidationCauseName(s->data.invalidated)));

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&s->active_cv);

	/*
	 * The call to pgstat_acquire_replslot() protects against stats for a
	 * different slot, from before a restart or such, being present during
	 * pgstat_report_replslot().
	 */
	if (SlotIsLogical(s))
		pgstat_acquire_replslot(s);


	if (am_walsender)
	{
		ereport(log_replication_commands ? LOG : DEBUG1,
				SlotIsLogical(s)
				? errmsg("acquired logical replication slot \"%s\"",
						 NameStr(s->data.name))
				: errmsg("acquired physical replication slot \"%s\"",
						 NameStr(s->data.name)));
	}
}
753 :
/*
 * Release the replication slot that this backend considers to own.
 *
 * This or another backend can re-acquire the slot later.
 * Resources this slot requires will be preserved.
 *
 * (Exception: an RS_EPHEMERAL slot is dropped outright rather than
 * preserved; see below.)
 */
void
ReplicationSlotRelease(void)
{
	ReplicationSlot *slot = MyReplicationSlot;
	char	   *slotname = NULL;	/* keep compiler quiet */
	bool		is_logical;
	TimestampTz now = 0;

	Assert(slot != NULL && slot->active_proc != INVALID_PROC_NUMBER);

	is_logical = SlotIsLogical(slot);

	/* Copy the name first; the drop below may invalidate 'slot'. */
	if (am_walsender)
		slotname = pstrdup(NameStr(slot->data.name));

	if (slot->data.persistency == RS_EPHEMERAL)
	{
		/*
		 * Delete the slot. There is no !PANIC case where this is allowed to
		 * fail, all that may happen is an incomplete cleanup of the on-disk
		 * data.
		 */
		ReplicationSlotDropAcquired();

		/*
		 * Request to disable logical decoding, even though this slot may not
		 * have been the last logical slot. The checkpointer will verify if
		 * logical decoding should actually be disabled.
		 */
		if (is_logical)
			RequestDisableLogicalDecoding();
	}

	/*
	 * If slot needed to temporarily restrain both data and catalog xmin to
	 * create the catalog snapshot, remove that temporary constraint.
	 * Snapshots can only be exported while the initial snapshot is still
	 * acquired.
	 */
	if (!TransactionIdIsValid(slot->data.xmin) &&
		TransactionIdIsValid(slot->effective_xmin))
	{
		SpinLockAcquire(&slot->mutex);
		slot->effective_xmin = InvalidTransactionId;
		SpinLockRelease(&slot->mutex);
		ReplicationSlotsComputeRequiredXmin(false);
	}

	/*
	 * Set the time since the slot has become inactive. We get the current
	 * time beforehand to avoid system call while holding the spinlock.
	 */
	now = GetCurrentTimestamp();

	if (slot->data.persistency == RS_PERSISTENT)
	{
		/*
		 * Mark persistent slot inactive. We're not freeing it, just
		 * disconnecting, but wake up others that may be waiting for it.
		 */
		SpinLockAcquire(&slot->mutex);
		slot->active_proc = INVALID_PROC_NUMBER;
		ReplicationSlotSetInactiveSince(slot, now, false);
		SpinLockRelease(&slot->mutex);
		ConditionVariableBroadcast(&slot->active_cv);
	}
	else
		ReplicationSlotSetInactiveSince(slot, now, true);

	MyReplicationSlot = NULL;

	/* might not have been set when we've been a plain slot */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
	ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
	LWLockRelease(ProcArrayLock);

	if (am_walsender)
	{
		ereport(log_replication_commands ? LOG : DEBUG1,
				is_logical
				? errmsg("released logical replication slot \"%s\"",
						 slotname)
				: errmsg("released physical replication slot \"%s\"",
						 slotname));

		pfree(slotname);
	}
}
849 :
/*
 * Cleanup temporary slots created in current session.
 *
 * Cleanup only synced temporary slots if 'synced_only' is true, else
 * cleanup all temporary slots.
 *
 * If it drops the last logical slot in the cluster, requests to disable
 * logical decoding.
 */
void
ReplicationSlotCleanup(bool synced_only)
{
	int			i;
	bool		found_valid_logicalslot;
	bool		dropped_logical = false;

	Assert(MyReplicationSlot == NULL);

restart:
	/* Recomputed on every pass; slots may change while the lock is down. */
	found_valid_logicalslot = false;
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (!s->in_use)
			continue;

		SpinLockAcquire(&s->mutex);

		found_valid_logicalslot |=
			(SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);

		if ((s->active_proc == MyProcNumber &&
			 (!synced_only || s->data.synced)))
		{
			Assert(s->data.persistency == RS_TEMPORARY);
			SpinLockRelease(&s->mutex);
			LWLockRelease(ReplicationSlotControlLock);	/* avoid deadlock */

			if (SlotIsLogical(s))
				dropped_logical = true;

			ReplicationSlotDropPtr(s);

			ConditionVariableBroadcast(&s->active_cv);
			/* We dropped the lock mid-scan, so rescan from the top. */
			goto restart;
		}
		else
			SpinLockRelease(&s->mutex);
	}

	LWLockRelease(ReplicationSlotControlLock);

	if (dropped_logical && !found_valid_logicalslot)
		RequestDisableLogicalDecoding();
}
907 :
/*
 * Permanently drop replication slot identified by the passed in name.
 *
 * 'nowait' is passed through to ReplicationSlotAcquire(); presumably it
 * controls whether we error out instead of waiting when the slot is
 * currently active elsewhere -- confirm against that function.
 */
void
ReplicationSlotDrop(const char *name, bool nowait)
{
	bool		is_logical;

	Assert(MyReplicationSlot == NULL);

	ReplicationSlotAcquire(name, nowait, false);

	/*
	 * Do not allow users to drop the slots which are currently being synced
	 * from the primary to the standby.
	 */
	if (RecoveryInProgress() && MyReplicationSlot->data.synced)
		ereport(ERROR,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg("cannot drop replication slot \"%s\"", name),
				errdetail("This replication slot is being synchronized from the primary server."));

	/* Remember the slot type: the slot is gone after the drop below. */
	is_logical = SlotIsLogical(MyReplicationSlot);

	ReplicationSlotDropAcquired();

	/*
	 * NOTE(review): called unconditionally for logical slots here, unlike
	 * ReplicationSlotCleanup() which first checks for remaining valid
	 * logical slots -- presumably the request itself re-checks; confirm in
	 * RequestDisableLogicalDecoding().
	 */
	if (is_logical)
		RequestDisableLogicalDecoding();
}
937 :
/*
 * Change the definition of the slot identified by the specified name.
 *
 * 'failover' and 'two_phase' are each optional (NULL means "leave this
 * property unchanged"); at least one must be non-NULL.
 *
 * Altering the two_phase property of a slot requires caution on the
 * client-side. Enabling it at any random point during decoding has the
 * risk that transactions prepared before this change may be skipped by
 * the decoder, leading to missing prepare records on the client. So, we
 * enable it for subscription related slots only once the initial tablesync
 * is finished. See comments atop worker.c. Disabling it is safe only when
 * there are no pending prepared transaction, otherwise, the changes of
 * already prepared transactions can be replicated again along with their
 * corresponding commit leading to duplicate data or errors.
 */
void
ReplicationSlotAlter(const char *name, const bool *failover,
					 const bool *two_phase)
{
	bool		update_slot = false;

	Assert(MyReplicationSlot == NULL);
	Assert(failover || two_phase);

	ReplicationSlotAcquire(name, false, true);

	/* Only logical slots carry the failover/two_phase properties. */
	if (SlotIsPhysical(MyReplicationSlot))
		ereport(ERROR,
				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				errmsg("cannot use %s with a physical replication slot",
					   "ALTER_REPLICATION_SLOT"));

	if (RecoveryInProgress())
	{
		/*
		 * Do not allow users to alter the slots which are currently being
		 * synced from the primary to the standby.
		 */
		if (MyReplicationSlot->data.synced)
			ereport(ERROR,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("cannot alter replication slot \"%s\"", name),
					errdetail("This replication slot is being synchronized from the primary server."));

		/*
		 * Do not allow users to enable failover on the standby as we do not
		 * support sync to the cascading standby.
		 */
		if (failover && *failover)
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a replication slot"
						   " on the standby"));
	}

	if (failover)
	{
		/*
		 * Do not allow users to enable failover for temporary slots as we do
		 * not support syncing temporary slots to the standby.
		 */
		if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a temporary replication slot"));

		/* Only dirty and save the slot if the value actually changes. */
		if (MyReplicationSlot->data.failover != *failover)
		{
			SpinLockAcquire(&MyReplicationSlot->mutex);
			MyReplicationSlot->data.failover = *failover;
			SpinLockRelease(&MyReplicationSlot->mutex);

			update_slot = true;
		}
	}

	if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
	{
		SpinLockAcquire(&MyReplicationSlot->mutex);
		MyReplicationSlot->data.two_phase = *two_phase;
		SpinLockRelease(&MyReplicationSlot->mutex);

		update_slot = true;
	}

	/* Persist any changed property to disk before releasing the slot. */
	if (update_slot)
	{
		ReplicationSlotMarkDirty();
		ReplicationSlotSave();
	}

	ReplicationSlotRelease();
}
1029 :
1030 : /*
1031 : * Permanently drop the currently acquired replication slot.
1032 : */
1033 : void
1034 862 : ReplicationSlotDropAcquired(void)
1035 : {
1036 862 : ReplicationSlot *slot = MyReplicationSlot;
1037 :
1038 : Assert(MyReplicationSlot != NULL);
1039 :
1040 : /* slot isn't acquired anymore */
1041 862 : MyReplicationSlot = NULL;
1042 :
1043 862 : ReplicationSlotDropPtr(slot);
1044 862 : }
1045 :
/*
 * Permanently drop the replication slot which will be released by the point
 * this function returns.
 *
 * 'slot' must be a slot this backend is entitled to drop (either acquired
 * by us or otherwise protected by the caller).
 */
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
	char		path[MAXPGPATH];
	char		tmppath[MAXPGPATH];

	/*
	 * If some other backend ran this code concurrently with us, we might try
	 * to delete a slot with a certain name while someone else was trying to
	 * create a slot with the same name.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/* Generate pathnames. */
	sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
	sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));

	/*
	 * Rename the slot directory on disk, so that we'll no longer recognize
	 * this as a valid slot. Note that if this fails, we've got to mark the
	 * slot inactive before bailing out. If we're dropping an ephemeral or a
	 * temporary slot, we better never fail hard as the caller won't expect
	 * the slot to survive and this might get called during error handling.
	 */
	if (rename(path, tmppath) == 0)
	{
		/*
		 * We need to fsync() the directory we just renamed and its parent to
		 * make sure that our changes are on disk in a crash-safe fashion. If
		 * fsync() fails, we can't be sure whether the changes are on disk or
		 * not. For now, we handle that by panicking;
		 * StartupReplicationSlots() will try to straighten it out after
		 * restart.
		 */
		START_CRIT_SECTION();
		fsync_fname(tmppath, true);
		fsync_fname(PG_REPLSLOT_DIR, true);
		END_CRIT_SECTION();
	}
	else
	{
		/* Persistent slots must not be silently lost: escalate to ERROR. */
		bool		fail_softly = slot->data.persistency != RS_PERSISTENT;

		SpinLockAcquire(&slot->mutex);
		slot->active_proc = INVALID_PROC_NUMBER;
		SpinLockRelease(&slot->mutex);

		/* wake up anyone waiting on this slot */
		ConditionVariableBroadcast(&slot->active_cv);

		ereport(fail_softly ? WARNING : ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						path, tmppath)));
	}

	/*
	 * The slot is definitely gone. Lock out concurrent scans of the array
	 * long enough to kill it. It's OK to clear the active PID here without
	 * grabbing the mutex because nobody else can be scanning the array here,
	 * and nobody can be attached to this slot and thus access it without
	 * scanning the array.
	 *
	 * Also wake up processes waiting for it.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	slot->active_proc = INVALID_PROC_NUMBER;
	slot->in_use = false;
	LWLockRelease(ReplicationSlotControlLock);
	ConditionVariableBroadcast(&slot->active_cv);

	/*
	 * Slot is dead and doesn't prevent resource removal anymore, recompute
	 * limits.
	 */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * If removing the directory fails, the worst thing that will happen is
	 * that the user won't be able to create a new slot with the same name
	 * until the next server restart. We warn about it, but that's all.
	 */
	if (!rmtree(tmppath, true))
		ereport(WARNING,
				(errmsg("could not remove directory \"%s\"", tmppath)));

	/*
	 * Drop the statistics entry for the replication slot. Do this while
	 * holding ReplicationSlotAllocationLock so that we don't drop a
	 * statistics entry for another slot with the same name just created in
	 * another session.
	 */
	if (SlotIsLogical(slot))
		pgstat_drop_replslot(slot);

	/*
	 * We release this at the very end, so that nobody starts trying to create
	 * a slot while we're still cleaning up the detritus of the old one.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);
}
1152 :
1153 : /*
1154 : * Serialize the currently acquired slot's state from memory to disk, thereby
1155 : * guaranteeing the current state will survive a crash.
1156 : */
1157 : void
1158 2824 : ReplicationSlotSave(void)
1159 : {
1160 : char path[MAXPGPATH];
1161 :
1162 : Assert(MyReplicationSlot != NULL);
1163 :
1164 2824 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
1165 2824 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
1166 2824 : }
1167 :
1168 : /*
1169 : * Signal that it would be useful if the currently acquired slot would be
1170 : * flushed out to disk.
1171 : *
1172 : * Note that the actual flush to disk can be delayed for a long time, if
1173 : * required for correctness explicitly do a ReplicationSlotSave().
1174 : */
1175 : void
1176 60400 : ReplicationSlotMarkDirty(void)
1177 : {
1178 60400 : ReplicationSlot *slot = MyReplicationSlot;
1179 :
1180 : Assert(MyReplicationSlot != NULL);
1181 :
1182 60400 : SpinLockAcquire(&slot->mutex);
1183 60400 : MyReplicationSlot->just_dirtied = true;
1184 60400 : MyReplicationSlot->dirty = true;
1185 60400 : SpinLockRelease(&slot->mutex);
1186 60400 : }
1187 :
/*
 * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
 * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
 *
 * The currently acquired slot must not already be persistent.
 */
void
ReplicationSlotPersist(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL);
	Assert(slot->data.persistency != RS_PERSISTENT);

	/* Flip persistency under the spinlock... */
	SpinLockAcquire(&slot->mutex);
	slot->data.persistency = RS_PERSISTENT;
	SpinLockRelease(&slot->mutex);

	/* ...then immediately force the new state to disk. */
	ReplicationSlotMarkDirty();
	ReplicationSlotSave();
}
1207 :
/*
 * Compute the oldest xmin across all slots and store it in the ProcArray.
 *
 * If already_locked is true, both the ReplicationSlotControlLock and the
 * ProcArrayLock have already been acquired exclusively. It is crucial that the
 * caller first acquires the ReplicationSlotControlLock, followed by the
 * ProcArrayLock, to prevent any undetectable deadlocks since this function
 * acquires them in that order.
 */
void
ReplicationSlotsComputeRequiredXmin(bool already_locked)
{
	int			i;
	TransactionId agg_xmin = InvalidTransactionId;
	TransactionId agg_catalog_xmin = InvalidTransactionId;

	Assert(ReplicationSlotCtl != NULL);
	Assert(!already_locked ||
		   (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
			LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));

	/*
	 * Hold the ReplicationSlotControlLock until after updating the slot xmin
	 * values, so no backend updates the initial xmin for newly created slot
	 * concurrently. A shared lock is used here to minimize lock contention,
	 * especially when many slots exist and advancements occur frequently.
	 * This is safe since an exclusive lock is taken during initial slot xmin
	 * update in slot creation.
	 *
	 * One might think that we can hold the ProcArrayLock exclusively and
	 * update the slot xmin values, but it could increase lock contention on
	 * the ProcArrayLock, which is not great since this function can be called
	 * at non-negligible frequency.
	 *
	 * Concurrent invocation of this function may cause the computed slot xmin
	 * to regress. However, this is harmless because tuples prior to the most
	 * recent xmin are no longer useful once advancement occurs (see
	 * LogicalConfirmReceivedLocation where the slot's xmin value is flushed
	 * before updating the effective_xmin). Thus, such regression merely
	 * prevents VACUUM from prematurely removing tuples without causing the
	 * early deletion of required data.
	 */
	if (!already_locked)
		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		TransactionId effective_xmin;
		TransactionId effective_catalog_xmin;
		bool		invalidated;

		if (!s->in_use)
			continue;

		/* Snapshot the per-slot values under the spinlock. */
		SpinLockAcquire(&s->mutex);
		effective_xmin = s->effective_xmin;
		effective_catalog_xmin = s->effective_catalog_xmin;
		invalidated = s->data.invalidated != RS_INVAL_NONE;
		SpinLockRelease(&s->mutex);

		/* invalidated slots need not apply */
		if (invalidated)
			continue;

		/* check the data xmin */
		if (TransactionIdIsValid(effective_xmin) &&
			(!TransactionIdIsValid(agg_xmin) ||
			 TransactionIdPrecedes(effective_xmin, agg_xmin)))
			agg_xmin = effective_xmin;

		/* check the catalog xmin */
		if (TransactionIdIsValid(effective_catalog_xmin) &&
			(!TransactionIdIsValid(agg_catalog_xmin) ||
			 TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
			agg_catalog_xmin = effective_catalog_xmin;
	}

	/* Publish the aggregated minima while still holding the control lock. */
	ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);

	if (!already_locked)
		LWLockRelease(ReplicationSlotControlLock);
}
1291 :
/*
 * Compute the oldest restart LSN across all slots and inform xlog module.
 *
 * Note: while max_slot_wal_keep_size is theoretically relevant for this
 * purpose, we don't try to account for that, because this module doesn't
 * know what to compare against.
 */
void
ReplicationSlotsComputeRequiredLSN(void)
{
	int			i;
	XLogRecPtr	min_required = InvalidXLogRecPtr;

	Assert(ReplicationSlotCtl != NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		XLogRecPtr	restart_lsn;
		XLogRecPtr	last_saved_restart_lsn;
		bool		invalidated;
		ReplicationSlotPersistency persistency;

		if (!s->in_use)
			continue;

		/* Take a consistent snapshot of the slot fields under the spinlock. */
		SpinLockAcquire(&s->mutex);
		persistency = s->data.persistency;
		restart_lsn = s->data.restart_lsn;
		invalidated = s->data.invalidated != RS_INVAL_NONE;
		last_saved_restart_lsn = s->last_saved_restart_lsn;
		SpinLockRelease(&s->mutex);

		/* invalidated slots need not apply */
		if (invalidated)
			continue;

		/*
		 * For persistent slot use last_saved_restart_lsn to compute the
		 * oldest LSN for removal of WAL segments. The segments between
		 * last_saved_restart_lsn and restart_lsn might be needed by a
		 * persistent slot in the case of database crash. Non-persistent
		 * slots can't survive the database crash, so we don't care about
		 * last_saved_restart_lsn for them.
		 */
		if (persistency == RS_PERSISTENT)
		{
			if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
				restart_lsn > last_saved_restart_lsn)
			{
				restart_lsn = last_saved_restart_lsn;
			}
		}

		/* Track the minimum valid restart LSN across all slots. */
		if (XLogRecPtrIsValid(restart_lsn) &&
			(!XLogRecPtrIsValid(min_required) ||
			 restart_lsn < min_required))
			min_required = restart_lsn;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* Tell the xlog machinery which WAL must be retained. */
	XLogSetReplicationSlotMinimumLSN(min_required);
}
1356 :
/*
 * Compute the oldest WAL LSN required by *logical* decoding slots.
 *
 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
 * slots exist.
 *
 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
 * ignores physical replication slots.
 *
 * The results aren't required frequently, so we don't maintain a precomputed
 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
 */
XLogRecPtr
ReplicationSlotsComputeLogicalRestartLSN(void)
{
	XLogRecPtr	result = InvalidXLogRecPtr;
	int			i;

	if (max_replication_slots <= 0)
		return InvalidXLogRecPtr;

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s;
		XLogRecPtr	restart_lsn;
		XLogRecPtr	last_saved_restart_lsn;
		bool		invalidated;
		ReplicationSlotPersistency persistency;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotCtlLock is held */
		if (!s->in_use)
			continue;

		/* we're only interested in logical slots */
		if (!SlotIsLogical(s))
			continue;

		/* read once, it's ok if it increases while we're checking */
		SpinLockAcquire(&s->mutex);
		persistency = s->data.persistency;
		restart_lsn = s->data.restart_lsn;
		invalidated = s->data.invalidated != RS_INVAL_NONE;
		last_saved_restart_lsn = s->last_saved_restart_lsn;
		SpinLockRelease(&s->mutex);

		/* invalidated slots need not apply */
		if (invalidated)
			continue;

		/*
		 * For persistent slot use last_saved_restart_lsn to compute the
		 * oldest LSN for removal of WAL segments. The segments between
		 * last_saved_restart_lsn and restart_lsn might be needed by a
		 * persistent slot in the case of database crash. Non-persistent
		 * slots can't survive the database crash, so we don't care about
		 * last_saved_restart_lsn for them.
		 */
		if (persistency == RS_PERSISTENT)
		{
			if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
				restart_lsn > last_saved_restart_lsn)
			{
				restart_lsn = last_saved_restart_lsn;
			}
		}

		/* A logical slot without a restart LSN places no WAL requirement. */
		if (!XLogRecPtrIsValid(restart_lsn))
			continue;

		if (!XLogRecPtrIsValid(result) ||
			restart_lsn < result)
			result = restart_lsn;
	}

	LWLockRelease(ReplicationSlotControlLock);

	return result;
}
1439 :
/*
 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
 * passed database oid.
 *
 * Returns true if there are any slots referencing the database. *nslots will
 * be set to the absolute number of slots in the database, *nactive to ones
 * currently active.
 */
bool
ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
{
	int			i;

	/* Out-params are always initialized, even on early return. */
	*nslots = *nactive = 0;

	if (max_replication_slots <= 0)
		return false;

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotCtlLock is held */
		if (!s->in_use)
			continue;

		/* only logical slots are database specific, skip */
		if (!SlotIsLogical(s))
			continue;

		/* not our database, skip */
		if (s->data.database != dboid)
			continue;

		/* NB: intentionally counting invalidated slots */

		/* count slots with spinlock held */
		SpinLockAcquire(&s->mutex);
		(*nslots)++;
		if (s->active_proc != INVALID_PROC_NUMBER)
			(*nactive)++;
		SpinLockRelease(&s->mutex);
	}
	LWLockRelease(ReplicationSlotControlLock);

	if (*nslots > 0)
		return true;
	return false;
}
1492 :
/*
 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
 * passed database oid. The caller should hold an exclusive lock on the
 * pg_database oid for the database to prevent creation of new slots on the db
 * or replay from existing slots.
 *
 * Another session that concurrently acquires an existing slot on the target DB
 * (most likely to drop it) may cause this function to ERROR. If that happens
 * it may have dropped some but not all slots.
 *
 * This routine isn't as efficient as it could be - but we don't drop
 * databases often, especially databases with lots of slots.
 *
 * If it drops the last logical slot in the cluster, it requests to disable
 * logical decoding.
 */
void
ReplicationSlotsDropDBSlots(Oid dboid)
{
	int			i;
	bool		found_valid_logicalslot;
	bool		dropped = false;

	if (max_replication_slots <= 0)
		return;

restart:
	/* Recomputed on every rescan; the slot set may change between passes. */
	found_valid_logicalslot = false;
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s;
		char	   *slotname;
		ProcNumber	active_proc;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotCtlLock is held */
		if (!s->in_use)
			continue;

		/* only logical slots are database specific, skip */
		if (!SlotIsLogical(s))
			continue;

		/*
		 * Check logical slots on other databases too so we can disable
		 * logical decoding only if no slots in the cluster.
		 */
		SpinLockAcquire(&s->mutex);
		found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
		SpinLockRelease(&s->mutex);

		/* not our database, skip */
		if (s->data.database != dboid)
			continue;

		/* NB: intentionally including invalidated slots to drop */

		/* acquire slot, so ReplicationSlotDropAcquired can be reused */
		SpinLockAcquire(&s->mutex);
		/* can't change while ReplicationSlotControlLock is held */
		slotname = NameStr(s->data.name);
		active_proc = s->active_proc;
		if (active_proc == INVALID_PROC_NUMBER)
		{
			MyReplicationSlot = s;
			s->active_proc = MyProcNumber;
		}
		SpinLockRelease(&s->mutex);

		/*
		 * Even though we hold an exclusive lock on the database object a
		 * logical slot for that DB can still be active, e.g. if it's
		 * concurrently being dropped by a backend connected to another DB.
		 *
		 * That's fairly unlikely in practice, so we'll just bail out.
		 *
		 * The slot sync worker holds a shared lock on the database before
		 * operating on synced logical slots to avoid conflict with the drop
		 * happening here. The persistent synced slots are thus safe but there
		 * is a possibility that the slot sync worker has created a temporary
		 * slot (which stays active even on release) and we are trying to drop
		 * that here. In practice, the chances of hitting this scenario are
		 * less as during slot synchronization, the temporary slot is
		 * immediately converted to persistent and thus is safe due to the
		 * shared lock taken on the database. So, we'll just bail out in such
		 * a case.
		 *
		 * XXX: We can consider shutting down the slot sync worker before
		 * trying to drop synced temporary slots here.
		 */
		if (active_proc != INVALID_PROC_NUMBER)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							slotname, GetPGProcByNumber(active_proc)->pid)));

		/*
		 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
		 * holding ReplicationSlotControlLock over filesystem operations,
		 * release ReplicationSlotControlLock and use
		 * ReplicationSlotDropAcquired.
		 *
		 * As that means the set of slots could change, restart scan from the
		 * beginning each time we release the lock.
		 */
		LWLockRelease(ReplicationSlotControlLock);
		ReplicationSlotDropAcquired();
		dropped = true;
		goto restart;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* Disable logical decoding only if no valid logical slot remains. */
	if (dropped && !found_valid_logicalslot)
		RequestDisableLogicalDecoding();
}
1610 :
1611 : /*
1612 : * Returns true if there is at least one in-use valid logical replication slot.
1613 : */
1614 : bool
1615 936 : CheckLogicalSlotExists(void)
1616 : {
1617 936 : bool found = false;
1618 :
1619 936 : if (max_replication_slots <= 0)
1620 2 : return false;
1621 :
1622 934 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1623 10106 : for (int i = 0; i < max_replication_slots; i++)
1624 : {
1625 : ReplicationSlot *s;
1626 : bool invalidated;
1627 :
1628 9184 : s = &ReplicationSlotCtl->replication_slots[i];
1629 :
1630 : /* cannot change while ReplicationSlotCtlLock is held */
1631 9184 : if (!s->in_use)
1632 9128 : continue;
1633 :
1634 56 : if (SlotIsPhysical(s))
1635 38 : continue;
1636 :
1637 18 : SpinLockAcquire(&s->mutex);
1638 18 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1639 18 : SpinLockRelease(&s->mutex);
1640 :
1641 18 : if (invalidated)
1642 6 : continue;
1643 :
1644 12 : found = true;
1645 12 : break;
1646 : }
1647 934 : LWLockRelease(ReplicationSlotControlLock);
1648 :
1649 934 : return found;
1650 : }
1651 :
/*
 * Check whether the server's configuration supports using replication
 * slots.
 *
 * Errors out (rather than returning a status) when the configuration is
 * insufficient.
 */
void
CheckSlotRequirements(void)
{
	/*
	 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
	 * needs the same check.
	 */

	if (max_replication_slots == 0)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));

	if (wal_level < WAL_LEVEL_REPLICA)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
}
1674 :
/*
 * Check whether the user has privilege to use replication slots.
 *
 * Errors out unless the current role has the REPLICATION attribute.
 */
void
CheckSlotPermissions(void)
{
	if (!has_rolreplication(GetUserId()))
		ereport(ERROR,
				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
				 errmsg("permission denied to use replication slots"),
				 errdetail("Only roles with the %s attribute may use replication slots.",
						   "REPLICATION")));
}
1688 :
/*
 * Reserve WAL for the currently active slot.
 *
 * Compute and set restart_lsn in a manner that's appropriate for the type of
 * the slot and concurrency safe.
 *
 * The acquired slot must not yet have a restart_lsn.
 */
void
ReplicationSlotReserveWal(void)
{
	ReplicationSlot *slot = MyReplicationSlot;
	XLogSegNo	segno;
	XLogRecPtr	restart_lsn;

	Assert(slot != NULL);
	Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
	Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn));

	/*
	 * The replication slot mechanism is used to prevent the removal of
	 * required WAL.
	 *
	 * Acquire an exclusive lock to prevent the checkpoint process from
	 * concurrently computing the minimum slot LSN (see
	 * CheckPointReplicationSlots). This ensures that the WAL reserved for
	 * replication cannot be removed during a checkpoint.
	 *
	 * The mechanism is reliable because if WAL reservation occurs first, the
	 * checkpoint must wait for the restart_lsn update before determining the
	 * minimum non-removable LSN. On the other hand, if the checkpoint happens
	 * first, subsequent WAL reservations will select positions at or beyond
	 * the redo pointer of that checkpoint.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * For logical slots log a standby snapshot and start logical decoding at
	 * exactly that position. That allows the slot to start up more quickly.
	 * But on a standby we cannot do WAL writes, so just use the replay
	 * pointer; effectively, an attempt to create a logical slot on standby
	 * will cause it to wait for an xl_running_xact record to be logged
	 * independently on the primary, so that a snapshot can be built using the
	 * record.
	 *
	 * None of this is needed (or indeed helpful) for physical slots as
	 * they'll start replay at the last logged checkpoint anyway. Instead,
	 * return the location of the last redo LSN, where a base backup has to
	 * start replay at.
	 */
	if (SlotIsPhysical(slot))
		restart_lsn = GetRedoRecPtr();
	else if (RecoveryInProgress())
		restart_lsn = GetXLogReplayRecPtr(NULL);
	else
		restart_lsn = GetXLogInsertRecPtr();

	SpinLockAcquire(&slot->mutex);
	slot->data.restart_lsn = restart_lsn;
	SpinLockRelease(&slot->mutex);

	/* prevent WAL removal as fast as possible */
	ReplicationSlotsComputeRequiredLSN();

	/* Checkpoint shouldn't remove the required WAL. */
	XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
	if (XLogGetLastRemovedSegno() >= segno)
		elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
			 NameStr(slot->data.name));

	LWLockRelease(ReplicationSlotAllocationLock);

	if (!RecoveryInProgress() && SlotIsLogical(slot))
	{
		XLogRecPtr	flushptr;

		/* make sure we have enough information to start */
		flushptr = LogStandbySnapshot();

		/* and make sure it's fsynced to disk */
		XLogFlush(flushptr);
	}
}
1770 :
1771 : /*
1772 : * Report that replication slot needs to be invalidated
1773 : */
1774 : static void
1775 46 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1776 : bool terminating,
1777 : int pid,
1778 : NameData slotname,
1779 : XLogRecPtr restart_lsn,
1780 : XLogRecPtr oldestLSN,
1781 : TransactionId snapshotConflictHorizon,
1782 : long slot_idle_seconds)
1783 : {
1784 : StringInfoData err_detail;
1785 : StringInfoData err_hint;
1786 :
1787 46 : initStringInfo(&err_detail);
1788 46 : initStringInfo(&err_hint);
1789 :
1790 46 : switch (cause)
1791 : {
1792 14 : case RS_INVAL_WAL_REMOVED:
1793 : {
1794 14 : uint64 ex = oldestLSN - restart_lsn;
1795 :
1796 14 : appendStringInfo(&err_detail,
1797 14 : ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1798 : "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1799 : ex),
1800 14 : LSN_FORMAT_ARGS(restart_lsn),
1801 : ex);
1802 : /* translator: %s is a GUC variable name */
1803 14 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1804 : "max_slot_wal_keep_size");
1805 14 : break;
1806 : }
1807 24 : case RS_INVAL_HORIZON:
1808 24 : appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1809 : snapshotConflictHorizon);
1810 24 : break;
1811 :
1812 8 : case RS_INVAL_WAL_LEVEL:
1813 8 : appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
1814 8 : break;
1815 :
1816 0 : case RS_INVAL_IDLE_TIMEOUT:
1817 : {
1818 : /* translator: %s is a GUC variable name */
1819 0 : appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1820 : slot_idle_seconds, "idle_replication_slot_timeout",
1821 : idle_replication_slot_timeout_secs);
1822 : /* translator: %s is a GUC variable name */
1823 0 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1824 : "idle_replication_slot_timeout");
1825 0 : break;
1826 : }
1827 : case RS_INVAL_NONE:
1828 : pg_unreachable();
1829 : }
1830 :
1831 46 : ereport(LOG,
1832 : terminating ?
1833 : errmsg("terminating process %d to release replication slot \"%s\"",
1834 : pid, NameStr(slotname)) :
1835 : errmsg("invalidating obsolete replication slot \"%s\"",
1836 : NameStr(slotname)),
1837 : errdetail_internal("%s", err_detail.data),
1838 : err_hint.len ? errhint("%s", err_hint.data) : 0);
1839 :
1840 46 : pfree(err_detail.data);
1841 46 : pfree(err_hint.data);
1842 46 : }
1843 :
1844 : /*
1845 : * Can we invalidate an idle replication slot?
1846 : *
1847 : * Idle timeout invalidation is allowed only when:
1848 : *
1849 : * 1. Idle timeout is set
1850 : * 2. Slot has reserved WAL
1851 : * 3. Slot is inactive
1852 : * 4. The slot is not being synced from the primary while the server is in
1853 : * recovery. This is because synced slots are always considered to be
1854 : * inactive because they don't perform logical decoding to produce changes.
1855 : */
1856 : static inline bool
1857 744 : CanInvalidateIdleSlot(ReplicationSlot *s)
1858 : {
1859 744 : return (idle_replication_slot_timeout_secs != 0 &&
1860 0 : XLogRecPtrIsValid(s->data.restart_lsn) &&
1861 744 : s->inactive_since > 0 &&
1862 0 : !(RecoveryInProgress() && s->data.synced));
1863 : }
1864 :
1865 : /*
1866 : * DetermineSlotInvalidationCause - Determine the cause for which a slot
1867 : * becomes invalid among the given possible causes.
1868 : *
1869 : * This function sequentially checks all possible invalidation causes and
1870 : * returns the first one for which the slot is eligible for invalidation.
1871 : */
1872 : static ReplicationSlotInvalidationCause
1873 818 : DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
1874 : XLogRecPtr oldestLSN, Oid dboid,
1875 : TransactionId snapshotConflictHorizon,
1876 : TimestampTz *inactive_since, TimestampTz now)
1877 : {
1878 : Assert(possible_causes != RS_INVAL_NONE);
1879 :
1880 818 : if (possible_causes & RS_INVAL_WAL_REMOVED)
1881 : {
1882 758 : XLogRecPtr restart_lsn = s->data.restart_lsn;
1883 :
1884 758 : if (XLogRecPtrIsValid(restart_lsn) &&
1885 : restart_lsn < oldestLSN)
1886 14 : return RS_INVAL_WAL_REMOVED;
1887 : }
1888 :
1889 804 : if (possible_causes & RS_INVAL_HORIZON)
1890 : {
1891 : /* invalid DB oid signals a shared relation */
1892 52 : if (SlotIsLogical(s) &&
1893 42 : (dboid == InvalidOid || dboid == s->data.database))
1894 : {
1895 52 : TransactionId effective_xmin = s->effective_xmin;
1896 52 : TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
1897 :
1898 52 : if (TransactionIdIsValid(effective_xmin) &&
1899 0 : TransactionIdPrecedesOrEquals(effective_xmin,
1900 : snapshotConflictHorizon))
1901 0 : return RS_INVAL_HORIZON;
1902 104 : else if (TransactionIdIsValid(catalog_effective_xmin) &&
1903 52 : TransactionIdPrecedesOrEquals(catalog_effective_xmin,
1904 : snapshotConflictHorizon))
1905 24 : return RS_INVAL_HORIZON;
1906 : }
1907 : }
1908 :
1909 780 : if (possible_causes & RS_INVAL_WAL_LEVEL)
1910 : {
1911 8 : if (SlotIsLogical(s))
1912 8 : return RS_INVAL_WAL_LEVEL;
1913 : }
1914 :
1915 772 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1916 : {
1917 : Assert(now > 0);
1918 :
1919 744 : if (CanInvalidateIdleSlot(s))
1920 : {
1921 : /*
1922 : * Simulate the invalidation due to idle_timeout to test the
1923 : * timeout behavior promptly, without waiting for it to trigger
1924 : * naturally.
1925 : */
1926 : #ifdef USE_INJECTION_POINTS
1927 0 : if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1928 : {
1929 0 : *inactive_since = 0; /* since the beginning of time */
1930 0 : return RS_INVAL_IDLE_TIMEOUT;
1931 : }
1932 : #endif
1933 :
1934 : /*
1935 : * Check if the slot needs to be invalidated due to
1936 : * idle_replication_slot_timeout GUC.
1937 : */
1938 0 : if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
1939 : idle_replication_slot_timeout_secs))
1940 : {
1941 0 : *inactive_since = s->inactive_since;
1942 0 : return RS_INVAL_IDLE_TIMEOUT;
1943 : }
1944 : }
1945 : }
1946 :
1947 772 : return RS_INVAL_NONE;
1948 : }
1949 :
1950 : /*
1951 : * Helper for InvalidateObsoleteReplicationSlots
1952 : *
1953 : * Acquires the given slot and mark it invalid, if necessary and possible.
1954 : *
1955 : * Returns true if the slot was invalidated.
1956 : *
1957 : * Set *released_lock_out if ReplicationSlotControlLock was released in the
1958 : * interim (and in that case we're not holding the lock at return, otherwise
1959 : * we are).
1960 : *
1961 : * This is inherently racy, because we release the LWLock
1962 : * for syscalls, so caller must restart if we return true.
1963 : */
1964 : static bool
1965 888 : InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
1966 : ReplicationSlot *s,
1967 : XLogRecPtr oldestLSN,
1968 : Oid dboid, TransactionId snapshotConflictHorizon,
1969 : bool *released_lock_out)
1970 : {
1971 888 : int last_signaled_pid = 0;
1972 888 : bool released_lock = false;
1973 888 : bool invalidated = false;
1974 888 : TimestampTz inactive_since = 0;
1975 :
1976 : for (;;)
1977 14 : {
1978 : XLogRecPtr restart_lsn;
1979 : NameData slotname;
1980 : ProcNumber active_proc;
1981 902 : int active_pid = 0;
1982 902 : ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
1983 902 : TimestampTz now = 0;
1984 902 : long slot_idle_secs = 0;
1985 :
1986 : Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1987 :
1988 902 : if (!s->in_use)
1989 : {
1990 0 : if (released_lock)
1991 0 : LWLockRelease(ReplicationSlotControlLock);
1992 0 : break;
1993 : }
1994 :
1995 902 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1996 : {
1997 : /*
1998 : * Assign the current time here to avoid system call overhead
1999 : * while holding the spinlock in subsequent code.
2000 : */
2001 786 : now = GetCurrentTimestamp();
2002 : }
2003 :
2004 : /*
2005 : * Check if the slot needs to be invalidated. If it needs to be
2006 : * invalidated, and is not currently acquired, acquire it and mark it
2007 : * as having been invalidated. We do this with the spinlock held to
2008 : * avoid race conditions -- for example the restart_lsn could move
2009 : * forward, or the slot could be dropped.
2010 : */
2011 902 : SpinLockAcquire(&s->mutex);
2012 :
2013 902 : restart_lsn = s->data.restart_lsn;
2014 :
2015 : /* we do nothing if the slot is already invalid */
2016 902 : if (s->data.invalidated == RS_INVAL_NONE)
2017 818 : invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
2018 : s, oldestLSN,
2019 : dboid,
2020 : snapshotConflictHorizon,
2021 : &inactive_since,
2022 : now);
2023 :
2024 : /* if there's no invalidation, we're done */
2025 902 : if (invalidation_cause == RS_INVAL_NONE)
2026 : {
2027 856 : SpinLockRelease(&s->mutex);
2028 856 : if (released_lock)
2029 0 : LWLockRelease(ReplicationSlotControlLock);
2030 856 : break;
2031 : }
2032 :
2033 46 : slotname = s->data.name;
2034 46 : active_proc = s->active_proc;
2035 :
2036 : /*
2037 : * If the slot can be acquired, do so and mark it invalidated
2038 : * immediately. Otherwise we'll signal the owning process, below, and
2039 : * retry.
2040 : *
2041 : * Note: Unlike other slot attributes, slot's inactive_since can't be
2042 : * changed until the acquired slot is released or the owning process
2043 : * is terminated. So, the inactive slot can only be invalidated
2044 : * immediately without being terminated.
2045 : */
2046 46 : if (active_proc == INVALID_PROC_NUMBER)
2047 : {
2048 32 : MyReplicationSlot = s;
2049 32 : s->active_proc = MyProcNumber;
2050 32 : s->data.invalidated = invalidation_cause;
2051 :
2052 : /*
2053 : * XXX: We should consider not overwriting restart_lsn and instead
2054 : * just rely on .invalidated.
2055 : */
2056 32 : if (invalidation_cause == RS_INVAL_WAL_REMOVED)
2057 : {
2058 10 : s->data.restart_lsn = InvalidXLogRecPtr;
2059 10 : s->last_saved_restart_lsn = InvalidXLogRecPtr;
2060 : }
2061 :
2062 : /* Let caller know */
2063 32 : invalidated = true;
2064 : }
2065 : else
2066 : {
2067 14 : active_pid = GetPGProcByNumber(active_proc)->pid;
2068 : Assert(active_pid != 0);
2069 : }
2070 :
2071 46 : SpinLockRelease(&s->mutex);
2072 :
2073 : /*
2074 : * Calculate the idle time duration of the slot if slot is marked
2075 : * invalidated with RS_INVAL_IDLE_TIMEOUT.
2076 : */
2077 46 : if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
2078 : {
2079 : int slot_idle_usecs;
2080 :
2081 0 : TimestampDifference(inactive_since, now, &slot_idle_secs,
2082 : &slot_idle_usecs);
2083 : }
2084 :
2085 46 : if (active_proc != INVALID_PROC_NUMBER)
2086 : {
2087 : /*
2088 : * Prepare the sleep on the slot's condition variable before
2089 : * releasing the lock, to close a possible race condition if the
2090 : * slot is released before the sleep below.
2091 : */
2092 14 : ConditionVariablePrepareToSleep(&s->active_cv);
2093 :
2094 14 : LWLockRelease(ReplicationSlotControlLock);
2095 14 : released_lock = true;
2096 :
2097 : /*
2098 : * Signal to terminate the process that owns the slot, if we
2099 : * haven't already signalled it. (Avoidance of repeated
2100 : * signalling is the only reason for there to be a loop in this
2101 : * routine; otherwise we could rely on caller's restart loop.)
2102 : *
2103 : * There is the race condition that other process may own the slot
2104 : * after its current owner process is terminated and before this
2105 : * process owns it. To handle that, we signal only if the PID of
2106 : * the owning process has changed from the previous time. (This
2107 : * logic assumes that the same PID is not reused very quickly.)
2108 : */
2109 14 : if (last_signaled_pid != active_pid)
2110 : {
2111 14 : ReportSlotInvalidation(invalidation_cause, true, active_pid,
2112 : slotname, restart_lsn,
2113 : oldestLSN, snapshotConflictHorizon,
2114 : slot_idle_secs);
2115 :
2116 14 : if (MyBackendType == B_STARTUP)
2117 10 : (void) SignalRecoveryConflict(GetPGProcByNumber(active_proc),
2118 : active_pid,
2119 : RECOVERY_CONFLICT_LOGICALSLOT);
2120 : else
2121 4 : (void) kill(active_pid, SIGTERM);
2122 :
2123 14 : last_signaled_pid = active_pid;
2124 : }
2125 :
2126 : /* Wait until the slot is released. */
2127 14 : ConditionVariableSleep(&s->active_cv,
2128 : WAIT_EVENT_REPLICATION_SLOT_DROP);
2129 :
2130 : /*
2131 : * Re-acquire lock and start over; we expect to invalidate the
2132 : * slot next time (unless another process acquires the slot in the
2133 : * meantime).
2134 : *
2135 : * Note: It is possible for a slot to advance its restart_lsn or
2136 : * xmin values sufficiently between when we release the mutex and
2137 : * when we recheck, moving from a conflicting state to a non
2138 : * conflicting state. This is intentional and safe: if the slot
2139 : * has caught up while we're busy here, the resources we were
2140 : * concerned about (WAL segments or tuples) have not yet been
2141 : * removed, and there's no reason to invalidate the slot.
2142 : */
2143 14 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2144 14 : continue;
2145 : }
2146 : else
2147 : {
2148 : /*
2149 : * We hold the slot now and have already invalidated it; flush it
2150 : * to ensure that state persists.
2151 : *
2152 : * Don't want to hold ReplicationSlotControlLock across file
2153 : * system operations, so release it now but be sure to tell caller
2154 : * to restart from scratch.
2155 : */
2156 32 : LWLockRelease(ReplicationSlotControlLock);
2157 32 : released_lock = true;
2158 :
2159 : /* Make sure the invalidated state persists across server restart */
2160 32 : ReplicationSlotMarkDirty();
2161 32 : ReplicationSlotSave();
2162 32 : ReplicationSlotRelease();
2163 :
2164 32 : ReportSlotInvalidation(invalidation_cause, false, active_pid,
2165 : slotname, restart_lsn,
2166 : oldestLSN, snapshotConflictHorizon,
2167 : slot_idle_secs);
2168 :
2169 : /* done with this slot for now */
2170 32 : break;
2171 : }
2172 : }
2173 :
2174 : Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
2175 :
2176 888 : *released_lock_out = released_lock;
2177 888 : return invalidated;
2178 : }
2179 :
2180 : /*
2181 : * Invalidate slots that require resources about to be removed.
2182 : *
2183 : * Returns true when any slot have got invalidated.
2184 : *
2185 : * Whether a slot needs to be invalidated depends on the invalidation cause.
2186 : * A slot is invalidated if it:
2187 : * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
2188 : * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2189 : * db; dboid may be InvalidOid for shared relations
2190 : * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
2191 : * logical.
2192 : * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2193 : * "idle_replication_slot_timeout" duration.
2194 : *
2195 : * Note: This function attempts to invalidate the slot for multiple possible
2196 : * causes in a single pass, minimizing redundant iterations. The "cause"
2197 : * parameter can be a MASK representing one or more of the defined causes.
2198 : *
2199 : * If it invalidates the last logical slot in the cluster, it requests to
2200 : * disable logical decoding.
2201 : *
2202 : * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2203 : */
2204 : bool
2205 3614 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
2206 : XLogSegNo oldestSegno, Oid dboid,
2207 : TransactionId snapshotConflictHorizon)
2208 : {
2209 : XLogRecPtr oldestLSN;
2210 3614 : bool invalidated = false;
2211 3614 : bool invalidated_logical = false;
2212 : bool found_valid_logicalslot;
2213 :
2214 : Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2215 : Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
2216 : Assert(possible_causes != RS_INVAL_NONE);
2217 :
2218 3614 : if (max_replication_slots == 0)
2219 2 : return invalidated;
2220 :
2221 3612 : XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
2222 :
2223 3644 : restart:
2224 3644 : found_valid_logicalslot = false;
2225 3644 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2226 39074 : for (int i = 0; i < max_replication_slots; i++)
2227 : {
2228 35462 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2229 35462 : bool released_lock = false;
2230 :
2231 35462 : if (!s->in_use)
2232 34574 : continue;
2233 :
2234 : /* Prevent invalidation of logical slots during binary upgrade */
2235 906 : if (SlotIsLogical(s) && IsBinaryUpgrade)
2236 : {
2237 18 : SpinLockAcquire(&s->mutex);
2238 18 : found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
2239 18 : SpinLockRelease(&s->mutex);
2240 :
2241 18 : continue;
2242 : }
2243 :
2244 888 : if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
2245 : dboid, snapshotConflictHorizon,
2246 : &released_lock))
2247 : {
2248 : Assert(released_lock);
2249 :
2250 : /* Remember we have invalidated a physical or logical slot */
2251 32 : invalidated = true;
2252 :
2253 : /*
2254 : * Additionally, remember we have invalidated a logical slot as we
2255 : * can request disabling logical decoding later.
2256 : */
2257 32 : if (SlotIsLogical(s))
2258 26 : invalidated_logical = true;
2259 : }
2260 : else
2261 : {
2262 : /*
2263 : * We need to check if the slot is invalidated here since
2264 : * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2265 : * is already invalidated.
2266 : */
2267 856 : SpinLockAcquire(&s->mutex);
2268 1712 : found_valid_logicalslot |=
2269 856 : (SlotIsLogical(s) && (s->data.invalidated == RS_INVAL_NONE));
2270 856 : SpinLockRelease(&s->mutex);
2271 : }
2272 :
2273 : /* if the lock was released, start from scratch */
2274 888 : if (released_lock)
2275 32 : goto restart;
2276 : }
2277 3612 : LWLockRelease(ReplicationSlotControlLock);
2278 :
2279 : /*
2280 : * If any slots have been invalidated, recalculate the resource limits.
2281 : */
2282 3612 : if (invalidated)
2283 : {
2284 22 : ReplicationSlotsComputeRequiredXmin(false);
2285 22 : ReplicationSlotsComputeRequiredLSN();
2286 : }
2287 :
2288 : /*
2289 : * Request the checkpointer to disable logical decoding if no valid
2290 : * logical slots remain. If called by the checkpointer during a
2291 : * checkpoint, only the request is initiated; actual deactivation is
2292 : * deferred until after the checkpoint completes.
2293 : */
2294 3612 : if (invalidated_logical && !found_valid_logicalslot)
2295 16 : RequestDisableLogicalDecoding();
2296 :
2297 3612 : return invalidated;
2298 : }
2299 :
2300 : /*
2301 : * Flush all replication slots to disk.
2302 : *
2303 : * It is convenient to flush dirty replication slots at the time of checkpoint.
2304 : * Additionally, in case of a shutdown checkpoint, we also identify the slots
2305 : * for which the confirmed_flush LSN has been updated since the last time it
2306 : * was saved and flush them.
2307 : */
2308 : void
2309 3564 : CheckPointReplicationSlots(bool is_shutdown)
2310 : {
2311 : int i;
2312 3564 : bool last_saved_restart_lsn_updated = false;
2313 :
2314 3564 : elog(DEBUG1, "performing replication slot checkpoint");
2315 :
2316 : /*
2317 : * Prevent any slot from being created/dropped while we're active. As we
2318 : * explicitly do *not* want to block iterating over replication_slots or
2319 : * acquiring a slot we cannot take the control lock - but that's OK,
2320 : * because holding ReplicationSlotAllocationLock is strictly stronger, and
2321 : * enough to guarantee that nobody can change the in_use bits on us.
2322 : *
2323 : * Additionally, acquiring the Allocation lock is necessary to serialize
2324 : * the slot flush process with concurrent slot WAL reservation. This
2325 : * ensures that the WAL position being reserved is either flushed to disk
2326 : * or is beyond or equal to the redo pointer of the current checkpoint
2327 : * (See ReplicationSlotReserveWal for details).
2328 : */
2329 3564 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2330 :
2331 38654 : for (i = 0; i < max_replication_slots; i++)
2332 : {
2333 35090 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2334 : char path[MAXPGPATH];
2335 :
2336 35090 : if (!s->in_use)
2337 34300 : continue;
2338 :
2339 : /* save the slot to disk, locking is handled in SaveSlotToPath() */
2340 790 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2341 :
2342 : /*
2343 : * Slot's data is not flushed each time the confirmed_flush LSN is
2344 : * updated as that could lead to frequent writes. However, we decide
2345 : * to force a flush of all logical slot's data at the time of shutdown
2346 : * if the confirmed_flush LSN is changed since we last flushed it to
2347 : * disk. This helps in avoiding an unnecessary retreat of the
2348 : * confirmed_flush LSN after restart.
2349 : */
2350 790 : if (is_shutdown && SlotIsLogical(s))
2351 : {
2352 176 : SpinLockAcquire(&s->mutex);
2353 :
2354 176 : if (s->data.invalidated == RS_INVAL_NONE &&
2355 174 : s->data.confirmed_flush > s->last_saved_confirmed_flush)
2356 : {
2357 74 : s->just_dirtied = true;
2358 74 : s->dirty = true;
2359 : }
2360 176 : SpinLockRelease(&s->mutex);
2361 : }
2362 :
2363 : /*
2364 : * Track if we're going to update slot's last_saved_restart_lsn. We
2365 : * need this to know if we need to recompute the required LSN.
2366 : */
2367 790 : if (s->last_saved_restart_lsn != s->data.restart_lsn)
2368 390 : last_saved_restart_lsn_updated = true;
2369 :
2370 790 : SaveSlotToPath(s, path, LOG);
2371 : }
2372 3564 : LWLockRelease(ReplicationSlotAllocationLock);
2373 :
2374 : /*
2375 : * Recompute the required LSN if SaveSlotToPath() updated
2376 : * last_saved_restart_lsn for any slot.
2377 : */
2378 3564 : if (last_saved_restart_lsn_updated)
2379 390 : ReplicationSlotsComputeRequiredLSN();
2380 3564 : }
2381 :
2382 : /*
2383 : * Load all replication slots from disk into memory at server startup. This
2384 : * needs to be run before we start crash recovery.
2385 : */
2386 : void
2387 1988 : StartupReplicationSlots(void)
2388 : {
2389 : DIR *replication_dir;
2390 : struct dirent *replication_de;
2391 :
2392 1988 : elog(DEBUG1, "starting up replication slots");
2393 :
2394 : /* restore all slots by iterating over all on-disk entries */
2395 1988 : replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2396 6194 : while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2397 : {
2398 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2399 : PGFileType de_type;
2400 :
2401 4210 : if (strcmp(replication_de->d_name, ".") == 0 ||
2402 2226 : strcmp(replication_de->d_name, "..") == 0)
2403 3968 : continue;
2404 :
2405 242 : snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2406 242 : de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2407 :
2408 : /* we're only creating directories here, skip if it's not our's */
2409 242 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
2410 0 : continue;
2411 :
2412 : /* we crashed while a slot was being setup or deleted, clean up */
2413 242 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
2414 : {
2415 0 : if (!rmtree(path, true))
2416 : {
2417 0 : ereport(WARNING,
2418 : (errmsg("could not remove directory \"%s\"",
2419 : path)));
2420 0 : continue;
2421 : }
2422 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2423 0 : continue;
2424 : }
2425 :
2426 : /* looks like a slot in a normal state, restore */
2427 242 : RestoreSlotFromDisk(replication_de->d_name);
2428 : }
2429 1984 : FreeDir(replication_dir);
2430 :
2431 : /* currently no slots exist, we're done. */
2432 1984 : if (max_replication_slots <= 0)
2433 2 : return;
2434 :
2435 : /* Now that we have recovered all the data, compute replication xmin */
2436 1982 : ReplicationSlotsComputeRequiredXmin(false);
2437 1982 : ReplicationSlotsComputeRequiredLSN();
2438 : }
2439 :
2440 : /* ----
2441 : * Manipulation of on-disk state of replication slots
2442 : *
2443 : * NB: none of the routines below should take any notice whether a slot is the
2444 : * current one or not, that's all handled a layer above.
2445 : * ----
2446 : */
2447 : static void
2448 1358 : CreateSlotOnDisk(ReplicationSlot *slot)
2449 : {
2450 : char tmppath[MAXPGPATH];
2451 : char path[MAXPGPATH];
2452 : struct stat st;
2453 :
2454 : /*
2455 : * No need to take out the io_in_progress_lock, nobody else can see this
2456 : * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2457 : * takes out the lock, if we'd take the lock here, we'd deadlock.
2458 : */
2459 :
2460 1358 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2461 1358 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2462 :
2463 : /*
2464 : * It's just barely possible that some previous effort to create or drop a
2465 : * slot with this name left a temp directory lying around. If that seems
2466 : * to be the case, try to remove it. If the rmtree() fails, we'll error
2467 : * out at the MakePGDirectory() below, so we don't bother checking
2468 : * success.
2469 : */
2470 1358 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2471 0 : rmtree(tmppath, true);
2472 :
2473 : /* Create and fsync the temporary slot directory. */
2474 1358 : if (MakePGDirectory(tmppath) < 0)
2475 0 : ereport(ERROR,
2476 : (errcode_for_file_access(),
2477 : errmsg("could not create directory \"%s\": %m",
2478 : tmppath)));
2479 1358 : fsync_fname(tmppath, true);
2480 :
2481 : /* Write the actual state file. */
2482 1358 : slot->dirty = true; /* signal that we really need to write */
2483 1358 : SaveSlotToPath(slot, tmppath, ERROR);
2484 :
2485 : /* Rename the directory into place. */
2486 1358 : if (rename(tmppath, path) != 0)
2487 0 : ereport(ERROR,
2488 : (errcode_for_file_access(),
2489 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2490 : tmppath, path)));
2491 :
2492 : /*
2493 : * If we'd now fail - really unlikely - we wouldn't know whether this slot
2494 : * would persist after an OS crash or not - so, force a restart. The
2495 : * restart would try to fsync this again till it works.
2496 : */
2497 1358 : START_CRIT_SECTION();
2498 :
2499 1358 : fsync_fname(path, true);
2500 1358 : fsync_fname(PG_REPLSLOT_DIR, true);
2501 :
2502 1358 : END_CRIT_SECTION();
2503 1358 : }
2504 :
2505 : /*
2506 : * Shared functionality between saving and creating a replication slot.
2507 : */
2508 : static void
2509 4972 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2510 : {
2511 : char tmppath[MAXPGPATH];
2512 : char path[MAXPGPATH];
2513 : int fd;
2514 : ReplicationSlotOnDisk cp;
2515 : bool was_dirty;
2516 :
2517 : /* first check whether there's something to write out */
2518 4972 : SpinLockAcquire(&slot->mutex);
2519 4972 : was_dirty = slot->dirty;
2520 4972 : slot->just_dirtied = false;
2521 4972 : SpinLockRelease(&slot->mutex);
2522 :
2523 : /* and don't do anything if there's nothing to write */
2524 4972 : if (!was_dirty)
2525 304 : return;
2526 :
2527 4668 : LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
2528 :
2529 : /* silence valgrind :( */
2530 4668 : memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2531 :
2532 4668 : sprintf(tmppath, "%s/state.tmp", dir);
2533 4668 : sprintf(path, "%s/state", dir);
2534 :
2535 4668 : fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2536 4668 : if (fd < 0)
2537 : {
2538 : /*
2539 : * If not an ERROR, then release the lock before returning. In case
2540 : * of an ERROR, the error recovery path automatically releases the
2541 : * lock, but no harm in explicitly releasing even in that case. Note
2542 : * that LWLockRelease() could affect errno.
2543 : */
2544 0 : int save_errno = errno;
2545 :
2546 0 : LWLockRelease(&slot->io_in_progress_lock);
2547 0 : errno = save_errno;
2548 0 : ereport(elevel,
2549 : (errcode_for_file_access(),
2550 : errmsg("could not create file \"%s\": %m",
2551 : tmppath)));
2552 0 : return;
2553 : }
2554 :
2555 4668 : cp.magic = SLOT_MAGIC;
2556 4668 : INIT_CRC32C(cp.checksum);
2557 4668 : cp.version = SLOT_VERSION;
2558 4668 : cp.length = ReplicationSlotOnDiskV2Size;
2559 :
2560 4668 : SpinLockAcquire(&slot->mutex);
2561 :
2562 4668 : memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2563 :
2564 4668 : SpinLockRelease(&slot->mutex);
2565 :
2566 4668 : COMP_CRC32C(cp.checksum,
2567 : (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
2568 : ReplicationSlotOnDiskChecksummedSize);
2569 4668 : FIN_CRC32C(cp.checksum);
2570 :
2571 4668 : errno = 0;
2572 4668 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2573 4668 : if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2574 : {
2575 0 : int save_errno = errno;
2576 :
2577 0 : pgstat_report_wait_end();
2578 0 : CloseTransientFile(fd);
2579 0 : unlink(tmppath);
2580 0 : LWLockRelease(&slot->io_in_progress_lock);
2581 :
2582 : /* if write didn't set errno, assume problem is no disk space */
2583 0 : errno = save_errno ? save_errno : ENOSPC;
2584 0 : ereport(elevel,
2585 : (errcode_for_file_access(),
2586 : errmsg("could not write to file \"%s\": %m",
2587 : tmppath)));
2588 0 : return;
2589 : }
2590 4668 : pgstat_report_wait_end();
2591 :
2592 : /* fsync the temporary file */
2593 4668 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2594 4668 : if (pg_fsync(fd) != 0)
2595 : {
2596 0 : int save_errno = errno;
2597 :
2598 0 : pgstat_report_wait_end();
2599 0 : CloseTransientFile(fd);
2600 0 : unlink(tmppath);
2601 0 : LWLockRelease(&slot->io_in_progress_lock);
2602 :
2603 0 : errno = save_errno;
2604 0 : ereport(elevel,
2605 : (errcode_for_file_access(),
2606 : errmsg("could not fsync file \"%s\": %m",
2607 : tmppath)));
2608 0 : return;
2609 : }
2610 4668 : pgstat_report_wait_end();
2611 :
2612 4668 : if (CloseTransientFile(fd) != 0)
2613 : {
2614 0 : int save_errno = errno;
2615 :
2616 0 : unlink(tmppath);
2617 0 : LWLockRelease(&slot->io_in_progress_lock);
2618 :
2619 0 : errno = save_errno;
2620 0 : ereport(elevel,
2621 : (errcode_for_file_access(),
2622 : errmsg("could not close file \"%s\": %m",
2623 : tmppath)));
2624 0 : return;
2625 : }
2626 :
2627 : /* rename to permanent file, fsync file and directory */
2628 4668 : if (rename(tmppath, path) != 0)
2629 : {
2630 0 : int save_errno = errno;
2631 :
2632 0 : unlink(tmppath);
2633 0 : LWLockRelease(&slot->io_in_progress_lock);
2634 :
2635 0 : errno = save_errno;
2636 0 : ereport(elevel,
2637 : (errcode_for_file_access(),
2638 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2639 : tmppath, path)));
2640 0 : return;
2641 : }
2642 :
2643 : /*
2644 : * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2645 : */
2646 4668 : START_CRIT_SECTION();
2647 :
2648 4668 : fsync_fname(path, false);
2649 4668 : fsync_fname(dir, true);
2650 4668 : fsync_fname(PG_REPLSLOT_DIR, true);
2651 :
2652 4668 : END_CRIT_SECTION();
2653 :
2654 : /*
2655 : * Successfully wrote, unset dirty bit, unless somebody dirtied again
2656 : * already and remember the confirmed_flush LSN value.
2657 : */
2658 4668 : SpinLockAcquire(&slot->mutex);
2659 4668 : if (!slot->just_dirtied)
2660 4654 : slot->dirty = false;
2661 4668 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2662 4668 : slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2663 4668 : SpinLockRelease(&slot->mutex);
2664 :
2665 4668 : LWLockRelease(&slot->io_in_progress_lock);
2666 : }
2667 :
/*
 * Load a single slot from disk into shared memory.
 *
 * Called during startup before any concurrent slot access is possible, so
 * no locking is needed.  I/O failures and on-disk corruption are treated as
 * PANIC because continuing without the slot's resource reservations could
 * let required WAL or catalog tuples be removed.
 *
 * "name" is the slot's directory name under PG_REPLSLOT_DIR.
 */
static void
RestoreSlotFromDisk(const char *name)
{
	ReplicationSlotOnDisk cp;
	int			i;
	char		slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
	char		path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
	int			fd;
	bool		restored = false;
	int			readBytes;
	pg_crc32c	checksum;
	TimestampTz now = 0;

	/* no need to lock here, no concurrent access allowed yet */

	/*
	 * Delete the temp file if it exists; it would be a leftover of a state
	 * file write that was interrupted before the rename into place.
	 */
	sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
	sprintf(path, "%s/state.tmp", slotdir);
	if (unlink(path) < 0 && errno != ENOENT)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", path)));

	sprintf(path, "%s/state", slotdir);

	elog(DEBUG1, "restoring replication slot from \"%s\"", path);

	/* on some operating systems fsyncing a file requires O_RDWR */
	fd = OpenTransientFile(path, O_RDWR | PG_BINARY);

	/*
	 * We do not need to handle this as we are rename()ing the directory into
	 * place only after we fsync()ed the state file.
	 */
	if (fd < 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	/*
	 * Sync state file before we're reading from it. We might have crashed
	 * while it wasn't synced yet and we shouldn't continue on that basis.
	 */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
	if (pg_fsync(fd) != 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						path)));
	pgstat_report_wait_end();

	/* Also sync the parent directory */
	START_CRIT_SECTION();
	fsync_fname(slotdir, true);
	END_CRIT_SECTION();

	/* read part of statefile that's guaranteed to be version independent */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
	pgstat_report_wait_end();
	if (readBytes != ReplicationSlotOnDiskConstantSize)
	{
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes,
							(Size) ReplicationSlotOnDiskConstantSize)));
	}

	/* verify magic */
	if (cp.magic != SLOT_MAGIC)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
						path, cp.magic, SLOT_MAGIC)));

	/* verify version */
	if (cp.version != SLOT_VERSION)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has unsupported version %u",
						path, cp.version)));

	/* boundary check on length */
	if (cp.length != ReplicationSlotOnDiskV2Size)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has corrupted length %u",
						path, cp.length)));

	/* Now that we know the size, read the entire file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd,
					 (char *) &cp + ReplicationSlotOnDiskConstantSize,
					 cp.length);
	pgstat_report_wait_end();
	if (readBytes != cp.length)
	{
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes, (Size) cp.length)));
	}

	if (CloseTransientFile(fd) != 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));

	/* now verify the CRC */
	INIT_CRC32C(checksum);
	COMP_CRC32C(checksum,
				(char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
				ReplicationSlotOnDiskChecksummedSize);
	FIN_CRC32C(checksum);

	if (!EQ_CRC32C(checksum, cp.checksum))
		ereport(PANIC,
				(errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
						path, checksum, cp.checksum)));

	/*
	 * If we crashed with an ephemeral slot active, don't restore but delete
	 * it.
	 */
	if (cp.slotdata.persistency != RS_PERSISTENT)
	{
		if (!rmtree(slotdir, true))
		{
			ereport(WARNING,
					(errmsg("could not remove directory \"%s\"",
							slotdir)));
		}
		/* persist the removal of the slot directory */
		fsync_fname(PG_REPLSLOT_DIR, true);
		return;
	}

	/*
	 * Verify that requirements for the specific slot type are met. That's
	 * important because if these aren't met we're not guaranteed to retain
	 * all the necessary resources for the slot.
	 *
	 * NB: We have to do so *after* the above checks for ephemeral slots,
	 * because otherwise a slot that shouldn't exist anymore could prevent
	 * restarts.
	 *
	 * NB: Changing the requirements here also requires adapting
	 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
	 */
	if (cp.slotdata.database != InvalidOid)
	{
		if (wal_level < WAL_LEVEL_REPLICA)
			ereport(FATAL,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
							NameStr(cp.slotdata.name)),
					 errhint("Change \"wal_level\" to be \"replica\" or higher.")));

		/*
		 * In standby mode, hot standby must be enabled. This check is
		 * necessary to ensure logical slots are invalidated when they become
		 * incompatible due to insufficient wal_level. Otherwise, if the
		 * primary reduces effective_wal_level below logical while hot
		 * standby is disabled, logical slots would remain valid even after
		 * promotion.
		 */
		if (StandbyMode && !EnableHotStandby)
			ereport(FATAL,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
							NameStr(cp.slotdata.name)),
					 errhint("Change \"hot_standby\" to be \"on\".")));
	}
	else if (wal_level < WAL_LEVEL_REPLICA)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
						NameStr(cp.slotdata.name)),
				 errhint("Change \"wal_level\" to be \"replica\" or higher.")));

	/* nothing can be active yet, don't lock anything */
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *slot;

		slot = &ReplicationSlotCtl->replication_slots[i];

		/* find the first free in-memory slot entry */
		if (slot->in_use)
			continue;

		/* restore the entire set of persistent data */
		memcpy(&slot->data, &cp.slotdata,
			   sizeof(ReplicationSlotPersistentData));

		/* initialize in memory state */
		slot->effective_xmin = cp.slotdata.xmin;
		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
		slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
		slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;

		slot->candidate_catalog_xmin = InvalidTransactionId;
		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_valid = InvalidXLogRecPtr;

		slot->in_use = true;
		slot->active_proc = INVALID_PROC_NUMBER;

		/*
		 * Set the time since the slot has become inactive after loading the
		 * slot from the disk into memory. Whoever acquires the slot i.e.
		 * makes the slot active will reset it. Use the same inactive_since
		 * time for all the slots.
		 */
		if (now == 0)
			now = GetCurrentTimestamp();

		ReplicationSlotSetInactiveSince(slot, now, false);

		restored = true;
		break;
	}

	if (!restored)
		ereport(FATAL,
				(errmsg("too many replication slots active before shutdown"),
				 errhint("Increase \"max_replication_slots\" and try again.")));
}
2909 :
2910 : /*
2911 : * Maps an invalidation reason for a replication slot to
2912 : * ReplicationSlotInvalidationCause.
2913 : */
2914 : ReplicationSlotInvalidationCause
2915 0 : GetSlotInvalidationCause(const char *cause_name)
2916 : {
2917 : Assert(cause_name);
2918 :
2919 : /* Search lookup table for the cause having this name */
2920 0 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2921 : {
2922 0 : if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2923 0 : return SlotInvalidationCauses[i].cause;
2924 : }
2925 :
2926 : Assert(false);
2927 0 : return RS_INVAL_NONE; /* to keep compiler quiet */
2928 : }
2929 :
2930 : /*
2931 : * Maps a ReplicationSlotInvalidationCause to the invalidation
2932 : * reason for a replication slot.
2933 : */
2934 : const char *
2935 84 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
2936 : {
2937 : /* Search lookup table for the name of this cause */
2938 262 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2939 : {
2940 262 : if (SlotInvalidationCauses[i].cause == cause)
2941 84 : return SlotInvalidationCauses[i].cause_name;
2942 : }
2943 :
2944 : Assert(false);
2945 0 : return "none"; /* to keep compiler quiet */
2946 : }
2947 :
2948 : /*
2949 : * A helper function to validate slots specified in GUC synchronized_standby_slots.
2950 : *
2951 : * The rawname will be parsed, and the result will be saved into *elemlist.
2952 : */
2953 : static bool
2954 36 : validate_sync_standby_slots(char *rawname, List **elemlist)
2955 : {
2956 : /* Verify syntax and parse string into a list of identifiers */
2957 36 : if (!SplitIdentifierString(rawname, ',', elemlist))
2958 : {
2959 0 : GUC_check_errdetail("List syntax is invalid.");
2960 0 : return false;
2961 : }
2962 :
2963 : /* Iterate the list to validate each slot name */
2964 100 : foreach_ptr(char, name, *elemlist)
2965 : {
2966 : int err_code;
2967 36 : char *err_msg = NULL;
2968 36 : char *err_hint = NULL;
2969 :
2970 36 : if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
2971 : &err_msg, &err_hint))
2972 : {
2973 4 : GUC_check_errcode(err_code);
2974 4 : GUC_check_errdetail("%s", err_msg);
2975 4 : if (err_hint != NULL)
2976 4 : GUC_check_errhint("%s", err_hint);
2977 4 : return false;
2978 : }
2979 : }
2980 :
2981 32 : return true;
2982 : }
2983 :
2984 : /*
2985 : * GUC check_hook for synchronized_standby_slots
2986 : */
2987 : bool
2988 2416 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
2989 : {
2990 : char *rawname;
2991 : char *ptr;
2992 : List *elemlist;
2993 : int size;
2994 : bool ok;
2995 : SyncStandbySlotsConfigData *config;
2996 :
2997 2416 : if ((*newval)[0] == '\0')
2998 2380 : return true;
2999 :
3000 : /* Need a modifiable copy of the GUC string */
3001 36 : rawname = pstrdup(*newval);
3002 :
3003 : /* Now verify if the specified slots exist and have correct type */
3004 36 : ok = validate_sync_standby_slots(rawname, &elemlist);
3005 :
3006 36 : if (!ok || elemlist == NIL)
3007 : {
3008 4 : pfree(rawname);
3009 4 : list_free(elemlist);
3010 4 : return ok;
3011 : }
3012 :
3013 : /* Compute the size required for the SyncStandbySlotsConfigData struct */
3014 32 : size = offsetof(SyncStandbySlotsConfigData, slot_names);
3015 96 : foreach_ptr(char, slot_name, elemlist)
3016 32 : size += strlen(slot_name) + 1;
3017 :
3018 : /* GUC extra value must be guc_malloc'd, not palloc'd */
3019 32 : config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
3020 32 : if (!config)
3021 0 : return false;
3022 :
3023 : /* Transform the data into SyncStandbySlotsConfigData */
3024 32 : config->nslotnames = list_length(elemlist);
3025 :
3026 32 : ptr = config->slot_names;
3027 96 : foreach_ptr(char, slot_name, elemlist)
3028 : {
3029 32 : strcpy(ptr, slot_name);
3030 32 : ptr += strlen(slot_name) + 1;
3031 : }
3032 :
3033 32 : *extra = config;
3034 :
3035 32 : pfree(rawname);
3036 32 : list_free(elemlist);
3037 32 : return true;
3038 : }
3039 :
3040 : /*
3041 : * GUC assign_hook for synchronized_standby_slots
3042 : */
3043 : void
3044 2410 : assign_synchronized_standby_slots(const char *newval, void *extra)
3045 : {
3046 : /*
3047 : * The standby slots may have changed, so we must recompute the oldest
3048 : * LSN.
3049 : */
3050 2410 : ss_oldest_flush_lsn = InvalidXLogRecPtr;
3051 :
3052 2410 : synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
3053 2410 : }
3054 :
3055 : /*
3056 : * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
3057 : */
3058 : bool
3059 56646 : SlotExistsInSyncStandbySlots(const char *slot_name)
3060 : {
3061 : const char *standby_slot_name;
3062 :
3063 : /* Return false if there is no value in synchronized_standby_slots */
3064 56646 : if (synchronized_standby_slots_config == NULL)
3065 56622 : return false;
3066 :
3067 : /*
3068 : * XXX: We are not expecting this list to be long so a linear search
3069 : * shouldn't hurt but if that turns out not to be true then we can cache
3070 : * this information for each WalSender as well.
3071 : */
3072 24 : standby_slot_name = synchronized_standby_slots_config->slot_names;
3073 36 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3074 : {
3075 24 : if (strcmp(standby_slot_name, slot_name) == 0)
3076 12 : return true;
3077 :
3078 12 : standby_slot_name += strlen(standby_slot_name) + 1;
3079 : }
3080 :
3081 12 : return false;
3082 : }
3083 :
/*
 * Return true if the slots specified in synchronized_standby_slots have caught up to
 * the given WAL location, false otherwise.
 *
 * The elevel parameter specifies the error level used for logging messages
 * related to slots that do not exist, are invalidated, or are inactive.
 *
 * As a side effect, on success the cached ss_oldest_flush_lsn is advanced to
 * the minimum restart_lsn among the configured slots, allowing later calls
 * to return early without scanning the slots.
 */
bool
StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
{
	const char *name;
	int			caught_up_slot_num = 0;
	XLogRecPtr	min_restart_lsn = InvalidXLogRecPtr;

	/*
	 * Don't need to wait for the standbys to catch up if there is no value in
	 * synchronized_standby_slots.
	 */
	if (synchronized_standby_slots_config == NULL)
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if we are on a standby
	 * server, since we do not support syncing slots to cascading standbys.
	 */
	if (RecoveryInProgress())
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if they are already
	 * beyond the specified WAL location (fast path using the cached value).
	 */
	if (XLogRecPtrIsValid(ss_oldest_flush_lsn) &&
		ss_oldest_flush_lsn >= wait_for_lsn)
		return true;

	/*
	 * To prevent concurrent slot dropping and creation while filtering the
	 * slots, take the ReplicationSlotControlLock outside of the loop.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	name = synchronized_standby_slots_config->slot_names;
	for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
	{
		XLogRecPtr	restart_lsn;
		bool		invalidated;
		bool		inactive;
		ReplicationSlot *slot;

		slot = SearchNamedReplicationSlot(name, false);

		/*
		 * If a slot name provided in synchronized_standby_slots does not
		 * exist, report a message and exit the loop.
		 */
		if (!slot)
		{
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
							  name),
					errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		/* Same as above: if a slot is not physical, exit the loop. */
		if (SlotIsLogical(slot))
		{
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
							  name),
					errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		/* Snapshot the fields we need under the slot's spinlock */
		SpinLockAcquire(&slot->mutex);
		restart_lsn = slot->data.restart_lsn;
		invalidated = slot->data.invalidated != RS_INVAL_NONE;
		inactive = slot->active_proc == INVALID_PROC_NUMBER;
		SpinLockRelease(&slot->mutex);

		if (invalidated)
		{
			/* Specified physical slot has been invalidated */
			ereport(elevel,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
							  name),
					errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
		{
			/* Log a message if no active_pid for this physical slot */
			if (inactive)
				ereport(elevel,
						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
							   name, "synchronized_standby_slots"),
						errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
								  name),
						errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
								name, "synchronized_standby_slots"));

			/*
			 * This slot hasn't caught up, so the overall answer is already
			 * false; no point checking the remaining slots.
			 */
			break;
		}

		Assert(restart_lsn >= wait_for_lsn);

		/* Track the minimum restart_lsn among the caught-up slots */
		if (!XLogRecPtrIsValid(min_restart_lsn) ||
			min_restart_lsn > restart_lsn)
			min_restart_lsn = restart_lsn;

		caught_up_slot_num++;

		/* advance to the next NUL-separated name in the packed list */
		name += strlen(name) + 1;
	}

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Return false if not all the standbys have caught up to the specified
	 * WAL location.
	 */
	if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
		return false;

	/* The ss_oldest_flush_lsn must not retreat. */
	Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
		   min_restart_lsn >= ss_oldest_flush_lsn);

	ss_oldest_flush_lsn = min_restart_lsn;

	return true;
}
3232 :
/*
 * Wait for physical standbys to confirm receiving the given lsn.
 *
 * Used by logical decoding SQL functions. It waits for physical standbys
 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
 *
 * Returns immediately when the currently acquired slot is not a logical
 * failover slot or when no standby slots are configured; otherwise blocks
 * (servicing interrupts and config reloads) until all configured slots have
 * caught up to wait_for_lsn.
 */
void
WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
{
	/*
	 * Don't need to wait for the standby to catch up if the current acquired
	 * slot is not a logical failover slot, or there is no value in
	 * synchronized_standby_slots.
	 */
	if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
		return;

	ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);

	for (;;)
	{
		CHECK_FOR_INTERRUPTS();

		/*
		 * Reload configuration if requested; synchronized_standby_slots
		 * itself may have changed while we were waiting.
		 */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		/* Exit if done waiting for every slot. */
		if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
			break;

		/*
		 * Wait for the slots in the synchronized_standby_slots to catch up,
		 * but use a timeout (1s) so we can also check if the
		 * synchronized_standby_slots has been changed.
		 */
		ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
									WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
	}

	ConditionVariableCancelSleep();
}
|