Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * slot.c
4 : * Replication slot management.
5 : *
6 : *
7 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/replication/slot.c
12 : *
13 : * NOTES
14 : *
15 : * Replication slots are used to keep state about replication streams
16 : * originating from this cluster. Their primary purpose is to prevent the
17 : * premature removal of WAL or of old tuple versions in a manner that would
18 : * interfere with replication; they are also useful for monitoring purposes.
19 : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : * on standbys (to support cascading setups). The requirement that slots be
21 : * usable on standbys precludes storing them in the system catalogs.
22 : *
23 : * Each replication slot gets its own directory inside the directory
24 : * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 : * contain the slot's own data. Additional data can be stored alongside that
26 : * file if required. While the server is running, the state data is also
27 : * cached in memory for efficiency.
28 : *
29 : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : * of a slot. The remaining data in each slot is protected by its mutex.
33 : *
34 : *-------------------------------------------------------------------------
35 : */
36 :
37 : #include "postgres.h"
38 :
39 : #include <unistd.h>
40 : #include <sys/stat.h>
41 :
42 : #include "access/transam.h"
43 : #include "access/xlog_internal.h"
44 : #include "access/xlogrecovery.h"
45 : #include "common/file_utils.h"
46 : #include "common/string.h"
47 : #include "miscadmin.h"
48 : #include "pgstat.h"
49 : #include "postmaster/interrupt.h"
50 : #include "replication/logicallauncher.h"
51 : #include "replication/slotsync.h"
52 : #include "replication/slot.h"
53 : #include "replication/walsender_private.h"
54 : #include "storage/fd.h"
55 : #include "storage/ipc.h"
56 : #include "storage/proc.h"
57 : #include "storage/procarray.h"
58 : #include "utils/builtins.h"
59 : #include "utils/guc_hooks.h"
60 : #include "utils/injection_point.h"
61 : #include "utils/varlena.h"
62 :
/*
 * Replication slot on-disk data structure.
 *
 * Layout of a slot's state file. A version-independent header (magic,
 * checksum, version, length) is followed by the version-dependent slot data.
 * Per the ReplicationSlotOnDisk*Size macros below, the checksum covers every
 * byte from 'version' to the end of the struct, while 'magic' and 'checksum'
 * themselves are excluded.
 */
typedef struct ReplicationSlotOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* file format identifier (compare SLOT_MAGIC) */
	pg_crc32c	checksum;

	/* data covered by checksum */
	uint32		version;		/* on-disk format version (compare SLOT_VERSION) */
	uint32		length;			/* presumably the size of the version-dependent
								 * part that follows; TODO confirm against the
								 * reader/writer code */

	/*
	 * The actual data in the slot that follows can differ based on the above
	 * 'version'.
	 */

	ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;
85 :
/*
 * Struct for the configuration of synchronized_standby_slots.
 *
 * Note: this must be a flat representation that can be held in a single chunk
 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
 * synchronized_standby_slots GUC.
 */
typedef struct
{
	/* Number of slot names in the slot_names[] */
	int			nslotnames;

	/*
	 * slot_names contains 'nslotnames' consecutive null-terminated C strings.
	 * To visit the k-th name, skip over the preceding k names (each strlen + 1
	 * bytes long); there is no separate offset table.
	 */
	char		slot_names[FLEXIBLE_ARRAY_MEMBER];
} SyncStandbySlotsConfigData;
103 :
/*
 * Lookup table for slot invalidation causes.
 *
 * Pairs each ReplicationSlotInvalidationCause value with the name shown to
 * users, e.g. in the "invalidated due to" errdetail raised by
 * ReplicationSlotAcquire() (presumably resolved through this table by
 * GetSlotInvalidationCauseName(); confirm there). The table is keyed by the
 * 'cause' field rather than by array position, so entries need not line up
 * with the enum's numeric values.
 */
typedef struct SlotInvalidationCauseMap
{
	ReplicationSlotInvalidationCause cause;
	const char *cause_name;
} SlotInvalidationCauseMap;

static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
	{RS_INVAL_NONE, "none"},
	{RS_INVAL_WAL_REMOVED, "wal_removed"},
	{RS_INVAL_HORIZON, "rows_removed"},
	{RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
	{RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
};

/*
 * Ensure that the lookup table is up-to-date with the enums defined in
 * ReplicationSlotInvalidationCause. (The "+ 1" presumably accounts for the
 * RS_INVAL_NONE entry, which is not counted in RS_INVAL_MAX_CAUSES.)
 */
StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
				 "array length mismatch");
127 :
/*
 * Size helpers for ReplicationSlotOnDisk.
 *
 * Every expansion that is a compound expression is wrapped in parentheses so
 * the macros behave like single values wherever they are used, including as
 * operands of higher-precedence operators (multiplication, casts, unary
 * minus). Without the parentheses, "2 * ReplicationSlotOnDiskV2Size" would
 * expand to "2 * sizeof(...) - offsetof(...)" and yield the wrong size.
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	offsetof(ReplicationSlotOnDisk, slotdata)
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize \
	offsetof(ReplicationSlotOnDisk, version)
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	5			/* version for new files */
143 :
/*
 * Control array for replication slot management.
 *
 * Points into shared memory once ReplicationSlotsShmemInit() has run; it is
 * left NULL when max_replication_slots is 0.
 */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/*
 * My backend's replication slot in the shared memory array.
 *
 * Set by ReplicationSlotCreate()/ReplicationSlotAcquire() and reset to NULL
 * by ReplicationSlotRelease(); NULL whenever this backend holds no slot.
 */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUC variables */
int			max_replication_slots = 10; /* the maximum number of replication
										 * slots */

/*
 * Invalidate replication slots that have remained idle longer than this
 * duration; '0' disables it. (Unit presumably seconds, per the _secs suffix.)
 */
int			idle_replication_slot_timeout_secs = 0;

/*
 * This GUC lists streaming replication standby server slot names that
 * logical WAL sender processes will wait for.
 */
char	   *synchronized_standby_slots;

/* This is the parsed and cached configuration for synchronized_standby_slots */
static SyncStandbySlotsConfigData *synchronized_standby_slots_config;

/*
 * Oldest LSN that has been confirmed to be flushed to the standbys
 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
 */
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;

