Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * slot.c
4 : * Replication slot management.
5 : *
6 : *
7 : * Copyright (c) 2012-2024, PostgreSQL Global Development Group
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/replication/slot.c
12 : *
13 : * NOTES
14 : *
15 : * Replication slots are used to keep state about replication streams
16 : * originating from this cluster. Their primary purpose is to prevent the
17 : * premature removal of WAL or of old tuple versions in a manner that would
18 : * interfere with replication; they are also useful for monitoring purposes.
19 : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : * on standbys (to support cascading setups). The requirement that slots be
21 : * usable on standbys precludes storing them in the system catalogs.
22 : *
23 : * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24 : * directory. Inside that directory the state file will contain the slot's
25 : * own data. Additional data can be stored alongside that file if required.
26 : * While the server is running, the state data is also cached in memory for
27 : * efficiency.
28 : *
29 : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : * of a slot. The remaining data in each slot is protected by its mutex.
33 : *
34 : *-------------------------------------------------------------------------
35 : */
36 :
37 : #include "postgres.h"
38 :
39 : #include <unistd.h>
40 : #include <sys/stat.h>
41 :
42 : #include "access/transam.h"
43 : #include "access/xlog_internal.h"
44 : #include "access/xlogrecovery.h"
45 : #include "common/file_utils.h"
46 : #include "common/string.h"
47 : #include "miscadmin.h"
48 : #include "pgstat.h"
49 : #include "postmaster/interrupt.h"
50 : #include "replication/slotsync.h"
51 : #include "replication/slot.h"
52 : #include "replication/walsender_private.h"
53 : #include "storage/fd.h"
54 : #include "storage/ipc.h"
55 : #include "storage/proc.h"
56 : #include "storage/procarray.h"
57 : #include "utils/builtins.h"
58 : #include "utils/guc_hooks.h"
59 : #include "utils/varlena.h"
60 :
/*
 * Replication slot on-disk data structure.
 *
 * This is the image of a slot's state file.  The size macros that follow are
 * derived from offsetof() on these fields, so reordering or retyping any
 * member changes the on-disk format.
 */
typedef struct ReplicationSlotOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* expected to hold SLOT_MAGIC */
	pg_crc32c	checksum;

	/* data covered by checksum */
	uint32		version;
	uint32		length;			/* presumably the size of the version-dependent
								 * part that follows; TODO confirm against
								 * SaveSlotToPath() */

	/*
	 * The actual data in the slot that follows can differ based on the above
	 * 'version'.
	 */

	ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;
83 :
/*
 * Struct for the configuration of standby_slot_names.
 *
 * Note: this must be a flat representation that can be held in a single chunk
 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
 * standby_slot_names GUC.
 *
 * The names are packed back to back: the first starts at slot_names[0] and
 * each subsequent one begins right after the previous name's terminating NUL.
 */
typedef struct
{
	/* Number of slot names in the slot_names[] */
	int			nslotnames;

	/*
	 * slot_names contains 'nslotnames' consecutive null-terminated C strings.
	 */
	char		slot_names[FLEXIBLE_ARRAY_MEMBER];
} StandbySlotNamesConfigData;
101 :
/*
 * Lookup table for slot invalidation causes.
 *
 * Indexed by the invalidation-cause enum value (designated initializers), each
 * entry giving the textual name of that cause.
 */
const char *const SlotInvalidationCauses[] = {
	[RS_INVAL_NONE] = "none",
	[RS_INVAL_WAL_REMOVED] = "wal_removed",
	[RS_INVAL_HORIZON] = "rows_removed",
	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
};

/*
 * Maximum number of invalidation causes.
 *
 * Despite the name, this is the highest cause value used in the table above
 * rather than a count: the table needs RS_INVAL_MAX_CAUSES + 1 entries, which
 * the static assertion below enforces.  Update both when adding a cause.
 */
#define	RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL

StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
				 "array length mismatch");
117 :
/*
 * Size macros for ReplicationSlotOnDisk.
 *
 * Every expansion is fully parenthesized so that the macros compose safely
 * inside larger expressions; e.g. "2 * ReplicationSlotOnDiskV2Size" must
 * multiply the whole difference, not just its first operand.
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	(offsetof(ReplicationSlotOnDisk, slotdata))
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize  \
	(offsetof(ReplicationSlotOnDisk, version))
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	5		/* version for new files */
133 :
/*
 * Control array for replication slot management.  Points into shared memory
 * once ReplicationSlotsShmemInit() has run; it stays NULL when
 * max_replication_slots is zero, because no shared memory is reserved then.
 */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/*
 * My backend's replication slot in the shared memory array; NULL while this
 * backend has no slot acquired.
 */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUC variables */
int			max_replication_slots = 10; /* the maximum number of replication
										 * slots; zero disables slots */

/*
 * This GUC lists streaming replication standby server slot names that
 * logical WAL sender processes will wait for.
 */
char	   *standby_slot_names;

/* This is the parsed and cached configuration for standby_slot_names */
static StandbySlotNamesConfigData *standby_slot_names_config;

/*
 * Oldest LSN that has been confirmed to be flushed to the standbys
 * corresponding to the physical slots specified in the standby_slot_names GUC.
 */
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;

