Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * slot.c
4 : * Replication slot management.
5 : *
6 : *
7 : * Copyright (c) 2012-2024, PostgreSQL Global Development Group
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/replication/slot.c
12 : *
13 : * NOTES
14 : *
15 : * Replication slots are used to keep state about replication streams
16 : * originating from this cluster. Their primary purpose is to prevent the
17 : * premature removal of WAL or of old tuple versions in a manner that would
18 : * interfere with replication; they are also useful for monitoring purposes.
19 : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : * on standbys (to support cascading setups). The requirement that slots be
21 : * usable on standbys precludes storing them in the system catalogs.
22 : *
23 : * Each replication slot gets its own directory inside the $PGDATA/pg_replslot
24 : * directory. Inside that directory the state file will contain the slot's
25 : * own data. Additional data can be stored alongside that file if required.
26 : * While the server is running, the state data is also cached in memory for
27 : * efficiency.
28 : *
29 : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : * of a slot. The remaining data in each slot is protected by its mutex.
33 : *
34 : *-------------------------------------------------------------------------
35 : */
36 :
37 : #include "postgres.h"
38 :
39 : #include <unistd.h>
40 : #include <sys/stat.h>
41 :
42 : #include "access/transam.h"
43 : #include "access/xlog_internal.h"
44 : #include "access/xlogrecovery.h"
45 : #include "common/file_utils.h"
46 : #include "common/string.h"
47 : #include "miscadmin.h"
48 : #include "pgstat.h"
49 : #include "postmaster/interrupt.h"
50 : #include "replication/slotsync.h"
51 : #include "replication/slot.h"
52 : #include "replication/walsender_private.h"
53 : #include "storage/fd.h"
54 : #include "storage/ipc.h"
55 : #include "storage/proc.h"
56 : #include "storage/procarray.h"
57 : #include "utils/builtins.h"
58 : #include "utils/guc_hooks.h"
59 : #include "utils/varlena.h"
60 :
/*
 * Replication slot on-disk data structure.
 *
 * This is the layout of the per-slot "state" file kept under
 * $PGDATA/pg_replslot/<slotname>/.  The leading fields must never change
 * across versions so that old files can at least be recognized.
 */
typedef struct ReplicationSlotOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* format identifier, see SLOT_MAGIC */
	pg_crc32c	checksum;		/* CRC over the checksummed part below */

	/* data covered by checksum */
	uint32		version;		/* on-disk format version, see SLOT_VERSION */
	uint32		length;			/* length of the version-dependent part */

	/*
	 * The actual data in the slot that follows can differ based on the above
	 * 'version'.
	 */

	ReplicationSlotPersistentData slotdata; /* version-dependent payload */
} ReplicationSlotOnDisk;
83 :
/*
 * Struct for the configuration of synchronized_standby_slots.
 *
 * Note: this must be a flat representation that can be held in a single chunk
 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
 * synchronized_standby_slots GUC.
 */
typedef struct
{
	/* Number of slot names in the slot_names[] */
	int			nslotnames;

	/*
	 * slot_names contains 'nslotnames' consecutive null-terminated C strings.
	 */
	char		slot_names[FLEXIBLE_ARRAY_MEMBER];
} SyncStandbySlotsConfigData;
101 :
/*
 * Lookup table for slot invalidation causes.
 *
 * Indexed by the ReplicationSlotInvalidationCause enum; the designated
 * initializers keep the strings aligned with the enum values.
 */
const char *const SlotInvalidationCauses[] = {
	[RS_INVAL_NONE] = "none",
	[RS_INVAL_WAL_REMOVED] = "wal_removed",
	[RS_INVAL_HORIZON] = "rows_removed",
	[RS_INVAL_WAL_LEVEL] = "wal_level_insufficient",
};

/* Maximum number of invalidation causes */
#define	RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL

/* Catch enum additions that forget to extend the lookup table above. */
StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
				 "array length mismatch");
117 :
/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	5		/* version for new files */

/* Control array for replication slot management */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/* My backend's replication slot in the shared memory array */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUC variables */
int			max_replication_slots = 10; /* the maximum number of replication
										 * slots */

/*
 * This GUC lists streaming replication standby server slot names that
 * logical WAL sender processes will wait for.
 */
char	   *synchronized_standby_slots;

/* This is the parsed and cached configuration for synchronized_standby_slots */
static SyncStandbySlotsConfigData *synchronized_standby_slots_config;

/*
 * Oldest LSN that has been confirmed to be flushed to the standbys
 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
 */
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;

static void ReplicationSlotShmemExit(int code, Datum arg);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
166 :
167 : /*
168 : * Report shared-memory space needed by ReplicationSlotsShmemInit.
169 : */
170 : Size
171 6918 : ReplicationSlotsShmemSize(void)
172 : {
173 6918 : Size size = 0;
174 :
175 6918 : if (max_replication_slots == 0)
176 4 : return size;
177 :
178 6914 : size = offsetof(ReplicationSlotCtlData, replication_slots);
179 6914 : size = add_size(size,
180 : mul_size(max_replication_slots, sizeof(ReplicationSlot)));
181 :
182 6914 : return size;
183 : }
184 :
/*
 * Allocate and initialize shared memory for replication slots.
 *
 * Called at postmaster startup (and by each backend attaching to the shared
 * segment, in EXEC_BACKEND builds); per-slot locks and condition variables
 * are initialized only on first creation of the struct.
 */
void
ReplicationSlotsShmemInit(void)
{
	bool		found;

	/* Nothing to set up if slots are disabled entirely. */
	if (max_replication_slots == 0)
		return;

	ReplicationSlotCtl = (ReplicationSlotCtlData *)
		ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
						&found);

	if (!found)
	{
		int			i;

		/* First time through, so initialize */
		MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());

		for (i = 0; i < max_replication_slots; i++)
		{
			ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];

			/* everything else is zeroed by the memset above */
			SpinLockInit(&slot->mutex);
			LWLockInitialize(&slot->io_in_progress_lock,
							 LWTRANCHE_REPLICATION_SLOT_IO);
			ConditionVariableInit(&slot->active_cv);
		}
	}
}
219 :
/*
 * Register the callback for replication slot cleanup and releasing.
 *
 * Must run once per backend so that any slot held (or temporary slots
 * created) by this process are released on exit, before shared memory
 * access goes away.
 */
void
ReplicationSlotInitialize(void)
{
	before_shmem_exit(ReplicationSlotShmemExit, 0);
}
228 :
/*
 * Release and cleanup replication slots.
 *
 * before_shmem_exit callback registered by ReplicationSlotInitialize();
 * 'code' and 'arg' are unused.
 */