static void ReplicationSlotShmemExit(int code, Datum arg);
static bool IsSlotForConflictCheck(const char *name);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
183 :
184 : /*
185 : * Report shared-memory space needed by ReplicationSlotsShmemInit.
186 : */
187 : Size
188 8754 : ReplicationSlotsShmemSize(void)
189 : {
190 8754 : Size size = 0;
191 :
192 8754 : if (max_replication_slots == 0)
193 4 : return size;
194 :
195 8750 : size = offsetof(ReplicationSlotCtlData, replication_slots);
196 8750 : size = add_size(size,
197 : mul_size(max_replication_slots, sizeof(ReplicationSlot)));
198 :
199 8750 : return size;
200 : }
201 :
202 : /*
203 : * Allocate and initialize shared memory for replication slots.
204 : */
205 : void
206 2266 : ReplicationSlotsShmemInit(void)
207 : {
208 : bool found;
209 :
210 2266 : if (max_replication_slots == 0)
211 2 : return;
212 :
213 2264 : ReplicationSlotCtl = (ReplicationSlotCtlData *)
214 2264 : ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
215 : &found);
216 :
217 2264 : if (!found)
218 : {
219 : int i;
220 :
221 : /* First time through, so initialize */
222 4114 : MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
223 :
224 24510 : for (i = 0; i < max_replication_slots; i++)
225 : {
226 22246 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
227 :
228 : /* everything else is zeroed by the memset above */
229 22246 : SpinLockInit(&slot->mutex);
230 22246 : LWLockInitialize(&slot->io_in_progress_lock,
231 : LWTRANCHE_REPLICATION_SLOT_IO);
232 22246 : ConditionVariableInit(&slot->active_cv);
233 : }
234 : }
235 : }
236 :
237 : /*
238 : * Register the callback for replication slot cleanup and releasing.
239 : */
240 : void
241 44764 : ReplicationSlotInitialize(void)
242 : {
243 44764 : before_shmem_exit(ReplicationSlotShmemExit, 0);
244 44764 : }
245 :
246 : /*
247 : * Release and cleanup replication slots.
248 : */
249 : static void
250 44764 : ReplicationSlotShmemExit(int code, Datum arg)
251 : {
252 : /* Make sure active replication slots are released */
253 44764 : if (MyReplicationSlot != NULL)
254 542 : ReplicationSlotRelease();
255 :
256 : /* Also cleanup all the temporary slots. */
257 44764 : ReplicationSlotCleanup(false);
258 44764 : }
259 :
260 : /*
261 : * Check whether the passed slot name is valid and report errors at elevel.
262 : *
263 : * See comments for ReplicationSlotValidateNameInternal().
264 : */
265 : bool
266 1618 : ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
267 : int elevel)
268 : {
269 : int err_code;
270 1618 : char *err_msg = NULL;
271 1618 : char *err_hint = NULL;
272 :
273 1618 : if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
274 : &err_code, &err_msg, &err_hint))
275 : {
276 : /*
277 : * Use errmsg_internal() and errhint_internal() instead of errmsg()
278 : * and errhint(), since the messages from
279 : * ReplicationSlotValidateNameInternal() are already translated. This
280 : * avoids double translation.
281 : */
282 8 : ereport(elevel,
283 : errcode(err_code),
284 : errmsg_internal("%s", err_msg),
285 : (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
286 :
287 0 : pfree(err_msg);
288 0 : if (err_hint != NULL)
289 0 : pfree(err_hint);
290 0 : return false;
291 : }
292 :
293 1610 : return true;
294 : }
295 :
296 : /*
297 : * Check whether the passed slot name is valid.
298 : *
299 : * An error will be reported for a reserved replication slot name if
300 : * allow_reserved_name is set to false.
301 : *
302 : * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
303 : * the name to be used as a directory name on every supported OS.
304 : *
305 : * Returns true if the slot name is valid. Otherwise, returns false and stores
306 : * the error code, error message, and optional hint in err_code, err_msg, and
307 : * err_hint, respectively. The caller is responsible for freeing err_msg and
308 : * err_hint, which are palloc'd.
309 : */
310 : bool
311 2054 : ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
312 : int *err_code, char **err_msg, char **err_hint)
313 : {
314 : const char *cp;
315 :
316 2054 : if (strlen(name) == 0)
317 : {
318 6 : *err_code = ERRCODE_INVALID_NAME;
319 6 : *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
320 6 : *err_hint = NULL;
321 6 : return false;
322 : }
323 :
324 2048 : if (strlen(name) >= NAMEDATALEN)
325 : {
326 0 : *err_code = ERRCODE_NAME_TOO_LONG;
327 0 : *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
328 0 : *err_hint = NULL;
329 0 : return false;
330 : }
331 :
332 39292 : for (cp = name; *cp; cp++)
333 : {
334 37248 : if (!((*cp >= 'a' && *cp <= 'z')
335 17438 : || (*cp >= '0' && *cp <= '9')
336 3646 : || (*cp == '_')))
337 : {
338 4 : *err_code = ERRCODE_INVALID_NAME;
339 4 : *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
340 4 : *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
341 4 : return false;
342 : }
343 : }
344 :
345 2044 : if (!allow_reserved_name && IsSlotForConflictCheck(name))
346 : {
347 2 : *err_code = ERRCODE_RESERVED_NAME;
348 2 : *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
349 2 : *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
350 : CONFLICT_DETECTION_SLOT);
351 2 : return false;
352 : }
353 :
354 2042 : return true;
355 : }
356 :
357 : /*
358 : * Return true if the replication slot name is "pg_conflict_detection".
359 : */
360 : static bool
361 4426 : IsSlotForConflictCheck(const char *name)
362 : {
363 4426 : return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
364 : }
365 :
/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db specific; if the slot is going to
 *     be used for that pass true, otherwise false.
 * persistency: persistency level recorded in the slot (e.g. RS_TEMPORARY).
 * two_phase: Allows decoding of prepared transactions. We allow this option
 *     to be enabled only at the slot creation time. If we allow this option
 *     to be changed during decoding then it is quite possible that we skip
 *     prepare first time because this option was not enabled. Now next time
 *     during getting changes, if the two_phase option is enabled it can skip
 *     prepare because by that time start decoding point has been moved. So the
 *     user will only get commit prepared.
 * failover: If enabled, allows the slot to be synced to standbys so
 *     that logical replication can be resumed after failover.
 * synced: True if the slot is synchronized from the primary server.
 *
 * On return the slot is in_use, active for MyProcPid, and stored in
 * MyReplicationSlot. ERROR is raised for an invalid name, a disallowed
 * failover configuration, a duplicate name, or when every slot is in use.
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
					  ReplicationSlotPersistency persistency,
					  bool two_phase, bool failover, bool synced)
{
	ReplicationSlot *slot = NULL;
	int			i;

	Assert(MyReplicationSlot == NULL);

	/*
	 * The logical launcher or pg_upgrade may create or migrate an internal
	 * slot, so using a reserved name is allowed in these cases.
	 */
	ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
								ERROR);

	if (failover)
	{
		/*
		 * Do not allow users to create the failover enabled slots on the
		 * standby as we do not support sync to the cascading standby.
		 *
		 * However, failover enabled slots can be created during slot
		 * synchronization because we need to retain the same values as the
		 * remote slot.
		 */
		if (RecoveryInProgress() && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a replication slot created on the standby"));

		/*
		 * Do not allow users to create failover enabled temporary slots,
		 * because temporary slots will not be synced to the standby.
		 *
		 * However, failover enabled temporary slots can be created during
		 * slot synchronization. See the comments atop slotsync.c for details.
		 */
		if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a temporary replication slot"));
	}

	/*
	 * If some other backend ran this code concurrently with us, we'd likely
	 * both allocate the same slot, and that would be bad. We'd also be at
	 * risk of missing a name collision. Also, we don't want to try to create
	 * a new slot while somebody's busy cleaning up an old one, because we
	 * might both be monkeying with the same directory.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * Check for name collision, and identify an allocatable slot. We need to
	 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
	 * else can change the in_use flags while we're looking at them.
	 *
	 * The loop keeps going after finding a free slot so that the duplicate
	 * name check covers every in-use slot; the first free slot is chosen.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
					 errmsg("replication slot \"%s\" already exists", name)));
		if (!s->in_use && slot == NULL)
			slot = s;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If all slots are in use, we're out of luck. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("all replication slots are in use"),
				 errhint("Free one or increase \"max_replication_slots\".")));

	/*
	 * Since this slot is not in use, nobody should be looking at any part of
	 * it other than the in_use field unless they're trying to allocate it.
	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
	 * be doing that. So it's safe to initialize the slot.
	 */
	Assert(!slot->in_use);
	Assert(slot->active_pid == 0);

	/* first initialize persistent data */
	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
	namestrcpy(&slot->data.name, name);
	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
	slot->data.persistency = persistency;
	slot->data.two_phase = two_phase;
	slot->data.two_phase_at = InvalidXLogRecPtr;
	slot->data.failover = failover;
	slot->data.synced = synced;

	/* and then data only present in shared memory */
	slot->just_dirtied = false;
	slot->dirty = false;
	slot->effective_xmin = InvalidTransactionId;
	slot->effective_catalog_xmin = InvalidTransactionId;
	slot->candidate_catalog_xmin = InvalidTransactionId;
	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
	slot->candidate_restart_valid = InvalidXLogRecPtr;
	slot->candidate_restart_lsn = InvalidXLogRecPtr;
	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
	slot->last_saved_restart_lsn = InvalidXLogRecPtr;
	slot->inactive_since = 0;
	slot->slotsync_skip_reason = SS_SKIP_NONE;

	/*
	 * Create the slot on disk. We haven't actually marked the slot allocated
	 * yet, so no special cleanup is required if this errors out.
	 */
	CreateSlotOnDisk(slot);

	/*
	 * We need to briefly prevent any other backend from iterating over the
	 * slots while we flip the in_use flag. We also need to set the active
	 * flag while holding the ControlLock as otherwise a concurrent
	 * ReplicationSlotAcquire() could acquire the slot as well.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

	slot->in_use = true;

	/* We can now mark the slot active, and that makes it our slot. */
	SpinLockAcquire(&slot->mutex);
	Assert(slot->active_pid == 0);
	slot->active_pid = MyProcPid;
	SpinLockRelease(&slot->mutex);
	MyReplicationSlot = slot;

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Create statistics entry for the new logical slot. We don't collect any
	 * stats for physical slots, so no need to create an entry for the same.
	 * See ReplicationSlotDropPtr for why we need to do this before releasing
	 * ReplicationSlotAllocationLock.
	 */
	if (SlotIsLogical(slot))
		pgstat_create_replslot(slot);

	/*
	 * Now that the slot has been marked as in_use and active, it's safe to
	 * let somebody else try to allocate a slot.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);
}
539 :
540 : /*
541 : * Search for the named replication slot.
542 : *
543 : * Return the replication slot if found, otherwise NULL.
544 : */
545 : ReplicationSlot *
546 3942 : SearchNamedReplicationSlot(const char *name, bool need_lock)
547 : {
548 : int i;
549 3942 : ReplicationSlot *slot = NULL;
550 :
551 3942 : if (need_lock)
552 1148 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
553 :
554 15070 : for (i = 0; i < max_replication_slots; i++)
555 : {
556 14126 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
557 :
558 14126 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
559 : {
560 2998 : slot = s;
561 2998 : break;
562 : }
563 : }
564 :
565 3942 : if (need_lock)
566 1148 : LWLockRelease(ReplicationSlotControlLock);
567 :
568 3942 : return slot;
569 : }
570 :
571 : /*
572 : * Return the index of the replication slot in
573 : * ReplicationSlotCtl->replication_slots.
574 : *
575 : * This is mainly useful to have an efficient key for storing replication slot
576 : * stats.
577 : */
578 : int
579 16966 : ReplicationSlotIndex(ReplicationSlot *slot)
580 : {
581 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
582 : slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
583 :
584 16966 : return slot - ReplicationSlotCtl->replication_slots;
585 : }
586 :
587 : /*
588 : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
589 : * the slot's name and true is returned.
590 : *
591 : * This likely is only useful for pgstat_replslot.c during shutdown, in other
592 : * cases there are obvious TOCTOU issues.
593 : */
594 : bool
595 204 : ReplicationSlotName(int index, Name name)
596 : {
597 : ReplicationSlot *slot;
598 : bool found;
599 :
600 204 : slot = &ReplicationSlotCtl->replication_slots[index];
601 :
602 : /*
603 : * Ensure that the slot cannot be dropped while we copy the name. Don't
604 : * need the spinlock as the name of an existing slot cannot change.
605 : */
606 204 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
607 204 : found = slot->in_use;
608 204 : if (slot->in_use)
609 204 : namestrcpy(name, NameStr(slot->data.name));
610 204 : LWLockRelease(ReplicationSlotControlLock);
611 :
612 204 : return found;
613 : }
614 :
/*
 * Find a previously created slot and mark it as used by this process.
 *
 * An error is raised if nowait is true and the slot is currently in use. If
 * nowait is false, we sleep until the slot is released by the owning process.
 *
 * An error is raised if error_if_invalid is true and the slot is found to
 * be invalid. It should always be set to true, except when we are temporarily
 * acquiring the slot and don't intend to change it.
 *
 * Also raises an error if the slot does not exist, or if it is the reserved
 * conflict-detection slot and the caller is not the logical replication
 * launcher. On success MyReplicationSlot points at the slot.
 */
void
ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
{
	ReplicationSlot *s;
	int			active_pid;

	Assert(name != NULL);

retry:
	Assert(MyReplicationSlot == NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	/*
	 * Check if the slot exists with the given name. (SearchNamedReplicationSlot
	 * only returns in-use slots, so the !s->in_use test is a belt-and-braces
	 * check.)
	 */
	s = SearchNamedReplicationSlot(name, false);
	if (s == NULL || !s->in_use)
	{
		LWLockRelease(ReplicationSlotControlLock);

		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist",
						name)));
	}

	/*
	 * Do not allow users to acquire the reserved slot. This scenario may
	 * occur if the launcher that owns the slot has terminated unexpectedly
	 * due to an error, and a backend process attempts to reuse the slot.
	 */
	if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
		ereport(ERROR,
				errcode(ERRCODE_UNDEFINED_OBJECT),
				errmsg("cannot acquire replication slot \"%s\"", name),
				errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));

	/*
	 * This is the slot we want; check if it's active under some other
	 * process. In single user mode, we don't need this check.
	 */
	if (IsUnderPostmaster)
	{
		/*
		 * Get ready to sleep on the slot in case it is active. (We may end
		 * up not sleeping, but we don't want to do this while holding the
		 * spinlock.)
		 */
		if (!nowait)
			ConditionVariablePrepareToSleep(&s->active_cv);

		/*
		 * It is important to reset the inactive_since under spinlock here to
		 * avoid race conditions with slot invalidation. See comments related
		 * to inactive_since in InvalidatePossiblyObsoleteSlot.
		 *
		 * active_pid is claimed only if nobody owns the slot; the value read
		 * back tells us below whether we won it.
		 */
		SpinLockAcquire(&s->mutex);
		if (s->active_pid == 0)
			s->active_pid = MyProcPid;
		active_pid = s->active_pid;
		ReplicationSlotSetInactiveSince(s, 0, false);
		SpinLockRelease(&s->mutex);
	}
	else
	{
		s->active_pid = active_pid = MyProcPid;
		ReplicationSlotSetInactiveSince(s, 0, true);
	}
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * If we found the slot but it's already active in another process, we
	 * wait until the owning process signals us that it's been released, or
	 * error out.
	 */
	if (active_pid != MyProcPid)
	{
		if (!nowait)
		{
			/* Wait here until we get signaled, and then restart */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);
			ConditionVariableCancelSleep();
			goto retry;
		}

		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
				 errmsg("replication slot \"%s\" is active for PID %d",
						NameStr(s->data.name), active_pid)));
	}
	else if (!nowait)
		ConditionVariableCancelSleep(); /* no sleep needed after all */

	/* We made this slot active, so it's ours now. */
	MyReplicationSlot = s;

	/*
	 * We need to check for invalidation after making the slot ours to avoid
	 * the possible race condition with the checkpointer that can otherwise
	 * invalidate the slot immediately after the check.
	 */
	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
		ereport(ERROR,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg("can no longer access replication slot \"%s\"",
					   NameStr(s->data.name)),
				errdetail("This replication slot has been invalidated due to \"%s\".",
						  GetSlotInvalidationCauseName(s->data.invalidated)));

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&s->active_cv);

	/*
	 * The call to pgstat_acquire_replslot() protects against stats for a
	 * different slot, from before a restart or such, being present during
	 * pgstat_report_replslot().
	 */
	if (SlotIsLogical(s))
		pgstat_acquire_replslot(s);


	if (am_walsender)
	{
		ereport(log_replication_commands ? LOG : DEBUG1,
				SlotIsLogical(s)
				? errmsg("acquired logical replication slot \"%s\"",
						 NameStr(s->data.name))
				: errmsg("acquired physical replication slot \"%s\"",
						 NameStr(s->data.name)));
	}
}
756 :
/*
 * Release the replication slot that this backend considers to own.
 *
 * This or another backend can re-acquire the slot later.
 * Resources this slot requires will be preserved.
 *
 * Exception: an RS_EPHEMERAL slot is dropped here instead of being kept.
 * On return MyReplicationSlot is NULL and PROC_IN_LOGICAL_DECODING is cleared
 * from this backend's status flags.
 */
void
ReplicationSlotRelease(void)
{
	ReplicationSlot *slot = MyReplicationSlot;
	char	   *slotname = NULL;	/* keep compiler quiet */
	bool		is_logical;
	TimestampTz now = 0;

	Assert(slot != NULL && slot->active_pid != 0);

	/*
	 * Capture the slot kind and (for walsenders) its name up front: the
	 * "released ... slot" message at the end is emitted after the slot has
	 * been given up, and possibly dropped.
	 */
	is_logical = SlotIsLogical(slot);

	if (am_walsender)
		slotname = pstrdup(NameStr(slot->data.name));

	if (slot->data.persistency == RS_EPHEMERAL)
	{
		/*
		 * Delete the slot. There is no !PANIC case where this is allowed to
		 * fail, all that may happen is an incomplete cleanup of the on-disk
		 * data.
		 */
		ReplicationSlotDropAcquired();

		/*
		 * Request to disable logical decoding, even though this slot may not
		 * have been the last logical slot. The checkpointer will verify if
		 * logical decoding should actually be disabled.
		 */
		if (is_logical)
			RequestDisableLogicalDecoding();
	}

	/*
	 * If slot needed to temporarily restrain both data and catalog xmin to
	 * create the catalog snapshot, remove that temporary constraint.
	 * Snapshots can only be exported while the initial snapshot is still
	 * acquired.
	 */
	if (!TransactionIdIsValid(slot->data.xmin) &&
		TransactionIdIsValid(slot->effective_xmin))
	{
		SpinLockAcquire(&slot->mutex);
		slot->effective_xmin = InvalidTransactionId;
		SpinLockRelease(&slot->mutex);
		ReplicationSlotsComputeRequiredXmin(false);
	}

	/*
	 * Set the time since the slot has become inactive. We get the current
	 * time beforehand to avoid system call while holding the spinlock.
	 */
	now = GetCurrentTimestamp();

	if (slot->data.persistency == RS_PERSISTENT)
	{
		/*
		 * Mark persistent slot inactive. We're not freeing it, just
		 * disconnecting, but wake up others that may be waiting for it.
		 */
		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		ReplicationSlotSetInactiveSince(slot, now, false);
		SpinLockRelease(&slot->mutex);
		ConditionVariableBroadcast(&slot->active_cv);
	}
	else
		ReplicationSlotSetInactiveSince(slot, now, true);

	MyReplicationSlot = NULL;

	/* might not have been set when we've been a plain slot */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
	ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
	LWLockRelease(ProcArrayLock);

	if (am_walsender)
	{
		ereport(log_replication_commands ? LOG : DEBUG1,
				is_logical
				? errmsg("released logical replication slot \"%s\"",
						 slotname)
				: errmsg("released physical replication slot \"%s\"",
						 slotname));

		pfree(slotname);
	}
}
852 :
/*
 * Cleanup temporary slots created in current session.
 *
 * Cleanup only synced temporary slots if 'synced_only' is true, else
 * cleanup all temporary slots.
 *
 * If it drops the last logical slot in the cluster, requests to disable
 * logical decoding.
 *
 * "Created in current session" is detected as active_pid == MyProcPid; the
 * caller must not be holding a slot (MyReplicationSlot == NULL).
 */