static void ReplicationSlotShmemExit(int code, Datum arg);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
166 :
167 : /*
168 : * Report shared-memory space needed by ReplicationSlotsShmemInit.
169 : */
170 : Size
171 6594 : ReplicationSlotsShmemSize(void)
172 : {
173 6594 : Size size = 0;
174 :
175 6594 : if (max_replication_slots == 0)
176 4 : return size;
177 :
178 6590 : size = offsetof(ReplicationSlotCtlData, replication_slots);
179 6590 : size = add_size(size,
180 : mul_size(max_replication_slots, sizeof(ReplicationSlot)));
181 :
182 6590 : return size;
183 : }
184 :
185 : /*
186 : * Allocate and initialize shared memory for replication slots.
187 : */
188 : void
189 1706 : ReplicationSlotsShmemInit(void)
190 : {
191 : bool found;
192 :
193 1706 : if (max_replication_slots == 0)
194 2 : return;
195 :
196 1704 : ReplicationSlotCtl = (ReplicationSlotCtlData *)
197 1704 : ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
198 : &found);
199 :
200 1704 : if (!found)
201 : {
202 : int i;
203 :
204 : /* First time through, so initialize */
205 3200 : MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
206 :
207 18404 : for (i = 0; i < max_replication_slots; i++)
208 : {
209 16700 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
210 :
211 : /* everything else is zeroed by the memset above */
212 16700 : SpinLockInit(&slot->mutex);
213 16700 : LWLockInitialize(&slot->io_in_progress_lock,
214 : LWTRANCHE_REPLICATION_SLOT_IO);
215 16700 : ConditionVariableInit(&slot->active_cv);
216 : }
217 : }
218 : }
219 :
220 : /*
221 : * Register the callback for replication slot cleanup and releasing.
222 : */
223 : void
224 29298 : ReplicationSlotInitialize(void)
225 : {
226 29298 : before_shmem_exit(ReplicationSlotShmemExit, 0);
227 29298 : }
228 :
229 : /*
230 : * Release and cleanup replication slots.
231 : */
232 : static void
233 29298 : ReplicationSlotShmemExit(int code, Datum arg)
234 : {
235 : /* Make sure active replication slots are released */
236 29298 : if (MyReplicationSlot != NULL)
237 360 : ReplicationSlotRelease();
238 :
239 : /* Also cleanup all the temporary slots. */
240 29298 : ReplicationSlotCleanup();
241 29298 : }
242 :
243 : /*
244 : * Check whether the passed slot name is valid and report errors at elevel.
245 : *
246 : * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
247 : * the name to be used as a directory name on every supported OS.
248 : *
249 : * Returns whether the directory name is valid or not if elevel < ERROR.
250 : */
251 : bool
252 1488 : ReplicationSlotValidateName(const char *name, int elevel)
253 : {
254 : const char *cp;
255 :
256 1488 : if (strlen(name) == 0)
257 : {
258 6 : ereport(elevel,
259 : (errcode(ERRCODE_INVALID_NAME),
260 : errmsg("replication slot name \"%s\" is too short",
261 : name)));
262 0 : return false;
263 : }
264 :
265 1482 : if (strlen(name) >= NAMEDATALEN)
266 : {
267 0 : ereport(elevel,
268 : (errcode(ERRCODE_NAME_TOO_LONG),
269 : errmsg("replication slot name \"%s\" is too long",
270 : name)));
271 0 : return false;
272 : }
273 :
274 30240 : for (cp = name; *cp; cp++)
275 : {
276 28760 : if (!((*cp >= 'a' && *cp <= 'z')
277 14536 : || (*cp >= '0' && *cp <= '9')
278 2784 : || (*cp == '_')))
279 : {
280 2 : ereport(elevel,
281 : (errcode(ERRCODE_INVALID_NAME),
282 : errmsg("replication slot name \"%s\" contains invalid character",
283 : name),
284 : errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
285 0 : return false;
286 : }
287 : }
288 1480 : return true;
289 : }
290 :
/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db specific; if the slot is going to
 *     be used for that pass true, otherwise false.
 * two_phase: Allows decoding of prepared transactions. We allow this option
 *     to be enabled only at the slot creation time. If we allow this option
 *     to be changed during decoding then it is quite possible that we skip
 *     prepare first time because this option was not enabled. Now next time
 *     during getting changes, if the two_phase option is enabled it can skip
 *     prepare because by that time start decoding point has been moved. So the
 *     user will only get commit prepared.
 * failover: If enabled, allows the slot to be synced to standbys so
 *     that logical replication can be resumed after failover.
 * synced: True if the slot is synchronized from the primary server.
 *
 * On return the slot is in_use, its active_pid is MyProcPid and
 * MyReplicationSlot points at it.  ERROR is raised for:
 *   - an invalid name;
 *   - a failover request that is not allowed here;
 *   - a name that is already taken;
 *   - no free slot.
 *
 * Locking: ReplicationSlotAllocationLock is held exclusively for the whole
 * allocation.  ReplicationSlotControlLock is taken shared while scanning for
 * a free entry, and exclusively while flipping in_use.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
					  ReplicationSlotPersistency persistency,
					  bool two_phase, bool failover, bool synced)
{
	ReplicationSlot *slot = NULL;
	int			i;

	Assert(MyReplicationSlot == NULL);

	/* Reports at ERROR, so an invalid name never gets past this point. */
	ReplicationSlotValidateName(name, ERROR);

	if (failover)
	{
		/*
		 * Do not allow users to create the failover enabled slots on the
		 * standby as we do not support sync to the cascading standby.
		 *
		 * However, failover enabled slots can be created during slot
		 * synchronization because we need to retain the same values as the
		 * remote slot.
		 */
		if (RecoveryInProgress() && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a replication slot created on the standby"));

		/*
		 * Do not allow users to create failover enabled temporary slots,
		 * because temporary slots will not be synced to the standby.
		 *
		 * However, failover enabled temporary slots can be created during
		 * slot synchronization. See the comments atop slotsync.c for details.
		 */
		if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a temporary replication slot"));
	}

	/*
	 * If some other backend ran this code concurrently with us, we'd likely
	 * both allocate the same slot, and that would be bad.  We'd also be at
	 * risk of missing a name collision.  Also, we don't want to try to create
	 * a new slot while somebody's busy cleaning up an old one, because we
	 * might both be monkeying with the same directory.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * Check for name collision, and identify an allocatable slot.  We need to
	 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
	 * else can change the in_use flags while we're looking at them.  The scan
	 * covers the whole array (rather than stopping at the first free entry)
	 * so that a duplicate name anywhere is detected.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
					 errmsg("replication slot \"%s\" already exists", name)));
		if (!s->in_use && slot == NULL)
			slot = s;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If all slots are in use, we're out of luck. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("all replication slots are in use"),
				 errhint("Free one or increase max_replication_slots.")));

	/*
	 * Since this slot is not in use, nobody should be looking at any part of
	 * it other than the in_use field unless they're trying to allocate it.
	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
	 * be doing that.  So it's safe to initialize the slot.
	 */
	Assert(!slot->in_use);
	Assert(slot->active_pid == 0);

	/* first initialize persistent data */
	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
	namestrcpy(&slot->data.name, name);
	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
	slot->data.persistency = persistency;
	slot->data.two_phase = two_phase;
	slot->data.two_phase_at = InvalidXLogRecPtr;
	slot->data.failover = failover;
	slot->data.synced = synced;

	/* and then data only present in shared memory */
	slot->just_dirtied = false;
	slot->dirty = false;
	slot->effective_xmin = InvalidTransactionId;
	slot->effective_catalog_xmin = InvalidTransactionId;
	slot->candidate_catalog_xmin = InvalidTransactionId;
	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
	slot->candidate_restart_valid = InvalidXLogRecPtr;
	slot->candidate_restart_lsn = InvalidXLogRecPtr;
	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;

	/*
	 * Create the slot on disk.  We haven't actually marked the slot allocated
	 * yet, so no special cleanup is required if this errors out.
	 */
	CreateSlotOnDisk(slot);

	/*
	 * We need to briefly prevent any other backend from iterating over the
	 * slots while we flip the in_use flag.  We also need to set the active
	 * flag while holding the ControlLock as otherwise a concurrent
	 * ReplicationSlotAcquire() could acquire the slot as well.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

	slot->in_use = true;

	/* We can now mark the slot active, and that makes it our slot. */
	SpinLockAcquire(&slot->mutex);
	Assert(slot->active_pid == 0);
	slot->active_pid = MyProcPid;
	SpinLockRelease(&slot->mutex);
	MyReplicationSlot = slot;

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Create statistics entry for the new logical slot.  We don't collect any
	 * stats for physical slots, so no need to create an entry for the same.
	 * See ReplicationSlotDropPtr for why we need to do this before releasing
	 * ReplicationSlotAllocationLock.
	 */
	if (SlotIsLogical(slot))
		pgstat_create_replslot(slot);

	/*
	 * Now that the slot has been marked as in_use and active, it's safe to
	 * let somebody else try to allocate a slot.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);
}
456 :
457 : /*
458 : * Search for the named replication slot.
459 : *
460 : * Return the replication slot if found, otherwise NULL.
461 : */
462 : ReplicationSlot *
463 2324 : SearchNamedReplicationSlot(const char *name, bool need_lock)
464 : {
465 : int i;
466 2324 : ReplicationSlot *slot = NULL;
467 :
468 2324 : if (need_lock)
469 98 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
470 :
471 3902 : for (i = 0; i < max_replication_slots; i++)
472 : {
473 3864 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
474 :
475 3864 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
476 : {
477 2286 : slot = s;
478 2286 : break;
479 : }
480 : }
481 :
482 2324 : if (need_lock)
483 98 : LWLockRelease(ReplicationSlotControlLock);
484 :
485 2324 : return slot;
486 : }
487 :
488 : /*
489 : * Return the index of the replication slot in
490 : * ReplicationSlotCtl->replication_slots.
491 : *
492 : * This is mainly useful to have an efficient key for storing replication slot
493 : * stats.
494 : */
495 : int
496 14318 : ReplicationSlotIndex(ReplicationSlot *slot)
497 : {
498 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
499 : slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
500 :
501 14318 : return slot - ReplicationSlotCtl->replication_slots;
502 : }
503 :
504 : /*
505 : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
506 : * the slot's name and true is returned.
507 : *
508 : * This likely is only useful for pgstat_replslot.c during shutdown, in other
509 : * cases there are obvious TOCTOU issues.
510 : */
511 : bool
512 124 : ReplicationSlotName(int index, Name name)
513 : {
514 : ReplicationSlot *slot;
515 : bool found;
516 :
517 124 : slot = &ReplicationSlotCtl->replication_slots[index];
518 :
519 : /*
520 : * Ensure that the slot cannot be dropped while we copy the name. Don't
521 : * need the spinlock as the name of an existing slot cannot change.
522 : */
523 124 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
524 124 : found = slot->in_use;
525 124 : if (slot->in_use)
526 124 : namestrcpy(name, NameStr(slot->data.name));
527 124 : LWLockRelease(ReplicationSlotControlLock);
528 :
529 124 : return found;
530 : }
531 :
/*
 * Find a previously created slot and mark it as used by this process.
 *
 * An error is raised if nowait is true and the slot is currently in use.  If
 * nowait is false, we sleep until the slot is released by the owning process.
 * An error is also raised if no in-use slot has the given name.
 *
 * On return MyReplicationSlot points at the slot.  Under the postmaster the
 * slot's active_pid is set to MyProcPid; in single-user mode
 * (!IsUnderPostmaster) the ownership check is skipped and the shared
 * active_pid field is not modified.
 */
void
ReplicationSlotAcquire(const char *name, bool nowait)
{
	ReplicationSlot *s;
	int			active_pid;

	Assert(name != NULL);

retry:
	Assert(MyReplicationSlot == NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	/*
	 * Check if the slot exists with the given name.  need_lock is false
	 * because ReplicationSlotControlLock is already held.
	 */
	s = SearchNamedReplicationSlot(name, false);
	if (s == NULL || !s->in_use)
	{
		LWLockRelease(ReplicationSlotControlLock);

		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist",
						name)));
	}

	/*
	 * This is the slot we want; check if it's active under some other
	 * process.  In single user mode, we don't need this check.
	 */
	if (IsUnderPostmaster)
	{
		/*
		 * Get ready to sleep on the slot in case it is active.  (We may end
		 * up not sleeping, but we don't want to do this while holding the
		 * spinlock.)
		 */
		if (!nowait)
			ConditionVariablePrepareToSleep(&s->active_cv);

		/* Claim the slot if it is free; either way, learn who owns it. */
		SpinLockAcquire(&s->mutex);
		if (s->active_pid == 0)
			s->active_pid = MyProcPid;
		active_pid = s->active_pid;
		SpinLockRelease(&s->mutex);
	}
	else
		active_pid = MyProcPid;
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * If we found the slot but it's already active in another process, we
	 * wait until the owning process signals us that it's been released, or
	 * error out.
	 */
	if (active_pid != MyProcPid)
	{
		if (!nowait)
		{
			/* Wait here until we get signaled, and then restart */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);
			ConditionVariableCancelSleep();
			goto retry;
		}

		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
				 errmsg("replication slot \"%s\" is active for PID %d",
						NameStr(s->data.name), active_pid)));
	}
	else if (!nowait)
		ConditionVariableCancelSleep(); /* no sleep needed after all */

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&s->active_cv);

	/* We made this slot active, so it's ours now. */
	MyReplicationSlot = s;

	/*
	 * The call to pgstat_acquire_replslot() protects against stats for a
	 * different slot, from before a restart or such, being present during
	 * pgstat_report_replslot().
	 */
	if (SlotIsLogical(s))
		pgstat_acquire_replslot(s);

	if (am_walsender)
	{
		ereport(log_replication_commands ? LOG : DEBUG1,
				SlotIsLogical(s)
				? errmsg("acquired logical replication slot \"%s\"",
						 NameStr(s->data.name))
				: errmsg("acquired physical replication slot \"%s\"",
						 NameStr(s->data.name)));
	}
}
635 :
/*
 * Release the replication slot that this backend considers to own.
 *
 * This or another backend can re-acquire the slot later.
 * Resources this slot requires will be preserved.
 *
 * The effect depends on the slot's persistency:
 *   - RS_EPHEMERAL: the slot is dropped outright.
 *   - RS_PERSISTENT: active_pid is cleared and waiters are woken.
 *   - Anything else (e.g. RS_TEMPORARY): active_pid is left set, so the slot
 *     stays tied to this backend until ReplicationSlotCleanup() drops it.
 *
 * In every case MyReplicationSlot is reset to NULL and
 * PROC_IN_LOGICAL_DECODING is cleared from MyProc.
 */