static void
ReplicationSlotShmemExit(int code, Datum arg)
{
	/* Make sure active replication slots are released */
	if (MyReplicationSlot != NULL)
		ReplicationSlotRelease();

	/* Also cleanup all the temporary slots. */
	ReplicationSlotCleanup(false);
}
242 :
243 : /*
244 : * Check whether the passed slot name is valid and report errors at elevel.
245 : *
246 : * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
247 : * the name to be used as a directory name on every supported OS.
248 : *
249 : * Returns whether the directory name is valid or not if elevel < ERROR.
250 : */
251 : bool
252 1592 : ReplicationSlotValidateName(const char *name, int elevel)
253 : {
254 : const char *cp;
255 :
256 1592 : if (strlen(name) == 0)
257 : {
258 6 : ereport(elevel,
259 : (errcode(ERRCODE_INVALID_NAME),
260 : errmsg("replication slot name \"%s\" is too short",
261 : name)));
262 0 : return false;
263 : }
264 :
265 1586 : if (strlen(name) >= NAMEDATALEN)
266 : {
267 0 : ereport(elevel,
268 : (errcode(ERRCODE_NAME_TOO_LONG),
269 : errmsg("replication slot name \"%s\" is too long",
270 : name)));
271 0 : return false;
272 : }
273 :
274 31734 : for (cp = name; *cp; cp++)
275 : {
276 30150 : if (!((*cp >= 'a' && *cp <= 'z')
277 14844 : || (*cp >= '0' && *cp <= '9')
278 2906 : || (*cp == '_')))
279 : {
280 2 : ereport(elevel,
281 : (errcode(ERRCODE_INVALID_NAME),
282 : errmsg("replication slot name \"%s\" contains invalid character",
283 : name),
284 : errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
285 0 : return false;
286 : }
287 : }
288 1584 : return true;
289 : }
290 :
/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db specific; if the slot is going to
 *     be used for that pass true, otherwise false.
 * persistency: RS_PERSISTENT, RS_EPHEMERAL or RS_TEMPORARY lifetime.
 * two_phase: Allows decoding of prepared transactions. We allow this option
 *     to be enabled only at the slot creation time. If we allow this option
 *     to be changed during decoding then it is quite possible that we skip
 *     prepare first time because this option was not enabled. Now next time
 *     during getting changes, if the two_phase option is enabled it can skip
 *     prepare because by that time start decoding point has been moved. So the
 *     user will only get commit prepared.
 * failover: If enabled, allows the slot to be synced to standbys so
 *     that logical replication can be resumed after failover.
 * synced: True if the slot is synchronized from the primary server.
 *
 * On success, MyReplicationSlot points at the new slot, which is marked
 * active for this backend.  Errors out (having taken no slot) otherwise.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
					  ReplicationSlotPersistency persistency,
					  bool two_phase, bool failover, bool synced)
{
	ReplicationSlot *slot = NULL;
	int			i;

	Assert(MyReplicationSlot == NULL);

	ReplicationSlotValidateName(name, ERROR);

	if (failover)
	{
		/*
		 * Do not allow users to create the failover enabled slots on the
		 * standby as we do not support sync to the cascading standby.
		 *
		 * However, failover enabled slots can be created during slot
		 * synchronization because we need to retain the same values as the
		 * remote slot.
		 */
		if (RecoveryInProgress() && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a replication slot created on the standby"));

		/*
		 * Do not allow users to create failover enabled temporary slots,
		 * because temporary slots will not be synced to the standby.
		 *
		 * However, failover enabled temporary slots can be created during
		 * slot synchronization. See the comments atop slotsync.c for details.
		 */
		if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a temporary replication slot"));
	}

	/*
	 * If some other backend ran this code concurrently with us, we'd likely
	 * both allocate the same slot, and that would be bad.  We'd also be at
	 * risk of missing a name collision.  Also, we don't want to try to create
	 * a new slot while somebody's busy cleaning up an old one, because we
	 * might both be monkeying with the same directory.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * Check for name collision, and identify an allocatable slot.  We need to
	 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
	 * else can change the in_use flags while we're looking at them.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
					 errmsg("replication slot \"%s\" already exists", name)));
		if (!s->in_use && slot == NULL)
			slot = s;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If all slots are in use, we're out of luck. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("all replication slots are in use"),
				 errhint("Free one or increase \"max_replication_slots\".")));

	/*
	 * Since this slot is not in use, nobody should be looking at any part of
	 * it other than the in_use field unless they're trying to allocate it.
	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
	 * be doing that.  So it's safe to initialize the slot.
	 */
	Assert(!slot->in_use);
	Assert(slot->active_pid == 0);

	/* first initialize persistent data */
	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
	namestrcpy(&slot->data.name, name);
	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
	slot->data.persistency = persistency;
	slot->data.two_phase = two_phase;
	slot->data.two_phase_at = InvalidXLogRecPtr;
	slot->data.failover = failover;
	slot->data.synced = synced;

	/* and then data only present in shared memory */
	slot->just_dirtied = false;
	slot->dirty = false;
	slot->effective_xmin = InvalidTransactionId;
	slot->effective_catalog_xmin = InvalidTransactionId;
	slot->candidate_catalog_xmin = InvalidTransactionId;
	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
	slot->candidate_restart_valid = InvalidXLogRecPtr;
	slot->candidate_restart_lsn = InvalidXLogRecPtr;
	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
	slot->inactive_since = 0;

	/*
	 * Create the slot on disk.  We haven't actually marked the slot allocated
	 * yet, so no special cleanup is required if this errors out.
	 */
	CreateSlotOnDisk(slot);

	/*
	 * We need to briefly prevent any other backend from iterating over the
	 * slots while we flip the in_use flag. We also need to set the active
	 * flag while holding the ControlLock as otherwise a concurrent
	 * ReplicationSlotAcquire() could acquire the slot as well.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

	slot->in_use = true;

	/* We can now mark the slot active, and that makes it our slot. */
	SpinLockAcquire(&slot->mutex);
	Assert(slot->active_pid == 0);
	slot->active_pid = MyProcPid;
	SpinLockRelease(&slot->mutex);
	MyReplicationSlot = slot;

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Create statistics entry for the new logical slot. We don't collect any
	 * stats for physical slots, so no need to create an entry for the same.
	 * See ReplicationSlotDropPtr for why we need to do this before releasing
	 * ReplicationSlotAllocationLock.
	 */
	if (SlotIsLogical(slot))
		pgstat_create_replslot(slot);

	/*
	 * Now that the slot has been marked as in_use and active, it's safe to
	 * let somebody else try to allocate a slot.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);
}
457 :
458 : /*
459 : * Search for the named replication slot.
460 : *
461 : * Return the replication slot if found, otherwise NULL.
462 : */
463 : ReplicationSlot *
464 2480 : SearchNamedReplicationSlot(const char *name, bool need_lock)
465 : {
466 : int i;
467 2480 : ReplicationSlot *slot = NULL;
468 :
469 2480 : if (need_lock)
470 152 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
471 :
472 4250 : for (i = 0; i < max_replication_slots; i++)
473 : {
474 4208 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
475 :
476 4208 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
477 : {
478 2438 : slot = s;
479 2438 : break;
480 : }
481 : }
482 :
483 2480 : if (need_lock)
484 152 : LWLockRelease(ReplicationSlotControlLock);
485 :
486 2480 : return slot;
487 : }
488 :
/*
 * Return the index of the replication slot in
 * ReplicationSlotCtl->replication_slots.
 *
 * This is mainly useful to have an efficient key for storing replication slot
 * stats.
 */
int
ReplicationSlotIndex(ReplicationSlot *slot)
{
	/* 'slot' must point into the shared slot array */
	Assert(slot >= ReplicationSlotCtl->replication_slots &&
		   slot < ReplicationSlotCtl->replication_slots + max_replication_slots);

	/* pointer subtraction yields the array index */
	return slot - ReplicationSlotCtl->replication_slots;
}
504 :
505 : /*
506 : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
507 : * the slot's name and true is returned.
508 : *
509 : * This likely is only useful for pgstat_replslot.c during shutdown, in other
510 : * cases there are obvious TOCTOU issues.
511 : */
512 : bool
513 148 : ReplicationSlotName(int index, Name name)
514 : {
515 : ReplicationSlot *slot;
516 : bool found;
517 :
518 148 : slot = &ReplicationSlotCtl->replication_slots[index];
519 :
520 : /*
521 : * Ensure that the slot cannot be dropped while we copy the name. Don't
522 : * need the spinlock as the name of an existing slot cannot change.
523 : */
524 148 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
525 148 : found = slot->in_use;
526 148 : if (slot->in_use)
527 148 : namestrcpy(name, NameStr(slot->data.name));
528 148 : LWLockRelease(ReplicationSlotControlLock);
529 :
530 148 : return found;
531 : }
532 :
/*
 * Find a previously created slot and mark it as used by this process.
 *
 * An error is raised if nowait is true and the slot is currently in use. If
 * nowait is false, we sleep until the slot is released by the owning process.
 *
 * On success MyReplicationSlot points at the acquired slot and its
 * active_pid is set to this backend's PID.
 */
void
ReplicationSlotAcquire(const char *name, bool nowait)
{
	ReplicationSlot *s;
	int			active_pid;

	Assert(name != NULL);

retry:
	Assert(MyReplicationSlot == NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	/* Check if the slot exists with the given name. */
	s = SearchNamedReplicationSlot(name, false);
	if (s == NULL || !s->in_use)
	{
		LWLockRelease(ReplicationSlotControlLock);

		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist",
						name)));
	}

	/*
	 * This is the slot we want; check if it's active under some other
	 * process.  In single user mode, we don't need this check.
	 */
	if (IsUnderPostmaster)
	{
		/*
		 * Get ready to sleep on the slot in case it is active.  (We may end
		 * up not sleeping, but we don't want to do this while holding the
		 * spinlock.)
		 */
		if (!nowait)
			ConditionVariablePrepareToSleep(&s->active_cv);

		/* Claim the slot if free; in any case remember who owns it. */
		SpinLockAcquire(&s->mutex);
		if (s->active_pid == 0)
			s->active_pid = MyProcPid;
		active_pid = s->active_pid;
		SpinLockRelease(&s->mutex);
	}
	else
		active_pid = MyProcPid;
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * If we found the slot but it's already active in another process, we
	 * wait until the owning process signals us that it's been released, or
	 * error out.
	 */
	if (active_pid != MyProcPid)
	{
		if (!nowait)
		{
			/* Wait here until we get signaled, and then restart */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);
			ConditionVariableCancelSleep();
			goto retry;
		}

		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
				 errmsg("replication slot \"%s\" is active for PID %d",
						NameStr(s->data.name), active_pid)));
	}
	else if (!nowait)
		ConditionVariableCancelSleep(); /* no sleep needed after all */

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&s->active_cv);

	/* We made this slot active, so it's ours now. */
	MyReplicationSlot = s;

	/*
	 * The call to pgstat_acquire_replslot() protects against stats for a
	 * different slot, from before a restart or such, being present during
	 * pgstat_report_replslot().
	 */
	if (SlotIsLogical(s))
		pgstat_acquire_replslot(s);

	/*
	 * Reset the time since the slot has become inactive as the slot is active
	 * now.
	 */
	SpinLockAcquire(&s->mutex);
	s->inactive_since = 0;
	SpinLockRelease(&s->mutex);

	if (am_walsender)
	{
		ereport(log_replication_commands ? LOG : DEBUG1,
				SlotIsLogical(s)
				? errmsg("acquired logical replication slot \"%s\"",
						 NameStr(s->data.name))
				: errmsg("acquired physical replication slot \"%s\"",
						 NameStr(s->data.name)));
	}
}
644 :
/*
 * Release the replication slot that this backend considers to own.
 *
 * This or another backend can re-acquire the slot later.
 * Resources this slot requires will be preserved.
 *
 * An ephemeral slot is dropped outright here; persistent and temporary
 * slots merely become inactive, with inactive_since stamped.
 */
void
ReplicationSlotRelease(void)
{
	ReplicationSlot *slot = MyReplicationSlot;
	char	   *slotname = NULL;	/* keep compiler quiet */
	bool		is_logical = false; /* keep compiler quiet */
	TimestampTz now = 0;

	Assert(slot != NULL && slot->active_pid != 0);

	/* Capture identifying info before the slot can go away (drop below). */
	if (am_walsender)
	{
		slotname = pstrdup(NameStr(slot->data.name));
		is_logical = SlotIsLogical(slot);
	}

	if (slot->data.persistency == RS_EPHEMERAL)
	{
		/*
		 * Delete the slot. There is no !PANIC case where this is allowed to
		 * fail, all that may happen is an incomplete cleanup of the on-disk
		 * data.
		 */
		ReplicationSlotDropAcquired();
	}

	/*
	 * If slot needed to temporarily restrain both data and catalog xmin to
	 * create the catalog snapshot, remove that temporary constraint.
	 * Snapshots can only be exported while the initial snapshot is still
	 * acquired.
	 */
	if (!TransactionIdIsValid(slot->data.xmin) &&
		TransactionIdIsValid(slot->effective_xmin))
	{
		SpinLockAcquire(&slot->mutex);
		slot->effective_xmin = InvalidTransactionId;
		SpinLockRelease(&slot->mutex);
		ReplicationSlotsComputeRequiredXmin(false);
	}

	/*
	 * Set the time since the slot has become inactive. We get the current
	 * time beforehand to avoid system call while holding the spinlock.
	 */
	now = GetCurrentTimestamp();

	if (slot->data.persistency == RS_PERSISTENT)
	{
		/*
		 * Mark persistent slot inactive. We're not freeing it, just
		 * disconnecting, but wake up others that may be waiting for it.
		 */
		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		slot->inactive_since = now;
		SpinLockRelease(&slot->mutex);
		ConditionVariableBroadcast(&slot->active_cv);
	}
	else
	{
		/* Temporary slots stay owned by this backend; just stamp the time. */
		SpinLockAcquire(&slot->mutex);
		slot->inactive_since = now;
		SpinLockRelease(&slot->mutex);
	}

	MyReplicationSlot = NULL;

	/* might not have been set when we've been a plain slot */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
	ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
	LWLockRelease(ProcArrayLock);

	if (am_walsender)
	{
		ereport(log_replication_commands ? LOG : DEBUG1,
				is_logical
				? errmsg("released logical replication slot \"%s\"",
						 slotname)
				: errmsg("released physical replication slot \"%s\"",
						 slotname));

		pfree(slotname);
	}
}
737 :
/*
 * Cleanup temporary slots created in current session.
 *
 * Cleanup only synced temporary slots if 'synced_only' is true, else
 * cleanup all temporary slots.
 *
 * Note: because we must drop ReplicationSlotControlLock before dropping a
 * slot (to avoid deadlock), the scan restarts from the top after every
 * drop rather than continuing with a possibly-stale index.
 */
