Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * slot.c
4 : * Replication slot management.
5 : *
6 : *
7 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/replication/slot.c
12 : *
13 : * NOTES
14 : *
15 : * Replication slots are used to keep state about replication streams
16 : * originating from this cluster. Their primary purpose is to prevent the
17 : * premature removal of WAL or of old tuple versions in a manner that would
18 : * interfere with replication; they are also useful for monitoring purposes.
19 : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : * on standbys (to support cascading setups). The requirement that slots be
21 : * usable on standbys precludes storing them in the system catalogs.
22 : *
23 : * Each replication slot gets its own directory inside the directory
24 : * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 : * contain the slot's own data. Additional data can be stored alongside that
26 : * file if required. While the server is running, the state data is also
27 : * cached in memory for efficiency.
28 : *
29 : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : * of a slot. The remaining data in each slot is protected by its mutex.
33 : *
34 : *-------------------------------------------------------------------------
35 : */
36 :
37 : #include "postgres.h"
38 :
39 : #include <unistd.h>
40 : #include <sys/stat.h>
41 :
42 : #include "access/transam.h"
43 : #include "access/xlog_internal.h"
44 : #include "access/xlogrecovery.h"
45 : #include "common/file_utils.h"
46 : #include "common/string.h"
47 : #include "miscadmin.h"
48 : #include "pgstat.h"
49 : #include "postmaster/interrupt.h"
50 : #include "replication/slotsync.h"
51 : #include "replication/slot.h"
52 : #include "replication/walsender_private.h"
53 : #include "storage/fd.h"
54 : #include "storage/ipc.h"
55 : #include "storage/proc.h"
56 : #include "storage/procarray.h"
57 : #include "utils/builtins.h"
58 : #include "utils/guc_hooks.h"
59 : #include "utils/injection_point.h"
60 : #include "utils/varlena.h"
61 :
62 : /*
63 : * Replication slot on-disk data structure.
64 : */
65 : typedef struct ReplicationSlotOnDisk
66 : {
67 : /* first part of this struct needs to be version independent */
68 :
69 : /* data not covered by checksum */
70 : uint32 magic;
71 : pg_crc32c checksum;
72 :
73 : /* data covered by checksum */
74 : uint32 version;
75 : uint32 length;
76 :
77 : /*
78 : * The actual data in the slot that follows can differ based on the above
79 : * 'version'.
80 : */
81 :
82 : ReplicationSlotPersistentData slotdata;
83 : } ReplicationSlotOnDisk;
84 :
85 : /*
86 : * Struct for the configuration of synchronized_standby_slots.
87 : *
88 : * Note: this must be a flat representation that can be held in a single chunk
89 : * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
90 : * synchronized_standby_slots GUC.
91 : */
92 : typedef struct
93 : {
94 : /* Number of slot names in the slot_names[] */
95 : int nslotnames;
96 :
97 : /*
98 : * slot_names contains 'nslotnames' consecutive null-terminated C strings.
99 : */
100 : char slot_names[FLEXIBLE_ARRAY_MEMBER];
101 : } SyncStandbySlotsConfigData;
102 :
103 : /*
104 : * Lookup table for slot invalidation causes.
105 : */
106 : typedef struct SlotInvalidationCauseMap
107 : {
108 : ReplicationSlotInvalidationCause cause;
109 : const char *cause_name;
110 : } SlotInvalidationCauseMap;
111 :
112 : static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
113 : {RS_INVAL_NONE, "none"},
114 : {RS_INVAL_WAL_REMOVED, "wal_removed"},
115 : {RS_INVAL_HORIZON, "rows_removed"},
116 : {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
117 : {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
118 : };
119 :
120 : /*
121 : * Ensure that the lookup table is up-to-date with the enums defined in
122 : * ReplicationSlotInvalidationCause.
123 : */
124 : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
125 : "array length mismatch");
126 :
/*
 * Sizes of the individual parts of ReplicationSlotOnDisk.
 *
 * The macros that expand to a subtraction are wrapped in parentheses so that
 * each one acts as a single operand wherever it is used.  Without them an
 * expression such as "x - ReplicationSlotOnDiskV2Size" or
 * "2 * ReplicationSlotOnDiskChecksummedSize" would silently associate with
 * only the first term of the expansion.
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize  \
	offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	5		/* version for new files */