void
ReplicationSlotRelease(void)
{
	ReplicationSlot *slot = MyReplicationSlot;
	char	   *slotname = NULL;	/* keep compiler quiet */
	bool		is_logical = false; /* keep compiler quiet */

	Assert(slot != NULL && slot->active_pid != 0);

	if (am_walsender)
	{
		/*
		 * Copy what the final log message needs now, since an ephemeral slot
		 * is dropped below.
		 */
		slotname = pstrdup(NameStr(slot->data.name));
		is_logical = SlotIsLogical(slot);
	}

	if (slot->data.persistency == RS_EPHEMERAL)
	{
		/*
		 * Delete the slot.  There is no !PANIC case where this is allowed to
		 * fail, all that may happen is an incomplete cleanup of the on-disk
		 * data.
		 */
		ReplicationSlotDropAcquired();
	}

	/*
	 * If slot needed to temporarily restrain both data and catalog xmin to
	 * create the catalog snapshot, remove that temporary constraint.
	 * Snapshots can only be exported while the initial snapshot is still
	 * acquired.
	 *
	 * NOTE(review): for an RS_EPHEMERAL slot the array entry has already been
	 * freed (in_use = false, ReplicationSlotAllocationLock released) by the
	 * drop above, yet this block still reads and may write
	 * slot->effective_xmin.  Confirm the entry cannot be reallocated by
	 * another backend in between.
	 */
	if (!TransactionIdIsValid(slot->data.xmin) &&
		TransactionIdIsValid(slot->effective_xmin))
	{
		SpinLockAcquire(&slot->mutex);
		slot->effective_xmin = InvalidTransactionId;
		SpinLockRelease(&slot->mutex);
		ReplicationSlotsComputeRequiredXmin(false);
	}

	if (slot->data.persistency == RS_PERSISTENT)
	{
		/*
		 * Mark persistent slot inactive.  We're not freeing it, just
		 * disconnecting, but wake up others that may be waiting for it.
		 */
		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);
		ConditionVariableBroadcast(&slot->active_cv);
	}

	MyReplicationSlot = NULL;

	/* might not have been set when we've been a plain slot */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
	ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
	LWLockRelease(ProcArrayLock);

	if (am_walsender)
	{
		ereport(log_replication_commands ? LOG : DEBUG1,
				is_logical
				? errmsg("released logical replication slot \"%s\"",
						 slotname)
				: errmsg("released physical replication slot \"%s\"",
						 slotname));

		pfree(slotname);
	}
}
714 :
715 : /*
716 : * Cleanup all temporary slots created in current session.
717 : */
718 : void
719 68100 : ReplicationSlotCleanup(void)
720 : {
721 : int i;
722 :
723 : Assert(MyReplicationSlot == NULL);
724 :
725 68100 : restart:
726 68100 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
727 737262 : for (i = 0; i < max_replication_slots; i++)
728 : {
729 669402 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
730 :
731 669402 : if (!s->in_use)
732 647356 : continue;
733 :
734 22046 : SpinLockAcquire(&s->mutex);
735 22046 : if (s->active_pid == MyProcPid)
736 : {
737 : Assert(s->data.persistency == RS_TEMPORARY);
738 240 : SpinLockRelease(&s->mutex);
739 240 : LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
740 :
741 240 : ReplicationSlotDropPtr(s);
742 :
743 240 : ConditionVariableBroadcast(&s->active_cv);
744 240 : goto restart;
745 : }
746 : else
747 21806 : SpinLockRelease(&s->mutex);
748 : }
749 :
750 67860 : LWLockRelease(ReplicationSlotControlLock);
751 67860 : }
752 :
753 : /*
754 : * Permanently drop replication slot identified by the passed in name.
755 : */
756 : void
757 684 : ReplicationSlotDrop(const char *name, bool nowait)
758 : {
759 : Assert(MyReplicationSlot == NULL);
760 :
761 684 : ReplicationSlotAcquire(name, nowait);
762 :
763 : /*
764 : * Do not allow users to drop the slots which are currently being synced
765 : * from the primary to the standby.
766 : */
767 672 : if (RecoveryInProgress() && MyReplicationSlot->data.synced)
768 2 : ereport(ERROR,
769 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
770 : errmsg("cannot drop replication slot \"%s\"", name),
771 : errdetail("This slot is being synced from the primary server."));
772 :
773 670 : ReplicationSlotDropAcquired();
774 670 : }
775 :
776 : /*
777 : * Change the definition of the slot identified by the specified name.
778 : */
779 : void
780 12 : ReplicationSlotAlter(const char *name, bool failover)
781 : {
782 : Assert(MyReplicationSlot == NULL);
783 :
784 12 : ReplicationSlotAcquire(name, false);
785 :
786 12 : if (SlotIsPhysical(MyReplicationSlot))
787 0 : ereport(ERROR,
788 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
789 : errmsg("cannot use %s with a physical replication slot",
790 : "ALTER_REPLICATION_SLOT"));
791 :
792 12 : if (RecoveryInProgress())
793 : {
794 : /*
795 : * Do not allow users to alter the slots which are currently being
796 : * synced from the primary to the standby.
797 : */
798 2 : if (MyReplicationSlot->data.synced)
799 2 : ereport(ERROR,
800 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
801 : errmsg("cannot alter replication slot \"%s\"", name),
802 : errdetail("This slot is being synced from the primary server."));
803 :
804 : /*
805 : * Do not allow users to enable failover on the standby as we do not
806 : * support sync to the cascading standby.
807 : */
808 0 : if (failover)
809 0 : ereport(ERROR,
810 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
811 : errmsg("cannot enable failover for a replication slot"
812 : " on the standby"));
813 : }
814 :
815 : /*
816 : * Do not allow users to enable failover for temporary slots as we do not
817 : * support syncing temporary slots to the standby.
818 : */
819 10 : if (failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
820 0 : ereport(ERROR,
821 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
822 : errmsg("cannot enable failover for a temporary replication slot"));
823 :
824 10 : if (MyReplicationSlot->data.failover != failover)
825 : {
826 6 : SpinLockAcquire(&MyReplicationSlot->mutex);
827 6 : MyReplicationSlot->data.failover = failover;
828 6 : SpinLockRelease(&MyReplicationSlot->mutex);
829 :
830 6 : ReplicationSlotMarkDirty();
831 6 : ReplicationSlotSave();
832 : }
833 :
834 10 : ReplicationSlotRelease();
835 10 : }
836 :
837 : /*
838 : * Permanently drop the currently acquired replication slot.
839 : */
840 : void
841 694 : ReplicationSlotDropAcquired(void)
842 : {
843 694 : ReplicationSlot *slot = MyReplicationSlot;
844 :
845 : Assert(MyReplicationSlot != NULL);
846 :
847 : /* slot isn't acquired anymore */
848 694 : MyReplicationSlot = NULL;
849 :
850 694 : ReplicationSlotDropPtr(slot);
851 694 : }
852 :
/*
 * Permanently drop the replication slot which will be released by the point
 * this function returns.
 *
 * ReplicationSlotAllocationLock is held exclusively throughout.  The steps
 * are:
 *   1. Rename the slot directory to "<name>.tmp", fsyncing it and
 *      pg_replslot inside a critical section.
 *   2. Mark the array entry free under ReplicationSlotControlLock.
 *   3. Recompute the global xmin/LSN requirements.
 *   4. Remove the renamed directory (failure only warns).
 *   5. Drop a logical slot's statistics entry.
 *
 * If the rename fails, the slot is marked inactive and waiters are woken.
 * The failure is reported at ERROR for an RS_PERSISTENT slot.  For any other
 * persistency it is only a WARNING, and the remaining steps still run.
 *
 * Callers in this file pass a slot owned by this backend: either the former
 * MyReplicationSlot, or one whose active_pid is MyProcPid.
 */
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
	char		path[MAXPGPATH];
	char		tmppath[MAXPGPATH];

	/*
	 * If some other backend ran this code concurrently with us, we might try
	 * to delete a slot with a certain name while someone else was trying to
	 * create a slot with the same name.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/* Generate pathnames. */
	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

	/*
	 * Rename the slot directory on disk, so that we'll no longer recognize
	 * this as a valid slot.  Note that if this fails, we've got to mark the
	 * slot inactive before bailing out.  If we're dropping an ephemeral or a
	 * temporary slot, we better never fail hard as the caller won't expect
	 * the slot to survive and this might get called during error handling.
	 */
	if (rename(path, tmppath) == 0)
	{
		/*
		 * We need to fsync() the directory we just renamed and its parent to
		 * make sure that our changes are on disk in a crash-safe fashion.  If
		 * fsync() fails, we can't be sure whether the changes are on disk or
		 * not.  For now, we handle that by panicking;
		 * StartupReplicationSlots() will try to straighten it out after
		 * restart.
		 */
		START_CRIT_SECTION();
		fsync_fname(tmppath, true);
		fsync_fname("pg_replslot", true);
		END_CRIT_SECTION();
	}
	else
	{
		bool		fail_softly = slot->data.persistency != RS_PERSISTENT;

		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);

		/* wake up anyone waiting on this slot */
		ConditionVariableBroadcast(&slot->active_cv);

		ereport(fail_softly ? WARNING : ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						path, tmppath)));
	}

	/*
	 * The slot is definitely gone.  Lock out concurrent scans of the array
	 * long enough to kill it.  It's OK to clear the active PID here without
	 * grabbing the mutex because nobody else can be scanning the array here,
	 * and nobody can be attached to this slot and thus access it without
	 * scanning the array.
	 *
	 * Also wake up processes waiting for it.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	slot->active_pid = 0;
	slot->in_use = false;
	LWLockRelease(ReplicationSlotControlLock);
	ConditionVariableBroadcast(&slot->active_cv);

	/*
	 * Slot is dead and doesn't prevent resource removal anymore, recompute
	 * limits.
	 */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * If removing the directory fails, the worst thing that will happen is
	 * that the user won't be able to create a new slot with the same name
	 * until the next server restart.  We warn about it, but that's all.
	 */
	if (!rmtree(tmppath, true))
		ereport(WARNING,
				(errmsg("could not remove directory \"%s\"", tmppath)));

	/*
	 * Drop the statistics entry for the replication slot.  Do this while
	 * holding ReplicationSlotAllocationLock so that we don't drop a
	 * statistics entry for another slot with the same name just created in
	 * another session.
	 */
	if (SlotIsLogical(slot))
		pgstat_drop_replslot(slot);

	/*
	 * We release this at the very end, so that nobody starts trying to create
	 * a slot while we're still cleaning up the detritus of the old one.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);
}
959 :
960 : /*
961 : * Serialize the currently acquired slot's state from memory to disk, thereby
962 : * guaranteeing the current state will survive a crash.
963 : */
964 : void
965 2154 : ReplicationSlotSave(void)
966 : {
967 : char path[MAXPGPATH];
968 :
969 : Assert(MyReplicationSlot != NULL);
970 :
971 2154 : sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
972 2154 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
973 2154 : }
974 :
975 : /*
976 : * Signal that it would be useful if the currently acquired slot would be
977 : * flushed out to disk.
978 : *
979 : * Note that the actual flush to disk can be delayed for a long time, if
980 : * required for correctness explicitly do a ReplicationSlotSave().
981 : */
982 : void
983 38980 : ReplicationSlotMarkDirty(void)
984 : {
985 38980 : ReplicationSlot *slot = MyReplicationSlot;
986 :
987 : Assert(MyReplicationSlot != NULL);
988 :
989 38980 : SpinLockAcquire(&slot->mutex);
990 38980 : MyReplicationSlot->just_dirtied = true;
991 38980 : MyReplicationSlot->dirty = true;
992 38980 : SpinLockRelease(&slot->mutex);
993 38980 : }
994 :
995 : /*
996 : * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
997 : * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
998 : */
999 : void
1000 766 : ReplicationSlotPersist(void)
1001 : {
1002 766 : ReplicationSlot *slot = MyReplicationSlot;
1003 :
1004 : Assert(slot != NULL);
1005 : Assert(slot->data.persistency != RS_PERSISTENT);
1006 :
1007 766 : SpinLockAcquire(&slot->mutex);
1008 766 : slot->data.persistency = RS_PERSISTENT;
1009 766 : SpinLockRelease(&slot->mutex);
1010 :
1011 766 : ReplicationSlotMarkDirty();
1012 766 : ReplicationSlotSave();
1013 766 : }
1014 :
1015 : /*
1016 : * Compute the oldest xmin across all slots and store it in the ProcArray.
1017 : *
1018 : * If already_locked is true, ProcArrayLock has already been acquired
1019 : * exclusively.
1020 : */
1021 : void
1022 3750 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
1023 : {
1024 : int i;
1025 3750 : TransactionId agg_xmin = InvalidTransactionId;
1026 3750 : TransactionId agg_catalog_xmin = InvalidTransactionId;
1027 :
1028 : Assert(ReplicationSlotCtl != NULL);
1029 :
1030 3750 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1031 :
1032 37524 : for (i = 0; i < max_replication_slots; i++)
1033 : {
1034 33774 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1035 : TransactionId effective_xmin;
1036 : TransactionId effective_catalog_xmin;
1037 : bool invalidated;
1038 :
1039 33774 : if (!s->in_use)
1040 30454 : continue;
1041 :
1042 3320 : SpinLockAcquire(&s->mutex);
1043 3320 : effective_xmin = s->effective_xmin;
1044 3320 : effective_catalog_xmin = s->effective_catalog_xmin;
1045 3320 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1046 3320 : SpinLockRelease(&s->mutex);
1047 :
1048 : /* invalidated slots need not apply */
1049 3320 : if (invalidated)
1050 44 : continue;
1051 :
1052 : /* check the data xmin */
1053 3276 : if (TransactionIdIsValid(effective_xmin) &&
1054 6 : (!TransactionIdIsValid(agg_xmin) ||
1055 6 : TransactionIdPrecedes(effective_xmin, agg_xmin)))
1056 470 : agg_xmin = effective_xmin;
1057 :
1058 : /* check the catalog xmin */
1059 3276 : if (TransactionIdIsValid(effective_catalog_xmin) &&
1060 1302 : (!TransactionIdIsValid(agg_catalog_xmin) ||
1061 1302 : TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1062 1840 : agg_catalog_xmin = effective_catalog_xmin;
1063 : }
1064 :
1065 3750 : LWLockRelease(ReplicationSlotControlLock);
1066 :
1067 3750 : ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1068 3750 : }
1069 :
1070 : /*
1071 : * Compute the oldest restart LSN across all slots and inform xlog module.
1072 : *
1073 : * Note: while max_slot_wal_keep_size is theoretically relevant for this
1074 : * purpose, we don't try to account for that, because this module doesn't
1075 : * know what to compare against.
1076 : */
1077 : void
1078 39738 : ReplicationSlotsComputeRequiredLSN(void)
1079 : {
1080 : int i;
1081 39738 : XLogRecPtr min_required = InvalidXLogRecPtr;
1082 :
1083 : Assert(ReplicationSlotCtl != NULL);
1084 :
1085 39738 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1086 431066 : for (i = 0; i < max_replication_slots; i++)
1087 : {
1088 391328 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1089 : XLogRecPtr restart_lsn;
1090 : bool invalidated;
1091 :
1092 391328 : if (!s->in_use)
1093 352204 : continue;
1094 :
1095 39124 : SpinLockAcquire(&s->mutex);
1096 39124 : restart_lsn = s->data.restart_lsn;
1097 39124 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1098 39124 : SpinLockRelease(&s->mutex);
1099 :
1100 : /* invalidated slots need not apply */
1101 39124 : if (invalidated)
1102 46 : continue;
1103 :
1104 39078 : if (restart_lsn != InvalidXLogRecPtr &&
1105 1154 : (min_required == InvalidXLogRecPtr ||
1106 : restart_lsn < min_required))
1107 37928 : min_required = restart_lsn;
1108 : }
1109 39738 : LWLockRelease(ReplicationSlotControlLock);
1110 :
1111 39738 : XLogSetReplicationSlotMinimumLSN(min_required);
1112 39738 : }
1113 :
1114 : /*
1115 : * Compute the oldest WAL LSN required by *logical* decoding slots..
1116 : *
1117 : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1118 : * slots exist.
1119 : *
1120 : * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1121 : * ignores physical replication slots.
1122 : *
1123 : * The results aren't required frequently, so we don't maintain a precomputed
1124 : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1125 : */
1126 : XLogRecPtr
1127 3292 : ReplicationSlotsComputeLogicalRestartLSN(void)
1128 : {
1129 3292 : XLogRecPtr result = InvalidXLogRecPtr;
1130 : int i;
1131 :
1132 3292 : if (max_replication_slots <= 0)
1133 4 : return InvalidXLogRecPtr;
1134 :
1135 3288 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1136 :
1137 35248 : for (i = 0; i < max_replication_slots; i++)
1138 : {
1139 : ReplicationSlot *s;
1140 : XLogRecPtr restart_lsn;
1141 : bool invalidated;
1142 :
1143 31960 : s = &ReplicationSlotCtl->replication_slots[i];
1144 :
1145 : /* cannot change while ReplicationSlotCtlLock is held */
1146 31960 : if (!s->in_use)
1147 31372 : continue;
1148 :
1149 : /* we're only interested in logical slots */
1150 588 : if (!SlotIsLogical(s))
1151 284 : continue;
1152 :
1153 : /* read once, it's ok if it increases while we're checking */
1154 304 : SpinLockAcquire(&s->mutex);
1155 304 : restart_lsn = s->data.restart_lsn;
1156 304 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1157 304 : SpinLockRelease(&s->mutex);
1158 :
1159 : /* invalidated slots need not apply */
1160 304 : if (invalidated)
1161 8 : continue;
1162 :
1163 296 : if (restart_lsn == InvalidXLogRecPtr)
1164 0 : continue;
1165 :
1166 296 : if (result == InvalidXLogRecPtr ||
1167 : restart_lsn < result)
1168 240 : result = restart_lsn;
1169 : }
1170 :
1171 3288 : LWLockRelease(ReplicationSlotControlLock);
1172 :
1173 3288 : return result;
1174 : }
1175 :
1176 : /*
1177 : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1178 : * passed database oid.
1179 : *
1180 : * Returns true if there are any slots referencing the database. *nslots will
1181 : * be set to the absolute number of slots in the database, *nactive to ones
1182 : * currently active.
1183 : */
1184 : bool
1185 64 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1186 : {
1187 : int i;
1188 :
1189 64 : *nslots = *nactive = 0;
1190 :
1191 64 : if (max_replication_slots <= 0)
1192 0 : return false;
1193 :
1194 64 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1195 646 : for (i = 0; i < max_replication_slots; i++)
1196 : {
1197 : ReplicationSlot *s;
1198 :
1199 582 : s = &ReplicationSlotCtl->replication_slots[i];
1200 :
1201 : /* cannot change while ReplicationSlotCtlLock is held */
1202 582 : if (!s->in_use)
1203 550 : continue;
1204 :
1205 : /* only logical slots are database specific, skip */
1206 32 : if (!SlotIsLogical(s))
1207 18 : continue;
1208 :
1209 : /* not our database, skip */
1210 14 : if (s->data.database != dboid)
1211 8 : continue;
1212 :
1213 : /* NB: intentionally counting invalidated slots */
1214 :
1215 : /* count slots with spinlock held */
1216 6 : SpinLockAcquire(&s->mutex);
1217 6 : (*nslots)++;
1218 6 : if (s->active_pid != 0)
1219 2 : (*nactive)++;
1220 6 : SpinLockRelease(&s->mutex);
1221 : }
1222 64 : LWLockRelease(ReplicationSlotControlLock);
1223 :
1224 64 : if (*nslots > 0)
1225 6 : return true;
1226 58 : return false;
1227 : }
1228 :
1229 : /*
1230 : * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1231 : * passed database oid. The caller should hold an exclusive lock on the
1232 : * pg_database oid for the database to prevent creation of new slots on the db
1233 : * or replay from existing slots.
1234 : *
1235 : * Another session that concurrently acquires an existing slot on the target DB
1236 : * (most likely to drop it) may cause this function to ERROR. If that happens
1237 : * it may have dropped some but not all slots.
1238 : *
1239 : * This routine isn't as efficient as it could be - but we don't drop
1240 : * databases often, especially databases with lots of slots.
1241 : */
1242 : void
1243 84 : ReplicationSlotsDropDBSlots(Oid dboid)
1244 : {
1245 : int i;
1246 :
1247 84 : if (max_replication_slots <= 0)
1248 0 : return;
1249 :
1250 84 : restart:
1251 94 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1252 860 : for (i = 0; i < max_replication_slots; i++)
1253 : {
1254 : ReplicationSlot *s;
1255 : char *slotname;
1256 : int active_pid;
1257 :
1258 776 : s = &ReplicationSlotCtl->replication_slots[i];
1259 :
1260 : /* cannot change while ReplicationSlotCtlLock is held */
1261 776 : if (!s->in_use)
1262 728 : continue;
1263 :
1264 : /* only logical slots are database specific, skip */
1265 48 : if (!SlotIsLogical(s))
1266 20 : continue;
1267 :
1268 : /* not our database, skip */
1269 28 : if (s->data.database != dboid)
1270 18 : continue;
1271 :
1272 : /* NB: intentionally including invalidated slots */
1273 :
1274 : /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1275 10 : SpinLockAcquire(&s->mutex);
1276 : /* can't change while ReplicationSlotControlLock is held */
1277 10 : slotname = NameStr(s->data.name);
1278 10 : active_pid = s->active_pid;
1279 10 : if (active_pid == 0)
1280 : {
1281 10 : MyReplicationSlot = s;
1282 10 : s->active_pid = MyProcPid;
1283 : }
1284 10 : SpinLockRelease(&s->mutex);
1285 :
1286 : /*
1287 : * Even though we hold an exclusive lock on the database object a
1288 : * logical slot for that DB can still be active, e.g. if it's
1289 : * concurrently being dropped by a backend connected to another DB.
1290 : *
1291 : * That's fairly unlikely in practice, so we'll just bail out.
1292 : *
1293 : * The slot sync worker holds a shared lock on the database before
1294 : * operating on synced logical slots to avoid conflict with the drop
1295 : * happening here. The persistent synced slots are thus safe but there
1296 : * is a possibility that the slot sync worker has created a temporary
1297 : * slot (which stays active even on release) and we are trying to drop
1298 : * that here. In practice, the chances of hitting this scenario are
1299 : * less as during slot synchronization, the temporary slot is
1300 : * immediately converted to persistent and thus is safe due to the
1301 : * shared lock taken on the database. So, we'll just bail out in such
1302 : * a case.
1303 : *
1304 : * XXX: We can consider shutting down the slot sync worker before
1305 : * trying to drop synced temporary slots here.
1306 : */
1307 10 : if (active_pid)
1308 0 : ereport(ERROR,
1309 : (errcode(ERRCODE_OBJECT_IN_USE),
1310 : errmsg("replication slot \"%s\" is active for PID %d",
1311 : slotname, active_pid)));
1312 :
1313 : /*
1314 : * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1315 : * holding ReplicationSlotControlLock over filesystem operations,
1316 : * release ReplicationSlotControlLock and use
1317 : * ReplicationSlotDropAcquired.
1318 : *
1319 : * As that means the set of slots could change, restart scan from the
1320 : * beginning each time we release the lock.
1321 : */
1322 10 : LWLockRelease(ReplicationSlotControlLock);
1323 10 : ReplicationSlotDropAcquired();
1324 10 : goto restart;
1325 : }
1326 84 : LWLockRelease(ReplicationSlotControlLock);
1327 : }
1328 :
1329 :
1330 : /*
1331 : * Check whether the server's configuration supports using replication
1332 : * slots.
1333 : */
1334 : void
1335 2944 : CheckSlotRequirements(void)
1336 : {
1337 : /*
1338 : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1339 : * needs the same check.
1340 : */
1341 :
1342 2944 : if (max_replication_slots == 0)
1343 0 : ereport(ERROR,
1344 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1345 : errmsg("replication slots can only be used if max_replication_slots > 0")));
1346 :
1347 2944 : if (wal_level < WAL_LEVEL_REPLICA)
1348 0 : ereport(ERROR,
1349 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1350 : errmsg("replication slots can only be used if wal_level >= replica")));
1351 2944 : }
1352 :
1353 : /*
1354 : * Check whether the user has privilege to use replication slots.
1355 : */
1356 : void
1357 962 : CheckSlotPermissions(void)
1358 : {
1359 962 : if (!has_rolreplication(GetUserId()))
1360 10 : ereport(ERROR,
1361 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1362 : errmsg("permission denied to use replication slots"),
1363 : errdetail("Only roles with the %s attribute may use replication slots.",
1364 : "REPLICATION")));
1365 952 : }
1366 :
1367 : /*
1368 : * Reserve WAL for the currently active slot.
1369 : *
1370 : * Compute and set restart_lsn in a manner that's appropriate for the type of
1371 : * the slot and concurrency safe.
1372 : */
1373 : void
1374 1022 : ReplicationSlotReserveWal(void)
1375 : {
1376 1022 : ReplicationSlot *slot = MyReplicationSlot;
1377 :
1378 : Assert(slot != NULL);
1379 : Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
1380 :
1381 : /*
1382 : * The replication slot mechanism is used to prevent removal of required
1383 : * WAL. As there is no interlock between this routine and checkpoints, WAL
1384 : * segments could concurrently be removed when a now stale return value of
1385 : * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1386 : * this happens we'll just retry.
1387 : */
1388 : while (true)
1389 0 : {
1390 : XLogSegNo segno;
1391 : XLogRecPtr restart_lsn;
1392 :
1393 : /*
1394 : * For logical slots log a standby snapshot and start logical decoding
1395 : * at exactly that position. That allows the slot to start up more
1396 : * quickly. But on a standby we cannot do WAL writes, so just use the
1397 : * replay pointer; effectively, an attempt to create a logical slot on
1398 : * standby will cause it to wait for an xl_running_xact record to be
1399 : * logged independently on the primary, so that a snapshot can be
1400 : * built using the record.
1401 : *
1402 : * None of this is needed (or indeed helpful) for physical slots as
1403 : * they'll start replay at the last logged checkpoint anyway. Instead
1404 : * return the location of the last redo LSN. While that slightly
1405 : * increases the chance that we have to retry, it's where a base
1406 : * backup has to start replay at.
1407 : */
1408 1022 : if (SlotIsPhysical(slot))
1409 252 : restart_lsn = GetRedoRecPtr();
1410 770 : else if (RecoveryInProgress())
1411 44 : restart_lsn = GetXLogReplayRecPtr(NULL);
1412 : else
1413 726 : restart_lsn = GetXLogInsertRecPtr();
1414 :
1415 1022 : SpinLockAcquire(&slot->mutex);
1416 1022 : slot->data.restart_lsn = restart_lsn;
1417 1022 : SpinLockRelease(&slot->mutex);
1418 :
1419 : /* prevent WAL removal as fast as possible */
1420 1022 : ReplicationSlotsComputeRequiredLSN();
1421 :
1422 : /*
1423 : * If all required WAL is still there, great, otherwise retry. The
1424 : * slot should prevent further removal of WAL, unless there's a
1425 : * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1426 : * the new restart_lsn above, so normally we should never need to loop
1427 : * more than twice.
1428 : */
1429 1022 : XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
1430 1022 : if (XLogGetLastRemovedSegno() < segno)
1431 1022 : break;
1432 : }
1433 :
1434 1022 : if (!RecoveryInProgress() && SlotIsLogical(slot))
1435 : {
1436 : XLogRecPtr flushptr;
1437 :
1438 : /* make sure we have enough information to start */
1439 726 : flushptr = LogStandbySnapshot();
1440 :
1441 : /* and make sure it's fsynced to disk */
1442 726 : XLogFlush(flushptr);
1443 : }
1444 1022 : }
1445 :
1446 : /*
1447 : * Report that replication slot needs to be invalidated
1448 : */
1449 : static void
1450 42 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1451 : bool terminating,
1452 : int pid,
1453 : NameData slotname,
1454 : XLogRecPtr restart_lsn,
1455 : XLogRecPtr oldestLSN,
1456 : TransactionId snapshotConflictHorizon)
1457 : {
1458 : StringInfoData err_detail;
1459 42 : bool hint = false;
1460 :
1461 42 : initStringInfo(&err_detail);
1462 :
1463 42 : switch (cause)
1464 : {
1465 12 : case RS_INVAL_WAL_REMOVED:
1466 : {
1467 12 : unsigned long long ex = oldestLSN - restart_lsn;
1468 :
1469 12 : hint = true;
1470 12 : appendStringInfo(&err_detail,
1471 12 : ngettext("The slot's restart_lsn %X/%X exceeds the limit by %llu byte.",
1472 : "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
1473 : ex),
1474 12 : LSN_FORMAT_ARGS(restart_lsn),
1475 : ex);
1476 12 : break;
1477 : }
1478 24 : case RS_INVAL_HORIZON:
1479 24 : appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1480 : snapshotConflictHorizon);
1481 24 : break;
1482 :
1483 6 : case RS_INVAL_WAL_LEVEL:
1484 6 : appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server."));
1485 6 : break;
1486 : case RS_INVAL_NONE:
1487 : pg_unreachable();
1488 : }
1489 :
1490 42 : ereport(LOG,
1491 : terminating ?
1492 : errmsg("terminating process %d to release replication slot \"%s\"",
1493 : pid, NameStr(slotname)) :
1494 : errmsg("invalidating obsolete replication slot \"%s\"",
1495 : NameStr(slotname)),
1496 : errdetail_internal("%s", err_detail.data),
1497 : hint ? errhint("You might need to increase %s.", "max_slot_wal_keep_size") : 0);
1498 :
1499 42 : pfree(err_detail.data);
1500 42 : }
1501 :
1502 : /*
1503 : * Helper for InvalidateObsoleteReplicationSlots
1504 : *
1505 : * Acquires the given slot and mark it invalid, if necessary and possible.
1506 : *
1507 : * Returns whether ReplicationSlotControlLock was released in the interim (and
1508 : * in that case we're not holding the lock at return, otherwise we are).
1509 : *
1510 : * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
1511 : *
1512 : * This is inherently racy, because we release the LWLock
1513 : * for syscalls, so caller must restart if we return true.
1514 : */
1515 : static bool
1516 404 : InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
1517 : ReplicationSlot *s,
1518 : XLogRecPtr oldestLSN,
1519 : Oid dboid, TransactionId snapshotConflictHorizon,
1520 : bool *invalidated)
1521 : {
1522 404 : int last_signaled_pid = 0;
1523 404 : bool released_lock = false;
1524 404 : bool terminated = false;
1525 404 : XLogRecPtr initial_effective_xmin = InvalidXLogRecPtr;
1526 404 : XLogRecPtr initial_catalog_effective_xmin = InvalidXLogRecPtr;
1527 404 : XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr;
1528 404 : ReplicationSlotInvalidationCause conflict_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
1529 :
1530 : for (;;)
1531 14 : {
1532 : XLogRecPtr restart_lsn;
1533 : NameData slotname;
1534 418 : int active_pid = 0;
1535 418 : ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;
1536 :
1537 : Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1538 :
1539 418 : if (!s->in_use)
1540 : {
1541 0 : if (released_lock)
1542 0 : LWLockRelease(ReplicationSlotControlLock);
1543 0 : break;
1544 : }
1545 :
1546 : /*
1547 : * Check if the slot needs to be invalidated. If it needs to be
1548 : * invalidated, and is not currently acquired, acquire it and mark it
1549 : * as having been invalidated. We do this with the spinlock held to
1550 : * avoid race conditions -- for example the restart_lsn could move
1551 : * forward, or the slot could be dropped.
1552 : */
1553 418 : SpinLockAcquire(&s->mutex);
1554 :
1555 418 : restart_lsn = s->data.restart_lsn;
1556 :
1557 : /*
1558 : * If the slot is already invalid or is a non conflicting slot, we
1559 : * don't need to do anything.
1560 : */
1561 418 : if (s->data.invalidated == RS_INVAL_NONE)
1562 : {
1563 : /*
1564 : * The slot's mutex will be released soon, and it is possible that
1565 : * those values change since the process holding the slot has been
1566 : * terminated (if any), so record them here to ensure that we
1567 : * would report the correct conflict cause.
1568 : */
1569 340 : if (!terminated)
1570 : {
1571 326 : initial_restart_lsn = s->data.restart_lsn;
1572 326 : initial_effective_xmin = s->effective_xmin;
1573 326 : initial_catalog_effective_xmin = s->effective_catalog_xmin;
1574 : }
1575 :
1576 340 : switch (cause)
1577 : {
1578 286 : case RS_INVAL_WAL_REMOVED:
1579 286 : if (initial_restart_lsn != InvalidXLogRecPtr &&
1580 : initial_restart_lsn < oldestLSN)
1581 12 : conflict = cause;
1582 286 : break;
1583 48 : case RS_INVAL_HORIZON:
1584 48 : if (!SlotIsLogical(s))
1585 0 : break;
1586 : /* invalid DB oid signals a shared relation */
1587 48 : if (dboid != InvalidOid && dboid != s->data.database)
1588 0 : break;
1589 48 : if (TransactionIdIsValid(initial_effective_xmin) &&
1590 0 : TransactionIdPrecedesOrEquals(initial_effective_xmin,
1591 : snapshotConflictHorizon))
1592 0 : conflict = cause;
1593 96 : else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
1594 48 : TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
1595 : snapshotConflictHorizon))
1596 24 : conflict = cause;
1597 48 : break;
1598 6 : case RS_INVAL_WAL_LEVEL:
1599 6 : if (SlotIsLogical(s))
1600 6 : conflict = cause;
1601 6 : break;
1602 : case RS_INVAL_NONE:
1603 : pg_unreachable();
1604 : }
1605 78 : }
1606 :
1607 : /*
1608 : * The conflict cause recorded previously should not change while the
1609 : * process owning the slot (if any) has been terminated.
1610 : */
1611 : Assert(!(conflict_prev != RS_INVAL_NONE && terminated &&
1612 : conflict_prev != conflict));
1613 :
1614 : /* if there's no conflict, we're done */
1615 418 : if (conflict == RS_INVAL_NONE)
1616 : {
1617 376 : SpinLockRelease(&s->mutex);
1618 376 : if (released_lock)
1619 0 : LWLockRelease(ReplicationSlotControlLock);
1620 376 : break;
1621 : }
1622 :
1623 42 : slotname = s->data.name;
1624 42 : active_pid = s->active_pid;
1625 :
1626 : /*
1627 : * If the slot can be acquired, do so and mark it invalidated
1628 : * immediately. Otherwise we'll signal the owning process, below, and
1629 : * retry.
1630 : */
1631 42 : if (active_pid == 0)
1632 : {
1633 28 : MyReplicationSlot = s;
1634 28 : s->active_pid = MyProcPid;
1635 28 : s->data.invalidated = conflict;
1636 :
1637 : /*
1638 : * XXX: We should consider not overwriting restart_lsn and instead
1639 : * just rely on .invalidated.
1640 : */
1641 28 : if (conflict == RS_INVAL_WAL_REMOVED)
1642 8 : s->data.restart_lsn = InvalidXLogRecPtr;
1643 :
1644 : /* Let caller know */
1645 28 : *invalidated = true;
1646 : }
1647 :
1648 42 : SpinLockRelease(&s->mutex);
1649 :
1650 : /*
1651 : * The logical replication slots shouldn't be invalidated as GUC
1652 : * max_slot_wal_keep_size is set to -1 during the binary upgrade. See
1653 : * check_old_cluster_for_valid_slots() where we ensure that no
1654 : * invalidated before the upgrade.
1655 : */
1656 : Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));
1657 :
1658 42 : if (active_pid != 0)
1659 : {
1660 : /*
1661 : * Prepare the sleep on the slot's condition variable before
1662 : * releasing the lock, to close a possible race condition if the
1663 : * slot is released before the sleep below.
1664 : */
1665 14 : ConditionVariablePrepareToSleep(&s->active_cv);
1666 :
1667 14 : LWLockRelease(ReplicationSlotControlLock);
1668 14 : released_lock = true;
1669 :
1670 : /*
1671 : * Signal to terminate the process that owns the slot, if we
1672 : * haven't already signalled it. (Avoidance of repeated
1673 : * signalling is the only reason for there to be a loop in this
1674 : * routine; otherwise we could rely on caller's restart loop.)
1675 : *
1676 : * There is the race condition that other process may own the slot
1677 : * after its current owner process is terminated and before this
1678 : * process owns it. To handle that, we signal only if the PID of
1679 : * the owning process has changed from the previous time. (This
1680 : * logic assumes that the same PID is not reused very quickly.)
1681 : */
1682 14 : if (last_signaled_pid != active_pid)
1683 : {
1684 14 : ReportSlotInvalidation(conflict, true, active_pid,
1685 : slotname, restart_lsn,
1686 : oldestLSN, snapshotConflictHorizon);
1687 :
1688 14 : if (MyBackendType == B_STARTUP)
1689 10 : (void) SendProcSignal(active_pid,
1690 : PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
1691 : INVALID_PROC_NUMBER);
1692 : else
1693 4 : (void) kill(active_pid, SIGTERM);
1694 :
1695 14 : last_signaled_pid = active_pid;
1696 14 : terminated = true;
1697 14 : conflict_prev = conflict;
1698 : }
1699 :
1700 : /* Wait until the slot is released. */
1701 14 : ConditionVariableSleep(&s->active_cv,
1702 : WAIT_EVENT_REPLICATION_SLOT_DROP);
1703 :
1704 : /*
1705 : * Re-acquire lock and start over; we expect to invalidate the
1706 : * slot next time (unless another process acquires the slot in the
1707 : * meantime).
1708 : */
1709 14 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1710 14 : continue;
1711 : }
1712 : else
1713 : {
1714 : /*
1715 : * We hold the slot now and have already invalidated it; flush it
1716 : * to ensure that state persists.
1717 : *
1718 : * Don't want to hold ReplicationSlotControlLock across file
1719 : * system operations, so release it now but be sure to tell caller
1720 : * to restart from scratch.
1721 : */
1722 28 : LWLockRelease(ReplicationSlotControlLock);
1723 28 : released_lock = true;
1724 :
1725 : /* Make sure the invalidated state persists across server restart */
1726 28 : ReplicationSlotMarkDirty();
1727 28 : ReplicationSlotSave();
1728 28 : ReplicationSlotRelease();
1729 :
1730 28 : ReportSlotInvalidation(conflict, false, active_pid,
1731 : slotname, restart_lsn,
1732 : oldestLSN, snapshotConflictHorizon);
1733 :
1734 : /* done with this slot for now */
1735 28 : break;
1736 : }
1737 : }
1738 :
1739 : Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
1740 :
1741 404 : return released_lock;
1742 : }
1743 :
1744 : /*
1745 : * Invalidate slots that require resources about to be removed.
1746 : *
1747 : * Returns true when any slot have got invalidated.
1748 : *
1749 : * Whether a slot needs to be invalidated depends on the cause. A slot is
1750 : * removed if it:
1751 : * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
1752 : * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
1753 : * db; dboid may be InvalidOid for shared relations
1754 : * - RS_INVAL_WAL_LEVEL: is logical
1755 : *
1756 : * NB - this runs as part of checkpoint, so avoid raising errors if possible.
1757 : */
1758 : bool
1759 1682 : InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
1760 : XLogSegNo oldestSegno, Oid dboid,
1761 : TransactionId snapshotConflictHorizon)
1762 : {
1763 : XLogRecPtr oldestLSN;
1764 1682 : bool invalidated = false;
1765 :
1766 : Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
1767 : Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
1768 : Assert(cause != RS_INVAL_NONE);
1769 :
1770 1682 : if (max_replication_slots == 0)
1771 2 : return invalidated;
1772 :
1773 1680 : XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
1774 :
1775 1708 : restart:
1776 1708 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1777 17878 : for (int i = 0; i < max_replication_slots; i++)
1778 : {
1779 16198 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1780 :
1781 16198 : if (!s->in_use)
1782 15794 : continue;
1783 :
1784 404 : if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
1785 : snapshotConflictHorizon,
1786 : &invalidated))
1787 : {
1788 : /* if the lock was released, start from scratch */
1789 28 : goto restart;
1790 : }
1791 : }
1792 1680 : LWLockRelease(ReplicationSlotControlLock);
1793 :
1794 : /*
1795 : * If any slots have been invalidated, recalculate the resource limits.
1796 : */
1797 1680 : if (invalidated)
1798 : {
1799 18 : ReplicationSlotsComputeRequiredXmin(false);
1800 18 : ReplicationSlotsComputeRequiredLSN();
1801 : }
1802 :
1803 1680 : return invalidated;
1804 : }
1805 :
1806 : /*
1807 : * Flush all replication slots to disk.
1808 : *
1809 : * It is convenient to flush dirty replication slots at the time of checkpoint.
1810 : * Additionally, in case of a shutdown checkpoint, we also identify the slots
1811 : * for which the confirmed_flush LSN has been updated since the last time it
1812 : * was saved and flush them.
1813 : */
1814 : void
1815 1646 : CheckPointReplicationSlots(bool is_shutdown)
1816 : {
1817 : int i;
1818 :
1819 1646 : elog(DEBUG1, "performing replication slot checkpoint");
1820 :
1821 : /*
1822 : * Prevent any slot from being created/dropped while we're active. As we
1823 : * explicitly do *not* want to block iterating over replication_slots or
1824 : * acquiring a slot we cannot take the control lock - but that's OK,
1825 : * because holding ReplicationSlotAllocationLock is strictly stronger, and
1826 : * enough to guarantee that nobody can change the in_use bits on us.
1827 : */
1828 1646 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
1829 :
1830 17626 : for (i = 0; i < max_replication_slots; i++)
1831 : {
1832 15980 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1833 : char path[MAXPGPATH];
1834 :
1835 15980 : if (!s->in_use)
1836 15686 : continue;
1837 :
1838 : /* save the slot to disk, locking is handled in SaveSlotToPath() */
1839 294 : sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
1840 :
1841 : /*
1842 : * Slot's data is not flushed each time the confirmed_flush LSN is
1843 : * updated as that could lead to frequent writes. However, we decide
1844 : * to force a flush of all logical slot's data at the time of shutdown
1845 : * if the confirmed_flush LSN is changed since we last flushed it to
1846 : * disk. This helps in avoiding an unnecessary retreat of the
1847 : * confirmed_flush LSN after restart.
1848 : */
1849 294 : if (is_shutdown && SlotIsLogical(s))
1850 : {
1851 112 : SpinLockAcquire(&s->mutex);
1852 :
1853 : Assert(s->data.confirmed_flush >= s->last_saved_confirmed_flush);
1854 :
1855 112 : if (s->data.invalidated == RS_INVAL_NONE &&
1856 112 : s->data.confirmed_flush != s->last_saved_confirmed_flush)
1857 : {
1858 68 : s->just_dirtied = true;
1859 68 : s->dirty = true;
1860 : }
1861 112 : SpinLockRelease(&s->mutex);
1862 : }
1863 :
1864 294 : SaveSlotToPath(s, path, LOG);
1865 : }
1866 1646 : LWLockRelease(ReplicationSlotAllocationLock);
1867 1646 : }
1868 :
1869 : /*
1870 : * Load all replication slots from disk into memory at server startup. This
1871 : * needs to be run before we start crash recovery.
1872 : */
1873 : void
1874 1466 : StartupReplicationSlots(void)
1875 : {
1876 : DIR *replication_dir;
1877 : struct dirent *replication_de;
1878 :
1879 1466 : elog(DEBUG1, "starting up replication slots");
1880 :
1881 : /* restore all slots by iterating over all on-disk entries */
1882 1466 : replication_dir = AllocateDir("pg_replslot");
1883 4500 : while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
1884 : {
1885 : char path[MAXPGPATH + 12];
1886 : PGFileType de_type;
1887 :
1888 3034 : if (strcmp(replication_de->d_name, ".") == 0 ||
1889 1568 : strcmp(replication_de->d_name, "..") == 0)
1890 2932 : continue;
1891 :
1892 102 : snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
1893 102 : de_type = get_dirent_type(path, replication_de, false, DEBUG1);
1894 :
1895 : /* we're only creating directories here, skip if it's not our's */
1896 102 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
1897 0 : continue;
1898 :
1899 : /* we crashed while a slot was being setup or deleted, clean up */
1900 102 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
1901 : {
1902 0 : if (!rmtree(path, true))
1903 : {
1904 0 : ereport(WARNING,
1905 : (errmsg("could not remove directory \"%s\"",
1906 : path)));
1907 0 : continue;
1908 : }
1909 0 : fsync_fname("pg_replslot", true);
1910 0 : continue;
1911 : }
1912 :
1913 : /* looks like a slot in a normal state, restore */
1914 102 : RestoreSlotFromDisk(replication_de->d_name);
1915 : }
1916 1466 : FreeDir(replication_dir);
1917 :
1918 : /* currently no slots exist, we're done. */
1919 1466 : if (max_replication_slots <= 0)
1920 2 : return;
1921 :
1922 : /* Now that we have recovered all the data, compute replication xmin */
1923 1464 : ReplicationSlotsComputeRequiredXmin(false);
1924 1464 : ReplicationSlotsComputeRequiredLSN();
1925 : }
1926 :
1927 : /* ----
1928 : * Manipulation of on-disk state of replication slots
1929 : *
 * NB: none of the routines below should take any notice of whether a slot is
 * the current one or not; that's all handled a layer above.
1932 : * ----
1933 : */
1934 : static void
1935 1090 : CreateSlotOnDisk(ReplicationSlot *slot)
1936 : {
1937 : char tmppath[MAXPGPATH];
1938 : char path[MAXPGPATH];
1939 : struct stat st;
1940 :
1941 : /*
1942 : * No need to take out the io_in_progress_lock, nobody else can see this
1943 : * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
1944 : * takes out the lock, if we'd take the lock here, we'd deadlock.
1945 : */
1946 :
1947 1090 : sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
1948 1090 : sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
1949 :
1950 : /*
1951 : * It's just barely possible that some previous effort to create or drop a
1952 : * slot with this name left a temp directory lying around. If that seems
1953 : * to be the case, try to remove it. If the rmtree() fails, we'll error
1954 : * out at the MakePGDirectory() below, so we don't bother checking
1955 : * success.
1956 : */
1957 1090 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
1958 0 : rmtree(tmppath, true);
1959 :
1960 : /* Create and fsync the temporary slot directory. */
1961 1090 : if (MakePGDirectory(tmppath) < 0)
1962 0 : ereport(ERROR,
1963 : (errcode_for_file_access(),
1964 : errmsg("could not create directory \"%s\": %m",
1965 : tmppath)));
1966 1090 : fsync_fname(tmppath, true);
1967 :
1968 : /* Write the actual state file. */
1969 1090 : slot->dirty = true; /* signal that we really need to write */
1970 1090 : SaveSlotToPath(slot, tmppath, ERROR);
1971 :
1972 : /* Rename the directory into place. */
1973 1090 : if (rename(tmppath, path) != 0)
1974 0 : ereport(ERROR,
1975 : (errcode_for_file_access(),
1976 : errmsg("could not rename file \"%s\" to \"%s\": %m",
1977 : tmppath, path)));
1978 :
1979 : /*
1980 : * If we'd now fail - really unlikely - we wouldn't know whether this slot
1981 : * would persist after an OS crash or not - so, force a restart. The
1982 : * restart would try to fsync this again till it works.
1983 : */
1984 1090 : START_CRIT_SECTION();
1985 :
1986 1090 : fsync_fname(path, true);
1987 1090 : fsync_fname("pg_replslot", true);
1988 :
1989 1090 : END_CRIT_SECTION();
1990 1090 : }
1991 :
/*
 * Shared functionality between saving and creating a replication slot.
 *
 * If the slot is marked dirty, write its persistent data to "<dir>/state":
 * the data and its CRC are written to "<dir>/state.tmp", which is fsync'ed,
 * closed and renamed to "state".  Then the file, 'dir' and pg_replslot are
 * fsync'ed.  A slot that is not dirty is left alone.
 *
 * Failures up to and including the rename are reported at 'elevel'.  When
 * that level does not throw, the function simply returns and the slot's
 * dirty flag is left set.  The final fsyncs run inside a critical section,
 * so a failure there forces a restart.
 *
 * NOTE(review): on the non-throwing failure paths after "state.tmp" has been
 * created (write, fsync, close, rename), that file is left in place.  The
 * O_CREAT | O_EXCL open below would then keep failing on later calls until
 * RestoreSlotFromDisk() removes it at the next startup.  Confirm whether it
 * should be unlinked on those paths.
 */
