Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * slot.c
4 : * Replication slot management.
5 : *
6 : *
7 : * Copyright (c) 2012-2024, PostgreSQL Global Development Group
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/replication/slot.c
12 : *
13 : * NOTES
14 : *
15 : * Replication slots are used to keep state about replication streams
16 : * originating from this cluster. Their primary purpose is to prevent the
17 : * premature removal of WAL or of old tuple versions in a manner that would
18 : * interfere with replication; they are also useful for monitoring purposes.
19 : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : * on standbys (to support cascading setups). The requirement that slots be
21 : * usable on standbys precludes storing them in the system catalogs.
22 : *
23 : * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24 : * directory. Inside that directory the state file will contain the slot's
25 : * own data. Additional data can be stored alongside that file if required.
26 : * While the server is running, the state data is also cached in memory for
27 : * efficiency.
28 : *
29 : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : * of a slot. The remaining data in each slot is protected by its mutex.
33 : *
34 : *-------------------------------------------------------------------------
35 : */
36 :
37 : #include "postgres.h"
38 :
39 : #include <unistd.h>
40 : #include <sys/stat.h>
41 :
42 : #include "access/transam.h"
43 : #include "access/xlog_internal.h"
44 : #include "access/xlogrecovery.h"
45 : #include "common/file_utils.h"
46 : #include "common/string.h"
47 : #include "miscadmin.h"
48 : #include "pgstat.h"
49 : #include "postmaster/interrupt.h"
50 : #include "replication/slotsync.h"
51 : #include "replication/slot.h"
52 : #include "replication/walsender_private.h"
53 : #include "storage/fd.h"
54 : #include "storage/ipc.h"
55 : #include "storage/proc.h"
56 : #include "storage/procarray.h"
57 : #include "utils/builtins.h"
58 : #include "utils/guc_hooks.h"
59 : #include "utils/varlena.h"
60 :
/*
 * Replication slot on-disk data structure.
 *
 * This is the layout of a slot's state file.  Field order and types define
 * the file format, so they must not be rearranged.
 *
 * The header fields before 'slotdata' form the version-independent part
 * (see ReplicationSlotOnDiskConstantSize).  'magic' and 'checksum' are not
 * themselves covered by the checksum (see
 * ReplicationSlotOnDiskNotChecksummedSize); everything from 'version' to the
 * end of the struct is.
 */
typedef struct ReplicationSlotOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* format identifier, expected to be SLOT_MAGIC */
	pg_crc32c	checksum;		/* CRC32C over the checksummed part */

	/* data covered by checksum */
	uint32		version;		/* on-disk format version (SLOT_VERSION when written) */
	uint32		length;			/* presumably the size of the version-dependent
								 * part that follows; compare
								 * ReplicationSlotOnDiskV2Size - TODO confirm */

	/*
	 * The actual data in the slot that follows can differ based on the above
	 * 'version'.
	 */

	ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;
83 :
/*
 * Struct for the configuration of standby_slot_names.
 *
 * Note: this must be a flat representation that can be held in a single chunk
 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
 * standby_slot_names GUC.  For that reason the names are packed back to back
 * in a trailing flexible array rather than referenced through pointers.
 */
typedef struct
{
	/* Number of slot names in the slot_names[] */
	int			nslotnames;

	/*
	 * slot_names contains 'nslotnames' consecutive null-terminated C strings.
	 * Walk it by advancing past each string's terminating NUL.
	 */
	char		slot_names[FLEXIBLE_ARRAY_MEMBER];
} StandbySlotNamesConfigData;
101 :
/*
 * Lookup table for slot invalidation causes.
 *
 * Indexed by RS_INVAL_* value; each entry is the name of that cause.
 */
const char *const SlotInvalidationCauses[] = {
	[RS_INVAL_NONE] = "none",
	[RS_INVAL_WAL_REMOVED] = "wal_removed",
	[RS_INVAL_HORIZON] = "rows_removed",
	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
};

/*
 * Highest-numbered invalidation cause.  Despite the name, this is the largest
 * cause value, not a count: the table above has RS_INVAL_MAX_CAUSES + 1
 * entries (RS_INVAL_NONE included), which the assertion below enforces.
 * Presumably RS_INVAL_WAL_LEVEL is the last RS_INVAL_* value; this must be
 * bumped if another cause is appended.
 */
#define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL

StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
				 "array length mismatch");
117 :
/*
 * Size helpers for ReplicationSlotOnDisk.
 *
 * Every expansion is fully parenthesized so that the macros behave like
 * single values in any surrounding expression (e.g. "2 * X" or "n / X");
 * an unparenthesized "a - b" would otherwise bind to neighboring operators.
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	5		/* version for new files */
133 :
/*
 * Control array for replication slot management.  Points into shared memory
 * once ReplicationSlotsShmemInit() has run; stays NULL when
 * max_replication_slots is 0.
 */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/*
 * My backend's replication slot in the shared memory array, or NULL if this
 * backend has not created/acquired a slot.
 */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUC variables */
int			max_replication_slots = 10; /* the maximum number of replication
										 * slots */

/*
 * This GUC lists streaming replication standby server slot names that
 * logical WAL sender processes will wait for.
 */
char	   *standby_slot_names;

/* This is the parsed and cached configuration for standby_slot_names */
static StandbySlotNamesConfigData *standby_slot_names_config;

/*
 * Oldest LSN that has been confirmed to be flushed to the standbys
 * corresponding to the physical slots specified in the standby_slot_names GUC.
 */
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;