void
ReplicationSlotCleanup(bool synced_only)
{
	int			i;

	Assert(MyReplicationSlot == NULL);

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (!s->in_use)
			continue;

		SpinLockAcquire(&s->mutex);
		if ((s->active_pid == MyProcPid &&
			 (!synced_only || s->data.synced)))
		{
			/* only temporary slots can still be owned by an exiting backend */
			Assert(s->data.persistency == RS_TEMPORARY);
			SpinLockRelease(&s->mutex);
			LWLockRelease(ReplicationSlotControlLock);	/* avoid deadlock */

			ReplicationSlotDropPtr(s);

			ConditionVariableBroadcast(&s->active_cv);
			goto restart;
		}
		else
			SpinLockRelease(&s->mutex);
	}

	LWLockRelease(ReplicationSlotControlLock);
}
779 :
/*
 * Permanently drop replication slot identified by the passed in name.
 *
 * If 'nowait' is true, error out instead of waiting when the slot is
 * currently active in another process.
 */
void
ReplicationSlotDrop(const char *name, bool nowait)
{
	Assert(MyReplicationSlot == NULL);

	/* Take ownership first; errors out if the slot doesn't exist. */
	ReplicationSlotAcquire(name, nowait);

	/*
	 * Do not allow users to drop the slots which are currently being synced
	 * from the primary to the standby.
	 */
	if (RecoveryInProgress() && MyReplicationSlot->data.synced)
		ereport(ERROR,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg("cannot drop replication slot \"%s\"", name),
				errdetail("This slot is being synced from the primary server."));

	ReplicationSlotDropAcquired();
}
802 :
/*
 * Change the definition of the slot identified by the specified name.
 *
 * 'failover' and 'two_phase' are optional (NULL means "leave unchanged");
 * at least one must be provided.  The slot is persisted to disk only if
 * something actually changed.
 */
