Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * slot.c
4 : * Replication slot management.
5 : *
6 : *
7 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/replication/slot.c
12 : *
13 : * NOTES
14 : *
15 : * Replication slots are used to keep state about replication streams
16 : * originating from this cluster. Their primary purpose is to prevent the
17 : * premature removal of WAL or of old tuple versions in a manner that would
18 : * interfere with replication; they are also useful for monitoring purposes.
19 : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : * on standbys (to support cascading setups). The requirement that slots be
21 : * usable on standbys precludes storing them in the system catalogs.
22 : *
23 : * Each replication slot gets its own directory inside the directory
24 : * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 : * contain the slot's own data. Additional data can be stored alongside that
26 : * file if required. While the server is running, the state data is also
27 : * cached in memory for efficiency.
28 : *
29 : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : * of a slot. The remaining data in each slot is protected by its mutex.
33 : *
34 : *-------------------------------------------------------------------------
35 : */
36 :
37 : #include "postgres.h"
38 :
39 : #include <unistd.h>
40 : #include <sys/stat.h>
41 :
42 : #include "access/transam.h"
43 : #include "access/xlog_internal.h"
44 : #include "access/xlogrecovery.h"
45 : #include "common/file_utils.h"
46 : #include "common/string.h"
47 : #include "miscadmin.h"
48 : #include "pgstat.h"
49 : #include "postmaster/interrupt.h"
50 : #include "replication/slotsync.h"
51 : #include "replication/slot.h"
52 : #include "replication/walsender_private.h"
53 : #include "storage/fd.h"
54 : #include "storage/ipc.h"
55 : #include "storage/proc.h"
56 : #include "storage/procarray.h"
57 : #include "utils/builtins.h"
58 : #include "utils/guc_hooks.h"
59 : #include "utils/injection_point.h"
60 : #include "utils/varlena.h"
61 :
62 : /*
63 : * Replication slot on-disk data structure.
64 : */
65 : typedef struct ReplicationSlotOnDisk
66 : {
67 : /* first part of this struct needs to be version independent */
68 :
69 : /* data not covered by checksum */
70 : uint32 magic;
71 : pg_crc32c checksum;
72 :
73 : /* data covered by checksum */
74 : uint32 version;
75 : uint32 length;
76 :
77 : /*
78 : * The actual data in the slot that follows can differ based on the above
79 : * 'version'.
80 : */
81 :
82 : ReplicationSlotPersistentData slotdata;
83 : } ReplicationSlotOnDisk;
84 :
85 : /*
86 : * Struct for the configuration of synchronized_standby_slots.
87 : *
88 : * Note: this must be a flat representation that can be held in a single chunk
89 : * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
90 : * synchronized_standby_slots GUC.
91 : */
92 : typedef struct
93 : {
94 : /* Number of slot names in the slot_names[] */
95 : int nslotnames;
96 :
97 : /*
98 : * slot_names contains 'nslotnames' consecutive null-terminated C strings.
99 : */
100 : char slot_names[FLEXIBLE_ARRAY_MEMBER];
101 : } SyncStandbySlotsConfigData;
102 :
103 : /*
104 : * Lookup table for slot invalidation causes.
105 : */
106 : typedef struct SlotInvalidationCauseMap
107 : {
108 : ReplicationSlotInvalidationCause cause;
109 : const char *cause_name;
110 : } SlotInvalidationCauseMap;
111 :
112 : static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
113 : {RS_INVAL_NONE, "none"},
114 : {RS_INVAL_WAL_REMOVED, "wal_removed"},
115 : {RS_INVAL_HORIZON, "rows_removed"},
116 : {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
117 : {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
118 : };
119 :
120 : /*
121 : * Ensure that the lookup table is up-to-date with the enums defined in
122 : * ReplicationSlotInvalidationCause.
123 : */
124 : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
125 : "array length mismatch");
126 :
/*
 * Sizes of the various parts of ReplicationSlotOnDisk.
 *
 * Every replacement list is fully parenthesized so that the macros expand
 * correctly when used as an operand of a larger expression (for example
 * "2 * ReplicationSlotOnDiskV2Size").
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	(offsetof(ReplicationSlotOnDisk, slotdata))
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize \
	(offsetof(ReplicationSlotOnDisk, version))
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	5		/* version for new files */
142 :
143 : /* Control array for replication slot management */
144 : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
145 :
146 : /* My backend's replication slot in the shared memory array */
147 : ReplicationSlot *MyReplicationSlot = NULL;
148 :
149 : /* GUC variables */
150 : int max_replication_slots = 10; /* the maximum number of replication
151 : * slots */
152 :
153 : /*
154 : * Invalidate replication slots that have remained idle longer than this
155 : * duration; '0' disables it.
156 : */
157 : int idle_replication_slot_timeout_mins = 0;
158 :
159 : /*
160 : * This GUC lists streaming replication standby server slot names that
161 : * logical WAL sender processes will wait for.
162 : */
163 : char *synchronized_standby_slots;
164 :
165 : /* This is the parsed and cached configuration for synchronized_standby_slots */
166 : static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
167 :
168 : /*
169 : * Oldest LSN that has been confirmed to be flushed to the standbys
170 : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
171 : */
172 : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
173 :
174 : static void ReplicationSlotShmemExit(int code, Datum arg);
175 : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
176 :
177 : /* internal persistency functions */
178 : static void RestoreSlotFromDisk(const char *name);
179 : static void CreateSlotOnDisk(ReplicationSlot *slot);
180 : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
181 :
182 : /*
183 : * Report shared-memory space needed by ReplicationSlotsShmemInit.
184 : */
185 : Size
186 7446 : ReplicationSlotsShmemSize(void)
187 : {
188 7446 : Size size = 0;
189 :
190 7446 : if (max_replication_slots == 0)
191 4 : return size;
192 :
193 7442 : size = offsetof(ReplicationSlotCtlData, replication_slots);
194 7442 : size = add_size(size,
195 : mul_size(max_replication_slots, sizeof(ReplicationSlot)));
196 :
197 7442 : return size;
198 : }
199 :
200 : /*
201 : * Allocate and initialize shared memory for replication slots.
202 : */
203 : void
204 1930 : ReplicationSlotsShmemInit(void)
205 : {
206 : bool found;
207 :
208 1930 : if (max_replication_slots == 0)
209 2 : return;
210 :
211 1928 : ReplicationSlotCtl = (ReplicationSlotCtlData *)
212 1928 : ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
213 : &found);
214 :
215 1928 : if (!found)
216 : {
217 : int i;
218 :
219 : /* First time through, so initialize */
220 3748 : MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
221 :
222 20816 : for (i = 0; i < max_replication_slots; i++)
223 : {
224 18888 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
225 :
226 : /* everything else is zeroed by the memset above */
227 18888 : SpinLockInit(&slot->mutex);
228 18888 : LWLockInitialize(&slot->io_in_progress_lock,
229 : LWTRANCHE_REPLICATION_SLOT_IO);
230 18888 : ConditionVariableInit(&slot->active_cv);
231 : }
232 : }
233 : }
234 :
235 : /*
236 : * Register the callback for replication slot cleanup and releasing.
237 : */
238 : void
239 37592 : ReplicationSlotInitialize(void)
240 : {
241 37592 : before_shmem_exit(ReplicationSlotShmemExit, 0);
242 37592 : }
243 :
244 : /*
245 : * Release and cleanup replication slots.
246 : */
247 : static void
248 37592 : ReplicationSlotShmemExit(int code, Datum arg)
249 : {
250 : /* Make sure active replication slots are released */
251 37592 : if (MyReplicationSlot != NULL)
252 426 : ReplicationSlotRelease();
253 :
254 : /* Also cleanup all the temporary slots. */
255 37592 : ReplicationSlotCleanup(false);
256 37592 : }
257 :
258 : /*
259 : * Check whether the passed slot name is valid and report errors at elevel.
260 : *
261 : * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
262 : * the name to be used as a directory name on every supported OS.
263 : *
264 : * Returns whether the directory name is valid or not if elevel < ERROR.
265 : */
266 : bool
267 1686 : ReplicationSlotValidateName(const char *name, int elevel)
268 : {
269 : const char *cp;
270 :
271 1686 : if (strlen(name) == 0)
272 : {
273 6 : ereport(elevel,
274 : (errcode(ERRCODE_INVALID_NAME),
275 : errmsg("replication slot name \"%s\" is too short",
276 : name)));
277 0 : return false;
278 : }
279 :
280 1680 : if (strlen(name) >= NAMEDATALEN)
281 : {
282 0 : ereport(elevel,
283 : (errcode(ERRCODE_NAME_TOO_LONG),
284 : errmsg("replication slot name \"%s\" is too long",
285 : name)));
286 0 : return false;
287 : }
288 :
289 33968 : for (cp = name; *cp; cp++)
290 : {
291 32290 : if (!((*cp >= 'a' && *cp <= 'z')
292 16134 : || (*cp >= '0' && *cp <= '9')
293 3132 : || (*cp == '_')))
294 : {
295 2 : ereport(elevel,
296 : (errcode(ERRCODE_INVALID_NAME),
297 : errmsg("replication slot name \"%s\" contains invalid character",
298 : name),
299 : errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
300 0 : return false;
301 : }
302 : }
303 1678 : return true;
304 : }
305 :
306 : /*
307 : * Create a new replication slot and mark it as used by this backend.
308 : *
309 : * name: Name of the slot
310 : * db_specific: logical decoding is db specific; if the slot is going to
311 : * be used for that pass true, otherwise false.
312 : * two_phase: Allows decoding of prepared transactions. We allow this option
313 : * to be enabled only at the slot creation time. If we allow this option
314 : * to be changed during decoding then it is quite possible that we skip
315 : * prepare first time because this option was not enabled. Now next time
316 : * during getting changes, if the two_phase option is enabled it can skip
317 : * prepare because by that time start decoding point has been moved. So the
318 : * user will only get commit prepared.
319 : * failover: If enabled, allows the slot to be synced to standbys so
320 : * that logical replication can be resumed after failover.
321 : * synced: True if the slot is synchronized from the primary server.
322 : */
323 : void
324 1232 : ReplicationSlotCreate(const char *name, bool db_specific,
325 : ReplicationSlotPersistency persistency,
326 : bool two_phase, bool failover, bool synced)
327 : {
328 1232 : ReplicationSlot *slot = NULL;
329 : int i;
330 :
331 : Assert(MyReplicationSlot == NULL);
332 :
333 1232 : ReplicationSlotValidateName(name, ERROR);
334 :
335 1230 : if (failover)
336 : {
337 : /*
338 : * Do not allow users to create the failover enabled slots on the
339 : * standby as we do not support sync to the cascading standby.
340 : *
341 : * However, failover enabled slots can be created during slot
342 : * synchronization because we need to retain the same values as the
343 : * remote slot.
344 : */
345 42 : if (RecoveryInProgress() && !IsSyncingReplicationSlots())
346 0 : ereport(ERROR,
347 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
348 : errmsg("cannot enable failover for a replication slot created on the standby"));
349 :
350 : /*
351 : * Do not allow users to create failover enabled temporary slots,
352 : * because temporary slots will not be synced to the standby.
353 : *
354 : * However, failover enabled temporary slots can be created during
355 : * slot synchronization. See the comments atop slotsync.c for details.
356 : */
357 42 : if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
358 2 : ereport(ERROR,
359 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
360 : errmsg("cannot enable failover for a temporary replication slot"));
361 : }
362 :
363 : /*
364 : * If some other backend ran this code concurrently with us, we'd likely
365 : * both allocate the same slot, and that would be bad. We'd also be at
366 : * risk of missing a name collision. Also, we don't want to try to create
367 : * a new slot while somebody's busy cleaning up an old one, because we
368 : * might both be monkeying with the same directory.
369 : */
370 1228 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
371 :
372 : /*
373 : * Check for name collision, and identify an allocatable slot. We need to
374 : * hold ReplicationSlotControlLock in shared mode for this, so that nobody
375 : * else can change the in_use flags while we're looking at them.
376 : */
377 1228 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
378 11760 : for (i = 0; i < max_replication_slots; i++)
379 : {
380 10538 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
381 :
382 10538 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
383 6 : ereport(ERROR,
384 : (errcode(ERRCODE_DUPLICATE_OBJECT),
385 : errmsg("replication slot \"%s\" already exists", name)));
386 10532 : if (!s->in_use && slot == NULL)
387 1220 : slot = s;
388 : }
389 1222 : LWLockRelease(ReplicationSlotControlLock);
390 :
391 : /* If all slots are in use, we're out of luck. */
392 1222 : if (slot == NULL)
393 2 : ereport(ERROR,
394 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
395 : errmsg("all replication slots are in use"),
396 : errhint("Free one or increase \"max_replication_slots\".")));
397 :
398 : /*
399 : * Since this slot is not in use, nobody should be looking at any part of
400 : * it other than the in_use field unless they're trying to allocate it.
401 : * And since we hold ReplicationSlotAllocationLock, nobody except us can
402 : * be doing that. So it's safe to initialize the slot.
403 : */
404 : Assert(!slot->in_use);
405 : Assert(slot->active_pid == 0);
406 :
407 : /* first initialize persistent data */
408 1220 : memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
409 1220 : namestrcpy(&slot->data.name, name);
410 1220 : slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
411 1220 : slot->data.persistency = persistency;
412 1220 : slot->data.two_phase = two_phase;
413 1220 : slot->data.two_phase_at = InvalidXLogRecPtr;
414 1220 : slot->data.failover = failover;
415 1220 : slot->data.synced = synced;
416 :
417 : /* and then data only present in shared memory */
418 1220 : slot->just_dirtied = false;
419 1220 : slot->dirty = false;
420 1220 : slot->effective_xmin = InvalidTransactionId;
421 1220 : slot->effective_catalog_xmin = InvalidTransactionId;
422 1220 : slot->candidate_catalog_xmin = InvalidTransactionId;
423 1220 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
424 1220 : slot->candidate_restart_valid = InvalidXLogRecPtr;
425 1220 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
426 1220 : slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
427 1220 : slot->inactive_since = 0;
428 :
429 : /*
430 : * Create the slot on disk. We haven't actually marked the slot allocated
431 : * yet, so no special cleanup is required if this errors out.
432 : */
433 1220 : CreateSlotOnDisk(slot);
434 :
435 : /*
436 : * We need to briefly prevent any other backend from iterating over the
437 : * slots while we flip the in_use flag. We also need to set the active
438 : * flag while holding the ControlLock as otherwise a concurrent
439 : * ReplicationSlotAcquire() could acquire the slot as well.
440 : */
441 1220 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
442 :
443 1220 : slot->in_use = true;
444 :
445 : /* We can now mark the slot active, and that makes it our slot. */
446 1220 : SpinLockAcquire(&slot->mutex);
447 : Assert(slot->active_pid == 0);
448 1220 : slot->active_pid = MyProcPid;
449 1220 : SpinLockRelease(&slot->mutex);
450 1220 : MyReplicationSlot = slot;
451 :
452 1220 : LWLockRelease(ReplicationSlotControlLock);
453 :
454 : /*
455 : * Create statistics entry for the new logical slot. We don't collect any
456 : * stats for physical slots, so no need to create an entry for the same.
457 : * See ReplicationSlotDropPtr for why we need to do this before releasing
458 : * ReplicationSlotAllocationLock.
459 : */
460 1220 : if (SlotIsLogical(slot))
461 878 : pgstat_create_replslot(slot);
462 :
463 : /*
464 : * Now that the slot has been marked as in_use and active, it's safe to
465 : * let somebody else try to allocate a slot.
466 : */
467 1220 : LWLockRelease(ReplicationSlotAllocationLock);
468 :
469 : /* Let everybody know we've modified this slot */
470 1220 : ConditionVariableBroadcast(&slot->active_cv);
471 1220 : }
472 :
473 : /*
474 : * Search for the named replication slot.
475 : *
476 : * Return the replication slot if found, otherwise NULL.
477 : */
478 : ReplicationSlot *
479 2666 : SearchNamedReplicationSlot(const char *name, bool need_lock)
480 : {
481 : int i;
482 2666 : ReplicationSlot *slot = NULL;
483 :
484 2666 : if (need_lock)
485 156 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
486 :
487 4586 : for (i = 0; i < max_replication_slots; i++)
488 : {
489 4542 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
490 :
491 4542 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
492 : {
493 2622 : slot = s;
494 2622 : break;
495 : }
496 : }
497 :
498 2666 : if (need_lock)
499 156 : LWLockRelease(ReplicationSlotControlLock);
500 :
501 2666 : return slot;
502 : }
503 :
504 : /*
505 : * Return the index of the replication slot in
506 : * ReplicationSlotCtl->replication_slots.
507 : *
508 : * This is mainly useful to have an efficient key for storing replication slot
509 : * stats.
510 : */
511 : int
512 15156 : ReplicationSlotIndex(ReplicationSlot *slot)
513 : {
514 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
515 : slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
516 :
517 15156 : return slot - ReplicationSlotCtl->replication_slots;
518 : }
519 :
520 : /*
521 : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
522 : * the slot's name and true is returned.
523 : *
524 : * This likely is only useful for pgstat_replslot.c during shutdown, in other
525 : * cases there are obvious TOCTOU issues.
526 : */
527 : bool
528 146 : ReplicationSlotName(int index, Name name)
529 : {
530 : ReplicationSlot *slot;
531 : bool found;
532 :
533 146 : slot = &ReplicationSlotCtl->replication_slots[index];
534 :
535 : /*
536 : * Ensure that the slot cannot be dropped while we copy the name. Don't
537 : * need the spinlock as the name of an existing slot cannot change.
538 : */
539 146 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
540 146 : found = slot->in_use;
541 146 : if (slot->in_use)
542 146 : namestrcpy(name, NameStr(slot->data.name));
543 146 : LWLockRelease(ReplicationSlotControlLock);
544 :
545 146 : return found;
546 : }
547 :
548 : /*
549 : * Find a previously created slot and mark it as used by this process.
550 : *
551 : * An error is raised if nowait is true and the slot is currently in use. If
552 : * nowait is false, we sleep until the slot is released by the owning process.
553 : *
554 : * An error is raised if error_if_invalid is true and the slot is found to
555 : * be invalid. It should always be set to true, except when we are temporarily
556 : * acquiring the slot and don't intend to change it.
557 : */
558 : void
559 2366 : ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
560 : {
561 : ReplicationSlot *s;
562 : int active_pid;
563 :
564 : Assert(name != NULL);
565 :
566 2366 : retry:
567 : Assert(MyReplicationSlot == NULL);
568 :
569 2366 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
570 :
571 : /* Check if the slot exits with the given name. */
572 2366 : s = SearchNamedReplicationSlot(name, false);
573 2366 : if (s == NULL || !s->in_use)
574 : {
575 20 : LWLockRelease(ReplicationSlotControlLock);
576 :
577 20 : ereport(ERROR,
578 : (errcode(ERRCODE_UNDEFINED_OBJECT),
579 : errmsg("replication slot \"%s\" does not exist",
580 : name)));
581 : }
582 :
583 : /* Invalid slots can't be modified or used before accessing the WAL. */
584 2346 : if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
585 : {
586 16 : LWLockRelease(ReplicationSlotControlLock);
587 :
588 16 : ereport(ERROR,
589 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
590 : errmsg("can no longer access replication slot \"%s\"",
591 : NameStr(s->data.name)),
592 : errdetail("This replication slot has been invalidated due to \"%s\".",
593 : GetSlotInvalidationCauseName(s->data.invalidated)));
594 : }
595 :
596 : /*
597 : * This is the slot we want; check if it's active under some other
598 : * process. In single user mode, we don't need this check.
599 : */
600 2330 : if (IsUnderPostmaster)
601 : {
602 : /*
603 : * Get ready to sleep on the slot in case it is active. (We may end
604 : * up not sleeping, but we don't want to do this while holding the
605 : * spinlock.)
606 : */
607 2330 : if (!nowait)
608 510 : ConditionVariablePrepareToSleep(&s->active_cv);
609 :
610 : /*
611 : * It is important to reset the inactive_since under spinlock here to
612 : * avoid race conditions with slot invalidation. See comments related
613 : * to inactive_since in InvalidatePossiblyObsoleteSlot.
614 : */
615 2330 : SpinLockAcquire(&s->mutex);
616 2330 : if (s->active_pid == 0)
617 2056 : s->active_pid = MyProcPid;
618 2330 : active_pid = s->active_pid;
619 2330 : ReplicationSlotSetInactiveSince(s, 0, false);
620 2330 : SpinLockRelease(&s->mutex);
621 : }
622 : else
623 : {
624 0 : active_pid = MyProcPid;
625 0 : ReplicationSlotSetInactiveSince(s, 0, true);
626 : }
627 2330 : LWLockRelease(ReplicationSlotControlLock);
628 :
629 : /*
630 : * If we found the slot but it's already active in another process, we
631 : * wait until the owning process signals us that it's been released, or
632 : * error out.
633 : */
634 2330 : if (active_pid != MyProcPid)
635 : {
636 0 : if (!nowait)
637 : {
638 : /* Wait here until we get signaled, and then restart */
639 0 : ConditionVariableSleep(&s->active_cv,
640 : WAIT_EVENT_REPLICATION_SLOT_DROP);
641 0 : ConditionVariableCancelSleep();
642 0 : goto retry;
643 : }
644 :
645 0 : ereport(ERROR,
646 : (errcode(ERRCODE_OBJECT_IN_USE),
647 : errmsg("replication slot \"%s\" is active for PID %d",
648 : NameStr(s->data.name), active_pid)));
649 : }
650 2330 : else if (!nowait)
651 510 : ConditionVariableCancelSleep(); /* no sleep needed after all */
652 :
653 : /* Let everybody know we've modified this slot */
654 2330 : ConditionVariableBroadcast(&s->active_cv);
655 :
656 : /* We made this slot active, so it's ours now. */
657 2330 : MyReplicationSlot = s;
658 :
659 : /*
660 : * The call to pgstat_acquire_replslot() protects against stats for a
661 : * different slot, from before a restart or such, being present during
662 : * pgstat_report_replslot().
663 : */
664 2330 : if (SlotIsLogical(s))
665 1942 : pgstat_acquire_replslot(s);
666 :
667 :
668 2330 : if (am_walsender)
669 : {
670 1586 : ereport(log_replication_commands ? LOG : DEBUG1,
671 : SlotIsLogical(s)
672 : ? errmsg("acquired logical replication slot \"%s\"",
673 : NameStr(s->data.name))
674 : : errmsg("acquired physical replication slot \"%s\"",
675 : NameStr(s->data.name)));
676 : }
677 2330 : }
678 :
679 : /*
680 : * Release the replication slot that this backend considers to own.
681 : *
682 : * This or another backend can re-acquire the slot later.
683 : * Resources this slot requires will be preserved.
684 : */
685 : void
686 2826 : ReplicationSlotRelease(void)
687 : {
688 2826 : ReplicationSlot *slot = MyReplicationSlot;
689 2826 : char *slotname = NULL; /* keep compiler quiet */
690 2826 : bool is_logical = false; /* keep compiler quiet */
691 2826 : TimestampTz now = 0;
692 :
693 : Assert(slot != NULL && slot->active_pid != 0);
694 :
695 2826 : if (am_walsender)
696 : {
697 1976 : slotname = pstrdup(NameStr(slot->data.name));
698 1976 : is_logical = SlotIsLogical(slot);
699 : }
700 :
701 2826 : if (slot->data.persistency == RS_EPHEMERAL)
702 : {
703 : /*
704 : * Delete the slot. There is no !PANIC case where this is allowed to
705 : * fail, all that may happen is an incomplete cleanup of the on-disk
706 : * data.
707 : */
708 10 : ReplicationSlotDropAcquired();
709 : }
710 :
711 : /*
712 : * If slot needed to temporarily restrain both data and catalog xmin to
713 : * create the catalog snapshot, remove that temporary constraint.
714 : * Snapshots can only be exported while the initial snapshot is still
715 : * acquired.
716 : */
717 2826 : if (!TransactionIdIsValid(slot->data.xmin) &&
718 2778 : TransactionIdIsValid(slot->effective_xmin))
719 : {
720 370 : SpinLockAcquire(&slot->mutex);
721 370 : slot->effective_xmin = InvalidTransactionId;
722 370 : SpinLockRelease(&slot->mutex);
723 370 : ReplicationSlotsComputeRequiredXmin(false);
724 : }
725 :
726 : /*
727 : * Set the time since the slot has become inactive. We get the current
728 : * time beforehand to avoid system call while holding the spinlock.
729 : */
730 2826 : now = GetCurrentTimestamp();
731 :
732 2826 : if (slot->data.persistency == RS_PERSISTENT)
733 : {
734 : /*
735 : * Mark persistent slot inactive. We're not freeing it, just
736 : * disconnecting, but wake up others that may be waiting for it.
737 : */
738 2264 : SpinLockAcquire(&slot->mutex);
739 2264 : slot->active_pid = 0;
740 2264 : ReplicationSlotSetInactiveSince(slot, now, false);
741 2264 : SpinLockRelease(&slot->mutex);
742 2264 : ConditionVariableBroadcast(&slot->active_cv);
743 : }
744 : else
745 562 : ReplicationSlotSetInactiveSince(slot, now, true);
746 :
747 2826 : MyReplicationSlot = NULL;
748 :
749 : /* might not have been set when we've been a plain slot */
750 2826 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
751 2826 : MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
752 2826 : ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
753 2826 : LWLockRelease(ProcArrayLock);
754 :
755 2826 : if (am_walsender)
756 : {
757 1976 : ereport(log_replication_commands ? LOG : DEBUG1,
758 : is_logical
759 : ? errmsg("released logical replication slot \"%s\"",
760 : slotname)
761 : : errmsg("released physical replication slot \"%s\"",
762 : slotname));
763 :
764 1976 : pfree(slotname);
765 : }
766 2826 : }
767 :
768 : /*
769 : * Cleanup temporary slots created in current session.
770 : *
771 : * Cleanup only synced temporary slots if 'synced_only' is true, else
772 : * cleanup all temporary slots.
773 : */
774 : void
775 80512 : ReplicationSlotCleanup(bool synced_only)
776 : {
777 : int i;
778 :
779 : Assert(MyReplicationSlot == NULL);
780 :
781 80512 : restart:
782 80512 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
783 872740 : for (i = 0; i < max_replication_slots; i++)
784 : {
785 792506 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
786 :
787 792506 : if (!s->in_use)
788 767644 : continue;
789 :
790 24862 : SpinLockAcquire(&s->mutex);
791 24862 : if ((s->active_pid == MyProcPid &&
792 278 : (!synced_only || s->data.synced)))
793 : {
794 : Assert(s->data.persistency == RS_TEMPORARY);
795 278 : SpinLockRelease(&s->mutex);
796 278 : LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
797 :
798 278 : ReplicationSlotDropPtr(s);
799 :
800 278 : ConditionVariableBroadcast(&s->active_cv);
801 278 : goto restart;
802 : }
803 : else
804 24584 : SpinLockRelease(&s->mutex);
805 : }
806 :
807 80234 : LWLockRelease(ReplicationSlotControlLock);
808 80234 : }
809 :
810 : /*
811 : * Permanently drop replication slot identified by the passed in name.
812 : */
813 : void
814 766 : ReplicationSlotDrop(const char *name, bool nowait)
815 : {
816 : Assert(MyReplicationSlot == NULL);
817 :
818 766 : ReplicationSlotAcquire(name, nowait, false);
819 :
820 : /*
821 : * Do not allow users to drop the slots which are currently being synced
822 : * from the primary to the standby.
823 : */
824 750 : if (RecoveryInProgress() && MyReplicationSlot->data.synced)
825 2 : ereport(ERROR,
826 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
827 : errmsg("cannot drop replication slot \"%s\"", name),
828 : errdetail("This replication slot is being synchronized from the primary server."));
829 :
830 748 : ReplicationSlotDropAcquired();
831 748 : }
832 :
833 : /*
834 : * Change the definition of the slot identified by the specified name.
835 : */
836 : void
837 12 : ReplicationSlotAlter(const char *name, const bool *failover,
838 : const bool *two_phase)
839 : {
840 12 : bool update_slot = false;
841 :
842 : Assert(MyReplicationSlot == NULL);
843 : Assert(failover || two_phase);
844 :
845 12 : ReplicationSlotAcquire(name, false, true);
846 :
847 10 : if (SlotIsPhysical(MyReplicationSlot))
848 0 : ereport(ERROR,
849 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
850 : errmsg("cannot use %s with a physical replication slot",
851 : "ALTER_REPLICATION_SLOT"));
852 :
853 10 : if (RecoveryInProgress())
854 : {
855 : /*
856 : * Do not allow users to alter the slots which are currently being
857 : * synced from the primary to the standby.
858 : */
859 2 : if (MyReplicationSlot->data.synced)
860 2 : ereport(ERROR,
861 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
862 : errmsg("cannot alter replication slot \"%s\"", name),
863 : errdetail("This replication slot is being synchronized from the primary server."));
864 :
865 : /*
866 : * Do not allow users to enable failover on the standby as we do not
867 : * support sync to the cascading standby.
868 : */
869 0 : if (failover && *failover)
870 0 : ereport(ERROR,
871 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
872 : errmsg("cannot enable failover for a replication slot"
873 : " on the standby"));
874 : }
875 :
876 8 : if (failover)
877 : {
878 : /*
879 : * Do not allow users to enable failover for temporary slots as we do
880 : * not support syncing temporary slots to the standby.
881 : */
882 6 : if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
883 0 : ereport(ERROR,
884 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
885 : errmsg("cannot enable failover for a temporary replication slot"));
886 :
887 6 : if (MyReplicationSlot->data.failover != *failover)
888 : {
889 6 : SpinLockAcquire(&MyReplicationSlot->mutex);
890 6 : MyReplicationSlot->data.failover = *failover;
891 6 : SpinLockRelease(&MyReplicationSlot->mutex);
892 :
893 6 : update_slot = true;
894 : }
895 : }
896 :
897 8 : if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
898 : {
899 2 : SpinLockAcquire(&MyReplicationSlot->mutex);
900 2 : MyReplicationSlot->data.two_phase = *two_phase;
901 2 : SpinLockRelease(&MyReplicationSlot->mutex);
902 :
903 2 : update_slot = true;
904 : }
905 :
906 8 : if (update_slot)
907 : {
908 8 : ReplicationSlotMarkDirty();
909 8 : ReplicationSlotSave();
910 : }
911 :
912 8 : ReplicationSlotRelease();
913 8 : }
914 :
915 : /*
916 : * Permanently drop the currently acquired replication slot.
917 : */
918 : void
919 772 : ReplicationSlotDropAcquired(void)
920 : {
921 772 : ReplicationSlot *slot = MyReplicationSlot;
922 :
923 : Assert(MyReplicationSlot != NULL);
924 :
925 : /* slot isn't acquired anymore */
926 772 : MyReplicationSlot = NULL;
927 :
928 772 : ReplicationSlotDropPtr(slot);
929 772 : }
930 :
931 : /*
932 : * Permanently drop the replication slot which will be released by the point
933 : * this function returns.
934 : */
935 : static void
936 1050 : ReplicationSlotDropPtr(ReplicationSlot *slot)
937 : {
938 : char path[MAXPGPATH];
939 : char tmppath[MAXPGPATH];
940 :
941 : /*
942 : * If some other backend ran this code concurrently with us, we might try
943 : * to delete a slot with a certain name while someone else was trying to
944 : * create a slot with the same name.
945 : */
946 1050 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
947 :
948 : /* Generate pathnames. */
949 1050 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
950 1050 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
951 :
952 : /*
953 : * Rename the slot directory on disk, so that we'll no longer recognize
954 : * this as a valid slot. Note that if this fails, we've got to mark the
955 : * slot inactive before bailing out. If we're dropping an ephemeral or a
956 : * temporary slot, we better never fail hard as the caller won't expect
957 : * the slot to survive and this might get called during error handling.
958 : */
959 1050 : if (rename(path, tmppath) == 0)
960 : {
961 : /*
962 : * We need to fsync() the directory we just renamed and its parent to
963 : * make sure that our changes are on disk in a crash-safe fashion. If
964 : * fsync() fails, we can't be sure whether the changes are on disk or
965 : * not. For now, we handle that by panicking;
966 : * StartupReplicationSlots() will try to straighten it out after
967 : * restart.
968 : */
969 1050 : START_CRIT_SECTION();
970 1050 : fsync_fname(tmppath, true);
971 1050 : fsync_fname(PG_REPLSLOT_DIR, true);
972 1050 : END_CRIT_SECTION();
973 : }
974 : else
975 : {
976 0 : bool fail_softly = slot->data.persistency != RS_PERSISTENT;
977 :
978 0 : SpinLockAcquire(&slot->mutex);
979 0 : slot->active_pid = 0;
980 0 : SpinLockRelease(&slot->mutex);
981 :
982 : /* wake up anyone waiting on this slot */
983 0 : ConditionVariableBroadcast(&slot->active_cv);
984 :
985 0 : ereport(fail_softly ? WARNING : ERROR,
986 : (errcode_for_file_access(),
987 : errmsg("could not rename file \"%s\" to \"%s\": %m",
988 : path, tmppath)));
989 : }
990 :
991 : /*
992 : * The slot is definitely gone. Lock out concurrent scans of the array
993 : * long enough to kill it. It's OK to clear the active PID here without
994 : * grabbing the mutex because nobody else can be scanning the array here,
995 : * and nobody can be attached to this slot and thus access it without
996 : * scanning the array.
997 : *
998 : * Also wake up processes waiting for it.
999 : */
1000 1050 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
1001 1050 : slot->active_pid = 0;
1002 1050 : slot->in_use = false;
1003 1050 : LWLockRelease(ReplicationSlotControlLock);
1004 1050 : ConditionVariableBroadcast(&slot->active_cv);
1005 :
1006 : /*
1007 : * Slot is dead and doesn't prevent resource removal anymore, recompute
1008 : * limits.
1009 : */
1010 1050 : ReplicationSlotsComputeRequiredXmin(false);
1011 1050 : ReplicationSlotsComputeRequiredLSN();
1012 :
1013 : /*
1014 : * If removing the directory fails, the worst thing that will happen is
1015 : * that the user won't be able to create a new slot with the same name
1016 : * until the next server restart. We warn about it, but that's all.
1017 : */
1018 1050 : if (!rmtree(tmppath, true))
1019 0 : ereport(WARNING,
1020 : (errmsg("could not remove directory \"%s\"", tmppath)));
1021 :
1022 : /*
1023 : * Drop the statistics entry for the replication slot. Do this while
1024 : * holding ReplicationSlotAllocationLock so that we don't drop a
1025 : * statistics entry for another slot with the same name just created in
1026 : * another session.
1027 : */
1028 1050 : if (SlotIsLogical(slot))
1029 756 : pgstat_drop_replslot(slot);
1030 :
1031 : /*
1032 : * We release this at the very end, so that nobody starts trying to create
1033 : * a slot while we're still cleaning up the detritus of the old one.
1034 : */
1035 1050 : LWLockRelease(ReplicationSlotAllocationLock);
1036 1050 : }
1037 :
1038 : /*
1039 : * Serialize the currently acquired slot's state from memory to disk, thereby
1040 : * guaranteeing the current state will survive a crash.
1041 : */
1042 : void
1043 2450 : ReplicationSlotSave(void)
1044 : {
1045 : char path[MAXPGPATH];
1046 :
1047 : Assert(MyReplicationSlot != NULL);
1048 :
1049 2450 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
1050 2450 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
1051 2450 : }
1052 :
1053 : /*
1054 : * Signal that it would be useful if the currently acquired slot would be
1055 : * flushed out to disk.
1056 : *
1057 : * Note that the actual flush to disk can be delayed for a long time, if
1058 : * required for correctness explicitly do a ReplicationSlotSave().
1059 : */
1060 : void
1061 26018 : ReplicationSlotMarkDirty(void)
1062 : {
1063 26018 : ReplicationSlot *slot = MyReplicationSlot;
1064 :
1065 : Assert(MyReplicationSlot != NULL);
1066 :
1067 26018 : SpinLockAcquire(&slot->mutex);
1068 26018 : MyReplicationSlot->just_dirtied = true;
1069 26018 : MyReplicationSlot->dirty = true;
1070 26018 : SpinLockRelease(&slot->mutex);
1071 26018 : }
1072 :
1073 : /*
1074 : * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1075 : * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1076 : */
1077 : void
1078 852 : ReplicationSlotPersist(void)
1079 : {
1080 852 : ReplicationSlot *slot = MyReplicationSlot;
1081 :
1082 : Assert(slot != NULL);
1083 : Assert(slot->data.persistency != RS_PERSISTENT);
1084 :
1085 852 : SpinLockAcquire(&slot->mutex);
1086 852 : slot->data.persistency = RS_PERSISTENT;
1087 852 : SpinLockRelease(&slot->mutex);
1088 :
1089 852 : ReplicationSlotMarkDirty();
1090 852 : ReplicationSlotSave();
1091 852 : }
1092 :
1093 : /*
1094 : * Compute the oldest xmin across all slots and store it in the ProcArray.
1095 : *
1096 : * If already_locked is true, ProcArrayLock has already been acquired
1097 : * exclusively.
1098 : */
1099 : void
1100 4232 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
1101 : {
1102 : int i;
1103 4232 : TransactionId agg_xmin = InvalidTransactionId;
1104 4232 : TransactionId agg_catalog_xmin = InvalidTransactionId;
1105 :
1106 : Assert(ReplicationSlotCtl != NULL);
1107 :
1108 4232 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1109 :
1110 42566 : for (i = 0; i < max_replication_slots; i++)
1111 : {
1112 38334 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1113 : TransactionId effective_xmin;
1114 : TransactionId effective_catalog_xmin;
1115 : bool invalidated;
1116 :
1117 38334 : if (!s->in_use)
1118 34510 : continue;
1119 :
1120 3824 : SpinLockAcquire(&s->mutex);
1121 3824 : effective_xmin = s->effective_xmin;
1122 3824 : effective_catalog_xmin = s->effective_catalog_xmin;
1123 3824 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1124 3824 : SpinLockRelease(&s->mutex);
1125 :
1126 : /* invalidated slots need not apply */
1127 3824 : if (invalidated)
1128 44 : continue;
1129 :
1130 : /* check the data xmin */
1131 3780 : if (TransactionIdIsValid(effective_xmin) &&
1132 10 : (!TransactionIdIsValid(agg_xmin) ||
1133 10 : TransactionIdPrecedes(effective_xmin, agg_xmin)))
1134 546 : agg_xmin = effective_xmin;
1135 :
1136 : /* check the catalog xmin */
1137 3780 : if (TransactionIdIsValid(effective_catalog_xmin) &&
1138 1534 : (!TransactionIdIsValid(agg_catalog_xmin) ||
1139 1534 : TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1140 2084 : agg_catalog_xmin = effective_catalog_xmin;
1141 : }
1142 :
1143 4232 : LWLockRelease(ReplicationSlotControlLock);
1144 :
1145 4232 : ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1146 4232 : }
1147 :
1148 : /*
1149 : * Compute the oldest restart LSN across all slots and inform xlog module.
1150 : *
1151 : * Note: while max_slot_wal_keep_size is theoretically relevant for this
1152 : * purpose, we don't try to account for that, because this module doesn't
1153 : * know what to compare against.
1154 : */
1155 : void
1156 26854 : ReplicationSlotsComputeRequiredLSN(void)
1157 : {
1158 : int i;
1159 26854 : XLogRecPtr min_required = InvalidXLogRecPtr;
1160 :
1161 : Assert(ReplicationSlotCtl != NULL);
1162 :
1163 26854 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1164 288752 : for (i = 0; i < max_replication_slots; i++)
1165 : {
1166 261898 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1167 : XLogRecPtr restart_lsn;
1168 : bool invalidated;
1169 :
1170 261898 : if (!s->in_use)
1171 235576 : continue;
1172 :
1173 26322 : SpinLockAcquire(&s->mutex);
1174 26322 : restart_lsn = s->data.restart_lsn;
1175 26322 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1176 26322 : SpinLockRelease(&s->mutex);
1177 :
1178 : /* invalidated slots need not apply */
1179 26322 : if (invalidated)
1180 46 : continue;
1181 :
1182 26276 : if (restart_lsn != InvalidXLogRecPtr &&
1183 1462 : (min_required == InvalidXLogRecPtr ||
1184 : restart_lsn < min_required))
1185 24866 : min_required = restart_lsn;
1186 : }
1187 26854 : LWLockRelease(ReplicationSlotControlLock);
1188 :
1189 26854 : XLogSetReplicationSlotMinimumLSN(min_required);
1190 26854 : }
1191 :
1192 : /*
1193 : * Compute the oldest WAL LSN required by *logical* decoding slots..
1194 : *
1195 : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1196 : * slots exist.
1197 : *
1198 : * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1199 : * ignores physical replication slots.
1200 : *
1201 : * The results aren't required frequently, so we don't maintain a precomputed
1202 : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1203 : */
1204 : XLogRecPtr
1205 4968 : ReplicationSlotsComputeLogicalRestartLSN(void)
1206 : {
1207 4968 : XLogRecPtr result = InvalidXLogRecPtr;
1208 : int i;
1209 :
1210 4968 : if (max_replication_slots <= 0)
1211 4 : return InvalidXLogRecPtr;
1212 :
1213 4964 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1214 :
1215 53536 : for (i = 0; i < max_replication_slots; i++)
1216 : {
1217 : ReplicationSlot *s;
1218 : XLogRecPtr restart_lsn;
1219 : bool invalidated;
1220 :
1221 48572 : s = &ReplicationSlotCtl->replication_slots[i];
1222 :
1223 : /* cannot change while ReplicationSlotCtlLock is held */
1224 48572 : if (!s->in_use)
1225 47284 : continue;
1226 :
1227 : /* we're only interested in logical slots */
1228 1288 : if (!SlotIsLogical(s))
1229 948 : continue;
1230 :
1231 : /* read once, it's ok if it increases while we're checking */
1232 340 : SpinLockAcquire(&s->mutex);
1233 340 : restart_lsn = s->data.restart_lsn;
1234 340 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1235 340 : SpinLockRelease(&s->mutex);
1236 :
1237 : /* invalidated slots need not apply */
1238 340 : if (invalidated)
1239 8 : continue;
1240 :
1241 332 : if (restart_lsn == InvalidXLogRecPtr)
1242 0 : continue;
1243 :
1244 332 : if (result == InvalidXLogRecPtr ||
1245 : restart_lsn < result)
1246 272 : result = restart_lsn;
1247 : }
1248 :
1249 4964 : LWLockRelease(ReplicationSlotControlLock);
1250 :
1251 4964 : return result;
1252 : }
1253 :
1254 : /*
1255 : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1256 : * passed database oid.
1257 : *
1258 : * Returns true if there are any slots referencing the database. *nslots will
1259 : * be set to the absolute number of slots in the database, *nactive to ones
1260 : * currently active.
1261 : */
1262 : bool
1263 70 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1264 : {
1265 : int i;
1266 :
1267 70 : *nslots = *nactive = 0;
1268 :
1269 70 : if (max_replication_slots <= 0)
1270 0 : return false;
1271 :
1272 70 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1273 712 : for (i = 0; i < max_replication_slots; i++)
1274 : {
1275 : ReplicationSlot *s;
1276 :
1277 642 : s = &ReplicationSlotCtl->replication_slots[i];
1278 :
1279 : /* cannot change while ReplicationSlotCtlLock is held */
1280 642 : if (!s->in_use)
1281 606 : continue;
1282 :
1283 : /* only logical slots are database specific, skip */
1284 36 : if (!SlotIsLogical(s))
1285 20 : continue;
1286 :
1287 : /* not our database, skip */
1288 16 : if (s->data.database != dboid)
1289 10 : continue;
1290 :
1291 : /* NB: intentionally counting invalidated slots */
1292 :
1293 : /* count slots with spinlock held */
1294 6 : SpinLockAcquire(&s->mutex);
1295 6 : (*nslots)++;
1296 6 : if (s->active_pid != 0)
1297 2 : (*nactive)++;
1298 6 : SpinLockRelease(&s->mutex);
1299 : }
1300 70 : LWLockRelease(ReplicationSlotControlLock);
1301 :
1302 70 : if (*nslots > 0)
1303 6 : return true;
1304 64 : return false;
1305 : }
1306 :
1307 : /*
1308 : * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1309 : * passed database oid. The caller should hold an exclusive lock on the
1310 : * pg_database oid for the database to prevent creation of new slots on the db
1311 : * or replay from existing slots.
1312 : *
1313 : * Another session that concurrently acquires an existing slot on the target DB
1314 : * (most likely to drop it) may cause this function to ERROR. If that happens
1315 : * it may have dropped some but not all slots.
1316 : *
1317 : * This routine isn't as efficient as it could be - but we don't drop
1318 : * databases often, especially databases with lots of slots.
1319 : */
1320 : void
1321 94 : ReplicationSlotsDropDBSlots(Oid dboid)
1322 : {
1323 : int i;
1324 :
1325 94 : if (max_replication_slots <= 0)
1326 0 : return;
1327 :
1328 94 : restart:
1329 104 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1330 970 : for (i = 0; i < max_replication_slots; i++)
1331 : {
1332 : ReplicationSlot *s;
1333 : char *slotname;
1334 : int active_pid;
1335 :
1336 876 : s = &ReplicationSlotCtl->replication_slots[i];
1337 :
1338 : /* cannot change while ReplicationSlotCtlLock is held */
1339 876 : if (!s->in_use)
1340 822 : continue;
1341 :
1342 : /* only logical slots are database specific, skip */
1343 54 : if (!SlotIsLogical(s))
1344 22 : continue;
1345 :
1346 : /* not our database, skip */
1347 32 : if (s->data.database != dboid)
1348 22 : continue;
1349 :
1350 : /* NB: intentionally including invalidated slots */
1351 :
1352 : /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1353 10 : SpinLockAcquire(&s->mutex);
1354 : /* can't change while ReplicationSlotControlLock is held */
1355 10 : slotname = NameStr(s->data.name);
1356 10 : active_pid = s->active_pid;
1357 10 : if (active_pid == 0)
1358 : {
1359 10 : MyReplicationSlot = s;
1360 10 : s->active_pid = MyProcPid;
1361 : }
1362 10 : SpinLockRelease(&s->mutex);
1363 :
1364 : /*
1365 : * Even though we hold an exclusive lock on the database object a
1366 : * logical slot for that DB can still be active, e.g. if it's
1367 : * concurrently being dropped by a backend connected to another DB.
1368 : *
1369 : * That's fairly unlikely in practice, so we'll just bail out.
1370 : *
1371 : * The slot sync worker holds a shared lock on the database before
1372 : * operating on synced logical slots to avoid conflict with the drop
1373 : * happening here. The persistent synced slots are thus safe but there
1374 : * is a possibility that the slot sync worker has created a temporary
1375 : * slot (which stays active even on release) and we are trying to drop
1376 : * that here. In practice, the chances of hitting this scenario are
1377 : * less as during slot synchronization, the temporary slot is
1378 : * immediately converted to persistent and thus is safe due to the
1379 : * shared lock taken on the database. So, we'll just bail out in such
1380 : * a case.
1381 : *
1382 : * XXX: We can consider shutting down the slot sync worker before
1383 : * trying to drop synced temporary slots here.
1384 : */
1385 10 : if (active_pid)
1386 0 : ereport(ERROR,
1387 : (errcode(ERRCODE_OBJECT_IN_USE),
1388 : errmsg("replication slot \"%s\" is active for PID %d",
1389 : slotname, active_pid)));
1390 :
1391 : /*
1392 : * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1393 : * holding ReplicationSlotControlLock over filesystem operations,
1394 : * release ReplicationSlotControlLock and use
1395 : * ReplicationSlotDropAcquired.
1396 : *
1397 : * As that means the set of slots could change, restart scan from the
1398 : * beginning each time we release the lock.
1399 : */
1400 10 : LWLockRelease(ReplicationSlotControlLock);
1401 10 : ReplicationSlotDropAcquired();
1402 10 : goto restart;
1403 : }
1404 94 : LWLockRelease(ReplicationSlotControlLock);
1405 : }
1406 :
1407 :
1408 : /*
1409 : * Check whether the server's configuration supports using replication
1410 : * slots.
1411 : */
1412 : void
1413 3238 : CheckSlotRequirements(void)
1414 : {
1415 : /*
1416 : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1417 : * needs the same check.
1418 : */
1419 :
1420 3238 : if (max_replication_slots == 0)
1421 0 : ereport(ERROR,
1422 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1423 : errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1424 :
1425 3238 : if (wal_level < WAL_LEVEL_REPLICA)
1426 0 : ereport(ERROR,
1427 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1428 : errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1429 3238 : }
1430 :
1431 : /*
1432 : * Check whether the user has privilege to use replication slots.
1433 : */
1434 : void
1435 1054 : CheckSlotPermissions(void)
1436 : {
1437 1054 : if (!has_rolreplication(GetUserId()))
1438 10 : ereport(ERROR,
1439 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1440 : errmsg("permission denied to use replication slots"),
1441 : errdetail("Only roles with the %s attribute may use replication slots.",
1442 : "REPLICATION")));
1443 1044 : }
1444 :
1445 : /*
1446 : * Reserve WAL for the currently active slot.
1447 : *
1448 : * Compute and set restart_lsn in a manner that's appropriate for the type of
1449 : * the slot and concurrency safe.
1450 : */
1451 : void
1452 1144 : ReplicationSlotReserveWal(void)
1453 : {
1454 1144 : ReplicationSlot *slot = MyReplicationSlot;
1455 :
1456 : Assert(slot != NULL);
1457 : Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
1458 :
1459 : /*
1460 : * The replication slot mechanism is used to prevent removal of required
1461 : * WAL. As there is no interlock between this routine and checkpoints, WAL
1462 : * segments could concurrently be removed when a now stale return value of
1463 : * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1464 : * this happens we'll just retry.
1465 : */
1466 : while (true)
1467 0 : {
1468 : XLogSegNo segno;
1469 : XLogRecPtr restart_lsn;
1470 :
1471 : /*
1472 : * For logical slots log a standby snapshot and start logical decoding
1473 : * at exactly that position. That allows the slot to start up more
1474 : * quickly. But on a standby we cannot do WAL writes, so just use the
1475 : * replay pointer; effectively, an attempt to create a logical slot on
1476 : * standby will cause it to wait for an xl_running_xact record to be
1477 : * logged independently on the primary, so that a snapshot can be
1478 : * built using the record.
1479 : *
1480 : * None of this is needed (or indeed helpful) for physical slots as
1481 : * they'll start replay at the last logged checkpoint anyway. Instead
1482 : * return the location of the last redo LSN. While that slightly
1483 : * increases the chance that we have to retry, it's where a base
1484 : * backup has to start replay at.
1485 : */
1486 1144 : if (SlotIsPhysical(slot))
1487 292 : restart_lsn = GetRedoRecPtr();
1488 852 : else if (RecoveryInProgress())
1489 44 : restart_lsn = GetXLogReplayRecPtr(NULL);
1490 : else
1491 808 : restart_lsn = GetXLogInsertRecPtr();
1492 :
1493 1144 : SpinLockAcquire(&slot->mutex);
1494 1144 : slot->data.restart_lsn = restart_lsn;
1495 1144 : SpinLockRelease(&slot->mutex);
1496 :
1497 : /* prevent WAL removal as fast as possible */
1498 1144 : ReplicationSlotsComputeRequiredLSN();
1499 :
1500 : /*
1501 : * If all required WAL is still there, great, otherwise retry. The
1502 : * slot should prevent further removal of WAL, unless there's a
1503 : * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1504 : * the new restart_lsn above, so normally we should never need to loop
1505 : * more than twice.
1506 : */
1507 1144 : XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
1508 1144 : if (XLogGetLastRemovedSegno() < segno)
1509 1144 : break;
1510 : }
1511 :
1512 1144 : if (!RecoveryInProgress() && SlotIsLogical(slot))
1513 : {
1514 : XLogRecPtr flushptr;
1515 :
1516 : /* make sure we have enough information to start */
1517 808 : flushptr = LogStandbySnapshot();
1518 :
1519 : /* and make sure it's fsynced to disk */
1520 808 : XLogFlush(flushptr);
1521 : }
1522 1144 : }
1523 :
1524 : /*
1525 : * Report that replication slot needs to be invalidated
1526 : */
1527 : static void
1528 42 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1529 : bool terminating,
1530 : int pid,
1531 : NameData slotname,
1532 : XLogRecPtr restart_lsn,
1533 : XLogRecPtr oldestLSN,
1534 : TransactionId snapshotConflictHorizon,
1535 : long slot_idle_seconds)
1536 : {
1537 : StringInfoData err_detail;
1538 : StringInfoData err_hint;
1539 :
1540 42 : initStringInfo(&err_detail);
1541 42 : initStringInfo(&err_hint);
1542 :
1543 42 : switch (cause)
1544 : {
1545 12 : case RS_INVAL_WAL_REMOVED:
1546 : {
1547 12 : unsigned long long ex = oldestLSN - restart_lsn;
1548 :
1549 12 : appendStringInfo(&err_detail,
1550 12 : ngettext("The slot's restart_lsn %X/%X exceeds the limit by %llu byte.",
1551 : "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
1552 : ex),
1553 12 : LSN_FORMAT_ARGS(restart_lsn),
1554 : ex);
1555 : /* translator: %s is a GUC variable name */
1556 12 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1557 : "max_slot_wal_keep_size");
1558 12 : break;
1559 : }
1560 24 : case RS_INVAL_HORIZON:
1561 24 : appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1562 : snapshotConflictHorizon);
1563 24 : break;
1564 :
1565 6 : case RS_INVAL_WAL_LEVEL:
1566 6 : appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
1567 6 : break;
1568 :
1569 0 : case RS_INVAL_IDLE_TIMEOUT:
1570 : {
1571 0 : int minutes = slot_idle_seconds / SECS_PER_MINUTE;
1572 0 : int secs = slot_idle_seconds % SECS_PER_MINUTE;
1573 :
1574 : /* translator: %s is a GUC variable name */
1575 0 : appendStringInfo(&err_detail, _("The slot's idle time of %dmin %02ds exceeds the configured \"%s\" duration of %dmin."),
1576 : minutes, secs, "idle_replication_slot_timeout",
1577 : idle_replication_slot_timeout_mins);
1578 : /* translator: %s is a GUC variable name */
1579 0 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1580 : "idle_replication_slot_timeout");
1581 0 : break;
1582 : }
1583 : case RS_INVAL_NONE:
1584 : pg_unreachable();
1585 : }
1586 :
1587 42 : ereport(LOG,
1588 : terminating ?
1589 : errmsg("terminating process %d to release replication slot \"%s\"",
1590 : pid, NameStr(slotname)) :
1591 : errmsg("invalidating obsolete replication slot \"%s\"",
1592 : NameStr(slotname)),
1593 : errdetail_internal("%s", err_detail.data),
1594 : err_hint.len ? errhint("%s", err_hint.data) : 0);
1595 :
1596 42 : pfree(err_detail.data);
1597 42 : pfree(err_hint.data);
1598 42 : }
1599 :
1600 : /*
1601 : * Can we invalidate an idle replication slot?
1602 : *
1603 : * Idle timeout invalidation is allowed only when:
1604 : *
1605 : * 1. Idle timeout is set
1606 : * 2. Slot has reserved WAL
1607 : * 3. Slot is inactive
1608 : * 4. The slot is not being synced from the primary while the server is in
1609 : * recovery. This is because synced slots are always considered to be
1610 : * inactive because they don't perform logical decoding to produce changes.
1611 : */
1612 : static inline bool
1613 620 : CanInvalidateIdleSlot(ReplicationSlot *s)
1614 : {
1615 620 : return (idle_replication_slot_timeout_mins != 0 &&
1616 0 : !XLogRecPtrIsInvalid(s->data.restart_lsn) &&
1617 620 : s->inactive_since > 0 &&
1618 0 : !(RecoveryInProgress() && s->data.synced));
1619 : }
1620 :
1621 : /*
1622 : * DetermineSlotInvalidationCause - Determine the cause for which a slot
1623 : * becomes invalid among the given possible causes.
1624 : *
1625 : * This function sequentially checks all possible invalidation causes and
1626 : * returns the first one for which the slot is eligible for invalidation.
1627 : */
1628 : static ReplicationSlotInvalidationCause
1629 682 : DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
1630 : XLogRecPtr oldestLSN, Oid dboid,
1631 : TransactionId snapshotConflictHorizon,
1632 : TransactionId initial_effective_xmin,
1633 : TransactionId initial_catalog_effective_xmin,
1634 : XLogRecPtr initial_restart_lsn,
1635 : TimestampTz *inactive_since, TimestampTz now)
1636 : {
1637 : Assert(possible_causes != RS_INVAL_NONE);
1638 :
1639 682 : if (possible_causes & RS_INVAL_WAL_REMOVED)
1640 : {
1641 632 : if (initial_restart_lsn != InvalidXLogRecPtr &&
1642 : initial_restart_lsn < oldestLSN)
1643 12 : return RS_INVAL_WAL_REMOVED;
1644 : }
1645 :
1646 670 : if (possible_causes & RS_INVAL_HORIZON)
1647 : {
1648 : /* invalid DB oid signals a shared relation */
1649 44 : if (SlotIsLogical(s) &&
1650 34 : (dboid == InvalidOid || dboid == s->data.database))
1651 : {
1652 44 : if (TransactionIdIsValid(initial_effective_xmin) &&
1653 0 : TransactionIdPrecedesOrEquals(initial_effective_xmin,
1654 : snapshotConflictHorizon))
1655 0 : return RS_INVAL_HORIZON;
1656 88 : else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
1657 44 : TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
1658 : snapshotConflictHorizon))
1659 24 : return RS_INVAL_HORIZON;
1660 : }
1661 : }
1662 :
1663 646 : if (possible_causes & RS_INVAL_WAL_LEVEL)
1664 : {
1665 6 : if (SlotIsLogical(s))
1666 6 : return RS_INVAL_WAL_LEVEL;
1667 : }
1668 :
1669 640 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1670 : {
1671 : Assert(now > 0);
1672 :
1673 620 : if (CanInvalidateIdleSlot(s))
1674 : {
1675 : /*
1676 : * We simulate the invalidation due to idle_timeout as the minimum
1677 : * time idle time is one minute which makes tests take a long
1678 : * time.
1679 : */
1680 : #ifdef USE_INJECTION_POINTS
1681 0 : if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1682 : {
1683 0 : *inactive_since = 0; /* since the beginning of time */
1684 0 : return RS_INVAL_IDLE_TIMEOUT;
1685 : }
1686 : #endif
1687 :
1688 : /*
1689 : * Check if the slot needs to be invalidated due to
1690 : * idle_replication_slot_timeout GUC.
1691 : */
1692 0 : if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
1693 : idle_replication_slot_timeout_mins * SECS_PER_MINUTE))
1694 : {
1695 0 : *inactive_since = s->inactive_since;
1696 0 : return RS_INVAL_IDLE_TIMEOUT;
1697 : }
1698 : }
1699 : }
1700 :
1701 640 : return RS_INVAL_NONE;
1702 : }
1703 :
1704 : /*
1705 : * Helper for InvalidateObsoleteReplicationSlots
1706 : *
1707 : * Acquires the given slot and mark it invalid, if necessary and possible.
1708 : *
1709 : * Returns whether ReplicationSlotControlLock was released in the interim (and
1710 : * in that case we're not holding the lock at return, otherwise we are).
1711 : *
1712 : * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
1713 : *
1714 : * This is inherently racy, because we release the LWLock
1715 : * for syscalls, so caller must restart if we return true.
1716 : */
1717 : static bool
1718 754 : InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
1719 : ReplicationSlot *s,
1720 : XLogRecPtr oldestLSN,
1721 : Oid dboid, TransactionId snapshotConflictHorizon,
1722 : bool *invalidated)
1723 : {
1724 754 : int last_signaled_pid = 0;
1725 754 : bool released_lock = false;
1726 754 : bool terminated = false;
1727 754 : TransactionId initial_effective_xmin = InvalidTransactionId;
1728 754 : TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
1729 754 : XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr;
1730 754 : ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
1731 754 : TimestampTz inactive_since = 0;
1732 :
1733 : for (;;)
1734 14 : {
1735 : XLogRecPtr restart_lsn;
1736 : NameData slotname;
1737 768 : int active_pid = 0;
1738 768 : ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
1739 768 : TimestampTz now = 0;
1740 768 : long slot_idle_secs = 0;
1741 :
1742 : Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1743 :
1744 768 : if (!s->in_use)
1745 : {
1746 0 : if (released_lock)
1747 0 : LWLockRelease(ReplicationSlotControlLock);
1748 0 : break;
1749 : }
1750 :
1751 768 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1752 : {
1753 : /*
1754 : * Assign the current time here to avoid system call overhead
1755 : * while holding the spinlock in subsequent code.
1756 : */
1757 652 : now = GetCurrentTimestamp();
1758 : }
1759 :
1760 : /*
1761 : * Check if the slot needs to be invalidated. If it needs to be
1762 : * invalidated, and is not currently acquired, acquire it and mark it
1763 : * as having been invalidated. We do this with the spinlock held to
1764 : * avoid race conditions -- for example the restart_lsn could move
1765 : * forward, or the slot could be dropped.
1766 : */
1767 768 : SpinLockAcquire(&s->mutex);
1768 :
1769 768 : restart_lsn = s->data.restart_lsn;
1770 :
1771 : /* we do nothing if the slot is already invalid */
1772 768 : if (s->data.invalidated == RS_INVAL_NONE)
1773 : {
1774 : /*
1775 : * The slot's mutex will be released soon, and it is possible that
1776 : * those values change since the process holding the slot has been
1777 : * terminated (if any), so record them here to ensure that we
1778 : * would report the correct invalidation cause.
1779 : *
1780 : * Unlike other slot attributes, slot's inactive_since can't be
1781 : * changed until the acquired slot is released or the owning
1782 : * process is terminated. So, the inactive slot can only be
1783 : * invalidated immediately without being terminated.
1784 : */
1785 682 : if (!terminated)
1786 : {
1787 668 : initial_restart_lsn = s->data.restart_lsn;
1788 668 : initial_effective_xmin = s->effective_xmin;
1789 668 : initial_catalog_effective_xmin = s->effective_catalog_xmin;
1790 : }
1791 :
1792 682 : invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
1793 : s, oldestLSN,
1794 : dboid,
1795 : snapshotConflictHorizon,
1796 : initial_effective_xmin,
1797 : initial_catalog_effective_xmin,
1798 : initial_restart_lsn,
1799 : &inactive_since,
1800 : now);
1801 : }
1802 :
1803 : /*
1804 : * The invalidation cause recorded previously should not change while
1805 : * the process owning the slot (if any) has been terminated.
1806 : */
1807 : Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
1808 : invalidation_cause_prev != invalidation_cause));
1809 :
1810 : /* if there's no invalidation, we're done */
1811 768 : if (invalidation_cause == RS_INVAL_NONE)
1812 : {
1813 726 : SpinLockRelease(&s->mutex);
1814 726 : if (released_lock)
1815 0 : LWLockRelease(ReplicationSlotControlLock);
1816 726 : break;
1817 : }
1818 :
1819 42 : slotname = s->data.name;
1820 42 : active_pid = s->active_pid;
1821 :
1822 : /*
1823 : * If the slot can be acquired, do so and mark it invalidated
1824 : * immediately. Otherwise we'll signal the owning process, below, and
1825 : * retry.
1826 : */
1827 42 : if (active_pid == 0)
1828 : {
1829 28 : MyReplicationSlot = s;
1830 28 : s->active_pid = MyProcPid;
1831 28 : s->data.invalidated = invalidation_cause;
1832 :
1833 : /*
1834 : * XXX: We should consider not overwriting restart_lsn and instead
1835 : * just rely on .invalidated.
1836 : */
1837 28 : if (invalidation_cause == RS_INVAL_WAL_REMOVED)
1838 8 : s->data.restart_lsn = InvalidXLogRecPtr;
1839 :
1840 : /* Let caller know */
1841 28 : *invalidated = true;
1842 : }
1843 :
1844 42 : SpinLockRelease(&s->mutex);
1845 :
1846 : /*
1847 : * The logical replication slots shouldn't be invalidated as GUC
1848 : * max_slot_wal_keep_size is set to -1 and
1849 : * idle_replication_slot_timeout is set to 0 during the binary
1850 : * upgrade. See check_old_cluster_for_valid_slots() where we ensure
1851 : * that no invalidated before the upgrade.
1852 : */
1853 : Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));
1854 :
1855 : /*
1856 : * Calculate the idle time duration of the slot if slot is marked
1857 : * invalidated with RS_INVAL_IDLE_TIMEOUT.
1858 : */
1859 42 : if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
1860 : {
1861 : int slot_idle_usecs;
1862 :
1863 0 : TimestampDifference(inactive_since, now, &slot_idle_secs,
1864 : &slot_idle_usecs);
1865 : }
1866 :
1867 42 : if (active_pid != 0)
1868 : {
1869 : /*
1870 : * Prepare the sleep on the slot's condition variable before
1871 : * releasing the lock, to close a possible race condition if the
1872 : * slot is released before the sleep below.
1873 : */
1874 14 : ConditionVariablePrepareToSleep(&s->active_cv);
1875 :
1876 14 : LWLockRelease(ReplicationSlotControlLock);
1877 14 : released_lock = true;
1878 :
1879 : /*
1880 : * Signal to terminate the process that owns the slot, if we
1881 : * haven't already signalled it. (Avoidance of repeated
1882 : * signalling is the only reason for there to be a loop in this
1883 : * routine; otherwise we could rely on caller's restart loop.)
1884 : *
1885 : * There is the race condition that other process may own the slot
1886 : * after its current owner process is terminated and before this
1887 : * process owns it. To handle that, we signal only if the PID of
1888 : * the owning process has changed from the previous time. (This
1889 : * logic assumes that the same PID is not reused very quickly.)
1890 : */
1891 14 : if (last_signaled_pid != active_pid)
1892 : {
1893 14 : ReportSlotInvalidation(invalidation_cause, true, active_pid,
1894 : slotname, restart_lsn,
1895 : oldestLSN, snapshotConflictHorizon,
1896 : slot_idle_secs);
1897 :
1898 14 : if (MyBackendType == B_STARTUP)
1899 10 : (void) SendProcSignal(active_pid,
1900 : PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
1901 : INVALID_PROC_NUMBER);
1902 : else
1903 4 : (void) kill(active_pid, SIGTERM);
1904 :
1905 14 : last_signaled_pid = active_pid;
1906 14 : terminated = true;
1907 14 : invalidation_cause_prev = invalidation_cause;
1908 : }
1909 :
1910 : /* Wait until the slot is released. */
1911 14 : ConditionVariableSleep(&s->active_cv,
1912 : WAIT_EVENT_REPLICATION_SLOT_DROP);
1913 :
1914 : /*
1915 : * Re-acquire lock and start over; we expect to invalidate the
1916 : * slot next time (unless another process acquires the slot in the
1917 : * meantime).
1918 : */
1919 14 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1920 14 : continue;
1921 : }
1922 : else
1923 : {
1924 : /*
1925 : * We hold the slot now and have already invalidated it; flush it
1926 : * to ensure that state persists.
1927 : *
1928 : * Don't want to hold ReplicationSlotControlLock across file
1929 : * system operations, so release it now but be sure to tell caller
1930 : * to restart from scratch.
1931 : */
1932 28 : LWLockRelease(ReplicationSlotControlLock);
1933 28 : released_lock = true;
1934 :
1935 : /* Make sure the invalidated state persists across server restart */
1936 28 : ReplicationSlotMarkDirty();
1937 28 : ReplicationSlotSave();
1938 28 : ReplicationSlotRelease();
1939 :
1940 28 : ReportSlotInvalidation(invalidation_cause, false, active_pid,
1941 : slotname, restart_lsn,
1942 : oldestLSN, snapshotConflictHorizon,
1943 : slot_idle_secs);
1944 :
1945 : /* done with this slot for now */
1946 28 : break;
1947 : }
1948 : }
1949 :
1950 : Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
1951 :
1952 754 : return released_lock;
1953 : }
1954 :
1955 : /*
1956 : * Invalidate slots that require resources about to be removed.
1957 : *
1958 : * Returns true when any slot have got invalidated.
1959 : *
1960 : * Whether a slot needs to be invalidated depends on the invalidation cause.
1961 : * A slot is invalidated if it:
1962 : * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
1963 : * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
1964 : * db; dboid may be InvalidOid for shared relations
1965 : * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient
1966 : * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
1967 : * "idle_replication_slot_timeout" duration.
1968 : *
1969 : * Note: This function attempts to invalidate the slot for multiple possible
1970 : * causes in a single pass, minimizing redundant iterations. The "cause"
1971 : * parameter can be a MASK representing one or more of the defined causes.
1972 : *
1973 : * NB - this runs as part of checkpoint, so avoid raising errors if possible.
1974 : */
1975 : bool
1976 2528 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
1977 : XLogSegNo oldestSegno, Oid dboid,
1978 : TransactionId snapshotConflictHorizon)
1979 : {
1980 : XLogRecPtr oldestLSN;
1981 2528 : bool invalidated = false;
1982 :
1983 : Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
1984 : Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
1985 : Assert(possible_causes != RS_INVAL_NONE);
1986 :
1987 2528 : if (max_replication_slots == 0)
1988 2 : return invalidated;
1989 :
1990 2526 : XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
1991 :
1992 2554 : restart:
1993 2554 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1994 27082 : for (int i = 0; i < max_replication_slots; i++)
1995 : {
1996 24556 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1997 :
1998 24556 : if (!s->in_use)
1999 23802 : continue;
2000 :
2001 754 : if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid,
2002 : snapshotConflictHorizon,
2003 : &invalidated))
2004 : {
2005 : /* if the lock was released, start from scratch */
2006 28 : goto restart;
2007 : }
2008 : }
2009 2526 : LWLockRelease(ReplicationSlotControlLock);
2010 :
2011 : /*
2012 : * If any slots have been invalidated, recalculate the resource limits.
2013 : */
2014 2526 : if (invalidated)
2015 : {
2016 18 : ReplicationSlotsComputeRequiredXmin(false);
2017 18 : ReplicationSlotsComputeRequiredLSN();
2018 : }
2019 :
2020 2526 : return invalidated;
2021 : }
2022 :
2023 : /*
2024 : * Flush all replication slots to disk.
2025 : *
2026 : * It is convenient to flush dirty replication slots at the time of checkpoint.
2027 : * Additionally, in case of a shutdown checkpoint, we also identify the slots
2028 : * for which the confirmed_flush LSN has been updated since the last time it
2029 : * was saved and flush them.
2030 : */
2031 : void
2032 2484 : CheckPointReplicationSlots(bool is_shutdown)
2033 : {
2034 : int i;
2035 :
2036 2484 : elog(DEBUG1, "performing replication slot checkpoint");
2037 :
2038 : /*
2039 : * Prevent any slot from being created/dropped while we're active. As we
2040 : * explicitly do *not* want to block iterating over replication_slots or
2041 : * acquiring a slot we cannot take the control lock - but that's OK,
2042 : * because holding ReplicationSlotAllocationLock is strictly stronger, and
2043 : * enough to guarantee that nobody can change the in_use bits on us.
2044 : */
2045 2484 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2046 :
2047 26770 : for (i = 0; i < max_replication_slots; i++)
2048 : {
2049 24286 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2050 : char path[MAXPGPATH];
2051 :
2052 24286 : if (!s->in_use)
2053 23642 : continue;
2054 :
2055 : /* save the slot to disk, locking is handled in SaveSlotToPath() */
2056 644 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2057 :
2058 : /*
2059 : * Slot's data is not flushed each time the confirmed_flush LSN is
2060 : * updated as that could lead to frequent writes. However, we decide
2061 : * to force a flush of all logical slot's data at the time of shutdown
2062 : * if the confirmed_flush LSN is changed since we last flushed it to
2063 : * disk. This helps in avoiding an unnecessary retreat of the
2064 : * confirmed_flush LSN after restart.
2065 : */
2066 644 : if (is_shutdown && SlotIsLogical(s))
2067 : {
2068 118 : SpinLockAcquire(&s->mutex);
2069 :
2070 118 : if (s->data.invalidated == RS_INVAL_NONE &&
2071 118 : s->data.confirmed_flush > s->last_saved_confirmed_flush)
2072 : {
2073 64 : s->just_dirtied = true;
2074 64 : s->dirty = true;
2075 : }
2076 118 : SpinLockRelease(&s->mutex);
2077 : }
2078 :
2079 644 : SaveSlotToPath(s, path, LOG);
2080 : }
2081 2484 : LWLockRelease(ReplicationSlotAllocationLock);
2082 2484 : }
2083 :
2084 : /*
2085 : * Load all replication slots from disk into memory at server startup. This
2086 : * needs to be run before we start crash recovery.
2087 : */
2088 : void
2089 1660 : StartupReplicationSlots(void)
2090 : {
2091 : DIR *replication_dir;
2092 : struct dirent *replication_de;
2093 :
2094 1660 : elog(DEBUG1, "starting up replication slots");
2095 :
2096 : /* restore all slots by iterating over all on-disk entries */
2097 1660 : replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2098 5116 : while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2099 : {
2100 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2101 : PGFileType de_type;
2102 :
2103 3456 : if (strcmp(replication_de->d_name, ".") == 0 ||
2104 1796 : strcmp(replication_de->d_name, "..") == 0)
2105 3320 : continue;
2106 :
2107 136 : snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2108 136 : de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2109 :
2110 : /* we're only creating directories here, skip if it's not our's */
2111 136 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
2112 0 : continue;
2113 :
2114 : /* we crashed while a slot was being setup or deleted, clean up */
2115 136 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
2116 : {
2117 0 : if (!rmtree(path, true))
2118 : {
2119 0 : ereport(WARNING,
2120 : (errmsg("could not remove directory \"%s\"",
2121 : path)));
2122 0 : continue;
2123 : }
2124 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2125 0 : continue;
2126 : }
2127 :
2128 : /* looks like a slot in a normal state, restore */
2129 136 : RestoreSlotFromDisk(replication_de->d_name);
2130 : }
2131 1660 : FreeDir(replication_dir);
2132 :
2133 : /* currently no slots exist, we're done. */
2134 1660 : if (max_replication_slots <= 0)
2135 2 : return;
2136 :
2137 : /* Now that we have recovered all the data, compute replication xmin */
2138 1658 : ReplicationSlotsComputeRequiredXmin(false);
2139 1658 : ReplicationSlotsComputeRequiredLSN();
2140 : }
2141 :
2142 : /* ----
2143 : * Manipulation of on-disk state of replication slots
2144 : *
2145 : * NB: none of the routines below should take any notice whether a slot is the
2146 : * current one or not, that's all handled a layer above.
2147 : * ----
2148 : */
2149 : static void
2150 1220 : CreateSlotOnDisk(ReplicationSlot *slot)
2151 : {
2152 : char tmppath[MAXPGPATH];
2153 : char path[MAXPGPATH];
2154 : struct stat st;
2155 :
2156 : /*
2157 : * No need to take out the io_in_progress_lock, nobody else can see this
2158 : * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2159 : * takes out the lock, if we'd take the lock here, we'd deadlock.
2160 : */
2161 :
2162 1220 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2163 1220 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2164 :
2165 : /*
2166 : * It's just barely possible that some previous effort to create or drop a
2167 : * slot with this name left a temp directory lying around. If that seems
2168 : * to be the case, try to remove it. If the rmtree() fails, we'll error
2169 : * out at the MakePGDirectory() below, so we don't bother checking
2170 : * success.
2171 : */
2172 1220 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2173 0 : rmtree(tmppath, true);
2174 :
2175 : /* Create and fsync the temporary slot directory. */
2176 1220 : if (MakePGDirectory(tmppath) < 0)
2177 0 : ereport(ERROR,
2178 : (errcode_for_file_access(),
2179 : errmsg("could not create directory \"%s\": %m",
2180 : tmppath)));
2181 1220 : fsync_fname(tmppath, true);
2182 :
2183 : /* Write the actual state file. */
2184 1220 : slot->dirty = true; /* signal that we really need to write */
2185 1220 : SaveSlotToPath(slot, tmppath, ERROR);
2186 :
2187 : /* Rename the directory into place. */
2188 1220 : if (rename(tmppath, path) != 0)
2189 0 : ereport(ERROR,
2190 : (errcode_for_file_access(),
2191 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2192 : tmppath, path)));
2193 :
2194 : /*
2195 : * If we'd now fail - really unlikely - we wouldn't know whether this slot
2196 : * would persist after an OS crash or not - so, force a restart. The
2197 : * restart would try to fsync this again till it works.
2198 : */
2199 1220 : START_CRIT_SECTION();
2200 :
2201 1220 : fsync_fname(path, true);
2202 1220 : fsync_fname(PG_REPLSLOT_DIR, true);
2203 :
2204 1220 : END_CRIT_SECTION();
2205 1220 : }
2206 :
2207 : /*
2208 : * Shared functionality between saving and creating a replication slot.
2209 : */
2210 : static void
2211 4314 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2212 : {
2213 : char tmppath[MAXPGPATH];
2214 : char path[MAXPGPATH];
2215 : int fd;
2216 : ReplicationSlotOnDisk cp;
2217 : bool was_dirty;
2218 :
2219 : /* first check whether there's something to write out */
2220 4314 : SpinLockAcquire(&slot->mutex);
2221 4314 : was_dirty = slot->dirty;
2222 4314 : slot->just_dirtied = false;
2223 4314 : SpinLockRelease(&slot->mutex);
2224 :
2225 : /* and don't do anything if there's nothing to write */
2226 4314 : if (!was_dirty)
2227 186 : return;
2228 :
2229 4128 : LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
2230 :
2231 : /* silence valgrind :( */
2232 4128 : memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2233 :
2234 4128 : sprintf(tmppath, "%s/state.tmp", dir);
2235 4128 : sprintf(path, "%s/state", dir);
2236 :
2237 4128 : fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2238 4128 : if (fd < 0)
2239 : {
2240 : /*
2241 : * If not an ERROR, then release the lock before returning. In case
2242 : * of an ERROR, the error recovery path automatically releases the
2243 : * lock, but no harm in explicitly releasing even in that case. Note
2244 : * that LWLockRelease() could affect errno.
2245 : */
2246 0 : int save_errno = errno;
2247 :
2248 0 : LWLockRelease(&slot->io_in_progress_lock);
2249 0 : errno = save_errno;
2250 0 : ereport(elevel,
2251 : (errcode_for_file_access(),
2252 : errmsg("could not create file \"%s\": %m",
2253 : tmppath)));
2254 0 : return;
2255 : }
2256 :
2257 4128 : cp.magic = SLOT_MAGIC;
2258 4128 : INIT_CRC32C(cp.checksum);
2259 4128 : cp.version = SLOT_VERSION;
2260 4128 : cp.length = ReplicationSlotOnDiskV2Size;
2261 :
2262 4128 : SpinLockAcquire(&slot->mutex);
2263 :
2264 4128 : memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2265 :
2266 4128 : SpinLockRelease(&slot->mutex);
2267 :
2268 4128 : COMP_CRC32C(cp.checksum,
2269 : (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
2270 : ReplicationSlotOnDiskChecksummedSize);
2271 4128 : FIN_CRC32C(cp.checksum);
2272 :
2273 4128 : errno = 0;
2274 4128 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2275 4128 : if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2276 : {
2277 0 : int save_errno = errno;
2278 :
2279 0 : pgstat_report_wait_end();
2280 0 : CloseTransientFile(fd);
2281 0 : LWLockRelease(&slot->io_in_progress_lock);
2282 :
2283 : /* if write didn't set errno, assume problem is no disk space */
2284 0 : errno = save_errno ? save_errno : ENOSPC;
2285 0 : ereport(elevel,
2286 : (errcode_for_file_access(),
2287 : errmsg("could not write to file \"%s\": %m",
2288 : tmppath)));
2289 0 : return;
2290 : }
2291 4128 : pgstat_report_wait_end();
2292 :
2293 : /* fsync the temporary file */
2294 4128 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2295 4128 : if (pg_fsync(fd) != 0)
2296 : {
2297 0 : int save_errno = errno;
2298 :
2299 0 : pgstat_report_wait_end();
2300 0 : CloseTransientFile(fd);
2301 0 : LWLockRelease(&slot->io_in_progress_lock);
2302 0 : errno = save_errno;
2303 0 : ereport(elevel,
2304 : (errcode_for_file_access(),
2305 : errmsg("could not fsync file \"%s\": %m",
2306 : tmppath)));
2307 0 : return;
2308 : }
2309 4128 : pgstat_report_wait_end();
2310 :
2311 4128 : if (CloseTransientFile(fd) != 0)
2312 : {
2313 0 : int save_errno = errno;
2314 :
2315 0 : LWLockRelease(&slot->io_in_progress_lock);
2316 0 : errno = save_errno;
2317 0 : ereport(elevel,
2318 : (errcode_for_file_access(),
2319 : errmsg("could not close file \"%s\": %m",
2320 : tmppath)));
2321 0 : return;
2322 : }
2323 :
2324 : /* rename to permanent file, fsync file and directory */
2325 4128 : if (rename(tmppath, path) != 0)
2326 : {
2327 0 : int save_errno = errno;
2328 :
2329 0 : LWLockRelease(&slot->io_in_progress_lock);
2330 0 : errno = save_errno;
2331 0 : ereport(elevel,
2332 : (errcode_for_file_access(),
2333 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2334 : tmppath, path)));
2335 0 : return;
2336 : }
2337 :
2338 : /*
2339 : * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2340 : */
2341 4128 : START_CRIT_SECTION();
2342 :
2343 4128 : fsync_fname(path, false);
2344 4128 : fsync_fname(dir, true);
2345 4128 : fsync_fname(PG_REPLSLOT_DIR, true);
2346 :
2347 4128 : END_CRIT_SECTION();
2348 :
2349 : /*
2350 : * Successfully wrote, unset dirty bit, unless somebody dirtied again
2351 : * already and remember the confirmed_flush LSN value.
2352 : */
2353 4128 : SpinLockAcquire(&slot->mutex);
2354 4128 : if (!slot->just_dirtied)
2355 4084 : slot->dirty = false;
2356 4128 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2357 4128 : SpinLockRelease(&slot->mutex);
2358 :
2359 4128 : LWLockRelease(&slot->io_in_progress_lock);
2360 : }
2361 :
2362 : /*
2363 : * Load a single slot from disk into memory.
2364 : */
2365 : static void
2366 136 : RestoreSlotFromDisk(const char *name)
2367 : {
2368 : ReplicationSlotOnDisk cp;
2369 : int i;
2370 : char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2371 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2372 : int fd;
2373 136 : bool restored = false;
2374 : int readBytes;
2375 : pg_crc32c checksum;
2376 136 : TimestampTz now = 0;
2377 :
2378 : /* no need to lock here, no concurrent access allowed yet */
2379 :
2380 : /* delete temp file if it exists */
2381 136 : sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2382 136 : sprintf(path, "%s/state.tmp", slotdir);
2383 136 : if (unlink(path) < 0 && errno != ENOENT)
2384 0 : ereport(PANIC,
2385 : (errcode_for_file_access(),
2386 : errmsg("could not remove file \"%s\": %m", path)));
2387 :
2388 136 : sprintf(path, "%s/state", slotdir);
2389 :
2390 136 : elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2391 :
2392 : /* on some operating systems fsyncing a file requires O_RDWR */
2393 136 : fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2394 :
2395 : /*
2396 : * We do not need to handle this as we are rename()ing the directory into
2397 : * place only after we fsync()ed the state file.
2398 : */
2399 136 : if (fd < 0)
2400 0 : ereport(PANIC,
2401 : (errcode_for_file_access(),
2402 : errmsg("could not open file \"%s\": %m", path)));
2403 :
2404 : /*
2405 : * Sync state file before we're reading from it. We might have crashed
2406 : * while it wasn't synced yet and we shouldn't continue on that basis.
2407 : */
2408 136 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2409 136 : if (pg_fsync(fd) != 0)
2410 0 : ereport(PANIC,
2411 : (errcode_for_file_access(),
2412 : errmsg("could not fsync file \"%s\": %m",
2413 : path)));
2414 136 : pgstat_report_wait_end();
2415 :
2416 : /* Also sync the parent directory */
2417 136 : START_CRIT_SECTION();
2418 136 : fsync_fname(slotdir, true);
2419 136 : END_CRIT_SECTION();
2420 :
2421 : /* read part of statefile that's guaranteed to be version independent */
2422 136 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2423 136 : readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2424 136 : pgstat_report_wait_end();
2425 136 : if (readBytes != ReplicationSlotOnDiskConstantSize)
2426 : {
2427 0 : if (readBytes < 0)
2428 0 : ereport(PANIC,
2429 : (errcode_for_file_access(),
2430 : errmsg("could not read file \"%s\": %m", path)));
2431 : else
2432 0 : ereport(PANIC,
2433 : (errcode(ERRCODE_DATA_CORRUPTED),
2434 : errmsg("could not read file \"%s\": read %d of %zu",
2435 : path, readBytes,
2436 : (Size) ReplicationSlotOnDiskConstantSize)));
2437 : }
2438 :
2439 : /* verify magic */
2440 136 : if (cp.magic != SLOT_MAGIC)
2441 0 : ereport(PANIC,
2442 : (errcode(ERRCODE_DATA_CORRUPTED),
2443 : errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2444 : path, cp.magic, SLOT_MAGIC)));
2445 :
2446 : /* verify version */
2447 136 : if (cp.version != SLOT_VERSION)
2448 0 : ereport(PANIC,
2449 : (errcode(ERRCODE_DATA_CORRUPTED),
2450 : errmsg("replication slot file \"%s\" has unsupported version %u",
2451 : path, cp.version)));
2452 :
2453 : /* boundary check on length */
2454 136 : if (cp.length != ReplicationSlotOnDiskV2Size)
2455 0 : ereport(PANIC,
2456 : (errcode(ERRCODE_DATA_CORRUPTED),
2457 : errmsg("replication slot file \"%s\" has corrupted length %u",
2458 : path, cp.length)));
2459 :
2460 : /* Now that we know the size, read the entire file */
2461 136 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2462 272 : readBytes = read(fd,
2463 : (char *) &cp + ReplicationSlotOnDiskConstantSize,
2464 136 : cp.length);
2465 136 : pgstat_report_wait_end();
2466 136 : if (readBytes != cp.length)
2467 : {
2468 0 : if (readBytes < 0)
2469 0 : ereport(PANIC,
2470 : (errcode_for_file_access(),
2471 : errmsg("could not read file \"%s\": %m", path)));
2472 : else
2473 0 : ereport(PANIC,
2474 : (errcode(ERRCODE_DATA_CORRUPTED),
2475 : errmsg("could not read file \"%s\": read %d of %zu",
2476 : path, readBytes, (Size) cp.length)));
2477 : }
2478 :
2479 136 : if (CloseTransientFile(fd) != 0)
2480 0 : ereport(PANIC,
2481 : (errcode_for_file_access(),
2482 : errmsg("could not close file \"%s\": %m", path)));
2483 :
2484 : /* now verify the CRC */
2485 136 : INIT_CRC32C(checksum);
2486 136 : COMP_CRC32C(checksum,
2487 : (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
2488 : ReplicationSlotOnDiskChecksummedSize);
2489 136 : FIN_CRC32C(checksum);
2490 :
2491 136 : if (!EQ_CRC32C(checksum, cp.checksum))
2492 0 : ereport(PANIC,
2493 : (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2494 : path, checksum, cp.checksum)));
2495 :
2496 : /*
2497 : * If we crashed with an ephemeral slot active, don't restore but delete
2498 : * it.
2499 : */
2500 136 : if (cp.slotdata.persistency != RS_PERSISTENT)
2501 : {
2502 0 : if (!rmtree(slotdir, true))
2503 : {
2504 0 : ereport(WARNING,
2505 : (errmsg("could not remove directory \"%s\"",
2506 : slotdir)));
2507 : }
2508 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2509 0 : return;
2510 : }
2511 :
2512 : /*
2513 : * Verify that requirements for the specific slot type are met. That's
2514 : * important because if these aren't met we're not guaranteed to retain
2515 : * all the necessary resources for the slot.
2516 : *
2517 : * NB: We have to do so *after* the above checks for ephemeral slots,
2518 : * because otherwise a slot that shouldn't exist anymore could prevent
2519 : * restarts.
2520 : *
2521 : * NB: Changing the requirements here also requires adapting
2522 : * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2523 : */
2524 136 : if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
2525 0 : ereport(FATAL,
2526 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2527 : errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
2528 : NameStr(cp.slotdata.name)),
2529 : errhint("Change \"wal_level\" to be \"logical\" or higher.")));
2530 136 : else if (wal_level < WAL_LEVEL_REPLICA)
2531 0 : ereport(FATAL,
2532 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2533 : errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2534 : NameStr(cp.slotdata.name)),
2535 : errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2536 :
2537 : /* nothing can be active yet, don't lock anything */
2538 188 : for (i = 0; i < max_replication_slots; i++)
2539 : {
2540 : ReplicationSlot *slot;
2541 :
2542 188 : slot = &ReplicationSlotCtl->replication_slots[i];
2543 :
2544 188 : if (slot->in_use)
2545 52 : continue;
2546 :
2547 : /* restore the entire set of persistent data */
2548 136 : memcpy(&slot->data, &cp.slotdata,
2549 : sizeof(ReplicationSlotPersistentData));
2550 :
2551 : /* initialize in memory state */
2552 136 : slot->effective_xmin = cp.slotdata.xmin;
2553 136 : slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2554 136 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2555 :
2556 136 : slot->candidate_catalog_xmin = InvalidTransactionId;
2557 136 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
2558 136 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
2559 136 : slot->candidate_restart_valid = InvalidXLogRecPtr;
2560 :
2561 136 : slot->in_use = true;
2562 136 : slot->active_pid = 0;
2563 :
2564 : /*
2565 : * Set the time since the slot has become inactive after loading the
2566 : * slot from the disk into memory. Whoever acquires the slot i.e.
2567 : * makes the slot active will reset it. Use the same inactive_since
2568 : * time for all the slots.
2569 : */
2570 136 : if (now == 0)
2571 136 : now = GetCurrentTimestamp();
2572 :
2573 136 : ReplicationSlotSetInactiveSince(slot, now, false);
2574 :
2575 136 : restored = true;
2576 136 : break;
2577 : }
2578 :
2579 136 : if (!restored)
2580 0 : ereport(FATAL,
2581 : (errmsg("too many replication slots active before shutdown"),
2582 : errhint("Increase \"max_replication_slots\" and try again.")));
2583 : }
2584 :
2585 : /*
2586 : * Maps an invalidation reason for a replication slot to
2587 : * ReplicationSlotInvalidationCause.
2588 : */
2589 : ReplicationSlotInvalidationCause
2590 0 : GetSlotInvalidationCause(const char *cause_name)
2591 : {
2592 : Assert(cause_name);
2593 :
2594 : /* Search lookup table for the cause having this name */
2595 0 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2596 : {
2597 0 : if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2598 0 : return SlotInvalidationCauses[i].cause;
2599 : }
2600 :
2601 : Assert(false);
2602 0 : return RS_INVAL_NONE; /* to keep compiler quiet */
2603 : }
2604 :
2605 : /*
2606 : * Maps an ReplicationSlotInvalidationCause to the invalidation
2607 : * reason for a replication slot.
2608 : */
2609 : const char *
2610 86 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
2611 : {
2612 : /* Search lookup table for the name of this cause */
2613 274 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2614 : {
2615 274 : if (SlotInvalidationCauses[i].cause == cause)
2616 86 : return SlotInvalidationCauses[i].cause_name;
2617 : }
2618 :
2619 : Assert(false);
2620 0 : return "none"; /* to keep compiler quiet */
2621 : }
2622 :
2623 : /*
2624 : * A helper function to validate slots specified in GUC synchronized_standby_slots.
2625 : *
2626 : * The rawname will be parsed, and the result will be saved into *elemlist.
2627 : */
2628 : static bool
2629 12 : validate_sync_standby_slots(char *rawname, List **elemlist)
2630 : {
2631 : bool ok;
2632 :
2633 : /* Verify syntax and parse string into a list of identifiers */
2634 12 : ok = SplitIdentifierString(rawname, ',', elemlist);
2635 :
2636 12 : if (!ok)
2637 : {
2638 0 : GUC_check_errdetail("List syntax is invalid.");
2639 : }
2640 12 : else if (MyProc)
2641 : {
2642 : /*
2643 : * Check that each specified slot exist and is physical.
2644 : *
2645 : * Because we need an LWLock, we cannot do this on processes without a
2646 : * PGPROC, so we skip it there; but see comments in
2647 : * StandbySlotsHaveCaughtup() as to why that's not a problem.
2648 : */
2649 6 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2650 :
2651 18 : foreach_ptr(char, name, *elemlist)
2652 : {
2653 : ReplicationSlot *slot;
2654 :
2655 6 : slot = SearchNamedReplicationSlot(name, false);
2656 :
2657 6 : if (!slot)
2658 : {
2659 0 : GUC_check_errdetail("Replication slot \"%s\" does not exist.",
2660 : name);
2661 0 : ok = false;
2662 0 : break;
2663 : }
2664 :
2665 6 : if (!SlotIsPhysical(slot))
2666 : {
2667 0 : GUC_check_errdetail("\"%s\" is not a physical replication slot.",
2668 : name);
2669 0 : ok = false;
2670 0 : break;
2671 : }
2672 : }
2673 :
2674 6 : LWLockRelease(ReplicationSlotControlLock);
2675 : }
2676 :
2677 12 : return ok;
2678 : }
2679 :
2680 : /*
2681 : * GUC check_hook for synchronized_standby_slots
2682 : */
2683 : bool
2684 2016 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
2685 : {
2686 : char *rawname;
2687 : char *ptr;
2688 : List *elemlist;
2689 : int size;
2690 : bool ok;
2691 : SyncStandbySlotsConfigData *config;
2692 :
2693 2016 : if ((*newval)[0] == '\0')
2694 2004 : return true;
2695 :
2696 : /* Need a modifiable copy of the GUC string */
2697 12 : rawname = pstrdup(*newval);
2698 :
2699 : /* Now verify if the specified slots exist and have correct type */
2700 12 : ok = validate_sync_standby_slots(rawname, &elemlist);
2701 :
2702 12 : if (!ok || elemlist == NIL)
2703 : {
2704 0 : pfree(rawname);
2705 0 : list_free(elemlist);
2706 0 : return ok;
2707 : }
2708 :
2709 : /* Compute the size required for the SyncStandbySlotsConfigData struct */
2710 12 : size = offsetof(SyncStandbySlotsConfigData, slot_names);
2711 36 : foreach_ptr(char, slot_name, elemlist)
2712 12 : size += strlen(slot_name) + 1;
2713 :
2714 : /* GUC extra value must be guc_malloc'd, not palloc'd */
2715 12 : config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
2716 :
2717 : /* Transform the data into SyncStandbySlotsConfigData */
2718 12 : config->nslotnames = list_length(elemlist);
2719 :
2720 12 : ptr = config->slot_names;
2721 36 : foreach_ptr(char, slot_name, elemlist)
2722 : {
2723 12 : strcpy(ptr, slot_name);
2724 12 : ptr += strlen(slot_name) + 1;
2725 : }
2726 :
2727 12 : *extra = config;
2728 :
2729 12 : pfree(rawname);
2730 12 : list_free(elemlist);
2731 12 : return true;
2732 : }
2733 :
2734 : /*
2735 : * GUC assign_hook for synchronized_standby_slots
2736 : */
2737 : void
2738 2016 : assign_synchronized_standby_slots(const char *newval, void *extra)
2739 : {
2740 : /*
2741 : * The standby slots may have changed, so we must recompute the oldest
2742 : * LSN.
2743 : */
2744 2016 : ss_oldest_flush_lsn = InvalidXLogRecPtr;
2745 :
2746 2016 : synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
2747 2016 : }
2748 :
2749 : /*
2750 : * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
2751 : */
2752 : bool
2753 22708 : SlotExistsInSyncStandbySlots(const char *slot_name)
2754 : {
2755 : const char *standby_slot_name;
2756 :
2757 : /* Return false if there is no value in synchronized_standby_slots */
2758 22708 : if (synchronized_standby_slots_config == NULL)
2759 22698 : return false;
2760 :
2761 : /*
2762 : * XXX: We are not expecting this list to be long so a linear search
2763 : * shouldn't hurt but if that turns out not to be true then we can cache
2764 : * this information for each WalSender as well.
2765 : */
2766 10 : standby_slot_name = synchronized_standby_slots_config->slot_names;
2767 10 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2768 : {
2769 10 : if (strcmp(standby_slot_name, slot_name) == 0)
2770 10 : return true;
2771 :
2772 0 : standby_slot_name += strlen(standby_slot_name) + 1;
2773 : }
2774 :
2775 0 : return false;
2776 : }
2777 :
2778 : /*
2779 : * Return true if the slots specified in synchronized_standby_slots have caught up to
2780 : * the given WAL location, false otherwise.
2781 : *
2782 : * The elevel parameter specifies the error level used for logging messages
2783 : * related to slots that do not exist, are invalidated, or are inactive.
2784 : */
2785 : bool
2786 1212 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
2787 : {
2788 : const char *name;
2789 1212 : int caught_up_slot_num = 0;
2790 1212 : XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
2791 :
2792 : /*
2793 : * Don't need to wait for the standbys to catch up if there is no value in
2794 : * synchronized_standby_slots.
2795 : */
2796 1212 : if (synchronized_standby_slots_config == NULL)
2797 1186 : return true;
2798 :
2799 : /*
2800 : * Don't need to wait for the standbys to catch up if we are on a standby
2801 : * server, since we do not support syncing slots to cascading standbys.
2802 : */
2803 26 : if (RecoveryInProgress())
2804 0 : return true;
2805 :
2806 : /*
2807 : * Don't need to wait for the standbys to catch up if they are already
2808 : * beyond the specified WAL location.
2809 : */
2810 26 : if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
2811 18 : ss_oldest_flush_lsn >= wait_for_lsn)
2812 10 : return true;
2813 :
2814 : /*
2815 : * To prevent concurrent slot dropping and creation while filtering the
2816 : * slots, take the ReplicationSlotControlLock outside of the loop.
2817 : */
2818 16 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2819 :
2820 16 : name = synchronized_standby_slots_config->slot_names;
2821 22 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2822 : {
2823 : XLogRecPtr restart_lsn;
2824 : bool invalidated;
2825 : bool inactive;
2826 : ReplicationSlot *slot;
2827 :
2828 16 : slot = SearchNamedReplicationSlot(name, false);
2829 :
2830 : /*
2831 : * If a slot name provided in synchronized_standby_slots does not
2832 : * exist, report a message and exit the loop.
2833 : *
2834 : * Though validate_sync_standby_slots (the GUC check_hook) tries to
2835 : * avoid this, it can nonetheless happen because the user can specify
2836 : * a nonexistent slot name before server startup. That function cannot
2837 : * validate such a slot during startup, as ReplicationSlotCtl is not
2838 : * initialized by then. Also, the user might have dropped one slot.
2839 : */
2840 16 : if (!slot)
2841 : {
2842 0 : ereport(elevel,
2843 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2844 : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
2845 : name, "synchronized_standby_slots"),
2846 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2847 : name),
2848 : errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
2849 : name, "synchronized_standby_slots"));
2850 0 : break;
2851 : }
2852 :
2853 : /* Same as above: if a slot is not physical, exit the loop. */
2854 16 : if (SlotIsLogical(slot))
2855 : {
2856 0 : ereport(elevel,
2857 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2858 : errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
2859 : name, "synchronized_standby_slots"),
2860 : errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
2861 : name),
2862 : errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
2863 : name, "synchronized_standby_slots"));
2864 0 : break;
2865 : }
2866 :
2867 16 : SpinLockAcquire(&slot->mutex);
2868 16 : restart_lsn = slot->data.restart_lsn;
2869 16 : invalidated = slot->data.invalidated != RS_INVAL_NONE;
2870 16 : inactive = slot->active_pid == 0;
2871 16 : SpinLockRelease(&slot->mutex);
2872 :
2873 16 : if (invalidated)
2874 : {
2875 : /* Specified physical slot has been invalidated */
2876 0 : ereport(elevel,
2877 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2878 : errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
2879 : name, "synchronized_standby_slots"),
2880 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2881 : name),
2882 : errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
2883 : name, "synchronized_standby_slots"));
2884 0 : break;
2885 : }
2886 :
2887 16 : if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
2888 : {
2889 : /* Log a message if no active_pid for this physical slot */
2890 10 : if (inactive)
2891 8 : ereport(elevel,
2892 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2893 : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
2894 : name, "synchronized_standby_slots"),
2895 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2896 : name),
2897 : errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
2898 : name, "synchronized_standby_slots"));
2899 :
2900 : /* Continue if the current slot hasn't caught up. */
2901 10 : break;
2902 : }
2903 :
2904 : Assert(restart_lsn >= wait_for_lsn);
2905 :
2906 6 : if (XLogRecPtrIsInvalid(min_restart_lsn) ||
2907 : min_restart_lsn > restart_lsn)
2908 6 : min_restart_lsn = restart_lsn;
2909 :
2910 6 : caught_up_slot_num++;
2911 :
2912 6 : name += strlen(name) + 1;
2913 : }
2914 :
2915 16 : LWLockRelease(ReplicationSlotControlLock);
2916 :
2917 : /*
2918 : * Return false if not all the standbys have caught up to the specified
2919 : * WAL location.
2920 : */
2921 16 : if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
2922 10 : return false;
2923 :
2924 : /* The ss_oldest_flush_lsn must not retreat. */
2925 : Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
2926 : min_restart_lsn >= ss_oldest_flush_lsn);
2927 :
2928 6 : ss_oldest_flush_lsn = min_restart_lsn;
2929 :
2930 6 : return true;
2931 : }
2932 :
2933 : /*
2934 : * Wait for physical standbys to confirm receiving the given lsn.
2935 : *
2936 : * Used by logical decoding SQL functions. It waits for physical standbys
2937 : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
2938 : */
2939 : void
2940 432 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
2941 : {
2942 : /*
2943 : * Don't need to wait for the standby to catch up if the current acquired
2944 : * slot is not a logical failover slot, or there is no value in
2945 : * synchronized_standby_slots.
2946 : */
2947 432 : if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
2948 430 : return;
2949 :
2950 2 : ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
2951 :
2952 : for (;;)
2953 : {
2954 4 : CHECK_FOR_INTERRUPTS();
2955 :
2956 4 : if (ConfigReloadPending)
2957 : {
2958 2 : ConfigReloadPending = false;
2959 2 : ProcessConfigFile(PGC_SIGHUP);
2960 : }
2961 :
2962 : /* Exit if done waiting for every slot. */
2963 4 : if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
2964 2 : break;
2965 :
2966 : /*
2967 : * Wait for the slots in the synchronized_standby_slots to catch up,
2968 : * but use a timeout (1s) so we can also check if the
2969 : * synchronized_standby_slots has been changed.
2970 : */
2971 2 : ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
2972 : WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
2973 : }
2974 :
2975 2 : ConditionVariableCancelSleep();
2976 : }
2977 :
2978 : /*
2979 : * GUC check_hook for idle_replication_slot_timeout
2980 : *
2981 : * The value of idle_replication_slot_timeout must be set to 0 during
2982 : * a binary upgrade. See start_postmaster() in pg_upgrade for more details.
2983 : */
2984 : bool
2985 2062 : check_idle_replication_slot_timeout(int *newval, void **extra, GucSource source)
2986 : {
2987 2062 : if (IsBinaryUpgrade && *newval != 0)
2988 : {
2989 0 : GUC_check_errdetail("\"%s\" must be set to 0 during binary upgrade mode.",
2990 : "idle_replication_slot_timeout");
2991 0 : return false;
2992 : }
2993 :
2994 2062 : return true;
2995 : }
|