/* forward declarations of static functions defined later in this file */
static void ReplicationSlotShmemExit(int code, Datum arg);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
166 :
167 : /*
168 : * Report shared-memory space needed by ReplicationSlotsShmemInit.
169 : */
170 : Size
171 6830 : ReplicationSlotsShmemSize(void)
172 : {
173 6830 : Size size = 0;
174 :
175 6830 : if (max_replication_slots == 0)
176 4 : return size;
177 :
178 6826 : size = offsetof(ReplicationSlotCtlData, replication_slots);
179 6826 : size = add_size(size,
180 : mul_size(max_replication_slots, sizeof(ReplicationSlot)));
181 :
182 6826 : return size;
183 : }
184 :
185 : /*
186 : * Allocate and initialize shared memory for replication slots.
187 : */
188 : void
189 1768 : ReplicationSlotsShmemInit(void)
190 : {
191 : bool found;
192 :
193 1768 : if (max_replication_slots == 0)
194 2 : return;
195 :
196 1766 : ReplicationSlotCtl = (ReplicationSlotCtlData *)
197 1766 : ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
198 : &found);
199 :
200 1766 : if (!found)
201 : {
202 : int i;
203 :
204 : /* First time through, so initialize */
205 3446 : MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
206 :
207 19050 : for (i = 0; i < max_replication_slots; i++)
208 : {
209 17284 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
210 :
211 : /* everything else is zeroed by the memset above */
212 17284 : SpinLockInit(&slot->mutex);
213 17284 : LWLockInitialize(&slot->io_in_progress_lock,
214 : LWTRANCHE_REPLICATION_SLOT_IO);
215 17284 : ConditionVariableInit(&slot->active_cv);
216 : }
217 : }
218 : }
219 :
220 : /*
221 : * Register the callback for replication slot cleanup and releasing.
222 : */
223 : void
224 29854 : ReplicationSlotInitialize(void)
225 : {
226 29854 : before_shmem_exit(ReplicationSlotShmemExit, 0);
227 29854 : }
228 :
229 : /*
230 : * Release and cleanup replication slots.
231 : */
232 : static void
233 29854 : ReplicationSlotShmemExit(int code, Datum arg)
234 : {
235 : /* Make sure active replication slots are released */
236 29854 : if (MyReplicationSlot != NULL)
237 386 : ReplicationSlotRelease();
238 :
239 : /* Also cleanup all the temporary slots. */
240 29854 : ReplicationSlotCleanup(false);
241 29854 : }
242 :
243 : /*
244 : * Check whether the passed slot name is valid and report errors at elevel.
245 : *
246 : * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
247 : * the name to be used as a directory name on every supported OS.
248 : *
249 : * Returns whether the directory name is valid or not if elevel < ERROR.
250 : */
251 : bool
252 1560 : ReplicationSlotValidateName(const char *name, int elevel)
253 : {
254 : const char *cp;
255 :
256 1560 : if (strlen(name) == 0)
257 : {
258 6 : ereport(elevel,
259 : (errcode(ERRCODE_INVALID_NAME),
260 : errmsg("replication slot name \"%s\" is too short",
261 : name)));
262 0 : return false;
263 : }
264 :
265 1554 : if (strlen(name) >= NAMEDATALEN)
266 : {
267 0 : ereport(elevel,
268 : (errcode(ERRCODE_NAME_TOO_LONG),
269 : errmsg("replication slot name \"%s\" is too long",
270 : name)));
271 0 : return false;
272 : }
273 :
274 31314 : for (cp = name; *cp; cp++)
275 : {
276 29762 : if (!((*cp >= 'a' && *cp <= 'z')
277 14810 : || (*cp >= '0' && *cp <= '9')
278 2876 : || (*cp == '_')))
279 : {
280 2 : ereport(elevel,
281 : (errcode(ERRCODE_INVALID_NAME),
282 : errmsg("replication slot name \"%s\" contains invalid character",
283 : name),
284 : errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
285 0 : return false;
286 : }
287 : }
288 1552 : return true;
289 : }
290 :
/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db specific; if the slot is going to
 *	   be used for that pass true, otherwise false.
 * two_phase: Allows decoding of prepared transactions. We allow this option
 *	   to be enabled only at the slot creation time. If we allow this option
 *	   to be changed during decoding then it is quite possible that we skip
 *	   prepare first time because this option was not enabled. Now next time
 *	   during getting changes, if the two_phase option is enabled it can skip
 *	   prepare because by that time start decoding point has been moved. So the
 *	   user will only get commit prepared.
 * failover: If enabled, allows the slot to be synced to standbys so
 *	   that logical replication can be resumed after failover.
 * synced: True if the slot is synchronized from the primary server.
 *
 * On return, MyReplicationSlot points at the new slot, which is in_use and
 * active for this process.  Raises ERROR for an invalid or duplicate name,
 * when no free slot is left, or for disallowed failover combinations.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
					  ReplicationSlotPersistency persistency,
					  bool two_phase, bool failover, bool synced)
{
	ReplicationSlot *slot = NULL;
	int			i;

	Assert(MyReplicationSlot == NULL);

	/* Raises ERROR (does not return) on an invalid name. */
	ReplicationSlotValidateName(name, ERROR);

	if (failover)
	{
		/*
		 * Do not allow users to create the failover enabled slots on the
		 * standby as we do not support sync to the cascading standby.
		 *
		 * However, failover enabled slots can be created during slot
		 * synchronization because we need to retain the same values as the
		 * remote slot.
		 */
		if (RecoveryInProgress() && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a replication slot created on the standby"));

		/*
		 * Do not allow users to create failover enabled temporary slots,
		 * because temporary slots will not be synced to the standby.
		 *
		 * However, failover enabled temporary slots can be created during
		 * slot synchronization. See the comments atop slotsync.c for details.
		 */
		if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a temporary replication slot"));
	}

	/*
	 * If some other backend ran this code concurrently with us, we'd likely
	 * both allocate the same slot, and that would be bad. We'd also be at
	 * risk of missing a name collision. Also, we don't want to try to create
	 * a new slot while somebody's busy cleaning up an old one, because we
	 * might both be monkeying with the same directory.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * Check for name collision, and identify an allocatable slot. We need to
	 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
	 * else can change the in_use flags while we're looking at them.
	 *
	 * The first unused entry in array order is chosen.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
					 errmsg("replication slot \"%s\" already exists", name)));
		if (!s->in_use && slot == NULL)
			slot = s;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If all slots are in use, we're out of luck. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("all replication slots are in use"),
				 errhint("Free one or increase max_replication_slots.")));

	/*
	 * Since this slot is not in use, nobody should be looking at any part of
	 * it other than the in_use field unless they're trying to allocate it.
	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
	 * be doing that. So it's safe to initialize the slot.
	 */
	Assert(!slot->in_use);
	Assert(slot->active_pid == 0);

	/* first initialize persistent data */
	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
	namestrcpy(&slot->data.name, name);
	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
	slot->data.persistency = persistency;
	slot->data.two_phase = two_phase;
	slot->data.two_phase_at = InvalidXLogRecPtr;
	slot->data.failover = failover;
	slot->data.synced = synced;

	/* and then data only present in shared memory */
	slot->just_dirtied = false;
	slot->dirty = false;
	slot->effective_xmin = InvalidTransactionId;
	slot->effective_catalog_xmin = InvalidTransactionId;
	slot->candidate_catalog_xmin = InvalidTransactionId;
	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
	slot->candidate_restart_valid = InvalidXLogRecPtr;
	slot->candidate_restart_lsn = InvalidXLogRecPtr;
	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
	slot->inactive_since = 0;

	/*
	 * Create the slot on disk. We haven't actually marked the slot allocated
	 * yet, so no special cleanup is required if this errors out.
	 */
	CreateSlotOnDisk(slot);

	/*
	 * We need to briefly prevent any other backend from iterating over the
	 * slots while we flip the in_use flag. We also need to set the active
	 * flag while holding the ControlLock as otherwise a concurrent
	 * ReplicationSlotAcquire() could acquire the slot as well.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

	slot->in_use = true;

	/* We can now mark the slot active, and that makes it our slot. */
	SpinLockAcquire(&slot->mutex);
	Assert(slot->active_pid == 0);
	slot->active_pid = MyProcPid;
	SpinLockRelease(&slot->mutex);
	MyReplicationSlot = slot;

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Create statistics entry for the new logical slot. We don't collect any
	 * stats for physical slots, so no need to create an entry for the same.
	 * See ReplicationSlotDropPtr for why we need to do this before releasing
	 * ReplicationSlotAllocationLock.
	 */
	if (SlotIsLogical(slot))
		pgstat_create_replslot(slot);

	/*
	 * Now that the slot has been marked as in_use and active, it's safe to
	 * let somebody else try to allocate a slot.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);
}
457 :
458 : /*
459 : * Search for the named replication slot.
460 : *
461 : * Return the replication slot if found, otherwise NULL.
462 : */
463 : ReplicationSlot *
464 2424 : SearchNamedReplicationSlot(const char *name, bool need_lock)
465 : {
466 : int i;
467 2424 : ReplicationSlot *slot = NULL;
468 :
469 2424 : if (need_lock)
470 136 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
471 :
472 4136 : for (i = 0; i < max_replication_slots; i++)
473 : {
474 4098 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
475 :
476 4098 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
477 : {
478 2386 : slot = s;
479 2386 : break;
480 : }
481 : }
482 :
483 2424 : if (need_lock)
484 136 : LWLockRelease(ReplicationSlotControlLock);
485 :
486 2424 : return slot;
487 : }
488 :
489 : /*
490 : * Return the index of the replication slot in
491 : * ReplicationSlotCtl->replication_slots.
492 : *
493 : * This is mainly useful to have an efficient key for storing replication slot
494 : * stats.
495 : */
496 : int
497 14404 : ReplicationSlotIndex(ReplicationSlot *slot)
498 : {
499 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
500 : slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
501 :
502 14404 : return slot - ReplicationSlotCtl->replication_slots;
503 : }
504 :
505 : /*
506 : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
507 : * the slot's name and true is returned.
508 : *
509 : * This likely is only useful for pgstat_replslot.c during shutdown, in other
510 : * cases there are obvious TOCTOU issues.
511 : */
512 : bool
513 134 : ReplicationSlotName(int index, Name name)
514 : {
515 : ReplicationSlot *slot;
516 : bool found;
517 :
518 134 : slot = &ReplicationSlotCtl->replication_slots[index];
519 :
520 : /*
521 : * Ensure that the slot cannot be dropped while we copy the name. Don't
522 : * need the spinlock as the name of an existing slot cannot change.
523 : */
524 134 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
525 134 : found = slot->in_use;
526 134 : if (slot->in_use)
527 134 : namestrcpy(name, NameStr(slot->data.name));
528 134 : LWLockRelease(ReplicationSlotControlLock);
529 :
530 134 : return found;
531 : }
532 :
/*
 * Find a previously created slot and mark it as used by this process.
 *
 * An error is raised if nowait is true and the slot is currently in use. If
 * nowait is false, we sleep until the slot is released by the owning process.
 * An error is also raised if no slot with the given name exists.
 *
 * On success MyReplicationSlot points at the acquired slot and its
 * inactive_since has been reset to 0.
 */
void
ReplicationSlotAcquire(const char *name, bool nowait)
{
	ReplicationSlot *s;
	int			active_pid;

	Assert(name != NULL);

retry:
	Assert(MyReplicationSlot == NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	/* Check if the slot exists with the given name. */
	s = SearchNamedReplicationSlot(name, false);
	if (s == NULL || !s->in_use)
	{
		LWLockRelease(ReplicationSlotControlLock);

		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist",
						name)));
	}

	/*
	 * This is the slot we want; check if it's active under some other
	 * process. In single user mode, we don't need this check.
	 */
	if (IsUnderPostmaster)
	{
		/*
		 * Get ready to sleep on the slot in case it is active. (We may end
		 * up not sleeping, but we don't want to do this while holding the
		 * spinlock.)
		 */
		if (!nowait)
			ConditionVariablePrepareToSleep(&s->active_cv);

		/* Claim the slot if it is free; either way, remember the owner. */
		SpinLockAcquire(&s->mutex);
		if (s->active_pid == 0)
			s->active_pid = MyProcPid;
		active_pid = s->active_pid;
		SpinLockRelease(&s->mutex);
	}
	else
		active_pid = MyProcPid; /* note: s->active_pid is not set here */
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * If we found the slot but it's already active in another process, we
	 * wait until the owning process signals us that it's been released, or
	 * error out.
	 */
	if (active_pid != MyProcPid)
	{
		if (!nowait)
		{
			/*
			 * Wait here until we get signaled, and then restart the whole
			 * lookup, since the slot may have been dropped meanwhile.
			 * NOTE(review): this reuses the ..._SLOT_DROP wait event.
			 */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);
			ConditionVariableCancelSleep();
			goto retry;
		}

		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
				 errmsg("replication slot \"%s\" is active for PID %d",
						NameStr(s->data.name), active_pid)));
	}
	else if (!nowait)
		ConditionVariableCancelSleep(); /* no sleep needed after all */

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&s->active_cv);

	/* We made this slot active, so it's ours now. */
	MyReplicationSlot = s;

	/*
	 * The call to pgstat_acquire_replslot() protects against stats for a
	 * different slot, from before a restart or such, being present during
	 * pgstat_report_replslot().
	 */
	if (SlotIsLogical(s))
		pgstat_acquire_replslot(s);

	/*
	 * Reset the time since the slot has become inactive as the slot is active
	 * now.
	 */
	SpinLockAcquire(&s->mutex);
	s->inactive_since = 0;
	SpinLockRelease(&s->mutex);

	if (am_walsender)
	{
		ereport(log_replication_commands ? LOG : DEBUG1,
				SlotIsLogical(s)
				? errmsg("acquired logical replication slot \"%s\"",
						 NameStr(s->data.name))
				: errmsg("acquired physical replication slot \"%s\"",
						 NameStr(s->data.name)));
	}
}
644 :
645 : /*
646 : * Release the replication slot that this backend considers to own.
647 : *
648 : * This or another backend can re-acquire the slot later.
649 : * Resources this slot requires will be preserved.
650 : */
651 : void
652 2592 : ReplicationSlotRelease(void)
653 : {
654 2592 : ReplicationSlot *slot = MyReplicationSlot;
655 2592 : char *slotname = NULL; /* keep compiler quiet */
656 2592 : bool is_logical = false; /* keep compiler quiet */
657 2592 : TimestampTz now = 0;
658 :
659 : Assert(slot != NULL && slot->active_pid != 0);
660 :
661 2592 : if (am_walsender)
662 : {
663 1796 : slotname = pstrdup(NameStr(slot->data.name));
664 1796 : is_logical = SlotIsLogical(slot);
665 : }
666 :
667 2592 : if (slot->data.persistency == RS_EPHEMERAL)
668 : {
669 : /*
670 : * Delete the slot. There is no !PANIC case where this is allowed to
671 : * fail, all that may happen is an incomplete cleanup of the on-disk
672 : * data.
673 : */
674 10 : ReplicationSlotDropAcquired();
675 : }
676 :
677 : /*
678 : * If slot needed to temporarily restrain both data and catalog xmin to
679 : * create the catalog snapshot, remove that temporary constraint.
680 : * Snapshots can only be exported while the initial snapshot is still
681 : * acquired.
682 : */
683 2592 : if (!TransactionIdIsValid(slot->data.xmin) &&
684 2566 : TransactionIdIsValid(slot->effective_xmin))
685 : {
686 342 : SpinLockAcquire(&slot->mutex);
687 342 : slot->effective_xmin = InvalidTransactionId;
688 342 : SpinLockRelease(&slot->mutex);
689 342 : ReplicationSlotsComputeRequiredXmin(false);
690 : }
691 :
692 : /*
693 : * Set the time since the slot has become inactive. We get the current
694 : * time beforehand to avoid system call while holding the spinlock.
695 : */
696 2592 : now = GetCurrentTimestamp();
697 :
698 2592 : if (slot->data.persistency == RS_PERSISTENT)
699 : {
700 : /*
701 : * Mark persistent slot inactive. We're not freeing it, just
702 : * disconnecting, but wake up others that may be waiting for it.
703 : */
704 2094 : SpinLockAcquire(&slot->mutex);
705 2094 : slot->active_pid = 0;
706 2094 : slot->inactive_since = now;
707 2094 : SpinLockRelease(&slot->mutex);
708 2094 : ConditionVariableBroadcast(&slot->active_cv);
709 : }
710 : else
711 : {
712 498 : SpinLockAcquire(&slot->mutex);
713 498 : slot->inactive_since = now;
714 498 : SpinLockRelease(&slot->mutex);
715 : }
716 :
717 2592 : MyReplicationSlot = NULL;
718 :
719 : /* might not have been set when we've been a plain slot */
720 2592 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
721 2592 : MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
722 2592 : ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
723 2592 : LWLockRelease(ProcArrayLock);
724 :
725 2592 : if (am_walsender)
726 : {
727 1796 : ereport(log_replication_commands ? LOG : DEBUG1,
728 : is_logical
729 : ? errmsg("released logical replication slot \"%s\"",
730 : slotname)
731 : : errmsg("released physical replication slot \"%s\"",
732 : slotname));
733 :
734 1796 : pfree(slotname);
735 : }
736 2592 : }
737 :
738 : /*
739 : * Cleanup temporary slots created in current session.
740 : *
741 : * Cleanup only synced temporary slots if 'synced_only' is true, else
742 : * cleanup all temporary slots.
743 : */
744 : void
745 70162 : ReplicationSlotCleanup(bool synced_only)
746 : {
747 : int i;
748 :
749 : Assert(MyReplicationSlot == NULL);
750 :
751 70162 : restart:
752 70162 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
753 759586 : for (i = 0; i < max_replication_slots; i++)
754 : {
755 689670 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
756 :
757 689670 : if (!s->in_use)
758 666474 : continue;
759 :
760 23196 : SpinLockAcquire(&s->mutex);
761 23196 : if ((s->active_pid == MyProcPid &&
762 246 : (!synced_only || s->data.synced)))
763 : {
764 : Assert(s->data.persistency == RS_TEMPORARY);
765 246 : SpinLockRelease(&s->mutex);
766 246 : LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
767 :
768 246 : ReplicationSlotDropPtr(s);
769 :
770 246 : ConditionVariableBroadcast(&s->active_cv);
771 246 : goto restart;
772 : }
773 : else
774 22950 : SpinLockRelease(&s->mutex);
775 : }
776 :
777 69916 : LWLockRelease(ReplicationSlotControlLock);
778 69916 : }
779 :
780 : /*
781 : * Permanently drop replication slot identified by the passed in name.
782 : */
783 : void
784 690 : ReplicationSlotDrop(const char *name, bool nowait)
785 : {
786 : Assert(MyReplicationSlot == NULL);
787 :
788 690 : ReplicationSlotAcquire(name, nowait);
789 :
790 : /*
791 : * Do not allow users to drop the slots which are currently being synced
792 : * from the primary to the standby.
793 : */
794 680 : if (RecoveryInProgress() && MyReplicationSlot->data.synced)
795 2 : ereport(ERROR,
796 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
797 : errmsg("cannot drop replication slot \"%s\"", name),
798 : errdetail("This slot is being synced from the primary server."));
799 :
800 678 : ReplicationSlotDropAcquired();
801 678 : }
802 :
803 : /*
804 : * Change the definition of the slot identified by the specified name.
805 : */
806 : void
807 6 : ReplicationSlotAlter(const char *name, bool failover)
808 : {
809 : Assert(MyReplicationSlot == NULL);
810 :
811 6 : ReplicationSlotAcquire(name, false);
812 :
813 6 : if (SlotIsPhysical(MyReplicationSlot))
814 0 : ereport(ERROR,
815 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
816 : errmsg("cannot use %s with a physical replication slot",
817 : "ALTER_REPLICATION_SLOT"));
818 :
819 6 : if (RecoveryInProgress())
820 : {
821 : /*
822 : * Do not allow users to alter the slots which are currently being
823 : * synced from the primary to the standby.
824 : */
825 2 : if (MyReplicationSlot->data.synced)
826 2 : ereport(ERROR,
827 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
828 : errmsg("cannot alter replication slot \"%s\"", name),
829 : errdetail("This slot is being synced from the primary server."));
830 :
831 : /*
832 : * Do not allow users to enable failover on the standby as we do not
833 : * support sync to the cascading standby.
834 : */
835 0 : if (failover)
836 0 : ereport(ERROR,
837 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
838 : errmsg("cannot enable failover for a replication slot"
839 : " on the standby"));
840 : }
841 :
842 : /*
843 : * Do not allow users to enable failover for temporary slots as we do not
844 : * support syncing temporary slots to the standby.
845 : */
846 4 : if (failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
847 0 : ereport(ERROR,
848 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
849 : errmsg("cannot enable failover for a temporary replication slot"));
850 :
851 4 : if (MyReplicationSlot->data.failover != failover)
852 : {
853 4 : SpinLockAcquire(&MyReplicationSlot->mutex);
854 4 : MyReplicationSlot->data.failover = failover;
855 4 : SpinLockRelease(&MyReplicationSlot->mutex);
856 :
857 4 : ReplicationSlotMarkDirty();
858 4 : ReplicationSlotSave();
859 : }
860 :
861 4 : ReplicationSlotRelease();
862 4 : }
863 :
864 : /*
865 : * Permanently drop the currently acquired replication slot.
866 : */
867 : void
868 702 : ReplicationSlotDropAcquired(void)
869 : {
870 702 : ReplicationSlot *slot = MyReplicationSlot;
871 :
872 : Assert(MyReplicationSlot != NULL);
873 :
874 : /* slot isn't acquired anymore */
875 702 : MyReplicationSlot = NULL;
876 :
877 702 : ReplicationSlotDropPtr(slot);
878 702 : }
879 :
880 : /*
881 : * Permanently drop the replication slot which will be released by the point
882 : * this function returns.
883 : */
884 : static void
885 948 : ReplicationSlotDropPtr(ReplicationSlot *slot)
886 : {
887 : char path[MAXPGPATH];
888 : char tmppath[MAXPGPATH];
889 :
890 : /*
891 : * If some other backend ran this code concurrently with us, we might try
892 : * to delete a slot with a certain name while someone else was trying to
893 : * create a slot with the same name.
894 : */
895 948 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
896 :
897 : /* Generate pathnames. */
898 948 : sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
899 948 : sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
900 :
901 : /*
902 : * Rename the slot directory on disk, so that we'll no longer recognize
903 : * this as a valid slot. Note that if this fails, we've got to mark the
904 : * slot inactive before bailing out. If we're dropping an ephemeral or a
905 : * temporary slot, we better never fail hard as the caller won't expect
906 : * the slot to survive and this might get called during error handling.
907 : */
908 948 : if (rename(path, tmppath) == 0)
909 : {
910 : /*
911 : * We need to fsync() the directory we just renamed and its parent to
912 : * make sure that our changes are on disk in a crash-safe fashion. If
913 : * fsync() fails, we can't be sure whether the changes are on disk or
914 : * not. For now, we handle that by panicking;
915 : * StartupReplicationSlots() will try to straighten it out after
916 : * restart.
917 : */
918 948 : START_CRIT_SECTION();
919 948 : fsync_fname(tmppath, true);
920 948 : fsync_fname("pg_replslot", true);
921 948 : END_CRIT_SECTION();
922 : }
923 : else
924 : {
925 0 : bool fail_softly = slot->data.persistency != RS_PERSISTENT;
926 :
927 0 : SpinLockAcquire(&slot->mutex);
928 0 : slot->active_pid = 0;
929 0 : SpinLockRelease(&slot->mutex);
930 :
931 : /* wake up anyone waiting on this slot */
932 0 : ConditionVariableBroadcast(&slot->active_cv);
933 :
934 0 : ereport(fail_softly ? WARNING : ERROR,
935 : (errcode_for_file_access(),
936 : errmsg("could not rename file \"%s\" to \"%s\": %m",
937 : path, tmppath)));
938 : }
939 :
940 : /*
941 : * The slot is definitely gone. Lock out concurrent scans of the array
942 : * long enough to kill it. It's OK to clear the active PID here without
943 : * grabbing the mutex because nobody else can be scanning the array here,
944 : * and nobody can be attached to this slot and thus access it without
945 : * scanning the array.
946 : *
947 : * Also wake up processes waiting for it.
948 : */
949 948 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
950 948 : slot->active_pid = 0;
951 948 : slot->in_use = false;
952 948 : LWLockRelease(ReplicationSlotControlLock);
953 948 : ConditionVariableBroadcast(&slot->active_cv);
954 :
955 : /*
956 : * Slot is dead and doesn't prevent resource removal anymore, recompute
957 : * limits.
958 : */
959 948 : ReplicationSlotsComputeRequiredXmin(false);
960 948 : ReplicationSlotsComputeRequiredLSN();
961 :
962 : /*
963 : * If removing the directory fails, the worst thing that will happen is
964 : * that the user won't be able to create a new slot with the same name
965 : * until the next server restart. We warn about it, but that's all.
966 : */
967 948 : if (!rmtree(tmppath, true))
968 0 : ereport(WARNING,
969 : (errmsg("could not remove directory \"%s\"", tmppath)));
970 :
971 : /*
972 : * Drop the statistics entry for the replication slot. Do this while
973 : * holding ReplicationSlotAllocationLock so that we don't drop a
974 : * statistics entry for another slot with the same name just created in
975 : * another session.
976 : */
977 948 : if (SlotIsLogical(slot))
978 686 : pgstat_drop_replslot(slot);
979 :
980 : /*
981 : * We release this at the very end, so that nobody starts trying to create
982 : * a slot while we're still cleaning up the detritus of the old one.
983 : */
984 948 : LWLockRelease(ReplicationSlotAllocationLock);
985 948 : }
986 :
987 : /*
988 : * Serialize the currently acquired slot's state from memory to disk, thereby
989 : * guaranteeing the current state will survive a crash.
990 : */
991 : void
992 2232 : ReplicationSlotSave(void)
993 : {
994 : char path[MAXPGPATH];
995 :
996 : Assert(MyReplicationSlot != NULL);
997 :
998 2232 : sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
999 2232 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
1000 2232 : }
1001 :
1002 : /*
1003 : * Signal that it would be useful if the currently acquired slot would be
1004 : * flushed out to disk.
1005 : *
1006 : * Note that the actual flush to disk can be delayed for a long time, if
1007 : * required for correctness explicitly do a ReplicationSlotSave().
1008 : */
1009 : void
1010 40398 : ReplicationSlotMarkDirty(void)
1011 : {
1012 40398 : ReplicationSlot *slot = MyReplicationSlot;
1013 :
1014 : Assert(MyReplicationSlot != NULL);
1015 :
1016 40398 : SpinLockAcquire(&slot->mutex);
1017 40398 : MyReplicationSlot->just_dirtied = true;
1018 40398 : MyReplicationSlot->dirty = true;
1019 40398 : SpinLockRelease(&slot->mutex);
1020 40398 : }
1021 :
1022 : /*
1023 : * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1024 : * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1025 : */
1026 : void
1027 782 : ReplicationSlotPersist(void)
1028 : {
1029 782 : ReplicationSlot *slot = MyReplicationSlot;
1030 :
1031 : Assert(slot != NULL);
1032 : Assert(slot->data.persistency != RS_PERSISTENT);
1033 :
1034 782 : SpinLockAcquire(&slot->mutex);
1035 782 : slot->data.persistency = RS_PERSISTENT;
1036 782 : SpinLockRelease(&slot->mutex);
1037 :
1038 782 : ReplicationSlotMarkDirty();
1039 782 : ReplicationSlotSave();
1040 782 : }
1041 :
1042 : /*
1043 : * Compute the oldest xmin across all slots and store it in the ProcArray.
1044 : *
1045 : * If already_locked is true, ProcArrayLock has already been acquired
1046 : * exclusively.
1047 : */
1048 : void
1049 3896 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
1050 : {
1051 : int i;
1052 3896 : TransactionId agg_xmin = InvalidTransactionId;
1053 3896 : TransactionId agg_catalog_xmin = InvalidTransactionId;
1054 :
1055 : Assert(ReplicationSlotCtl != NULL);
1056 :
1057 3896 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1058 :
1059 39046 : for (i = 0; i < max_replication_slots; i++)
1060 : {
1061 35150 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1062 : TransactionId effective_xmin;
1063 : TransactionId effective_catalog_xmin;
1064 : bool invalidated;
1065 :
1066 35150 : if (!s->in_use)
1067 31608 : continue;
1068 :
1069 3542 : SpinLockAcquire(&s->mutex);
1070 3542 : effective_xmin = s->effective_xmin;
1071 3542 : effective_catalog_xmin = s->effective_catalog_xmin;
1072 3542 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1073 3542 : SpinLockRelease(&s->mutex);
1074 :
1075 : /* invalidated slots need not apply */
1076 3542 : if (invalidated)
1077 44 : continue;
1078 :
1079 : /* check the data xmin */
1080 3498 : if (TransactionIdIsValid(effective_xmin) &&
1081 6 : (!TransactionIdIsValid(agg_xmin) ||
1082 6 : TransactionIdPrecedes(effective_xmin, agg_xmin)))
1083 492 : agg_xmin = effective_xmin;
1084 :
1085 : /* check the catalog xmin */
1086 3498 : if (TransactionIdIsValid(effective_catalog_xmin) &&
1087 1398 : (!TransactionIdIsValid(agg_catalog_xmin) ||
1088 1398 : TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1089 1948 : agg_catalog_xmin = effective_catalog_xmin;
1090 : }
1091 :
1092 3896 : LWLockRelease(ReplicationSlotControlLock);
1093 :
1094 3896 : ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1095 3896 : }
1096 :
1097 : /*
1098 : * Compute the oldest restart LSN across all slots and inform xlog module.
1099 : *
1100 : * Note: while max_slot_wal_keep_size is theoretically relevant for this
1101 : * purpose, we don't try to account for that, because this module doesn't
1102 : * know what to compare against.
1103 : */
1104 : void
1105 41154 : ReplicationSlotsComputeRequiredLSN(void)
1106 : {
1107 : int i;
1108 41154 : XLogRecPtr min_required = InvalidXLogRecPtr;
1109 :
1110 : Assert(ReplicationSlotCtl != NULL);
1111 :
1112 41154 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1113 446474 : for (i = 0; i < max_replication_slots; i++)
1114 : {
1115 405320 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1116 : XLogRecPtr restart_lsn;
1117 : bool invalidated;
1118 :
1119 405320 : if (!s->in_use)
1120 364590 : continue;
1121 :
1122 40730 : SpinLockAcquire(&s->mutex);
1123 40730 : restart_lsn = s->data.restart_lsn;
1124 40730 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1125 40730 : SpinLockRelease(&s->mutex);
1126 :
1127 : /* invalidated slots need not apply */
1128 40730 : if (invalidated)
1129 46 : continue;
1130 :
1131 40684 : if (restart_lsn != InvalidXLogRecPtr &&
1132 1374 : (min_required == InvalidXLogRecPtr ||
1133 : restart_lsn < min_required))
1134 39354 : min_required = restart_lsn;
1135 : }
1136 41154 : LWLockRelease(ReplicationSlotControlLock);
1137 :
1138 41154 : XLogSetReplicationSlotMinimumLSN(min_required);
1139 41154 : }
1140 :
1141 : /*
1142 : * Compute the oldest WAL LSN required by *logical* decoding slots..
1143 : *
1144 : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1145 : * slots exist.
1146 : *
1147 : * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1148 : * ignores physical replication slots.
1149 : *
1150 : * The results aren't required frequently, so we don't maintain a precomputed
1151 : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1152 : */
1153 : XLogRecPtr
1154 3408 : ReplicationSlotsComputeLogicalRestartLSN(void)
1155 : {
1156 3408 : XLogRecPtr result = InvalidXLogRecPtr;
1157 : int i;
1158 :
1159 3408 : if (max_replication_slots <= 0)
1160 4 : return InvalidXLogRecPtr;
1161 :
1162 3404 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1163 :
1164 36488 : for (i = 0; i < max_replication_slots; i++)
1165 : {
1166 : ReplicationSlot *s;
1167 : XLogRecPtr restart_lsn;
1168 : bool invalidated;
1169 :
1170 33084 : s = &ReplicationSlotCtl->replication_slots[i];
1171 :
1172 : /* cannot change while ReplicationSlotCtlLock is held */
1173 33084 : if (!s->in_use)
1174 32452 : continue;
1175 :
1176 : /* we're only interested in logical slots */
1177 632 : if (!SlotIsLogical(s))
1178 316 : continue;
1179 :
1180 : /* read once, it's ok if it increases while we're checking */
1181 316 : SpinLockAcquire(&s->mutex);
1182 316 : restart_lsn = s->data.restart_lsn;
1183 316 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1184 316 : SpinLockRelease(&s->mutex);
1185 :
1186 : /* invalidated slots need not apply */
1187 316 : if (invalidated)
1188 8 : continue;
1189 :
1190 308 : if (restart_lsn == InvalidXLogRecPtr)
1191 0 : continue;
1192 :
1193 308 : if (result == InvalidXLogRecPtr ||
1194 : restart_lsn < result)
1195 252 : result = restart_lsn;
1196 : }
1197 :
1198 3404 : LWLockRelease(ReplicationSlotControlLock);
1199 :
1200 3404 : return result;
1201 : }
1202 :
1203 : /*
1204 : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1205 : * passed database oid.
1206 : *
1207 : * Returns true if there are any slots referencing the database. *nslots will
1208 : * be set to the absolute number of slots in the database, *nactive to ones
1209 : * currently active.
1210 : */
1211 : bool
1212 66 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1213 : {
1214 : int i;
1215 :
1216 66 : *nslots = *nactive = 0;
1217 :
1218 66 : if (max_replication_slots <= 0)
1219 0 : return false;
1220 :
1221 66 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1222 668 : for (i = 0; i < max_replication_slots; i++)
1223 : {
1224 : ReplicationSlot *s;
1225 :
1226 602 : s = &ReplicationSlotCtl->replication_slots[i];
1227 :
1228 : /* cannot change while ReplicationSlotCtlLock is held */
1229 602 : if (!s->in_use)
1230 568 : continue;
1231 :
1232 : /* only logical slots are database specific, skip */
1233 34 : if (!SlotIsLogical(s))
1234 18 : continue;
1235 :
1236 : /* not our database, skip */
1237 16 : if (s->data.database != dboid)
1238 10 : continue;
1239 :
1240 : /* NB: intentionally counting invalidated slots */
1241 :
1242 : /* count slots with spinlock held */
1243 6 : SpinLockAcquire(&s->mutex);
1244 6 : (*nslots)++;
1245 6 : if (s->active_pid != 0)
1246 2 : (*nactive)++;
1247 6 : SpinLockRelease(&s->mutex);
1248 : }
1249 66 : LWLockRelease(ReplicationSlotControlLock);
1250 :
1251 66 : if (*nslots > 0)
1252 6 : return true;
1253 60 : return false;
1254 : }
1255 :
1256 : /*
1257 : * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1258 : * passed database oid. The caller should hold an exclusive lock on the
1259 : * pg_database oid for the database to prevent creation of new slots on the db
1260 : * or replay from existing slots.
1261 : *
1262 : * Another session that concurrently acquires an existing slot on the target DB
1263 : * (most likely to drop it) may cause this function to ERROR. If that happens
1264 : * it may have dropped some but not all slots.
1265 : *
1266 : * This routine isn't as efficient as it could be - but we don't drop
1267 : * databases often, especially databases with lots of slots.
1268 : */
1269 : void
1270 86 : ReplicationSlotsDropDBSlots(Oid dboid)
1271 : {
1272 : int i;
1273 :
1274 86 : if (max_replication_slots <= 0)
1275 0 : return;
1276 :
1277 86 : restart:
1278 96 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1279 882 : for (i = 0; i < max_replication_slots; i++)
1280 : {
1281 : ReplicationSlot *s;
1282 : char *slotname;
1283 : int active_pid;
1284 :
1285 796 : s = &ReplicationSlotCtl->replication_slots[i];
1286 :
1287 : /* cannot change while ReplicationSlotCtlLock is held */
1288 796 : if (!s->in_use)
1289 744 : continue;
1290 :
1291 : /* only logical slots are database specific, skip */
1292 52 : if (!SlotIsLogical(s))
1293 20 : continue;
1294 :
1295 : /* not our database, skip */
1296 32 : if (s->data.database != dboid)
1297 22 : continue;
1298 :
1299 : /* NB: intentionally including invalidated slots */
1300 :
1301 : /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1302 10 : SpinLockAcquire(&s->mutex);
1303 : /* can't change while ReplicationSlotControlLock is held */
1304 10 : slotname = NameStr(s->data.name);
1305 10 : active_pid = s->active_pid;
1306 10 : if (active_pid == 0)
1307 : {
1308 10 : MyReplicationSlot = s;
1309 10 : s->active_pid = MyProcPid;
1310 : }
1311 10 : SpinLockRelease(&s->mutex);
1312 :
1313 : /*
1314 : * Even though we hold an exclusive lock on the database object a
1315 : * logical slot for that DB can still be active, e.g. if it's
1316 : * concurrently being dropped by a backend connected to another DB.
1317 : *
1318 : * That's fairly unlikely in practice, so we'll just bail out.
1319 : *
1320 : * The slot sync worker holds a shared lock on the database before
1321 : * operating on synced logical slots to avoid conflict with the drop
1322 : * happening here. The persistent synced slots are thus safe but there
1323 : * is a possibility that the slot sync worker has created a temporary
1324 : * slot (which stays active even on release) and we are trying to drop
1325 : * that here. In practice, the chances of hitting this scenario are
1326 : * less as during slot synchronization, the temporary slot is
1327 : * immediately converted to persistent and thus is safe due to the
1328 : * shared lock taken on the database. So, we'll just bail out in such
1329 : * a case.
1330 : *
1331 : * XXX: We can consider shutting down the slot sync worker before
1332 : * trying to drop synced temporary slots here.
1333 : */
1334 10 : if (active_pid)
1335 0 : ereport(ERROR,
1336 : (errcode(ERRCODE_OBJECT_IN_USE),
1337 : errmsg("replication slot \"%s\" is active for PID %d",
1338 : slotname, active_pid)));
1339 :
1340 : /*
1341 : * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1342 : * holding ReplicationSlotControlLock over filesystem operations,
1343 : * release ReplicationSlotControlLock and use
1344 : * ReplicationSlotDropAcquired.
1345 : *
1346 : * As that means the set of slots could change, restart scan from the
1347 : * beginning each time we release the lock.
1348 : */
1349 10 : LWLockRelease(ReplicationSlotControlLock);
1350 10 : ReplicationSlotDropAcquired();
1351 10 : goto restart;
1352 : }
1353 86 : LWLockRelease(ReplicationSlotControlLock);
1354 : }
1355 :
1356 :
1357 : /*
1358 : * Check whether the server's configuration supports using replication
1359 : * slots.
1360 : */
1361 : void
1362 2986 : CheckSlotRequirements(void)
1363 : {
1364 : /*
1365 : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1366 : * needs the same check.
1367 : */
1368 :
1369 2986 : if (max_replication_slots == 0)
1370 0 : ereport(ERROR,
1371 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1372 : errmsg("replication slots can only be used if max_replication_slots > 0")));
1373 :
1374 2986 : if (wal_level < WAL_LEVEL_REPLICA)
1375 0 : ereport(ERROR,
1376 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1377 : errmsg("replication slots can only be used if wal_level >= replica")));
1378 2986 : }
1379 :
1380 : /*
1381 : * Check whether the user has privilege to use replication slots.
1382 : */
1383 : void
1384 988 : CheckSlotPermissions(void)
1385 : {
1386 988 : if (!has_rolreplication(GetUserId()))
1387 10 : ereport(ERROR,
1388 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1389 : errmsg("permission denied to use replication slots"),
1390 : errdetail("Only roles with the %s attribute may use replication slots.",
1391 : "REPLICATION")));
1392 978 : }
1393 :
1394 : /*
1395 : * Reserve WAL for the currently active slot.
1396 : *
1397 : * Compute and set restart_lsn in a manner that's appropriate for the type of
1398 : * the slot and concurrency safe.
1399 : */
1400 : void
1401 1042 : ReplicationSlotReserveWal(void)
1402 : {
1403 1042 : ReplicationSlot *slot = MyReplicationSlot;
1404 :
1405 : Assert(slot != NULL);
1406 : Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
1407 :
1408 : /*
1409 : * The replication slot mechanism is used to prevent removal of required
1410 : * WAL. As there is no interlock between this routine and checkpoints, WAL
1411 : * segments could concurrently be removed when a now stale return value of
1412 : * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1413 : * this happens we'll just retry.
1414 : */
1415 : while (true)
1416 0 : {
1417 : XLogSegNo segno;
1418 : XLogRecPtr restart_lsn;
1419 :
1420 : /*
1421 : * For logical slots log a standby snapshot and start logical decoding
1422 : * at exactly that position. That allows the slot to start up more
1423 : * quickly. But on a standby we cannot do WAL writes, so just use the
1424 : * replay pointer; effectively, an attempt to create a logical slot on
1425 : * standby will cause it to wait for an xl_running_xact record to be
1426 : * logged independently on the primary, so that a snapshot can be
1427 : * built using the record.
1428 : *
1429 : * None of this is needed (or indeed helpful) for physical slots as
1430 : * they'll start replay at the last logged checkpoint anyway. Instead
1431 : * return the location of the last redo LSN. While that slightly
1432 : * increases the chance that we have to retry, it's where a base
1433 : * backup has to start replay at.
1434 : */
1435 1042 : if (SlotIsPhysical(slot))
1436 258 : restart_lsn = GetRedoRecPtr();
1437 784 : else if (RecoveryInProgress())
1438 44 : restart_lsn = GetXLogReplayRecPtr(NULL);
1439 : else
1440 740 : restart_lsn = GetXLogInsertRecPtr();
1441 :
1442 1042 : SpinLockAcquire(&slot->mutex);
1443 1042 : slot->data.restart_lsn = restart_lsn;
1444 1042 : SpinLockRelease(&slot->mutex);
1445 :
1446 : /* prevent WAL removal as fast as possible */
1447 1042 : ReplicationSlotsComputeRequiredLSN();
1448 :
1449 : /*
1450 : * If all required WAL is still there, great, otherwise retry. The
1451 : * slot should prevent further removal of WAL, unless there's a
1452 : * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1453 : * the new restart_lsn above, so normally we should never need to loop
1454 : * more than twice.
1455 : */
1456 1042 : XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
1457 1042 : if (XLogGetLastRemovedSegno() < segno)
1458 1042 : break;
1459 : }
1460 :
1461 1042 : if (!RecoveryInProgress() && SlotIsLogical(slot))
1462 : {
1463 : XLogRecPtr flushptr;
1464 :
1465 : /* make sure we have enough information to start */
1466 740 : flushptr = LogStandbySnapshot();
1467 :
1468 : /* and make sure it's fsynced to disk */
1469 740 : XLogFlush(flushptr);
1470 : }
1471 1042 : }
1472 :
1473 : /*
1474 : * Report that replication slot needs to be invalidated
1475 : */
1476 : static void
1477 42 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1478 : bool terminating,
1479 : int pid,
1480 : NameData slotname,
1481 : XLogRecPtr restart_lsn,
1482 : XLogRecPtr oldestLSN,
1483 : TransactionId snapshotConflictHorizon)
1484 : {
1485 : StringInfoData err_detail;
1486 42 : bool hint = false;
1487 :
1488 42 : initStringInfo(&err_detail);
1489 :
1490 42 : switch (cause)
1491 : {
1492 12 : case RS_INVAL_WAL_REMOVED:
1493 : {
1494 12 : unsigned long long ex = oldestLSN - restart_lsn;
1495 :
1496 12 : hint = true;
1497 12 : appendStringInfo(&err_detail,
1498 12 : ngettext("The slot's restart_lsn %X/%X exceeds the limit by %llu byte.",
1499 : "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
1500 : ex),
1501 12 : LSN_FORMAT_ARGS(restart_lsn),
1502 : ex);
1503 12 : break;
1504 : }
1505 24 : case RS_INVAL_HORIZON:
1506 24 : appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1507 : snapshotConflictHorizon);
1508 24 : break;
1509 :
1510 6 : case RS_INVAL_WAL_LEVEL:
1511 6 : appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server."));
1512 6 : break;
1513 : case RS_INVAL_NONE:
1514 : pg_unreachable();
1515 : }
1516 :
1517 42 : ereport(LOG,
1518 : terminating ?
1519 : errmsg("terminating process %d to release replication slot \"%s\"",
1520 : pid, NameStr(slotname)) :
1521 : errmsg("invalidating obsolete replication slot \"%s\"",
1522 : NameStr(slotname)),
1523 : errdetail_internal("%s", err_detail.data),
1524 : hint ? errhint("You might need to increase %s.", "max_slot_wal_keep_size") : 0);
1525 :
1526 42 : pfree(err_detail.data);
1527 42 : }
1528 :
1529 : /*
1530 : * Helper for InvalidateObsoleteReplicationSlots
1531 : *
1532 : * Acquires the given slot and mark it invalid, if necessary and possible.
1533 : *
1534 : * Returns whether ReplicationSlotControlLock was released in the interim (and
1535 : * in that case we're not holding the lock at return, otherwise we are).
1536 : *
1537 : * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
1538 : *
1539 : * This is inherently racy, because we release the LWLock
1540 : * for syscalls, so caller must restart if we return true.
1541 : */
1542 : static bool
1543 426 : InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
1544 : ReplicationSlot *s,
1545 : XLogRecPtr oldestLSN,
1546 : Oid dboid, TransactionId snapshotConflictHorizon,
1547 : bool *invalidated)
1548 : {
1549 426 : int last_signaled_pid = 0;
1550 426 : bool released_lock = false;
1551 426 : bool terminated = false;
1552 426 : TransactionId initial_effective_xmin = InvalidTransactionId;
1553 426 : TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
1554 426 : XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr;
1555 426 : ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
1556 :
1557 : for (;;)
1558 14 : {
1559 : XLogRecPtr restart_lsn;
1560 : NameData slotname;
1561 440 : int active_pid = 0;
1562 440 : ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
1563 :
1564 : Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1565 :
1566 440 : if (!s->in_use)
1567 : {
1568 0 : if (released_lock)
1569 0 : LWLockRelease(ReplicationSlotControlLock);
1570 0 : break;
1571 : }
1572 :
1573 : /*
1574 : * Check if the slot needs to be invalidated. If it needs to be
1575 : * invalidated, and is not currently acquired, acquire it and mark it
1576 : * as having been invalidated. We do this with the spinlock held to
1577 : * avoid race conditions -- for example the restart_lsn could move
1578 : * forward, or the slot could be dropped.
1579 : */
1580 440 : SpinLockAcquire(&s->mutex);
1581 :
1582 440 : restart_lsn = s->data.restart_lsn;
1583 :
1584 : /* we do nothing if the slot is already invalid */
1585 440 : if (s->data.invalidated == RS_INVAL_NONE)
1586 : {
1587 : /*
1588 : * The slot's mutex will be released soon, and it is possible that
1589 : * those values change since the process holding the slot has been
1590 : * terminated (if any), so record them here to ensure that we
1591 : * would report the correct invalidation cause.
1592 : */
1593 362 : if (!terminated)
1594 : {
1595 348 : initial_restart_lsn = s->data.restart_lsn;
1596 348 : initial_effective_xmin = s->effective_xmin;
1597 348 : initial_catalog_effective_xmin = s->effective_catalog_xmin;
1598 : }
1599 :
1600 362 : switch (cause)
1601 : {
1602 308 : case RS_INVAL_WAL_REMOVED:
1603 308 : if (initial_restart_lsn != InvalidXLogRecPtr &&
1604 : initial_restart_lsn < oldestLSN)
1605 12 : invalidation_cause = cause;
1606 308 : break;
1607 48 : case RS_INVAL_HORIZON:
1608 48 : if (!SlotIsLogical(s))
1609 0 : break;
1610 : /* invalid DB oid signals a shared relation */
1611 48 : if (dboid != InvalidOid && dboid != s->data.database)
1612 0 : break;
1613 48 : if (TransactionIdIsValid(initial_effective_xmin) &&
1614 0 : TransactionIdPrecedesOrEquals(initial_effective_xmin,
1615 : snapshotConflictHorizon))
1616 0 : invalidation_cause = cause;
1617 96 : else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
1618 48 : TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
1619 : snapshotConflictHorizon))
1620 24 : invalidation_cause = cause;
1621 48 : break;
1622 6 : case RS_INVAL_WAL_LEVEL:
1623 6 : if (SlotIsLogical(s))
1624 6 : invalidation_cause = cause;
1625 6 : break;
1626 : case RS_INVAL_NONE:
1627 : pg_unreachable();
1628 : }
1629 78 : }
1630 :
1631 : /*
1632 : * The invalidation cause recorded previously should not change while
1633 : * the process owning the slot (if any) has been terminated.
1634 : */
1635 : Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
1636 : invalidation_cause_prev != invalidation_cause));
1637 :
1638 : /* if there's no invalidation, we're done */
1639 440 : if (invalidation_cause == RS_INVAL_NONE)
1640 : {
1641 398 : SpinLockRelease(&s->mutex);
1642 398 : if (released_lock)
1643 0 : LWLockRelease(ReplicationSlotControlLock);
1644 398 : break;
1645 : }
1646 :
1647 42 : slotname = s->data.name;
1648 42 : active_pid = s->active_pid;
1649 :
1650 : /*
1651 : * If the slot can be acquired, do so and mark it invalidated
1652 : * immediately. Otherwise we'll signal the owning process, below, and
1653 : * retry.
1654 : */
1655 42 : if (active_pid == 0)
1656 : {
1657 28 : MyReplicationSlot = s;
1658 28 : s->active_pid = MyProcPid;
1659 28 : s->data.invalidated = invalidation_cause;
1660 :
1661 : /*
1662 : * XXX: We should consider not overwriting restart_lsn and instead
1663 : * just rely on .invalidated.
1664 : */
1665 28 : if (invalidation_cause == RS_INVAL_WAL_REMOVED)
1666 8 : s->data.restart_lsn = InvalidXLogRecPtr;
1667 :
1668 : /* Let caller know */
1669 28 : *invalidated = true;
1670 : }
1671 :
1672 42 : SpinLockRelease(&s->mutex);
1673 :
1674 : /*
1675 : * The logical replication slots shouldn't be invalidated as GUC
1676 : * max_slot_wal_keep_size is set to -1 during the binary upgrade. See
1677 : * check_old_cluster_for_valid_slots() where we ensure that no
1678 : * invalidated before the upgrade.
1679 : */
1680 : Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));
1681 :
1682 42 : if (active_pid != 0)
1683 : {
1684 : /*
1685 : * Prepare the sleep on the slot's condition variable before
1686 : * releasing the lock, to close a possible race condition if the
1687 : * slot is released before the sleep below.
1688 : */
1689 14 : ConditionVariablePrepareToSleep(&s->active_cv);
1690 :
1691 14 : LWLockRelease(ReplicationSlotControlLock);
1692 14 : released_lock = true;
1693 :
1694 : /*
1695 : * Signal to terminate the process that owns the slot, if we
1696 : * haven't already signalled it. (Avoidance of repeated
1697 : * signalling is the only reason for there to be a loop in this
1698 : * routine; otherwise we could rely on caller's restart loop.)
1699 : *
1700 : * There is the race condition that other process may own the slot
1701 : * after its current owner process is terminated and before this
1702 : * process owns it. To handle that, we signal only if the PID of
1703 : * the owning process has changed from the previous time. (This
1704 : * logic assumes that the same PID is not reused very quickly.)
1705 : */
1706 14 : if (last_signaled_pid != active_pid)
1707 : {
1708 14 : ReportSlotInvalidation(invalidation_cause, true, active_pid,
1709 : slotname, restart_lsn,
1710 : oldestLSN, snapshotConflictHorizon);
1711 :
1712 14 : if (MyBackendType == B_STARTUP)
1713 10 : (void) SendProcSignal(active_pid,
1714 : PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
1715 : INVALID_PROC_NUMBER);
1716 : else
1717 4 : (void) kill(active_pid, SIGTERM);
1718 :
1719 14 : last_signaled_pid = active_pid;
1720 14 : terminated = true;
1721 14 : invalidation_cause_prev = invalidation_cause;
1722 : }
1723 :
1724 : /* Wait until the slot is released. */
1725 14 : ConditionVariableSleep(&s->active_cv,
1726 : WAIT_EVENT_REPLICATION_SLOT_DROP);
1727 :
1728 : /*
1729 : * Re-acquire lock and start over; we expect to invalidate the
1730 : * slot next time (unless another process acquires the slot in the
1731 : * meantime).
1732 : */
1733 14 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1734 14 : continue;
1735 : }
1736 : else
1737 : {
1738 : /*
1739 : * We hold the slot now and have already invalidated it; flush it
1740 : * to ensure that state persists.
1741 : *
1742 : * Don't want to hold ReplicationSlotControlLock across file
1743 : * system operations, so release it now but be sure to tell caller
1744 : * to restart from scratch.
1745 : */
1746 28 : LWLockRelease(ReplicationSlotControlLock);
1747 28 : released_lock = true;
1748 :
1749 : /* Make sure the invalidated state persists across server restart */
1750 28 : ReplicationSlotMarkDirty();
1751 28 : ReplicationSlotSave();
1752 28 : ReplicationSlotRelease();
1753 :
1754 28 : ReportSlotInvalidation(invalidation_cause, false, active_pid,
1755 : slotname, restart_lsn,
1756 : oldestLSN, snapshotConflictHorizon);
1757 :
1758 : /* done with this slot for now */
1759 28 : break;
1760 : }
1761 : }
1762 :
1763 : Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
1764 :
1765 426 : return released_lock;
1766 : }
1767 :
1768 : /*
1769 : * Invalidate slots that require resources about to be removed.
1770 : *
1771 : * Returns true when any slot have got invalidated.
1772 : *
1773 : * Whether a slot needs to be invalidated depends on the cause. A slot is
1774 : * removed if it:
1775 : * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
1776 : * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
1777 : * db; dboid may be InvalidOid for shared relations
1778 : * - RS_INVAL_WAL_LEVEL: is logical
1779 : *
1780 : * NB - this runs as part of checkpoint, so avoid raising errors if possible.
1781 : */
1782 : bool
1783 1746 : InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
1784 : XLogSegNo oldestSegno, Oid dboid,
1785 : TransactionId snapshotConflictHorizon)
1786 : {
1787 : XLogRecPtr oldestLSN;
1788 1746 : bool invalidated = false;
1789 :
1790 : Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
1791 : Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
1792 : Assert(cause != RS_INVAL_NONE);
1793 :
1794 1746 : if (max_replication_slots == 0)
1795 2 : return invalidated;
1796 :
1797 1744 : XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
1798 :
1799 1772 : restart:
1800 1772 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1801 18546 : for (int i = 0; i < max_replication_slots; i++)
1802 : {
1803 16802 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1804 :
1805 16802 : if (!s->in_use)
1806 16376 : continue;
1807 :
1808 426 : if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
1809 : snapshotConflictHorizon,
1810 : &invalidated))
1811 : {
1812 : /* if the lock was released, start from scratch */
1813 28 : goto restart;
1814 : }
1815 : }
1816 1744 : LWLockRelease(ReplicationSlotControlLock);
1817 :
1818 : /*
1819 : * If any slots have been invalidated, recalculate the resource limits.
1820 : */
1821 1744 : if (invalidated)
1822 : {
1823 18 : ReplicationSlotsComputeRequiredXmin(false);
1824 18 : ReplicationSlotsComputeRequiredLSN();
1825 : }
1826 :
1827 1744 : return invalidated;
1828 : }
1829 :
1830 : /*
1831 : * Flush all replication slots to disk.
1832 : *
1833 : * It is convenient to flush dirty replication slots at the time of checkpoint.
1834 : * Additionally, in case of a shutdown checkpoint, we also identify the slots
1835 : * for which the confirmed_flush LSN has been updated since the last time it
1836 : * was saved and flush them.
1837 : */
1838 : void
1839 1704 : CheckPointReplicationSlots(bool is_shutdown)
1840 : {
1841 : int i;
1842 :
1843 1704 : elog(DEBUG1, "performing replication slot checkpoint");
1844 :
1845 : /*
1846 : * Prevent any slot from being created/dropped while we're active. As we
1847 : * explicitly do *not* want to block iterating over replication_slots or
1848 : * acquiring a slot we cannot take the control lock - but that's OK,
1849 : * because holding ReplicationSlotAllocationLock is strictly stronger, and
1850 : * enough to guarantee that nobody can change the in_use bits on us.
1851 : */
1852 1704 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
1853 :
1854 18246 : for (i = 0; i < max_replication_slots; i++)
1855 : {
1856 16542 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1857 : char path[MAXPGPATH];
1858 :
1859 16542 : if (!s->in_use)
1860 16226 : continue;
1861 :
1862 : /* save the slot to disk, locking is handled in SaveSlotToPath() */
1863 316 : sprintf(path, "pg_replslot/%s", NameStr(s->data.name));
1864 :
1865 : /*
1866 : * Slot's data is not flushed each time the confirmed_flush LSN is
1867 : * updated as that could lead to frequent writes. However, we decide
1868 : * to force a flush of all logical slot's data at the time of shutdown
1869 : * if the confirmed_flush LSN is changed since we last flushed it to
1870 : * disk. This helps in avoiding an unnecessary retreat of the
1871 : * confirmed_flush LSN after restart.
1872 : */
1873 316 : if (is_shutdown && SlotIsLogical(s))
1874 : {
1875 118 : SpinLockAcquire(&s->mutex);
1876 :
1877 : Assert(s->data.confirmed_flush >= s->last_saved_confirmed_flush);
1878 :
1879 118 : if (s->data.invalidated == RS_INVAL_NONE &&
1880 118 : s->data.confirmed_flush != s->last_saved_confirmed_flush)
1881 : {
1882 68 : s->just_dirtied = true;
1883 68 : s->dirty = true;
1884 : }
1885 118 : SpinLockRelease(&s->mutex);
1886 : }
1887 :
1888 316 : SaveSlotToPath(s, path, LOG);
1889 : }
1890 1704 : LWLockRelease(ReplicationSlotAllocationLock);
1891 1704 : }
1892 :
1893 : /*
1894 : * Load all replication slots from disk into memory at server startup. This
1895 : * needs to be run before we start crash recovery.
1896 : */
1897 : void
1898 1520 : StartupReplicationSlots(void)
1899 : {
1900 : DIR *replication_dir;
1901 : struct dirent *replication_de;
1902 :
1903 1520 : elog(DEBUG1, "starting up replication slots");
1904 :
1905 : /* restore all slots by iterating over all on-disk entries */
1906 1520 : replication_dir = AllocateDir("pg_replslot");
1907 4678 : while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
1908 : {
1909 : char path[MAXPGPATH + 12];
1910 : PGFileType de_type;
1911 :
1912 3158 : if (strcmp(replication_de->d_name, ".") == 0 ||
1913 1638 : strcmp(replication_de->d_name, "..") == 0)
1914 3040 : continue;
1915 :
1916 118 : snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
1917 118 : de_type = get_dirent_type(path, replication_de, false, DEBUG1);
1918 :
1919 : /* we're only creating directories here, skip if it's not our's */
1920 118 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
1921 0 : continue;
1922 :
1923 : /* we crashed while a slot was being setup or deleted, clean up */
1924 118 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
1925 : {
1926 0 : if (!rmtree(path, true))
1927 : {
1928 0 : ereport(WARNING,
1929 : (errmsg("could not remove directory \"%s\"",
1930 : path)));
1931 0 : continue;
1932 : }
1933 0 : fsync_fname("pg_replslot", true);
1934 0 : continue;
1935 : }
1936 :
1937 : /* looks like a slot in a normal state, restore */
1938 118 : RestoreSlotFromDisk(replication_de->d_name);
1939 : }
1940 1520 : FreeDir(replication_dir);
1941 :
1942 : /* currently no slots exist, we're done. */
1943 1520 : if (max_replication_slots <= 0)
1944 2 : return;
1945 :
1946 : /* Now that we have recovered all the data, compute replication xmin */
1947 1518 : ReplicationSlotsComputeRequiredXmin(false);
1948 1518 : ReplicationSlotsComputeRequiredLSN();
1949 : }
1950 :
1951 : /* ----
1952 : * Manipulation of on-disk state of replication slots
1953 : *
1954 : * NB: none of the routines below should take any notice whether a slot is the
1955 : * current one or not, that's all handled a layer above.
1956 : * ----
1957 : */
1958 : static void
1959 1116 : CreateSlotOnDisk(ReplicationSlot *slot)
1960 : {
1961 : char tmppath[MAXPGPATH];
1962 : char path[MAXPGPATH];
1963 : struct stat st;
1964 :
1965 : /*
1966 : * No need to take out the io_in_progress_lock, nobody else can see this
1967 : * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
1968 : * takes out the lock, if we'd take the lock here, we'd deadlock.
1969 : */
1970 :
1971 1116 : sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
1972 1116 : sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));
1973 :
1974 : /*
1975 : * It's just barely possible that some previous effort to create or drop a
1976 : * slot with this name left a temp directory lying around. If that seems
1977 : * to be the case, try to remove it. If the rmtree() fails, we'll error
1978 : * out at the MakePGDirectory() below, so we don't bother checking
1979 : * success.
1980 : */
1981 1116 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
1982 0 : rmtree(tmppath, true);
1983 :
1984 : /* Create and fsync the temporary slot directory. */
1985 1116 : if (MakePGDirectory(tmppath) < 0)
1986 0 : ereport(ERROR,
1987 : (errcode_for_file_access(),
1988 : errmsg("could not create directory \"%s\": %m",
1989 : tmppath)));
1990 1116 : fsync_fname(tmppath, true);
1991 :
1992 : /* Write the actual state file. */
1993 1116 : slot->dirty = true; /* signal that we really need to write */
1994 1116 : SaveSlotToPath(slot, tmppath, ERROR);
1995 :
1996 : /* Rename the directory into place. */
1997 1116 : if (rename(tmppath, path) != 0)
1998 0 : ereport(ERROR,
1999 : (errcode_for_file_access(),
2000 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2001 : tmppath, path)));
2002 :
2003 : /*
2004 : * If we'd now fail - really unlikely - we wouldn't know whether this slot
2005 : * would persist after an OS crash or not - so, force a restart. The
2006 : * restart would try to fsync this again till it works.
2007 : */
2008 1116 : START_CRIT_SECTION();
2009 :
2010 1116 : fsync_fname(path, true);
2011 1116 : fsync_fname("pg_replslot", true);
2012 :
2013 1116 : END_CRIT_SECTION();
2014 1116 : }
2015 :
2016 : /*
2017 : * Shared functionality between saving and creating a replication slot.
2018 : */
2019 : static void
2020 3664 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2021 : {
2022 : char tmppath[MAXPGPATH];
2023 : char path[MAXPGPATH];
2024 : int fd;
2025 : ReplicationSlotOnDisk cp;
2026 : bool was_dirty;
2027 :
2028 : /* first check whether there's something to write out */
2029 3664 : SpinLockAcquire(&slot->mutex);
2030 3664 : was_dirty = slot->dirty;
2031 3664 : slot->just_dirtied = false;
2032 3664 : SpinLockRelease(&slot->mutex);
2033 :
2034 : /* and don't do anything if there's nothing to write */
2035 3664 : if (!was_dirty)
2036 164 : return;
2037 :
2038 3500 : LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
2039 :
2040 : /* silence valgrind :( */
2041 3500 : memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2042 :
2043 3500 : sprintf(tmppath, "%s/state.tmp", dir);
2044 3500 : sprintf(path, "%s/state", dir);
2045 :
2046 3500 : fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2047 3500 : if (fd < 0)
2048 : {
2049 : /*
2050 : * If not an ERROR, then release the lock before returning. In case
2051 : * of an ERROR, the error recovery path automatically releases the
2052 : * lock, but no harm in explicitly releasing even in that case. Note
2053 : * that LWLockRelease() could affect errno.
2054 : */
2055 0 : int save_errno = errno;
2056 :
2057 0 : LWLockRelease(&slot->io_in_progress_lock);
2058 0 : errno = save_errno;
2059 0 : ereport(elevel,
2060 : (errcode_for_file_access(),
2061 : errmsg("could not create file \"%s\": %m",
2062 : tmppath)));
2063 0 : return;
2064 : }
2065 :
2066 3500 : cp.magic = SLOT_MAGIC;
2067 3500 : INIT_CRC32C(cp.checksum);
2068 3500 : cp.version = SLOT_VERSION;
2069 3500 : cp.length = ReplicationSlotOnDiskV2Size;
2070 :
2071 3500 : SpinLockAcquire(&slot->mutex);
2072 :
2073 3500 : memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2074 :
2075 3500 : SpinLockRelease(&slot->mutex);
2076 :
2077 3500 : COMP_CRC32C(cp.checksum,
2078 : (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
2079 : ReplicationSlotOnDiskChecksummedSize);
2080 3500 : FIN_CRC32C(cp.checksum);
2081 :
2082 3500 : errno = 0;
2083 3500 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2084 3500 : if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2085 : {
2086 0 : int save_errno = errno;
2087 :
2088 0 : pgstat_report_wait_end();
2089 0 : CloseTransientFile(fd);
2090 0 : LWLockRelease(&slot->io_in_progress_lock);
2091 :
2092 : /* if write didn't set errno, assume problem is no disk space */
2093 0 : errno = save_errno ? save_errno : ENOSPC;
2094 0 : ereport(elevel,
2095 : (errcode_for_file_access(),
2096 : errmsg("could not write to file \"%s\": %m",
2097 : tmppath)));
2098 0 : return;
2099 : }
2100 3500 : pgstat_report_wait_end();
2101 :
2102 : /* fsync the temporary file */
2103 3500 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2104 3500 : if (pg_fsync(fd) != 0)
2105 : {
2106 0 : int save_errno = errno;
2107 :
2108 0 : pgstat_report_wait_end();
2109 0 : CloseTransientFile(fd);
2110 0 : LWLockRelease(&slot->io_in_progress_lock);
2111 0 : errno = save_errno;
2112 0 : ereport(elevel,
2113 : (errcode_for_file_access(),
2114 : errmsg("could not fsync file \"%s\": %m",
2115 : tmppath)));
2116 0 : return;
2117 : }
2118 3500 : pgstat_report_wait_end();
2119 :
2120 3500 : if (CloseTransientFile(fd) != 0)
2121 : {
2122 0 : int save_errno = errno;
2123 :
2124 0 : LWLockRelease(&slot->io_in_progress_lock);
2125 0 : errno = save_errno;
2126 0 : ereport(elevel,
2127 : (errcode_for_file_access(),
2128 : errmsg("could not close file \"%s\": %m",
2129 : tmppath)));
2130 0 : return;
2131 : }
2132 :
2133 : /* rename to permanent file, fsync file and directory */
2134 3500 : if (rename(tmppath, path) != 0)
2135 : {
2136 0 : int save_errno = errno;
2137 :
2138 0 : LWLockRelease(&slot->io_in_progress_lock);
2139 0 : errno = save_errno;
2140 0 : ereport(elevel,
2141 : (errcode_for_file_access(),
2142 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2143 : tmppath, path)));
2144 0 : return;
2145 : }
2146 :
2147 : /*
2148 : * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2149 : */
2150 3500 : START_CRIT_SECTION();
2151 :
2152 3500 : fsync_fname(path, false);
2153 3500 : fsync_fname(dir, true);
2154 3500 : fsync_fname("pg_replslot", true);
2155 :
2156 3500 : END_CRIT_SECTION();
2157 :
2158 : /*
2159 : * Successfully wrote, unset dirty bit, unless somebody dirtied again
2160 : * already and remember the confirmed_flush LSN value.
2161 : */
2162 3500 : SpinLockAcquire(&slot->mutex);
2163 3500 : if (!slot->just_dirtied)
2164 3496 : slot->dirty = false;
2165 3500 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2166 3500 : SpinLockRelease(&slot->mutex);
2167 :
2168 3500 : LWLockRelease(&slot->io_in_progress_lock);
2169 : }
2170 :
2171 : /*
2172 : * Load a single slot from disk into memory.
2173 : */
2174 : static void
2175 118 : RestoreSlotFromDisk(const char *name)
2176 : {
2177 : ReplicationSlotOnDisk cp;
2178 : int i;
2179 : char slotdir[MAXPGPATH + 12];
2180 : char path[MAXPGPATH + 22];
2181 : int fd;
2182 118 : bool restored = false;
2183 : int readBytes;
2184 : pg_crc32c checksum;
2185 :
2186 : /* no need to lock here, no concurrent access allowed yet */
2187 :
2188 : /* delete temp file if it exists */
2189 118 : sprintf(slotdir, "pg_replslot/%s", name);
2190 118 : sprintf(path, "%s/state.tmp", slotdir);
2191 118 : if (unlink(path) < 0 && errno != ENOENT)
2192 0 : ereport(PANIC,
2193 : (errcode_for_file_access(),
2194 : errmsg("could not remove file \"%s\": %m", path)));
2195 :
2196 118 : sprintf(path, "%s/state", slotdir);
2197 :
2198 118 : elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2199 :
2200 : /* on some operating systems fsyncing a file requires O_RDWR */
2201 118 : fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2202 :
2203 : /*
2204 : * We do not need to handle this as we are rename()ing the directory into
2205 : * place only after we fsync()ed the state file.
2206 : */
2207 118 : if (fd < 0)
2208 0 : ereport(PANIC,
2209 : (errcode_for_file_access(),
2210 : errmsg("could not open file \"%s\": %m", path)));
2211 :
2212 : /*
2213 : * Sync state file before we're reading from it. We might have crashed
2214 : * while it wasn't synced yet and we shouldn't continue on that basis.
2215 : */
2216 118 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2217 118 : if (pg_fsync(fd) != 0)
2218 0 : ereport(PANIC,
2219 : (errcode_for_file_access(),
2220 : errmsg("could not fsync file \"%s\": %m",
2221 : path)));
2222 118 : pgstat_report_wait_end();
2223 :
2224 : /* Also sync the parent directory */
2225 118 : START_CRIT_SECTION();
2226 118 : fsync_fname(slotdir, true);
2227 118 : END_CRIT_SECTION();
2228 :
2229 : /* read part of statefile that's guaranteed to be version independent */
2230 118 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2231 118 : readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2232 118 : pgstat_report_wait_end();
2233 118 : if (readBytes != ReplicationSlotOnDiskConstantSize)
2234 : {
2235 0 : if (readBytes < 0)
2236 0 : ereport(PANIC,
2237 : (errcode_for_file_access(),
2238 : errmsg("could not read file \"%s\": %m", path)));
2239 : else
2240 0 : ereport(PANIC,
2241 : (errcode(ERRCODE_DATA_CORRUPTED),
2242 : errmsg("could not read file \"%s\": read %d of %zu",
2243 : path, readBytes,
2244 : (Size) ReplicationSlotOnDiskConstantSize)));
2245 : }
2246 :
2247 : /* verify magic */
2248 118 : if (cp.magic != SLOT_MAGIC)
2249 0 : ereport(PANIC,
2250 : (errcode(ERRCODE_DATA_CORRUPTED),
2251 : errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2252 : path, cp.magic, SLOT_MAGIC)));
2253 :
2254 : /* verify version */
2255 118 : if (cp.version != SLOT_VERSION)
2256 0 : ereport(PANIC,
2257 : (errcode(ERRCODE_DATA_CORRUPTED),
2258 : errmsg("replication slot file \"%s\" has unsupported version %u",
2259 : path, cp.version)));
2260 :
2261 : /* boundary check on length */
2262 118 : if (cp.length != ReplicationSlotOnDiskV2Size)
2263 0 : ereport(PANIC,
2264 : (errcode(ERRCODE_DATA_CORRUPTED),
2265 : errmsg("replication slot file \"%s\" has corrupted length %u",
2266 : path, cp.length)));
2267 :
2268 : /* Now that we know the size, read the entire file */
2269 118 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2270 236 : readBytes = read(fd,
2271 : (char *) &cp + ReplicationSlotOnDiskConstantSize,
2272 118 : cp.length);
2273 118 : pgstat_report_wait_end();
2274 118 : if (readBytes != cp.length)
2275 : {
2276 0 : if (readBytes < 0)
2277 0 : ereport(PANIC,
2278 : (errcode_for_file_access(),
2279 : errmsg("could not read file \"%s\": %m", path)));
2280 : else
2281 0 : ereport(PANIC,
2282 : (errcode(ERRCODE_DATA_CORRUPTED),
2283 : errmsg("could not read file \"%s\": read %d of %zu",
2284 : path, readBytes, (Size) cp.length)));
2285 : }
2286 :
2287 118 : if (CloseTransientFile(fd) != 0)
2288 0 : ereport(PANIC,
2289 : (errcode_for_file_access(),
2290 : errmsg("could not close file \"%s\": %m", path)));
2291 :
2292 : /* now verify the CRC */
2293 118 : INIT_CRC32C(checksum);
2294 118 : COMP_CRC32C(checksum,
2295 : (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
2296 : ReplicationSlotOnDiskChecksummedSize);
2297 118 : FIN_CRC32C(checksum);
2298 :
2299 118 : if (!EQ_CRC32C(checksum, cp.checksum))
2300 0 : ereport(PANIC,
2301 : (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2302 : path, checksum, cp.checksum)));
2303 :
2304 : /*
2305 : * If we crashed with an ephemeral slot active, don't restore but delete
2306 : * it.
2307 : */
2308 118 : if (cp.slotdata.persistency != RS_PERSISTENT)
2309 : {
2310 0 : if (!rmtree(slotdir, true))
2311 : {
2312 0 : ereport(WARNING,
2313 : (errmsg("could not remove directory \"%s\"",
2314 : slotdir)));
2315 : }
2316 0 : fsync_fname("pg_replslot", true);
2317 0 : return;
2318 : }
2319 :
2320 : /*
2321 : * Verify that requirements for the specific slot type are met. That's
2322 : * important because if these aren't met we're not guaranteed to retain
2323 : * all the necessary resources for the slot.
2324 : *
2325 : * NB: We have to do so *after* the above checks for ephemeral slots,
2326 : * because otherwise a slot that shouldn't exist anymore could prevent
2327 : * restarts.
2328 : *
2329 : * NB: Changing the requirements here also requires adapting
2330 : * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2331 : */
2332 118 : if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
2333 0 : ereport(FATAL,
2334 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2335 : errmsg("logical replication slot \"%s\" exists, but wal_level < logical",
2336 : NameStr(cp.slotdata.name)),
2337 : errhint("Change wal_level to be logical or higher.")));
2338 118 : else if (wal_level < WAL_LEVEL_REPLICA)
2339 0 : ereport(FATAL,
2340 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2341 : errmsg("physical replication slot \"%s\" exists, but wal_level < replica",
2342 : NameStr(cp.slotdata.name)),
2343 : errhint("Change wal_level to be replica or higher.")));
2344 :
2345 : /* nothing can be active yet, don't lock anything */
2346 170 : for (i = 0; i < max_replication_slots; i++)
2347 : {
2348 : ReplicationSlot *slot;
2349 :
2350 170 : slot = &ReplicationSlotCtl->replication_slots[i];
2351 :
2352 170 : if (slot->in_use)
2353 52 : continue;
2354 :
2355 : /* restore the entire set of persistent data */
2356 118 : memcpy(&slot->data, &cp.slotdata,
2357 : sizeof(ReplicationSlotPersistentData));
2358 :
2359 : /* initialize in memory state */
2360 118 : slot->effective_xmin = cp.slotdata.xmin;
2361 118 : slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2362 118 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2363 :
2364 118 : slot->candidate_catalog_xmin = InvalidTransactionId;
2365 118 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
2366 118 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
2367 118 : slot->candidate_restart_valid = InvalidXLogRecPtr;
2368 :
2369 118 : slot->in_use = true;
2370 118 : slot->active_pid = 0;
2371 :
2372 : /*
2373 : * Set the time since the slot has become inactive after loading the
2374 : * slot from the disk into memory. Whoever acquires the slot i.e.
2375 : * makes the slot active will reset it.
2376 : */
2377 118 : slot->inactive_since = GetCurrentTimestamp();
2378 :
2379 118 : restored = true;
2380 118 : break;
2381 : }
2382 :
2383 118 : if (!restored)
2384 0 : ereport(FATAL,
2385 : (errmsg("too many replication slots active before shutdown"),
2386 : errhint("Increase max_replication_slots and try again.")));
2387 : }
2388 :
2389 : /*
2390 : * Maps an invalidation reason for a replication slot to
2391 : * ReplicationSlotInvalidationCause.
2392 : */
2393 : ReplicationSlotInvalidationCause
2394 0 : GetSlotInvalidationCause(const char *invalidation_reason)
2395 : {
2396 : ReplicationSlotInvalidationCause cause;
2397 0 : ReplicationSlotInvalidationCause result = RS_INVAL_NONE;
2398 0 : bool found PG_USED_FOR_ASSERTS_ONLY = false;
2399 :
2400 : Assert(invalidation_reason);
2401 :
2402 0 : for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++)
2403 : {
2404 0 : if (strcmp(SlotInvalidationCauses[cause], invalidation_reason) == 0)
2405 : {
2406 0 : found = true;
2407 0 : result = cause;
2408 0 : break;
2409 : }
2410 : }
2411 :
2412 : Assert(found);
2413 0 : return result;
2414 : }
2415 :
2416 : /*
2417 : * A helper function to validate slots specified in GUC standby_slot_names.
2418 : *
2419 : * The rawname will be parsed, and the result will be saved into *elemlist.
2420 : */
2421 : static bool
2422 12 : validate_standby_slots(char *rawname, List **elemlist)
2423 : {
2424 : bool ok;
2425 :
2426 : /* Verify syntax and parse string into a list of identifiers */
2427 12 : ok = SplitIdentifierString(rawname, ',', elemlist);
2428 :
2429 12 : if (!ok)
2430 : {
2431 0 : GUC_check_errdetail("List syntax is invalid.");
2432 : }
2433 12 : else if (!ReplicationSlotCtl)
2434 : {
2435 : /*
2436 : * We cannot validate the replication slot if the replication slots'
2437 : * data has not been initialized. This is ok as we will anyway
2438 : * validate the specified slot when waiting for them to catch up. See
2439 : * StandbySlotsHaveCaughtup() for details.
2440 : */
2441 : }
2442 : else
2443 : {
2444 : /* Check that the specified slots exist and are logical slots */
2445 12 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2446 :
2447 36 : foreach_ptr(char, name, *elemlist)
2448 : {
2449 : ReplicationSlot *slot;
2450 :
2451 12 : slot = SearchNamedReplicationSlot(name, false);
2452 :
2453 12 : if (!slot)
2454 : {
2455 0 : GUC_check_errdetail("replication slot \"%s\" does not exist",
2456 : name);
2457 0 : ok = false;
2458 0 : break;
2459 : }
2460 :
2461 12 : if (!SlotIsPhysical(slot))
2462 : {
2463 0 : GUC_check_errdetail("\"%s\" is not a physical replication slot",
2464 : name);
2465 0 : ok = false;
2466 0 : break;
2467 : }
2468 : }
2469 :
2470 12 : LWLockRelease(ReplicationSlotControlLock);
2471 : }
2472 :
2473 12 : return ok;
2474 : }
2475 :
2476 : /*
2477 : * GUC check_hook for standby_slot_names
2478 : */
2479 : bool
2480 1850 : check_standby_slot_names(char **newval, void **extra, GucSource source)
2481 : {
2482 : char *rawname;
2483 : char *ptr;
2484 : List *elemlist;
2485 : int size;
2486 : bool ok;
2487 : StandbySlotNamesConfigData *config;
2488 :
2489 1850 : if ((*newval)[0] == '\0')
2490 1838 : return true;
2491 :
2492 : /* Need a modifiable copy of the GUC string */
2493 12 : rawname = pstrdup(*newval);
2494 :
2495 : /* Now verify if the specified slots exist and have correct type */
2496 12 : ok = validate_standby_slots(rawname, &elemlist);
2497 :
2498 12 : if (!ok || elemlist == NIL)
2499 : {
2500 0 : pfree(rawname);
2501 0 : list_free(elemlist);
2502 0 : return ok;
2503 : }
2504 :
2505 : /* Compute the size required for the StandbySlotNamesConfigData struct */
2506 12 : size = offsetof(StandbySlotNamesConfigData, slot_names);
2507 36 : foreach_ptr(char, slot_name, elemlist)
2508 12 : size += strlen(slot_name) + 1;
2509 :
2510 : /* GUC extra value must be guc_malloc'd, not palloc'd */
2511 12 : config = (StandbySlotNamesConfigData *) guc_malloc(LOG, size);
2512 :
2513 : /* Transform the data into StandbySlotNamesConfigData */
2514 12 : config->nslotnames = list_length(elemlist);
2515 :
2516 12 : ptr = config->slot_names;
2517 36 : foreach_ptr(char, slot_name, elemlist)
2518 : {
2519 12 : strcpy(ptr, slot_name);
2520 12 : ptr += strlen(slot_name) + 1;
2521 : }
2522 :
2523 12 : *extra = (void *) config;
2524 :
2525 12 : pfree(rawname);
2526 12 : list_free(elemlist);
2527 12 : return true;
2528 : }
2529 :
2530 : /*
2531 : * GUC assign_hook for standby_slot_names
2532 : */
2533 : void
2534 1850 : assign_standby_slot_names(const char *newval, void *extra)
2535 : {
2536 : /*
2537 : * The standby slots may have changed, so we must recompute the oldest
2538 : * LSN.
2539 : */
2540 1850 : ss_oldest_flush_lsn = InvalidXLogRecPtr;
2541 :
2542 1850 : standby_slot_names_config = (StandbySlotNamesConfigData *) extra;
2543 1850 : }
2544 :
2545 : /*
2546 : * Check if the passed slot_name is specified in the standby_slot_names GUC.
2547 : */
2548 : bool
2549 37354 : SlotExistsInStandbySlotNames(const char *slot_name)
2550 : {
2551 : const char *standby_slot_name;
2552 :
2553 : /* Return false if there is no value in standby_slot_names */
2554 37354 : if (standby_slot_names_config == NULL)
2555 37344 : return false;
2556 :
2557 : /*
2558 : * XXX: We are not expecting this list to be long so a linear search
2559 : * shouldn't hurt but if that turns out not to be true then we can cache
2560 : * this information for each WalSender as well.
2561 : */
2562 10 : standby_slot_name = standby_slot_names_config->slot_names;
2563 10 : for (int i = 0; i < standby_slot_names_config->nslotnames; i++)
2564 : {
2565 10 : if (strcmp(standby_slot_name, slot_name) == 0)
2566 10 : return true;
2567 :
2568 0 : standby_slot_name += strlen(standby_slot_name) + 1;
2569 : }
2570 :
2571 0 : return false;
2572 : }
2573 :
2574 : /*
2575 : * Return true if the slots specified in standby_slot_names have caught up to
2576 : * the given WAL location, false otherwise.
2577 : *
2578 : * The elevel parameter specifies the error level used for logging messages
2579 : * related to slots that do not exist, are invalidated, or are inactive.
2580 : */
2581 : bool
2582 1190 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
2583 : {
2584 : const char *name;
2585 1190 : int caught_up_slot_num = 0;
2586 1190 : XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
2587 :
2588 : /*
2589 : * Don't need to wait for the standbys to catch up if there is no value in
2590 : * standby_slot_names.
2591 : */
2592 1190 : if (standby_slot_names_config == NULL)
2593 1164 : return true;
2594 :
2595 : /*
2596 : * Don't need to wait for the standbys to catch up if we are on a standby
2597 : * server, since we do not support syncing slots to cascading standbys.
2598 : */
2599 26 : if (RecoveryInProgress())
2600 0 : return true;
2601 :
2602 : /*
2603 : * Don't need to wait for the standbys to catch up if they are already
2604 : * beyond the specified WAL location.
2605 : */
2606 26 : if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
2607 18 : ss_oldest_flush_lsn >= wait_for_lsn)
2608 10 : return true;
2609 :
2610 : /*
2611 : * To prevent concurrent slot dropping and creation while filtering the
2612 : * slots, take the ReplicationSlotControlLock outside of the loop.
2613 : */
2614 16 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2615 :
2616 16 : name = standby_slot_names_config->slot_names;
2617 22 : for (int i = 0; i < standby_slot_names_config->nslotnames; i++)
2618 : {
2619 : XLogRecPtr restart_lsn;
2620 : bool invalidated;
2621 : bool inactive;
2622 : ReplicationSlot *slot;
2623 :
2624 16 : slot = SearchNamedReplicationSlot(name, false);
2625 :
2626 16 : if (!slot)
2627 : {
2628 : /*
2629 : * If a slot name provided in standby_slot_names does not exist,
2630 : * report a message and exit the loop. A user can specify a slot
2631 : * name that does not exist just before the server startup. The
2632 : * GUC check_hook(validate_standby_slots) cannot validate such a
2633 : * slot during startup as the ReplicationSlotCtl shared memory is
2634 : * not initialized at that time. It is also possible for a user to
2635 : * drop the slot in standby_slot_names afterwards.
2636 : */
2637 0 : ereport(elevel,
2638 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2639 : errmsg("replication slot \"%s\" specified in parameter %s does not exist",
2640 : name, "standby_slot_names"),
2641 : errdetail("Logical replication is waiting on the standby associated with \"%s\".",
2642 : name),
2643 : errhint("Consider creating the slot \"%s\" or amend parameter %s.",
2644 : name, "standby_slot_names"));
2645 0 : break;
2646 : }
2647 :
2648 16 : if (SlotIsLogical(slot))
2649 : {
2650 : /*
2651 : * If a logical slot name is provided in standby_slot_names,
2652 : * report a message and exit the loop. Similar to the non-existent
2653 : * case, a user can specify a logical slot name in
2654 : * standby_slot_names before the server startup, or drop an
2655 : * existing physical slot and recreate a logical slot with the
2656 : * same name.
2657 : */
2658 0 : ereport(elevel,
2659 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2660 : errmsg("cannot have logical replication slot \"%s\" in parameter %s",
2661 : name, "standby_slot_names"),
2662 : errdetail("Logical replication is waiting for correction on \"%s\".",
2663 : name),
2664 : errhint("Consider removing logical slot \"%s\" from parameter %s.",
2665 : name, "standby_slot_names"));
2666 0 : break;
2667 : }
2668 :
2669 16 : SpinLockAcquire(&slot->mutex);
2670 16 : restart_lsn = slot->data.restart_lsn;
2671 16 : invalidated = slot->data.invalidated != RS_INVAL_NONE;
2672 16 : inactive = slot->active_pid == 0;
2673 16 : SpinLockRelease(&slot->mutex);
2674 :
2675 16 : if (invalidated)
2676 : {
2677 : /* Specified physical slot has been invalidated */
2678 0 : ereport(elevel,
2679 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2680 : errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
2681 : name, "standby_slot_names"),
2682 : errdetail("Logical replication is waiting on the standby associated with \"%s\".",
2683 : name),
2684 : errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
2685 : name, "standby_slot_names"));
2686 0 : break;
2687 : }
2688 :
2689 16 : if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
2690 : {
2691 : /* Log a message if no active_pid for this physical slot */
2692 10 : if (inactive)
2693 8 : ereport(elevel,
2694 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2695 : errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
2696 : name, "standby_slot_names"),
2697 : errdetail("Logical replication is waiting on the standby associated with \"%s\".",
2698 : name),
2699 : errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
2700 : name, "standby_slot_names"));
2701 :
2702 : /* Continue if the current slot hasn't caught up. */
2703 10 : break;
2704 : }
2705 :
2706 : Assert(restart_lsn >= wait_for_lsn);
2707 :
2708 6 : if (XLogRecPtrIsInvalid(min_restart_lsn) ||
2709 : min_restart_lsn > restart_lsn)
2710 6 : min_restart_lsn = restart_lsn;
2711 :
2712 6 : caught_up_slot_num++;
2713 :
2714 6 : name += strlen(name) + 1;
2715 : }
2716 :
2717 16 : LWLockRelease(ReplicationSlotControlLock);
2718 :
2719 : /*
2720 : * Return false if not all the standbys have caught up to the specified
2721 : * WAL location.
2722 : */
2723 16 : if (caught_up_slot_num != standby_slot_names_config->nslotnames)
2724 10 : return false;
2725 :
2726 : /* The ss_oldest_flush_lsn must not retreat. */
2727 : Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
2728 : min_restart_lsn >= ss_oldest_flush_lsn);
2729 :
2730 6 : ss_oldest_flush_lsn = min_restart_lsn;
2731 :
2732 6 : return true;
2733 : }
2734 :
2735 : /*
2736 : * Wait for physical standbys to confirm receiving the given lsn.
2737 : *
2738 : * Used by logical decoding SQL functions. It waits for physical standbys
2739 : * corresponding to the physical slots specified in the standby_slot_names GUC.
2740 : */
2741 : void
2742 400 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
2743 : {
2744 : /*
2745 : * Don't need to wait for the standby to catch up if the current acquired
2746 : * slot is not a logical failover slot, or there is no value in
2747 : * standby_slot_names.
2748 : */
2749 400 : if (!MyReplicationSlot->data.failover || !standby_slot_names_config)
2750 398 : return;
2751 :
2752 2 : ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
2753 :
2754 : for (;;)
2755 : {
2756 4 : CHECK_FOR_INTERRUPTS();
2757 :
2758 4 : if (ConfigReloadPending)
2759 : {
2760 2 : ConfigReloadPending = false;
2761 2 : ProcessConfigFile(PGC_SIGHUP);
2762 : }
2763 :
2764 : /* Exit if done waiting for every slot. */
2765 4 : if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
2766 2 : break;
2767 :
2768 : /*
2769 : * Wait for the slots in the standby_slot_names to catch up, but use a
2770 : * timeout (1s) so we can also check if the standby_slot_names has
2771 : * been changed.
2772 : */
2773 2 : ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
2774 : WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
2775 : }
2776 :
2777 2 : ConditionVariableCancelSleep();
2778 : }
|