void
ReplicationSlotCleanup(bool synced_only)
{
	int			i;
	bool		found_valid_logicalslot;
	bool		dropped_logical = false;

	Assert(MyReplicationSlot == NULL);

	/*
	 * Each drop releases ReplicationSlotControlLock, so the array may have
	 * changed meanwhile; rescan from the start after every drop. The
	 * found_valid_logicalslot flag is reset on each pass, so the value used
	 * after the loop reflects the final, drop-free scan of all slots.
	 */
restart:
	found_valid_logicalslot = false;
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (!s->in_use)
			continue;

		SpinLockAcquire(&s->mutex);

		found_valid_logicalslot |=
			(SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);

		if ((s->active_pid == MyProcPid &&
			 (!synced_only || s->data.synced)))
		{
			Assert(s->data.persistency == RS_TEMPORARY);
			SpinLockRelease(&s->mutex);
			LWLockRelease(ReplicationSlotControlLock);	/* avoid deadlock */

			if (SlotIsLogical(s))
				dropped_logical = true;

			ReplicationSlotDropPtr(s);

			ConditionVariableBroadcast(&s->active_cv);
			goto restart;
		}
		else
			SpinLockRelease(&s->mutex);
	}

	LWLockRelease(ReplicationSlotControlLock);

	/* Only ask to disable logical decoding if no valid logical slot remains. */
	if (dropped_logical && !found_valid_logicalslot)
		RequestDisableLogicalDecoding();
}
910 :
911 : /*
912 : * Permanently drop replication slot identified by the passed in name.
913 : */
914 : void
915 844 : ReplicationSlotDrop(const char *name, bool nowait)
916 : {
917 : bool is_logical;
918 :
919 : Assert(MyReplicationSlot == NULL);
920 :
921 844 : ReplicationSlotAcquire(name, nowait, false);
922 :
923 : /*
924 : * Do not allow users to drop the slots which are currently being synced
925 : * from the primary to the standby.
926 : */
927 830 : if (RecoveryInProgress() && MyReplicationSlot->data.synced)
928 2 : ereport(ERROR,
929 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
930 : errmsg("cannot drop replication slot \"%s\"", name),
931 : errdetail("This replication slot is being synchronized from the primary server."));
932 :
933 828 : is_logical = SlotIsLogical(MyReplicationSlot);
934 :
935 828 : ReplicationSlotDropAcquired();
936 :
937 828 : if (is_logical)
938 788 : RequestDisableLogicalDecoding();
939 828 : }
940 :
941 : /*
942 : * Change the definition of the slot identified by the specified name.
943 : */
944 : void
945 14 : ReplicationSlotAlter(const char *name, const bool *failover,
946 : const bool *two_phase)
947 : {
948 14 : bool update_slot = false;
949 :
950 : Assert(MyReplicationSlot == NULL);
951 : Assert(failover || two_phase);
952 :
953 14 : ReplicationSlotAcquire(name, false, true);
954 :
955 12 : if (SlotIsPhysical(MyReplicationSlot))
956 0 : ereport(ERROR,
957 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
958 : errmsg("cannot use %s with a physical replication slot",
959 : "ALTER_REPLICATION_SLOT"));
960 :
961 12 : if (RecoveryInProgress())
962 : {
963 : /*
964 : * Do not allow users to alter the slots which are currently being
965 : * synced from the primary to the standby.
966 : */
967 2 : if (MyReplicationSlot->data.synced)
968 2 : ereport(ERROR,
969 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
970 : errmsg("cannot alter replication slot \"%s\"", name),
971 : errdetail("This replication slot is being synchronized from the primary server."));
972 :
973 : /*
974 : * Do not allow users to enable failover on the standby as we do not
975 : * support sync to the cascading standby.
976 : */
977 0 : if (failover && *failover)
978 0 : ereport(ERROR,
979 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
980 : errmsg("cannot enable failover for a replication slot"
981 : " on the standby"));
982 : }
983 :
984 10 : if (failover)
985 : {
986 : /*
987 : * Do not allow users to enable failover for temporary slots as we do
988 : * not support syncing temporary slots to the standby.
989 : */
990 8 : if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
991 0 : ereport(ERROR,
992 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
993 : errmsg("cannot enable failover for a temporary replication slot"));
994 :
995 8 : if (MyReplicationSlot->data.failover != *failover)
996 : {
997 8 : SpinLockAcquire(&MyReplicationSlot->mutex);
998 8 : MyReplicationSlot->data.failover = *failover;
999 8 : SpinLockRelease(&MyReplicationSlot->mutex);
1000 :
1001 8 : update_slot = true;
1002 : }
1003 : }
1004 :
1005 10 : if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
1006 : {
1007 2 : SpinLockAcquire(&MyReplicationSlot->mutex);
1008 2 : MyReplicationSlot->data.two_phase = *two_phase;
1009 2 : SpinLockRelease(&MyReplicationSlot->mutex);
1010 :
1011 2 : update_slot = true;
1012 : }
1013 :
1014 10 : if (update_slot)
1015 : {
1016 10 : ReplicationSlotMarkDirty();
1017 10 : ReplicationSlotSave();
1018 : }
1019 :
1020 10 : ReplicationSlotRelease();
1021 10 : }
1022 :
1023 : /*
1024 : * Permanently drop the currently acquired replication slot.
1025 : */
1026 : void
1027 856 : ReplicationSlotDropAcquired(void)
1028 : {
1029 856 : ReplicationSlot *slot = MyReplicationSlot;
1030 :
1031 : Assert(MyReplicationSlot != NULL);
1032 :
1033 : /* slot isn't acquired anymore */
1034 856 : MyReplicationSlot = NULL;
1035 :
1036 856 : ReplicationSlotDropPtr(slot);
1037 856 : }
1038 :
1039 : /*
1040 : * Permanently drop the replication slot which will be released by the point
1041 : * this function returns.
1042 : */
1043 : static void
1044 1148 : ReplicationSlotDropPtr(ReplicationSlot *slot)
1045 : {
1046 : char path[MAXPGPATH];
1047 : char tmppath[MAXPGPATH];
1048 :
1049 : /*
1050 : * If some other backend ran this code concurrently with us, we might try
1051 : * to delete a slot with a certain name while someone else was trying to
1052 : * create a slot with the same name.
1053 : */
1054 1148 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1055 :
1056 : /* Generate pathnames. */
1057 1148 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1058 1148 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1059 :
1060 : /*
1061 : * Rename the slot directory on disk, so that we'll no longer recognize
1062 : * this as a valid slot. Note that if this fails, we've got to mark the
1063 : * slot inactive before bailing out. If we're dropping an ephemeral or a
1064 : * temporary slot, we better never fail hard as the caller won't expect
1065 : * the slot to survive and this might get called during error handling.
1066 : */
1067 1148 : if (rename(path, tmppath) == 0)
1068 : {
1069 : /*
1070 : * We need to fsync() the directory we just renamed and its parent to
1071 : * make sure that our changes are on disk in a crash-safe fashion. If
1072 : * fsync() fails, we can't be sure whether the changes are on disk or
1073 : * not. For now, we handle that by panicking;
1074 : * StartupReplicationSlots() will try to straighten it out after
1075 : * restart.
1076 : */
1077 1148 : START_CRIT_SECTION();
1078 1148 : fsync_fname(tmppath, true);
1079 1148 : fsync_fname(PG_REPLSLOT_DIR, true);
1080 1148 : END_CRIT_SECTION();
1081 : }
1082 : else
1083 : {
1084 0 : bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1085 :
1086 0 : SpinLockAcquire(&slot->mutex);
1087 0 : slot->active_pid = 0;
1088 0 : SpinLockRelease(&slot->mutex);
1089 :
1090 : /* wake up anyone waiting on this slot */
1091 0 : ConditionVariableBroadcast(&slot->active_cv);
1092 :
1093 0 : ereport(fail_softly ? WARNING : ERROR,
1094 : (errcode_for_file_access(),
1095 : errmsg("could not rename file \"%s\" to \"%s\": %m",
1096 : path, tmppath)));
1097 : }
1098 :
1099 : /*
1100 : * The slot is definitely gone. Lock out concurrent scans of the array
1101 : * long enough to kill it. It's OK to clear the active PID here without
1102 : * grabbing the mutex because nobody else can be scanning the array here,
1103 : * and nobody can be attached to this slot and thus access it without
1104 : * scanning the array.
1105 : *
1106 : * Also wake up processes waiting for it.
1107 : */
1108 1148 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
1109 1148 : slot->active_pid = 0;
1110 1148 : slot->in_use = false;
1111 1148 : LWLockRelease(ReplicationSlotControlLock);
1112 1148 : ConditionVariableBroadcast(&slot->active_cv);
1113 :
1114 : /*
1115 : * Slot is dead and doesn't prevent resource removal anymore, recompute
1116 : * limits.
1117 : */
1118 1148 : ReplicationSlotsComputeRequiredXmin(false);
1119 1148 : ReplicationSlotsComputeRequiredLSN();
1120 :
1121 : /*
1122 : * If removing the directory fails, the worst thing that will happen is
1123 : * that the user won't be able to create a new slot with the same name
1124 : * until the next server restart. We warn about it, but that's all.
1125 : */
1126 1148 : if (!rmtree(tmppath, true))
1127 0 : ereport(WARNING,
1128 : (errmsg("could not remove directory \"%s\"", tmppath)));
1129 :
1130 : /*
1131 : * Drop the statistics entry for the replication slot. Do this while
1132 : * holding ReplicationSlotAllocationLock so that we don't drop a
1133 : * statistics entry for another slot with the same name just created in
1134 : * another session.
1135 : */
1136 1148 : if (SlotIsLogical(slot))
1137 832 : pgstat_drop_replslot(slot);
1138 :
1139 : /*
1140 : * We release this at the very end, so that nobody starts trying to create
1141 : * a slot while we're still cleaning up the detritus of the old one.
1142 : */
1143 1148 : LWLockRelease(ReplicationSlotAllocationLock);
1144 1148 : }
1145 :
1146 : /*
1147 : * Serialize the currently acquired slot's state from memory to disk, thereby
1148 : * guaranteeing the current state will survive a crash.
1149 : */
1150 : void
1151 2784 : ReplicationSlotSave(void)
1152 : {
1153 : char path[MAXPGPATH];
1154 :
1155 : Assert(MyReplicationSlot != NULL);
1156 :
1157 2784 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
1158 2784 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
1159 2784 : }
1160 :
1161 : /*
1162 : * Signal that it would be useful if the currently acquired slot would be
1163 : * flushed out to disk.
1164 : *
1165 : * Note that the actual flush to disk can be delayed for a long time, if
1166 : * required for correctness explicitly do a ReplicationSlotSave().
1167 : */
1168 : void
1169 60290 : ReplicationSlotMarkDirty(void)
1170 : {
1171 60290 : ReplicationSlot *slot = MyReplicationSlot;
1172 :
1173 : Assert(MyReplicationSlot != NULL);
1174 :
1175 60290 : SpinLockAcquire(&slot->mutex);
1176 60290 : MyReplicationSlot->just_dirtied = true;
1177 60290 : MyReplicationSlot->dirty = true;
1178 60290 : SpinLockRelease(&slot->mutex);
1179 60290 : }
1180 :
1181 : /*
1182 : * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1183 : * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1184 : */
1185 : void
1186 936 : ReplicationSlotPersist(void)
1187 : {
1188 936 : ReplicationSlot *slot = MyReplicationSlot;
1189 :
1190 : Assert(slot != NULL);
1191 : Assert(slot->data.persistency != RS_PERSISTENT);
1192 :
1193 936 : SpinLockAcquire(&slot->mutex);
1194 936 : slot->data.persistency = RS_PERSISTENT;
1195 936 : SpinLockRelease(&slot->mutex);
1196 :
1197 936 : ReplicationSlotMarkDirty();
1198 936 : ReplicationSlotSave();
1199 936 : }
1200 :
1201 : /*
1202 : * Compute the oldest xmin across all slots and store it in the ProcArray.
1203 : *
1204 : * If already_locked is true, ProcArrayLock has already been acquired
1205 : * exclusively.
1206 : */
1207 : void
1208 4882 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
1209 : {
1210 : int i;
1211 4882 : TransactionId agg_xmin = InvalidTransactionId;
1212 4882 : TransactionId agg_catalog_xmin = InvalidTransactionId;
1213 :
1214 : Assert(ReplicationSlotCtl != NULL);
1215 :
1216 4882 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1217 :
1218 49594 : for (i = 0; i < max_replication_slots; i++)
1219 : {
1220 44712 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1221 : TransactionId effective_xmin;
1222 : TransactionId effective_catalog_xmin;
1223 : bool invalidated;
1224 :
1225 44712 : if (!s->in_use)
1226 40092 : continue;
1227 :
1228 4620 : SpinLockAcquire(&s->mutex);
1229 4620 : effective_xmin = s->effective_xmin;
1230 4620 : effective_catalog_xmin = s->effective_catalog_xmin;
1231 4620 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1232 4620 : SpinLockRelease(&s->mutex);
1233 :
1234 : /* invalidated slots need not apply */
1235 4620 : if (invalidated)
1236 52 : continue;
1237 :
1238 : /* check the data xmin */
1239 4568 : if (TransactionIdIsValid(effective_xmin) &&
1240 30 : (!TransactionIdIsValid(agg_xmin) ||
1241 30 : TransactionIdPrecedes(effective_xmin, agg_xmin)))
1242 710 : agg_xmin = effective_xmin;
1243 :
1244 : /* check the catalog xmin */
1245 4568 : if (TransactionIdIsValid(effective_catalog_xmin) &&
1246 1868 : (!TransactionIdIsValid(agg_catalog_xmin) ||
1247 1868 : TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1248 2422 : agg_catalog_xmin = effective_catalog_xmin;
1249 : }
1250 :
1251 4882 : LWLockRelease(ReplicationSlotControlLock);
1252 :
1253 4882 : ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1254 4882 : }
1255 :
1256 : /*
1257 : * Compute the oldest restart LSN across all slots and inform xlog module.
1258 : *
1259 : * Note: while max_slot_wal_keep_size is theoretically relevant for this
1260 : * purpose, we don't try to account for that, because this module doesn't
1261 : * know what to compare against.
1262 : */
1263 : void
1264 61704 : ReplicationSlotsComputeRequiredLSN(void)
1265 : {
1266 : int i;
1267 61704 : XLogRecPtr min_required = InvalidXLogRecPtr;
1268 :
1269 : Assert(ReplicationSlotCtl != NULL);
1270 :
1271 61704 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1272 669996 : for (i = 0; i < max_replication_slots; i++)
1273 : {
1274 608292 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1275 : XLogRecPtr restart_lsn;
1276 : XLogRecPtr last_saved_restart_lsn;
1277 : bool invalidated;
1278 : ReplicationSlotPersistency persistency;
1279 :
1280 608292 : if (!s->in_use)
1281 546282 : continue;
1282 :
1283 62010 : SpinLockAcquire(&s->mutex);
1284 62010 : persistency = s->data.persistency;
1285 62010 : restart_lsn = s->data.restart_lsn;
1286 62010 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1287 62010 : last_saved_restart_lsn = s->last_saved_restart_lsn;
1288 62010 : SpinLockRelease(&s->mutex);
1289 :
1290 : /* invalidated slots need not apply */
1291 62010 : if (invalidated)
1292 54 : continue;
1293 :
1294 : /*
1295 : * For persistent slot use last_saved_restart_lsn to compute the
1296 : * oldest LSN for removal of WAL segments. The segments between
1297 : * last_saved_restart_lsn and restart_lsn might be needed by a
1298 : * persistent slot in the case of database crash. Non-persistent
1299 : * slots can't survive the database crash, so we don't care about
1300 : * last_saved_restart_lsn for them.
1301 : */
1302 61956 : if (persistency == RS_PERSISTENT)
1303 : {
1304 60280 : if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1305 : restart_lsn > last_saved_restart_lsn)
1306 : {
1307 55740 : restart_lsn = last_saved_restart_lsn;
1308 : }
1309 : }
1310 :
1311 61956 : if (XLogRecPtrIsValid(restart_lsn) &&
1312 2382 : (!XLogRecPtrIsValid(min_required) ||
1313 : restart_lsn < min_required))
1314 59742 : min_required = restart_lsn;
1315 : }
1316 61704 : LWLockRelease(ReplicationSlotControlLock);
1317 :
1318 61704 : XLogSetReplicationSlotMinimumLSN(min_required);
1319 61704 : }
1320 :
1321 : /*
1322 : * Compute the oldest WAL LSN required by *logical* decoding slots..
1323 : *
1324 : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1325 : * slots exist.
1326 : *
1327 : * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1328 : * ignores physical replication slots.
1329 : *
1330 : * The results aren't required frequently, so we don't maintain a precomputed
1331 : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1332 : */
1333 : XLogRecPtr
1334 7112 : ReplicationSlotsComputeLogicalRestartLSN(void)
1335 : {
1336 7112 : XLogRecPtr result = InvalidXLogRecPtr;
1337 : int i;
1338 :
1339 7112 : if (max_replication_slots <= 0)
1340 4 : return InvalidXLogRecPtr;
1341 :
1342 7108 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1343 :
1344 77096 : for (i = 0; i < max_replication_slots; i++)
1345 : {
1346 : ReplicationSlot *s;
1347 : XLogRecPtr restart_lsn;
1348 : XLogRecPtr last_saved_restart_lsn;
1349 : bool invalidated;
1350 : ReplicationSlotPersistency persistency;
1351 :
1352 69988 : s = &ReplicationSlotCtl->replication_slots[i];
1353 :
1354 : /* cannot change while ReplicationSlotCtlLock is held */
1355 69988 : if (!s->in_use)
1356 68424 : continue;
1357 :
1358 : /* we're only interested in logical slots */
1359 1564 : if (!SlotIsLogical(s))
1360 1088 : continue;
1361 :
1362 : /* read once, it's ok if it increases while we're checking */
1363 476 : SpinLockAcquire(&s->mutex);
1364 476 : persistency = s->data.persistency;
1365 476 : restart_lsn = s->data.restart_lsn;
1366 476 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1367 476 : last_saved_restart_lsn = s->last_saved_restart_lsn;
1368 476 : SpinLockRelease(&s->mutex);
1369 :
1370 : /* invalidated slots need not apply */
1371 476 : if (invalidated)
1372 20 : continue;
1373 :
1374 : /*
1375 : * For persistent slot use last_saved_restart_lsn to compute the
1376 : * oldest LSN for removal of WAL segments. The segments between
1377 : * last_saved_restart_lsn and restart_lsn might be needed by a
1378 : * persistent slot in the case of database crash. Non-persistent
1379 : * slots can't survive the database crash, so we don't care about
1380 : * last_saved_restart_lsn for them.
1381 : */
1382 456 : if (persistency == RS_PERSISTENT)
1383 : {
1384 452 : if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1385 : restart_lsn > last_saved_restart_lsn)
1386 : {
1387 0 : restart_lsn = last_saved_restart_lsn;
1388 : }
1389 : }
1390 :
1391 456 : if (!XLogRecPtrIsValid(restart_lsn))
1392 0 : continue;
1393 :
1394 456 : if (!XLogRecPtrIsValid(result) ||
1395 : restart_lsn < result)
1396 360 : result = restart_lsn;
1397 : }
1398 :
1399 7108 : LWLockRelease(ReplicationSlotControlLock);
1400 :
1401 7108 : return result;
1402 : }
1403 :
1404 : /*
1405 : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1406 : * passed database oid.
1407 : *
1408 : * Returns true if there are any slots referencing the database. *nslots will
1409 : * be set to the absolute number of slots in the database, *nactive to ones
1410 : * currently active.
1411 : */
1412 : bool
1413 96 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1414 : {
1415 : int i;
1416 :
1417 96 : *nslots = *nactive = 0;
1418 :
1419 96 : if (max_replication_slots <= 0)
1420 0 : return false;
1421 :
1422 96 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1423 998 : for (i = 0; i < max_replication_slots; i++)
1424 : {
1425 : ReplicationSlot *s;
1426 :
1427 902 : s = &ReplicationSlotCtl->replication_slots[i];
1428 :
1429 : /* cannot change while ReplicationSlotCtlLock is held */
1430 902 : if (!s->in_use)
1431 860 : continue;
1432 :
1433 : /* only logical slots are database specific, skip */
1434 42 : if (!SlotIsLogical(s))
1435 20 : continue;
1436 :
1437 : /* not our database, skip */
1438 22 : if (s->data.database != dboid)
1439 16 : continue;
1440 :
1441 : /* NB: intentionally counting invalidated slots */
1442 :
1443 : /* count slots with spinlock held */
1444 6 : SpinLockAcquire(&s->mutex);
1445 6 : (*nslots)++;
1446 6 : if (s->active_pid != 0)
1447 2 : (*nactive)++;
1448 6 : SpinLockRelease(&s->mutex);
1449 : }
1450 96 : LWLockRelease(ReplicationSlotControlLock);
1451 :
1452 96 : if (*nslots > 0)
1453 6 : return true;
1454 90 : return false;
1455 : }
1456 :
1457 : /*
1458 : * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1459 : * passed database oid. The caller should hold an exclusive lock on the
1460 : * pg_database oid for the database to prevent creation of new slots on the db
1461 : * or replay from existing slots.
1462 : *
1463 : * Another session that concurrently acquires an existing slot on the target DB
1464 : * (most likely to drop it) may cause this function to ERROR. If that happens
1465 : * it may have dropped some but not all slots.
1466 : *
1467 : * This routine isn't as efficient as it could be - but we don't drop
1468 : * databases often, especially databases with lots of slots.
1469 : *
1470 : * If it drops the last logical slot in the cluster, it requests to disable
1471 : * logical decoding.
1472 : */
1473 : void
1474 122 : ReplicationSlotsDropDBSlots(Oid dboid)
1475 : {
1476 : int i;
1477 : bool found_valid_logicalslot;
1478 122 : bool dropped = false;
1479 :
1480 122 : if (max_replication_slots <= 0)
1481 0 : return;
1482 :
1483 122 : restart:
1484 132 : found_valid_logicalslot = false;
1485 132 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1486 1278 : for (i = 0; i < max_replication_slots; i++)
1487 : {
1488 : ReplicationSlot *s;
1489 : char *slotname;
1490 : int active_pid;
1491 :
1492 1156 : s = &ReplicationSlotCtl->replication_slots[i];
1493 :
1494 : /* cannot change while ReplicationSlotCtlLock is held */
1495 1156 : if (!s->in_use)
1496 1096 : continue;
1497 :
1498 : /* only logical slots are database specific, skip */
1499 60 : if (!SlotIsLogical(s))
1500 22 : continue;
1501 :
1502 : /*
1503 : * Check logical slots on other databases too so we can disable
1504 : * logical decoding only if no slots in the cluster.
1505 : */
1506 38 : SpinLockAcquire(&s->mutex);
1507 38 : found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
1508 38 : SpinLockRelease(&s->mutex);
1509 :
1510 : /* not our database, skip */
1511 38 : if (s->data.database != dboid)
1512 28 : continue;
1513 :
1514 : /* NB: intentionally including invalidated slots to drop */
1515 :
1516 : /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1517 10 : SpinLockAcquire(&s->mutex);
1518 : /* can't change while ReplicationSlotControlLock is held */
1519 10 : slotname = NameStr(s->data.name);
1520 10 : active_pid = s->active_pid;
1521 10 : if (active_pid == 0)
1522 : {
1523 10 : MyReplicationSlot = s;
1524 10 : s->active_pid = MyProcPid;
1525 : }
1526 10 : SpinLockRelease(&s->mutex);
1527 :
1528 : /*
1529 : * Even though we hold an exclusive lock on the database object a
1530 : * logical slot for that DB can still be active, e.g. if it's
1531 : * concurrently being dropped by a backend connected to another DB.
1532 : *
1533 : * That's fairly unlikely in practice, so we'll just bail out.
1534 : *
1535 : * The slot sync worker holds a shared lock on the database before
1536 : * operating on synced logical slots to avoid conflict with the drop
1537 : * happening here. The persistent synced slots are thus safe but there
1538 : * is a possibility that the slot sync worker has created a temporary
1539 : * slot (which stays active even on release) and we are trying to drop
1540 : * that here. In practice, the chances of hitting this scenario are
1541 : * less as during slot synchronization, the temporary slot is
1542 : * immediately converted to persistent and thus is safe due to the
1543 : * shared lock taken on the database. So, we'll just bail out in such
1544 : * a case.
1545 : *
1546 : * XXX: We can consider shutting down the slot sync worker before
1547 : * trying to drop synced temporary slots here.
1548 : */
1549 10 : if (active_pid)
1550 0 : ereport(ERROR,
1551 : (errcode(ERRCODE_OBJECT_IN_USE),
1552 : errmsg("replication slot \"%s\" is active for PID %d",
1553 : slotname, active_pid)));
1554 :
1555 : /*
1556 : * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1557 : * holding ReplicationSlotControlLock over filesystem operations,
1558 : * release ReplicationSlotControlLock and use
1559 : * ReplicationSlotDropAcquired.
1560 : *
1561 : * As that means the set of slots could change, restart scan from the
1562 : * beginning each time we release the lock.
1563 : */
1564 10 : LWLockRelease(ReplicationSlotControlLock);
1565 10 : ReplicationSlotDropAcquired();
1566 10 : dropped = true;
1567 10 : goto restart;
1568 : }
1569 122 : LWLockRelease(ReplicationSlotControlLock);
1570 :
1571 122 : if (dropped && !found_valid_logicalslot)
1572 0 : RequestDisableLogicalDecoding();
1573 : }
1574 :
1575 : /*
1576 : * Returns true if there is at least one in-use valid logical replication slot.
1577 : */
1578 : bool
1579 930 : CheckLogicalSlotExists(void)
1580 : {
1581 930 : bool found = false;
1582 :
1583 930 : if (max_replication_slots <= 0)
1584 2 : return false;
1585 :
1586 928 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1587 10040 : for (int i = 0; i < max_replication_slots; i++)
1588 : {
1589 : ReplicationSlot *s;
1590 : bool invalidated;
1591 :
1592 9124 : s = &ReplicationSlotCtl->replication_slots[i];
1593 :
1594 : /* cannot change while ReplicationSlotCtlLock is held */
1595 9124 : if (!s->in_use)
1596 9068 : continue;
1597 :
1598 56 : if (SlotIsPhysical(s))
1599 38 : continue;
1600 :
1601 18 : SpinLockAcquire(&s->mutex);
1602 18 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1603 18 : SpinLockRelease(&s->mutex);
1604 :
1605 18 : if (invalidated)
1606 6 : continue;
1607 :
1608 12 : found = true;
1609 12 : break;
1610 : }
1611 928 : LWLockRelease(ReplicationSlotControlLock);
1612 :
1613 928 : return found;
1614 : }
1615 :
1616 : /*
1617 : * Check whether the server's configuration supports using replication
1618 : * slots.
1619 : */
1620 : void
1621 3594 : CheckSlotRequirements(void)
1622 : {
1623 : /*
1624 : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1625 : * needs the same check.
1626 : */
1627 :
1628 3594 : if (max_replication_slots == 0)
1629 0 : ereport(ERROR,
1630 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1631 : errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1632 :
1633 3594 : if (wal_level < WAL_LEVEL_REPLICA)
1634 0 : ereport(ERROR,
1635 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1636 : errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1637 3594 : }
1638 :
1639 : /*
1640 : * Check whether the user has privilege to use replication slots.
1641 : */
1642 : void
1643 1164 : CheckSlotPermissions(void)
1644 : {
1645 1164 : if (!has_rolreplication(GetUserId()))
1646 10 : ereport(ERROR,
1647 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1648 : errmsg("permission denied to use replication slots"),
1649 : errdetail("Only roles with the %s attribute may use replication slots.",
1650 : "REPLICATION")));
1651 1154 : }
1652 :
1653 : /*
1654 : * Reserve WAL for the currently active slot.
1655 : *
1656 : * Compute and set restart_lsn in a manner that's appropriate for the type of
1657 : * the slot and concurrency safe.
1658 : */
1659 : void
1660 1244 : ReplicationSlotReserveWal(void)
1661 : {
1662 1244 : ReplicationSlot *slot = MyReplicationSlot;
1663 : XLogSegNo segno;
1664 : XLogRecPtr restart_lsn;
1665 :
1666 : Assert(slot != NULL);
1667 : Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
1668 : Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn));
1669 :
1670 : /*
1671 : * The replication slot mechanism is used to prevent the removal of
1672 : * required WAL.
1673 : *
1674 : * Acquire an exclusive lock to prevent the checkpoint process from
1675 : * concurrently computing the minimum slot LSN (see
1676 : * CheckPointReplicationSlots). This ensures that the WAL reserved for
1677 : * replication cannot be removed during a checkpoint.
1678 : *
1679 : * The mechanism is reliable because if WAL reservation occurs first, the
1680 : * checkpoint must wait for the restart_lsn update before determining the
1681 : * minimum non-removable LSN. On the other hand, if the checkpoint happens
1682 : * first, subsequent WAL reservations will select positions at or beyond
1683 : * the redo pointer of that checkpoint.
1684 : */
1685 1244 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1686 :
1687 : /*
1688 : * For logical slots log a standby snapshot and start logical decoding at
1689 : * exactly that position. That allows the slot to start up more quickly.
1690 : * But on a standby we cannot do WAL writes, so just use the replay
1691 : * pointer; effectively, an attempt to create a logical slot on standby
1692 : * will cause it to wait for an xl_running_xact record to be logged
1693 : * independently on the primary, so that a snapshot can be built using the
1694 : * record.
1695 : *
1696 : * None of this is needed (or indeed helpful) for physical slots as
1697 : * they'll start replay at the last logged checkpoint anyway. Instead,
1698 : * return the location of the last redo LSN, where a base backup has to
1699 : * start replay at.
1700 : */
1701 1244 : if (SlotIsPhysical(slot))
1702 310 : restart_lsn = GetRedoRecPtr();
1703 934 : else if (RecoveryInProgress())
1704 52 : restart_lsn = GetXLogReplayRecPtr(NULL);
1705 : else
1706 882 : restart_lsn = GetXLogInsertRecPtr();
1707 :
1708 1244 : SpinLockAcquire(&slot->mutex);
1709 1244 : slot->data.restart_lsn = restart_lsn;
1710 1244 : SpinLockRelease(&slot->mutex);
1711 :
1712 : /* prevent WAL removal as fast as possible */
1713 1244 : ReplicationSlotsComputeRequiredLSN();
1714 :
1715 : /* Checkpoint shouldn't remove the required WAL. */
1716 1244 : XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
1717 1244 : if (XLogGetLastRemovedSegno() >= segno)
1718 0 : elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
1719 : NameStr(slot->data.name));
1720 :
1721 1244 : LWLockRelease(ReplicationSlotAllocationLock);
1722 :
1723 1244 : if (!RecoveryInProgress() && SlotIsLogical(slot))
1724 : {
1725 : XLogRecPtr flushptr;
1726 :
1727 : /* make sure we have enough information to start */
1728 882 : flushptr = LogStandbySnapshot();
1729 :
1730 : /* and make sure it's fsynced to disk */
1731 882 : XLogFlush(flushptr);
1732 : }
1733 1244 : }
1734 :
1735 : /*
1736 : * Report that replication slot needs to be invalidated
1737 : */
1738 : static void
1739 46 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1740 : bool terminating,
1741 : int pid,
1742 : NameData slotname,
1743 : XLogRecPtr restart_lsn,
1744 : XLogRecPtr oldestLSN,
1745 : TransactionId snapshotConflictHorizon,
1746 : long slot_idle_seconds)
1747 : {
1748 : StringInfoData err_detail;
1749 : StringInfoData err_hint;
1750 :
1751 46 : initStringInfo(&err_detail);
1752 46 : initStringInfo(&err_hint);
1753 :
1754 46 : switch (cause)
1755 : {
1756 14 : case RS_INVAL_WAL_REMOVED:
1757 : {
1758 14 : uint64 ex = oldestLSN - restart_lsn;
1759 :
1760 14 : appendStringInfo(&err_detail,
1761 14 : ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1762 : "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1763 : ex),
1764 14 : LSN_FORMAT_ARGS(restart_lsn),
1765 : ex);
1766 : /* translator: %s is a GUC variable name */
1767 14 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1768 : "max_slot_wal_keep_size");
1769 14 : break;
1770 : }
1771 24 : case RS_INVAL_HORIZON:
1772 24 : appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1773 : snapshotConflictHorizon);
1774 24 : break;
1775 :
1776 8 : case RS_INVAL_WAL_LEVEL:
1777 8 : appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
1778 8 : break;
1779 :
1780 0 : case RS_INVAL_IDLE_TIMEOUT:
1781 : {
1782 : /* translator: %s is a GUC variable name */
1783 0 : appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1784 : slot_idle_seconds, "idle_replication_slot_timeout",
1785 : idle_replication_slot_timeout_secs);
1786 : /* translator: %s is a GUC variable name */
1787 0 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1788 : "idle_replication_slot_timeout");
1789 0 : break;
1790 : }
1791 : case RS_INVAL_NONE:
1792 : pg_unreachable();
1793 : }
1794 :
1795 46 : ereport(LOG,
1796 : terminating ?
1797 : errmsg("terminating process %d to release replication slot \"%s\"",
1798 : pid, NameStr(slotname)) :
1799 : errmsg("invalidating obsolete replication slot \"%s\"",
1800 : NameStr(slotname)),
1801 : errdetail_internal("%s", err_detail.data),
1802 : err_hint.len ? errhint("%s", err_hint.data) : 0);
1803 :
1804 46 : pfree(err_detail.data);
1805 46 : pfree(err_hint.data);
1806 46 : }
1807 :
1808 : /*
1809 : * Can we invalidate an idle replication slot?
1810 : *
1811 : * Idle timeout invalidation is allowed only when:
1812 : *
1813 : * 1. Idle timeout is set
1814 : * 2. Slot has reserved WAL
1815 : * 3. Slot is inactive
1816 : * 4. The slot is not being synced from the primary while the server is in
1817 : * recovery. This is because synced slots are always considered to be
1818 : * inactive because they don't perform logical decoding to produce changes.
1819 : */
1820 : static inline bool
1821 738 : CanInvalidateIdleSlot(ReplicationSlot *s)
1822 : {
1823 738 : return (idle_replication_slot_timeout_secs != 0 &&
1824 0 : XLogRecPtrIsValid(s->data.restart_lsn) &&
1825 738 : s->inactive_since > 0 &&
1826 0 : !(RecoveryInProgress() && s->data.synced));
1827 : }
1828 :
1829 : /*
1830 : * DetermineSlotInvalidationCause - Determine the cause for which a slot
1831 : * becomes invalid among the given possible causes.
1832 : *
1833 : * This function sequentially checks all possible invalidation causes and
1834 : * returns the first one for which the slot is eligible for invalidation.
1835 : */
1836 : static ReplicationSlotInvalidationCause
1837 804 : DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
1838 : XLogRecPtr oldestLSN, Oid dboid,
1839 : TransactionId snapshotConflictHorizon,
1840 : TimestampTz *inactive_since, TimestampTz now)
1841 : {
1842 : Assert(possible_causes != RS_INVAL_NONE);
1843 :
1844 804 : if (possible_causes & RS_INVAL_WAL_REMOVED)
1845 : {
1846 752 : XLogRecPtr restart_lsn = s->data.restart_lsn;
1847 :
1848 752 : if (XLogRecPtrIsValid(restart_lsn) &&
1849 : restart_lsn < oldestLSN)
1850 14 : return RS_INVAL_WAL_REMOVED;
1851 : }
1852 :
1853 790 : if (possible_causes & RS_INVAL_HORIZON)
1854 : {
1855 : /* invalid DB oid signals a shared relation */
1856 44 : if (SlotIsLogical(s) &&
1857 34 : (dboid == InvalidOid || dboid == s->data.database))
1858 : {
1859 44 : TransactionId effective_xmin = s->effective_xmin;
1860 44 : TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
1861 :
1862 44 : if (TransactionIdIsValid(effective_xmin) &&
1863 0 : TransactionIdPrecedesOrEquals(effective_xmin,
1864 : snapshotConflictHorizon))
1865 0 : return RS_INVAL_HORIZON;
1866 88 : else if (TransactionIdIsValid(catalog_effective_xmin) &&
1867 44 : TransactionIdPrecedesOrEquals(catalog_effective_xmin,
1868 : snapshotConflictHorizon))
1869 24 : return RS_INVAL_HORIZON;
1870 : }
1871 : }
1872 :
1873 766 : if (possible_causes & RS_INVAL_WAL_LEVEL)
1874 : {
1875 8 : if (SlotIsLogical(s))
1876 8 : return RS_INVAL_WAL_LEVEL;
1877 : }
1878 :
1879 758 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1880 : {
1881 : Assert(now > 0);
1882 :
1883 738 : if (CanInvalidateIdleSlot(s))
1884 : {
1885 : /*
1886 : * Simulate the invalidation due to idle_timeout to test the
1887 : * timeout behavior promptly, without waiting for it to trigger
1888 : * naturally.
1889 : */
1890 : #ifdef USE_INJECTION_POINTS
1891 0 : if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1892 : {
1893 0 : *inactive_since = 0; /* since the beginning of time */
1894 0 : return RS_INVAL_IDLE_TIMEOUT;
1895 : }
1896 : #endif
1897 :
1898 : /*
1899 : * Check if the slot needs to be invalidated due to
1900 : * idle_replication_slot_timeout GUC.
1901 : */
1902 0 : if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
1903 : idle_replication_slot_timeout_secs))
1904 : {
1905 0 : *inactive_since = s->inactive_since;
1906 0 : return RS_INVAL_IDLE_TIMEOUT;
1907 : }
1908 : }
1909 : }
1910 :
1911 758 : return RS_INVAL_NONE;
1912 : }
1913 :
1914 : /*
1915 : * Helper for InvalidateObsoleteReplicationSlots
1916 : *
1917 : * Acquires the given slot and mark it invalid, if necessary and possible.
1918 : *
1919 : * Returns true if the slot was invalidated.
1920 : *
1921 : * Set *released_lock_out if ReplicationSlotControlLock was released in the
1922 : * interim (and in that case we're not holding the lock at return, otherwise
1923 : * we are).
1924 : *
1925 : * This is inherently racy, because we release the LWLock
1926 : * for syscalls, so caller must restart if we return true.
1927 : */
1928 : static bool
1929 882 : InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
1930 : ReplicationSlot *s,
1931 : XLogRecPtr oldestLSN,
1932 : Oid dboid, TransactionId snapshotConflictHorizon,
1933 : bool *released_lock_out)
1934 : {
1935 882 : int last_signaled_pid = 0;
1936 882 : bool released_lock = false;
1937 882 : bool invalidated = false;
1938 882 : TimestampTz inactive_since = 0;
1939 :
1940 : for (;;)
1941 14 : {
1942 : XLogRecPtr restart_lsn;
1943 : NameData slotname;
1944 896 : int active_pid = 0;
1945 896 : ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
1946 896 : TimestampTz now = 0;
1947 896 : long slot_idle_secs = 0;
1948 :
1949 : Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1950 :
1951 896 : if (!s->in_use)
1952 : {
1953 0 : if (released_lock)
1954 0 : LWLockRelease(ReplicationSlotControlLock);
1955 0 : break;
1956 : }
1957 :
1958 896 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1959 : {
1960 : /*
1961 : * Assign the current time here to avoid system call overhead
1962 : * while holding the spinlock in subsequent code.
1963 : */
1964 780 : now = GetCurrentTimestamp();
1965 : }
1966 :
1967 : /*
1968 : * Check if the slot needs to be invalidated. If it needs to be
1969 : * invalidated, and is not currently acquired, acquire it and mark it
1970 : * as having been invalidated. We do this with the spinlock held to
1971 : * avoid race conditions -- for example the restart_lsn could move
1972 : * forward, or the slot could be dropped.
1973 : */
1974 896 : SpinLockAcquire(&s->mutex);
1975 :
1976 896 : restart_lsn = s->data.restart_lsn;
1977 :
1978 : /* we do nothing if the slot is already invalid */
1979 896 : if (s->data.invalidated == RS_INVAL_NONE)
1980 804 : invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
1981 : s, oldestLSN,
1982 : dboid,
1983 : snapshotConflictHorizon,
1984 : &inactive_since,
1985 : now);
1986 :
1987 : /* if there's no invalidation, we're done */
1988 896 : if (invalidation_cause == RS_INVAL_NONE)
1989 : {
1990 850 : SpinLockRelease(&s->mutex);
1991 850 : if (released_lock)
1992 0 : LWLockRelease(ReplicationSlotControlLock);
1993 850 : break;
1994 : }
1995 :
1996 46 : slotname = s->data.name;
1997 46 : active_pid = s->active_pid;
1998 :
1999 : /*
2000 : * If the slot can be acquired, do so and mark it invalidated
2001 : * immediately. Otherwise we'll signal the owning process, below, and
2002 : * retry.
2003 : *
2004 : * Note: Unlike other slot attributes, slot's inactive_since can't be
2005 : * changed until the acquired slot is released or the owning process
2006 : * is terminated. So, the inactive slot can only be invalidated
2007 : * immediately without being terminated.
2008 : */
2009 46 : if (active_pid == 0)
2010 : {
2011 32 : MyReplicationSlot = s;
2012 32 : s->active_pid = MyProcPid;
2013 32 : s->data.invalidated = invalidation_cause;
2014 :
2015 : /*
2016 : * XXX: We should consider not overwriting restart_lsn and instead
2017 : * just rely on .invalidated.
2018 : */
2019 32 : if (invalidation_cause == RS_INVAL_WAL_REMOVED)
2020 : {
2021 10 : s->data.restart_lsn = InvalidXLogRecPtr;
2022 10 : s->last_saved_restart_lsn = InvalidXLogRecPtr;
2023 : }
2024 :
2025 : /* Let caller know */
2026 32 : invalidated = true;
2027 : }
2028 :
2029 46 : SpinLockRelease(&s->mutex);
2030 :
2031 : /*
2032 : * Calculate the idle time duration of the slot if slot is marked
2033 : * invalidated with RS_INVAL_IDLE_TIMEOUT.
2034 : */
2035 46 : if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
2036 : {
2037 : int slot_idle_usecs;
2038 :
2039 0 : TimestampDifference(inactive_since, now, &slot_idle_secs,
2040 : &slot_idle_usecs);
2041 : }
2042 :
2043 46 : if (active_pid != 0)
2044 : {
2045 : /*
2046 : * Prepare the sleep on the slot's condition variable before
2047 : * releasing the lock, to close a possible race condition if the
2048 : * slot is released before the sleep below.
2049 : */
2050 14 : ConditionVariablePrepareToSleep(&s->active_cv);
2051 :
2052 14 : LWLockRelease(ReplicationSlotControlLock);
2053 14 : released_lock = true;
2054 :
2055 : /*
2056 : * Signal to terminate the process that owns the slot, if we
2057 : * haven't already signalled it. (Avoidance of repeated
2058 : * signalling is the only reason for there to be a loop in this
2059 : * routine; otherwise we could rely on caller's restart loop.)
2060 : *
2061 : * There is the race condition that other process may own the slot
2062 : * after its current owner process is terminated and before this
2063 : * process owns it. To handle that, we signal only if the PID of
2064 : * the owning process has changed from the previous time. (This
2065 : * logic assumes that the same PID is not reused very quickly.)
2066 : */
2067 14 : if (last_signaled_pid != active_pid)
2068 : {
2069 14 : ReportSlotInvalidation(invalidation_cause, true, active_pid,
2070 : slotname, restart_lsn,
2071 : oldestLSN, snapshotConflictHorizon,
2072 : slot_idle_secs);
2073 :
2074 14 : if (MyBackendType == B_STARTUP)
2075 10 : (void) SendProcSignal(active_pid,
2076 : PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
2077 : INVALID_PROC_NUMBER);
2078 : else
2079 4 : (void) kill(active_pid, SIGTERM);
2080 :
2081 14 : last_signaled_pid = active_pid;
2082 : }
2083 :
2084 : /* Wait until the slot is released. */
2085 14 : ConditionVariableSleep(&s->active_cv,
2086 : WAIT_EVENT_REPLICATION_SLOT_DROP);
2087 :
2088 : /*
2089 : * Re-acquire lock and start over; we expect to invalidate the
2090 : * slot next time (unless another process acquires the slot in the
2091 : * meantime).
2092 : *
2093 : * Note: It is possible for a slot to advance its restart_lsn or
2094 : * xmin values sufficiently between when we release the mutex and
2095 : * when we recheck, moving from a conflicting state to a non
2096 : * conflicting state. This is intentional and safe: if the slot
2097 : * has caught up while we're busy here, the resources we were
2098 : * concerned about (WAL segments or tuples) have not yet been
2099 : * removed, and there's no reason to invalidate the slot.
2100 : */
2101 14 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2102 14 : continue;
2103 : }
2104 : else
2105 : {
2106 : /*
2107 : * We hold the slot now and have already invalidated it; flush it
2108 : * to ensure that state persists.
2109 : *
2110 : * Don't want to hold ReplicationSlotControlLock across file
2111 : * system operations, so release it now but be sure to tell caller
2112 : * to restart from scratch.
2113 : */
2114 32 : LWLockRelease(ReplicationSlotControlLock);
2115 32 : released_lock = true;
2116 :
2117 : /* Make sure the invalidated state persists across server restart */
2118 32 : ReplicationSlotMarkDirty();
2119 32 : ReplicationSlotSave();
2120 32 : ReplicationSlotRelease();
2121 :
2122 32 : ReportSlotInvalidation(invalidation_cause, false, active_pid,
2123 : slotname, restart_lsn,
2124 : oldestLSN, snapshotConflictHorizon,
2125 : slot_idle_secs);
2126 :
2127 : /* done with this slot for now */
2128 32 : break;
2129 : }
2130 : }
2131 :
2132 : Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
2133 :
2134 882 : *released_lock_out = released_lock;
2135 882 : return invalidated;
2136 : }
2137 :
2138 : /*
2139 : * Invalidate slots that require resources about to be removed.
2140 : *
2141 : * Returns true when any slot have got invalidated.
2142 : *
2143 : * Whether a slot needs to be invalidated depends on the invalidation cause.
2144 : * A slot is invalidated if it:
2145 : * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
2146 : * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2147 : * db; dboid may be InvalidOid for shared relations
2148 : * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
2149 : * logical.
2150 : * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2151 : * "idle_replication_slot_timeout" duration.
2152 : *
2153 : * Note: This function attempts to invalidate the slot for multiple possible
2154 : * causes in a single pass, minimizing redundant iterations. The "cause"
2155 : * parameter can be a MASK representing one or more of the defined causes.
2156 : *
2157 : * If it invalidates the last logical slot in the cluster, it requests to
2158 : * disable logical decoding.
2159 : *
2160 : * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2161 : */
2162 : bool
2163 3606 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
2164 : XLogSegNo oldestSegno, Oid dboid,
2165 : TransactionId snapshotConflictHorizon)
2166 : {
2167 : XLogRecPtr oldestLSN;
2168 3606 : bool invalidated = false;
2169 3606 : bool invalidated_logical = false;
2170 : bool found_valid_logicalslot;
2171 :
2172 : Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2173 : Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
2174 : Assert(possible_causes != RS_INVAL_NONE);
2175 :
2176 3606 : if (max_replication_slots == 0)
2177 2 : return invalidated;
2178 :
2179 3604 : XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
2180 :
2181 3636 : restart:
2182 3636 : found_valid_logicalslot = false;
2183 3636 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2184 38970 : for (int i = 0; i < max_replication_slots; i++)
2185 : {
2186 35366 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2187 35366 : bool released_lock = false;
2188 :
2189 35366 : if (!s->in_use)
2190 34484 : continue;
2191 :
2192 : /* Prevent invalidation of logical slots during binary upgrade */
2193 896 : if (SlotIsLogical(s) && IsBinaryUpgrade)
2194 : {
2195 14 : SpinLockAcquire(&s->mutex);
2196 14 : found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
2197 14 : SpinLockRelease(&s->mutex);
2198 :
2199 14 : continue;
2200 : }
2201 :
2202 882 : if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
2203 : dboid, snapshotConflictHorizon,
2204 : &released_lock))
2205 : {
2206 : Assert(released_lock);
2207 :
2208 : /* Remember we have invalidated a physical or logical slot */
2209 32 : invalidated = true;
2210 :
2211 : /*
2212 : * Additionally, remember we have invalidated a logical slot as we
2213 : * can request disabling logical decoding later.
2214 : */
2215 32 : if (SlotIsLogical(s))
2216 26 : invalidated_logical = true;
2217 : }
2218 : else
2219 : {
2220 : /*
2221 : * We need to check if the slot is invalidated here since
2222 : * InvalidatePossiblyObsoleteSlot() returns false also if the slot
2223 : * is already invalidated.
2224 : */
2225 850 : SpinLockAcquire(&s->mutex);
2226 1700 : found_valid_logicalslot |=
2227 850 : (SlotIsLogical(s) && (s->data.invalidated == RS_INVAL_NONE));
2228 850 : SpinLockRelease(&s->mutex);
2229 : }
2230 :
2231 : /* if the lock was released, start from scratch */
2232 882 : if (released_lock)
2233 32 : goto restart;
2234 : }
2235 3604 : LWLockRelease(ReplicationSlotControlLock);
2236 :
2237 : /*
2238 : * If any slots have been invalidated, recalculate the resource limits.
2239 : */
2240 3604 : if (invalidated)
2241 : {
2242 22 : ReplicationSlotsComputeRequiredXmin(false);
2243 22 : ReplicationSlotsComputeRequiredLSN();
2244 : }
2245 :
2246 : /*
2247 : * Request the checkpointer to disable logical decoding if no valid
2248 : * logical slots remain. If called by the checkpointer during a
2249 : * checkpoint, only the request is initiated; actual deactivation is
2250 : * deferred until after the checkpoint completes.
2251 : */
2252 3604 : if (invalidated_logical && !found_valid_logicalslot)
2253 16 : RequestDisableLogicalDecoding();
2254 :
2255 3604 : return invalidated;
2256 : }
2257 :
2258 : /*
2259 : * Flush all replication slots to disk.
2260 : *
2261 : * It is convenient to flush dirty replication slots at the time of checkpoint.
2262 : * Additionally, in case of a shutdown checkpoint, we also identify the slots
2263 : * for which the confirmed_flush LSN has been updated since the last time it
2264 : * was saved and flush them.
2265 : */
2266 : void
2267 3556 : CheckPointReplicationSlots(bool is_shutdown)
2268 : {
2269 : int i;
2270 3556 : bool last_saved_restart_lsn_updated = false;
2271 :
2272 3556 : elog(DEBUG1, "performing replication slot checkpoint");
2273 :
2274 : /*
2275 : * Prevent any slot from being created/dropped while we're active. As we
2276 : * explicitly do *not* want to block iterating over replication_slots or
2277 : * acquiring a slot we cannot take the control lock - but that's OK,
2278 : * because holding ReplicationSlotAllocationLock is strictly stronger, and
2279 : * enough to guarantee that nobody can change the in_use bits on us.
2280 : *
2281 : * Additionally, acquiring the Allocation lock is necessary to serialize
2282 : * the slot flush process with concurrent slot WAL reservation. This
2283 : * ensures that the WAL position being reserved is either flushed to disk
2284 : * or is beyond or equal to the redo pointer of the current checkpoint
2285 : * (See ReplicationSlotReserveWal for details).
2286 : */
2287 3556 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2288 :
2289 38550 : for (i = 0; i < max_replication_slots; i++)
2290 : {
2291 34994 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2292 : char path[MAXPGPATH];
2293 :
2294 34994 : if (!s->in_use)
2295 34212 : continue;
2296 :
2297 : /* save the slot to disk, locking is handled in SaveSlotToPath() */
2298 782 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2299 :
2300 : /*
2301 : * Slot's data is not flushed each time the confirmed_flush LSN is
2302 : * updated as that could lead to frequent writes. However, we decide
2303 : * to force a flush of all logical slot's data at the time of shutdown
2304 : * if the confirmed_flush LSN is changed since we last flushed it to
2305 : * disk. This helps in avoiding an unnecessary retreat of the
2306 : * confirmed_flush LSN after restart.
2307 : */
2308 782 : if (is_shutdown && SlotIsLogical(s))
2309 : {
2310 168 : SpinLockAcquire(&s->mutex);
2311 :
2312 168 : if (s->data.invalidated == RS_INVAL_NONE &&
2313 166 : s->data.confirmed_flush > s->last_saved_confirmed_flush)
2314 : {
2315 72 : s->just_dirtied = true;
2316 72 : s->dirty = true;
2317 : }
2318 168 : SpinLockRelease(&s->mutex);
2319 : }
2320 :
2321 : /*
2322 : * Track if we're going to update slot's last_saved_restart_lsn. We
2323 : * need this to know if we need to recompute the required LSN.
2324 : */
2325 782 : if (s->last_saved_restart_lsn != s->data.restart_lsn)
2326 400 : last_saved_restart_lsn_updated = true;
2327 :
2328 782 : SaveSlotToPath(s, path, LOG);
2329 : }
2330 3556 : LWLockRelease(ReplicationSlotAllocationLock);
2331 :
2332 : /*
2333 : * Recompute the required LSN if SaveSlotToPath() updated
2334 : * last_saved_restart_lsn for any slot.
2335 : */
2336 3556 : if (last_saved_restart_lsn_updated)
2337 400 : ReplicationSlotsComputeRequiredLSN();
2338 3556 : }
2339 :
2340 : /*
2341 : * Load all replication slots from disk into memory at server startup. This
2342 : * needs to be run before we start crash recovery.
2343 : */
2344 : void
2345 1978 : StartupReplicationSlots(void)
2346 : {
2347 : DIR *replication_dir;
2348 : struct dirent *replication_de;
2349 :
2350 1978 : elog(DEBUG1, "starting up replication slots");
2351 :
2352 : /* restore all slots by iterating over all on-disk entries */
2353 1978 : replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2354 6156 : while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2355 : {
2356 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2357 : PGFileType de_type;
2358 :
2359 4182 : if (strcmp(replication_de->d_name, ".") == 0 ||
2360 2208 : strcmp(replication_de->d_name, "..") == 0)
2361 3948 : continue;
2362 :
2363 234 : snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2364 234 : de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2365 :
2366 : /* we're only creating directories here, skip if it's not our's */
2367 234 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
2368 0 : continue;
2369 :
2370 : /* we crashed while a slot was being setup or deleted, clean up */
2371 234 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
2372 : {
2373 0 : if (!rmtree(path, true))
2374 : {
2375 0 : ereport(WARNING,
2376 : (errmsg("could not remove directory \"%s\"",
2377 : path)));
2378 0 : continue;
2379 : }
2380 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2381 0 : continue;
2382 : }
2383 :
2384 : /* looks like a slot in a normal state, restore */
2385 234 : RestoreSlotFromDisk(replication_de->d_name);
2386 : }
2387 1974 : FreeDir(replication_dir);
2388 :
2389 : /* currently no slots exist, we're done. */
2390 1974 : if (max_replication_slots <= 0)
2391 2 : return;
2392 :
2393 : /* Now that we have recovered all the data, compute replication xmin */
2394 1972 : ReplicationSlotsComputeRequiredXmin(false);
2395 1972 : ReplicationSlotsComputeRequiredLSN();
2396 : }
2397 :
/* ----
 * Manipulation of on-disk state of replication slots
 *
 * NB: none of the routines below should take any notice of whether a slot
 * is the current one or not; that is all handled a layer above.
 * ----
 */