void
ReplicationSlotAlter(const char *name, const bool *failover,
					 const bool *two_phase)
{
	bool		update_slot = false;

	Assert(MyReplicationSlot == NULL);
	Assert(failover || two_phase);

	ReplicationSlotAcquire(name, false);

	/* Altering a physical slot's properties is not supported. */
	if (SlotIsPhysical(MyReplicationSlot))
		ereport(ERROR,
				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				errmsg("cannot use %s with a physical replication slot",
					   "ALTER_REPLICATION_SLOT"));

	if (RecoveryInProgress())
	{
		/*
		 * Do not allow users to alter the slots which are currently being
		 * synced from the primary to the standby.
		 */
		if (MyReplicationSlot->data.synced)
			ereport(ERROR,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("cannot alter replication slot \"%s\"", name),
					errdetail("This slot is being synced from the primary server."));

		/*
		 * Do not allow users to enable failover on the standby as we do not
		 * support sync to the cascading standby.
		 */
		if (failover && *failover)
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a replication slot"
						   " on the standby"));
	}

	if (failover)
	{
		/*
		 * Do not allow users to enable failover for temporary slots as we do
		 * not support syncing temporary slots to the standby.
		 */
		if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a temporary replication slot"));

		if (MyReplicationSlot->data.failover != *failover)
		{
			SpinLockAcquire(&MyReplicationSlot->mutex);
			MyReplicationSlot->data.failover = *failover;
			SpinLockRelease(&MyReplicationSlot->mutex);

			update_slot = true;
		}
	}

	if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
	{
		SpinLockAcquire(&MyReplicationSlot->mutex);
		MyReplicationSlot->data.two_phase = *two_phase;
		SpinLockRelease(&MyReplicationSlot->mutex);

		update_slot = true;
	}

	/* Persist the new settings so they survive a restart. */
	if (update_slot)
	{
		ReplicationSlotMarkDirty();
		ReplicationSlotSave();
	}

	ReplicationSlotRelease();
}
884 :
885 : /*
886 : * Permanently drop the currently acquired replication slot.
887 : */
888 : void
889 706 : ReplicationSlotDropAcquired(void)
890 : {
891 706 : ReplicationSlot *slot = MyReplicationSlot;
892 :
893 : Assert(MyReplicationSlot != NULL);
894 :
895 : /* slot isn't acquired anymore */
896 706 : MyReplicationSlot = NULL;
897 :
898 706 : ReplicationSlotDropPtr(slot);
899 706 : }
900 :
/*
 * Permanently drop the replication slot which will be released by the point
 * this function returns.
 *
 * Works by renaming the slot's on-disk directory to a *.tmp name first, so a
 * crash mid-drop leaves no directory that would be mistaken for a valid slot,
 * then removing the renamed directory and clearing the in-memory entry.
 */
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
	char		path[MAXPGPATH];
	char		tmppath[MAXPGPATH];

	/*
	 * If some other backend ran this code concurrently with us, we might try
	 * to delete a slot with a certain name while someone else was trying to
	 * create a slot with the same name.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/* Generate pathnames. */
	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

	/*
	 * Rename the slot directory on disk, so that we'll no longer recognize
	 * this as a valid slot.  Note that if this fails, we've got to mark the
	 * slot inactive before bailing out.  If we're dropping an ephemeral or a
	 * temporary slot, we better never fail hard as the caller won't expect
	 * the slot to survive and this might get called during error handling.
	 */
	if (rename(path, tmppath) == 0)
	{
		/*
		 * We need to fsync() the directory we just renamed and its parent to
		 * make sure that our changes are on disk in a crash-safe fashion.  If
		 * fsync() fails, we can't be sure whether the changes are on disk or
		 * not.  For now, we handle that by panicking;
		 * StartupReplicationSlots() will try to straighten it out after
		 * restart.
		 */
		START_CRIT_SECTION();
		fsync_fname(tmppath, true);
		fsync_fname("pg_replslot", true);
		END_CRIT_SECTION();
	}
	else
	{
		/*
		 * Soft (WARNING) failure is only acceptable for slots the caller
		 * doesn't expect to survive anyway; a persistent slot must error out.
		 */
		bool		fail_softly = slot->data.persistency != RS_PERSISTENT;

		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);

		/* wake up anyone waiting on this slot */
		ConditionVariableBroadcast(&slot->active_cv);

		ereport(fail_softly ? WARNING : ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						path, tmppath)));
	}

	/*
	 * The slot is definitely gone.  Lock out concurrent scans of the array
	 * long enough to kill it.  It's OK to clear the active PID here without
	 * grabbing the mutex because nobody else can be scanning the array here,
	 * and nobody can be attached to this slot and thus access it without
	 * scanning the array.
	 *
	 * Also wake up processes waiting for it.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	slot->active_pid = 0;
	slot->in_use = false;
	LWLockRelease(ReplicationSlotControlLock);
	ConditionVariableBroadcast(&slot->active_cv);

	/*
	 * Slot is dead and doesn't prevent resource removal anymore, recompute
	 * limits.
	 */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * If removing the directory fails, the worst thing that will happen is
	 * that the user won't be able to create a new slot with the same name
	 * until the next server restart.  We warn about it, but that's all.
	 */
	if (!rmtree(tmppath, true))
		ereport(WARNING,
				(errmsg("could not remove directory \"%s\"", tmppath)));

	/*
	 * Drop the statistics entry for the replication slot.  Do this while
	 * holding ReplicationSlotAllocationLock so that we don't drop a
	 * statistics entry for another slot with the same name just created in
	 * another session.
	 */
	if (SlotIsLogical(slot))
		pgstat_drop_replslot(slot);

	/*
	 * We release this at the very end, so that nobody starts trying to create
	 * a slot while we're still cleaning up the detritus of the old one.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);
}
1007 :
1008 : /*
1009 : * Serialize the currently acquired slot's state from memory to disk, thereby
1010 : * guaranteeing the current state will survive a crash.
1011 : */
1012 : void
1013 2256 : ReplicationSlotSave(void)
1014 : {
1015 : char path[MAXPGPATH];
1016 :
1017 : Assert(MyReplicationSlot != NULL);
1018 :
1019 2256 : sprintf(path, "pg_replslot/%s", NameStr(MyReplicationSlot->data.name));
1020 2256 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
1021 2256 : }
1022 :
1023 : /*
1024 : * Signal that it would be useful if the currently acquired slot would be
1025 : * flushed out to disk.
1026 : *
1027 : * Note that the actual flush to disk can be delayed for a long time, if
1028 : * required for correctness explicitly do a ReplicationSlotSave().
1029 : */
1030 : void
1031 40604 : ReplicationSlotMarkDirty(void)
1032 : {
1033 40604 : ReplicationSlot *slot = MyReplicationSlot;
1034 :
1035 : Assert(MyReplicationSlot != NULL);
1036 :
1037 40604 : SpinLockAcquire(&slot->mutex);
1038 40604 : MyReplicationSlot->just_dirtied = true;
1039 40604 : MyReplicationSlot->dirty = true;
1040 40604 : SpinLockRelease(&slot->mutex);
1041 40604 : }
1042 :
1043 : /*
1044 : * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1045 : * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1046 : */
1047 : void
1048 788 : ReplicationSlotPersist(void)
1049 : {
1050 788 : ReplicationSlot *slot = MyReplicationSlot;
1051 :
1052 : Assert(slot != NULL);
1053 : Assert(slot->data.persistency != RS_PERSISTENT);
1054 :
1055 788 : SpinLockAcquire(&slot->mutex);
1056 788 : slot->data.persistency = RS_PERSISTENT;
1057 788 : SpinLockRelease(&slot->mutex);
1058 :
1059 788 : ReplicationSlotMarkDirty();
1060 788 : ReplicationSlotSave();
1061 788 : }
1062 :
1063 : /*
1064 : * Compute the oldest xmin across all slots and store it in the ProcArray.
1065 : *
1066 : * If already_locked is true, ProcArrayLock has already been acquired
1067 : * exclusively.
1068 : */
1069 : void
1070 3932 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
1071 : {
1072 : int i;
1073 3932 : TransactionId agg_xmin = InvalidTransactionId;
1074 3932 : TransactionId agg_catalog_xmin = InvalidTransactionId;
1075 :
1076 : Assert(ReplicationSlotCtl != NULL);
1077 :
1078 3932 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1079 :
1080 39382 : for (i = 0; i < max_replication_slots; i++)
1081 : {
1082 35450 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1083 : TransactionId effective_xmin;
1084 : TransactionId effective_catalog_xmin;
1085 : bool invalidated;
1086 :
1087 35450 : if (!s->in_use)
1088 31862 : continue;
1089 :
1090 3588 : SpinLockAcquire(&s->mutex);
1091 3588 : effective_xmin = s->effective_xmin;
1092 3588 : effective_catalog_xmin = s->effective_catalog_xmin;
1093 3588 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1094 3588 : SpinLockRelease(&s->mutex);
1095 :
1096 : /* invalidated slots need not apply */
1097 3588 : if (invalidated)
1098 44 : continue;
1099 :
1100 : /* check the data xmin */
1101 3544 : if (TransactionIdIsValid(effective_xmin) &&
1102 6 : (!TransactionIdIsValid(agg_xmin) ||
1103 6 : TransactionIdPrecedes(effective_xmin, agg_xmin)))
1104 510 : agg_xmin = effective_xmin;
1105 :
1106 : /* check the catalog xmin */
1107 3544 : if (TransactionIdIsValid(effective_catalog_xmin) &&
1108 1430 : (!TransactionIdIsValid(agg_catalog_xmin) ||
1109 1430 : TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1110 1970 : agg_catalog_xmin = effective_catalog_xmin;
1111 : }
1112 :
1113 3932 : LWLockRelease(ReplicationSlotControlLock);
1114 :
1115 3932 : ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1116 3932 : }
1117 :
1118 : /*
1119 : * Compute the oldest restart LSN across all slots and inform xlog module.
1120 : *
1121 : * Note: while max_slot_wal_keep_size is theoretically relevant for this
1122 : * purpose, we don't try to account for that, because this module doesn't
1123 : * know what to compare against.
1124 : */
1125 : void
1126 41370 : ReplicationSlotsComputeRequiredLSN(void)
1127 : {
1128 : int i;
1129 41370 : XLogRecPtr min_required = InvalidXLogRecPtr;
1130 :
1131 : Assert(ReplicationSlotCtl != NULL);
1132 :
1133 41370 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1134 448886 : for (i = 0; i < max_replication_slots; i++)
1135 : {
1136 407516 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1137 : XLogRecPtr restart_lsn;
1138 : bool invalidated;
1139 :
1140 407516 : if (!s->in_use)
1141 366486 : continue;
1142 :
1143 41030 : SpinLockAcquire(&s->mutex);
1144 41030 : restart_lsn = s->data.restart_lsn;
1145 41030 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1146 41030 : SpinLockRelease(&s->mutex);
1147 :
1148 : /* invalidated slots need not apply */
1149 41030 : if (invalidated)
1150 46 : continue;
1151 :
1152 40984 : if (restart_lsn != InvalidXLogRecPtr &&
1153 1454 : (min_required == InvalidXLogRecPtr ||
1154 : restart_lsn < min_required))
1155 39588 : min_required = restart_lsn;
1156 : }
1157 41370 : LWLockRelease(ReplicationSlotControlLock);
1158 :
1159 41370 : XLogSetReplicationSlotMinimumLSN(min_required);
1160 41370 : }
1161 :
1162 : /*
1163 : * Compute the oldest WAL LSN required by *logical* decoding slots..
1164 : *
1165 : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1166 : * slots exist.
1167 : *
1168 : * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1169 : * ignores physical replication slots.
1170 : *
1171 : * The results aren't required frequently, so we don't maintain a precomputed
1172 : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1173 : */
1174 : XLogRecPtr
1175 3488 : ReplicationSlotsComputeLogicalRestartLSN(void)
1176 : {
1177 3488 : XLogRecPtr result = InvalidXLogRecPtr;
1178 : int i;
1179 :
1180 3488 : if (max_replication_slots <= 0)
1181 4 : return InvalidXLogRecPtr;
1182 :
1183 3484 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1184 :
1185 37288 : for (i = 0; i < max_replication_slots; i++)
1186 : {
1187 : ReplicationSlot *s;
1188 : XLogRecPtr restart_lsn;
1189 : bool invalidated;
1190 :
1191 33804 : s = &ReplicationSlotCtl->replication_slots[i];
1192 :
1193 : /* cannot change while ReplicationSlotCtlLock is held */
1194 33804 : if (!s->in_use)
1195 33136 : continue;
1196 :
1197 : /* we're only interested in logical slots */
1198 668 : if (!SlotIsLogical(s))
1199 332 : continue;
1200 :
1201 : /* read once, it's ok if it increases while we're checking */
1202 336 : SpinLockAcquire(&s->mutex);
1203 336 : restart_lsn = s->data.restart_lsn;
1204 336 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1205 336 : SpinLockRelease(&s->mutex);
1206 :
1207 : /* invalidated slots need not apply */
1208 336 : if (invalidated)
1209 8 : continue;
1210 :
1211 328 : if (restart_lsn == InvalidXLogRecPtr)
1212 0 : continue;
1213 :
1214 328 : if (result == InvalidXLogRecPtr ||
1215 : restart_lsn < result)
1216 268 : result = restart_lsn;
1217 : }
1218 :
1219 3484 : LWLockRelease(ReplicationSlotControlLock);
1220 :
1221 3484 : return result;
1222 : }
1223 :
1224 : /*
1225 : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1226 : * passed database oid.
1227 : *
1228 : * Returns true if there are any slots referencing the database. *nslots will
1229 : * be set to the absolute number of slots in the database, *nactive to ones
1230 : * currently active.
1231 : */
1232 : bool
1233 68 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1234 : {
1235 : int i;
1236 :
1237 68 : *nslots = *nactive = 0;
1238 :
1239 68 : if (max_replication_slots <= 0)
1240 0 : return false;
1241 :
1242 68 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1243 690 : for (i = 0; i < max_replication_slots; i++)
1244 : {
1245 : ReplicationSlot *s;
1246 :
1247 622 : s = &ReplicationSlotCtl->replication_slots[i];
1248 :
1249 : /* cannot change while ReplicationSlotCtlLock is held */
1250 622 : if (!s->in_use)
1251 586 : continue;
1252 :
1253 : /* only logical slots are database specific, skip */
1254 36 : if (!SlotIsLogical(s))
1255 20 : continue;
1256 :
1257 : /* not our database, skip */
1258 16 : if (s->data.database != dboid)
1259 10 : continue;
1260 :
1261 : /* NB: intentionally counting invalidated slots */
1262 :
1263 : /* count slots with spinlock held */
1264 6 : SpinLockAcquire(&s->mutex);
1265 6 : (*nslots)++;
1266 6 : if (s->active_pid != 0)
1267 2 : (*nactive)++;
1268 6 : SpinLockRelease(&s->mutex);
1269 : }
1270 68 : LWLockRelease(ReplicationSlotControlLock);
1271 :
1272 68 : if (*nslots > 0)
1273 6 : return true;
1274 62 : return false;
1275 : }
1276 :
/*
 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
 * passed database oid. The caller should hold an exclusive lock on the
 * pg_database oid for the database to prevent creation of new slots on the db
 * or replay from existing slots.
 *
 * Another session that concurrently acquires an existing slot on the target DB
 * (most likely to drop it) may cause this function to ERROR. If that happens
 * it may have dropped some but not all slots.
 *
 * This routine isn't as efficient as it could be - but we don't drop
 * databases often, especially databases with lots of slots.
 */
void
ReplicationSlotsDropDBSlots(Oid dboid)
{
	int			i;

	if (max_replication_slots <= 0)
		return;

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s;
		char	   *slotname;
		int			active_pid;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotCtlLock is held */
		if (!s->in_use)
			continue;

		/* only logical slots are database specific, skip */
		if (!SlotIsLogical(s))
			continue;

		/* not our database, skip */
		if (s->data.database != dboid)
			continue;

		/* NB: intentionally including invalidated slots */

		/* acquire slot, so ReplicationSlotDropAcquired can be reused */
		SpinLockAcquire(&s->mutex);
		/* can't change while ReplicationSlotControlLock is held */
		slotname = NameStr(s->data.name);
		active_pid = s->active_pid;
		if (active_pid == 0)
		{
			/* claim the slot for ourselves, exactly as ReplicationSlotAcquire would */
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
		}
		SpinLockRelease(&s->mutex);

		/*
		 * Even though we hold an exclusive lock on the database object a
		 * logical slot for that DB can still be active, e.g. if it's
		 * concurrently being dropped by a backend connected to another DB.
		 *
		 * That's fairly unlikely in practice, so we'll just bail out.
		 *
		 * The slot sync worker holds a shared lock on the database before
		 * operating on synced logical slots to avoid conflict with the drop
		 * happening here. The persistent synced slots are thus safe but there
		 * is a possibility that the slot sync worker has created a temporary
		 * slot (which stays active even on release) and we are trying to drop
		 * that here. In practice, the chances of hitting this scenario are
		 * less as during slot synchronization, the temporary slot is
		 * immediately converted to persistent and thus is safe due to the
		 * shared lock taken on the database. So, we'll just bail out in such
		 * a case.
		 *
		 * XXX: We can consider shutting down the slot sync worker before
		 * trying to drop synced temporary slots here.
		 */
		if (active_pid)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							slotname, active_pid)));

		/*
		 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
		 * holding ReplicationSlotControlLock over filesystem operations,
		 * release ReplicationSlotControlLock and use
		 * ReplicationSlotDropAcquired.
		 *
		 * As that means the set of slots could change, restart scan from the
		 * beginning each time we release the lock.
		 */
		LWLockRelease(ReplicationSlotControlLock);
		ReplicationSlotDropAcquired();
		goto restart;
	}
	LWLockRelease(ReplicationSlotControlLock);
}
1376 :
1377 :
/*
 * Check whether the server's configuration supports using replication
 * slots.  Raises an ERROR if it doesn't.
 */