static void
SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	int			fd;
	ReplicationSlotOnDisk cp;
	bool		was_dirty;

	/*
	 * First check whether there's something to write out.  Resetting
	 * just_dirtied lets us tell, at the end, whether somebody dirtied the
	 * slot again while we were writing it.
	 */
	SpinLockAcquire(&slot->mutex);
	was_dirty = slot->dirty;
	slot->just_dirtied = false;
	SpinLockRelease(&slot->mutex);

	/* and don't do anything if there's nothing to write */
	if (!was_dirty)
		return;

	/* held exclusively for the duration of the file I/O below */
	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);

	/* silence valgrind :( */
	memset(&cp, 0, sizeof(ReplicationSlotOnDisk));

	sprintf(tmppath, "%s/state.tmp", dir);
	sprintf(path, "%s/state", dir);

	/* O_EXCL: fail rather than reuse an already existing state.tmp */
	fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
	if (fd < 0)
	{
		/*
		 * If not an ERROR, then release the lock before returning.  In case
		 * of an ERROR, the error recovery path automatically releases the
		 * lock, but no harm in explicitly releasing even in that case.  Note
		 * that LWLockRelease() could affect errno.
		 */
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m",
						tmppath)));
		return;
	}

	cp.magic = SLOT_MAGIC;
	INIT_CRC32C(cp.checksum);
	cp.version = SLOT_VERSION;
	cp.length = ReplicationSlotOnDiskV2Size;

	/* copy the persistent data while holding the slot's spinlock */
	SpinLockAcquire(&slot->mutex);

	memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));

	SpinLockRelease(&slot->mutex);

	/*
	 * The CRC is computed over the checksummed portion of the struct, which
	 * starts after the not-checksummed header fields.
	 */
	COMP_CRC32C(cp.checksum,
				(char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
				ReplicationSlotOnDiskChecksummedSize);
	FIN_CRC32C(cp.checksum);

	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
	if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	/* fsync the temporary file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
	if (pg_fsync(fd) != 0)
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	if (CloseTransientFile(fd) != 0)
	{
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m",
						tmppath)));
		return;
	}

	/* rename to permanent file, fsync file and directory */
	if (rename(tmppath, path) != 0)
	{
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
		return;
	}

	/*
	 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
	 */
	START_CRIT_SECTION();

	fsync_fname(path, false);
	fsync_fname(dir, true);
	fsync_fname("pg_replslot", true);

	END_CRIT_SECTION();

	/*
	 * Successfully wrote, unset dirty bit, unless somebody dirtied again
	 * already and remember the confirmed_flush LSN value.
	 */
	SpinLockAcquire(&slot->mutex);
	if (!slot->just_dirtied)
		slot->dirty = false;
	slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
	SpinLockRelease(&slot->mutex);

	LWLockRelease(&slot->io_in_progress_lock);
}
2146 :
2147 : /*
2148 : * Load a single slot from disk into memory.
2149 : */
2150 : static void
2151 102 : RestoreSlotFromDisk(const char *name)
2152 : {
2153 : ReplicationSlotOnDisk cp;
2154 : int i;
2155 : char slotdir[MAXPGPATH + 12];
2156 : char path[MAXPGPATH + 22];
2157 : int fd;
2158 102 : bool restored = false;
2159 : int readBytes;
2160 : pg_crc32c checksum;
2161 :
2162 : /* no need to lock here, no concurrent access allowed yet */
2163 :
2164 : /* delete temp file if it exists */
2165 102 : sprintf(slotdir, "pg_replslot/%s", name);
2166 102 : sprintf(path, "%s/state.tmp", slotdir);
2167 102 : if (unlink(path) < 0 && errno != ENOENT)
2168 0 : ereport(PANIC,
2169 : (errcode_for_file_access(),
2170 : errmsg("could not remove file \"%s\": %m", path)));
2171 :
2172 102 : sprintf(path, "%s/state", slotdir);
2173 :
2174 102 : elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2175 :
2176 : /* on some operating systems fsyncing a file requires O_RDWR */
2177 102 : fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2178 :
2179 : /*
2180 : * We do not need to handle this as we are rename()ing the directory into
2181 : * place only after we fsync()ed the state file.
2182 : */
2183 102 : if (fd < 0)
2184 0 : ereport(PANIC,
2185 : (errcode_for_file_access(),
2186 : errmsg("could not open file \"%s\": %m", path)));
2187 :
2188 : /*
2189 : * Sync state file before we're reading from it. We might have crashed
2190 : * while it wasn't synced yet and we shouldn't continue on that basis.
2191 : */
2192 102 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2193 102 : if (pg_fsync(fd) != 0)
2194 0 : ereport(PANIC,
2195 : (errcode_for_file_access(),
2196 : errmsg("could not fsync file \"%s\": %m",
2197 : path)));
2198 102 : pgstat_report_wait_end();
2199 :
2200 : /* Also sync the parent directory */
2201 102 : START_CRIT_SECTION();
2202 102 : fsync_fname(slotdir, true);
2203 102 : END_CRIT_SECTION();
2204 :
2205 : /* read part of statefile that's guaranteed to be version independent */
2206 102 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2207 102 : readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2208 102 : pgstat_report_wait_end();
2209 102 : if (readBytes != ReplicationSlotOnDiskConstantSize)
2210 : {
2211 0 : if (readBytes < 0)
2212 0 : ereport(PANIC,
2213 : (errcode_for_file_access(),
2214 : errmsg("could not read file \"%s\": %m", path)));
2215 : else
2216 0 : ereport(PANIC,
2217 : (errcode(ERRCODE_DATA_CORRUPTED),
2218 : errmsg("could not read file \"%s\": read %d of %zu",
2219 : path, readBytes,
2220 : (Size) ReplicationSlotOnDiskConstantSize)));
2221 : }
2222 :
2223 : /* verify magic */
2224 102 : if (cp.magic != SLOT_MAGIC)
2225 0 : ereport(PANIC,
2226 : (errcode(ERRCODE_DATA_CORRUPTED),
2227 : errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2228 : path, cp.magic, SLOT_MAGIC)));
2229 :
2230 : /* verify version */
2231 102 : if (cp.version != SLOT_VERSION)
2232 0 : ereport(PANIC,
2233 : (errcode(ERRCODE_DATA_CORRUPTED),
2234 : errmsg("replication slot file \"%s\" has unsupported version %u",
2235 : path, cp.version)));
2236 :
2237 : /* boundary check on length */
2238 102 : if (cp.length != ReplicationSlotOnDiskV2Size)
2239 0 : ereport(PANIC,
2240 : (errcode(ERRCODE_DATA_CORRUPTED),
2241 : errmsg("replication slot file \"%s\" has corrupted length %u",
2242 : path, cp.length)));
2243 :
2244 : /* Now that we know the size, read the entire file */
2245 102 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2246 204 : readBytes = read(fd,
2247 : (char *) &cp + ReplicationSlotOnDiskConstantSize,
2248 102 : cp.length);
2249 102 : pgstat_report_wait_end();
2250 102 : if (readBytes != cp.length)
2251 : {
2252 0 : if (readBytes < 0)
2253 0 : ereport(PANIC,
2254 : (errcode_for_file_access(),
2255 : errmsg("could not read file \"%s\": %m", path)));
2256 : else
2257 0 : ereport(PANIC,
2258 : (errcode(ERRCODE_DATA_CORRUPTED),
2259 : errmsg("could not read file \"%s\": read %d of %zu",
2260 : path, readBytes, (Size) cp.length)));
2261 : }
2262 :
2263 102 : if (CloseTransientFile(fd) != 0)
2264 0 : ereport(PANIC,
2265 : (errcode_for_file_access(),
2266 : errmsg("could not close file \"%s\": %m", path)));
2267 :
2268 : /* now verify the CRC */
2269 102 : INIT_CRC32C(checksum);
2270 102 : COMP_CRC32C(checksum,
2271 : (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
2272 : ReplicationSlotOnDiskChecksummedSize);
2273 102 : FIN_CRC32C(checksum);
2274 :
2275 102 : if (!EQ_CRC32C(checksum, cp.checksum))
2276 0 : ereport(PANIC,
2277 : (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2278 : path, checksum, cp.checksum)));
2279 :
2280 : /*
2281 : * If we crashed with an ephemeral slot active, don't restore but delete
2282 : * it.
2283 : */
2284 102 : if (cp.slotdata.persistency != RS_PERSISTENT)
2285 : {
2286 0 : if (!rmtree(slotdir, true))
2287 : {
2288 0 : ereport(WARNING,
2289 : (errmsg("could not remove directory \"%s\"",
2290 : slotdir)));
2291 : }
2292 0 : fsync_fname("pg_replslot", true);
2293 0 : return;
2294 : }
2295 :
2296 : /*
2297 : * Verify that requirements for the specific slot type are met. That's
2298 : * important because if these aren't met we're not guaranteed to retain
2299 : * all the necessary resources for the slot.
2300 : *
2301 : * NB: We have to do so *after* the above checks for ephemeral slots,
2302 : * because otherwise a slot that shouldn't exist anymore could prevent
2303 : * restarts.
2304 : *
2305 : * NB: Changing the requirements here also requires adapting
2306 : * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2307 : */
2308 102 : if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
2309 0 : ereport(FATAL,
2310 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2311 : errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
2312 : NameStr(cp.slotdata.name)),
2313 : errhint("Change wal_level to be logical or higher.")));
2314 102 : else if (wal_level < WAL_LEVEL_REPLICA)
2315 0 : ereport(FATAL,
2316 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2317 : errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
2318 : NameStr(cp.slotdata.name)),
2319 : errhint("Change wal_level to be replica or higher.")));
2320 :
2321 : /* nothing can be active yet, don't lock anything */
2322 140 : for (i = 0; i < max_replication_slots; i++)
2323 : {
2324 : ReplicationSlot *slot;
2325 :
2326 140 : slot = &ReplicationSlotCtl->replication_slots[i];
2327 :
2328 140 : if (slot->in_use)
2329 38 : continue;
2330 :
2331 : /* restore the entire set of persistent data */
2332 102 : memcpy(&slot->data, &cp.slotdata,
2333 : sizeof(ReplicationSlotPersistentData));
2334 :
2335 : /* initialize in memory state */
2336 102 : slot->effective_xmin = cp.slotdata.xmin;
2337 102 : slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2338 102 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2339 :
2340 102 : slot->candidate_catalog_xmin = InvalidTransactionId;
2341 102 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
2342 102 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
2343 102 : slot->candidate_restart_valid = InvalidXLogRecPtr;
2344 :
2345 102 : slot->in_use = true;
2346 102 : slot->active_pid = 0;
2347 :
2348 102 : restored = true;
2349 102 : break;
2350 : }
2351 :
2352 102 : if (!restored)
2353 0 : ereport(FATAL,
2354 : (errmsg("too many replication slots active before shutdown"),
2355 : errhint("Increase max_replication_slots and try again.")));
2356 : }
2357 :
2358 : /*
2359 : * Maps a conflict reason for a replication slot to
2360 : * ReplicationSlotInvalidationCause.
2361 : */
2362 : ReplicationSlotInvalidationCause
2363 0 : GetSlotInvalidationCause(const char *conflict_reason)
2364 : {
2365 : ReplicationSlotInvalidationCause cause;
2366 0 : ReplicationSlotInvalidationCause result = RS_INVAL_NONE;
2367 0 : bool found PG_USED_FOR_ASSERTS_ONLY = false;
2368 :
2369 : Assert(conflict_reason);
2370 :
2371 0 : for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++)
2372 : {
2373 0 : if (strcmp(SlotInvalidationCauses[cause], conflict_reason) == 0)
2374 : {
2375 0 : found = true;
2376 0 : result = cause;
2377 0 : break;
2378 : }
2379 : }
2380 :
2381 : Assert(found);
2382 0 : return result;
2383 : }
2384 :
2385 : /*
2386 : * A helper function to validate slots specified in GUC standby_slot_names.
2387 : *
2388 : * The rawname will be parsed, and the result will be saved into *elemlist.
2389 : */
2390 : static bool
2391 12 : validate_standby_slots(char *rawname, List **elemlist)
2392 : {
2393 : bool ok;
2394 :
2395 : /* Verify syntax and parse string into a list of identifiers */
2396 12 : ok = SplitIdentifierString(rawname, ',', elemlist);
2397 :
2398 12 : if (!ok)
2399 : {
2400 0 : GUC_check_errdetail("List syntax is invalid.");
2401 : }
2402 12 : else if (!ReplicationSlotCtl)
2403 : {
2404 : /*
2405 : * We cannot validate the replication slot if the replication slots'
2406 : * data has not been initialized. This is ok as we will anyway
2407 : * validate the specified slot when waiting for them to catch up. See
2408 : * StandbySlotsHaveCaughtup() for details.
2409 : */
2410 : }
2411 : else
2412 : {
2413 : /* Check that the specified slots exist and are logical slots */
2414 12 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2415 :
2416 36 : foreach_ptr(char, name, *elemlist)
2417 : {
2418 : ReplicationSlot *slot;
2419 :
2420 12 : slot = SearchNamedReplicationSlot(name, false);
2421 :
2422 12 : if (!slot)
2423 : {
2424 0 : GUC_check_errdetail("replication slot \"%s\" does not exist",
2425 : name);
2426 0 : ok = false;
2427 0 : break;
2428 : }
2429 :
2430 12 : if (!SlotIsPhysical(slot))
2431 : {
2432 0 : GUC_check_errdetail("\"%s\" is not a physical replication slot",
2433 : name);
2434 0 : ok = false;
2435 0 : break;
2436 : }
2437 : }
2438 :
2439 12 : LWLockRelease(ReplicationSlotControlLock);
2440 : }
2441 :
2442 12 : return ok;
2443 : }
2444 :
2445 : /*
2446 : * GUC check_hook for standby_slot_names
2447 : */
2448 : bool
2449 1788 : check_standby_slot_names(char **newval, void **extra, GucSource source)
2450 : {
2451 : char *rawname;
2452 : char *ptr;
2453 : List *elemlist;
2454 : int size;
2455 : bool ok;
2456 : StandbySlotNamesConfigData *config;
2457 :
2458 1788 : if ((*newval)[0] == '\0')
2459 1776 : return true;
2460 :
2461 : /* Need a modifiable copy of the GUC string */
2462 12 : rawname = pstrdup(*newval);
2463 :
2464 : /* Now verify if the specified slots exist and have correct type */
2465 12 : ok = validate_standby_slots(rawname, &elemlist);
2466 :
2467 12 : if (!ok || elemlist == NIL)
2468 : {
2469 0 : pfree(rawname);
2470 0 : list_free(elemlist);
2471 0 : return ok;
2472 : }
2473 :
2474 : /* Compute the size required for the StandbySlotNamesConfigData struct */
2475 12 : size = offsetof(StandbySlotNamesConfigData, slot_names);
2476 36 : foreach_ptr(char, slot_name, elemlist)
2477 12 : size += strlen(slot_name) + 1;
2478 :
2479 : /* GUC extra value must be guc_malloc'd, not palloc'd */
2480 12 : config = (StandbySlotNamesConfigData *) guc_malloc(LOG, size);
2481 :
2482 : /* Transform the data into StandbySlotNamesConfigData */
2483 12 : config->nslotnames = list_length(elemlist);
2484 :
2485 12 : ptr = config->slot_names;
2486 36 : foreach_ptr(char, slot_name, elemlist)
2487 : {
2488 12 : strcpy(ptr, slot_name);
2489 12 : ptr += strlen(slot_name) + 1;
2490 : }
2491 :
2492 12 : *extra = (void *) config;
2493 :
2494 12 : pfree(rawname);
2495 12 : list_free(elemlist);
2496 12 : return true;
2497 : }
2498 :
2499 : /*
2500 : * GUC assign_hook for standby_slot_names
2501 : */
2502 : void
2503 1788 : assign_standby_slot_names(const char *newval, void *extra)
2504 : {
2505 : /*
2506 : * The standby slots may have changed, so we must recompute the oldest
2507 : * LSN.
2508 : */
2509 1788 : ss_oldest_flush_lsn = InvalidXLogRecPtr;
2510 :
2511 1788 : standby_slot_names_config = (StandbySlotNamesConfigData *) extra;
2512 1788 : }
2513 :
2514 : /*
2515 : * Check if the passed slot_name is specified in the standby_slot_names GUC.
2516 : */
2517 : bool
2518 36072 : SlotExistsInStandbySlotNames(const char *slot_name)
2519 : {
2520 : const char *standby_slot_name;
2521 :
2522 : /* Return false if there is no value in standby_slot_names */
2523 36072 : if (standby_slot_names_config == NULL)
2524 36062 : return false;
2525 :
2526 : /*
2527 : * XXX: We are not expecting this list to be long so a linear search
2528 : * shouldn't hurt but if that turns out not to be true then we can cache
2529 : * this information for each WalSender as well.
2530 : */
2531 10 : standby_slot_name = standby_slot_names_config->slot_names;
2532 10 : for (int i = 0; i < standby_slot_names_config->nslotnames; i++)
2533 : {
2534 10 : if (strcmp(standby_slot_name, slot_name) == 0)
2535 10 : return true;
2536 :
2537 0 : standby_slot_name += strlen(standby_slot_name) + 1;
2538 : }
2539 :
2540 0 : return false;
2541 : }
2542 :
/*
 * Return true if the slots specified in standby_slot_names have caught up to
 * the given WAL location, false otherwise.
 *
 * The elevel parameter specifies the error level used for logging messages
 * related to slots that do not exist, are invalidated, or are inactive.
 *
 * Returns true without examining any slot in three cases:
 *  - standby_slot_names is not set;
 *  - we are in recovery;
 *  - the cached ss_oldest_flush_lsn already reaches wait_for_lsn.
 *
 * Otherwise the configured slots are checked in list order, and the scan
 * stops at the first one that is missing, logical, invalidated, or whose
 * restart_lsn is behind wait_for_lsn.  If every slot has caught up, the
 * smallest of their restart_lsn values is cached in ss_oldest_flush_lsn so
 * that later calls can short-circuit.
 */
