Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * slot.c
4 : * Replication slot management.
5 : *
6 : *
7 : * Copyright (c) 2012-2024, PostgreSQL Global Development Group
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/replication/slot.c
12 : *
13 : * NOTES
14 : *
15 : * Replication slots are used to keep state about replication streams
16 : * originating from this cluster. Their primary purpose is to prevent the
17 : * premature removal of WAL or of old tuple versions in a manner that would
18 : * interfere with replication; they are also useful for monitoring purposes.
19 : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : * on standbys (to support cascading setups). The requirement that slots be
21 : * usable on standbys precludes storing them in the system catalogs.
22 : *
23 : * Each replication slot gets its own directory inside the directory
24 : * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 : * contain the slot's own data. Additional data can be stored alongside that
26 : * file if required. While the server is running, the state data is also
27 : * cached in memory for efficiency.
28 : *
29 : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : * of a slot. The remaining data in each slot is protected by its mutex.
33 : *
34 : *-------------------------------------------------------------------------
35 : */
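/*
 * A minimal sketch (illustrative only, not part of this file) of the
 * locking protocol described above: iterate the slot array under
 * ReplicationSlotControlLock in shared mode; in_use may be read without
 * the per-slot mutex, but other slot fields must be read under it.
 */
static void
example_iterate_slots(void)
{
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (int i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		XLogRecPtr	restart_lsn;

		if (!s->in_use)
			continue;

		SpinLockAcquire(&s->mutex);
		restart_lsn = s->data.restart_lsn;
		SpinLockRelease(&s->mutex);

		/* ... use restart_lsn ... */
	}
	LWLockRelease(ReplicationSlotControlLock);
}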
36 :
37 : #include "postgres.h"
38 :
39 : #include <unistd.h>
40 : #include <sys/stat.h>
41 :
42 : #include "access/transam.h"
43 : #include "access/xlog_internal.h"
44 : #include "access/xlogrecovery.h"
45 : #include "common/file_utils.h"
46 : #include "common/string.h"
47 : #include "miscadmin.h"
48 : #include "pgstat.h"
49 : #include "postmaster/interrupt.h"
50 : #include "replication/slotsync.h"
51 : #include "replication/slot.h"
52 : #include "replication/walsender_private.h"
53 : #include "storage/fd.h"
54 : #include "storage/ipc.h"
55 : #include "storage/proc.h"
56 : #include "storage/procarray.h"
57 : #include "utils/builtins.h"
58 : #include "utils/guc_hooks.h"
59 : #include "utils/varlena.h"
60 :
61 : /*
62 : * Replication slot on-disk data structure.
63 : */
64 : typedef struct ReplicationSlotOnDisk
65 : {
66 : /* first part of this struct needs to be version independent */
67 :
68 : /* data not covered by checksum */
69 : uint32 magic;
70 : pg_crc32c checksum;
71 :
72 : /* data covered by checksum */
73 : uint32 version;
74 : uint32 length;
75 :
76 : /*
77 : * The actual data in the slot that follows can differ based on the above
78 : * 'version'.
79 : */
80 :
81 : ReplicationSlotPersistentData slotdata;
82 : } ReplicationSlotOnDisk;
83 :
84 : /*
85 : * Struct for the configuration of synchronized_standby_slots.
86 : *
87 : * Note: this must be a flat representation that can be held in a single chunk
88 : * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
89 : * synchronized_standby_slots GUC.
90 : */
91 : typedef struct
92 : {
93 : /* Number of slot names in slot_names[] */
94 : int nslotnames;
95 :
96 : /*
97 : * slot_names contains 'nslotnames' consecutive null-terminated C strings.
98 : */
99 : char slot_names[FLEXIBLE_ARRAY_MEMBER];
100 : } SyncStandbySlotsConfigData;
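/*
 * A hedged sketch (not part of this file) of how such a flat
 * representation is walked: the 'nslotnames' names are stored back to
 * back, each NUL-terminated, so advancing by strlen(name) + 1 reaches
 * the next one.
 */
static void
example_walk_slot_names(SyncStandbySlotsConfigData *config)
{
	const char *name = config->slot_names;

	for (int i = 0; i < config->nslotnames; i++)
	{
		/* 'name' points at the i'th slot name here */
		name += strlen(name) + 1;
	}
}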
101 :
102 : /*
103 : * Lookup table for slot invalidation causes.
104 : */
105 : const char *const SlotInvalidationCauses[] = {
106 : [RS_INVAL_NONE] = "none",
107 : [RS_INVAL_WAL_REMOVED] = "wal_removed",
108 : [RS_INVAL_HORIZON] = "rows_removed",
109 : [RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
110 : };
111 :
112 : /* Maximum number of invalidation causes */
113 : #define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL
114 :
115 : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
116 : "array length mismatch");
117 :
118 : /* size of version independent data */
119 : #define ReplicationSlotOnDiskConstantSize \
120 : offsetof(ReplicationSlotOnDisk, slotdata)
121 : /* size of the part of the slot not covered by the checksum */
122 : #define ReplicationSlotOnDiskNotChecksummedSize \
123 : offsetof(ReplicationSlotOnDisk, version)
124 : /* size of the part covered by the checksum */
125 : #define ReplicationSlotOnDiskChecksummedSize \
126 : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
127 : /* size of the slot data that is version dependent */
128 : #define ReplicationSlotOnDiskV2Size \
129 : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
130 :
131 : #define SLOT_MAGIC 0x1051CA1 /* format identifier */
132 : #define SLOT_VERSION 5 /* version for new files */
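/*
 * An illustrative sketch (not part of this file) of how the size macros
 * above partition the on-disk struct for checksumming: the CRC covers
 * everything from 'version' onward, skipping 'magic' and the checksum
 * field itself. The save path does the equivalent using the pg_crc32c
 * macros.
 */
static void
example_checksum_ondisk(ReplicationSlotOnDisk *cp)
{
	INIT_CRC32C(cp->checksum);
	COMP_CRC32C(cp->checksum,
				(char *) cp + ReplicationSlotOnDiskNotChecksummedSize,
				ReplicationSlotOnDiskChecksummedSize);
	FIN_CRC32C(cp->checksum);
}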
133 :
134 : /* Control array for replication slot management */
135 : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
136 :
137 : /* My backend's replication slot in the shared memory array */
138 : ReplicationSlot *MyReplicationSlot = NULL;
139 :
140 : /* GUC variables */
141 : int max_replication_slots = 10; /* the maximum number of replication
142 : * slots */
143 :
144 : /*
145 : * This GUC lists streaming replication standby server slot names that
146 : * logical WAL sender processes will wait for.
147 : */
148 : char *synchronized_standby_slots;
149 :
150 : /* This is the parsed and cached configuration for synchronized_standby_slots */
151 : static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
152 :
153 : /*
154 : * Oldest LSN that has been confirmed to be flushed to the standbys
155 : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
156 : */
157 : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
158 :
159 : static void ReplicationSlotShmemExit(int code, Datum arg);
160 : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
161 :
162 : /* internal persistency functions */
163 : static void RestoreSlotFromDisk(const char *name);
164 : static void CreateSlotOnDisk(ReplicationSlot *slot);
165 : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
166 :
167 : /*
168 : * Report shared-memory space needed by ReplicationSlotsShmemInit.
169 : */
170 : Size
171 7334 : ReplicationSlotsShmemSize(void)
172 : {
173 7334 : Size size = 0;
174 :
175 7334 : if (max_replication_slots == 0)
176 4 : return size;
177 :
178 7330 : size = offsetof(ReplicationSlotCtlData, replication_slots);
179 7330 : size = add_size(size,
180 : mul_size(max_replication_slots, sizeof(ReplicationSlot)));
181 :
182 7330 : return size;
183 : }
184 :
185 : /*
186 : * Allocate and initialize shared memory for replication slots.
187 : */
188 : void
189 1902 : ReplicationSlotsShmemInit(void)
190 : {
191 : bool found;
192 :
193 1902 : if (max_replication_slots == 0)
194 2 : return;
195 :
196 1900 : ReplicationSlotCtl = (ReplicationSlotCtlData *)
197 1900 : ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
198 : &found);
199 :
200 1900 : if (!found)
201 : {
202 : int i;
203 :
204 : /* First time through, so initialize */
205 3580 : MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
206 :
207 20524 : for (i = 0; i < max_replication_slots; i++)
208 : {
209 18624 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
210 :
211 : /* everything else is zeroed by the memset above */
212 18624 : SpinLockInit(&slot->mutex);
213 18624 : LWLockInitialize(&slot->io_in_progress_lock,
214 : LWTRANCHE_REPLICATION_SLOT_IO);
215 18624 : ConditionVariableInit(&slot->active_cv);
216 : }
217 : }
218 : }
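/*
 * The function above follows the standard ShmemInitStruct() idiom; a
 * minimal sketch of that idiom, with a hypothetical structure, looks
 * like this (not part of this file):
 */
static void
example_shmem_init(Size size)
{
	bool		found;
	void	   *ptr;

	ptr = ShmemInitStruct("Example Shared State", size, &found);
	if (!found)
	{
		/* the first process to attach initializes the area */
		memset(ptr, 0, size);
	}
	/* otherwise another process already initialized it; just use ptr */
}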
219 :
220 : /*
221 : * Register the callback for replication slot cleanup and release.
222 : */
223 : void
224 35550 : ReplicationSlotInitialize(void)
225 : {
226 35550 : before_shmem_exit(ReplicationSlotShmemExit, 0);
227 35550 : }
228 :
229 : /*
230 : * Release and cleanup replication slots.
231 : */
232 : static void
233 35550 : ReplicationSlotShmemExit(int code, Datum arg)
234 : {
235 : /* Make sure active replication slots are released */
236 35550 : if (MyReplicationSlot != NULL)
237 410 : ReplicationSlotRelease();
238 :
239 : /* Also cleanup all the temporary slots. */
240 35550 : ReplicationSlotCleanup(false);
241 35550 : }
242 :
243 : /*
244 : * Check whether the passed slot name is valid and report errors at elevel.
245 : *
246 : * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1}, which should allow
247 : * the name to be used as a directory name on every supported OS.
248 : *
249 : * Returns whether the name is valid if elevel < ERROR.
250 : */
251 : bool
252 1632 : ReplicationSlotValidateName(const char *name, int elevel)
253 : {
254 : const char *cp;
255 :
256 1632 : if (strlen(name) == 0)
257 : {
258 6 : ereport(elevel,
259 : (errcode(ERRCODE_INVALID_NAME),
260 : errmsg("replication slot name \"%s\" is too short",
261 : name)));
262 0 : return false;
263 : }
264 :
265 1626 : if (strlen(name) >= NAMEDATALEN)
266 : {
267 0 : ereport(elevel,
268 : (errcode(ERRCODE_NAME_TOO_LONG),
269 : errmsg("replication slot name \"%s\" is too long",
270 : name)));
271 0 : return false;
272 : }
273 :
274 32552 : for (cp = name; *cp; cp++)
275 : {
276 30928 : if (!((*cp >= 'a' && *cp <= 'z')
277 15196 : || (*cp >= '0' && *cp <= '9')
278 2990 : || (*cp == '_')))
279 : {
280 2 : ereport(elevel,
281 : (errcode(ERRCODE_INVALID_NAME),
282 : errmsg("replication slot name \"%s\" contains invalid character",
283 : name),
284 : errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
285 0 : return false;
286 : }
287 : }
288 1624 : return true;
289 : }
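/*
 * A hedged usage sketch (not part of this file): with elevel == ERROR an
 * invalid name throws, so callers can ignore the return value; with a
 * lower elevel the problem is merely reported and false is returned.
 */
static void
example_validate_name(const char *name)
{
	/* throws on an invalid name, never returns false */
	ReplicationSlotValidateName(name, ERROR);

	/* alternatively: report a WARNING and let the caller decide */
	if (!ReplicationSlotValidateName(name, WARNING))
		return;
}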
290 :
291 : /*
292 : * Create a new replication slot and mark it as used by this backend.
293 : *
294 : * name: Name of the slot
295 : * db_specific: logical decoding is db specific; if the slot is going to
296 : * be used for that pass true, otherwise false.
297 : * two_phase: Allows decoding of prepared transactions. We allow this option
298 : * to be enabled only at slot creation time. If the option could be
299 : * changed while decoding is in progress, we might skip a prepare the
300 : * first time through because the option was not yet enabled, and then
301 : * skip it again on a later fetch of changes (with two_phase by then
302 : * enabled) because the start decoding point would have moved past it.
303 : * So the user would only ever see the commit prepared.
304 : * failover: If enabled, allows the slot to be synced to standbys so
305 : * that logical replication can be resumed after failover.
306 : * synced: True if the slot is synchronized from the primary server.
307 : */
308 : void
309 1176 : ReplicationSlotCreate(const char *name, bool db_specific,
310 : ReplicationSlotPersistency persistency,
311 : bool two_phase, bool failover, bool synced)
312 : {
313 1176 : ReplicationSlot *slot = NULL;
314 : int i;
315 :
316 : Assert(MyReplicationSlot == NULL);
317 :
318 1176 : ReplicationSlotValidateName(name, ERROR);
319 :
320 1174 : if (failover)
321 : {
322 : /*
323 : * Do not allow users to create the failover enabled slots on the
324 : * standby as we do not support sync to the cascading standby.
325 : *
326 : * However, failover enabled slots can be created during slot
327 : * synchronization because we need to retain the same values as the
328 : * remote slot.
329 : */
330 42 : if (RecoveryInProgress() && !IsSyncingReplicationSlots())
331 0 : ereport(ERROR,
332 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
333 : errmsg("cannot enable failover for a replication slot created on the standby"));
334 :
335 : /*
336 : * Do not allow users to create failover enabled temporary slots,
337 : * because temporary slots will not be synced to the standby.
338 : *
339 : * However, failover enabled temporary slots can be created during
340 : * slot synchronization. See the comments atop slotsync.c for details.
341 : */
342 42 : if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
343 2 : ereport(ERROR,
344 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
345 : errmsg("cannot enable failover for a temporary replication slot"));
346 : }
347 :
348 : /*
349 : * If some other backend ran this code concurrently with us, we'd likely
350 : * both allocate the same slot, and that would be bad. We'd also be at
351 : * risk of missing a name collision. Also, we don't want to try to create
352 : * a new slot while somebody's busy cleaning up an old one, because we
353 : * might both be monkeying with the same directory.
354 : */
355 1172 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
356 :
357 : /*
358 : * Check for name collision, and identify an allocatable slot. We need to
359 : * hold ReplicationSlotControlLock in shared mode for this, so that nobody
360 : * else can change the in_use flags while we're looking at them.
361 : */
362 1172 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
363 11188 : for (i = 0; i < max_replication_slots; i++)
364 : {
365 10022 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
366 :
367 10022 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
368 6 : ereport(ERROR,
369 : (errcode(ERRCODE_DUPLICATE_OBJECT),
370 : errmsg("replication slot \"%s\" already exists", name)));
371 10016 : if (!s->in_use && slot == NULL)
372 1164 : slot = s;
373 : }
374 1166 : LWLockRelease(ReplicationSlotControlLock);
375 :
376 : /* If all slots are in use, we're out of luck. */
377 1166 : if (slot == NULL)
378 2 : ereport(ERROR,
379 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
380 : errmsg("all replication slots are in use"),
381 : errhint("Free one or increase \"max_replication_slots\".")));
382 :
383 : /*
384 : * Since this slot is not in use, nobody should be looking at any part of
385 : * it other than the in_use field unless they're trying to allocate it.
386 : * And since we hold ReplicationSlotAllocationLock, nobody except us can
387 : * be doing that. So it's safe to initialize the slot.
388 : */
389 : Assert(!slot->in_use);
390 : Assert(slot->active_pid == 0);
391 :
392 : /* first initialize persistent data */
393 1164 : memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
394 1164 : namestrcpy(&slot->data.name, name);
395 1164 : slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
396 1164 : slot->data.persistency = persistency;
397 1164 : slot->data.two_phase = two_phase;
398 1164 : slot->data.two_phase_at = InvalidXLogRecPtr;
399 1164 : slot->data.failover = failover;
400 1164 : slot->data.synced = synced;
401 :
402 : /* and then data only present in shared memory */
403 1164 : slot->just_dirtied = false;
404 1164 : slot->dirty = false;
405 1164 : slot->effective_xmin = InvalidTransactionId;
406 1164 : slot->effective_catalog_xmin = InvalidTransactionId;
407 1164 : slot->candidate_catalog_xmin = InvalidTransactionId;
408 1164 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
409 1164 : slot->candidate_restart_valid = InvalidXLogRecPtr;
410 1164 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
411 1164 : slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
412 1164 : slot->inactive_since = 0;
413 :
414 : /*
415 : * Create the slot on disk. We haven't actually marked the slot allocated
416 : * yet, so no special cleanup is required if this errors out.
417 : */
418 1164 : CreateSlotOnDisk(slot);
419 :
420 : /*
421 : * We need to briefly prevent any other backend from iterating over the
422 : * slots while we flip the in_use flag. We also need to set the active
423 : * flag while holding the ControlLock as otherwise a concurrent
424 : * ReplicationSlotAcquire() could acquire the slot as well.
425 : */
426 1164 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
427 :
428 1164 : slot->in_use = true;
429 :
430 : /* We can now mark the slot active, and that makes it our slot. */
431 1164 : SpinLockAcquire(&slot->mutex);
432 : Assert(slot->active_pid == 0);
433 1164 : slot->active_pid = MyProcPid;
434 1164 : SpinLockRelease(&slot->mutex);
435 1164 : MyReplicationSlot = slot;
436 :
437 1164 : LWLockRelease(ReplicationSlotControlLock);
438 :
439 : /*
440 : * Create a statistics entry for the new logical slot. We don't collect
441 : * any stats for physical slots, so there is no need to create one for them.
442 : * See ReplicationSlotDropPtr for why we need to do this before releasing
443 : * ReplicationSlotAllocationLock.
444 : */
445 1164 : if (SlotIsLogical(slot))
446 828 : pgstat_create_replslot(slot);
447 :
448 : /*
449 : * Now that the slot has been marked as in_use and active, it's safe to
450 : * let somebody else try to allocate a slot.
451 : */
452 1164 : LWLockRelease(ReplicationSlotAllocationLock);
453 :
454 : /* Let everybody know we've modified this slot */
455 1164 : ConditionVariableBroadcast(&slot->active_cv);
456 1164 : }
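/*
 * A minimal sketch (not part of this file) of a typical creation sequence
 * for a logical slot: create it RS_EPHEMERAL so that it is removed again
 * on error, then persist it once setup has succeeded. The slot name here
 * is hypothetical.
 */
static void
example_create_logical_slot(void)
{
	ReplicationSlotCreate("example_slot", true /* db_specific */,
						  RS_EPHEMERAL, false /* two_phase */,
						  false /* failover */, false /* synced */);

	/* ... build the logical decoding context, export a snapshot, ... */

	ReplicationSlotPersist();	/* RS_EPHEMERAL -> RS_PERSISTENT */
	ReplicationSlotRelease();
}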
457 :
458 : /*
459 : * Search for the named replication slot.
460 : *
461 : * Return the replication slot if found, otherwise NULL.
462 : */
463 : ReplicationSlot *
464 2558 : SearchNamedReplicationSlot(const char *name, bool need_lock)
465 : {
466 : int i;
467 2558 : ReplicationSlot *slot = NULL;
468 :
469 2558 : if (need_lock)
470 156 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
471 :
472 4376 : for (i = 0; i < max_replication_slots; i++)
473 : {
474 4336 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
475 :
476 4336 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
477 : {
478 2518 : slot = s;
479 2518 : break;
480 : }
481 : }
482 :
483 2558 : if (need_lock)
484 156 : LWLockRelease(ReplicationSlotControlLock);
485 :
486 2558 : return slot;
487 : }
488 :
489 : /*
490 : * Return the index of the replication slot in
491 : * ReplicationSlotCtl->replication_slots.
492 : *
493 : * This is mainly useful to have an efficient key for storing replication slot
494 : * stats.
495 : */
496 : int
497 15058 : ReplicationSlotIndex(ReplicationSlot *slot)
498 : {
499 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
500 : slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
501 :
502 15058 : return slot - ReplicationSlotCtl->replication_slots;
503 : }
504 :
505 : /*
506 : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
507 : * the slot's name and true is returned.
508 : *
509 : * This is likely only useful for pgstat_replslot.c during shutdown; in other
510 : * cases there are obvious TOCTOU issues.
511 : */
512 : bool
513 150 : ReplicationSlotName(int index, Name name)
514 : {
515 : ReplicationSlot *slot;
516 : bool found;
517 :
518 150 : slot = &ReplicationSlotCtl->replication_slots[index];
519 :
520 : /*
521 : * Ensure that the slot cannot be dropped while we copy the name. Don't
522 : * need the spinlock as the name of an existing slot cannot change.
523 : */
524 150 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
525 150 : found = slot->in_use;
526 150 : if (slot->in_use)
527 150 : namestrcpy(name, NameStr(slot->data.name));
528 150 : LWLockRelease(ReplicationSlotControlLock);
529 :
530 150 : return found;
531 : }
532 :
533 : /*
534 : * Find a previously created slot and mark it as used by this process.
535 : *
536 : * An error is raised if nowait is true and the slot is currently in use. If
537 : * nowait is false, we sleep until the slot is released by the owning process.
538 : */
539 : void
540 2260 : ReplicationSlotAcquire(const char *name, bool nowait)
541 : {
542 : ReplicationSlot *s;
543 : int active_pid;
544 :
545 : Assert(name != NULL);
546 :
547 2260 : retry:
548 : Assert(MyReplicationSlot == NULL);
549 :
550 2260 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
551 :
552 : /* Check if a slot exists with the given name. */
553 2260 : s = SearchNamedReplicationSlot(name, false);
554 2260 : if (s == NULL || !s->in_use)
555 : {
556 16 : LWLockRelease(ReplicationSlotControlLock);
557 :
558 16 : ereport(ERROR,
559 : (errcode(ERRCODE_UNDEFINED_OBJECT),
560 : errmsg("replication slot \"%s\" does not exist",
561 : name)));
562 : }
563 :
564 : /*
565 : * This is the slot we want; check if it's active under some other
566 : * process. In single user mode, we don't need this check.
567 : */
568 2244 : if (IsUnderPostmaster)
569 : {
570 : /*
571 : * Get ready to sleep on the slot in case it is active. (We may end
572 : * up not sleeping, but we don't want to do this while holding the
573 : * spinlock.)
574 : */
575 2244 : if (!nowait)
576 468 : ConditionVariablePrepareToSleep(&s->active_cv);
577 :
578 2244 : SpinLockAcquire(&s->mutex);
579 2244 : if (s->active_pid == 0)
580 1972 : s->active_pid = MyProcPid;
581 2244 : active_pid = s->active_pid;
582 2244 : SpinLockRelease(&s->mutex);
583 : }
584 : else
585 0 : active_pid = MyProcPid;
586 2244 : LWLockRelease(ReplicationSlotControlLock);
587 :
588 : /*
589 : * If we found the slot but it's already active in another process, we
590 : * wait until the owning process signals us that it's been released, or
591 : * error out.
592 : */
593 2244 : if (active_pid != MyProcPid)
594 : {
595 0 : if (!nowait)
596 : {
597 : /* Wait here until we get signaled, and then restart */
598 0 : ConditionVariableSleep(&s->active_cv,
599 : WAIT_EVENT_REPLICATION_SLOT_DROP);
600 0 : ConditionVariableCancelSleep();
601 0 : goto retry;
602 : }
603 :
604 0 : ereport(ERROR,
605 : (errcode(ERRCODE_OBJECT_IN_USE),
606 : errmsg("replication slot \"%s\" is active for PID %d",
607 : NameStr(s->data.name), active_pid)));
608 : }
609 2244 : else if (!nowait)
610 468 : ConditionVariableCancelSleep(); /* no sleep needed after all */
611 :
612 : /* Let everybody know we've modified this slot */
613 2244 : ConditionVariableBroadcast(&s->active_cv);
614 :
615 : /* We made this slot active, so it's ours now. */
616 2244 : MyReplicationSlot = s;
617 :
618 : /*
619 : * The call to pgstat_acquire_replslot() protects against stats for a
620 : * different slot, from before a restart or such, being present during
621 : * pgstat_report_replslot().
622 : */
623 2244 : if (SlotIsLogical(s))
624 1856 : pgstat_acquire_replslot(s);
625 :
626 : /*
627 : * Reset the time since the slot has become inactive as the slot is active
628 : * now.
629 : */
630 2244 : SpinLockAcquire(&s->mutex);
631 2244 : s->inactive_since = 0;
632 2244 : SpinLockRelease(&s->mutex);
633 :
634 2244 : if (am_walsender)
635 : {
636 1512 : ereport(log_replication_commands ? LOG : DEBUG1,
637 : SlotIsLogical(s)
638 : ? errmsg("acquired logical replication slot \"%s\"",
639 : NameStr(s->data.name))
640 : : errmsg("acquired physical replication slot \"%s\"",
641 : NameStr(s->data.name)));
642 : }
643 2244 : }
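/*
 * A hedged sketch (not part of this file) of the acquire/release pairing:
 * with nowait == true the call errors out if another process holds the
 * slot; with nowait == false it sleeps until the owner releases it.
 */
static void
example_use_slot(void)
{
	ReplicationSlotAcquire("example_slot", true);	/* ERROR if active */

	/* ... read or advance MyReplicationSlot ... */

	ReplicationSlotRelease();	/* the slot itself continues to exist */
}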
644 :
645 : /*
646 : * Release the replication slot that this backend considers itself to own.
647 : *
648 : * This or another backend can re-acquire the slot later.
649 : * Resources this slot requires will be preserved.
650 : */
651 : void
652 2734 : ReplicationSlotRelease(void)
653 : {
654 2734 : ReplicationSlot *slot = MyReplicationSlot;
655 2734 : char *slotname = NULL; /* keep compiler quiet */
656 2734 : bool is_logical = false; /* keep compiler quiet */
657 2734 : TimestampTz now = 0;
658 :
659 : Assert(slot != NULL && slot->active_pid != 0);
660 :
661 2734 : if (am_walsender)
662 : {
663 1900 : slotname = pstrdup(NameStr(slot->data.name));
664 1900 : is_logical = SlotIsLogical(slot);
665 : }
666 :
667 2734 : if (slot->data.persistency == RS_EPHEMERAL)
668 : {
669 : /*
670 : * Delete the slot. There is no !PANIC case where this is allowed to
671 : * fail, all that may happen is an incomplete cleanup of the on-disk
672 : * data.
673 : */
674 10 : ReplicationSlotDropAcquired();
675 : }
676 :
677 : /*
678 : * If the slot needed to temporarily restrain both data and catalog xmin to
679 : * create the catalog snapshot, remove that temporary constraint.
680 : * Snapshots can only be exported while the initial snapshot is still
681 : * acquired.
682 : */
683 2734 : if (!TransactionIdIsValid(slot->data.xmin) &&
684 2686 : TransactionIdIsValid(slot->effective_xmin))
685 : {
686 344 : SpinLockAcquire(&slot->mutex);
687 344 : slot->effective_xmin = InvalidTransactionId;
688 344 : SpinLockRelease(&slot->mutex);
689 344 : ReplicationSlotsComputeRequiredXmin(false);
690 : }
691 :
692 : /*
693 : * Set the time since the slot has become inactive. We get the current
694 : * time beforehand to avoid a system call while holding the spinlock.
695 : */
696 2734 : now = GetCurrentTimestamp();
697 :
698 2734 : if (slot->data.persistency == RS_PERSISTENT)
699 : {
700 : /*
701 : * Mark persistent slot inactive. We're not freeing it, just
702 : * disconnecting, but wake up others that may be waiting for it.
703 : */
704 2178 : SpinLockAcquire(&slot->mutex);
705 2178 : slot->active_pid = 0;
706 2178 : slot->inactive_since = now;
707 2178 : SpinLockRelease(&slot->mutex);
708 2178 : ConditionVariableBroadcast(&slot->active_cv);
709 : }
710 : else
711 : {
712 556 : SpinLockAcquire(&slot->mutex);
713 556 : slot->inactive_since = now;
714 556 : SpinLockRelease(&slot->mutex);
715 : }
716 :
717 2734 : MyReplicationSlot = NULL;
718 :
719 : /* the flag might not have been set if we held a physical slot */
720 2734 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
721 2734 : MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
722 2734 : ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
723 2734 : LWLockRelease(ProcArrayLock);
724 :
725 2734 : if (am_walsender)
726 : {
727 1900 : ereport(log_replication_commands ? LOG : DEBUG1,
728 : is_logical
729 : ? errmsg("released logical replication slot \"%s\"",
730 : slotname)
731 : : errmsg("released physical replication slot \"%s\"",
732 : slotname));
733 :
734 1900 : pfree(slotname);
735 : }
736 2734 : }
737 :
738 : /*
739 : * Cleanup temporary slots created in current session.
740 : *
741 : * Cleanup only synced temporary slots if 'synced_only' is true, else
742 : * cleanup all temporary slots.
743 : */
744 : void
745 77100 : ReplicationSlotCleanup(bool synced_only)
746 : {
747 : int i;
748 :
749 : Assert(MyReplicationSlot == NULL);
750 :
751 77100 : restart:
752 77100 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
753 835436 : for (i = 0; i < max_replication_slots; i++)
754 : {
755 758610 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
756 :
757 758610 : if (!s->in_use)
758 734418 : continue;
759 :
760 24192 : SpinLockAcquire(&s->mutex);
761 24192 : if ((s->active_pid == MyProcPid &&
762 274 : (!synced_only || s->data.synced)))
763 : {
764 : Assert(s->data.persistency == RS_TEMPORARY);
765 274 : SpinLockRelease(&s->mutex);
766 274 : LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
767 :
768 274 : ReplicationSlotDropPtr(s);
769 :
770 274 : ConditionVariableBroadcast(&s->active_cv);
771 274 : goto restart;
772 : }
773 : else
774 23918 : SpinLockRelease(&s->mutex);
775 : }
776 :
777 76826 : LWLockRelease(ReplicationSlotControlLock);
778 76826 : }
779 :
780 : /*
781 : * Permanently drop replication slot identified by the passed in name.
782 : */
783 : void
784 710 : ReplicationSlotDrop(const char *name, bool nowait)
785 : {
786 : Assert(MyReplicationSlot == NULL);
787 :
788 710 : ReplicationSlotAcquire(name, nowait);
789 :
790 : /*
791 : * Do not allow users to drop the slots which are currently being synced
792 : * from the primary to the standby.
793 : */
794 700 : if (RecoveryInProgress() && MyReplicationSlot->data.synced)
795 2 : ereport(ERROR,
796 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
797 : errmsg("cannot drop replication slot \"%s\"", name),
798 : errdetail("This replication slot is being synchronized from the primary server."));
799 :
800 698 : ReplicationSlotDropAcquired();
801 698 : }
802 :
803 : /*
804 : * Change the definition of the slot identified by the specified name.
805 : */
806 : void
807 12 : ReplicationSlotAlter(const char *name, const bool *failover,
808 : const bool *two_phase)
809 : {
810 12 : bool update_slot = false;
811 :
812 : Assert(MyReplicationSlot == NULL);
813 : Assert(failover || two_phase);
814 :
815 12 : ReplicationSlotAcquire(name, false);
816 :
817 12 : if (SlotIsPhysical(MyReplicationSlot))
818 0 : ereport(ERROR,
819 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
820 : errmsg("cannot use %s with a physical replication slot",
821 : "ALTER_REPLICATION_SLOT"));
822 :
823 12 : if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
824 2 : ereport(ERROR,
825 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
826 : errmsg("cannot alter invalid replication slot \"%s\"", name),
827 : errdetail("This replication slot has been invalidated due to \"%s\".",
828 : SlotInvalidationCauses[MyReplicationSlot->data.invalidated]));
829 :
830 10 : if (RecoveryInProgress())
831 : {
832 : /*
833 : * Do not allow users to alter the slots which are currently being
834 : * synced from the primary to the standby.
835 : */
836 2 : if (MyReplicationSlot->data.synced)
837 2 : ereport(ERROR,
838 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
839 : errmsg("cannot alter replication slot \"%s\"", name),
840 : errdetail("This replication slot is being synchronized from the primary server."));
841 :
842 : /*
843 : * Do not allow users to enable failover on the standby as we do not
844 : * support sync to the cascading standby.
845 : */
846 0 : if (failover && *failover)
847 0 : ereport(ERROR,
848 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
849 : errmsg("cannot enable failover for a replication slot"
850 : " on the standby"));
851 : }
852 :
853 8 : if (failover)
854 : {
855 : /*
856 : * Do not allow users to enable failover for temporary slots as we do
857 : * not support syncing temporary slots to the standby.
858 : */
859 6 : if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
860 0 : ereport(ERROR,
861 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
862 : errmsg("cannot enable failover for a temporary replication slot"));
863 :
864 6 : if (MyReplicationSlot->data.failover != *failover)
865 : {
866 6 : SpinLockAcquire(&MyReplicationSlot->mutex);
867 6 : MyReplicationSlot->data.failover = *failover;
868 6 : SpinLockRelease(&MyReplicationSlot->mutex);
869 :
870 6 : update_slot = true;
871 : }
872 : }
873 :
874 8 : if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
875 : {
876 2 : SpinLockAcquire(&MyReplicationSlot->mutex);
877 2 : MyReplicationSlot->data.two_phase = *two_phase;
878 2 : SpinLockRelease(&MyReplicationSlot->mutex);
879 :
880 2 : update_slot = true;
881 : }
882 :
883 8 : if (update_slot)
884 : {
885 8 : ReplicationSlotMarkDirty();
886 8 : ReplicationSlotSave();
887 : }
888 :
889 8 : ReplicationSlotRelease();
890 8 : }
891 :
892 : /*
893 : * Permanently drop the currently acquired replication slot.
894 : */
895 : void
896 722 : ReplicationSlotDropAcquired(void)
897 : {
898 722 : ReplicationSlot *slot = MyReplicationSlot;
899 :
900 : Assert(MyReplicationSlot != NULL);
901 :
902 : /* slot isn't acquired anymore */
903 722 : MyReplicationSlot = NULL;
904 :
905 722 : ReplicationSlotDropPtr(slot);
906 722 : }
907 :
908 : /*
909 : * Permanently drop the replication slot, which will have been released by
910 : * the time this function returns.
911 : */
912 : static void
913 996 : ReplicationSlotDropPtr(ReplicationSlot *slot)
914 : {
915 : char path[MAXPGPATH];
916 : char tmppath[MAXPGPATH];
917 :
918 : /*
919 : * If some other backend ran this code concurrently with us, we might try
920 : * to delete a slot with a certain name while someone else was trying to
921 : * create a slot with the same name.
922 : */
923 996 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
924 :
925 : /* Generate pathnames. */
926 996 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
927 996 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
928 :
929 : /*
930 : * Rename the slot directory on disk, so that we'll no longer recognize
931 : * this as a valid slot. Note that if this fails, we've got to mark the
932 : * slot inactive before bailing out. If we're dropping an ephemeral or a
933 : * temporary slot, we had better never fail hard, as the caller won't expect
934 : * the slot to survive and this might get called during error handling.
935 : */
936 996 : if (rename(path, tmppath) == 0)
937 : {
938 : /*
939 : * We need to fsync() the directory we just renamed and its parent to
940 : * make sure that our changes are on disk in a crash-safe fashion. If
941 : * fsync() fails, we can't be sure whether the changes are on disk or
942 : * not. For now, we handle that by panicking;
943 : * StartupReplicationSlots() will try to straighten it out after
944 : * restart.
945 : */
946 996 : START_CRIT_SECTION();
947 996 : fsync_fname(tmppath, true);
948 996 : fsync_fname(PG_REPLSLOT_DIR, true);
949 996 : END_CRIT_SECTION();
950 : }
951 : else
952 : {
953 0 : bool fail_softly = slot->data.persistency != RS_PERSISTENT;
954 :
955 0 : SpinLockAcquire(&slot->mutex);
956 0 : slot->active_pid = 0;
957 0 : SpinLockRelease(&slot->mutex);
958 :
959 : /* wake up anyone waiting on this slot */
960 0 : ConditionVariableBroadcast(&slot->active_cv);
961 :
962 0 : ereport(fail_softly ? WARNING : ERROR,
963 : (errcode_for_file_access(),
964 : errmsg("could not rename file \"%s\" to \"%s\": %m",
965 : path, tmppath)));
966 : }
967 :
968 : /*
969 : * The slot is definitely gone. Lock out concurrent scans of the array
970 : * long enough to kill it. It's OK to clear the active PID here without
971 : * grabbing the mutex because nobody else can be scanning the array here,
972 : * and nobody can be attached to this slot and thus access it without
973 : * scanning the array.
974 : *
975 : * Also wake up processes waiting for it.
976 : */
977 996 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
978 996 : slot->active_pid = 0;
979 996 : slot->in_use = false;
980 996 : LWLockRelease(ReplicationSlotControlLock);
981 996 : ConditionVariableBroadcast(&slot->active_cv);
982 :
983 : /*
984 : * Slot is dead and doesn't prevent resource removal anymore, recompute
985 : * limits.
986 : */
987 996 : ReplicationSlotsComputeRequiredXmin(false);
988 996 : ReplicationSlotsComputeRequiredLSN();
989 :
990 : /*
991 : * If removing the directory fails, the worst thing that will happen is
992 : * that the user won't be able to create a new slot with the same name
993 : * until the next server restart. We warn about it, but that's all.
994 : */
995 996 : if (!rmtree(tmppath, true))
996 0 : ereport(WARNING,
997 : (errmsg("could not remove directory \"%s\"", tmppath)));
998 :
999 : /*
1000 : * Drop the statistics entry for the replication slot. Do this while
1001 : * holding ReplicationSlotAllocationLock so that we don't drop a
1002 : * statistics entry for another slot with the same name just created in
1003 : * another session.
1004 : */
1005 996 : if (SlotIsLogical(slot))
1006 706 : pgstat_drop_replslot(slot);
1007 :
1008 : /*
1009 : * We release this at the very end, so that nobody starts trying to create
1010 : * a slot while we're still cleaning up the detritus of the old one.
1011 : */
1012 996 : LWLockRelease(ReplicationSlotAllocationLock);
1013 996 : }
1014 :
1015 : /*
1016 : * Serialize the currently acquired slot's state from memory to disk, thereby
1017 : * guaranteeing the current state will survive a crash.
1018 : */
1019 : void
1020 2310 : ReplicationSlotSave(void)
1021 : {
1022 : char path[MAXPGPATH];
1023 :
1024 : Assert(MyReplicationSlot != NULL);
1025 :
1026 2310 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
1027 2310 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
1028 2310 : }
1029 :
1030 : /*
1031 : * Signal that it would be useful if the currently acquired slot would be
1032 : * flushed out to disk.
1033 : *
1034 : * Note that the actual flush to disk can be delayed for a long time; if
1035 : * it is required for correctness, explicitly do a ReplicationSlotSave().
1036 : */
1037 : void
1038 12824 : ReplicationSlotMarkDirty(void)
1039 : {
1040 12824 : ReplicationSlot *slot = MyReplicationSlot;
1041 :
1042 : Assert(MyReplicationSlot != NULL);
1043 :
1044 12824 : SpinLockAcquire(&slot->mutex);
1045 12824 : MyReplicationSlot->just_dirtied = true;
1046 12824 : MyReplicationSlot->dirty = true;
1047 12824 : SpinLockRelease(&slot->mutex);
1048 12824 : }
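/*
 * A hedged sketch (not part of this file) of the usual update pattern:
 * mutate shared slot state under the spinlock, mark the slot dirty, and
 * force a save only when immediate durability is required. The field
 * update shown is merely an example.
 */
static void
example_update_slot(XLogRecPtr lsn)
{
	SpinLockAcquire(&MyReplicationSlot->mutex);
	MyReplicationSlot->data.confirmed_flush = lsn;
	SpinLockRelease(&MyReplicationSlot->mutex);

	ReplicationSlotMarkDirty();
	ReplicationSlotSave();		/* optional: flush to disk right now */
}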
1049 :
1050 : /*
1051 : * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1052 : * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1053 : */
1054 : void
1055 802 : ReplicationSlotPersist(void)
1056 : {
1057 802 : ReplicationSlot *slot = MyReplicationSlot;
1058 :
1059 : Assert(slot != NULL);
1060 : Assert(slot->data.persistency != RS_PERSISTENT);
1061 :
1062 802 : SpinLockAcquire(&slot->mutex);
1063 802 : slot->data.persistency = RS_PERSISTENT;
1064 802 : SpinLockRelease(&slot->mutex);
1065 :
1066 802 : ReplicationSlotMarkDirty();
1067 802 : ReplicationSlotSave();
1068 802 : }
1069 :
1070 : /*
1071 : * Compute the oldest xmin across all slots and store it in the ProcArray.
1072 : *
1073 : * If already_locked is true, ProcArrayLock has already been acquired
1074 : * exclusively.
1075 : */
1076 : void
1077 4072 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
1078 : {
1079 : int i;
1080 4072 : TransactionId agg_xmin = InvalidTransactionId;
1081 4072 : TransactionId agg_catalog_xmin = InvalidTransactionId;
1082 :
1083 : Assert(ReplicationSlotCtl != NULL);
1084 :
1085 4072 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1086 :
1087 40922 : for (i = 0; i < max_replication_slots; i++)
1088 : {
1089 36850 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1090 : TransactionId effective_xmin;
1091 : TransactionId effective_catalog_xmin;
1092 : bool invalidated;
1093 :
1094 36850 : if (!s->in_use)
1095 33198 : continue;
1096 :
1097 3652 : SpinLockAcquire(&s->mutex);
1098 3652 : effective_xmin = s->effective_xmin;
1099 3652 : effective_catalog_xmin = s->effective_catalog_xmin;
1100 3652 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1101 3652 : SpinLockRelease(&s->mutex);
1102 :
1103 : /* invalidated slots need not apply */
1104 3652 : if (invalidated)
1105 44 : continue;
1106 :
1107 : /* check the data xmin */
1108 3608 : if (TransactionIdIsValid(effective_xmin) &&
1109 6 : (!TransactionIdIsValid(agg_xmin) ||
1110 6 : TransactionIdPrecedes(effective_xmin, agg_xmin)))
1111 518 : agg_xmin = effective_xmin;
1112 :
1113 : /* check the catalog xmin */
1114 3608 : if (TransactionIdIsValid(effective_catalog_xmin) &&
1115 1472 : (!TransactionIdIsValid(agg_catalog_xmin) ||
1116 1472 : TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1117 1998 : agg_catalog_xmin = effective_catalog_xmin;
1118 : }
1119 :
1120 4072 : LWLockRelease(ReplicationSlotControlLock);
1121 :
1122 4072 : ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1123 4072 : }
1124 :
1125 : /*
1126 : * Compute the oldest restart LSN across all slots and inform xlog module.
1127 : *
1128 : * Note: while max_slot_wal_keep_size is theoretically relevant for this
1129 : * purpose, we don't try to account for that, because this module doesn't
1130 : * know what to compare against.
1131 : */
1132 : void
1133 13670 : ReplicationSlotsComputeRequiredLSN(void)
1134 : {
1135 : int i;
1136 13670 : XLogRecPtr min_required = InvalidXLogRecPtr;
1137 :
1138 : Assert(ReplicationSlotCtl != NULL);
1139 :
1140 13670 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1141 144066 : for (i = 0; i < max_replication_slots; i++)
1142 : {
1143 130396 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1144 : XLogRecPtr restart_lsn;
1145 : bool invalidated;
1146 :
1147 130396 : if (!s->in_use)
1148 117242 : continue;
1149 :
1150 13154 : SpinLockAcquire(&s->mutex);
1151 13154 : restart_lsn = s->data.restart_lsn;
1152 13154 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1153 13154 : SpinLockRelease(&s->mutex);
1154 :
1155 : /* invalidated slots need not apply */
1156 13154 : if (invalidated)
1157 46 : continue;
1158 :
1159 13108 : if (restart_lsn != InvalidXLogRecPtr &&
1160 1426 : (min_required == InvalidXLogRecPtr ||
1161 : restart_lsn < min_required))
1162 11744 : min_required = restart_lsn;
1163 : }
1164 13670 : LWLockRelease(ReplicationSlotControlLock);
1165 :
1166 13670 : XLogSetReplicationSlotMinimumLSN(min_required);
1167 13670 : }
1168 :
1169 : /*
1170 : * Compute the oldest WAL LSN required by *logical* decoding slots.
1171 : *
1172 : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1173 : * slots exist.
1174 : *
1175 : * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1176 : * ignores physical replication slots.
1177 : *
1178 : * The results aren't required frequently, so we don't maintain a precomputed
1179 : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1180 : */
1181 : XLogRecPtr
1182 4952 : ReplicationSlotsComputeLogicalRestartLSN(void)
1183 : {
1184 4952 : XLogRecPtr result = InvalidXLogRecPtr;
1185 : int i;
1186 :
1187 4952 : if (max_replication_slots <= 0)
1188 4 : return InvalidXLogRecPtr;
1189 :
1190 4948 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1191 :
1192 53392 : for (i = 0; i < max_replication_slots; i++)
1193 : {
1194 : ReplicationSlot *s;
1195 : XLogRecPtr restart_lsn;
1196 : bool invalidated;
1197 :
1198 48444 : s = &ReplicationSlotCtl->replication_slots[i];
1199 :
1200 : /* cannot change while ReplicationSlotCtlLock is held */
1201 48444 : if (!s->in_use)
1202 47148 : continue;
1203 :
1204 : /* we're only interested in logical slots */
1205 1296 : if (!SlotIsLogical(s))
1206 948 : continue;
1207 :
1208 : /* read once, it's ok if it increases while we're checking */
1209 348 : SpinLockAcquire(&s->mutex);
1210 348 : restart_lsn = s->data.restart_lsn;
1211 348 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1212 348 : SpinLockRelease(&s->mutex);
1213 :
1214 : /* invalidated slots need not apply */
1215 348 : if (invalidated)
1216 8 : continue;
1217 :
1218 340 : if (restart_lsn == InvalidXLogRecPtr)
1219 0 : continue;
1220 :
1221 340 : if (result == InvalidXLogRecPtr ||
1222 : restart_lsn < result)
1223 280 : result = restart_lsn;
1224 : }
1225 :
1226 4948 : LWLockRelease(ReplicationSlotControlLock);
1227 :
1228 4948 : return result;
1229 : }
1230 :
1231 : /*
1232 : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1233 : * passed database oid.
1234 : *
1235 : * Returns true if there are any slots referencing the database. *nslots will
1236 : * be set to the total number of slots in the database, *nactive to the
1237 : * number of those currently active.
1238 : */
1239 : bool
1240 72 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1241 : {
1242 : int i;
1243 :
1244 72 : *nslots = *nactive = 0;
1245 :
1246 72 : if (max_replication_slots <= 0)
1247 0 : return false;
1248 :
1249 72 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1250 734 : for (i = 0; i < max_replication_slots; i++)
1251 : {
1252 : ReplicationSlot *s;
1253 :
1254 662 : s = &ReplicationSlotCtl->replication_slots[i];
1255 :
1256 : /* cannot change while ReplicationSlotCtlLock is held */
1257 662 : if (!s->in_use)
1258 626 : continue;
1259 :
1260 : /* only logical slots are database specific, skip */
1261 36 : if (!SlotIsLogical(s))
1262 20 : continue;
1263 :
1264 : /* not our database, skip */
1265 16 : if (s->data.database != dboid)
1266 10 : continue;
1267 :
1268 : /* NB: intentionally counting invalidated slots */
1269 :
1270 : /* count slots with spinlock held */
1271 6 : SpinLockAcquire(&s->mutex);
1272 6 : (*nslots)++;
1273 6 : if (s->active_pid != 0)
1274 2 : (*nactive)++;
1275 6 : SpinLockRelease(&s->mutex);
1276 : }
1277 72 : LWLockRelease(ReplicationSlotControlLock);
1278 :
1279 72 : if (*nslots > 0)
1280 6 : return true;
1281 66 : return false;
1282 : }
1283 :
1284 : /*
1285 : * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1286 : * passed database oid. The caller should hold an exclusive lock on the
1287 : * pg_database oid for the database to prevent creation of new slots on the db
1288 : * or replay from existing slots.
1289 : *
1290 : * Another session that concurrently acquires an existing slot on the target DB
1291 : * (most likely to drop it) may cause this function to ERROR. If that happens
1292 : * it may have dropped some but not all slots.
1293 : *
1294 : * This routine isn't as efficient as it could be - but we don't drop
1295 : * databases often, especially databases with lots of slots.
1296 : */
1297 : void
1298 96 : ReplicationSlotsDropDBSlots(Oid dboid)
1299 : {
1300 : int i;
1301 :
1302 96 : if (max_replication_slots <= 0)
1303 0 : return;
1304 :
1305 96 : restart:
1306 106 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1307 992 : for (i = 0; i < max_replication_slots; i++)
1308 : {
1309 : ReplicationSlot *s;
1310 : char *slotname;
1311 : int active_pid;
1312 :
1313 896 : s = &ReplicationSlotCtl->replication_slots[i];
1314 :
1315 : /* cannot change while ReplicationSlotCtlLock is held */
1316 896 : if (!s->in_use)
1317 842 : continue;
1318 :
1319 : /* only logical slots are database specific, skip */
1320 54 : if (!SlotIsLogical(s))
1321 22 : continue;
1322 :
1323 : /* not our database, skip */
1324 32 : if (s->data.database != dboid)
1325 22 : continue;
1326 :
1327 : /* NB: intentionally including invalidated slots */
1328 :
1329 : /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1330 10 : SpinLockAcquire(&s->mutex);
1331 : /* can't change while ReplicationSlotControlLock is held */
1332 10 : slotname = NameStr(s->data.name);
1333 10 : active_pid = s->active_pid;
1334 10 : if (active_pid == 0)
1335 : {
1336 10 : MyReplicationSlot = s;
1337 10 : s->active_pid = MyProcPid;
1338 : }
1339 10 : SpinLockRelease(&s->mutex);
1340 :
1341 : /*
1342 : * Even though we hold an exclusive lock on the database object a
1343 : * logical slot for that DB can still be active, e.g. if it's
1344 : * concurrently being dropped by a backend connected to another DB.
1345 : *
1346 : * That's fairly unlikely in practice, so we'll just bail out.
1347 : *
1348 : * The slot sync worker holds a shared lock on the database before
1349 : * operating on synced logical slots to avoid conflict with the drop
1350 : * happening here. The persistent synced slots are thus safe but there
1351 : * is a possibility that the slot sync worker has created a temporary
1352 : * slot (which stays active even on release) and we are trying to drop
1353 : * that here. In practice, the chances of hitting this scenario are
1354 : * low, because during slot synchronization the temporary slot is
1355 : * immediately converted to persistent and is thus safe due to the
1356 : * shared lock taken on the database. So we'll just bail out in such
1357 : * a case.
1358 : *
1359 : * XXX: We can consider shutting down the slot sync worker before
1360 : * trying to drop synced temporary slots here.
1361 : */
1362 10 : if (active_pid)
1363 0 : ereport(ERROR,
1364 : (errcode(ERRCODE_OBJECT_IN_USE),
1365 : errmsg("replication slot \"%s\" is active for PID %d",
1366 : slotname, active_pid)));
1367 :
1368 : /*
1369 : * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1370 : * holding ReplicationSlotControlLock over filesystem operations,
1371 : * release ReplicationSlotControlLock and use
1372 : * ReplicationSlotDropAcquired.
1373 : *
1374 : * As that means the set of slots could change, restart scan from the
1375 : * beginning each time we release the lock.
1376 : */
1377 10 : LWLockRelease(ReplicationSlotControlLock);
1378 10 : ReplicationSlotDropAcquired();
1379 10 : goto restart;
1380 : }
1381 96 : LWLockRelease(ReplicationSlotControlLock);
1382 : }
1383 :
1384 :
1385 : /*
1386 : * Check whether the server's configuration supports using replication
1387 : * slots.
1388 : */
1389 : void
1390 3082 : CheckSlotRequirements(void)
1391 : {
1392 : /*
1393 : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1394 : * needs the same check.
1395 : */
1396 :
1397 3082 : if (max_replication_slots == 0)
1398 0 : ereport(ERROR,
1399 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1400 : errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1401 :
1402 3082 : if (wal_level < WAL_LEVEL_REPLICA)
1403 0 : ereport(ERROR,
1404 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1405 : errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1406 3082 : }
1407 :
1408 : /*
1409 : * Check whether the user has privilege to use replication slots.
1410 : */
1411 : void
1412 1032 : CheckSlotPermissions(void)
1413 : {
1414 1032 : if (!has_rolreplication(GetUserId()))
1415 10 : ereport(ERROR,
1416 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1417 : errmsg("permission denied to use replication slots"),
1418 : errdetail("Only roles with the %s attribute may use replication slots.",
1419 : "REPLICATION")));
1420 1022 : }
1421 :
1422 : /*
1423 : * Reserve WAL for the currently active slot.
1424 : *
1425 : * Compute and set restart_lsn in a manner that is appropriate for the slot's
1426 : * type and is concurrency-safe.
1427 : */
1428 : void
1429 1088 : ReplicationSlotReserveWal(void)
1430 : {
1431 1088 : ReplicationSlot *slot = MyReplicationSlot;
1432 :
1433 : Assert(slot != NULL);
1434 : Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
1435 :
1436 : /*
1437 : * The replication slot mechanism is used to prevent removal of required
1438 : * WAL. As there is no interlock between this routine and checkpoints, WAL
1439 : * segments could concurrently be removed when a now stale return value of
1440 : * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1441 : * this happens we'll just retry.
1442 : */
1443 : while (true)
1444 0 : {
1445 : XLogSegNo segno;
1446 : XLogRecPtr restart_lsn;
1447 :
1448 : /*
1449 : * For logical slots log a standby snapshot and start logical decoding
1450 : * at exactly that position. That allows the slot to start up more
1451 : * quickly. But on a standby we cannot do WAL writes, so just use the
1452 : * replay pointer; effectively, an attempt to create a logical slot on
1453 : * standby will cause it to wait for an xl_running_xact record to be
1454 : * logged independently on the primary, so that a snapshot can be
1455 : * built using the record.
1456 : *
1457 : * None of this is needed (or indeed helpful) for physical slots as
1458 : * they'll start replay at the last logged checkpoint anyway. Instead
1459 : * return the location of the last redo LSN. While that slightly
1460 : * increases the chance that we have to retry, it's where a base
1461 : * backup has to start replay at.
1462 : */
1463 1088 : if (SlotIsPhysical(slot))
1464 286 : restart_lsn = GetRedoRecPtr();
1465 802 : else if (RecoveryInProgress())
1466 44 : restart_lsn = GetXLogReplayRecPtr(NULL);
1467 : else
1468 758 : restart_lsn = GetXLogInsertRecPtr();
1469 :
1470 1088 : SpinLockAcquire(&slot->mutex);
1471 1088 : slot->data.restart_lsn = restart_lsn;
1472 1088 : SpinLockRelease(&slot->mutex);
1473 :
1474 : /* prevent WAL removal as fast as possible */
1475 1088 : ReplicationSlotsComputeRequiredLSN();
1476 :
1477 : /*
1478 : * If all required WAL is still there, great, otherwise retry. The
1479 : * slot should prevent further removal of WAL, unless there's a
1480 : * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1481 : * the new restart_lsn above, so normally we should never need to loop
1482 : * more than twice.
1483 : */
1484 1088 : XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
1485 1088 : if (XLogGetLastRemovedSegno() < segno)
1486 1088 : break;
1487 : }
1488 :
1489 1088 : if (!RecoveryInProgress() && SlotIsLogical(slot))
1490 : {
1491 : XLogRecPtr flushptr;
1492 :
1493 : /* make sure we have enough information to start */
1494 758 : flushptr = LogStandbySnapshot();
1495 :
1496 : /* and make sure it's fsynced to disk */
1497 758 : XLogFlush(flushptr);
1498 : }
1499 1088 : }
1500 :
1501 : /*
1502 : * Report that a replication slot needs to be invalidated.
1503 : */
1504 : static void
1505 42 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1506 : bool terminating,
1507 : int pid,
1508 : NameData slotname,
1509 : XLogRecPtr restart_lsn,
1510 : XLogRecPtr oldestLSN,
1511 : TransactionId snapshotConflictHorizon)
1512 : {
1513 : StringInfoData err_detail;
1514 42 : bool hint = false;
1515 :
1516 42 : initStringInfo(&err_detail);
1517 :
1518 42 : switch (cause)
1519 : {
1520 12 : case RS_INVAL_WAL_REMOVED:
1521 : {
1522 12 : unsigned long long ex = oldestLSN - restart_lsn;
1523 :
1524 12 : hint = true;
1525 12 : appendStringInfo(&err_detail,
1526 12 : ngettext("The slot's restart_lsn %X/%X exceeds the limit by %llu byte.",
1527 : "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
1528 : ex),
1529 12 : LSN_FORMAT_ARGS(restart_lsn),
1530 : ex);
1531 12 : break;
1532 : }
1533 24 : case RS_INVAL_HORIZON:
1534 24 : appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1535 : snapshotConflictHorizon);
1536 24 : break;
1537 :
1538 6 : case RS_INVAL_WAL_LEVEL:
1539 6 : appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
1540 6 : break;
1541 : case RS_INVAL_NONE:
1542 : pg_unreachable();
1543 : }
1544 :
1545 42 : ereport(LOG,
1546 : terminating ?
1547 : errmsg("terminating process %d to release replication slot \"%s\"",
1548 : pid, NameStr(slotname)) :
1549 : errmsg("invalidating obsolete replication slot \"%s\"",
1550 : NameStr(slotname)),
1551 : errdetail_internal("%s", err_detail.data),
1552 : hint ? errhint("You might need to increase \"%s\".", "max_slot_wal_keep_size") : 0);
1553 :
1554 42 : pfree(err_detail.data);
1555 42 : }
1556 :
1557 : /*
1558 : * Helper for InvalidateObsoleteReplicationSlots
1559 : *
1560 : * Acquires the given slot and marks it invalid, if necessary and possible.
1561 : *
1562 : * Returns whether ReplicationSlotControlLock was released in the interim (and
1563 : * in that case we're not holding the lock at return, otherwise we are).
1564 : *
1565 : * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
1566 : *
1567 : * This is inherently racy, because we release the LWLock
1568 : * for syscalls, so the caller must restart if we return true.
1569 : */
1570 : static bool
1571 750 : InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
1572 : ReplicationSlot *s,
1573 : XLogRecPtr oldestLSN,
1574 : Oid dboid, TransactionId snapshotConflictHorizon,
1575 : bool *invalidated)
1576 : {
1577 750 : int last_signaled_pid = 0;
1578 750 : bool released_lock = false;
1579 750 : bool terminated = false;
1580 750 : TransactionId initial_effective_xmin = InvalidTransactionId;
1581 750 : TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
1582 750 : XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr;
1583 750 : ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
1584 :
1585 : for (;;)
1586 14 : {
1587 : XLogRecPtr restart_lsn;
1588 : NameData slotname;
1589 764 : int active_pid = 0;
1590 764 : ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
1591 :
1592 : Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1593 :
1594 764 : if (!s->in_use)
1595 : {
1596 0 : if (released_lock)
1597 0 : LWLockRelease(ReplicationSlotControlLock);
1598 0 : break;
1599 : }
1600 :
1601 : /*
1602 : * Check if the slot needs to be invalidated. If it needs to be
1603 : * invalidated, and is not currently acquired, acquire it and mark it
1604 : * as having been invalidated. We do this with the spinlock held to
1605 : * avoid race conditions -- for example the restart_lsn could move
1606 : * forward, or the slot could be dropped.
1607 : */
1608 764 : SpinLockAcquire(&s->mutex);
1609 :
1610 764 : restart_lsn = s->data.restart_lsn;
1611 :
1612 : /* we do nothing if the slot is already invalid */
1613 764 : if (s->data.invalidated == RS_INVAL_NONE)
1614 : {
1615 : /*
1616 : * The slot's mutex will be released soon, and it is possible that
1617 : * those values could change once the process holding the slot (if
1618 : * any) has been terminated, so record them here to ensure that we
1619 : * report the correct invalidation cause.
1620 : */
1621 686 : if (!terminated)
1622 : {
1623 672 : initial_restart_lsn = s->data.restart_lsn;
1624 672 : initial_effective_xmin = s->effective_xmin;
1625 672 : initial_catalog_effective_xmin = s->effective_catalog_xmin;
1626 : }
1627 :
1628 686 : switch (cause)
1629 : {
1630 636 : case RS_INVAL_WAL_REMOVED:
1631 636 : if (initial_restart_lsn != InvalidXLogRecPtr &&
1632 : initial_restart_lsn < oldestLSN)
1633 12 : invalidation_cause = cause;
1634 636 : break;
1635 44 : case RS_INVAL_HORIZON:
1636 44 : if (!SlotIsLogical(s))
1637 0 : break;
1638 : /* invalid DB oid signals a shared relation */
1639 44 : if (dboid != InvalidOid && dboid != s->data.database)
1640 0 : break;
1641 44 : if (TransactionIdIsValid(initial_effective_xmin) &&
1642 0 : TransactionIdPrecedesOrEquals(initial_effective_xmin,
1643 : snapshotConflictHorizon))
1644 0 : invalidation_cause = cause;
1645 88 : else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
1646 44 : TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
1647 : snapshotConflictHorizon))
1648 24 : invalidation_cause = cause;
1649 44 : break;
1650 6 : case RS_INVAL_WAL_LEVEL:
1651 6 : if (SlotIsLogical(s))
1652 6 : invalidation_cause = cause;
1653 6 : break;
1654 : case RS_INVAL_NONE:
1655 : pg_unreachable();
1656 : }
1657 78 : }
1658 :
1659 : /*
1660 : * The invalidation cause recorded previously should not change while
1661 : * the process owning the slot (if any) has been terminated.
1662 : */
1663 : Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
1664 : invalidation_cause_prev != invalidation_cause));
1665 :
1666 : /* if there's no invalidation, we're done */
1667 764 : if (invalidation_cause == RS_INVAL_NONE)
1668 : {
1669 722 : SpinLockRelease(&s->mutex);
1670 722 : if (released_lock)
1671 0 : LWLockRelease(ReplicationSlotControlLock);
1672 722 : break;
1673 : }
1674 :
1675 42 : slotname = s->data.name;
1676 42 : active_pid = s->active_pid;
1677 :
1678 : /*
1679 : * If the slot can be acquired, do so and mark it invalidated
1680 : * immediately. Otherwise we'll signal the owning process, below, and
1681 : * retry.
1682 : */
1683 42 : if (active_pid == 0)
1684 : {
1685 28 : MyReplicationSlot = s;
1686 28 : s->active_pid = MyProcPid;
1687 28 : s->data.invalidated = invalidation_cause;
1688 :
1689 : /*
1690 : * XXX: We should consider not overwriting restart_lsn and instead
1691 : * just rely on .invalidated.
1692 : */
1693 28 : if (invalidation_cause == RS_INVAL_WAL_REMOVED)
1694 8 : s->data.restart_lsn = InvalidXLogRecPtr;
1695 :
1696 : /* Let caller know */
1697 28 : *invalidated = true;
1698 : }
1699 :
1700 42 : SpinLockRelease(&s->mutex);
1701 :
1702 : /*
1703 : * Logical replication slots shouldn't be invalidated, as the GUC
1704 : * max_slot_wal_keep_size is set to -1 during a binary upgrade. See
1705 : * check_old_cluster_for_valid_slots(), where we ensure that no slot
1706 : * has been invalidated before the upgrade.
1707 : */
1708 : Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));
1709 :
1710 42 : if (active_pid != 0)
1711 : {
1712 : /*
1713 : * Prepare the sleep on the slot's condition variable before
1714 : * releasing the lock, to close a possible race condition if the
1715 : * slot is released before the sleep below.
1716 : */
1717 14 : ConditionVariablePrepareToSleep(&s->active_cv);
1718 :
1719 14 : LWLockRelease(ReplicationSlotControlLock);
1720 14 : released_lock = true;
1721 :
1722 : /*
1723 : * Signal to terminate the process that owns the slot, if we
1724 : * haven't already signalled it. (Avoidance of repeated
1725 : * signalling is the only reason for there to be a loop in this
1726 : * routine; otherwise we could rely on caller's restart loop.)
1727 : *
1728 : * There is a race condition: another process may acquire the slot
1729 : * after its current owner is terminated and before this process
1730 : * acquires it. To handle that, we signal only if the PID of
1731 : * the owning process has changed from the previous time. (This
1732 : * logic assumes that the same PID is not reused very quickly.)
1733 : */
1734 14 : if (last_signaled_pid != active_pid)
1735 : {
1736 14 : ReportSlotInvalidation(invalidation_cause, true, active_pid,
1737 : slotname, restart_lsn,
1738 : oldestLSN, snapshotConflictHorizon);
1739 :
1740 14 : if (MyBackendType == B_STARTUP)
1741 10 : (void) SendProcSignal(active_pid,
1742 : PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
1743 : INVALID_PROC_NUMBER);
1744 : else
1745 4 : (void) kill(active_pid, SIGTERM);
1746 :
1747 14 : last_signaled_pid = active_pid;
1748 14 : terminated = true;
1749 14 : invalidation_cause_prev = invalidation_cause;
1750 : }
1751 :
1752 : /* Wait until the slot is released. */
1753 14 : ConditionVariableSleep(&s->active_cv,
1754 : WAIT_EVENT_REPLICATION_SLOT_DROP);
1755 :
1756 : /*
1757 : * Re-acquire lock and start over; we expect to invalidate the
1758 : * slot next time (unless another process acquires the slot in the
1759 : * meantime).
1760 : */
1761 14 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1762 14 : continue;
1763 : }
1764 : else
1765 : {
1766 : /*
1767 : * We hold the slot now and have already invalidated it; flush it
1768 : * to ensure that state persists.
1769 : *
1770 : * Don't want to hold ReplicationSlotControlLock across file
1771 : * system operations, so release it now but be sure to tell caller
1772 : * to restart from scratch.
1773 : */
1774 28 : LWLockRelease(ReplicationSlotControlLock);
1775 28 : released_lock = true;
1776 :
1777 : /* Make sure the invalidated state persists across server restart */
1778 28 : ReplicationSlotMarkDirty();
1779 28 : ReplicationSlotSave();
1780 28 : ReplicationSlotRelease();
1781 :
1782 28 : ReportSlotInvalidation(invalidation_cause, false, active_pid,
1783 : slotname, restart_lsn,
1784 : oldestLSN, snapshotConflictHorizon);
1785 :
1786 : /* done with this slot for now */
1787 28 : break;
1788 : }
1789 : }
1790 :
1791 : Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
1792 :
1793 750 : return released_lock;
1794 : }
1795 :
1796 : /*
1797 : * Invalidate slots that require resources about to be removed.
1798 : *
1799 : * Returns true when any slot has been invalidated.
1800 : *
1801 : * Whether a slot needs to be invalidated depends on the cause. A slot is
1802 : * invalidated if it:
1803 : * - RS_INVAL_WAL_REMOVED: requires an LSN older than the given segment
1804 : * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
1805 : * db; dboid may be InvalidOid for shared relations
1806 : * - RS_INVAL_WAL_LEVEL: is logical
1807 : *
1808 : * NB - this runs as part of checkpoint, so avoid raising errors if possible.
1809 : */
1810 : bool
1811 2516 : InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
1812 : XLogSegNo oldestSegno, Oid dboid,
1813 : TransactionId snapshotConflictHorizon)
1814 : {
1815 : XLogRecPtr oldestLSN;
1816 2516 : bool invalidated = false;
1817 :
1818 : Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
1819 : Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
1820 : Assert(cause != RS_INVAL_NONE);
1821 :
1822 2516 : if (max_replication_slots == 0)
1823 2 : return invalidated;
1824 :
1825 2514 : XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
1826 :
1827 2542 : restart:
1828 2542 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1829 26986 : for (int i = 0; i < max_replication_slots; i++)
1830 : {
1831 24472 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1832 :
1833 24472 : if (!s->in_use)
1834 23722 : continue;
1835 :
1836 750 : if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
1837 : snapshotConflictHorizon,
1838 : &invalidated))
1839 : {
1840 : /* if the lock was released, start from scratch */
1841 28 : goto restart;
1842 : }
1843 : }
1844 2514 : LWLockRelease(ReplicationSlotControlLock);
1845 :
1846 : /*
1847 : * If any slots have been invalidated, recalculate the resource limits.
1848 : */
1849 2514 : if (invalidated)
1850 : {
1851 18 : ReplicationSlotsComputeRequiredXmin(false);
1852 18 : ReplicationSlotsComputeRequiredLSN();
1853 : }
1854 :
1855 2514 : return invalidated;
1856 : }
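/*
 * Usage sketch (editor's illustration; the real call sites live in other
 * files): a checkpoint about to recycle WAL up to oldestSegno could
 * invalidate the slots that still need it with
 *
 *   invalidated = InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED,
 *                                                    oldestSegno, InvalidOid,
 *                                                    InvalidTransactionId);
 *   if (invalidated)
 *       ...recompute the WAL retention horizon...
 *
 * while recovery-conflict handling on a standby would instead pass
 * RS_INVAL_HORIZON together with the conflicting database OID and xid.
 */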
1857 :
1858 : /*
1859 : * Flush all replication slots to disk.
1860 : *
1861 : * It is convenient to flush dirty replication slots at the time of checkpoint.
1862 : * Additionally, in case of a shutdown checkpoint, we also identify the slots
1863 : * for which the confirmed_flush LSN has been updated since the last time it
1864 : * was saved and flush them.
1865 : */
1866 : void
1867 2476 : CheckPointReplicationSlots(bool is_shutdown)
1868 : {
1869 : int i;
1870 :
1871 2476 : elog(DEBUG1, "performing replication slot checkpoint");
1872 :
1873 : /*
1874 : * Prevent any slot from being created/dropped while we're active. As we
1875 : * explicitly do *not* want to block iterating over replication_slots or
1876 : * acquiring a slot, we cannot take the control lock - but that's OK,
1877 : * because holding ReplicationSlotAllocationLock is strictly stronger, and
1878 : * enough to guarantee that nobody can change the in_use bits on us.
1879 : */
1880 2476 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
1881 :
1882 26698 : for (i = 0; i < max_replication_slots; i++)
1883 : {
1884 24222 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1885 : char path[MAXPGPATH];
1886 :
1887 24222 : if (!s->in_use)
1888 23574 : continue;
1889 :
1890 : /* save the slot to disk, locking is handled in SaveSlotToPath() */
1891 648 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
1892 :
1893 : /*
1894 : * A slot's data is not flushed each time the confirmed_flush LSN is
1895 : * updated, as that could lead to frequent writes. However, we force a
1896 : * flush of each logical slot's data at shutdown if its confirmed_flush
1897 : * LSN has changed since we last flushed it to disk. This helps avoid
1898 : * an unnecessary retreat of the confirmed_flush LSN after restart.
1900 : */
1901 648 : if (is_shutdown && SlotIsLogical(s))
1902 : {
1903 122 : SpinLockAcquire(&s->mutex);
1904 :
1905 122 : if (s->data.invalidated == RS_INVAL_NONE &&
1906 122 : s->data.confirmed_flush > s->last_saved_confirmed_flush)
1907 : {
1908 70 : s->just_dirtied = true;
1909 70 : s->dirty = true;
1910 : }
1911 122 : SpinLockRelease(&s->mutex);
1912 : }
1913 :
1914 648 : SaveSlotToPath(s, path, LOG);
1915 : }
1916 2476 : LWLockRelease(ReplicationSlotAllocationLock);
1917 2476 : }
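/*
 * For context (an assumption about the caller, which is not in this file):
 * the checkpointer is expected to invoke this from its checkpoint sequence
 * roughly as
 *
 *   CheckPointReplicationSlots(flags & CHECKPOINT_IS_SHUTDOWN);
 *
 * so that dirty slot state reaches disk no later than the checkpoint, and
 * shutdown checkpoints additionally flush advanced confirmed_flush LSNs.
 */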
1918 :
1919 : /*
1920 : * Load all replication slots from disk into memory at server startup. This
1921 : * needs to be run before we start crash recovery.
1922 : */
1923 : void
1924 1634 : StartupReplicationSlots(void)
1925 : {
1926 : DIR *replication_dir;
1927 : struct dirent *replication_de;
1928 :
1929 1634 : elog(DEBUG1, "starting up replication slots");
1930 :
1931 : /* restore all slots by iterating over all on-disk entries */
1932 1634 : replication_dir = AllocateDir(PG_REPLSLOT_DIR);
1933 5038 : while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
1934 : {
1935 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
1936 : PGFileType de_type;
1937 :
1938 3404 : if (strcmp(replication_de->d_name, ".") == 0 ||
1939 1770 : strcmp(replication_de->d_name, "..") == 0)
1940 3268 : continue;
1941 :
1942 136 : snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
1943 136 : de_type = get_dirent_type(path, replication_de, false, DEBUG1);
1944 :
1945 : /* we only deal with directories here, skip if it's not ours */
1946 136 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
1947 0 : continue;
1948 :
1949 : /* we crashed while a slot was being set up or deleted, clean up */
1950 136 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
1951 : {
1952 0 : if (!rmtree(path, true))
1953 : {
1954 0 : ereport(WARNING,
1955 : (errmsg("could not remove directory \"%s\"",
1956 : path)));
1957 0 : continue;
1958 : }
1959 0 : fsync_fname(PG_REPLSLOT_DIR, true);
1960 0 : continue;
1961 : }
1962 :
1963 : /* looks like a slot in a normal state, restore */
1964 136 : RestoreSlotFromDisk(replication_de->d_name);
1965 : }
1966 1634 : FreeDir(replication_dir);
1967 :
1968 : /* currently no slots exist, we're done. */
1969 1634 : if (max_replication_slots <= 0)
1970 2 : return;
1971 :
1972 : /* Now that we have recovered all the data, compute replication xmin */
1973 1632 : ReplicationSlotsComputeRequiredXmin(false);
1974 1632 : ReplicationSlotsComputeRequiredLSN();
1975 : }
1976 :
1977 : /* ----
1978 : * Manipulation of on-disk state of replication slots
1979 : *
1980 : * NB: none of the routines below should take any notice whether a slot is the
1981 : * current one or not, that's all handled a layer above.
1982 : * ----
1983 : */
1984 : static void
1985 1164 : CreateSlotOnDisk(ReplicationSlot *slot)
1986 : {
1987 : char tmppath[MAXPGPATH];
1988 : char path[MAXPGPATH];
1989 : struct stat st;
1990 :
1991 : /*
1992 : * No need to take out the io_in_progress_lock, nobody else can see this
1993 : * slot yet, so nobody else will write. We're reusing SaveSlotToPath, which
1994 : * takes out the lock; if we took the lock here, we'd deadlock.
1995 : */
1996 :
1997 1164 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1998 1164 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1999 :
2000 : /*
2001 : * It's just barely possible that some previous effort to create or drop a
2002 : * slot with this name left a temp directory lying around. If that seems
2003 : * to be the case, try to remove it. If the rmtree() fails, we'll error
2004 : * out at the MakePGDirectory() below, so we don't bother checking
2005 : * success.
2006 : */
2007 1164 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2008 0 : rmtree(tmppath, true);
2009 :
2010 : /* Create and fsync the temporary slot directory. */
2011 1164 : if (MakePGDirectory(tmppath) < 0)
2012 0 : ereport(ERROR,
2013 : (errcode_for_file_access(),
2014 : errmsg("could not create directory \"%s\": %m",
2015 : tmppath)));
2016 1164 : fsync_fname(tmppath, true);
2017 :
2018 : /* Write the actual state file. */
2019 1164 : slot->dirty = true; /* signal that we really need to write */
2020 1164 : SaveSlotToPath(slot, tmppath, ERROR);
2021 :
2022 : /* Rename the directory into place. */
2023 1164 : if (rename(tmppath, path) != 0)
2024 0 : ereport(ERROR,
2025 : (errcode_for_file_access(),
2026 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2027 : tmppath, path)));
2028 :
2029 : /*
2030 : * If we failed now - really unlikely - we wouldn't know whether this slot
2031 : * would persist after an OS crash or not, so force a restart. The
2032 : * restart will retry the fsync until it works.
2033 : */
2034 1164 : START_CRIT_SECTION();
2035 :
2036 1164 : fsync_fname(path, true);
2037 1164 : fsync_fname(PG_REPLSLOT_DIR, true);
2038 :
2039 1164 : END_CRIT_SECTION();
2040 1164 : }
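/*
 * Resulting layout for a slot named "myslot" (illustration, assuming
 * PG_REPLSLOT_DIR is "pg_replslot"):
 *
 *   $PGDATA/pg_replslot/myslot.tmp/state   while the slot is being created
 *   $PGDATA/pg_replslot/myslot/state       after the final rename()
 *
 * The rename() makes creation atomic: a crash beforehand leaves only a
 * *.tmp directory, which StartupReplicationSlots() removes.
 */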
2041 :
2042 : /*
2043 : * Shared functionality between saving and creating a replication slot.
2044 : */
2045 : static void
2046 4122 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2047 : {
2048 : char tmppath[MAXPGPATH];
2049 : char path[MAXPGPATH];
2050 : int fd;
2051 : ReplicationSlotOnDisk cp;
2052 : bool was_dirty;
2053 :
2054 : /* first check whether there's something to write out */
2055 4122 : SpinLockAcquire(&slot->mutex);
2056 4122 : was_dirty = slot->dirty;
2057 4122 : slot->just_dirtied = false;
2058 4122 : SpinLockRelease(&slot->mutex);
2059 :
2060 : /* and don't do anything if there's nothing to write */
2061 4122 : if (!was_dirty)
2062 188 : return;
2063 :
2064 3934 : LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
2065 :
2066 : /* silence valgrind :( */
2067 3934 : memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2068 :
2069 3934 : sprintf(tmppath, "%s/state.tmp", dir);
2070 3934 : sprintf(path, "%s/state", dir);
2071 :
2072 3934 : fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2073 3934 : if (fd < 0)
2074 : {
2075 : /*
2076 : * If not an ERROR, then release the lock before returning. In case
2077 : * of an ERROR, the error recovery path automatically releases the
2078 : * lock, but no harm in explicitly releasing even in that case. Note
2079 : * that LWLockRelease() could affect errno.
2080 : */
2081 0 : int save_errno = errno;
2082 :
2083 0 : LWLockRelease(&slot->io_in_progress_lock);
2084 0 : errno = save_errno;
2085 0 : ereport(elevel,
2086 : (errcode_for_file_access(),
2087 : errmsg("could not create file \"%s\": %m",
2088 : tmppath)));
2089 0 : return;
2090 : }
2091 :
2092 3934 : cp.magic = SLOT_MAGIC;
2093 3934 : INIT_CRC32C(cp.checksum);
2094 3934 : cp.version = SLOT_VERSION;
2095 3934 : cp.length = ReplicationSlotOnDiskV2Size;
2096 :
2097 3934 : SpinLockAcquire(&slot->mutex);
2098 :
2099 3934 : memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2100 :
2101 3934 : SpinLockRelease(&slot->mutex);
2102 :
2103 3934 : COMP_CRC32C(cp.checksum,
2104 : (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
2105 : ReplicationSlotOnDiskChecksummedSize);
2106 3934 : FIN_CRC32C(cp.checksum);
2107 :
2108 3934 : errno = 0;
2109 3934 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2110 3934 : if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2111 : {
2112 0 : int save_errno = errno;
2113 :
2114 0 : pgstat_report_wait_end();
2115 0 : CloseTransientFile(fd);
2116 0 : LWLockRelease(&slot->io_in_progress_lock);
2117 :
2118 : /* if write didn't set errno, assume problem is no disk space */
2119 0 : errno = save_errno ? save_errno : ENOSPC;
2120 0 : ereport(elevel,
2121 : (errcode_for_file_access(),
2122 : errmsg("could not write to file \"%s\": %m",
2123 : tmppath)));
2124 0 : return;
2125 : }
2126 3934 : pgstat_report_wait_end();
2127 :
2128 : /* fsync the temporary file */
2129 3934 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2130 3934 : if (pg_fsync(fd) != 0)
2131 : {
2132 0 : int save_errno = errno;
2133 :
2134 0 : pgstat_report_wait_end();
2135 0 : CloseTransientFile(fd);
2136 0 : LWLockRelease(&slot->io_in_progress_lock);
2137 0 : errno = save_errno;
2138 0 : ereport(elevel,
2139 : (errcode_for_file_access(),
2140 : errmsg("could not fsync file \"%s\": %m",
2141 : tmppath)));
2142 0 : return;
2143 : }
2144 3934 : pgstat_report_wait_end();
2145 :
2146 3934 : if (CloseTransientFile(fd) != 0)
2147 : {
2148 0 : int save_errno = errno;
2149 :
2150 0 : LWLockRelease(&slot->io_in_progress_lock);
2151 0 : errno = save_errno;
2152 0 : ereport(elevel,
2153 : (errcode_for_file_access(),
2154 : errmsg("could not close file \"%s\": %m",
2155 : tmppath)));
2156 0 : return;
2157 : }
2158 :
2159 : /* rename to permanent file, fsync file and directory */
2160 3934 : if (rename(tmppath, path) != 0)
2161 : {
2162 0 : int save_errno = errno;
2163 :
2164 0 : LWLockRelease(&slot->io_in_progress_lock);
2165 0 : errno = save_errno;
2166 0 : ereport(elevel,
2167 : (errcode_for_file_access(),
2168 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2169 : tmppath, path)));
2170 0 : return;
2171 : }
2172 :
2173 : /*
2174 : * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2175 : */
2176 3934 : START_CRIT_SECTION();
2177 :
2178 3934 : fsync_fname(path, false);
2179 3934 : fsync_fname(dir, true);
2180 3934 : fsync_fname(PG_REPLSLOT_DIR, true);
2181 :
2182 3934 : END_CRIT_SECTION();
2183 :
2184 : /*
2185 : * Successfully wrote: unset the dirty bit, unless somebody dirtied the
2186 : * slot again already, and remember the confirmed_flush LSN value.
2187 : */
2188 3934 : SpinLockAcquire(&slot->mutex);
2189 3934 : if (!slot->just_dirtied)
2190 3926 : slot->dirty = false;
2191 3934 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2192 3934 : SpinLockRelease(&slot->mutex);
2193 :
2194 3934 : LWLockRelease(&slot->io_in_progress_lock);
2195 : }
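/*
 * Sketch of the bytes written above, following the field order used for the
 * CRC computation:
 *
 *   magic, checksum             excluded from the CRC
 *                               (ReplicationSlotOnDiskNotChecksummedSize)
 *   version, length, slotdata   covered by the CRC
 *                               (ReplicationSlotOnDiskChecksummedSize)
 *
 * RestoreSlotFromDisk() recomputes the CRC over the same range and PANICs
 * on a mismatch.
 */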
2196 :
2197 : /*
2198 : * Load a single slot from disk into memory.
2199 : */
2200 : static void
2201 136 : RestoreSlotFromDisk(const char *name)
2202 : {
2203 : ReplicationSlotOnDisk cp;
2204 : int i;
2205 : char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2206 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2207 : int fd;
2208 136 : bool restored = false;
2209 : int readBytes;
2210 : pg_crc32c checksum;
2211 :
2212 : /* no need to lock here, no concurrent access allowed yet */
2213 :
2214 : /* delete temp file if it exists */
2215 136 : sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2216 136 : sprintf(path, "%s/state.tmp", slotdir);
2217 136 : if (unlink(path) < 0 && errno != ENOENT)
2218 0 : ereport(PANIC,
2219 : (errcode_for_file_access(),
2220 : errmsg("could not remove file \"%s\": %m", path)));
2221 :
2222 136 : sprintf(path, "%s/state", slotdir);
2223 :
2224 136 : elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2225 :
2226 : /* on some operating systems fsyncing a file requires O_RDWR */
2227 136 : fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2228 :
2229 : /*
2230 : * A missing state file is not expected here, as we rename() the directory
2231 : * into place only after we have fsync()ed the state file.
2232 : */
2233 136 : if (fd < 0)
2234 0 : ereport(PANIC,
2235 : (errcode_for_file_access(),
2236 : errmsg("could not open file \"%s\": %m", path)));
2237 :
2238 : /*
2239 : * Sync the state file before we read from it. We might have crashed
2240 : * while it wasn't synced yet and we shouldn't continue on that basis.
2241 : */
2242 136 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2243 136 : if (pg_fsync(fd) != 0)
2244 0 : ereport(PANIC,
2245 : (errcode_for_file_access(),
2246 : errmsg("could not fsync file \"%s\": %m",
2247 : path)));
2248 136 : pgstat_report_wait_end();
2249 :
2250 : /* Also sync the parent directory */
2251 136 : START_CRIT_SECTION();
2252 136 : fsync_fname(slotdir, true);
2253 136 : END_CRIT_SECTION();
2254 :
2255 : /* read the part of the state file that's guaranteed to be version independent */
2256 136 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2257 136 : readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2258 136 : pgstat_report_wait_end();
2259 136 : if (readBytes != ReplicationSlotOnDiskConstantSize)
2260 : {
2261 0 : if (readBytes < 0)
2262 0 : ereport(PANIC,
2263 : (errcode_for_file_access(),
2264 : errmsg("could not read file \"%s\": %m", path)));
2265 : else
2266 0 : ereport(PANIC,
2267 : (errcode(ERRCODE_DATA_CORRUPTED),
2268 : errmsg("could not read file \"%s\": read %d of %zu",
2269 : path, readBytes,
2270 : (Size) ReplicationSlotOnDiskConstantSize)));
2271 : }
2272 :
2273 : /* verify magic */
2274 136 : if (cp.magic != SLOT_MAGIC)
2275 0 : ereport(PANIC,
2276 : (errcode(ERRCODE_DATA_CORRUPTED),
2277 : errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2278 : path, cp.magic, SLOT_MAGIC)));
2279 :
2280 : /* verify version */
2281 136 : if (cp.version != SLOT_VERSION)
2282 0 : ereport(PANIC,
2283 : (errcode(ERRCODE_DATA_CORRUPTED),
2284 : errmsg("replication slot file \"%s\" has unsupported version %u",
2285 : path, cp.version)));
2286 :
2287 : /* boundary check on length */
2288 136 : if (cp.length != ReplicationSlotOnDiskV2Size)
2289 0 : ereport(PANIC,
2290 : (errcode(ERRCODE_DATA_CORRUPTED),
2291 : errmsg("replication slot file \"%s\" has corrupted length %u",
2292 : path, cp.length)));
2293 :
2294 : /* Now that we know the size, read the entire file */
2295 136 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2296 272 : readBytes = read(fd,
2297 : (char *) &cp + ReplicationSlotOnDiskConstantSize,
2298 136 : cp.length);
2299 136 : pgstat_report_wait_end();
2300 136 : if (readBytes != cp.length)
2301 : {
2302 0 : if (readBytes < 0)
2303 0 : ereport(PANIC,
2304 : (errcode_for_file_access(),
2305 : errmsg("could not read file \"%s\": %m", path)));
2306 : else
2307 0 : ereport(PANIC,
2308 : (errcode(ERRCODE_DATA_CORRUPTED),
2309 : errmsg("could not read file \"%s\": read %d of %zu",
2310 : path, readBytes, (Size) cp.length)));
2311 : }
2312 :
2313 136 : if (CloseTransientFile(fd) != 0)
2314 0 : ereport(PANIC,
2315 : (errcode_for_file_access(),
2316 : errmsg("could not close file \"%s\": %m", path)));
2317 :
2318 : /* now verify the CRC */
2319 136 : INIT_CRC32C(checksum);
2320 136 : COMP_CRC32C(checksum,
2321 : (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
2322 : ReplicationSlotOnDiskChecksummedSize);
2323 136 : FIN_CRC32C(checksum);
2324 :
2325 136 : if (!EQ_CRC32C(checksum, cp.checksum))
2326 0 : ereport(PANIC,
2327 : (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2328 : path, checksum, cp.checksum)));
2329 :
2330 : /*
2331 : * If we crashed with an ephemeral slot active, don't restore but delete
2332 : * it.
2333 : */
2334 136 : if (cp.slotdata.persistency != RS_PERSISTENT)
2335 : {
2336 0 : if (!rmtree(slotdir, true))
2337 : {
2338 0 : ereport(WARNING,
2339 : (errmsg("could not remove directory \"%s\"",
2340 : slotdir)));
2341 : }
2342 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2343 0 : return;
2344 : }
2345 :
2346 : /*
2347 : * Verify that requirements for the specific slot type are met. That's
2348 : * important because if these aren't met we're not guaranteed to retain
2349 : * all the necessary resources for the slot.
2350 : *
2351 : * NB: We have to do so *after* the above checks for ephemeral slots,
2352 : * because otherwise a slot that shouldn't exist anymore could prevent
2353 : * restarts.
2354 : *
2355 : * NB: Changing the requirements here also requires adapting
2356 : * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2357 : */
2358 136 : if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
2359 0 : ereport(FATAL,
2360 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2361 : errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
2362 : NameStr(cp.slotdata.name)),
2363 : errhint("Change \"wal_level\" to be \"logical\" or higher.")));
2364 136 : else if (wal_level < WAL_LEVEL_REPLICA)
2365 0 : ereport(FATAL,
2366 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2367 : errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2368 : NameStr(cp.slotdata.name)),
2369 : errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2370 :
2371 : /* nothing can be active yet, don't lock anything */
2372 188 : for (i = 0; i < max_replication_slots; i++)
2373 : {
2374 : ReplicationSlot *slot;
2375 :
2376 188 : slot = &ReplicationSlotCtl->replication_slots[i];
2377 :
2378 188 : if (slot->in_use)
2379 52 : continue;
2380 :
2381 : /* restore the entire set of persistent data */
2382 136 : memcpy(&slot->data, &cp.slotdata,
2383 : sizeof(ReplicationSlotPersistentData));
2384 :
2385 : /* initialize in memory state */
2386 136 : slot->effective_xmin = cp.slotdata.xmin;
2387 136 : slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2388 136 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2389 :
2390 136 : slot->candidate_catalog_xmin = InvalidTransactionId;
2391 136 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
2392 136 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
2393 136 : slot->candidate_restart_valid = InvalidXLogRecPtr;
2394 :
2395 136 : slot->in_use = true;
2396 136 : slot->active_pid = 0;
2397 :
2398 : /*
2399 : * Set the time at which the slot became inactive, now that it has
2400 : * been loaded from disk into memory. Whoever acquires the slot
2401 : * (i.e. makes it active) will reset it.
2402 : */
2403 136 : slot->inactive_since = GetCurrentTimestamp();
2404 :
2405 136 : restored = true;
2406 136 : break;
2407 : }
2408 :
2409 136 : if (!restored)
2410 0 : ereport(FATAL,
2411 : (errmsg("too many replication slots active before shutdown"),
2412 : errhint("Increase \"max_replication_slots\" and try again.")));
2413 : }
2414 :
2415 : /*
2416 : * Maps an invalidation reason for a replication slot to
2417 : * ReplicationSlotInvalidationCause.
2418 : */
2419 : ReplicationSlotInvalidationCause
2420 0 : GetSlotInvalidationCause(const char *invalidation_reason)
2421 : {
2422 : ReplicationSlotInvalidationCause cause;
2423 0 : ReplicationSlotInvalidationCause result = RS_INVAL_NONE;
2424 0 : bool found PG_USED_FOR_ASSERTS_ONLY = false;
2425 :
2426 : Assert(invalidation_reason);
2427 :
2428 0 : for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++)
2429 : {
2430 0 : if (strcmp(SlotInvalidationCauses[cause], invalidation_reason) == 0)
2431 : {
2432 0 : found = true;
2433 0 : result = cause;
2434 0 : break;
2435 : }
2436 : }
2437 :
2438 : Assert(found);
2439 0 : return result;
2440 : }
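/*
 * Usage sketch (assuming "wal_removed" is the SlotInvalidationCauses[]
 * entry for RS_INVAL_WAL_REMOVED):
 *
 *   ReplicationSlotInvalidationCause cause =
 *       GetSlotInvalidationCause("wal_removed");
 *   Assert(cause == RS_INVAL_WAL_REMOVED);
 *
 * A string matching no entry trips the Assert in assert-enabled builds and
 * returns RS_INVAL_NONE otherwise.
 */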
2441 :
2442 : /*
2443 : * A helper function to validate slots specified in GUC synchronized_standby_slots.
2444 : *
2445 : * The rawname will be parsed, and the result will be saved into *elemlist.
2446 : */
2447 : static bool
2448 12 : validate_sync_standby_slots(char *rawname, List **elemlist)
2449 : {
2450 : bool ok;
2451 :
2452 : /* Verify syntax and parse string into a list of identifiers */
2453 12 : ok = SplitIdentifierString(rawname, ',', elemlist);
2454 :
2455 12 : if (!ok)
2456 : {
2457 0 : GUC_check_errdetail("List syntax is invalid.");
2458 : }
2459 12 : else if (!ReplicationSlotCtl)
2460 : {
2461 : /*
2462 : * We cannot validate the replication slots if their shared-memory
2463 : * data has not been initialized yet. This is OK, as we will validate
2464 : * the specified slots anyway when waiting for them to catch up. See
2465 : * StandbySlotsHaveCaughtup() for details.
2466 : */
2467 : }
2468 : else
2469 : {
2470 : /* Check that the specified slots exist and are logical slots */
2471 12 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2472 :
2473 36 : foreach_ptr(char, name, *elemlist)
2474 : {
2475 : ReplicationSlot *slot;
2476 :
2477 12 : slot = SearchNamedReplicationSlot(name, false);
2478 :
2479 12 : if (!slot)
2480 : {
2481 0 : GUC_check_errdetail("Replication slot \"%s\" does not exist.",
2482 : name);
2483 0 : ok = false;
2484 0 : break;
2485 : }
2486 :
2487 12 : if (!SlotIsPhysical(slot))
2488 : {
2489 0 : GUC_check_errdetail("\"%s\" is not a physical replication slot.",
2490 : name);
2491 0 : ok = false;
2492 0 : break;
2493 : }
2494 : }
2495 :
2496 12 : LWLockRelease(ReplicationSlotControlLock);
2497 : }
2498 :
2499 12 : return ok;
2500 : }
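/*
 * Examples (illustration): 'slot_a, slot_b' parses into the identifiers
 * slot_a and slot_b, while a malformed value such as 'slot_a,,slot_b'
 * fails SplitIdentifierString() and reports "List syntax is invalid."
 */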
2501 :
2502 : /*
2503 : * GUC check_hook for synchronized_standby_slots
2504 : */
2505 : bool
2506 1986 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
2507 : {
2508 : char *rawname;
2509 : char *ptr;
2510 : List *elemlist;
2511 : int size;
2512 : bool ok;
2513 : SyncStandbySlotsConfigData *config;
2514 :
2515 1986 : if ((*newval)[0] == '\0')
2516 1974 : return true;
2517 :
2518 : /* Need a modifiable copy of the GUC string */
2519 12 : rawname = pstrdup(*newval);
2520 :
2521 : /* Now verify if the specified slots exist and have correct type */
2522 12 : ok = validate_sync_standby_slots(rawname, &elemlist);
2523 :
2524 12 : if (!ok || elemlist == NIL)
2525 : {
2526 0 : pfree(rawname);
2527 0 : list_free(elemlist);
2528 0 : return ok;
2529 : }
2530 :
2531 : /* Compute the size required for the SyncStandbySlotsConfigData struct */
2532 12 : size = offsetof(SyncStandbySlotsConfigData, slot_names);
2533 36 : foreach_ptr(char, slot_name, elemlist)
2534 12 : size += strlen(slot_name) + 1;
2535 :
2536 : /* GUC extra value must be guc_malloc'd, not palloc'd */
2537 12 : config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
2538 :
2539 : /* Transform the data into SyncStandbySlotsConfigData */
2540 12 : config->nslotnames = list_length(elemlist);
2541 :
2542 12 : ptr = config->slot_names;
2543 36 : foreach_ptr(char, slot_name, elemlist)
2544 : {
2545 12 : strcpy(ptr, slot_name);
2546 12 : ptr += strlen(slot_name) + 1;
2547 : }
2548 :
2549 12 : *extra = (void *) config;
2550 :
2551 12 : pfree(rawname);
2552 12 : list_free(elemlist);
2553 12 : return true;
2554 : }
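/*
 * For synchronized_standby_slots = 'sb1_slot, sb2_slot' (hypothetical slot
 * names), the guc_malloc'd extra built above is laid out flat as
 *
 *   config->nslotnames = 2
 *   config->slot_names = "sb1_slot\0sb2_slot\0"
 *
 * which SlotExistsInSyncStandbySlots() and StandbySlotsHaveCaughtup()
 * later walk in strlen() + 1 strides.
 */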
2555 :
2556 : /*
2557 : * GUC assign_hook for synchronized_standby_slots
2558 : */
2559 : void
2560 1986 : assign_synchronized_standby_slots(const char *newval, void *extra)
2561 : {
2562 : /*
2563 : * The standby slots may have changed, so we must recompute the oldest
2564 : * LSN.
2565 : */
2566 1986 : ss_oldest_flush_lsn = InvalidXLogRecPtr;
2567 :
2568 1986 : synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
2569 1986 : }
2570 :
2571 : /*
2572 : * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
2573 : */
2574 : bool
2575 9660 : SlotExistsInSyncStandbySlots(const char *slot_name)
2576 : {
2577 : const char *standby_slot_name;
2578 :
2579 : /* Return false if there is no value in synchronized_standby_slots */
2580 9660 : if (synchronized_standby_slots_config == NULL)
2581 9650 : return false;
2582 :
2583 : /*
2584 : * XXX: We do not expect this list to be long, so a linear search
2585 : * shouldn't hurt; but if that turns out not to be true, we could cache
2586 : * this information for each WalSender as well.
2587 : */
2588 10 : standby_slot_name = synchronized_standby_slots_config->slot_names;
2589 10 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2590 : {
2591 10 : if (strcmp(standby_slot_name, slot_name) == 0)
2592 10 : return true;
2593 :
2594 0 : standby_slot_name += strlen(standby_slot_name) + 1;
2595 : }
2596 :
2597 0 : return false;
2598 : }
2599 :
2600 : /*
2601 : * Return true if the slots specified in synchronized_standby_slots have caught up to
2602 : * the given WAL location, false otherwise.
2603 : *
2604 : * The elevel parameter specifies the error level used for logging messages
2605 : * related to slots that do not exist, are invalidated, or are inactive.
2606 : */
2607 : bool
2608 1220 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
2609 : {
2610 : const char *name;
2611 1220 : int caught_up_slot_num = 0;
2612 1220 : XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
2613 :
2614 : /*
2615 : * Don't need to wait for the standbys to catch up if there is no value in
2616 : * synchronized_standby_slots.
2617 : */
2618 1220 : if (synchronized_standby_slots_config == NULL)
2619 1194 : return true;
2620 :
2621 : /*
2622 : * Don't need to wait for the standbys to catch up if we are on a standby
2623 : * server, since we do not support syncing slots to cascading standbys.
2624 : */
2625 26 : if (RecoveryInProgress())
2626 0 : return true;
2627 :
2628 : /*
2629 : * Don't need to wait for the standbys to catch up if they are already
2630 : * beyond the specified WAL location.
2631 : */
2632 26 : if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
2633 18 : ss_oldest_flush_lsn >= wait_for_lsn)
2634 10 : return true;
2635 :
2636 : /*
2637 : * To prevent concurrent slot dropping and creation while filtering the
2638 : * slots, take the ReplicationSlotControlLock outside of the loop.
2639 : */
2640 16 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2641 :
2642 16 : name = synchronized_standby_slots_config->slot_names;
2643 22 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2644 : {
2645 : XLogRecPtr restart_lsn;
2646 : bool invalidated;
2647 : bool inactive;
2648 : ReplicationSlot *slot;
2649 :
2650 16 : slot = SearchNamedReplicationSlot(name, false);
2651 :
2652 16 : if (!slot)
2653 : {
2654 : /*
2655 : * If a slot name provided in synchronized_standby_slots does not
2656 : * exist, report a message and exit the loop. A user can specify a
2657 : * slot name that does not exist just before the server startup.
2658 : * The GUC check_hook(validate_sync_standby_slots) cannot validate
2659 : * such a slot during startup as the ReplicationSlotCtl shared
2660 : * memory is not initialized at that time. It is also possible for
2661 : * a user to drop the slot in synchronized_standby_slots
2662 : * afterwards.
2663 : */
2664 0 : ereport(elevel,
2665 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2666 : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
2667 : name, "synchronized_standby_slots"),
2668 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2669 : name),
2670 : errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
2671 : name, "synchronized_standby_slots"));
2672 0 : break;
2673 : }
2674 :
2675 16 : if (SlotIsLogical(slot))
2676 : {
2677 : /*
2678 : * If a logical slot name is provided in
2679 : * synchronized_standby_slots, report a message and exit the loop.
2680 : * Similar to the non-existent case, a user can specify a logical
2681 : * slot name in synchronized_standby_slots before the server
2682 : * startup, or drop an existing physical slot and recreate a
2683 : * logical slot with the same name.
2684 : */
2685 0 : ereport(elevel,
2686 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2687 : errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
2688 : name, "synchronized_standby_slots"),
2689 : errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
2690 : name),
2691 : errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
2692 : name, "synchronized_standby_slots"));
2693 0 : break;
2694 : }
2695 :
2696 16 : SpinLockAcquire(&slot->mutex);
2697 16 : restart_lsn = slot->data.restart_lsn;
2698 16 : invalidated = slot->data.invalidated != RS_INVAL_NONE;
2699 16 : inactive = slot->active_pid == 0;
2700 16 : SpinLockRelease(&slot->mutex);
2701 :
2702 16 : if (invalidated)
2703 : {
2704 : /* Specified physical slot has been invalidated */
2705 0 : ereport(elevel,
2706 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2707 : errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
2708 : name, "synchronized_standby_slots"),
2709 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2710 : name),
2711 : errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
2712 : name, "synchronized_standby_slots"));
2713 0 : break;
2714 : }
2715 :
2716 16 : if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
2717 : {
2718 : /* Log a message if no active_pid for this physical slot */
2719 10 : if (inactive)
2720 8 : ereport(elevel,
2721 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2722 : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
2723 : name, "synchronized_standby_slots"),
2724 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2725 : name),
2726 : errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
2727 : name, "synchronized_standby_slots"));
2728 :
2729 : /* Exit the loop; the current slot hasn't caught up. */
2730 10 : break;
2731 : }
2732 :
2733 : Assert(restart_lsn >= wait_for_lsn);
2734 :
2735 6 : if (XLogRecPtrIsInvalid(min_restart_lsn) ||
2736 : min_restart_lsn > restart_lsn)
2737 6 : min_restart_lsn = restart_lsn;
2738 :
2739 6 : caught_up_slot_num++;
2740 :
2741 6 : name += strlen(name) + 1;
2742 : }
2743 :
2744 16 : LWLockRelease(ReplicationSlotControlLock);
2745 :
2746 : /*
2747 : * Return false if not all the standbys have caught up to the specified
2748 : * WAL location.
2749 : */
2750 16 : if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
2751 10 : return false;
2752 :
2753 : /* The ss_oldest_flush_lsn must not retreat. */
2754 : Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
2755 : min_restart_lsn >= ss_oldest_flush_lsn);
2756 :
2757 6 : ss_oldest_flush_lsn = min_restart_lsn;
2758 :
2759 6 : return true;
2760 : }
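/*
 * Design note (editor's addition): ss_oldest_flush_lsn caches the minimum
 * restart_lsn over all configured standby slots from the last fully
 * caught-up pass, so subsequent calls with an equal or older wait_for_lsn
 * return true without taking ReplicationSlotControlLock at all.
 */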
2761 :
2762 : /*
2763 : * Wait for physical standbys to confirm receiving the given lsn.
2764 : *
2765 : * Used by logical decoding SQL functions. It waits for physical standbys
2766 : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
2767 : */
2768 : void
2769 426 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
2770 : {
2771 : /*
2772 : * Don't need to wait for the standby to catch up if the current acquired
2773 : * slot is not a logical failover slot, or there is no value in
2774 : * synchronized_standby_slots.
2775 : */
2776 426 : if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
2777 424 : return;
2778 :
2779 2 : ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
2780 :
2781 : for (;;)
2782 : {
2783 4 : CHECK_FOR_INTERRUPTS();
2784 :
2785 4 : if (ConfigReloadPending)
2786 : {
2787 2 : ConfigReloadPending = false;
2788 2 : ProcessConfigFile(PGC_SIGHUP);
2789 : }
2790 :
2791 : /* Exit if done waiting for every slot. */
2792 4 : if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
2793 2 : break;
2794 :
2795 : /*
2796 : * Wait for the slots in the synchronized_standby_slots to catch up,
2797 : * but use a timeout (1s) so that we can also check whether
2798 : * synchronized_standby_slots has been changed.
2799 : */
2800 2 : ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
2801 : WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
2802 : }
2803 :
2804 2 : ConditionVariableCancelSleep();
2805 : }
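/*
 * End-to-end sketch (illustration; slot names hypothetical): with
 *
 *   synchronized_standby_slots = 'standby1_slot'
 *
 * in postgresql.conf, a SQL-level decoding call on a failover-enabled
 * logical slot, e.g.
 *
 *   SELECT * FROM pg_logical_slot_get_changes('failover_slot', NULL, NULL);
 *
 * blocks in WaitForStandbyConfirmation() until the physical slot
 * standby1_slot reports a restart_lsn at or beyond the WAL position being
 * decoded, waking at 1s intervals to re-check the configuration.
 */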