2405 : static void
2406 1340 : CreateSlotOnDisk(ReplicationSlot *slot)
2407 : {
2408 : char tmppath[MAXPGPATH];
2409 : char path[MAXPGPATH];
2410 : struct stat st;
2411 :
2412 : /*
2413 : * No need to take out the io_in_progress_lock, nobody else can see this
2414 : * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2415 : * takes out the lock, if we'd take the lock here, we'd deadlock.
2416 : */
2417 :
2418 1340 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2419 1340 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2420 :
2421 : /*
2422 : * It's just barely possible that some previous effort to create or drop a
2423 : * slot with this name left a temp directory lying around. If that seems
2424 : * to be the case, try to remove it. If the rmtree() fails, we'll error
2425 : * out at the MakePGDirectory() below, so we don't bother checking
2426 : * success.
2427 : */
2428 1340 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2429 0 : rmtree(tmppath, true);
2430 :
2431 : /* Create and fsync the temporary slot directory. */
2432 1340 : if (MakePGDirectory(tmppath) < 0)
2433 0 : ereport(ERROR,
2434 : (errcode_for_file_access(),
2435 : errmsg("could not create directory \"%s\": %m",
2436 : tmppath)));
2437 1340 : fsync_fname(tmppath, true);
2438 :
2439 : /* Write the actual state file. */
2440 1340 : slot->dirty = true; /* signal that we really need to write */
2441 1340 : SaveSlotToPath(slot, tmppath, ERROR);
2442 :
2443 : /* Rename the directory into place. */
2444 1340 : if (rename(tmppath, path) != 0)
2445 0 : ereport(ERROR,
2446 : (errcode_for_file_access(),
2447 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2448 : tmppath, path)));
2449 :
2450 : /*
2451 : * If we'd now fail - really unlikely - we wouldn't know whether this slot
2452 : * would persist after an OS crash or not - so, force a restart. The
2453 : * restart would try to fsync this again till it works.
2454 : */
2455 1340 : START_CRIT_SECTION();
2456 :
2457 1340 : fsync_fname(path, true);
2458 1340 : fsync_fname(PG_REPLSLOT_DIR, true);
2459 :
2460 1340 : END_CRIT_SECTION();
2461 1340 : }
2462 :
2463 : /*
2464 : * Shared functionality between saving and creating a replication slot.
2465 : */
2466 : static void
2467 4906 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2468 : {
2469 : char tmppath[MAXPGPATH];
2470 : char path[MAXPGPATH];
2471 : int fd;
2472 : ReplicationSlotOnDisk cp;
2473 : bool was_dirty;
2474 :
2475 : /* first check whether there's something to write out */
2476 4906 : SpinLockAcquire(&slot->mutex);
2477 4906 : was_dirty = slot->dirty;
2478 4906 : slot->just_dirtied = false;
2479 4906 : SpinLockRelease(&slot->mutex);
2480 :
2481 : /* and don't do anything if there's nothing to write */
2482 4906 : if (!was_dirty)
2483 288 : return;
2484 :
2485 4618 : LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
2486 :
2487 : /* silence valgrind :( */
2488 4618 : memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2489 :
2490 4618 : sprintf(tmppath, "%s/state.tmp", dir);
2491 4618 : sprintf(path, "%s/state", dir);
2492 :
2493 4618 : fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2494 4618 : if (fd < 0)
2495 : {
2496 : /*
2497 : * If not an ERROR, then release the lock before returning. In case
2498 : * of an ERROR, the error recovery path automatically releases the
2499 : * lock, but no harm in explicitly releasing even in that case. Note
2500 : * that LWLockRelease() could affect errno.
2501 : */
2502 0 : int save_errno = errno;
2503 :
2504 0 : LWLockRelease(&slot->io_in_progress_lock);
2505 0 : errno = save_errno;
2506 0 : ereport(elevel,
2507 : (errcode_for_file_access(),
2508 : errmsg("could not create file \"%s\": %m",
2509 : tmppath)));
2510 0 : return;
2511 : }
2512 :
2513 4618 : cp.magic = SLOT_MAGIC;
2514 4618 : INIT_CRC32C(cp.checksum);
2515 4618 : cp.version = SLOT_VERSION;
2516 4618 : cp.length = ReplicationSlotOnDiskV2Size;
2517 :
2518 4618 : SpinLockAcquire(&slot->mutex);
2519 :
2520 4618 : memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2521 :
2522 4618 : SpinLockRelease(&slot->mutex);
2523 :
2524 4618 : COMP_CRC32C(cp.checksum,
2525 : (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
2526 : ReplicationSlotOnDiskChecksummedSize);
2527 4618 : FIN_CRC32C(cp.checksum);
2528 :
2529 4618 : errno = 0;
2530 4618 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2531 4618 : if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2532 : {
2533 0 : int save_errno = errno;
2534 :
2535 0 : pgstat_report_wait_end();
2536 0 : CloseTransientFile(fd);
2537 0 : unlink(tmppath);
2538 0 : LWLockRelease(&slot->io_in_progress_lock);
2539 :
2540 : /* if write didn't set errno, assume problem is no disk space */
2541 0 : errno = save_errno ? save_errno : ENOSPC;
2542 0 : ereport(elevel,
2543 : (errcode_for_file_access(),
2544 : errmsg("could not write to file \"%s\": %m",
2545 : tmppath)));
2546 0 : return;
2547 : }
2548 4618 : pgstat_report_wait_end();
2549 :
2550 : /* fsync the temporary file */
2551 4618 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2552 4618 : if (pg_fsync(fd) != 0)
2553 : {
2554 0 : int save_errno = errno;
2555 :
2556 0 : pgstat_report_wait_end();
2557 0 : CloseTransientFile(fd);
2558 0 : unlink(tmppath);
2559 0 : LWLockRelease(&slot->io_in_progress_lock);
2560 :
2561 0 : errno = save_errno;
2562 0 : ereport(elevel,
2563 : (errcode_for_file_access(),
2564 : errmsg("could not fsync file \"%s\": %m",
2565 : tmppath)));
2566 0 : return;
2567 : }
2568 4618 : pgstat_report_wait_end();
2569 :
2570 4618 : if (CloseTransientFile(fd) != 0)
2571 : {
2572 0 : int save_errno = errno;
2573 :
2574 0 : unlink(tmppath);
2575 0 : LWLockRelease(&slot->io_in_progress_lock);
2576 :
2577 0 : errno = save_errno;
2578 0 : ereport(elevel,
2579 : (errcode_for_file_access(),
2580 : errmsg("could not close file \"%s\": %m",
2581 : tmppath)));
2582 0 : return;
2583 : }
2584 :
2585 : /* rename to permanent file, fsync file and directory */
2586 4618 : if (rename(tmppath, path) != 0)
2587 : {
2588 0 : int save_errno = errno;
2589 :
2590 0 : unlink(tmppath);
2591 0 : LWLockRelease(&slot->io_in_progress_lock);
2592 :
2593 0 : errno = save_errno;
2594 0 : ereport(elevel,
2595 : (errcode_for_file_access(),
2596 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2597 : tmppath, path)));
2598 0 : return;
2599 : }
2600 :
2601 : /*
2602 : * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2603 : */
2604 4618 : START_CRIT_SECTION();
2605 :
2606 4618 : fsync_fname(path, false);
2607 4618 : fsync_fname(dir, true);
2608 4618 : fsync_fname(PG_REPLSLOT_DIR, true);
2609 :
2610 4618 : END_CRIT_SECTION();
2611 :
2612 : /*
2613 : * Successfully wrote, unset dirty bit, unless somebody dirtied again
2614 : * already and remember the confirmed_flush LSN value.
2615 : */
2616 4618 : SpinLockAcquire(&slot->mutex);
2617 4618 : if (!slot->just_dirtied)
2618 4588 : slot->dirty = false;
2619 4618 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2620 4618 : slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2621 4618 : SpinLockRelease(&slot->mutex);
2622 :
2623 4618 : LWLockRelease(&slot->io_in_progress_lock);
2624 : }
2625 :
2626 : /*
2627 : * Load a single slot from disk into memory.
2628 : */
2629 : static void
2630 234 : RestoreSlotFromDisk(const char *name)
2631 : {
2632 : ReplicationSlotOnDisk cp;
2633 : int i;
2634 : char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2635 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2636 : int fd;
2637 234 : bool restored = false;
2638 : int readBytes;
2639 : pg_crc32c checksum;
2640 234 : TimestampTz now = 0;
2641 :
2642 : /* no need to lock here, no concurrent access allowed yet */
2643 :
2644 : /* delete temp file if it exists */
2645 234 : sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2646 234 : sprintf(path, "%s/state.tmp", slotdir);
2647 234 : if (unlink(path) < 0 && errno != ENOENT)
2648 0 : ereport(PANIC,
2649 : (errcode_for_file_access(),
2650 : errmsg("could not remove file \"%s\": %m", path)));
2651 :
2652 234 : sprintf(path, "%s/state", slotdir);
2653 :
2654 234 : elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2655 :
2656 : /* on some operating systems fsyncing a file requires O_RDWR */
2657 234 : fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2658 :
2659 : /*
2660 : * We do not need to handle this as we are rename()ing the directory into
2661 : * place only after we fsync()ed the state file.
2662 : */
2663 234 : if (fd < 0)
2664 0 : ereport(PANIC,
2665 : (errcode_for_file_access(),
2666 : errmsg("could not open file \"%s\": %m", path)));
2667 :
2668 : /*
2669 : * Sync state file before we're reading from it. We might have crashed
2670 : * while it wasn't synced yet and we shouldn't continue on that basis.
2671 : */
2672 234 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2673 234 : if (pg_fsync(fd) != 0)
2674 0 : ereport(PANIC,
2675 : (errcode_for_file_access(),
2676 : errmsg("could not fsync file \"%s\": %m",
2677 : path)));
2678 234 : pgstat_report_wait_end();
2679 :
2680 : /* Also sync the parent directory */
2681 234 : START_CRIT_SECTION();
2682 234 : fsync_fname(slotdir, true);
2683 234 : END_CRIT_SECTION();
2684 :
2685 : /* read part of statefile that's guaranteed to be version independent */
2686 234 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2687 234 : readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2688 234 : pgstat_report_wait_end();
2689 234 : if (readBytes != ReplicationSlotOnDiskConstantSize)
2690 : {
2691 0 : if (readBytes < 0)
2692 0 : ereport(PANIC,
2693 : (errcode_for_file_access(),
2694 : errmsg("could not read file \"%s\": %m", path)));
2695 : else
2696 0 : ereport(PANIC,
2697 : (errcode(ERRCODE_DATA_CORRUPTED),
2698 : errmsg("could not read file \"%s\": read %d of %zu",
2699 : path, readBytes,
2700 : (Size) ReplicationSlotOnDiskConstantSize)));
2701 : }
2702 :
2703 : /* verify magic */
2704 234 : if (cp.magic != SLOT_MAGIC)
2705 0 : ereport(PANIC,
2706 : (errcode(ERRCODE_DATA_CORRUPTED),
2707 : errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2708 : path, cp.magic, SLOT_MAGIC)));
2709 :
2710 : /* verify version */
2711 234 : if (cp.version != SLOT_VERSION)
2712 0 : ereport(PANIC,
2713 : (errcode(ERRCODE_DATA_CORRUPTED),
2714 : errmsg("replication slot file \"%s\" has unsupported version %u",
2715 : path, cp.version)));
2716 :
2717 : /* boundary check on length */
2718 234 : if (cp.length != ReplicationSlotOnDiskV2Size)
2719 0 : ereport(PANIC,
2720 : (errcode(ERRCODE_DATA_CORRUPTED),
2721 : errmsg("replication slot file \"%s\" has corrupted length %u",
2722 : path, cp.length)));
2723 :
2724 : /* Now that we know the size, read the entire file */
2725 234 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2726 468 : readBytes = read(fd,
2727 : (char *) &cp + ReplicationSlotOnDiskConstantSize,
2728 234 : cp.length);
2729 234 : pgstat_report_wait_end();
2730 234 : if (readBytes != cp.length)
2731 : {
2732 0 : if (readBytes < 0)
2733 0 : ereport(PANIC,
2734 : (errcode_for_file_access(),
2735 : errmsg("could not read file \"%s\": %m", path)));
2736 : else
2737 0 : ereport(PANIC,
2738 : (errcode(ERRCODE_DATA_CORRUPTED),
2739 : errmsg("could not read file \"%s\": read %d of %zu",
2740 : path, readBytes, (Size) cp.length)));
2741 : }
2742 :
2743 234 : if (CloseTransientFile(fd) != 0)
2744 0 : ereport(PANIC,
2745 : (errcode_for_file_access(),
2746 : errmsg("could not close file \"%s\": %m", path)));
2747 :
2748 : /* now verify the CRC */
2749 234 : INIT_CRC32C(checksum);
2750 234 : COMP_CRC32C(checksum,
2751 : (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
2752 : ReplicationSlotOnDiskChecksummedSize);
2753 234 : FIN_CRC32C(checksum);
2754 :
2755 234 : if (!EQ_CRC32C(checksum, cp.checksum))
2756 0 : ereport(PANIC,
2757 : (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2758 : path, checksum, cp.checksum)));
2759 :
2760 : /*
2761 : * If we crashed with an ephemeral slot active, don't restore but delete
2762 : * it.
2763 : */
2764 234 : if (cp.slotdata.persistency != RS_PERSISTENT)
2765 : {
2766 0 : if (!rmtree(slotdir, true))
2767 : {
2768 0 : ereport(WARNING,
2769 : (errmsg("could not remove directory \"%s\"",
2770 : slotdir)));
2771 : }
2772 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2773 0 : return;
2774 : }
2775 :
2776 : /*
2777 : * Verify that requirements for the specific slot type are met. That's
2778 : * important because if these aren't met we're not guaranteed to retain
2779 : * all the necessary resources for the slot.
2780 : *
2781 : * NB: We have to do so *after* the above checks for ephemeral slots,
2782 : * because otherwise a slot that shouldn't exist anymore could prevent
2783 : * restarts.
2784 : *
2785 : * NB: Changing the requirements here also requires adapting
2786 : * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2787 : */
2788 234 : if (cp.slotdata.database != InvalidOid)
2789 : {
2790 158 : if (wal_level < WAL_LEVEL_REPLICA)
2791 2 : ereport(FATAL,
2792 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2793 : errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2794 : NameStr(cp.slotdata.name)),
2795 : errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2796 :
2797 : /*
2798 : * In standby mode, the hot standby must be enabled. This check is
2799 : * necessary to ensure logical slots are invalidated when they become
2800 : * incompatible due to insufficient wal_level. Otherwise, if the
2801 : * primary reduces effective_wal_level < logical while hot standby is
2802 : * disabled, primary disable logical decoding while hot standby is
2803 : * disabled, logical slots would remain valid even after promotion.
2804 : */
2805 156 : if (StandbyMode && !EnableHotStandby)
2806 2 : ereport(FATAL,
2807 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2808 : errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2809 : NameStr(cp.slotdata.name)),
2810 : errhint("Change \"hot_standby\" to be \"on\".")));
2811 : }
2812 76 : else if (wal_level < WAL_LEVEL_REPLICA)
2813 0 : ereport(FATAL,
2814 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2815 : errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2816 : NameStr(cp.slotdata.name)),
2817 : errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2818 :
2819 : /* nothing can be active yet, don't lock anything */
2820 328 : for (i = 0; i < max_replication_slots; i++)
2821 : {
2822 : ReplicationSlot *slot;
2823 :
2824 328 : slot = &ReplicationSlotCtl->replication_slots[i];
2825 :
2826 328 : if (slot->in_use)
2827 98 : continue;
2828 :
2829 : /* restore the entire set of persistent data */
2830 230 : memcpy(&slot->data, &cp.slotdata,
2831 : sizeof(ReplicationSlotPersistentData));
2832 :
2833 : /* initialize in memory state */
2834 230 : slot->effective_xmin = cp.slotdata.xmin;
2835 230 : slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2836 230 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2837 230 : slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2838 :
2839 230 : slot->candidate_catalog_xmin = InvalidTransactionId;
2840 230 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
2841 230 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
2842 230 : slot->candidate_restart_valid = InvalidXLogRecPtr;
2843 :
2844 230 : slot->in_use = true;
2845 230 : slot->active_pid = 0;
2846 :
2847 : /*
2848 : * Set the time since the slot has become inactive after loading the
2849 : * slot from the disk into memory. Whoever acquires the slot i.e.
2850 : * makes the slot active will reset it. Use the same inactive_since
2851 : * time for all the slots.
2852 : */
2853 230 : if (now == 0)
2854 230 : now = GetCurrentTimestamp();
2855 :
2856 230 : ReplicationSlotSetInactiveSince(slot, now, false);
2857 :
2858 230 : restored = true;
2859 230 : break;
2860 : }
2861 :
2862 230 : if (!restored)
2863 0 : ereport(FATAL,
2864 : (errmsg("too many replication slots active before shutdown"),
2865 : errhint("Increase \"max_replication_slots\" and try again.")));
2866 : }
2867 :
2868 : /*
2869 : * Maps an invalidation reason for a replication slot to
2870 : * ReplicationSlotInvalidationCause.
2871 : */
2872 : ReplicationSlotInvalidationCause
2873 0 : GetSlotInvalidationCause(const char *cause_name)
2874 : {
2875 : Assert(cause_name);
2876 :
2877 : /* Search lookup table for the cause having this name */
2878 0 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2879 : {
2880 0 : if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2881 0 : return SlotInvalidationCauses[i].cause;
2882 : }
2883 :
2884 : Assert(false);
2885 0 : return RS_INVAL_NONE; /* to keep compiler quiet */
2886 : }
2887 :
2888 : /*
2889 : * Maps an ReplicationSlotInvalidationCause to the invalidation
2890 : * reason for a replication slot.
2891 : */
2892 : const char *
2893 84 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
2894 : {
2895 : /* Search lookup table for the name of this cause */
2896 262 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2897 : {
2898 262 : if (SlotInvalidationCauses[i].cause == cause)
2899 84 : return SlotInvalidationCauses[i].cause_name;
2900 : }
2901 :
2902 : Assert(false);
2903 0 : return "none"; /* to keep compiler quiet */
2904 : }
2905 :
2906 : /*
2907 : * A helper function to validate slots specified in GUC synchronized_standby_slots.
2908 : *
2909 : * The rawname will be parsed, and the result will be saved into *elemlist.
2910 : */
2911 : static bool
2912 40 : validate_sync_standby_slots(char *rawname, List **elemlist)
2913 : {
2914 : /* Verify syntax and parse string into a list of identifiers */
2915 40 : if (!SplitIdentifierString(rawname, ',', elemlist))
2916 : {
2917 0 : GUC_check_errdetail("List syntax is invalid.");
2918 0 : return false;
2919 : }
2920 :
2921 : /* Iterate the list to validate each slot name */
2922 112 : foreach_ptr(char, name, *elemlist)
2923 : {
2924 : int err_code;
2925 40 : char *err_msg = NULL;
2926 40 : char *err_hint = NULL;
2927 :
2928 40 : if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
2929 : &err_msg, &err_hint))
2930 : {
2931 4 : GUC_check_errcode(err_code);
2932 4 : GUC_check_errdetail("%s", err_msg);
2933 4 : if (err_hint != NULL)
2934 4 : GUC_check_errhint("%s", err_hint);
2935 4 : return false;
2936 : }
2937 : }
2938 :
2939 36 : return true;
2940 : }
2941 :
2942 : /*
2943 : * GUC check_hook for synchronized_standby_slots
2944 : */
2945 : bool
2946 2408 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
2947 : {
2948 : char *rawname;
2949 : char *ptr;
2950 : List *elemlist;
2951 : int size;
2952 : bool ok;
2953 : SyncStandbySlotsConfigData *config;
2954 :
2955 2408 : if ((*newval)[0] == '\0')
2956 2368 : return true;
2957 :
2958 : /* Need a modifiable copy of the GUC string */
2959 40 : rawname = pstrdup(*newval);
2960 :
2961 : /* Now verify if the specified slots exist and have correct type */
2962 40 : ok = validate_sync_standby_slots(rawname, &elemlist);
2963 :
2964 40 : if (!ok || elemlist == NIL)
2965 : {
2966 4 : pfree(rawname);
2967 4 : list_free(elemlist);
2968 4 : return ok;
2969 : }
2970 :
2971 : /* Compute the size required for the SyncStandbySlotsConfigData struct */
2972 36 : size = offsetof(SyncStandbySlotsConfigData, slot_names);
2973 108 : foreach_ptr(char, slot_name, elemlist)
2974 36 : size += strlen(slot_name) + 1;
2975 :
2976 : /* GUC extra value must be guc_malloc'd, not palloc'd */
2977 36 : config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
2978 36 : if (!config)
2979 0 : return false;
2980 :
2981 : /* Transform the data into SyncStandbySlotsConfigData */
2982 36 : config->nslotnames = list_length(elemlist);
2983 :
2984 36 : ptr = config->slot_names;
2985 108 : foreach_ptr(char, slot_name, elemlist)
2986 : {
2987 36 : strcpy(ptr, slot_name);
2988 36 : ptr += strlen(slot_name) + 1;
2989 : }
2990 :
2991 36 : *extra = config;
2992 :
2993 36 : pfree(rawname);
2994 36 : list_free(elemlist);
2995 36 : return true;
2996 : }
2997 :
2998 : /*
2999 : * GUC assign_hook for synchronized_standby_slots
3000 : */
3001 : void
3002 2402 : assign_synchronized_standby_slots(const char *newval, void *extra)
3003 : {
3004 : /*
3005 : * The standby slots may have changed, so we must recompute the oldest
3006 : * LSN.
3007 : */
3008 2402 : ss_oldest_flush_lsn = InvalidXLogRecPtr;
3009 :
3010 2402 : synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
3011 2402 : }
3012 :
3013 : /*
3014 : * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
3015 : */
3016 : bool
3017 56586 : SlotExistsInSyncStandbySlots(const char *slot_name)
3018 : {
3019 : const char *standby_slot_name;
3020 :
3021 : /* Return false if there is no value in synchronized_standby_slots */
3022 56586 : if (synchronized_standby_slots_config == NULL)
3023 56558 : return false;
3024 :
3025 : /*
3026 : * XXX: We are not expecting this list to be long so a linear search
3027 : * shouldn't hurt but if that turns out not to be true then we can cache
3028 : * this information for each WalSender as well.
3029 : */
3030 28 : standby_slot_name = synchronized_standby_slots_config->slot_names;
3031 44 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
3032 : {
3033 28 : if (strcmp(standby_slot_name, slot_name) == 0)
3034 12 : return true;
3035 :
3036 16 : standby_slot_name += strlen(standby_slot_name) + 1;
3037 : }
3038 :
3039 16 : return false;
3040 : }
3041 :
/*
 * Return true if the slots specified in synchronized_standby_slots have caught up to
 * the given WAL location, false otherwise.
 *
 * A slot counts as caught up when its restart_lsn is valid and at or beyond
 * wait_for_lsn.  The function returns true immediately, without examining any
 * slot, when synchronized_standby_slots is empty, when recovery is in
 * progress, or when the cached ss_oldest_flush_lsn already covers
 * wait_for_lsn.  When every listed slot has caught up, the smallest of their
 * restart_lsn values is cached in ss_oldest_flush_lsn so later calls can take
 * that early exit.
 *
 * The elevel parameter specifies the error level used for logging messages
 * related to slots that do not exist, are logical, are invalidated, or are
 * inactive.  The scan stops at the first slot that is unusable or has not
 * caught up, so at most one such message is emitted per call.
 */