bool
StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
{
	const char *name;
	int			caught_up_slot_num = 0;
	XLogRecPtr	min_restart_lsn = InvalidXLogRecPtr;

	/*
	 * Don't need to wait for the standbys to catch up if there is no value in
	 * standby_slot_names.
	 */
	if (standby_slot_names_config == NULL)
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if we are on a standby
	 * server, since we do not support syncing slots to cascading standbys.
	 */
	if (RecoveryInProgress())
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if they are already
	 * beyond the specified WAL location.
	 */
	if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
		ss_oldest_flush_lsn >= wait_for_lsn)
		return true;

	/*
	 * To prevent concurrent slot dropping and creation while filtering the
	 * slots, take the ReplicationSlotControlLock outside of the loop.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	name = standby_slot_names_config->slot_names;
	for (int i = 0; i < standby_slot_names_config->nslotnames; i++)
	{
		XLogRecPtr	restart_lsn;
		bool		invalidated;
		bool		inactive;
		ReplicationSlot *slot;

		/* we already hold the control lock, so need_lock = false */
		slot = SearchNamedReplicationSlot(name, false);

		if (!slot)
		{
			/*
			 * If a slot name provided in standby_slot_names does not exist,
			 * report a message and exit the loop. A user can specify a slot
			 * name that does not exist just before the server startup. The
			 * GUC check_hook(validate_standby_slots) cannot validate such a
			 * slot during startup as the ReplicationSlotCtl shared memory is
			 * not initialized at that time. It is also possible for a user to
			 * drop the slot in standby_slot_names afterwards.
			 */
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("replication slot \"%s\" specified in parameter %s does not exist",
						   name, "standby_slot_names"),
					errdetail("Logical replication is waiting on the standby associated with \"%s\".",
							  name),
					errhint("Consider creating the slot \"%s\" or amend parameter %s.",
							name, "standby_slot_names"));
			break;
		}

		if (SlotIsLogical(slot))
		{
			/*
			 * If a logical slot name is provided in standby_slot_names,
			 * report a message and exit the loop. Similar to the non-existent
			 * case, a user can specify a logical slot name in
			 * standby_slot_names before the server startup, or drop an
			 * existing physical slot and recreate a logical slot with the
			 * same name.
			 */
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("cannot have logical replication slot \"%s\" in parameter %s",
						   name, "standby_slot_names"),
					errdetail("Logical replication is waiting for correction on \"%s\".",
							  name),
					errhint("Consider removing logical slot \"%s\" from parameter %s.",
							name, "standby_slot_names"));
			break;
		}

		/* read the fields we need under the slot's spinlock */
		SpinLockAcquire(&slot->mutex);
		restart_lsn = slot->data.restart_lsn;
		invalidated = slot->data.invalidated != RS_INVAL_NONE;
		inactive = slot->active_pid == 0;
		SpinLockRelease(&slot->mutex);

		if (invalidated)
		{
			/* Specified physical slot has been invalidated */
			ereport(elevel,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
						   name, "standby_slot_names"),
					errdetail("Logical replication is waiting on the standby associated with \"%s\".",
							  name),
					errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
							name, "standby_slot_names"));
			break;
		}

		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
		{
			/* Log a message if no active_pid for this physical slot */
			if (inactive)
				ereport(elevel,
						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
							   name, "standby_slot_names"),
						errdetail("Logical replication is waiting on the standby associated with \"%s\".",
								  name),
						errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
								name, "standby_slot_names"));

			/*
			 * This slot hasn't caught up yet, so the overall answer is false;
			 * stop checking the remaining slots.
			 */
			break;
		}

		Assert(restart_lsn >= wait_for_lsn);

		/* track the oldest restart_lsn among the slots that caught up */
		if (XLogRecPtrIsInvalid(min_restart_lsn) ||
			min_restart_lsn > restart_lsn)
			min_restart_lsn = restart_lsn;

		caught_up_slot_num++;

		/* advance to the next NUL-terminated name in the packed list */
		name += strlen(name) + 1;
	}

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Return false if not all the standbys have caught up to the specified
	 * WAL location.
	 */
	if (caught_up_slot_num != standby_slot_names_config->nslotnames)
		return false;

	/* The ss_oldest_flush_lsn must not retreat. */
	Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
		   min_restart_lsn >= ss_oldest_flush_lsn);

	ss_oldest_flush_lsn = min_restart_lsn;

	return true;
}
2703 :
2704 : /*
2705 : * Wait for physical standbys to confirm receiving the given lsn.
2706 : *
2707 : * Used by logical decoding SQL functions. It waits for physical standbys
2708 : * corresponding to the physical slots specified in the standby_slot_names GUC.
2709 : */
2710 : void
2711 380 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
2712 : {
2713 : /*
2714 : * Don't need to wait for the standby to catch up if the current acquired
2715 : * slot is not a logical failover slot, or there is no value in
2716 : * standby_slot_names.
2717 : */
2718 380 : if (!MyReplicationSlot->data.failover || !standby_slot_names_config)
2719 378 : return;
2720 :
2721 2 : ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
2722 :
2723 : for (;;)
2724 : {
2725 4 : CHECK_FOR_INTERRUPTS();
2726 :
2727 4 : if (ConfigReloadPending)
2728 : {
2729 2 : ConfigReloadPending = false;
2730 2 : ProcessConfigFile(PGC_SIGHUP);
2731 : }
2732 :
2733 : /* Exit if done waiting for every slot. */
2734 4 : if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
2735 2 : break;
2736 :
2737 : /*
2738 : * Wait for the slots in the standby_slot_names to catch up, but use a
2739 : * timeout (1s) so we can also check if the standby_slot_names has
2740 : * been changed.
2741 : */
2742 2 : ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
2743 : WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
2744 : }
2745 :
2746 2 : ConditionVariableCancelSleep();
2747 : }
|