void
CheckSlotRequirements(void)
{
	/*
	 * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
	 * needs the same check.
	 */

	/* Slots are disabled entirely when max_replication_slots is zero. */
	if (max_replication_slots == 0)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));

	/* Slots need at least "replica"-level WAL to be useful. */
	if (wal_level < WAL_LEVEL_REPLICA)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
}
1400 :
1401 : /*
1402 : * Check whether the user has privilege to use replication slots.
1403 : */
1404 : void
1405 1016 : CheckSlotPermissions(void)
1406 : {
1407 1016 : if (!has_rolreplication(GetUserId()))
1408 10 : ereport(ERROR,
1409 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1410 : errmsg("permission denied to use replication slots"),
1411 : errdetail("Only roles with the %s attribute may use replication slots.",
1412 : "REPLICATION")));
1413 1006 : }
1414 :
/*
 * Reserve WAL for the currently active slot.
 *
 * Compute and set restart_lsn in a manner that's appropriate for the type of
 * the slot and concurrency safe.
 */
void
ReplicationSlotReserveWal(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL);
	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);

	/*
	 * The replication slot mechanism is used to prevent removal of required
	 * WAL. As there is no interlock between this routine and checkpoints, WAL
	 * segments could concurrently be removed when a now stale return value of
	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
	 * this happens we'll just retry.
	 */
	while (true)
	{
		XLogSegNo	segno;
		XLogRecPtr	restart_lsn;

		/*
		 * For logical slots log a standby snapshot and start logical decoding
		 * at exactly that position. That allows the slot to start up more
		 * quickly. But on a standby we cannot do WAL writes, so just use the
		 * replay pointer; effectively, an attempt to create a logical slot on
		 * standby will cause it to wait for an xl_running_xact record to be
		 * logged independently on the primary, so that a snapshot can be
		 * built using the record.
		 *
		 * None of this is needed (or indeed helpful) for physical slots as
		 * they'll start replay at the last logged checkpoint anyway. Instead
		 * return the location of the last redo LSN. While that slightly
		 * increases the chance that we have to retry, it's where a base
		 * backup has to start replay at.
		 */
		if (SlotIsPhysical(slot))
			restart_lsn = GetRedoRecPtr();
		else if (RecoveryInProgress())
			restart_lsn = GetXLogReplayRecPtr(NULL);
		else
			restart_lsn = GetXLogInsertRecPtr();

		/* publish the tentative restart_lsn under the slot's spinlock */
		SpinLockAcquire(&slot->mutex);
		slot->data.restart_lsn = restart_lsn;
		SpinLockRelease(&slot->mutex);

		/* prevent WAL removal as fast as possible */
		ReplicationSlotsComputeRequiredLSN();

		/*
		 * If all required WAL is still there, great, otherwise retry. The
		 * slot should prevent further removal of WAL, unless there's a
		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
		 * the new restart_lsn above, so normally we should never need to loop
		 * more than twice.
		 */
		XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
		if (XLogGetLastRemovedSegno() < segno)
			break;
	}

	if (!RecoveryInProgress() && SlotIsLogical(slot))
	{
		XLogRecPtr	flushptr;

		/* make sure we have enough information to start */
		flushptr = LogStandbySnapshot();

		/* and make sure it's fsynced to disk */
		XLogFlush(flushptr);
	}
}
1493 :
1494 : /*
1495 : * Report that replication slot needs to be invalidated
1496 : */
1497 : static void
1498 42 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1499 : bool terminating,
1500 : int pid,
1501 : NameData slotname,
1502 : XLogRecPtr restart_lsn,
1503 : XLogRecPtr oldestLSN,
1504 : TransactionId snapshotConflictHorizon)
1505 : {
1506 : StringInfoData err_detail;
1507 42 : bool hint = false;
1508 :
1509 42 : initStringInfo(&err_detail);
1510 :
1511 42 : switch (cause)
1512 : {
1513 12 : case RS_INVAL_WAL_REMOVED:
1514 : {
1515 12 : unsigned long long ex = oldestLSN - restart_lsn;
1516 :
1517 12 : hint = true;
1518 12 : appendStringInfo(&err_detail,
1519 12 : ngettext("The slot's restart_lsn %X/%X exceeds the limit by %llu byte.",
1520 : "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
1521 : ex),
1522 12 : LSN_FORMAT_ARGS(restart_lsn),
1523 : ex);
1524 12 : break;
1525 : }
1526 24 : case RS_INVAL_HORIZON:
1527 24 : appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1528 : snapshotConflictHorizon);
1529 24 : break;
1530 :
1531 6 : case RS_INVAL_WAL_LEVEL:
1532 6 : appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
1533 6 : break;
1534 : case RS_INVAL_NONE:
1535 : pg_unreachable();
1536 : }
1537 :
1538 42 : ereport(LOG,
1539 : terminating ?
1540 : errmsg("terminating process %d to release replication slot \"%s\"",
1541 : pid, NameStr(slotname)) :
1542 : errmsg("invalidating obsolete replication slot \"%s\"",
1543 : NameStr(slotname)),
1544 : errdetail_internal("%s", err_detail.data),
1545 : hint ? errhint("You might need to increase \"%s\".", "max_slot_wal_keep_size") : 0);
1546 :
1547 42 : pfree(err_detail.data);
1548 42 : }
1549 :
/*
 * Helper for InvalidateObsoleteReplicationSlots
 *
 * Acquires the given slot and mark it invalid, if necessary and possible.
 *
 * Returns whether ReplicationSlotControlLock was released in the interim (and
 * in that case we're not holding the lock at return, otherwise we are).
 *
 * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
 *
 * This is inherently racy, because we release the LWLock
 * for syscalls, so caller must restart if we return true.
 */
static bool
InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
							   ReplicationSlot *s,
							   XLogRecPtr oldestLSN,
							   Oid dboid, TransactionId snapshotConflictHorizon,
							   bool *invalidated)
{
	int			last_signaled_pid = 0;
	bool		released_lock = false;	/* true once we've told caller to restart */
	bool		terminated = false; /* have we signalled a slot owner yet? */
	TransactionId initial_effective_xmin = InvalidTransactionId;
	TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
	ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;

	for (;;)
	{
		XLogRecPtr	restart_lsn;
		NameData	slotname;
		int			active_pid = 0;
		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;

		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));

		/* The slot may have been dropped while we slept; then we're done. */
		if (!s->in_use)
		{
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		/*
		 * Check if the slot needs to be invalidated. If it needs to be
		 * invalidated, and is not currently acquired, acquire it and mark it
		 * as having been invalidated.  We do this with the spinlock held to
		 * avoid race conditions -- for example the restart_lsn could move
		 * forward, or the slot could be dropped.
		 */
		SpinLockAcquire(&s->mutex);

		restart_lsn = s->data.restart_lsn;

		/* we do nothing if the slot is already invalid */
		if (s->data.invalidated == RS_INVAL_NONE)
		{
			/*
			 * The slot's mutex will be released soon, and it is possible that
			 * those values change since the process holding the slot has been
			 * terminated (if any), so record them here to ensure that we
			 * would report the correct invalidation cause.
			 */
			if (!terminated)
			{
				initial_restart_lsn = s->data.restart_lsn;
				initial_effective_xmin = s->effective_xmin;
				initial_catalog_effective_xmin = s->effective_catalog_xmin;
			}

			/* Decide whether the requested cause actually applies to this slot. */
			switch (cause)
			{
				case RS_INVAL_WAL_REMOVED:
					if (initial_restart_lsn != InvalidXLogRecPtr &&
						initial_restart_lsn < oldestLSN)
						invalidation_cause = cause;
					break;
				case RS_INVAL_HORIZON:
					if (!SlotIsLogical(s))
						break;
					/* invalid DB oid signals a shared relation */
					if (dboid != InvalidOid && dboid != s->data.database)
						break;
					if (TransactionIdIsValid(initial_effective_xmin) &&
						TransactionIdPrecedesOrEquals(initial_effective_xmin,
													  snapshotConflictHorizon))
						invalidation_cause = cause;
					else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
							 TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
														   snapshotConflictHorizon))
						invalidation_cause = cause;
					break;
				case RS_INVAL_WAL_LEVEL:
					if (SlotIsLogical(s))
						invalidation_cause = cause;
					break;
				case RS_INVAL_NONE:
					pg_unreachable();
			}
		}

		/*
		 * The invalidation cause recorded previously should not change while
		 * the process owning the slot (if any) has been terminated.
		 */
		Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
				 invalidation_cause_prev != invalidation_cause));

		/* if there's no invalidation, we're done */
		if (invalidation_cause == RS_INVAL_NONE)
		{
			SpinLockRelease(&s->mutex);
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		slotname = s->data.name;
		active_pid = s->active_pid;

		/*
		 * If the slot can be acquired, do so and mark it invalidated
		 * immediately.  Otherwise we'll signal the owning process, below, and
		 * retry.
		 */
		if (active_pid == 0)
		{
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
			s->data.invalidated = invalidation_cause;

			/*
			 * XXX: We should consider not overwriting restart_lsn and instead
			 * just rely on .invalidated.
			 */
			if (invalidation_cause == RS_INVAL_WAL_REMOVED)
				s->data.restart_lsn = InvalidXLogRecPtr;

			/* Let caller know */
			*invalidated = true;
		}

		SpinLockRelease(&s->mutex);

		/*
		 * The logical replication slots shouldn't be invalidated as GUC
		 * max_slot_wal_keep_size is set to -1 during the binary upgrade. See
		 * check_old_cluster_for_valid_slots() where we ensure that no slots
		 * were invalidated before the upgrade.
		 */
		Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));

		if (active_pid != 0)
		{
			/*
			 * Prepare the sleep on the slot's condition variable before
			 * releasing the lock, to close a possible race condition if the
			 * slot is released before the sleep below.
			 */
			ConditionVariablePrepareToSleep(&s->active_cv);

			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/*
			 * Signal to terminate the process that owns the slot, if we
			 * haven't already signalled it.  (Avoidance of repeated
			 * signalling is the only reason for there to be a loop in this
			 * routine; otherwise we could rely on caller's restart loop.)
			 *
			 * There is the race condition that other process may own the slot
			 * after its current owner process is terminated and before this
			 * process owns it. To handle that, we signal only if the PID of
			 * the owning process has changed from the previous time. (This
			 * logic assumes that the same PID is not reused very quickly.)
			 */
			if (last_signaled_pid != active_pid)
			{
				ReportSlotInvalidation(invalidation_cause, true, active_pid,
									   slotname, restart_lsn,
									   oldestLSN, snapshotConflictHorizon);

				/* during recovery, use the recovery-conflict signal instead of SIGTERM */
				if (MyBackendType == B_STARTUP)
					(void) SendProcSignal(active_pid,
										  PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
										  INVALID_PROC_NUMBER);
				else
					(void) kill(active_pid, SIGTERM);

				last_signaled_pid = active_pid;
				terminated = true;
				invalidation_cause_prev = invalidation_cause;
			}

			/* Wait until the slot is released. */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);

			/*
			 * Re-acquire lock and start over; we expect to invalidate the
			 * slot next time (unless another process acquires the slot in the
			 * meantime).
			 */
			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
			continue;
		}
		else
		{
			/*
			 * We hold the slot now and have already invalidated it; flush it
			 * to ensure that state persists.
			 *
			 * Don't want to hold ReplicationSlotControlLock across file
			 * system operations, so release it now but be sure to tell caller
			 * to restart from scratch.
			 */
			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/* Make sure the invalidated state persists across server restart */
			ReplicationSlotMarkDirty();
			ReplicationSlotSave();
			ReplicationSlotRelease();

			ReportSlotInvalidation(invalidation_cause, false, active_pid,
								   slotname, restart_lsn,
								   oldestLSN, snapshotConflictHorizon);

			/* done with this slot for now */
			break;
		}
	}

	/* Exit invariant: lock held iff we never told the caller to restart. */
	Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));

	return released_lock;
}
1788 :
/*
 * Invalidate slots that require resources about to be removed.
 *
 * Returns true when any slot has been invalidated.
 *
 * Whether a slot needs to be invalidated depends on the cause. A slot is
 * removed if it:
 * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
 * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
 *   db; dboid may be InvalidOid for shared relations
 * - RS_INVAL_WAL_LEVEL: is logical
 *
 * NB - this runs as part of checkpoint, so avoid raising errors if possible.
 */