bool
StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
{
	const char *name;
	int			caught_up_slot_num = 0;
	XLogRecPtr	min_restart_lsn = InvalidXLogRecPtr;

	/*
	 * Don't need to wait for the standbys to catch up if there is no value in
	 * synchronized_standby_slots.
	 */
	if (synchronized_standby_slots_config == NULL)
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if we are on a standby
	 * server, since we do not support syncing slots to cascading standbys.
	 */
	if (RecoveryInProgress())
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if they are already
	 * beyond the specified WAL location.
	 */
	if (XLogRecPtrIsValid(ss_oldest_flush_lsn) &&
		ss_oldest_flush_lsn >= wait_for_lsn)
		return true;

	/*
	 * To prevent concurrent slot dropping and creation while filtering the
	 * slots, take the ReplicationSlotControlLock outside of the loop.  Every
	 * exit from the loop below (including the early breaks) falls through to
	 * the single LWLockRelease after it.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	/* Slot names are packed back-to-back, each NUL-terminated. */
	name = synchronized_standby_slots_config->slot_names;
	for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
	{
		XLogRecPtr	restart_lsn;
		bool		invalidated;
		bool		inactive;
		ReplicationSlot *slot;

		/*
		 * NOTE(review): the second argument is presumably "need_lock"; false
		 * here because ReplicationSlotControlLock is already held — confirm.
		 */
		slot = SearchNamedReplicationSlot(name, false);

		/*
		 * If a slot name provided in synchronized_standby_slots does not
		 * exist, report a message and exit the loop.
		 */
		if (!slot)
		{
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
							  name),
					errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		/* Same as above: if a slot is not physical, exit the loop. */
		if (SlotIsLogical(slot))
		{
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
							  name),
					errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		/* Snapshot the fields we need under the slot's spinlock. */
		SpinLockAcquire(&slot->mutex);
		restart_lsn = slot->data.restart_lsn;
		invalidated = slot->data.invalidated != RS_INVAL_NONE;
		inactive = slot->active_pid == 0;
		SpinLockRelease(&slot->mutex);

		if (invalidated)
		{
			/* Specified physical slot has been invalidated */
			ereport(elevel,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
							  name),
					errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
		{
			/* Log a message if no active_pid for this physical slot */
			if (inactive)
				ereport(elevel,
						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
							   name, "synchronized_standby_slots"),
						errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
								  name),
						errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
								name, "synchronized_standby_slots"));

			/*
			 * This slot hasn't caught up yet.  Stop scanning: the count check
			 * after the loop will then make us return false.
			 */
			break;
		}

		Assert(restart_lsn >= wait_for_lsn);

		/* Track the oldest restart_lsn among the caught-up slots. */
		if (!XLogRecPtrIsValid(min_restart_lsn) ||
			min_restart_lsn > restart_lsn)
			min_restart_lsn = restart_lsn;

		caught_up_slot_num++;

		/* Step past this name's terminating NUL to the next name. */
		name += strlen(name) + 1;
	}

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Return false if not all the standbys have caught up to the specified
	 * WAL location.
	 */
	if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
		return false;

	/* The ss_oldest_flush_lsn must not retreat. */
	Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
		   min_restart_lsn >= ss_oldest_flush_lsn);

	ss_oldest_flush_lsn = min_restart_lsn;

	return true;
}
3189 : }
3190 :
3191 : /*
3192 : * Wait for physical standbys to confirm receiving the given lsn.
3193 : *
3194 : * Used by logical decoding SQL functions. It waits for physical standbys
3195 : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
3196 : */
3197 : void
3198 448 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
3199 : {
3200 : /*
3201 : * Don't need to wait for the standby to catch up if the current acquired
3202 : * slot is not a logical failover slot, or there is no value in
3203 : * synchronized_standby_slots.
3204 : */
3205 448 : if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
3206 446 : return;
3207 :
3208 2 : ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
3209 :
3210 : for (;;)
3211 : {
3212 4 : CHECK_FOR_INTERRUPTS();
3213 :
3214 4 : if (ConfigReloadPending)
3215 : {
3216 2 : ConfigReloadPending = false;
3217 2 : ProcessConfigFile(PGC_SIGHUP);
3218 : }
3219 :
3220 : /* Exit if done waiting for every slot. */
3221 4 : if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
3222 2 : break;
3223 :
3224 : /*
3225 : * Wait for the slots in the synchronized_standby_slots to catch up,
3226 : * but use a timeout (1s) so we can also check if the
3227 : * synchronized_standby_slots has been changed.
3228 : */
3229 2 : ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
3230 : WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
3231 : }
3232 :
3233 2 : ConditionVariableCancelSleep();
3234 : }
|