142 :
143 : /* Control array for replication slot management */
144 : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
145 :
146 : /* My backend's replication slot in the shared memory array */
147 : ReplicationSlot *MyReplicationSlot = NULL;
148 :
149 : /* GUC variables */
150 : int max_replication_slots = 10; /* the maximum number of replication
151 : * slots */
152 :
153 : /*
154 : * Invalidate replication slots that have remained idle longer than this
155 : * duration; '0' disables it.
156 : */
157 : int idle_replication_slot_timeout_mins = 0;
158 :
159 : /*
160 : * This GUC lists streaming replication standby server slot names that
161 : * logical WAL sender processes will wait for.
162 : */
163 : char *synchronized_standby_slots;
164 :
165 : /* This is the parsed and cached configuration for synchronized_standby_slots */
166 : static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
167 :
168 : /*
169 : * Oldest LSN that has been confirmed to be flushed to the standbys
170 : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
171 : */
172 : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
173 :
174 : static void ReplicationSlotShmemExit(int code, Datum arg);
175 : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
176 :
177 : /* internal persistency functions */
178 : static void RestoreSlotFromDisk(const char *name);
179 : static void CreateSlotOnDisk(ReplicationSlot *slot);
180 : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
181 :
182 : /*
183 : * Report shared-memory space needed by ReplicationSlotsShmemInit.
184 : */
185 : Size
186 8204 : ReplicationSlotsShmemSize(void)
187 : {
188 8204 : Size size = 0;
189 :
190 8204 : if (max_replication_slots == 0)
191 0 : return size;
192 :
193 8204 : size = offsetof(ReplicationSlotCtlData, replication_slots);
194 8204 : size = add_size(size,
195 : mul_size(max_replication_slots, sizeof(ReplicationSlot)));
196 :
197 8204 : return size;
198 : }
199 :
200 : /*
201 : * Allocate and initialize shared memory for replication slots.
202 : */
203 : void
204 2126 : ReplicationSlotsShmemInit(void)
205 : {
206 : bool found;
207 :
208 2126 : if (max_replication_slots == 0)
209 0 : return;
210 :
211 2126 : ReplicationSlotCtl = (ReplicationSlotCtlData *)
212 2126 : ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
213 : &found);
214 :
215 2126 : if (!found)
216 : {
217 : int i;
218 :
219 : /* First time through, so initialize */
220 3926 : MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
221 :
222 22992 : for (i = 0; i < max_replication_slots; i++)
223 : {
224 20866 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
225 :
226 : /* everything else is zeroed by the memset above */
227 20866 : SpinLockInit(&slot->mutex);
228 20866 : LWLockInitialize(&slot->io_in_progress_lock,
229 : LWTRANCHE_REPLICATION_SLOT_IO);
230 20866 : ConditionVariableInit(&slot->active_cv);
231 : }
232 : }
233 : }
234 :
235 : /*
236 : * Register the callback for replication slot cleanup and releasing.
237 : */
238 : void
239 41876 : ReplicationSlotInitialize(void)
240 : {
241 41876 : before_shmem_exit(ReplicationSlotShmemExit, 0);
242 41876 : }
243 :
244 : /*
245 : * Release and cleanup replication slots.
246 : */
247 : static void
248 41876 : ReplicationSlotShmemExit(int code, Datum arg)
249 : {
250 : /* Make sure active replication slots are released */
251 41876 : if (MyReplicationSlot != NULL)
252 454 : ReplicationSlotRelease();
253 :
254 : /* Also cleanup all the temporary slots. */
255 41876 : ReplicationSlotCleanup(false);
256 41876 : }
257 :
258 : /*
259 : * Check whether the passed slot name is valid and report errors at elevel.
260 : *
261 : * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
262 : * the name to be used as a directory name on every supported OS.
263 : *
264 : * Returns whether the directory name is valid or not if elevel < ERROR.
265 : */
266 : bool
267 1742 : ReplicationSlotValidateName(const char *name, int elevel)
268 : {
269 : const char *cp;
270 :
271 1742 : if (strlen(name) == 0)
272 : {
273 6 : ereport(elevel,
274 : (errcode(ERRCODE_INVALID_NAME),
275 : errmsg("replication slot name \"%s\" is too short",
276 : name)));
277 0 : return false;
278 : }
279 :
280 1736 : if (strlen(name) >= NAMEDATALEN)
281 : {
282 0 : ereport(elevel,
283 : (errcode(ERRCODE_NAME_TOO_LONG),
284 : errmsg("replication slot name \"%s\" is too long",
285 : name)));
286 0 : return false;
287 : }
288 :
289 35178 : for (cp = name; *cp; cp++)
290 : {
291 33444 : if (!((*cp >= 'a' && *cp <= 'z')
292 16764 : || (*cp >= '0' && *cp <= '9')
293 3240 : || (*cp == '_')))
294 : {
295 2 : ereport(elevel,
296 : (errcode(ERRCODE_INVALID_NAME),
297 : errmsg("replication slot name \"%s\" contains invalid character",
298 : name),
299 : errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
300 0 : return false;
301 : }
302 : }
303 1734 : return true;
304 : }
305 :
306 : /*
307 : * Create a new replication slot and mark it as used by this backend.
308 : *
309 : * name: Name of the slot
310 : * db_specific: logical decoding is db specific; if the slot is going to
311 : * be used for that pass true, otherwise false.
312 : * two_phase: Allows decoding of prepared transactions. We allow this option
313 : * to be enabled only at the slot creation time. If we allow this option
314 : * to be changed during decoding then it is quite possible that we skip
315 : * prepare first time because this option was not enabled. Now next time
316 : * during getting changes, if the two_phase option is enabled it can skip
317 : * prepare because by that time start decoding point has been moved. So the
318 : * user will only get commit prepared.
319 : * failover: If enabled, allows the slot to be synced to standbys so
320 : * that logical replication can be resumed after failover.
321 : * synced: True if the slot is synchronized from the primary server.
322 : */
323 : void
324 1270 : ReplicationSlotCreate(const char *name, bool db_specific,
325 : ReplicationSlotPersistency persistency,
326 : bool two_phase, bool failover, bool synced)
327 : {
328 1270 : ReplicationSlot *slot = NULL;
329 : int i;
330 :
331 : Assert(MyReplicationSlot == NULL);
332 :
333 1270 : ReplicationSlotValidateName(name, ERROR);
334 :
335 1268 : if (failover)
336 : {
337 : /*
338 : * Do not allow users to create the failover enabled slots on the
339 : * standby as we do not support sync to the cascading standby.
340 : *
341 : * However, failover enabled slots can be created during slot
342 : * synchronization because we need to retain the same values as the
343 : * remote slot.
344 : */
345 44 : if (RecoveryInProgress() && !IsSyncingReplicationSlots())
346 0 : ereport(ERROR,
347 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
348 : errmsg("cannot enable failover for a replication slot created on the standby"));
349 :
350 : /*
351 : * Do not allow users to create failover enabled temporary slots,
352 : * because temporary slots will not be synced to the standby.
353 : *
354 : * However, failover enabled temporary slots can be created during
355 : * slot synchronization. See the comments atop slotsync.c for details.
356 : */
357 44 : if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
358 2 : ereport(ERROR,
359 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
360 : errmsg("cannot enable failover for a temporary replication slot"));
361 : }
362 :
363 : /*
364 : * If some other backend ran this code concurrently with us, we'd likely
365 : * both allocate the same slot, and that would be bad. We'd also be at
366 : * risk of missing a name collision. Also, we don't want to try to create
367 : * a new slot while somebody's busy cleaning up an old one, because we
368 : * might both be monkeying with the same directory.
369 : */
370 1266 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
371 :
372 : /*
373 : * Check for name collision, and identify an allocatable slot. We need to
374 : * hold ReplicationSlotControlLock in shared mode for this, so that nobody
375 : * else can change the in_use flags while we're looking at them.
376 : */
377 1266 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
378 12120 : for (i = 0; i < max_replication_slots; i++)
379 : {
380 10860 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
381 :
382 10860 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
383 6 : ereport(ERROR,
384 : (errcode(ERRCODE_DUPLICATE_OBJECT),
385 : errmsg("replication slot \"%s\" already exists", name)));
386 10854 : if (!s->in_use && slot == NULL)
387 1258 : slot = s;
388 : }
389 1260 : LWLockRelease(ReplicationSlotControlLock);
390 :
391 : /* If all slots are in use, we're out of luck. */
392 1260 : if (slot == NULL)
393 2 : ereport(ERROR,
394 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
395 : errmsg("all replication slots are in use"),
396 : errhint("Free one or increase \"max_replication_slots\".")));
397 :
398 : /*
399 : * Since this slot is not in use, nobody should be looking at any part of
400 : * it other than the in_use field unless they're trying to allocate it.
401 : * And since we hold ReplicationSlotAllocationLock, nobody except us can
402 : * be doing that. So it's safe to initialize the slot.
403 : */
404 : Assert(!slot->in_use);
405 : Assert(slot->active_pid == 0);
406 :
407 : /* first initialize persistent data */
408 1258 : memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
409 1258 : namestrcpy(&slot->data.name, name);
410 1258 : slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
411 1258 : slot->data.persistency = persistency;
412 1258 : slot->data.two_phase = two_phase;
413 1258 : slot->data.two_phase_at = InvalidXLogRecPtr;
414 1258 : slot->data.failover = failover;
415 1258 : slot->data.synced = synced;
416 :
417 : /* and then data only present in shared memory */
418 1258 : slot->just_dirtied = false;
419 1258 : slot->dirty = false;
420 1258 : slot->effective_xmin = InvalidTransactionId;
421 1258 : slot->effective_catalog_xmin = InvalidTransactionId;
422 1258 : slot->candidate_catalog_xmin = InvalidTransactionId;
423 1258 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
424 1258 : slot->candidate_restart_valid = InvalidXLogRecPtr;
425 1258 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
426 1258 : slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
427 1258 : slot->last_saved_restart_lsn = InvalidXLogRecPtr;
428 1258 : slot->inactive_since = 0;
429 :
430 : /*
431 : * Create the slot on disk. We haven't actually marked the slot allocated
432 : * yet, so no special cleanup is required if this errors out.
433 : */
434 1258 : CreateSlotOnDisk(slot);
435 :
436 : /*
437 : * We need to briefly prevent any other backend from iterating over the
438 : * slots while we flip the in_use flag. We also need to set the active
439 : * flag while holding the ControlLock as otherwise a concurrent
440 : * ReplicationSlotAcquire() could acquire the slot as well.
441 : */
442 1258 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
443 :
444 1258 : slot->in_use = true;
445 :
446 : /* We can now mark the slot active, and that makes it our slot. */
447 1258 : SpinLockAcquire(&slot->mutex);
448 : Assert(slot->active_pid == 0);
449 1258 : slot->active_pid = MyProcPid;
450 1258 : SpinLockRelease(&slot->mutex);
451 1258 : MyReplicationSlot = slot;
452 :
453 1258 : LWLockRelease(ReplicationSlotControlLock);
454 :
455 : /*
456 : * Create statistics entry for the new logical slot. We don't collect any
457 : * stats for physical slots, so no need to create an entry for the same.
458 : * See ReplicationSlotDropPtr for why we need to do this before releasing
459 : * ReplicationSlotAllocationLock.
460 : */
461 1258 : if (SlotIsLogical(slot))
462 914 : pgstat_create_replslot(slot);
463 :
464 : /*
465 : * Now that the slot has been marked as in_use and active, it's safe to
466 : * let somebody else try to allocate a slot.
467 : */
468 1258 : LWLockRelease(ReplicationSlotAllocationLock);
469 :
470 : /* Let everybody know we've modified this slot */
471 1258 : ConditionVariableBroadcast(&slot->active_cv);
472 1258 : }
473 :
474 : /*
475 : * Search for the named replication slot.
476 : *
477 : * Return the replication slot if found, otherwise NULL.
478 : */
479 : ReplicationSlot *
480 2794 : SearchNamedReplicationSlot(const char *name, bool need_lock)
481 : {
482 : int i;
483 2794 : ReplicationSlot *slot = NULL;
484 :
485 2794 : if (need_lock)
486 184 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
487 :
488 4896 : for (i = 0; i < max_replication_slots; i++)
489 : {
490 4854 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
491 :
492 4854 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
493 : {
494 2752 : slot = s;
495 2752 : break;
496 : }
497 : }
498 :
499 2794 : if (need_lock)
500 184 : LWLockRelease(ReplicationSlotControlLock);
501 :
502 2794 : return slot;
503 : }
504 :
505 : /*
506 : * Return the index of the replication slot in
507 : * ReplicationSlotCtl->replication_slots.
508 : *
509 : * This is mainly useful to have an efficient key for storing replication slot
510 : * stats.
511 : */
512 : int
513 16236 : ReplicationSlotIndex(ReplicationSlot *slot)
514 : {
515 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
516 : slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
517 :
518 16236 : return slot - ReplicationSlotCtl->replication_slots;
519 : }
520 :
521 : /*
522 : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
523 : * the slot's name and true is returned.
524 : *
525 : * This likely is only useful for pgstat_replslot.c during shutdown, in other
526 : * cases there are obvious TOCTOU issues.
527 : */
528 : bool
529 156 : ReplicationSlotName(int index, Name name)
530 : {
531 : ReplicationSlot *slot;
532 : bool found;
533 :
534 156 : slot = &ReplicationSlotCtl->replication_slots[index];
535 :
536 : /*
537 : * Ensure that the slot cannot be dropped while we copy the name. Don't
538 : * need the spinlock as the name of an existing slot cannot change.
539 : */
540 156 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
541 156 : found = slot->in_use;
542 156 : if (slot->in_use)
543 156 : namestrcpy(name, NameStr(slot->data.name));
544 156 : LWLockRelease(ReplicationSlotControlLock);
545 :
546 156 : return found;
547 : }
548 :
549 : /*
550 : * Find a previously created slot and mark it as used by this process.
551 : *
552 : * An error is raised if nowait is true and the slot is currently in use. If
553 : * nowait is false, we sleep until the slot is released by the owning process.
554 : *
555 : * An error is raised if error_if_invalid is true and the slot is found to
556 : * be invalid. It should always be set to true, except when we are temporarily
557 : * acquiring the slot and don't intend to change it.
558 : */
559 : void
560 2462 : ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
561 : {
562 : ReplicationSlot *s;
563 : int active_pid;
564 :
565 : Assert(name != NULL);
566 :
567 2462 : retry:
568 : Assert(MyReplicationSlot == NULL);
569 :
570 2462 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
571 :
572 : /* Check if the slot exits with the given name. */
573 2462 : s = SearchNamedReplicationSlot(name, false);
574 2462 : if (s == NULL || !s->in_use)
575 : {
576 18 : LWLockRelease(ReplicationSlotControlLock);
577 :
578 18 : ereport(ERROR,
579 : (errcode(ERRCODE_UNDEFINED_OBJECT),
580 : errmsg("replication slot \"%s\" does not exist",
581 : name)));
582 : }
583 :
584 : /*
585 : * This is the slot we want; check if it's active under some other
586 : * process. In single user mode, we don't need this check.
587 : */
588 2444 : if (IsUnderPostmaster)
589 : {
590 : /*
591 : * Get ready to sleep on the slot in case it is active. (We may end
592 : * up not sleeping, but we don't want to do this while holding the
593 : * spinlock.)
594 : */
595 2444 : if (!nowait)
596 528 : ConditionVariablePrepareToSleep(&s->active_cv);
597 :
598 : /*
599 : * It is important to reset the inactive_since under spinlock here to
600 : * avoid race conditions with slot invalidation. See comments related
601 : * to inactive_since in InvalidatePossiblyObsoleteSlot.
602 : */
603 2444 : SpinLockAcquire(&s->mutex);
604 2444 : if (s->active_pid == 0)
605 2170 : s->active_pid = MyProcPid;
606 2444 : active_pid = s->active_pid;
607 2444 : ReplicationSlotSetInactiveSince(s, 0, false);
608 2444 : SpinLockRelease(&s->mutex);
609 : }
610 : else
611 : {
612 0 : active_pid = MyProcPid;
613 0 : ReplicationSlotSetInactiveSince(s, 0, true);
614 : }
615 2444 : LWLockRelease(ReplicationSlotControlLock);
616 :
617 : /*
618 : * If we found the slot but it's already active in another process, we
619 : * wait until the owning process signals us that it's been released, or
620 : * error out.
621 : */
622 2444 : if (active_pid != MyProcPid)
623 : {
624 0 : if (!nowait)
625 : {
626 : /* Wait here until we get signaled, and then restart */
627 0 : ConditionVariableSleep(&s->active_cv,
628 : WAIT_EVENT_REPLICATION_SLOT_DROP);
629 0 : ConditionVariableCancelSleep();
630 0 : goto retry;
631 : }
632 :
633 0 : ereport(ERROR,
634 : (errcode(ERRCODE_OBJECT_IN_USE),
635 : errmsg("replication slot \"%s\" is active for PID %d",
636 : NameStr(s->data.name), active_pid)));
637 : }
638 2444 : else if (!nowait)
639 528 : ConditionVariableCancelSleep(); /* no sleep needed after all */
640 :
641 : /* We made this slot active, so it's ours now. */
642 2444 : MyReplicationSlot = s;
643 :
644 : /*
645 : * We need to check for invalidation after making the slot ours to avoid
646 : * the possible race condition with the checkpointer that can otherwise
647 : * invalidate the slot immediately after the check.
648 : */
649 2444 : if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
650 14 : ereport(ERROR,
651 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
652 : errmsg("can no longer access replication slot \"%s\"",
653 : NameStr(s->data.name)),
654 : errdetail("This replication slot has been invalidated due to \"%s\".",
655 : GetSlotInvalidationCauseName(s->data.invalidated)));
656 :
657 : /* Let everybody know we've modified this slot */
658 2430 : ConditionVariableBroadcast(&s->active_cv);
659 :
660 : /*
661 : * The call to pgstat_acquire_replslot() protects against stats for a
662 : * different slot, from before a restart or such, being present during
663 : * pgstat_report_replslot().
664 : */
665 2430 : if (SlotIsLogical(s))
666 2034 : pgstat_acquire_replslot(s);
667 :
668 :
669 2430 : if (am_walsender)
670 : {
671 1646 : ereport(log_replication_commands ? LOG : DEBUG1,
672 : SlotIsLogical(s)
673 : ? errmsg("acquired logical replication slot \"%s\"",
674 : NameStr(s->data.name))
675 : : errmsg("acquired physical replication slot \"%s\"",
676 : NameStr(s->data.name)));
677 : }
678 2430 : }
679 :
680 : /*
681 : * Release the replication slot that this backend considers to own.
682 : *
683 : * This or another backend can re-acquire the slot later.
684 : * Resources this slot requires will be preserved.
685 : */
686 : void
687 2948 : ReplicationSlotRelease(void)
688 : {
689 2948 : ReplicationSlot *slot = MyReplicationSlot;
690 2948 : char *slotname = NULL; /* keep compiler quiet */
691 2948 : bool is_logical = false; /* keep compiler quiet */
692 2948 : TimestampTz now = 0;
693 :
694 : Assert(slot != NULL && slot->active_pid != 0);
695 :
696 2948 : if (am_walsender)
697 : {
698 2060 : slotname = pstrdup(NameStr(slot->data.name));
699 2060 : is_logical = SlotIsLogical(slot);
700 : }
701 :
702 2948 : if (slot->data.persistency == RS_EPHEMERAL)
703 : {
704 : /*
705 : * Delete the slot. There is no !PANIC case where this is allowed to
706 : * fail, all that may happen is an incomplete cleanup of the on-disk
707 : * data.
708 : */
709 10 : ReplicationSlotDropAcquired();
710 : }
711 :
712 : /*
713 : * If slot needed to temporarily restrain both data and catalog xmin to
714 : * create the catalog snapshot, remove that temporary constraint.
715 : * Snapshots can only be exported while the initial snapshot is still
716 : * acquired.
717 : */
718 2948 : if (!TransactionIdIsValid(slot->data.xmin) &&
719 2896 : TransactionIdIsValid(slot->effective_xmin))
720 : {
721 390 : SpinLockAcquire(&slot->mutex);
722 390 : slot->effective_xmin = InvalidTransactionId;
723 390 : SpinLockRelease(&slot->mutex);
724 390 : ReplicationSlotsComputeRequiredXmin(false);
725 : }
726 :
727 : /*
728 : * Set the time since the slot has become inactive. We get the current
729 : * time beforehand to avoid system call while holding the spinlock.
730 : */
731 2948 : now = GetCurrentTimestamp();
732 :
733 2948 : if (slot->data.persistency == RS_PERSISTENT)
734 : {
735 : /*
736 : * Mark persistent slot inactive. We're not freeing it, just
737 : * disconnecting, but wake up others that may be waiting for it.
738 : */
739 2386 : SpinLockAcquire(&slot->mutex);
740 2386 : slot->active_pid = 0;
741 2386 : ReplicationSlotSetInactiveSince(slot, now, false);
742 2386 : SpinLockRelease(&slot->mutex);
743 2386 : ConditionVariableBroadcast(&slot->active_cv);
744 : }
745 : else
746 562 : ReplicationSlotSetInactiveSince(slot, now, true);
747 :
748 2948 : MyReplicationSlot = NULL;
749 :
750 : /* might not have been set when we've been a plain slot */
751 2948 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
752 2948 : MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
753 2948 : ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
754 2948 : LWLockRelease(ProcArrayLock);
755 :
756 2948 : if (am_walsender)
757 : {
758 2060 : ereport(log_replication_commands ? LOG : DEBUG1,
759 : is_logical
760 : ? errmsg("released logical replication slot \"%s\"",
761 : slotname)
762 : : errmsg("released physical replication slot \"%s\"",
763 : slotname));
764 :
765 2060 : pfree(slotname);
766 : }
767 2948 : }
768 :
769 : /*
770 : * Cleanup temporary slots created in current session.
771 : *
772 : * Cleanup only synced temporary slots if 'synced_only' is true, else
773 : * cleanup all temporary slots.
774 : */
775 : void
776 85200 : ReplicationSlotCleanup(bool synced_only)
777 : {
778 : int i;
779 :
780 : Assert(MyReplicationSlot == NULL);
781 :
782 85478 : restart:
783 85478 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
784 926206 : for (i = 0; i < max_replication_slots; i++)
785 : {
786 841006 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
787 :
788 841006 : if (!s->in_use)
789 814504 : continue;
790 :
791 26502 : SpinLockAcquire(&s->mutex);
792 26502 : if ((s->active_pid == MyProcPid &&
793 278 : (!synced_only || s->data.synced)))
794 : {
795 : Assert(s->data.persistency == RS_TEMPORARY);
796 278 : SpinLockRelease(&s->mutex);
797 278 : LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
798 :
799 278 : ReplicationSlotDropPtr(s);
800 :
801 278 : ConditionVariableBroadcast(&s->active_cv);
802 278 : goto restart;
803 : }
804 : else
805 26224 : SpinLockRelease(&s->mutex);
806 : }
807 :
808 85200 : LWLockRelease(ReplicationSlotControlLock);
809 85200 : }
810 :
811 : /*
812 : * Permanently drop replication slot identified by the passed in name.
813 : */
814 : void
815 794 : ReplicationSlotDrop(const char *name, bool nowait)
816 : {
817 : Assert(MyReplicationSlot == NULL);
818 :
819 794 : ReplicationSlotAcquire(name, nowait, false);
820 :
821 : /*
822 : * Do not allow users to drop the slots which are currently being synced
823 : * from the primary to the standby.
824 : */
825 780 : if (RecoveryInProgress() && MyReplicationSlot->data.synced)
826 2 : ereport(ERROR,
827 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
828 : errmsg("cannot drop replication slot \"%s\"", name),
829 : errdetail("This replication slot is being synchronized from the primary server."));
830 :
831 778 : ReplicationSlotDropAcquired();
832 778 : }
833 :
834 : /*
835 : * Change the definition of the slot identified by the specified name.
836 : */
837 : void
838 12 : ReplicationSlotAlter(const char *name, const bool *failover,
839 : const bool *two_phase)
840 : {
841 12 : bool update_slot = false;
842 :
843 : Assert(MyReplicationSlot == NULL);
844 : Assert(failover || two_phase);
845 :
846 12 : ReplicationSlotAcquire(name, false, true);
847 :
848 10 : if (SlotIsPhysical(MyReplicationSlot))
849 0 : ereport(ERROR,
850 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
851 : errmsg("cannot use %s with a physical replication slot",
852 : "ALTER_REPLICATION_SLOT"));
853 :
854 10 : if (RecoveryInProgress())
855 : {
856 : /*
857 : * Do not allow users to alter the slots which are currently being
858 : * synced from the primary to the standby.
859 : */
860 2 : if (MyReplicationSlot->data.synced)
861 2 : ereport(ERROR,
862 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
863 : errmsg("cannot alter replication slot \"%s\"", name),
864 : errdetail("This replication slot is being synchronized from the primary server."));
865 :
866 : /*
867 : * Do not allow users to enable failover on the standby as we do not
868 : * support sync to the cascading standby.
869 : */
870 0 : if (failover && *failover)
871 0 : ereport(ERROR,
872 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
873 : errmsg("cannot enable failover for a replication slot"
874 : " on the standby"));
875 : }
876 :
877 8 : if (failover)
878 : {
879 : /*
880 : * Do not allow users to enable failover for temporary slots as we do
881 : * not support syncing temporary slots to the standby.
882 : */
883 6 : if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
884 0 : ereport(ERROR,
885 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
886 : errmsg("cannot enable failover for a temporary replication slot"));
887 :
888 6 : if (MyReplicationSlot->data.failover != *failover)
889 : {
890 6 : SpinLockAcquire(&MyReplicationSlot->mutex);
891 6 : MyReplicationSlot->data.failover = *failover;
892 6 : SpinLockRelease(&MyReplicationSlot->mutex);
893 :
894 6 : update_slot = true;
895 : }
896 : }
897 :
898 8 : if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
899 : {
900 2 : SpinLockAcquire(&MyReplicationSlot->mutex);
901 2 : MyReplicationSlot->data.two_phase = *two_phase;
902 2 : SpinLockRelease(&MyReplicationSlot->mutex);
903 :
904 2 : update_slot = true;
905 : }
906 :
907 8 : if (update_slot)
908 : {
909 8 : ReplicationSlotMarkDirty();
910 8 : ReplicationSlotSave();
911 : }
912 :
913 8 : ReplicationSlotRelease();
914 8 : }
915 :
916 : /*
917 : * Permanently drop the currently acquired replication slot.
918 : */
919 : void
920 802 : ReplicationSlotDropAcquired(void)
921 : {
922 802 : ReplicationSlot *slot = MyReplicationSlot;
923 :
924 : Assert(MyReplicationSlot != NULL);
925 :
926 : /* slot isn't acquired anymore */
927 802 : MyReplicationSlot = NULL;
928 :
929 802 : ReplicationSlotDropPtr(slot);
930 802 : }
931 :
932 : /*
933 : * Permanently drop the replication slot which will be released by the point
934 : * this function returns.
935 : */
936 : static void
937 1080 : ReplicationSlotDropPtr(ReplicationSlot *slot)
938 : {
939 : char path[MAXPGPATH];
940 : char tmppath[MAXPGPATH];
941 :
942 : /*
943 : * If some other backend ran this code concurrently with us, we might try
944 : * to delete a slot with a certain name while someone else was trying to
945 : * create a slot with the same name.
946 : */
947 1080 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
948 :
949 : /* Generate pathnames. */
950 1080 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
951 1080 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
952 :
953 : /*
954 : * Rename the slot directory on disk, so that we'll no longer recognize
955 : * this as a valid slot. Note that if this fails, we've got to mark the
956 : * slot inactive before bailing out. If we're dropping an ephemeral or a
957 : * temporary slot, we better never fail hard as the caller won't expect
958 : * the slot to survive and this might get called during error handling.
959 : */
960 1080 : if (rename(path, tmppath) == 0)
961 : {
962 : /*
963 : * We need to fsync() the directory we just renamed and its parent to
964 : * make sure that our changes are on disk in a crash-safe fashion. If
965 : * fsync() fails, we can't be sure whether the changes are on disk or
966 : * not. For now, we handle that by panicking;
967 : * StartupReplicationSlots() will try to straighten it out after
968 : * restart.
969 : */
970 1080 : START_CRIT_SECTION();
971 1080 : fsync_fname(tmppath, true);
972 1080 : fsync_fname(PG_REPLSLOT_DIR, true);
973 1080 : END_CRIT_SECTION();
974 : }
975 : else
976 : {
977 0 : bool fail_softly = slot->data.persistency != RS_PERSISTENT;
978 :
979 0 : SpinLockAcquire(&slot->mutex);
980 0 : slot->active_pid = 0;
981 0 : SpinLockRelease(&slot->mutex);
982 :
983 : /* wake up anyone waiting on this slot */
984 0 : ConditionVariableBroadcast(&slot->active_cv);
985 :
986 0 : ereport(fail_softly ? WARNING : ERROR,
987 : (errcode_for_file_access(),
988 : errmsg("could not rename file \"%s\" to \"%s\": %m",
989 : path, tmppath)));
990 : }
991 :
992 : /*
993 : * The slot is definitely gone. Lock out concurrent scans of the array
994 : * long enough to kill it. It's OK to clear the active PID here without
995 : * grabbing the mutex because nobody else can be scanning the array here,
996 : * and nobody can be attached to this slot and thus access it without
997 : * scanning the array.
998 : *
999 : * Also wake up processes waiting for it.
1000 : */
1001 1080 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
1002 1080 : slot->active_pid = 0;
1003 1080 : slot->in_use = false;
1004 1080 : LWLockRelease(ReplicationSlotControlLock);
1005 1080 : ConditionVariableBroadcast(&slot->active_cv);
1006 :
1007 : /*
1008 : * Slot is dead and doesn't prevent resource removal anymore, recompute
1009 : * limits.
1010 : */
1011 1080 : ReplicationSlotsComputeRequiredXmin(false);
1012 1080 : ReplicationSlotsComputeRequiredLSN();
1013 :
1014 : /*
1015 : * If removing the directory fails, the worst thing that will happen is
1016 : * that the user won't be able to create a new slot with the same name
1017 : * until the next server restart. We warn about it, but that's all.
1018 : */
1019 1080 : if (!rmtree(tmppath, true))
1020 0 : ereport(WARNING,
1021 : (errmsg("could not remove directory \"%s\"", tmppath)));
1022 :
1023 : /*
1024 : * Drop the statistics entry for the replication slot. Do this while
1025 : * holding ReplicationSlotAllocationLock so that we don't drop a
1026 : * statistics entry for another slot with the same name just created in
1027 : * another session.
1028 : */
1029 1080 : if (SlotIsLogical(slot))
1030 786 : pgstat_drop_replslot(slot);
1031 :
1032 : /*
1033 : * We release this at the very end, so that nobody starts trying to create
1034 : * a slot while we're still cleaning up the detritus of the old one.
1035 : */
1036 1080 : LWLockRelease(ReplicationSlotAllocationLock);
1037 1080 : }
1038 :
1039 : /*
1040 : * Serialize the currently acquired slot's state from memory to disk, thereby
1041 : * guaranteeing the current state will survive a crash.
1042 : */
1043 : void
1044 2648 : ReplicationSlotSave(void)
1045 : {
1046 : char path[MAXPGPATH];
1047 :
1048 : Assert(MyReplicationSlot != NULL);
1049 :
1050 2648 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
1051 2648 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
1052 2648 : }
1053 :
1054 : /*
1055 : * Signal that it would be useful if the currently acquired slot would be
1056 : * flushed out to disk.
1057 : *
1058 : * Note that the actual flush to disk can be delayed for a long time, if
1059 : * required for correctness explicitly do a ReplicationSlotSave().
1060 : */
1061 : void
1062 61970 : ReplicationSlotMarkDirty(void)
1063 : {
1064 61970 : ReplicationSlot *slot = MyReplicationSlot;
1065 :
1066 : Assert(MyReplicationSlot != NULL);
1067 :
1068 61970 : SpinLockAcquire(&slot->mutex);
1069 61970 : MyReplicationSlot->just_dirtied = true;
1070 61970 : MyReplicationSlot->dirty = true;
1071 61970 : SpinLockRelease(&slot->mutex);
1072 61970 : }
1073 :
1074 : /*
1075 : * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1076 : * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1077 : */
1078 : void
1079 888 : ReplicationSlotPersist(void)
1080 : {
1081 888 : ReplicationSlot *slot = MyReplicationSlot;
1082 :
1083 : Assert(slot != NULL);
1084 : Assert(slot->data.persistency != RS_PERSISTENT);
1085 :
1086 888 : SpinLockAcquire(&slot->mutex);
1087 888 : slot->data.persistency = RS_PERSISTENT;
1088 888 : SpinLockRelease(&slot->mutex);
1089 :
1090 888 : ReplicationSlotMarkDirty();
1091 888 : ReplicationSlotSave();
1092 888 : }
1093 :
1094 : /*
1095 : * Compute the oldest xmin across all slots and store it in the ProcArray.
1096 : *
1097 : * If already_locked is true, ProcArrayLock has already been acquired
1098 : * exclusively.
1099 : */
1100 : void
1101 4538 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
1102 : {
1103 : int i;
1104 4538 : TransactionId agg_xmin = InvalidTransactionId;
1105 4538 : TransactionId agg_catalog_xmin = InvalidTransactionId;
1106 :
1107 : Assert(ReplicationSlotCtl != NULL);
1108 :
1109 4538 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1110 :
1111 45776 : for (i = 0; i < max_replication_slots; i++)
1112 : {
1113 41238 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1114 : TransactionId effective_xmin;
1115 : TransactionId effective_catalog_xmin;
1116 : bool invalidated;
1117 :
1118 41238 : if (!s->in_use)
1119 36970 : continue;
1120 :
1121 4268 : SpinLockAcquire(&s->mutex);
1122 4268 : effective_xmin = s->effective_xmin;
1123 4268 : effective_catalog_xmin = s->effective_catalog_xmin;
1124 4268 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1125 4268 : SpinLockRelease(&s->mutex);
1126 :
1127 : /* invalidated slots need not apply */
1128 4268 : if (invalidated)
1129 44 : continue;
1130 :
1131 : /* check the data xmin */
1132 4224 : if (TransactionIdIsValid(effective_xmin) &&
1133 34 : (!TransactionIdIsValid(agg_xmin) ||
1134 34 : TransactionIdPrecedes(effective_xmin, agg_xmin)))
1135 624 : agg_xmin = effective_xmin;
1136 :
1137 : /* check the catalog xmin */
1138 4224 : if (TransactionIdIsValid(effective_catalog_xmin) &&
1139 1846 : (!TransactionIdIsValid(agg_catalog_xmin) ||
1140 1846 : TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1141 2238 : agg_catalog_xmin = effective_catalog_xmin;
1142 : }
1143 :
1144 4538 : LWLockRelease(ReplicationSlotControlLock);
1145 :
1146 4538 : ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1147 4538 : }
1148 :
1149 : /*
1150 : * Compute the oldest restart LSN across all slots and inform xlog module.
1151 : *
1152 : * Note: while max_slot_wal_keep_size is theoretically relevant for this
1153 : * purpose, we don't try to account for that, because this module doesn't
1154 : * know what to compare against.
1155 : */
1156 : void
1157 63264 : ReplicationSlotsComputeRequiredLSN(void)
1158 : {
1159 : int i;
1160 63264 : XLogRecPtr min_required = InvalidXLogRecPtr;
1161 :
1162 : Assert(ReplicationSlotCtl != NULL);
1163 :
1164 63264 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1165 687306 : for (i = 0; i < max_replication_slots; i++)
1166 : {
1167 624042 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1168 : XLogRecPtr restart_lsn;
1169 : XLogRecPtr last_saved_restart_lsn;
1170 : bool invalidated;
1171 : ReplicationSlotPersistency persistency;
1172 :
1173 624042 : if (!s->in_use)
1174 561024 : continue;
1175 :
1176 63018 : SpinLockAcquire(&s->mutex);
1177 63018 : persistency = s->data.persistency;
1178 63018 : restart_lsn = s->data.restart_lsn;
1179 63018 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1180 63018 : last_saved_restart_lsn = s->last_saved_restart_lsn;
1181 63018 : SpinLockRelease(&s->mutex);
1182 :
1183 : /* invalidated slots need not apply */
1184 63018 : if (invalidated)
1185 46 : continue;
1186 :
1187 : /*
1188 : * For persistent slot use last_saved_restart_lsn to compute the
1189 : * oldest LSN for removal of WAL segments. The segments between
1190 : * last_saved_restart_lsn and restart_lsn might be needed by a
1191 : * persistent slot in the case of database crash. Non-persistent
1192 : * slots can't survive the database crash, so we don't care about
1193 : * last_saved_restart_lsn for them.
1194 : */
1195 62972 : if (persistency == RS_PERSISTENT)
1196 : {
1197 61384 : if (last_saved_restart_lsn != InvalidXLogRecPtr &&
1198 : restart_lsn > last_saved_restart_lsn)
1199 : {
1200 57472 : restart_lsn = last_saved_restart_lsn;
1201 : }
1202 : }
1203 :
1204 62972 : if (restart_lsn != InvalidXLogRecPtr &&
1205 1894 : (min_required == InvalidXLogRecPtr ||
1206 : restart_lsn < min_required))
1207 61208 : min_required = restart_lsn;
1208 : }
1209 63264 : LWLockRelease(ReplicationSlotControlLock);
1210 :
1211 63264 : XLogSetReplicationSlotMinimumLSN(min_required);
1212 63264 : }
1213 :
1214 : /*
1215 : * Compute the oldest WAL LSN required by *logical* decoding slots..
1216 : *
1217 : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1218 : * slots exist.
1219 : *
1220 : * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1221 : * ignores physical replication slots.
1222 : *
1223 : * The results aren't required frequently, so we don't maintain a precomputed
1224 : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1225 : */
1226 : XLogRecPtr
1227 6700 : ReplicationSlotsComputeLogicalRestartLSN(void)
1228 : {
1229 6700 : XLogRecPtr result = InvalidXLogRecPtr;
1230 : int i;
1231 :
1232 6700 : if (max_replication_slots <= 0)
1233 0 : return InvalidXLogRecPtr;
1234 :
1235 6700 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1236 :
1237 72608 : for (i = 0; i < max_replication_slots; i++)
1238 : {
1239 : ReplicationSlot *s;
1240 : XLogRecPtr restart_lsn;
1241 : XLogRecPtr last_saved_restart_lsn;
1242 : bool invalidated;
1243 : ReplicationSlotPersistency persistency;
1244 :
1245 65908 : s = &ReplicationSlotCtl->replication_slots[i];
1246 :
1247 : /* cannot change while ReplicationSlotCtlLock is held */
1248 65908 : if (!s->in_use)
1249 64596 : continue;
1250 :
1251 : /* we're only interested in logical slots */
1252 1312 : if (!SlotIsLogical(s))
1253 964 : continue;
1254 :
1255 : /* read once, it's ok if it increases while we're checking */
1256 348 : SpinLockAcquire(&s->mutex);
1257 348 : persistency = s->data.persistency;
1258 348 : restart_lsn = s->data.restart_lsn;
1259 348 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1260 348 : last_saved_restart_lsn = s->last_saved_restart_lsn;
1261 348 : SpinLockRelease(&s->mutex);
1262 :
1263 : /* invalidated slots need not apply */
1264 348 : if (invalidated)
1265 8 : continue;
1266 :
1267 : /*
1268 : * For persistent slot use last_saved_restart_lsn to compute the
1269 : * oldest LSN for removal of WAL segments. The segments between
1270 : * last_saved_restart_lsn and restart_lsn might be needed by a
1271 : * persistent slot in the case of database crash. Non-persistent
1272 : * slots can't survive the database crash, so we don't care about
1273 : * last_saved_restart_lsn for them.
1274 : */
1275 340 : if (persistency == RS_PERSISTENT)
1276 : {
1277 336 : if (last_saved_restart_lsn != InvalidXLogRecPtr &&
1278 : restart_lsn > last_saved_restart_lsn)
1279 : {
1280 0 : restart_lsn = last_saved_restart_lsn;
1281 : }
1282 : }
1283 :
1284 340 : if (restart_lsn == InvalidXLogRecPtr)
1285 0 : continue;
1286 :
1287 340 : if (result == InvalidXLogRecPtr ||
1288 : restart_lsn < result)
1289 272 : result = restart_lsn;
1290 : }
1291 :
1292 6700 : LWLockRelease(ReplicationSlotControlLock);
1293 :
1294 6700 : return result;
1295 : }
1296 :
1297 : /*
1298 : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1299 : * passed database oid.
1300 : *
1301 : * Returns true if there are any slots referencing the database. *nslots will
1302 : * be set to the absolute number of slots in the database, *nactive to ones
1303 : * currently active.
1304 : */
1305 : bool
1306 90 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1307 : {
1308 : int i;
1309 :
1310 90 : *nslots = *nactive = 0;
1311 :
1312 90 : if (max_replication_slots <= 0)
1313 0 : return false;
1314 :
1315 90 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1316 932 : for (i = 0; i < max_replication_slots; i++)
1317 : {
1318 : ReplicationSlot *s;
1319 :
1320 842 : s = &ReplicationSlotCtl->replication_slots[i];
1321 :
1322 : /* cannot change while ReplicationSlotCtlLock is held */
1323 842 : if (!s->in_use)
1324 806 : continue;
1325 :
1326 : /* only logical slots are database specific, skip */
1327 36 : if (!SlotIsLogical(s))
1328 20 : continue;
1329 :
1330 : /* not our database, skip */
1331 16 : if (s->data.database != dboid)
1332 10 : continue;
1333 :
1334 : /* NB: intentionally counting invalidated slots */
1335 :
1336 : /* count slots with spinlock held */
1337 6 : SpinLockAcquire(&s->mutex);
1338 6 : (*nslots)++;
1339 6 : if (s->active_pid != 0)
1340 2 : (*nactive)++;
1341 6 : SpinLockRelease(&s->mutex);
1342 : }
1343 90 : LWLockRelease(ReplicationSlotControlLock);
1344 :
1345 90 : if (*nslots > 0)
1346 6 : return true;
1347 84 : return false;
1348 : }
1349 :
1350 : /*
1351 : * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1352 : * passed database oid. The caller should hold an exclusive lock on the
1353 : * pg_database oid for the database to prevent creation of new slots on the db
1354 : * or replay from existing slots.
1355 : *
1356 : * Another session that concurrently acquires an existing slot on the target DB
1357 : * (most likely to drop it) may cause this function to ERROR. If that happens
1358 : * it may have dropped some but not all slots.
1359 : *
1360 : * This routine isn't as efficient as it could be - but we don't drop
1361 : * databases often, especially databases with lots of slots.
1362 : */
1363 : void
1364 114 : ReplicationSlotsDropDBSlots(Oid dboid)
1365 : {
1366 : int i;
1367 :
1368 114 : if (max_replication_slots <= 0)
1369 0 : return;
1370 :
1371 114 : restart:
1372 124 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1373 1190 : for (i = 0; i < max_replication_slots; i++)
1374 : {
1375 : ReplicationSlot *s;
1376 : char *slotname;
1377 : int active_pid;
1378 :
1379 1076 : s = &ReplicationSlotCtl->replication_slots[i];
1380 :
1381 : /* cannot change while ReplicationSlotCtlLock is held */
1382 1076 : if (!s->in_use)
1383 1022 : continue;
1384 :
1385 : /* only logical slots are database specific, skip */
1386 54 : if (!SlotIsLogical(s))
1387 22 : continue;
1388 :
1389 : /* not our database, skip */
1390 32 : if (s->data.database != dboid)
1391 22 : continue;
1392 :
1393 : /* NB: intentionally including invalidated slots */
1394 :
1395 : /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1396 10 : SpinLockAcquire(&s->mutex);
1397 : /* can't change while ReplicationSlotControlLock is held */
1398 10 : slotname = NameStr(s->data.name);
1399 10 : active_pid = s->active_pid;
1400 10 : if (active_pid == 0)
1401 : {
1402 10 : MyReplicationSlot = s;
1403 10 : s->active_pid = MyProcPid;
1404 : }
1405 10 : SpinLockRelease(&s->mutex);
1406 :
1407 : /*
1408 : * Even though we hold an exclusive lock on the database object a
1409 : * logical slot for that DB can still be active, e.g. if it's
1410 : * concurrently being dropped by a backend connected to another DB.
1411 : *
1412 : * That's fairly unlikely in practice, so we'll just bail out.
1413 : *
1414 : * The slot sync worker holds a shared lock on the database before
1415 : * operating on synced logical slots to avoid conflict with the drop
1416 : * happening here. The persistent synced slots are thus safe but there
1417 : * is a possibility that the slot sync worker has created a temporary
1418 : * slot (which stays active even on release) and we are trying to drop
1419 : * that here. In practice, the chances of hitting this scenario are
1420 : * less as during slot synchronization, the temporary slot is
1421 : * immediately converted to persistent and thus is safe due to the
1422 : * shared lock taken on the database. So, we'll just bail out in such
1423 : * a case.
1424 : *
1425 : * XXX: We can consider shutting down the slot sync worker before
1426 : * trying to drop synced temporary slots here.
1427 : */
1428 10 : if (active_pid)
1429 0 : ereport(ERROR,
1430 : (errcode(ERRCODE_OBJECT_IN_USE),
1431 : errmsg("replication slot \"%s\" is active for PID %d",
1432 : slotname, active_pid)));
1433 :
1434 : /*
1435 : * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1436 : * holding ReplicationSlotControlLock over filesystem operations,
1437 : * release ReplicationSlotControlLock and use
1438 : * ReplicationSlotDropAcquired.
1439 : *
1440 : * As that means the set of slots could change, restart scan from the
1441 : * beginning each time we release the lock.
1442 : */
1443 10 : LWLockRelease(ReplicationSlotControlLock);
1444 10 : ReplicationSlotDropAcquired();
1445 10 : goto restart;
1446 : }
1447 114 : LWLockRelease(ReplicationSlotControlLock);
1448 : }
1449 :
1450 :
1451 : /*
1452 : * Check whether the server's configuration supports using replication
1453 : * slots.
1454 : */
1455 : void
1456 3364 : CheckSlotRequirements(void)
1457 : {
1458 : /*
1459 : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1460 : * needs the same check.
1461 : */
1462 :
1463 3364 : if (max_replication_slots == 0)
1464 0 : ereport(ERROR,
1465 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1466 : errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1467 :
1468 3364 : if (wal_level < WAL_LEVEL_REPLICA)
1469 0 : ereport(ERROR,
1470 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1471 : errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1472 3364 : }
1473 :
1474 : /*
1475 : * Check whether the user has privilege to use replication slots.
1476 : */
1477 : void
1478 1086 : CheckSlotPermissions(void)
1479 : {
1480 1086 : if (!has_rolreplication(GetUserId()))
1481 10 : ereport(ERROR,
1482 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1483 : errmsg("permission denied to use replication slots"),
1484 : errdetail("Only roles with the %s attribute may use replication slots.",
1485 : "REPLICATION")));
1486 1076 : }
1487 :
1488 : /*
1489 : * Reserve WAL for the currently active slot.
1490 : *
1491 : * Compute and set restart_lsn in a manner that's appropriate for the type of
1492 : * the slot and concurrency safe.
1493 : */
1494 : void
1495 1182 : ReplicationSlotReserveWal(void)
1496 : {
1497 1182 : ReplicationSlot *slot = MyReplicationSlot;
1498 :
1499 : Assert(slot != NULL);
1500 : Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
1501 : Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr);
1502 :
1503 : /*
1504 : * The replication slot mechanism is used to prevent removal of required
1505 : * WAL. As there is no interlock between this routine and checkpoints, WAL
1506 : * segments could concurrently be removed when a now stale return value of
1507 : * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1508 : * this happens we'll just retry.
1509 : */
1510 : while (true)
1511 0 : {
1512 : XLogSegNo segno;
1513 : XLogRecPtr restart_lsn;
1514 :
1515 : /*
1516 : * For logical slots log a standby snapshot and start logical decoding
1517 : * at exactly that position. That allows the slot to start up more
1518 : * quickly. But on a standby we cannot do WAL writes, so just use the
1519 : * replay pointer; effectively, an attempt to create a logical slot on
1520 : * standby will cause it to wait for an xl_running_xact record to be
1521 : * logged independently on the primary, so that a snapshot can be
1522 : * built using the record.
1523 : *
1524 : * None of this is needed (or indeed helpful) for physical slots as
1525 : * they'll start replay at the last logged checkpoint anyway. Instead
1526 : * return the location of the last redo LSN. While that slightly
1527 : * increases the chance that we have to retry, it's where a base
1528 : * backup has to start replay at.
1529 : */
1530 1182 : if (SlotIsPhysical(slot))
1531 294 : restart_lsn = GetRedoRecPtr();
1532 888 : else if (RecoveryInProgress())
1533 46 : restart_lsn = GetXLogReplayRecPtr(NULL);
1534 : else
1535 842 : restart_lsn = GetXLogInsertRecPtr();
1536 :
1537 1182 : SpinLockAcquire(&slot->mutex);
1538 1182 : slot->data.restart_lsn = restart_lsn;
1539 1182 : SpinLockRelease(&slot->mutex);
1540 :
1541 : /* prevent WAL removal as fast as possible */
1542 1182 : ReplicationSlotsComputeRequiredLSN();
1543 :
1544 : /*
1545 : * If all required WAL is still there, great, otherwise retry. The
1546 : * slot should prevent further removal of WAL, unless there's a
1547 : * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1548 : * the new restart_lsn above, so normally we should never need to loop
1549 : * more than twice.
1550 : */
1551 1182 : XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
1552 1182 : if (XLogGetLastRemovedSegno() < segno)
1553 1182 : break;
1554 : }
1555 :
1556 1182 : if (!RecoveryInProgress() && SlotIsLogical(slot))
1557 : {
1558 : XLogRecPtr flushptr;
1559 :
1560 : /* make sure we have enough information to start */
1561 842 : flushptr = LogStandbySnapshot();
1562 :
1563 : /* and make sure it's fsynced to disk */
1564 842 : XLogFlush(flushptr);
1565 : }
1566 1182 : }
1567 :
1568 : /*
1569 : * Report that replication slot needs to be invalidated
1570 : */
1571 : static void
1572 42 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1573 : bool terminating,
1574 : int pid,
1575 : NameData slotname,
1576 : XLogRecPtr restart_lsn,
1577 : XLogRecPtr oldestLSN,
1578 : TransactionId snapshotConflictHorizon,
1579 : long slot_idle_seconds)
1580 : {
1581 : StringInfoData err_detail;
1582 : StringInfoData err_hint;
1583 :
1584 42 : initStringInfo(&err_detail);
1585 42 : initStringInfo(&err_hint);
1586 :
1587 42 : switch (cause)
1588 : {
1589 12 : case RS_INVAL_WAL_REMOVED:
1590 : {
1591 12 : uint64 ex = oldestLSN - restart_lsn;
1592 :
1593 12 : appendStringInfo(&err_detail,
1594 12 : ngettext("The slot's restart_lsn %X/%X exceeds the limit by %" PRIu64 " byte.",
1595 : "The slot's restart_lsn %X/%X exceeds the limit by %" PRIu64 " bytes.",
1596 : ex),
1597 12 : LSN_FORMAT_ARGS(restart_lsn),
1598 : ex);
1599 : /* translator: %s is a GUC variable name */
1600 12 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1601 : "max_slot_wal_keep_size");
1602 12 : break;
1603 : }
1604 24 : case RS_INVAL_HORIZON:
1605 24 : appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1606 : snapshotConflictHorizon);
1607 24 : break;
1608 :
1609 6 : case RS_INVAL_WAL_LEVEL:
1610 6 : appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
1611 6 : break;
1612 :
1613 0 : case RS_INVAL_IDLE_TIMEOUT:
1614 : {
1615 0 : int minutes = slot_idle_seconds / SECS_PER_MINUTE;
1616 0 : int secs = slot_idle_seconds % SECS_PER_MINUTE;
1617 :
1618 : /* translator: %s is a GUC variable name */
1619 0 : appendStringInfo(&err_detail, _("The slot's idle time of %dmin %02ds exceeds the configured \"%s\" duration of %dmin."),
1620 : minutes, secs, "idle_replication_slot_timeout",
1621 : idle_replication_slot_timeout_mins);
1622 : /* translator: %s is a GUC variable name */
1623 0 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1624 : "idle_replication_slot_timeout");
1625 0 : break;
1626 : }
1627 : case RS_INVAL_NONE:
1628 : pg_unreachable();
1629 : }
1630 :
1631 42 : ereport(LOG,
1632 : terminating ?
1633 : errmsg("terminating process %d to release replication slot \"%s\"",
1634 : pid, NameStr(slotname)) :
1635 : errmsg("invalidating obsolete replication slot \"%s\"",
1636 : NameStr(slotname)),
1637 : errdetail_internal("%s", err_detail.data),
1638 : err_hint.len ? errhint("%s", err_hint.data) : 0);
1639 :
1640 42 : pfree(err_detail.data);
1641 42 : pfree(err_hint.data);
1642 42 : }
1643 :
1644 : /*
1645 : * Can we invalidate an idle replication slot?
1646 : *
1647 : * Idle timeout invalidation is allowed only when:
1648 : *
1649 : * 1. Idle timeout is set
1650 : * 2. Slot has reserved WAL
1651 : * 3. Slot is inactive
1652 : * 4. The slot is not being synced from the primary while the server is in
1653 : * recovery. This is because synced slots are always considered to be
1654 : * inactive because they don't perform logical decoding to produce changes.
1655 : */
1656 : static inline bool
1657 634 : CanInvalidateIdleSlot(ReplicationSlot *s)
1658 : {
1659 634 : return (idle_replication_slot_timeout_mins != 0 &&
1660 0 : !XLogRecPtrIsInvalid(s->data.restart_lsn) &&
1661 634 : s->inactive_since > 0 &&
1662 0 : !(RecoveryInProgress() && s->data.synced));
1663 : }
1664 :
1665 : /*
1666 : * DetermineSlotInvalidationCause - Determine the cause for which a slot
1667 : * becomes invalid among the given possible causes.
1668 : *
1669 : * This function sequentially checks all possible invalidation causes and
1670 : * returns the first one for which the slot is eligible for invalidation.
1671 : */
1672 : static ReplicationSlotInvalidationCause
1673 696 : DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
1674 : XLogRecPtr oldestLSN, Oid dboid,
1675 : TransactionId snapshotConflictHorizon,
1676 : TransactionId initial_effective_xmin,
1677 : TransactionId initial_catalog_effective_xmin,
1678 : XLogRecPtr initial_restart_lsn,
1679 : TimestampTz *inactive_since, TimestampTz now)
1680 : {
1681 : Assert(possible_causes != RS_INVAL_NONE);
1682 :
1683 696 : if (possible_causes & RS_INVAL_WAL_REMOVED)
1684 : {
1685 646 : if (initial_restart_lsn != InvalidXLogRecPtr &&
1686 : initial_restart_lsn < oldestLSN)
1687 12 : return RS_INVAL_WAL_REMOVED;
1688 : }
1689 :
1690 684 : if (possible_causes & RS_INVAL_HORIZON)
1691 : {
1692 : /* invalid DB oid signals a shared relation */
1693 44 : if (SlotIsLogical(s) &&
1694 34 : (dboid == InvalidOid || dboid == s->data.database))
1695 : {
1696 44 : if (TransactionIdIsValid(initial_effective_xmin) &&
1697 0 : TransactionIdPrecedesOrEquals(initial_effective_xmin,
1698 : snapshotConflictHorizon))
1699 0 : return RS_INVAL_HORIZON;
1700 88 : else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
1701 44 : TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
1702 : snapshotConflictHorizon))
1703 24 : return RS_INVAL_HORIZON;
1704 : }
1705 : }
1706 :
1707 660 : if (possible_causes & RS_INVAL_WAL_LEVEL)
1708 : {
1709 6 : if (SlotIsLogical(s))
1710 6 : return RS_INVAL_WAL_LEVEL;
1711 : }
1712 :
1713 654 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1714 : {
1715 : Assert(now > 0);
1716 :
1717 634 : if (CanInvalidateIdleSlot(s))
1718 : {
1719 : /*
1720 : * We simulate the invalidation due to idle_timeout as the minimum
1721 : * time idle time is one minute which makes tests take a long
1722 : * time.
1723 : */
1724 : #ifdef USE_INJECTION_POINTS
1725 0 : if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1726 : {
1727 0 : *inactive_since = 0; /* since the beginning of time */
1728 0 : return RS_INVAL_IDLE_TIMEOUT;
1729 : }
1730 : #endif
1731 :
1732 : /*
1733 : * Check if the slot needs to be invalidated due to
1734 : * idle_replication_slot_timeout GUC.
1735 : */
1736 0 : if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
1737 : idle_replication_slot_timeout_mins * SECS_PER_MINUTE))
1738 : {
1739 0 : *inactive_since = s->inactive_since;
1740 0 : return RS_INVAL_IDLE_TIMEOUT;
1741 : }
1742 : }
1743 : }
1744 :
1745 654 : return RS_INVAL_NONE;
1746 : }
1747 :
1748 : /*
1749 : * Helper for InvalidateObsoleteReplicationSlots
1750 : *
1751 : * Acquires the given slot and mark it invalid, if necessary and possible.
1752 : *
1753 : * Returns whether ReplicationSlotControlLock was released in the interim (and
1754 : * in that case we're not holding the lock at return, otherwise we are).
1755 : *
1756 : * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
1757 : *
1758 : * This is inherently racy, because we release the LWLock
1759 : * for syscalls, so caller must restart if we return true.
1760 : */
1761 : static bool
1762 768 : InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
1763 : ReplicationSlot *s,
1764 : XLogRecPtr oldestLSN,
1765 : Oid dboid, TransactionId snapshotConflictHorizon,
1766 : bool *invalidated)
1767 : {
1768 768 : int last_signaled_pid = 0;
1769 768 : bool released_lock = false;
1770 768 : bool terminated = false;
1771 768 : TransactionId initial_effective_xmin = InvalidTransactionId;
1772 768 : TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
1773 768 : XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr;
1774 768 : ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
1775 768 : TimestampTz inactive_since = 0;
1776 :
1777 : for (;;)
1778 14 : {
1779 : XLogRecPtr restart_lsn;
1780 : NameData slotname;
1781 782 : int active_pid = 0;
1782 782 : ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
1783 782 : TimestampTz now = 0;
1784 782 : long slot_idle_secs = 0;
1785 :
1786 : Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1787 :
1788 782 : if (!s->in_use)
1789 : {
1790 0 : if (released_lock)
1791 0 : LWLockRelease(ReplicationSlotControlLock);
1792 0 : break;
1793 : }
1794 :
1795 782 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1796 : {
1797 : /*
1798 : * Assign the current time here to avoid system call overhead
1799 : * while holding the spinlock in subsequent code.
1800 : */
1801 666 : now = GetCurrentTimestamp();
1802 : }
1803 :
1804 : /*
1805 : * Check if the slot needs to be invalidated. If it needs to be
1806 : * invalidated, and is not currently acquired, acquire it and mark it
1807 : * as having been invalidated. We do this with the spinlock held to
1808 : * avoid race conditions -- for example the restart_lsn could move
1809 : * forward, or the slot could be dropped.
1810 : */
1811 782 : SpinLockAcquire(&s->mutex);
1812 :
1813 782 : restart_lsn = s->data.restart_lsn;
1814 :
1815 : /* we do nothing if the slot is already invalid */
1816 782 : if (s->data.invalidated == RS_INVAL_NONE)
1817 : {
1818 : /*
1819 : * The slot's mutex will be released soon, and it is possible that
1820 : * those values change since the process holding the slot has been
1821 : * terminated (if any), so record them here to ensure that we
1822 : * would report the correct invalidation cause.
1823 : *
1824 : * Unlike other slot attributes, slot's inactive_since can't be
1825 : * changed until the acquired slot is released or the owning
1826 : * process is terminated. So, the inactive slot can only be
1827 : * invalidated immediately without being terminated.
1828 : */
1829 696 : if (!terminated)
1830 : {
1831 682 : initial_restart_lsn = s->data.restart_lsn;
1832 682 : initial_effective_xmin = s->effective_xmin;
1833 682 : initial_catalog_effective_xmin = s->effective_catalog_xmin;
1834 : }
1835 :
1836 696 : invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
1837 : s, oldestLSN,
1838 : dboid,
1839 : snapshotConflictHorizon,
1840 : initial_effective_xmin,
1841 : initial_catalog_effective_xmin,
1842 : initial_restart_lsn,
1843 : &inactive_since,
1844 : now);
1845 : }
1846 :
1847 : /*
1848 : * The invalidation cause recorded previously should not change while
1849 : * the process owning the slot (if any) has been terminated.
1850 : */
1851 : Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
1852 : invalidation_cause_prev != invalidation_cause));
1853 :
1854 : /* if there's no invalidation, we're done */
1855 782 : if (invalidation_cause == RS_INVAL_NONE)
1856 : {
1857 740 : SpinLockRelease(&s->mutex);
1858 740 : if (released_lock)
1859 0 : LWLockRelease(ReplicationSlotControlLock);
1860 740 : break;
1861 : }
1862 :
1863 42 : slotname = s->data.name;
1864 42 : active_pid = s->active_pid;
1865 :
1866 : /*
1867 : * If the slot can be acquired, do so and mark it invalidated
1868 : * immediately. Otherwise we'll signal the owning process, below, and
1869 : * retry.
1870 : */
1871 42 : if (active_pid == 0)
1872 : {
1873 28 : MyReplicationSlot = s;
1874 28 : s->active_pid = MyProcPid;
1875 28 : s->data.invalidated = invalidation_cause;
1876 :
1877 : /*
1878 : * XXX: We should consider not overwriting restart_lsn and instead
1879 : * just rely on .invalidated.
1880 : */
1881 28 : if (invalidation_cause == RS_INVAL_WAL_REMOVED)
1882 : {
1883 8 : s->data.restart_lsn = InvalidXLogRecPtr;
1884 8 : s->last_saved_restart_lsn = InvalidXLogRecPtr;
1885 : }
1886 :
1887 : /* Let caller know */
1888 28 : *invalidated = true;
1889 : }
1890 :
1891 42 : SpinLockRelease(&s->mutex);
1892 :
1893 : /*
1894 : * The logical replication slots shouldn't be invalidated as GUC
1895 : * max_slot_wal_keep_size is set to -1 and
1896 : * idle_replication_slot_timeout is set to 0 during the binary
1897 : * upgrade. See check_old_cluster_for_valid_slots() where we ensure
1898 : * that no slot was invalidated before the upgrade.
1899 : */
1900 : Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));
1901 :
1902 : /*
1903 : * Calculate the idle time duration of the slot if slot is marked
1904 : * invalidated with RS_INVAL_IDLE_TIMEOUT.
1905 : */
1906 42 : if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
1907 : {
1908 : int slot_idle_usecs;
1909 :
1910 0 : TimestampDifference(inactive_since, now, &slot_idle_secs,
1911 : &slot_idle_usecs);
1912 : }
1913 :
1914 42 : if (active_pid != 0)
1915 : {
1916 : /*
1917 : * Prepare the sleep on the slot's condition variable before
1918 : * releasing the lock, to close a possible race condition if the
1919 : * slot is released before the sleep below.
1920 : */
1921 14 : ConditionVariablePrepareToSleep(&s->active_cv);
1922 :
1923 14 : LWLockRelease(ReplicationSlotControlLock);
1924 14 : released_lock = true;
1925 :
1926 : /*
1927 : * Signal to terminate the process that owns the slot, if we
1928 : * haven't already signalled it. (Avoidance of repeated
1929 : * signalling is the only reason for there to be a loop in this
1930 : * routine; otherwise we could rely on caller's restart loop.)
1931 : *
1932 : * There is the race condition that other process may own the slot
1933 : * after its current owner process is terminated and before this
1934 : * process owns it. To handle that, we signal only if the PID of
1935 : * the owning process has changed from the previous time. (This
1936 : * logic assumes that the same PID is not reused very quickly.)
1937 : */
1938 14 : if (last_signaled_pid != active_pid)
1939 : {
1940 14 : ReportSlotInvalidation(invalidation_cause, true, active_pid,
1941 : slotname, restart_lsn,
1942 : oldestLSN, snapshotConflictHorizon,
1943 : slot_idle_secs);
1944 :
1945 14 : if (MyBackendType == B_STARTUP)
1946 10 : (void) SendProcSignal(active_pid,
1947 : PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
1948 : INVALID_PROC_NUMBER);
1949 : else
1950 4 : (void) kill(active_pid, SIGTERM);
1951 :
1952 14 : last_signaled_pid = active_pid;
1953 14 : terminated = true;
1954 14 : invalidation_cause_prev = invalidation_cause;
1955 : }
1956 :
1957 : /* Wait until the slot is released. */
1958 14 : ConditionVariableSleep(&s->active_cv,
1959 : WAIT_EVENT_REPLICATION_SLOT_DROP);
1960 :
1961 : /*
1962 : * Re-acquire lock and start over; we expect to invalidate the
1963 : * slot next time (unless another process acquires the slot in the
1964 : * meantime).
1965 : */
1966 14 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1967 14 : continue;
1968 : }
1969 : else
1970 : {
1971 : /*
1972 : * We hold the slot now and have already invalidated it; flush it
1973 : * to ensure that state persists.
1974 : *
1975 : * Don't want to hold ReplicationSlotControlLock across file
1976 : * system operations, so release it now but be sure to tell caller
1977 : * to restart from scratch.
1978 : */
1979 28 : LWLockRelease(ReplicationSlotControlLock);
1980 28 : released_lock = true;
1981 :
1982 : /* Make sure the invalidated state persists across server restart */
1983 28 : ReplicationSlotMarkDirty();
1984 28 : ReplicationSlotSave();
1985 28 : ReplicationSlotRelease();
1986 :
1987 28 : ReportSlotInvalidation(invalidation_cause, false, active_pid,
1988 : slotname, restart_lsn,
1989 : oldestLSN, snapshotConflictHorizon,
1990 : slot_idle_secs);
1991 :
1992 : /* done with this slot for now */
1993 28 : break;
1994 : }
1995 : }
1996 :
1997 : Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
1998 :
1999 768 : return released_lock;
2000 : }
2001 :
2002 : /*
2003 : * Invalidate slots that require resources about to be removed.
2004 : *
2005 : * Returns true when any slot have got invalidated.
2006 : *
2007 : * Whether a slot needs to be invalidated depends on the invalidation cause.
2008 : * A slot is invalidated if it:
2009 : * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
2010 : * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2011 : * db; dboid may be InvalidOid for shared relations
2012 : * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient
2013 : * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2014 : * "idle_replication_slot_timeout" duration.
2015 : *
2016 : * Note: This function attempts to invalidate the slot for multiple possible
2017 : * causes in a single pass, minimizing redundant iterations. The "cause"
2018 : * parameter can be a MASK representing one or more of the defined causes.
2019 : *
2020 : * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2021 : */
2022 : bool
2023 3394 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
2024 : XLogSegNo oldestSegno, Oid dboid,
2025 : TransactionId snapshotConflictHorizon)
2026 : {
2027 : XLogRecPtr oldestLSN;
2028 3394 : bool invalidated = false;
2029 :
2030 : Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2031 : Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
2032 : Assert(possible_causes != RS_INVAL_NONE);
2033 :
2034 3394 : if (max_replication_slots == 0)
2035 0 : return invalidated;
2036 :
2037 3394 : XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
2038 :
2039 3422 : restart:
2040 3422 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2041 36636 : for (int i = 0; i < max_replication_slots; i++)
2042 : {
2043 33242 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2044 :
2045 33242 : if (!s->in_use)
2046 32474 : continue;
2047 :
2048 768 : if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid,
2049 : snapshotConflictHorizon,
2050 : &invalidated))
2051 : {
2052 : /* if the lock was released, start from scratch */
2053 28 : goto restart;
2054 : }
2055 : }
2056 3394 : LWLockRelease(ReplicationSlotControlLock);
2057 :
2058 : /*
2059 : * If any slots have been invalidated, recalculate the resource limits.
2060 : */
2061 3394 : if (invalidated)
2062 : {
2063 18 : ReplicationSlotsComputeRequiredXmin(false);
2064 18 : ReplicationSlotsComputeRequiredLSN();
2065 : }
2066 :
2067 3394 : return invalidated;
2068 : }
2069 :
2070 : /*
2071 : * Flush all replication slots to disk.
2072 : *
2073 : * It is convenient to flush dirty replication slots at the time of checkpoint.
2074 : * Additionally, in case of a shutdown checkpoint, we also identify the slots
2075 : * for which the confirmed_flush LSN has been updated since the last time it
2076 : * was saved and flush them.
2077 : */
2078 : void
2079 3350 : CheckPointReplicationSlots(bool is_shutdown)
2080 : {
2081 : int i;
2082 3350 : bool last_saved_restart_lsn_updated = false;
2083 :
2084 3350 : elog(DEBUG1, "performing replication slot checkpoint");
2085 :
2086 : /*
2087 : * Prevent any slot from being created/dropped while we're active. As we
2088 : * explicitly do *not* want to block iterating over replication_slots or
2089 : * acquiring a slot we cannot take the control lock - but that's OK,
2090 : * because holding ReplicationSlotAllocationLock is strictly stronger, and
2091 : * enough to guarantee that nobody can change the in_use bits on us.
2092 : */
2093 3350 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2094 :
2095 36304 : for (i = 0; i < max_replication_slots; i++)
2096 : {
2097 32954 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2098 : char path[MAXPGPATH];
2099 :
2100 32954 : if (!s->in_use)
2101 32298 : continue;
2102 :
2103 : /* save the slot to disk, locking is handled in SaveSlotToPath() */
2104 656 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2105 :
2106 : /*
2107 : * Slot's data is not flushed each time the confirmed_flush LSN is
2108 : * updated as that could lead to frequent writes. However, we decide
2109 : * to force a flush of all logical slot's data at the time of shutdown
2110 : * if the confirmed_flush LSN is changed since we last flushed it to
2111 : * disk. This helps in avoiding an unnecessary retreat of the
2112 : * confirmed_flush LSN after restart.
2113 : */
2114 656 : if (is_shutdown && SlotIsLogical(s))
2115 : {
2116 122 : SpinLockAcquire(&s->mutex);
2117 :
2118 122 : if (s->data.invalidated == RS_INVAL_NONE &&
2119 122 : s->data.confirmed_flush > s->last_saved_confirmed_flush)
2120 : {
2121 62 : s->just_dirtied = true;
2122 62 : s->dirty = true;
2123 : }
2124 122 : SpinLockRelease(&s->mutex);
2125 : }
2126 :
2127 : /*
2128 : * Track if we're going to update slot's last_saved_restart_lsn. We
2129 : * need this to know if we need to recompute the required LSN.
2130 : */
2131 656 : if (s->last_saved_restart_lsn != s->data.restart_lsn)
2132 378 : last_saved_restart_lsn_updated = true;
2133 :
2134 656 : SaveSlotToPath(s, path, LOG);
2135 : }
2136 3350 : LWLockRelease(ReplicationSlotAllocationLock);
2137 :
2138 : /*
2139 : * Recompute the required LSN if SaveSlotToPath() updated
2140 : * last_saved_restart_lsn for any slot.
2141 : */
2142 3350 : if (last_saved_restart_lsn_updated)
2143 378 : ReplicationSlotsComputeRequiredLSN();
2144 3350 : }
2145 :
2146 : /*
2147 : * Load all replication slots from disk into memory at server startup. This
2148 : * needs to be run before we start crash recovery.
2149 : */
2150 : void
2151 1842 : StartupReplicationSlots(void)
2152 : {
2153 : DIR *replication_dir;
2154 : struct dirent *replication_de;
2155 :
2156 1842 : elog(DEBUG1, "starting up replication slots");
2157 :
2158 : /* restore all slots by iterating over all on-disk entries */
2159 1842 : replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2160 5670 : while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2161 : {
2162 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2163 : PGFileType de_type;
2164 :
2165 3830 : if (strcmp(replication_de->d_name, ".") == 0 ||
2166 1990 : strcmp(replication_de->d_name, "..") == 0)
2167 3680 : continue;
2168 :
2169 150 : snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2170 150 : de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2171 :
2172 : /* we're only creating directories here, skip if it's not our's */
2173 150 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
2174 0 : continue;
2175 :
2176 : /* we crashed while a slot was being setup or deleted, clean up */
2177 150 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
2178 : {
2179 0 : if (!rmtree(path, true))
2180 : {
2181 0 : ereport(WARNING,
2182 : (errmsg("could not remove directory \"%s\"",
2183 : path)));
2184 0 : continue;
2185 : }
2186 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2187 0 : continue;
2188 : }
2189 :
2190 : /* looks like a slot in a normal state, restore */
2191 150 : RestoreSlotFromDisk(replication_de->d_name);
2192 : }
2193 1840 : FreeDir(replication_dir);
2194 :
2195 : /* currently no slots exist, we're done. */
2196 1840 : if (max_replication_slots <= 0)
2197 0 : return;
2198 :
2199 : /* Now that we have recovered all the data, compute replication xmin */
2200 1840 : ReplicationSlotsComputeRequiredXmin(false);
2201 1840 : ReplicationSlotsComputeRequiredLSN();
2202 : }
2203 :
2204 : /* ----
2205 : * Manipulation of on-disk state of replication slots
2206 : *
2207 : * NB: none of the routines below should take any notice whether a slot is the
2208 : * current one or not, that's all handled a layer above.
2209 : * ----
2210 : */
2211 : static void
2212 1258 : CreateSlotOnDisk(ReplicationSlot *slot)
2213 : {
2214 : char tmppath[MAXPGPATH];
2215 : char path[MAXPGPATH];
2216 : struct stat st;
2217 :
2218 : /*
2219 : * No need to take out the io_in_progress_lock, nobody else can see this
2220 : * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2221 : * takes out the lock, if we'd take the lock here, we'd deadlock.
2222 : */
2223 :
2224 1258 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2225 1258 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2226 :
2227 : /*
2228 : * It's just barely possible that some previous effort to create or drop a
2229 : * slot with this name left a temp directory lying around. If that seems
2230 : * to be the case, try to remove it. If the rmtree() fails, we'll error
2231 : * out at the MakePGDirectory() below, so we don't bother checking
2232 : * success.
2233 : */
2234 1258 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2235 0 : rmtree(tmppath, true);
2236 :
2237 : /* Create and fsync the temporary slot directory. */
2238 1258 : if (MakePGDirectory(tmppath) < 0)
2239 0 : ereport(ERROR,
2240 : (errcode_for_file_access(),
2241 : errmsg("could not create directory \"%s\": %m",
2242 : tmppath)));
2243 1258 : fsync_fname(tmppath, true);
2244 :
2245 : /* Write the actual state file. */
2246 1258 : slot->dirty = true; /* signal that we really need to write */
2247 1258 : SaveSlotToPath(slot, tmppath, ERROR);
2248 :
2249 : /* Rename the directory into place. */
2250 1258 : if (rename(tmppath, path) != 0)
2251 0 : ereport(ERROR,
2252 : (errcode_for_file_access(),
2253 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2254 : tmppath, path)));
2255 :
2256 : /*
2257 : * If we'd now fail - really unlikely - we wouldn't know whether this slot
2258 : * would persist after an OS crash or not - so, force a restart. The
2259 : * restart would try to fsync this again till it works.
2260 : */
2261 1258 : START_CRIT_SECTION();
2262 :
2263 1258 : fsync_fname(path, true);
2264 1258 : fsync_fname(PG_REPLSLOT_DIR, true);
2265 :
2266 1258 : END_CRIT_SECTION();
2267 1258 : }
2268 :
2269 : /*
2270 : * Shared functionality between saving and creating a replication slot.
2271 : */
2272 : static void
2273 4562 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2274 : {
2275 : char tmppath[MAXPGPATH];
2276 : char path[MAXPGPATH];
2277 : int fd;
2278 : ReplicationSlotOnDisk cp;
2279 : bool was_dirty;
2280 :
2281 : /* first check whether there's something to write out */
2282 4562 : SpinLockAcquire(&slot->mutex);
2283 4562 : was_dirty = slot->dirty;
2284 4562 : slot->just_dirtied = false;
2285 4562 : SpinLockRelease(&slot->mutex);
2286 :
2287 : /* and don't do anything if there's nothing to write */
2288 4562 : if (!was_dirty)
2289 198 : return;
2290 :
2291 4364 : LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
2292 :
2293 : /* silence valgrind :( */
2294 4364 : memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2295 :
2296 4364 : sprintf(tmppath, "%s/state.tmp", dir);
2297 4364 : sprintf(path, "%s/state", dir);
2298 :
2299 4364 : fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2300 4364 : if (fd < 0)
2301 : {
2302 : /*
2303 : * If not an ERROR, then release the lock before returning. In case
2304 : * of an ERROR, the error recovery path automatically releases the
2305 : * lock, but no harm in explicitly releasing even in that case. Note
2306 : * that LWLockRelease() could affect errno.
2307 : */
2308 0 : int save_errno = errno;
2309 :
2310 0 : LWLockRelease(&slot->io_in_progress_lock);
2311 0 : errno = save_errno;
2312 0 : ereport(elevel,
2313 : (errcode_for_file_access(),
2314 : errmsg("could not create file \"%s\": %m",
2315 : tmppath)));
2316 0 : return;
2317 : }
2318 :
2319 4364 : cp.magic = SLOT_MAGIC;
2320 4364 : INIT_CRC32C(cp.checksum);
2321 4364 : cp.version = SLOT_VERSION;
2322 4364 : cp.length = ReplicationSlotOnDiskV2Size;
2323 :
2324 4364 : SpinLockAcquire(&slot->mutex);
2325 :
2326 4364 : memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2327 :
2328 4364 : SpinLockRelease(&slot->mutex);
2329 :
2330 4364 : COMP_CRC32C(cp.checksum,
2331 : (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
2332 : ReplicationSlotOnDiskChecksummedSize);
2333 4364 : FIN_CRC32C(cp.checksum);
2334 :
2335 4364 : errno = 0;
2336 4364 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2337 4364 : if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2338 : {
2339 0 : int save_errno = errno;
2340 :
2341 0 : pgstat_report_wait_end();
2342 0 : CloseTransientFile(fd);
2343 0 : LWLockRelease(&slot->io_in_progress_lock);
2344 :
2345 : /* if write didn't set errno, assume problem is no disk space */
2346 0 : errno = save_errno ? save_errno : ENOSPC;
2347 0 : ereport(elevel,
2348 : (errcode_for_file_access(),
2349 : errmsg("could not write to file \"%s\": %m",
2350 : tmppath)));
2351 0 : return;
2352 : }
2353 4364 : pgstat_report_wait_end();
2354 :
2355 : /* fsync the temporary file */
2356 4364 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2357 4364 : if (pg_fsync(fd) != 0)
2358 : {
2359 0 : int save_errno = errno;
2360 :
2361 0 : pgstat_report_wait_end();
2362 0 : CloseTransientFile(fd);
2363 0 : LWLockRelease(&slot->io_in_progress_lock);
2364 0 : errno = save_errno;
2365 0 : ereport(elevel,
2366 : (errcode_for_file_access(),
2367 : errmsg("could not fsync file \"%s\": %m",
2368 : tmppath)));
2369 0 : return;
2370 : }
2371 4364 : pgstat_report_wait_end();
2372 :
2373 4364 : if (CloseTransientFile(fd) != 0)
2374 : {
2375 0 : int save_errno = errno;
2376 :
2377 0 : LWLockRelease(&slot->io_in_progress_lock);
2378 0 : errno = save_errno;
2379 0 : ereport(elevel,
2380 : (errcode_for_file_access(),
2381 : errmsg("could not close file \"%s\": %m",
2382 : tmppath)));
2383 0 : return;
2384 : }
2385 :
2386 : /* rename to permanent file, fsync file and directory */
2387 4364 : if (rename(tmppath, path) != 0)
2388 : {
2389 0 : int save_errno = errno;
2390 :
2391 0 : LWLockRelease(&slot->io_in_progress_lock);
2392 0 : errno = save_errno;
2393 0 : ereport(elevel,
2394 : (errcode_for_file_access(),
2395 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2396 : tmppath, path)));
2397 0 : return;
2398 : }
2399 :
2400 : /*
2401 : * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2402 : */
2403 4364 : START_CRIT_SECTION();
2404 :
2405 4364 : fsync_fname(path, false);
2406 4364 : fsync_fname(dir, true);
2407 4364 : fsync_fname(PG_REPLSLOT_DIR, true);
2408 :
2409 4364 : END_CRIT_SECTION();
2410 :
2411 : /*
2412 : * Successfully wrote, unset dirty bit, unless somebody dirtied again
2413 : * already and remember the confirmed_flush LSN value.
2414 : */
2415 4364 : SpinLockAcquire(&slot->mutex);
2416 4364 : if (!slot->just_dirtied)
2417 4322 : slot->dirty = false;
2418 4364 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2419 4364 : slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2420 4364 : SpinLockRelease(&slot->mutex);
2421 :
2422 4364 : LWLockRelease(&slot->io_in_progress_lock);
2423 : }
2424 :
2425 : /*
2426 : * Load a single slot from disk into memory.
2427 : */
2428 : static void
2429 150 : RestoreSlotFromDisk(const char *name)
2430 : {
2431 : ReplicationSlotOnDisk cp;
2432 : int i;
2433 : char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2434 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2435 : int fd;
2436 150 : bool restored = false;
2437 : int readBytes;
2438 : pg_crc32c checksum;
2439 150 : TimestampTz now = 0;
2440 :
2441 : /* no need to lock here, no concurrent access allowed yet */
2442 :
2443 : /* delete temp file if it exists */
2444 150 : sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2445 150 : sprintf(path, "%s/state.tmp", slotdir);
2446 150 : if (unlink(path) < 0 && errno != ENOENT)
2447 0 : ereport(PANIC,
2448 : (errcode_for_file_access(),
2449 : errmsg("could not remove file \"%s\": %m", path)));
2450 :
2451 150 : sprintf(path, "%s/state", slotdir);
2452 :
2453 150 : elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2454 :
2455 : /* on some operating systems fsyncing a file requires O_RDWR */
2456 150 : fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2457 :
2458 : /*
2459 : * We do not need to handle this as we are rename()ing the directory into
2460 : * place only after we fsync()ed the state file.
2461 : */
2462 150 : if (fd < 0)
2463 0 : ereport(PANIC,
2464 : (errcode_for_file_access(),
2465 : errmsg("could not open file \"%s\": %m", path)));
2466 :
2467 : /*
2468 : * Sync state file before we're reading from it. We might have crashed
2469 : * while it wasn't synced yet and we shouldn't continue on that basis.
2470 : */
2471 150 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2472 150 : if (pg_fsync(fd) != 0)
2473 0 : ereport(PANIC,
2474 : (errcode_for_file_access(),
2475 : errmsg("could not fsync file \"%s\": %m",
2476 : path)));
2477 150 : pgstat_report_wait_end();
2478 :
2479 : /* Also sync the parent directory */
2480 150 : START_CRIT_SECTION();
2481 150 : fsync_fname(slotdir, true);
2482 150 : END_CRIT_SECTION();
2483 :
2484 : /* read part of statefile that's guaranteed to be version independent */
2485 150 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2486 150 : readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2487 150 : pgstat_report_wait_end();
2488 150 : if (readBytes != ReplicationSlotOnDiskConstantSize)
2489 : {
2490 0 : if (readBytes < 0)
2491 0 : ereport(PANIC,
2492 : (errcode_for_file_access(),
2493 : errmsg("could not read file \"%s\": %m", path)));
2494 : else
2495 0 : ereport(PANIC,
2496 : (errcode(ERRCODE_DATA_CORRUPTED),
2497 : errmsg("could not read file \"%s\": read %d of %zu",
2498 : path, readBytes,
2499 : (Size) ReplicationSlotOnDiskConstantSize)));
2500 : }
2501 :
2502 : /* verify magic */
2503 150 : if (cp.magic != SLOT_MAGIC)
2504 0 : ereport(PANIC,
2505 : (errcode(ERRCODE_DATA_CORRUPTED),
2506 : errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2507 : path, cp.magic, SLOT_MAGIC)));
2508 :
2509 : /* verify version */
2510 150 : if (cp.version != SLOT_VERSION)
2511 0 : ereport(PANIC,
2512 : (errcode(ERRCODE_DATA_CORRUPTED),
2513 : errmsg("replication slot file \"%s\" has unsupported version %u",
2514 : path, cp.version)));
2515 :
2516 : /* boundary check on length */
2517 150 : if (cp.length != ReplicationSlotOnDiskV2Size)
2518 0 : ereport(PANIC,
2519 : (errcode(ERRCODE_DATA_CORRUPTED),
2520 : errmsg("replication slot file \"%s\" has corrupted length %u",
2521 : path, cp.length)));
2522 :
2523 : /* Now that we know the size, read the entire file */
2524 150 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2525 300 : readBytes = read(fd,
2526 : (char *) &cp + ReplicationSlotOnDiskConstantSize,
2527 150 : cp.length);
2528 150 : pgstat_report_wait_end();
2529 150 : if (readBytes != cp.length)
2530 : {
2531 0 : if (readBytes < 0)
2532 0 : ereport(PANIC,
2533 : (errcode_for_file_access(),
2534 : errmsg("could not read file \"%s\": %m", path)));
2535 : else
2536 0 : ereport(PANIC,
2537 : (errcode(ERRCODE_DATA_CORRUPTED),
2538 : errmsg("could not read file \"%s\": read %d of %zu",
2539 : path, readBytes, (Size) cp.length)));
2540 : }
2541 :
2542 150 : if (CloseTransientFile(fd) != 0)
2543 0 : ereport(PANIC,
2544 : (errcode_for_file_access(),
2545 : errmsg("could not close file \"%s\": %m", path)));
2546 :
2547 : /* now verify the CRC */
2548 150 : INIT_CRC32C(checksum);
2549 150 : COMP_CRC32C(checksum,
2550 : (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
2551 : ReplicationSlotOnDiskChecksummedSize);
2552 150 : FIN_CRC32C(checksum);
2553 :
2554 150 : if (!EQ_CRC32C(checksum, cp.checksum))
2555 0 : ereport(PANIC,
2556 : (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2557 : path, checksum, cp.checksum)));
2558 :
2559 : /*
2560 : * If we crashed with an ephemeral slot active, don't restore but delete
2561 : * it.
2562 : */
2563 150 : if (cp.slotdata.persistency != RS_PERSISTENT)
2564 : {
2565 0 : if (!rmtree(slotdir, true))
2566 : {
2567 0 : ereport(WARNING,
2568 : (errmsg("could not remove directory \"%s\"",
2569 : slotdir)));
2570 : }
2571 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2572 0 : return;
2573 : }
2574 :
2575 : /*
2576 : * Verify that requirements for the specific slot type are met. That's
2577 : * important because if these aren't met we're not guaranteed to retain
2578 : * all the necessary resources for the slot.
2579 : *
2580 : * NB: We have to do so *after* the above checks for ephemeral slots,
2581 : * because otherwise a slot that shouldn't exist anymore could prevent
2582 : * restarts.
2583 : *
2584 : * NB: Changing the requirements here also requires adapting
2585 : * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2586 : */
2587 150 : if (cp.slotdata.database != InvalidOid)
2588 : {
2589 112 : if (wal_level < WAL_LEVEL_LOGICAL)
2590 0 : ereport(FATAL,
2591 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2592 : errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
2593 : NameStr(cp.slotdata.name)),
2594 : errhint("Change \"wal_level\" to be \"logical\" or higher.")));
2595 :
2596 : /*
2597 : * In standby mode, the hot standby must be enabled. This check is
2598 : * necessary to ensure logical slots are invalidated when they become
2599 : * incompatible due to insufficient wal_level. Otherwise, if the
2600 : * primary reduces wal_level < logical while hot standby is disabled,
2601 : * logical slots would remain valid even after promotion.
2602 : */
2603 112 : if (StandbyMode && !EnableHotStandby)
2604 2 : ereport(FATAL,
2605 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2606 : errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2607 : NameStr(cp.slotdata.name)),
2608 : errhint("Change \"hot_standby\" to be \"on\".")));
2609 : }
2610 38 : else if (wal_level < WAL_LEVEL_REPLICA)
2611 0 : ereport(FATAL,
2612 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2613 : errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2614 : NameStr(cp.slotdata.name)),
2615 : errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2616 :
2617 : /* nothing can be active yet, don't lock anything */
2618 202 : for (i = 0; i < max_replication_slots; i++)
2619 : {
2620 : ReplicationSlot *slot;
2621 :
2622 202 : slot = &ReplicationSlotCtl->replication_slots[i];
2623 :
2624 202 : if (slot->in_use)
2625 54 : continue;
2626 :
2627 : /* restore the entire set of persistent data */
2628 148 : memcpy(&slot->data, &cp.slotdata,
2629 : sizeof(ReplicationSlotPersistentData));
2630 :
2631 : /* initialize in memory state */
2632 148 : slot->effective_xmin = cp.slotdata.xmin;
2633 148 : slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2634 148 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2635 148 : slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2636 :
2637 148 : slot->candidate_catalog_xmin = InvalidTransactionId;
2638 148 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
2639 148 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
2640 148 : slot->candidate_restart_valid = InvalidXLogRecPtr;
2641 :
2642 148 : slot->in_use = true;
2643 148 : slot->active_pid = 0;
2644 :
2645 : /*
2646 : * Set the time since the slot has become inactive after loading the
2647 : * slot from the disk into memory. Whoever acquires the slot i.e.
2648 : * makes the slot active will reset it. Use the same inactive_since
2649 : * time for all the slots.
2650 : */
2651 148 : if (now == 0)
2652 148 : now = GetCurrentTimestamp();
2653 :
2654 148 : ReplicationSlotSetInactiveSince(slot, now, false);
2655 :
2656 148 : restored = true;
2657 148 : break;
2658 : }
2659 :
2660 148 : if (!restored)
2661 0 : ereport(FATAL,
2662 : (errmsg("too many replication slots active before shutdown"),
2663 : errhint("Increase \"max_replication_slots\" and try again.")));
2664 : }
2665 :
2666 : /*
2667 : * Maps an invalidation reason for a replication slot to
2668 : * ReplicationSlotInvalidationCause.
2669 : */
2670 : ReplicationSlotInvalidationCause
2671 0 : GetSlotInvalidationCause(const char *cause_name)
2672 : {
2673 : Assert(cause_name);
2674 :
2675 : /* Search lookup table for the cause having this name */
2676 0 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2677 : {
2678 0 : if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2679 0 : return SlotInvalidationCauses[i].cause;
2680 : }
2681 :
2682 : Assert(false);
2683 0 : return RS_INVAL_NONE; /* to keep compiler quiet */
2684 : }
2685 :
2686 : /*
2687 : * Maps an ReplicationSlotInvalidationCause to the invalidation
2688 : * reason for a replication slot.
2689 : */
2690 : const char *
2691 84 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
2692 : {
2693 : /* Search lookup table for the name of this cause */
2694 270 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2695 : {
2696 270 : if (SlotInvalidationCauses[i].cause == cause)
2697 84 : return SlotInvalidationCauses[i].cause_name;
2698 : }
2699 :
2700 : Assert(false);
2701 0 : return "none"; /* to keep compiler quiet */
2702 : }
2703 :
2704 : /*
2705 : * A helper function to validate slots specified in GUC synchronized_standby_slots.
2706 : *
2707 : * The rawname will be parsed, and the result will be saved into *elemlist.
2708 : */
2709 : static bool
2710 12 : validate_sync_standby_slots(char *rawname, List **elemlist)
2711 : {
2712 : bool ok;
2713 :
2714 : /* Verify syntax and parse string into a list of identifiers */
2715 12 : ok = SplitIdentifierString(rawname, ',', elemlist);
2716 :
2717 12 : if (!ok)
2718 : {
2719 0 : GUC_check_errdetail("List syntax is invalid.");
2720 : }
2721 12 : else if (MyProc)
2722 : {
2723 : /*
2724 : * Check that each specified slot exist and is physical.
2725 : *
2726 : * Because we need an LWLock, we cannot do this on processes without a
2727 : * PGPROC, so we skip it there; but see comments in
2728 : * StandbySlotsHaveCaughtup() as to why that's not a problem.
2729 : */
2730 6 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2731 :
2732 18 : foreach_ptr(char, name, *elemlist)
2733 : {
2734 : ReplicationSlot *slot;
2735 :
2736 6 : slot = SearchNamedReplicationSlot(name, false);
2737 :
2738 6 : if (!slot)
2739 : {
2740 0 : GUC_check_errdetail("Replication slot \"%s\" does not exist.",
2741 : name);
2742 0 : ok = false;
2743 0 : break;
2744 : }
2745 :
2746 6 : if (!SlotIsPhysical(slot))
2747 : {
2748 0 : GUC_check_errdetail("\"%s\" is not a physical replication slot.",
2749 : name);
2750 0 : ok = false;
2751 0 : break;
2752 : }
2753 : }
2754 :
2755 6 : LWLockRelease(ReplicationSlotControlLock);
2756 : }
2757 :
2758 12 : return ok;
2759 : }
2760 :
2761 : /*
2762 : * GUC check_hook for synchronized_standby_slots
2763 : */
2764 : bool
2765 2216 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
2766 : {
2767 : char *rawname;
2768 : char *ptr;
2769 : List *elemlist;
2770 : int size;
2771 : bool ok;
2772 : SyncStandbySlotsConfigData *config;
2773 :
2774 2216 : if ((*newval)[0] == '\0')
2775 2204 : return true;
2776 :
2777 : /* Need a modifiable copy of the GUC string */
2778 12 : rawname = pstrdup(*newval);
2779 :
2780 : /* Now verify if the specified slots exist and have correct type */
2781 12 : ok = validate_sync_standby_slots(rawname, &elemlist);
2782 :
2783 12 : if (!ok || elemlist == NIL)
2784 : {
2785 0 : pfree(rawname);
2786 0 : list_free(elemlist);
2787 0 : return ok;
2788 : }
2789 :
2790 : /* Compute the size required for the SyncStandbySlotsConfigData struct */
2791 12 : size = offsetof(SyncStandbySlotsConfigData, slot_names);
2792 36 : foreach_ptr(char, slot_name, elemlist)
2793 12 : size += strlen(slot_name) + 1;
2794 :
2795 : /* GUC extra value must be guc_malloc'd, not palloc'd */
2796 12 : config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
2797 12 : if (!config)
2798 0 : return false;
2799 :
2800 : /* Transform the data into SyncStandbySlotsConfigData */
2801 12 : config->nslotnames = list_length(elemlist);
2802 :
2803 12 : ptr = config->slot_names;
2804 36 : foreach_ptr(char, slot_name, elemlist)
2805 : {
2806 12 : strcpy(ptr, slot_name);
2807 12 : ptr += strlen(slot_name) + 1;
2808 : }
2809 :
2810 12 : *extra = config;
2811 :
2812 12 : pfree(rawname);
2813 12 : list_free(elemlist);
2814 12 : return true;
2815 : }
2816 :
2817 : /*
2818 : * GUC assign_hook for synchronized_standby_slots
2819 : */
2820 : void
2821 2216 : assign_synchronized_standby_slots(const char *newval, void *extra)
2822 : {
2823 : /*
2824 : * The standby slots may have changed, so we must recompute the oldest
2825 : * LSN.
2826 : */
2827 2216 : ss_oldest_flush_lsn = InvalidXLogRecPtr;
2828 :
2829 2216 : synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
2830 2216 : }
2831 :
2832 : /*
2833 : * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
2834 : */
2835 : bool
2836 58452 : SlotExistsInSyncStandbySlots(const char *slot_name)
2837 : {
2838 : const char *standby_slot_name;
2839 :
2840 : /* Return false if there is no value in synchronized_standby_slots */
2841 58452 : if (synchronized_standby_slots_config == NULL)
2842 58440 : return false;
2843 :
2844 : /*
2845 : * XXX: We are not expecting this list to be long so a linear search
2846 : * shouldn't hurt but if that turns out not to be true then we can cache
2847 : * this information for each WalSender as well.
2848 : */
2849 12 : standby_slot_name = synchronized_standby_slots_config->slot_names;
2850 12 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2851 : {
2852 12 : if (strcmp(standby_slot_name, slot_name) == 0)
2853 12 : return true;
2854 :
2855 0 : standby_slot_name += strlen(standby_slot_name) + 1;
2856 : }
2857 :
2858 0 : return false;
2859 : }
2860 :
2861 : /*
2862 : * Return true if the slots specified in synchronized_standby_slots have caught up to
2863 : * the given WAL location, false otherwise.
2864 : *
2865 : * The elevel parameter specifies the error level used for logging messages
2866 : * related to slots that do not exist, are invalidated, or are inactive.
2867 : */
2868 : bool
2869 1226 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
2870 : {
2871 : const char *name;
2872 1226 : int caught_up_slot_num = 0;
2873 1226 : XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
2874 :
2875 : /*
2876 : * Don't need to wait for the standbys to catch up if there is no value in
2877 : * synchronized_standby_slots.
2878 : */
2879 1226 : if (synchronized_standby_slots_config == NULL)
2880 1194 : return true;
2881 :
2882 : /*
2883 : * Don't need to wait for the standbys to catch up if we are on a standby
2884 : * server, since we do not support syncing slots to cascading standbys.
2885 : */
2886 32 : if (RecoveryInProgress())
2887 0 : return true;
2888 :
2889 : /*
2890 : * Don't need to wait for the standbys to catch up if they are already
2891 : * beyond the specified WAL location.
2892 : */
2893 32 : if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
2894 20 : ss_oldest_flush_lsn >= wait_for_lsn)
2895 12 : return true;
2896 :
2897 : /*
2898 : * To prevent concurrent slot dropping and creation while filtering the
2899 : * slots, take the ReplicationSlotControlLock outside of the loop.
2900 : */
2901 20 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2902 :
2903 20 : name = synchronized_standby_slots_config->slot_names;
2904 30 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2905 : {
2906 : XLogRecPtr restart_lsn;
2907 : bool invalidated;
2908 : bool inactive;
2909 : ReplicationSlot *slot;
2910 :
2911 20 : slot = SearchNamedReplicationSlot(name, false);
2912 :
2913 : /*
2914 : * If a slot name provided in synchronized_standby_slots does not
2915 : * exist, report a message and exit the loop.
2916 : *
2917 : * Though validate_sync_standby_slots (the GUC check_hook) tries to
2918 : * avoid this, it can nonetheless happen because the user can specify
2919 : * a nonexistent slot name before server startup. That function cannot
2920 : * validate such a slot during startup, as ReplicationSlotCtl is not
2921 : * initialized by then. Also, the user might have dropped one slot.
2922 : */
2923 20 : if (!slot)
2924 : {
2925 0 : ereport(elevel,
2926 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2927 : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
2928 : name, "synchronized_standby_slots"),
2929 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2930 : name),
2931 : errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
2932 : name, "synchronized_standby_slots"));
2933 0 : break;
2934 : }
2935 :
2936 : /* Same as above: if a slot is not physical, exit the loop. */
2937 20 : if (SlotIsLogical(slot))
2938 : {
2939 0 : ereport(elevel,
2940 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2941 : errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
2942 : name, "synchronized_standby_slots"),
2943 : errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
2944 : name),
2945 : errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
2946 : name, "synchronized_standby_slots"));
2947 0 : break;
2948 : }
2949 :
2950 20 : SpinLockAcquire(&slot->mutex);
2951 20 : restart_lsn = slot->data.restart_lsn;
2952 20 : invalidated = slot->data.invalidated != RS_INVAL_NONE;
2953 20 : inactive = slot->active_pid == 0;
2954 20 : SpinLockRelease(&slot->mutex);
2955 :
2956 20 : if (invalidated)
2957 : {
2958 : /* Specified physical slot has been invalidated */
2959 0 : ereport(elevel,
2960 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2961 : errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
2962 : name, "synchronized_standby_slots"),
2963 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2964 : name),
2965 : errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
2966 : name, "synchronized_standby_slots"));
2967 0 : break;
2968 : }
2969 :
2970 20 : if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
2971 : {
2972 : /* Log a message if no active_pid for this physical slot */
2973 10 : if (inactive)
2974 6 : ereport(elevel,
2975 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2976 : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
2977 : name, "synchronized_standby_slots"),
2978 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2979 : name),
2980 : errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
2981 : name, "synchronized_standby_slots"));
2982 :
2983 : /* Continue if the current slot hasn't caught up. */
2984 10 : break;
2985 : }
2986 :
2987 : Assert(restart_lsn >= wait_for_lsn);
2988 :
2989 10 : if (XLogRecPtrIsInvalid(min_restart_lsn) ||
2990 : min_restart_lsn > restart_lsn)
2991 10 : min_restart_lsn = restart_lsn;
2992 :
2993 10 : caught_up_slot_num++;
2994 :
2995 10 : name += strlen(name) + 1;
2996 : }
2997 :
2998 20 : LWLockRelease(ReplicationSlotControlLock);
2999 :
3000 : /*
3001 : * Return false if not all the standbys have caught up to the specified
3002 : * WAL location.
3003 : */
3004 20 : if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
3005 10 : return false;
3006 :
3007 : /* The ss_oldest_flush_lsn must not retreat. */
3008 : Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
3009 : min_restart_lsn >= ss_oldest_flush_lsn);
3010 :
3011 10 : ss_oldest_flush_lsn = min_restart_lsn;
3012 :
3013 10 : return true;
3014 : }
3015 :
3016 : /*
3017 : * Wait for physical standbys to confirm receiving the given lsn.
3018 : *
3019 : * Used by logical decoding SQL functions. It waits for physical standbys
3020 : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
3021 : */
3022 : void
3023 440 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
3024 : {
3025 : /*
3026 : * Don't need to wait for the standby to catch up if the current acquired
3027 : * slot is not a logical failover slot, or there is no value in
3028 : * synchronized_standby_slots.
3029 : */
3030 440 : if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
3031 438 : return;
3032 :
3033 2 : ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
3034 :
3035 : for (;;)
3036 : {
3037 4 : CHECK_FOR_INTERRUPTS();
3038 :
3039 4 : if (ConfigReloadPending)
3040 : {
3041 2 : ConfigReloadPending = false;
3042 2 : ProcessConfigFile(PGC_SIGHUP);
3043 : }
3044 :
3045 : /* Exit if done waiting for every slot. */
3046 4 : if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
3047 2 : break;
3048 :
3049 : /*
3050 : * Wait for the slots in the synchronized_standby_slots to catch up,
3051 : * but use a timeout (1s) so we can also check if the
3052 : * synchronized_standby_slots has been changed.
3053 : */
3054 2 : ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
3055 : WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
3056 : }
3057 :
3058 2 : ConditionVariableCancelSleep();
3059 : }
3060 :
3061 : /*
3062 : * GUC check_hook for idle_replication_slot_timeout
3063 : *
3064 : * The value of idle_replication_slot_timeout must be set to 0 during
3065 : * a binary upgrade. See start_postmaster() in pg_upgrade for more details.
3066 : */
3067 : bool
3068 2310 : check_idle_replication_slot_timeout(int *newval, void **extra, GucSource source)
3069 : {
3070 2310 : if (IsBinaryUpgrade && *newval != 0)
3071 : {
3072 0 : GUC_check_errdetail("\"%s\" must be set to 0 during binary upgrade mode.",
3073 : "idle_replication_slot_timeout");
3074 0 : return false;
3075 : }
3076 :
3077 2310 : return true;
3078 : }
|