bool
InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
								   XLogSegNo oldestSegno, Oid dboid,
								   TransactionId snapshotConflictHorizon)
{
	XLogRecPtr	oldestLSN;
	bool		invalidated = false;

	/* Only the arguments relevant to the given cause may be passed */
	Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
	Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
	Assert(cause != RS_INVAL_NONE);

	/* No slots can exist, so none can need invalidation */
	if (max_replication_slots == 0)
		return invalidated;

	/* Convert the oldest segment number into the LSN of its first byte */
	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (int i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (!s->in_use)
			continue;

		/*
		 * InvalidatePossiblyObsoleteSlot() returns true when it had to
		 * release ReplicationSlotControlLock internally, in which case the
		 * slot array may have changed under us and we must rescan.
		 */
		if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
										   snapshotConflictHorizon,
										   &invalidated))
		{
			/* if the lock was released, start from scratch */
			goto restart;
		}
	}
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * If any slots have been invalidated, recalculate the resource limits.
	 */
	if (invalidated)
	{
		ReplicationSlotsComputeRequiredXmin(false);
		ReplicationSlotsComputeRequiredLSN();
	}

	return invalidated;
}
1850 :
/*
 * Flush all replication slots to disk.
 *
 * It is convenient to flush dirty replication slots at the time of checkpoint.
 * Additionally, in case of a shutdown checkpoint, we also identify the slots
 * for which the confirmed_flush LSN has been updated since the last time it
 * was saved and flush them.
 */
void
CheckPointReplicationSlots(bool is_shutdown)
{
	int			i;

	elog(DEBUG1, "performing replication slot checkpoint");

	/*
	 * Prevent any slot from being created/dropped while we're active. As we
	 * explicitly do *not* want to block iterating over replication_slots or
	 * acquiring a slot we cannot take the control lock - but that's OK,
	 * because holding ReplicationSlotAllocationLock is strictly stronger, and
	 * enough to guarantee that nobody can change the in_use bits on us.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		char		path[MAXPGPATH];

		if (!s->in_use)
			continue;

		/* save the slot to disk, locking is handled in SaveSlotToPath() */
		sprintf(path, "pg_replslot/%s", NameStr(s->data.name));

		/*
		 * Slot's data is not flushed each time the confirmed_flush LSN is
		 * updated as that could lead to frequent writes. However, we decide
		 * to force a flush of all logical slot's data at the time of shutdown
		 * if the confirmed_flush LSN is changed since we last flushed it to
		 * disk. This helps in avoiding an unnecessary retreat of the
		 * confirmed_flush LSN after restart.
		 */
		if (is_shutdown && SlotIsLogical(s))
		{
			SpinLockAcquire(&s->mutex);

			if (s->data.invalidated == RS_INVAL_NONE &&
				s->data.confirmed_flush > s->last_saved_confirmed_flush)
			{
				/* Mark dirty so SaveSlotToPath() actually writes the slot */
				s->just_dirtied = true;
				s->dirty = true;
			}
			SpinLockRelease(&s->mutex);
		}

		/* Use LOG, not ERROR: a checkpoint should survive slot I/O trouble */
		SaveSlotToPath(s, path, LOG);
	}
	LWLockRelease(ReplicationSlotAllocationLock);
}
1911 :
1912 : /*
1913 : * Load all replication slots from disk into memory at server startup. This
1914 : * needs to be run before we start crash recovery.
1915 : */
1916 : void
1917 1542 : StartupReplicationSlots(void)
1918 : {
1919 : DIR *replication_dir;
1920 : struct dirent *replication_de;
1921 :
1922 1542 : elog(DEBUG1, "starting up replication slots");
1923 :
1924 : /* restore all slots by iterating over all on-disk entries */
1925 1542 : replication_dir = AllocateDir("pg_replslot");
1926 4760 : while ((replication_de = ReadDir(replication_dir, "pg_replslot")) != NULL)
1927 : {
1928 : char path[MAXPGPATH + 12];
1929 : PGFileType de_type;
1930 :
1931 3218 : if (strcmp(replication_de->d_name, ".") == 0 ||
1932 1676 : strcmp(replication_de->d_name, "..") == 0)
1933 3084 : continue;
1934 :
1935 134 : snprintf(path, sizeof(path), "pg_replslot/%s", replication_de->d_name);
1936 134 : de_type = get_dirent_type(path, replication_de, false, DEBUG1);
1937 :
1938 : /* we're only creating directories here, skip if it's not our's */
1939 134 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
1940 0 : continue;
1941 :
1942 : /* we crashed while a slot was being setup or deleted, clean up */
1943 134 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
1944 : {
1945 0 : if (!rmtree(path, true))
1946 : {
1947 0 : ereport(WARNING,
1948 : (errmsg("could not remove directory \"%s\"",
1949 : path)));
1950 0 : continue;
1951 : }
1952 0 : fsync_fname("pg_replslot", true);
1953 0 : continue;
1954 : }
1955 :
1956 : /* looks like a slot in a normal state, restore */
1957 134 : RestoreSlotFromDisk(replication_de->d_name);
1958 : }
1959 1542 : FreeDir(replication_dir);
1960 :
1961 : /* currently no slots exist, we're done. */
1962 1542 : if (max_replication_slots <= 0)
1963 2 : return;
1964 :
1965 : /* Now that we have recovered all the data, compute replication xmin */
1966 1540 : ReplicationSlotsComputeRequiredXmin(false);
1967 1540 : ReplicationSlotsComputeRequiredLSN();
1968 : }
1969 :
/* ----
 * Manipulation of on-disk state of replication slots
 *
 * NB: none of the routines below should take any notice whether a slot is the
 * current one or not, that's all handled a layer above.
 * ----
 */

/*
 * Create the on-disk representation of a slot: a per-slot directory holding
 * its state file.  The directory is built under a ".tmp" name, fully
 * fsync'd, and only then rename()d into place, so a crash can never leave a
 * half-initialized slot that looks valid.
 */
static void
CreateSlotOnDisk(ReplicationSlot *slot)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	struct stat st;

	/*
	 * No need to take out the io_in_progress_lock, nobody else can see this
	 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
	 * takes out the lock, if we'd take the lock here, we'd deadlock.
	 */

	sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
	sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

	/*
	 * It's just barely possible that some previous effort to create or drop a
	 * slot with this name left a temp directory lying around. If that seems
	 * to be the case, try to remove it. If the rmtree() fails, we'll error
	 * out at the MakePGDirectory() below, so we don't bother checking
	 * success.
	 */
	if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
		rmtree(tmppath, true);

	/* Create and fsync the temporary slot directory. */
	if (MakePGDirectory(tmppath) < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create directory \"%s\": %m",
						tmppath)));
	fsync_fname(tmppath, true);

	/* Write the actual state file. */
	slot->dirty = true;			/* signal that we really need to write */
	SaveSlotToPath(slot, tmppath, ERROR);

	/* Rename the directory into place. */
	if (rename(tmppath, path) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));

	/*
	 * If we'd now fail - really unlikely - we wouldn't know whether this slot
	 * would persist after an OS crash or not - so, force a restart. The
	 * restart would try to fsync this again till it works.
	 */
	START_CRIT_SECTION();

	fsync_fname(path, true);
	fsync_fname("pg_replslot", true);

	END_CRIT_SECTION();
}
2034 :
/*
 * Shared functionality between saving and creating a replication slot.
 *
 * Serializes the slot's persistent data into <dir>/state.tmp, fsyncs it and
 * atomically rename()s it to <dir>/state.  Does nothing if the slot is not
 * marked dirty.
 *
 * I/O failures are reported at 'elevel'; when elevel < ERROR the function
 * releases its locks and returns with the slot still dirty, so a later call
 * can retry the write.
 */
static void
SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	int			fd;
	ReplicationSlotOnDisk cp;
	bool		was_dirty;

	/* first check whether there's something to write out */
	SpinLockAcquire(&slot->mutex);
	was_dirty = slot->dirty;
	slot->just_dirtied = false;
	SpinLockRelease(&slot->mutex);

	/* and don't do anything if there's nothing to write */
	if (!was_dirty)
		return;

	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);

	/* silence valgrind :( */
	memset(&cp, 0, sizeof(ReplicationSlotOnDisk));

	sprintf(tmppath, "%s/state.tmp", dir);
	sprintf(path, "%s/state", dir);

	fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
	if (fd < 0)
	{
		/*
		 * If not an ERROR, then release the lock before returning. In case
		 * of an ERROR, the error recovery path automatically releases the
		 * lock, but no harm in explicitly releasing even in that case. Note
		 * that LWLockRelease() could affect errno.
		 */
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m",
						tmppath)));
		return;
	}

	cp.magic = SLOT_MAGIC;
	INIT_CRC32C(cp.checksum);
	cp.version = SLOT_VERSION;
	cp.length = ReplicationSlotOnDiskV2Size;

	/* copy the persistent data under the mutex for a consistent snapshot */
	SpinLockAcquire(&slot->mutex);

	memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));

	SpinLockRelease(&slot->mutex);

	/* checksum everything following the non-checksummed header fields */
	COMP_CRC32C(cp.checksum,
				(char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
				ReplicationSlotOnDiskChecksummedSize);
	FIN_CRC32C(cp.checksum);

	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
	if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	/* fsync the temporary file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
	if (pg_fsync(fd) != 0)
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	if (CloseTransientFile(fd) != 0)
	{
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m",
						tmppath)));
		return;
	}

	/* rename to permanent file, fsync file and directory */
	if (rename(tmppath, path) != 0)
	{
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
		return;
	}

	/*
	 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
	 */
	START_CRIT_SECTION();

	fsync_fname(path, false);
	fsync_fname(dir, true);
	fsync_fname("pg_replslot", true);

	END_CRIT_SECTION();

	/*
	 * Successfully wrote, unset dirty bit, unless somebody dirtied again
	 * already and remember the confirmed_flush LSN value.
	 */
	SpinLockAcquire(&slot->mutex);
	if (!slot->just_dirtied)
		slot->dirty = false;
	slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
	SpinLockRelease(&slot->mutex);

	LWLockRelease(&slot->io_in_progress_lock);
}
2189 :
/*
 * Load a single slot from disk into memory.
 *
 * 'name' is the slot's directory name under pg_replslot.  On-disk corruption
 * (bad magic, version, length or checksum) is reported at PANIC, since we
 * cannot safely start up without the slot's resource-retention data.
 */
static void
RestoreSlotFromDisk(const char *name)
{
	ReplicationSlotOnDisk cp;
	int			i;
	char		slotdir[MAXPGPATH + 12];
	char		path[MAXPGPATH + 22];
	int			fd;
	bool		restored = false;
	int			readBytes;
	pg_crc32c	checksum;

	/* no need to lock here, no concurrent access allowed yet */

	/* delete temp file if it exists */
	sprintf(slotdir, "pg_replslot/%s", name);
	sprintf(path, "%s/state.tmp", slotdir);
	if (unlink(path) < 0 && errno != ENOENT)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", path)));

	sprintf(path, "%s/state", slotdir);

	elog(DEBUG1, "restoring replication slot from \"%s\"", path);

	/* on some operating systems fsyncing a file requires O_RDWR */
	fd = OpenTransientFile(path, O_RDWR | PG_BINARY);

	/*
	 * We do not need to handle this as we are rename()ing the directory into
	 * place only after we fsync()ed the state file.
	 */
	if (fd < 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	/*
	 * Sync state file before we're reading from it. We might have crashed
	 * while it wasn't synced yet and we shouldn't continue on that basis.
	 */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
	if (pg_fsync(fd) != 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						path)));
	pgstat_report_wait_end();

	/* Also sync the parent directory */
	START_CRIT_SECTION();
	fsync_fname(slotdir, true);
	END_CRIT_SECTION();

	/* read part of statefile that's guaranteed to be version independent */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
	pgstat_report_wait_end();
	if (readBytes != ReplicationSlotOnDiskConstantSize)
	{
		/* negative means a read error; short reads indicate corruption */
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes,
							(Size) ReplicationSlotOnDiskConstantSize)));
	}

	/* verify magic */
	if (cp.magic != SLOT_MAGIC)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
						path, cp.magic, SLOT_MAGIC)));

	/* verify version */
	if (cp.version != SLOT_VERSION)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has unsupported version %u",
						path, cp.version)));

	/* boundary check on length */
	if (cp.length != ReplicationSlotOnDiskV2Size)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has corrupted length %u",
						path, cp.length)));

	/* Now that we know the size, read the entire file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd,
					 (char *) &cp + ReplicationSlotOnDiskConstantSize,
					 cp.length);
	pgstat_report_wait_end();
	if (readBytes != cp.length)
	{
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes, (Size) cp.length)));
	}

	if (CloseTransientFile(fd) != 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));

	/* now verify the CRC */
	INIT_CRC32C(checksum);
	COMP_CRC32C(checksum,
				(char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
				ReplicationSlotOnDiskChecksummedSize);
	FIN_CRC32C(checksum);

	if (!EQ_CRC32C(checksum, cp.checksum))
		ereport(PANIC,
				(errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
						path, checksum, cp.checksum)));

	/*
	 * If we crashed with an ephemeral slot active, don't restore but delete
	 * it.
	 */
	if (cp.slotdata.persistency != RS_PERSISTENT)
	{
		if (!rmtree(slotdir, true))
		{
			ereport(WARNING,
					(errmsg("could not remove directory \"%s\"",
							slotdir)));
		}
		fsync_fname("pg_replslot", true);
		return;
	}

	/*
	 * Verify that requirements for the specific slot type are met. That's
	 * important because if these aren't met we're not guaranteed to retain
	 * all the necessary resources for the slot.
	 *
	 * NB: We have to do so *after* the above checks for ephemeral slots,
	 * because otherwise a slot that shouldn't exist anymore could prevent
	 * restarts.
	 *
	 * NB: Changing the requirements here also requires adapting
	 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
	 */
	if (cp.slotdata.database != InvalidOid && wal_level < WAL_LEVEL_LOGICAL)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
						NameStr(cp.slotdata.name)),
				 errhint("Change \"wal_level\" to be \"logical\" or higher.")));
	else if (wal_level < WAL_LEVEL_REPLICA)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
						NameStr(cp.slotdata.name)),
				 errhint("Change \"wal_level\" to be \"replica\" or higher.")));

	/* nothing can be active yet, don't lock anything */
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *slot;

		slot = &ReplicationSlotCtl->replication_slots[i];

		/* find the first free in-memory slot entry */
		if (slot->in_use)
			continue;

		/* restore the entire set of persistent data */
		memcpy(&slot->data, &cp.slotdata,
			   sizeof(ReplicationSlotPersistentData));

		/* initialize in memory state */
		slot->effective_xmin = cp.slotdata.xmin;
		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
		slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;

		slot->candidate_catalog_xmin = InvalidTransactionId;
		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_valid = InvalidXLogRecPtr;

		slot->in_use = true;
		slot->active_pid = 0;

		/*
		 * Set the time since the slot has become inactive after loading the
		 * slot from the disk into memory. Whoever acquires the slot i.e.
		 * makes the slot active will reset it.
		 */
		slot->inactive_since = GetCurrentTimestamp();

		restored = true;
		break;
	}

	if (!restored)
		ereport(FATAL,
				(errmsg("too many replication slots active before shutdown"),
				 errhint("Increase \"max_replication_slots\" and try again.")));
}
2407 :
2408 : /*
2409 : * Maps an invalidation reason for a replication slot to
2410 : * ReplicationSlotInvalidationCause.
2411 : */
2412 : ReplicationSlotInvalidationCause
2413 0 : GetSlotInvalidationCause(const char *invalidation_reason)
2414 : {
2415 : ReplicationSlotInvalidationCause cause;
2416 0 : ReplicationSlotInvalidationCause result = RS_INVAL_NONE;
2417 0 : bool found PG_USED_FOR_ASSERTS_ONLY = false;
2418 :
2419 : Assert(invalidation_reason);
2420 :
2421 0 : for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++)
2422 : {
2423 0 : if (strcmp(SlotInvalidationCauses[cause], invalidation_reason) == 0)
2424 : {
2425 0 : found = true;
2426 0 : result = cause;
2427 0 : break;
2428 : }
2429 : }
2430 :
2431 : Assert(found);
2432 0 : return result;
2433 : }
2434 :
2435 : /*
2436 : * A helper function to validate slots specified in GUC synchronized_standby_slots.
2437 : *
2438 : * The rawname will be parsed, and the result will be saved into *elemlist.
2439 : */
2440 : static bool
2441 12 : validate_sync_standby_slots(char *rawname, List **elemlist)
2442 : {
2443 : bool ok;
2444 :
2445 : /* Verify syntax and parse string into a list of identifiers */
2446 12 : ok = SplitIdentifierString(rawname, ',', elemlist);
2447 :
2448 12 : if (!ok)
2449 : {
2450 0 : GUC_check_errdetail("List syntax is invalid.");
2451 : }
2452 12 : else if (!ReplicationSlotCtl)
2453 : {
2454 : /*
2455 : * We cannot validate the replication slot if the replication slots'
2456 : * data has not been initialized. This is ok as we will anyway
2457 : * validate the specified slot when waiting for them to catch up. See
2458 : * StandbySlotsHaveCaughtup() for details.
2459 : */
2460 : }
2461 : else
2462 : {
2463 : /* Check that the specified slots exist and are logical slots */
2464 12 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2465 :
2466 36 : foreach_ptr(char, name, *elemlist)
2467 : {
2468 : ReplicationSlot *slot;
2469 :
2470 12 : slot = SearchNamedReplicationSlot(name, false);
2471 :
2472 12 : if (!slot)
2473 : {
2474 0 : GUC_check_errdetail("replication slot \"%s\" does not exist",
2475 : name);
2476 0 : ok = false;
2477 0 : break;
2478 : }
2479 :
2480 12 : if (!SlotIsPhysical(slot))
2481 : {
2482 0 : GUC_check_errdetail("\"%s\" is not a physical replication slot",
2483 : name);
2484 0 : ok = false;
2485 0 : break;
2486 : }
2487 : }
2488 :
2489 12 : LWLockRelease(ReplicationSlotControlLock);
2490 : }
2491 :
2492 12 : return ok;
2493 : }
2494 :
2495 : /*
2496 : * GUC check_hook for synchronized_standby_slots
2497 : */
2498 : bool
2499 1872 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
2500 : {
2501 : char *rawname;
2502 : char *ptr;
2503 : List *elemlist;
2504 : int size;
2505 : bool ok;
2506 : SyncStandbySlotsConfigData *config;
2507 :
2508 1872 : if ((*newval)[0] == '\0')
2509 1860 : return true;
2510 :
2511 : /* Need a modifiable copy of the GUC string */
2512 12 : rawname = pstrdup(*newval);
2513 :
2514 : /* Now verify if the specified slots exist and have correct type */
2515 12 : ok = validate_sync_standby_slots(rawname, &elemlist);
2516 :
2517 12 : if (!ok || elemlist == NIL)
2518 : {
2519 0 : pfree(rawname);
2520 0 : list_free(elemlist);
2521 0 : return ok;
2522 : }
2523 :
2524 : /* Compute the size required for the SyncStandbySlotsConfigData struct */
2525 12 : size = offsetof(SyncStandbySlotsConfigData, slot_names);
2526 36 : foreach_ptr(char, slot_name, elemlist)
2527 12 : size += strlen(slot_name) + 1;
2528 :
2529 : /* GUC extra value must be guc_malloc'd, not palloc'd */
2530 12 : config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
2531 :
2532 : /* Transform the data into SyncStandbySlotsConfigData */
2533 12 : config->nslotnames = list_length(elemlist);
2534 :
2535 12 : ptr = config->slot_names;
2536 36 : foreach_ptr(char, slot_name, elemlist)
2537 : {
2538 12 : strcpy(ptr, slot_name);
2539 12 : ptr += strlen(slot_name) + 1;
2540 : }
2541 :
2542 12 : *extra = (void *) config;
2543 :
2544 12 : pfree(rawname);
2545 12 : list_free(elemlist);
2546 12 : return true;
2547 : }
2548 :
2549 : /*
2550 : * GUC assign_hook for synchronized_standby_slots
2551 : */
2552 : void
2553 1872 : assign_synchronized_standby_slots(const char *newval, void *extra)
2554 : {
2555 : /*
2556 : * The standby slots may have changed, so we must recompute the oldest
2557 : * LSN.
2558 : */
2559 1872 : ss_oldest_flush_lsn = InvalidXLogRecPtr;
2560 :
2561 1872 : synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
2562 1872 : }
2563 :
2564 : /*
2565 : * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
2566 : */
2567 : bool
2568 37524 : SlotExistsInSyncStandbySlots(const char *slot_name)
2569 : {
2570 : const char *standby_slot_name;
2571 :
2572 : /* Return false if there is no value in synchronized_standby_slots */
2573 37524 : if (synchronized_standby_slots_config == NULL)
2574 37514 : return false;
2575 :
2576 : /*
2577 : * XXX: We are not expecting this list to be long so a linear search
2578 : * shouldn't hurt but if that turns out not to be true then we can cache
2579 : * this information for each WalSender as well.
2580 : */
2581 10 : standby_slot_name = synchronized_standby_slots_config->slot_names;
2582 10 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2583 : {
2584 10 : if (strcmp(standby_slot_name, slot_name) == 0)
2585 10 : return true;
2586 :
2587 0 : standby_slot_name += strlen(standby_slot_name) + 1;
2588 : }
2589 :
2590 0 : return false;
2591 : }
2592 :
/*
 * Return true if the slots specified in synchronized_standby_slots have caught up to
 * the given WAL location, false otherwise.
 *
 * The elevel parameter specifies the error level used for logging messages
 * related to slots that do not exist, are invalidated, or are inactive.
 */
bool
StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
{
	const char *name;
	int			caught_up_slot_num = 0;
	XLogRecPtr	min_restart_lsn = InvalidXLogRecPtr;

	/*
	 * Don't need to wait for the standbys to catch up if there is no value in
	 * synchronized_standby_slots.
	 */
	if (synchronized_standby_slots_config == NULL)
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if we are on a standby
	 * server, since we do not support syncing slots to cascading standbys.
	 */
	if (RecoveryInProgress())
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if they are already
	 * beyond the specified WAL location.
	 */
	if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
		ss_oldest_flush_lsn >= wait_for_lsn)
		return true;

	/*
	 * To prevent concurrent slot dropping and creation while filtering the
	 * slots, take the ReplicationSlotControlLock outside of the loop.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	name = synchronized_standby_slots_config->slot_names;
	for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
	{
		XLogRecPtr	restart_lsn;
		bool		invalidated;
		bool		inactive;
		ReplicationSlot *slot;

		slot = SearchNamedReplicationSlot(name, false);

		if (!slot)
		{
			/*
			 * If a slot name provided in synchronized_standby_slots does not
			 * exist, report a message and exit the loop. A user can specify a
			 * slot name that does not exist just before the server startup.
			 * The GUC check_hook(validate_sync_standby_slots) cannot validate
			 * such a slot during startup as the ReplicationSlotCtl shared
			 * memory is not initialized at that time. It is also possible for
			 * a user to drop the slot in synchronized_standby_slots
			 * afterwards.
			 */
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("replication slot \"%s\" specified in parameter %s does not exist",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting on the standby associated with \"%s\".",
							  name),
					errhint("Consider creating the slot \"%s\" or amend parameter %s.",
							name, "synchronized_standby_slots"));
			break;
		}

		if (SlotIsLogical(slot))
		{
			/*
			 * If a logical slot name is provided in
			 * synchronized_standby_slots, report a message and exit the loop.
			 * Similar to the non-existent case, a user can specify a logical
			 * slot name in synchronized_standby_slots before the server
			 * startup, or drop an existing physical slot and recreate a
			 * logical slot with the same name.
			 */
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("cannot have logical replication slot \"%s\" in parameter %s",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting for correction on \"%s\".",
							  name),
					errhint("Consider removing logical slot \"%s\" from parameter %s.",
							name, "synchronized_standby_slots"));
			break;
		}

		/* Read the slot's state under its mutex for a consistent snapshot */
		SpinLockAcquire(&slot->mutex);
		restart_lsn = slot->data.restart_lsn;
		invalidated = slot->data.invalidated != RS_INVAL_NONE;
		inactive = slot->active_pid == 0;
		SpinLockRelease(&slot->mutex);

		if (invalidated)
		{
			/* Specified physical slot has been invalidated */
			ereport(elevel,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting on the standby associated with \"%s\".",
							  name),
					errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
							name, "synchronized_standby_slots"));
			break;
		}

		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
		{
			/* Log a message if no active_pid for this physical slot */
			if (inactive)
				ereport(elevel,
						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
							   name, "synchronized_standby_slots"),
						errdetail("Logical replication is waiting on the standby associated with \"%s\".",
								  name),
						errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
								name, "synchronized_standby_slots"));

			/*
			 * This slot hasn't caught up, so not all slots can have; stop
			 * scanning (caught_up_slot_num stays short and we return false).
			 */
			break;
		}

		Assert(restart_lsn >= wait_for_lsn);

		if (XLogRecPtrIsInvalid(min_restart_lsn) ||
			min_restart_lsn > restart_lsn)
			min_restart_lsn = restart_lsn;

		caught_up_slot_num++;

		/*
		 * Advance to the next packed slot name; only reached on the
		 * caught-up path, as every other path breaks out of the loop.
		 */
		name += strlen(name) + 1;
	}

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Return false if not all the standbys have caught up to the specified
	 * WAL location.
	 */
	if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
		return false;

	/* The ss_oldest_flush_lsn must not retreat. */
	Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
		   min_restart_lsn >= ss_oldest_flush_lsn);

	ss_oldest_flush_lsn = min_restart_lsn;

	return true;
}
2754 :
/*
 * Wait for physical standbys to confirm receiving the given lsn.
 *
 * Used by logical decoding SQL functions. It waits for physical standbys
 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
 */
void
WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
{
	/*
	 * Don't need to wait for the standby to catch up if the current acquired
	 * slot is not a logical failover slot, or there is no value in
	 * synchronized_standby_slots.
	 */
	if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
		return;

	ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);

	for (;;)
	{
		CHECK_FOR_INTERRUPTS();

		/*
		 * Re-read the config file if requested, so that a changed
		 * synchronized_standby_slots value takes effect mid-wait.
		 */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		/* Exit if done waiting for every slot. */
		if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
			break;

		/*
		 * Wait for the slots in the synchronized_standby_slots to catch up,
		 * but use a timeout (1s) so we can also check if the
		 * synchronized_standby_slots has been changed.
		 */
		ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
									WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
	}

	ConditionVariableCancelSleep();
}
|