Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * slot.c
4 : * Replication slot management.
5 : *
6 : *
7 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/replication/slot.c
12 : *
13 : * NOTES
14 : *
15 : * Replication slots are used to keep state about replication streams
16 : * originating from this cluster. Their primary purpose is to prevent the
17 : * premature removal of WAL or of old tuple versions in a manner that would
18 : * interfere with replication; they are also useful for monitoring purposes.
19 : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : * on standbys (to support cascading setups). The requirement that slots be
21 : * usable on standbys precludes storing them in the system catalogs.
22 : *
23 : * Each replication slot gets its own directory inside the directory
24 : * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 : * contain the slot's own data. Additional data can be stored alongside that
26 : * file if required. While the server is running, the state data is also
27 : * cached in memory for efficiency.
28 : *
29 : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : * of a slot. The remaining data in each slot is protected by its mutex.
33 : *
34 : *-------------------------------------------------------------------------
35 : */
36 :
37 : #include "postgres.h"
38 :
39 : #include <unistd.h>
40 : #include <sys/stat.h>
41 :
42 : #include "access/transam.h"
43 : #include "access/xlog_internal.h"
44 : #include "access/xlogrecovery.h"
45 : #include "common/file_utils.h"
46 : #include "common/string.h"
47 : #include "miscadmin.h"
48 : #include "pgstat.h"
49 : #include "postmaster/interrupt.h"
50 : #include "replication/logicallauncher.h"
51 : #include "replication/slotsync.h"
52 : #include "replication/slot.h"
53 : #include "replication/walsender_private.h"
54 : #include "storage/fd.h"
55 : #include "storage/ipc.h"
56 : #include "storage/proc.h"
57 : #include "storage/procarray.h"
58 : #include "utils/builtins.h"
59 : #include "utils/guc_hooks.h"
60 : #include "utils/injection_point.h"
61 : #include "utils/varlena.h"
62 :
63 : /*
64 : * Replication slot on-disk data structure.
65 : */
66 : typedef struct ReplicationSlotOnDisk
67 : {
68 : /* first part of this struct needs to be version independent */
69 :
70 : /* data not covered by checksum */
71 : uint32 magic;
72 : pg_crc32c checksum;
73 :
74 : /* data covered by checksum */
75 : uint32 version;
76 : uint32 length;
77 :
78 : /*
79 : * The actual data in the slot that follows can differ based on the above
80 : * 'version'.
81 : */
82 :
83 : ReplicationSlotPersistentData slotdata;
84 : } ReplicationSlotOnDisk;
85 :
86 : /*
87 : * Struct for the configuration of synchronized_standby_slots.
88 : *
89 : * Note: this must be a flat representation that can be held in a single chunk
90 : * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
91 : * synchronized_standby_slots GUC.
92 : */
93 : typedef struct
94 : {
95 : /* Number of slot names in the slot_names[] */
96 : int nslotnames;
97 :
98 : /*
99 : * slot_names contains 'nslotnames' consecutive null-terminated C strings.
100 : */
101 : char slot_names[FLEXIBLE_ARRAY_MEMBER];
102 : } SyncStandbySlotsConfigData;
103 :
104 : /*
105 : * Lookup table for slot invalidation causes.
106 : */
107 : typedef struct SlotInvalidationCauseMap
108 : {
109 : ReplicationSlotInvalidationCause cause;
110 : const char *cause_name;
111 : } SlotInvalidationCauseMap;
112 :
113 : static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
114 : {RS_INVAL_NONE, "none"},
115 : {RS_INVAL_WAL_REMOVED, "wal_removed"},
116 : {RS_INVAL_HORIZON, "rows_removed"},
117 : {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
118 : {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
119 : };
120 :
121 : /*
122 : * Ensure that the lookup table is up-to-date with the enums defined in
123 : * ReplicationSlotInvalidationCause.
124 : */
125 : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
126 : "array length mismatch");
127 :
128 : /* size of version independent data */
129 : #define ReplicationSlotOnDiskConstantSize \
130 : offsetof(ReplicationSlotOnDisk, slotdata)
131 : /* size of the part of the slot not covered by the checksum */
132 : #define ReplicationSlotOnDiskNotChecksummedSize \
133 : offsetof(ReplicationSlotOnDisk, version)
134 : /* size of the part covered by the checksum */
135 : #define ReplicationSlotOnDiskChecksummedSize \
136 : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
137 : /* size of the slot data that is version dependent */
138 : #define ReplicationSlotOnDiskV2Size \
139 : sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
140 :
141 : #define SLOT_MAGIC 0x1051CA1 /* format identifier */
142 : #define SLOT_VERSION 5 /* version for new files */
143 :
144 : /* Control array for replication slot management */
145 : ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
146 :
147 : /* My backend's replication slot in the shared memory array */
148 : ReplicationSlot *MyReplicationSlot = NULL;
149 :
150 : /* GUC variables */
151 : int max_replication_slots = 10; /* the maximum number of replication
152 : * slots */
153 :
154 : /*
155 : * Invalidate replication slots that have remained idle longer than this
156 : * duration; '0' disables it.
157 : */
158 : int idle_replication_slot_timeout_secs = 0;
159 :
160 : /*
161 : * This GUC lists streaming replication standby server slot names that
162 : * logical WAL sender processes will wait for.
163 : */
164 : char *synchronized_standby_slots;
165 :
166 : /* This is the parsed and cached configuration for synchronized_standby_slots */
167 : static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
168 :
169 : /*
170 : * Oldest LSN that has been confirmed to be flushed to the standbys
171 : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
172 : */
173 : static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
174 :
175 : static void ReplicationSlotShmemExit(int code, Datum arg);
176 : static bool IsSlotForConflictCheck(const char *name);
177 : static void ReplicationSlotDropPtr(ReplicationSlot *slot);
178 :
179 : /* internal persistency functions */
180 : static void RestoreSlotFromDisk(const char *name);
181 : static void CreateSlotOnDisk(ReplicationSlot *slot);
182 : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
183 :
184 : /*
185 : * Report shared-memory space needed by ReplicationSlotsShmemInit.
186 : */
187 : Size
188 8488 : ReplicationSlotsShmemSize(void)
189 : {
190 8488 : Size size = 0;
191 :
192 8488 : if (max_replication_slots == 0)
193 4 : return size;
194 :
195 8484 : size = offsetof(ReplicationSlotCtlData, replication_slots);
196 8484 : size = add_size(size,
197 : mul_size(max_replication_slots, sizeof(ReplicationSlot)));
198 :
199 8484 : return size;
200 : }
201 :
202 : /*
203 : * Allocate and initialize shared memory for replication slots.
204 : */
205 : void
206 2198 : ReplicationSlotsShmemInit(void)
207 : {
208 : bool found;
209 :
210 2198 : if (max_replication_slots == 0)
211 2 : return;
212 :
213 2196 : ReplicationSlotCtl = (ReplicationSlotCtlData *)
214 2196 : ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
215 : &found);
216 :
217 2196 : if (!found)
218 : {
219 : int i;
220 :
221 : /* First time through, so initialize */
222 3996 : MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
223 :
224 23762 : for (i = 0; i < max_replication_slots; i++)
225 : {
226 21566 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
227 :
228 : /* everything else is zeroed by the memset above */
229 21566 : SpinLockInit(&slot->mutex);
230 21566 : LWLockInitialize(&slot->io_in_progress_lock,
231 : LWTRANCHE_REPLICATION_SLOT_IO);
232 21566 : ConditionVariableInit(&slot->active_cv);
233 : }
234 : }
235 : }
236 :
237 : /*
238 : * Register the callback for replication slot cleanup and releasing.
239 : */
240 : void
241 44696 : ReplicationSlotInitialize(void)
242 : {
243 44696 : before_shmem_exit(ReplicationSlotShmemExit, 0);
244 44696 : }
245 :
246 : /*
247 : * Release and cleanup replication slots.
248 : */
249 : static void
250 44696 : ReplicationSlotShmemExit(int code, Datum arg)
251 : {
252 : /* Make sure active replication slots are released */
253 44696 : if (MyReplicationSlot != NULL)
254 504 : ReplicationSlotRelease();
255 :
256 : /* Also cleanup all the temporary slots. */
257 44696 : ReplicationSlotCleanup(false);
258 44696 : }
259 :
260 : /*
261 : * Check whether the passed slot name is valid and report errors at elevel.
262 : *
263 : * See comments for ReplicationSlotValidateNameInternal().
264 : */
265 : bool
266 1564 : ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
267 : int elevel)
268 : {
269 : int err_code;
270 1564 : char *err_msg = NULL;
271 1564 : char *err_hint = NULL;
272 :
273 1564 : if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
274 : &err_code, &err_msg, &err_hint))
275 : {
276 : /*
277 : * Use errmsg_internal() and errhint_internal() instead of errmsg()
278 : * and errhint(), since the messages from
279 : * ReplicationSlotValidateNameInternal() are already translated. This
280 : * avoids double translation.
281 : */
282 8 : ereport(elevel,
283 : errcode(err_code),
284 : errmsg_internal("%s", err_msg),
285 : (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
286 :
287 0 : pfree(err_msg);
288 0 : if (err_hint != NULL)
289 0 : pfree(err_hint);
290 0 : return false;
291 : }
292 :
293 1556 : return true;
294 : }
295 :
296 : /*
297 : * Check whether the passed slot name is valid.
298 : *
299 : * An error will be reported for a reserved replication slot name if
300 : * allow_reserved_name is set to false.
301 : *
302 : * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
303 : * the name to be used as a directory name on every supported OS.
304 : *
305 : * Returns true if the slot name is valid. Otherwise, returns false and stores
306 : * the error code, error message, and optional hint in err_code, err_msg, and
307 : * err_hint, respectively. The caller is responsible for freeing err_msg and
308 : * err_hint, which are palloc'd.
309 : */
310 : bool
311 1980 : ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
312 : int *err_code, char **err_msg, char **err_hint)
313 : {
314 : const char *cp;
315 :
316 1980 : if (strlen(name) == 0)
317 : {
318 6 : *err_code = ERRCODE_INVALID_NAME;
319 6 : *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
320 6 : *err_hint = NULL;
321 6 : return false;
322 : }
323 :
324 1974 : if (strlen(name) >= NAMEDATALEN)
325 : {
326 0 : *err_code = ERRCODE_NAME_TOO_LONG;
327 0 : *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
328 0 : *err_hint = NULL;
329 0 : return false;
330 : }
331 :
332 38404 : for (cp = name; *cp; cp++)
333 : {
334 36434 : if (!((*cp >= 'a' && *cp <= 'z')
335 17240 : || (*cp >= '0' && *cp <= '9')
336 3534 : || (*cp == '_')))
337 : {
338 4 : *err_code = ERRCODE_INVALID_NAME;
339 4 : *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
340 4 : *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
341 4 : return false;
342 : }
343 : }
344 :
345 1970 : if (!allow_reserved_name && IsSlotForConflictCheck(name))
346 : {
347 2 : *err_code = ERRCODE_RESERVED_NAME;
348 2 : *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
349 2 : *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
350 : CONFLICT_DETECTION_SLOT);
351 2 : return false;
352 : }
353 :
354 1968 : return true;
355 : }
356 :
357 : /*
358 : * Return true if the replication slot name is "pg_conflict_detection".
359 : */
360 : static bool
361 4262 : IsSlotForConflictCheck(const char *name)
362 : {
363 4262 : return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
364 : }
365 :
366 : /*
367 : * Create a new replication slot and mark it as used by this backend.
368 : *
369 : * name: Name of the slot
370 : * db_specific: logical decoding is db specific; if the slot is going to
371 : * be used for that pass true, otherwise false.
372 : * two_phase: Allows decoding of prepared transactions. We allow this option
373 : * to be enabled only at the slot creation time. If we allow this option
374 : * to be changed during decoding then it is quite possible that we skip
375 : * prepare first time because this option was not enabled. Now next time
376 : * during getting changes, if the two_phase option is enabled it can skip
377 : * prepare because by that time start decoding point has been moved. So the
378 : * user will only get commit prepared.
379 : * failover: If enabled, allows the slot to be synced to standbys so
380 : * that logical replication can be resumed after failover.
381 : * synced: True if the slot is synchronized from the primary server.
382 : */
383 : void
384 1310 : ReplicationSlotCreate(const char *name, bool db_specific,
385 : ReplicationSlotPersistency persistency,
386 : bool two_phase, bool failover, bool synced)
387 : {
388 1310 : ReplicationSlot *slot = NULL;
389 : int i;
390 :
391 : Assert(MyReplicationSlot == NULL);
392 :
393 : /*
394 : * The logical launcher or pg_upgrade may create or migrate an internal
395 : * slot, so using a reserved name is allowed in these cases.
396 : */
397 1310 : ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
398 : ERROR);
399 :
400 1308 : if (failover)
401 : {
402 : /*
403 : * Do not allow users to create the failover enabled slots on the
404 : * standby as we do not support sync to the cascading standby.
405 : *
406 : * However, failover enabled slots can be created during slot
407 : * synchronization because we need to retain the same values as the
408 : * remote slot.
409 : */
410 44 : if (RecoveryInProgress() && !IsSyncingReplicationSlots())
411 0 : ereport(ERROR,
412 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
413 : errmsg("cannot enable failover for a replication slot created on the standby"));
414 :
415 : /*
416 : * Do not allow users to create failover enabled temporary slots,
417 : * because temporary slots will not be synced to the standby.
418 : *
419 : * However, failover enabled temporary slots can be created during
420 : * slot synchronization. See the comments atop slotsync.c for details.
421 : */
422 44 : if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
423 2 : ereport(ERROR,
424 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
425 : errmsg("cannot enable failover for a temporary replication slot"));
426 : }
427 :
428 : /*
429 : * If some other backend ran this code concurrently with us, we'd likely
430 : * both allocate the same slot, and that would be bad. We'd also be at
431 : * risk of missing a name collision. Also, we don't want to try to create
432 : * a new slot while somebody's busy cleaning up an old one, because we
433 : * might both be monkeying with the same directory.
434 : */
435 1306 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
436 :
437 : /*
438 : * Check for name collision, and identify an allocatable slot. We need to
439 : * hold ReplicationSlotControlLock in shared mode for this, so that nobody
440 : * else can change the in_use flags while we're looking at them.
441 : */
442 1306 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
443 12560 : for (i = 0; i < max_replication_slots; i++)
444 : {
445 11260 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
446 :
447 11260 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
448 6 : ereport(ERROR,
449 : (errcode(ERRCODE_DUPLICATE_OBJECT),
450 : errmsg("replication slot \"%s\" already exists", name)));
451 11254 : if (!s->in_use && slot == NULL)
452 1298 : slot = s;
453 : }
454 1300 : LWLockRelease(ReplicationSlotControlLock);
455 :
456 : /* If all slots are in use, we're out of luck. */
457 1300 : if (slot == NULL)
458 2 : ereport(ERROR,
459 : (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
460 : errmsg("all replication slots are in use"),
461 : errhint("Free one or increase \"max_replication_slots\".")));
462 :
463 : /*
464 : * Since this slot is not in use, nobody should be looking at any part of
465 : * it other than the in_use field unless they're trying to allocate it.
466 : * And since we hold ReplicationSlotAllocationLock, nobody except us can
467 : * be doing that. So it's safe to initialize the slot.
468 : */
469 : Assert(!slot->in_use);
470 : Assert(slot->active_pid == 0);
471 :
472 : /* first initialize persistent data */
473 1298 : memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
474 1298 : namestrcpy(&slot->data.name, name);
475 1298 : slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
476 1298 : slot->data.persistency = persistency;
477 1298 : slot->data.two_phase = two_phase;
478 1298 : slot->data.two_phase_at = InvalidXLogRecPtr;
479 1298 : slot->data.failover = failover;
480 1298 : slot->data.synced = synced;
481 :
482 : /* and then data only present in shared memory */
483 1298 : slot->just_dirtied = false;
484 1298 : slot->dirty = false;
485 1298 : slot->effective_xmin = InvalidTransactionId;
486 1298 : slot->effective_catalog_xmin = InvalidTransactionId;
487 1298 : slot->candidate_catalog_xmin = InvalidTransactionId;
488 1298 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
489 1298 : slot->candidate_restart_valid = InvalidXLogRecPtr;
490 1298 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
491 1298 : slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
492 1298 : slot->last_saved_restart_lsn = InvalidXLogRecPtr;
493 1298 : slot->inactive_since = 0;
494 :
495 : /*
496 : * Create the slot on disk. We haven't actually marked the slot allocated
497 : * yet, so no special cleanup is required if this errors out.
498 : */
499 1298 : CreateSlotOnDisk(slot);
500 :
501 : /*
502 : * We need to briefly prevent any other backend from iterating over the
503 : * slots while we flip the in_use flag. We also need to set the active
504 : * flag while holding the ControlLock as otherwise a concurrent
505 : * ReplicationSlotAcquire() could acquire the slot as well.
506 : */
507 1298 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
508 :
509 1298 : slot->in_use = true;
510 :
511 : /* We can now mark the slot active, and that makes it our slot. */
512 1298 : SpinLockAcquire(&slot->mutex);
513 : Assert(slot->active_pid == 0);
514 1298 : slot->active_pid = MyProcPid;
515 1298 : SpinLockRelease(&slot->mutex);
516 1298 : MyReplicationSlot = slot;
517 :
518 1298 : LWLockRelease(ReplicationSlotControlLock);
519 :
520 : /*
521 : * Create statistics entry for the new logical slot. We don't collect any
522 : * stats for physical slots, so no need to create an entry for the same.
523 : * See ReplicationSlotDropPtr for why we need to do this before releasing
524 : * ReplicationSlotAllocationLock.
525 : */
526 1298 : if (SlotIsLogical(slot))
527 930 : pgstat_create_replslot(slot);
528 :
529 : /*
530 : * Now that the slot has been marked as in_use and active, it's safe to
531 : * let somebody else try to allocate a slot.
532 : */
533 1298 : LWLockRelease(ReplicationSlotAllocationLock);
534 :
535 : /* Let everybody know we've modified this slot */
536 1298 : ConditionVariableBroadcast(&slot->active_cv);
537 1298 : }
538 :
539 : /*
540 : * Search for the named replication slot.
541 : *
542 : * Return the replication slot if found, otherwise NULL.
543 : */
544 : ReplicationSlot *
545 3776 : SearchNamedReplicationSlot(const char *name, bool need_lock)
546 : {
547 : int i;
548 3776 : ReplicationSlot *slot = NULL;
549 :
550 3776 : if (need_lock)
551 1080 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
552 :
553 14454 : for (i = 0; i < max_replication_slots; i++)
554 : {
555 13548 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
556 :
557 13548 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
558 : {
559 2870 : slot = s;
560 2870 : break;
561 : }
562 : }
563 :
564 3776 : if (need_lock)
565 1080 : LWLockRelease(ReplicationSlotControlLock);
566 :
567 3776 : return slot;
568 : }
569 :
570 : /*
571 : * Return the index of the replication slot in
572 : * ReplicationSlotCtl->replication_slots.
573 : *
574 : * This is mainly useful to have an efficient key for storing replication slot
575 : * stats.
576 : */
577 : int
578 16434 : ReplicationSlotIndex(ReplicationSlot *slot)
579 : {
580 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
581 : slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
582 :
583 16434 : return slot - ReplicationSlotCtl->replication_slots;
584 : }
585 :
586 : /*
587 : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
588 : * the slot's name and true is returned.
589 : *
590 : * This likely is only useful for pgstat_replslot.c during shutdown, in other
591 : * cases there are obvious TOCTOU issues.
592 : */
593 : bool
594 190 : ReplicationSlotName(int index, Name name)
595 : {
596 : ReplicationSlot *slot;
597 : bool found;
598 :
599 190 : slot = &ReplicationSlotCtl->replication_slots[index];
600 :
601 : /*
602 : * Ensure that the slot cannot be dropped while we copy the name. Don't
603 : * need the spinlock as the name of an existing slot cannot change.
604 : */
605 190 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
606 190 : found = slot->in_use;
607 190 : if (slot->in_use)
608 190 : namestrcpy(name, NameStr(slot->data.name));
609 190 : LWLockRelease(ReplicationSlotControlLock);
610 :
611 190 : return found;
612 : }
613 :
614 : /*
615 : * Find a previously created slot and mark it as used by this process.
616 : *
617 : * An error is raised if nowait is true and the slot is currently in use. If
618 : * nowait is false, we sleep until the slot is released by the owning process.
619 : *
620 : * An error is raised if error_if_invalid is true and the slot is found to
621 : * be invalid. It should always be set to true, except when we are temporarily
622 : * acquiring the slot and don't intend to change it.
623 : */
624 : void
625 2544 : ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
626 : {
627 : ReplicationSlot *s;
628 : int active_pid;
629 :
630 : Assert(name != NULL);
631 :
632 2544 : retry:
633 : Assert(MyReplicationSlot == NULL);
634 :
635 2544 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
636 :
637 : /* Check if the slot exits with the given name. */
638 2544 : s = SearchNamedReplicationSlot(name, false);
639 2544 : if (s == NULL || !s->in_use)
640 : {
641 20 : LWLockRelease(ReplicationSlotControlLock);
642 :
643 20 : ereport(ERROR,
644 : (errcode(ERRCODE_UNDEFINED_OBJECT),
645 : errmsg("replication slot \"%s\" does not exist",
646 : name)));
647 : }
648 :
649 : /*
650 : * Do not allow users to acquire the reserved slot. This scenario may
651 : * occur if the launcher that owns the slot has terminated unexpectedly
652 : * due to an error, and a backend process attempts to reuse the slot.
653 : */
654 2524 : if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
655 0 : ereport(ERROR,
656 : errcode(ERRCODE_UNDEFINED_OBJECT),
657 : errmsg("cannot acquire replication slot \"%s\"", name),
658 : errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
659 :
660 : /*
661 : * This is the slot we want; check if it's active under some other
662 : * process. In single user mode, we don't need this check.
663 : */
664 2524 : if (IsUnderPostmaster)
665 : {
666 : /*
667 : * Get ready to sleep on the slot in case it is active. (We may end
668 : * up not sleeping, but we don't want to do this while holding the
669 : * spinlock.)
670 : */
671 2514 : if (!nowait)
672 538 : ConditionVariablePrepareToSleep(&s->active_cv);
673 :
674 : /*
675 : * It is important to reset the inactive_since under spinlock here to
676 : * avoid race conditions with slot invalidation. See comments related
677 : * to inactive_since in InvalidatePossiblyObsoleteSlot.
678 : */
679 2514 : SpinLockAcquire(&s->mutex);
680 2514 : if (s->active_pid == 0)
681 2234 : s->active_pid = MyProcPid;
682 2514 : active_pid = s->active_pid;
683 2514 : ReplicationSlotSetInactiveSince(s, 0, false);
684 2514 : SpinLockRelease(&s->mutex);
685 : }
686 : else
687 : {
688 10 : s->active_pid = active_pid = MyProcPid;
689 10 : ReplicationSlotSetInactiveSince(s, 0, true);
690 : }
691 2524 : LWLockRelease(ReplicationSlotControlLock);
692 :
693 : /*
694 : * If we found the slot but it's already active in another process, we
695 : * wait until the owning process signals us that it's been released, or
696 : * error out.
697 : */
698 2524 : if (active_pid != MyProcPid)
699 : {
700 0 : if (!nowait)
701 : {
702 : /* Wait here until we get signaled, and then restart */
703 0 : ConditionVariableSleep(&s->active_cv,
704 : WAIT_EVENT_REPLICATION_SLOT_DROP);
705 0 : ConditionVariableCancelSleep();
706 0 : goto retry;
707 : }
708 :
709 0 : ereport(ERROR,
710 : (errcode(ERRCODE_OBJECT_IN_USE),
711 : errmsg("replication slot \"%s\" is active for PID %d",
712 : NameStr(s->data.name), active_pid)));
713 : }
714 2524 : else if (!nowait)
715 538 : ConditionVariableCancelSleep(); /* no sleep needed after all */
716 :
717 : /* We made this slot active, so it's ours now. */
718 2524 : MyReplicationSlot = s;
719 :
720 : /*
721 : * We need to check for invalidation after making the slot ours to avoid
722 : * the possible race condition with the checkpointer that can otherwise
723 : * invalidate the slot immediately after the check.
724 : */
725 2524 : if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
726 14 : ereport(ERROR,
727 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
728 : errmsg("can no longer access replication slot \"%s\"",
729 : NameStr(s->data.name)),
730 : errdetail("This replication slot has been invalidated due to \"%s\".",
731 : GetSlotInvalidationCauseName(s->data.invalidated)));
732 :
733 : /* Let everybody know we've modified this slot */
734 2510 : ConditionVariableBroadcast(&s->active_cv);
735 :
736 : /*
737 : * The call to pgstat_acquire_replslot() protects against stats for a
738 : * different slot, from before a restart or such, being present during
739 : * pgstat_report_replslot().
740 : */
741 2510 : if (SlotIsLogical(s))
742 2096 : pgstat_acquire_replslot(s);
743 :
744 :
745 2510 : if (am_walsender)
746 : {
747 1706 : ereport(log_replication_commands ? LOG : DEBUG1,
748 : SlotIsLogical(s)
749 : ? errmsg("acquired logical replication slot \"%s\"",
750 : NameStr(s->data.name))
751 : : errmsg("acquired physical replication slot \"%s\"",
752 : NameStr(s->data.name)));
753 : }
754 2510 : }
755 :
756 : /*
757 : * Release the replication slot that this backend considers to own.
758 : *
759 : * This or another backend can re-acquire the slot later.
760 : * Resources this slot requires will be preserved.
761 : */
762 : void
763 3052 : ReplicationSlotRelease(void)
764 : {
765 3052 : ReplicationSlot *slot = MyReplicationSlot;
766 3052 : char *slotname = NULL; /* keep compiler quiet */
767 3052 : bool is_logical = false; /* keep compiler quiet */
768 3052 : TimestampTz now = 0;
769 :
770 : Assert(slot != NULL && slot->active_pid != 0);
771 :
772 3052 : if (am_walsender)
773 : {
774 2128 : slotname = pstrdup(NameStr(slot->data.name));
775 2128 : is_logical = SlotIsLogical(slot);
776 : }
777 :
778 3052 : if (slot->data.persistency == RS_EPHEMERAL)
779 : {
780 : /*
781 : * Delete the slot. There is no !PANIC case where this is allowed to
782 : * fail, all that may happen is an incomplete cleanup of the on-disk
783 : * data.
784 : */
785 10 : ReplicationSlotDropAcquired();
786 : }
787 :
788 : /*
789 : * If slot needed to temporarily restrain both data and catalog xmin to
790 : * create the catalog snapshot, remove that temporary constraint.
791 : * Snapshots can only be exported while the initial snapshot is still
792 : * acquired.
793 : */
794 3052 : if (!TransactionIdIsValid(slot->data.xmin) &&
795 2992 : TransactionIdIsValid(slot->effective_xmin))
796 : {
797 392 : SpinLockAcquire(&slot->mutex);
798 392 : slot->effective_xmin = InvalidTransactionId;
799 392 : SpinLockRelease(&slot->mutex);
800 392 : ReplicationSlotsComputeRequiredXmin(false);
801 : }
802 :
803 : /*
804 : * Set the time since the slot has become inactive. We get the current
805 : * time beforehand to avoid system call while holding the spinlock.
806 : */
807 3052 : now = GetCurrentTimestamp();
808 :
809 3052 : if (slot->data.persistency == RS_PERSISTENT)
810 : {
811 : /*
812 : * Mark persistent slot inactive. We're not freeing it, just
813 : * disconnecting, but wake up others that may be waiting for it.
814 : */
815 2476 : SpinLockAcquire(&slot->mutex);
816 2476 : slot->active_pid = 0;
817 2476 : ReplicationSlotSetInactiveSince(slot, now, false);
818 2476 : SpinLockRelease(&slot->mutex);
819 2476 : ConditionVariableBroadcast(&slot->active_cv);
820 : }
821 : else
822 576 : ReplicationSlotSetInactiveSince(slot, now, true);
823 :
824 3052 : MyReplicationSlot = NULL;
825 :
826 : /* might not have been set when we've been a plain slot */
827 3052 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
828 3052 : MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
829 3052 : ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
830 3052 : LWLockRelease(ProcArrayLock);
831 :
832 3052 : if (am_walsender)
833 : {
834 2128 : ereport(log_replication_commands ? LOG : DEBUG1,
835 : is_logical
836 : ? errmsg("released logical replication slot \"%s\"",
837 : slotname)
838 : : errmsg("released physical replication slot \"%s\"",
839 : slotname));
840 :
841 2128 : pfree(slotname);
842 : }
843 3052 : }
844 :
845 : /*
846 : * Cleanup temporary slots created in current session.
847 : *
848 : * Cleanup only synced temporary slots if 'synced_only' is true, else
849 : * cleanup all temporary slots.
850 : */
851 : void
852 88872 : ReplicationSlotCleanup(bool synced_only)
853 : {
854 : int i;
855 :
856 : Assert(MyReplicationSlot == NULL);
857 :
858 89158 : restart:
859 89158 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
860 966228 : for (i = 0; i < max_replication_slots; i++)
861 : {
862 877356 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
863 :
864 877356 : if (!s->in_use)
865 849074 : continue;
866 :
867 28282 : SpinLockAcquire(&s->mutex);
868 28282 : if ((s->active_pid == MyProcPid &&
869 286 : (!synced_only || s->data.synced)))
870 : {
871 : Assert(s->data.persistency == RS_TEMPORARY);
872 286 : SpinLockRelease(&s->mutex);
873 286 : LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
874 :
875 286 : ReplicationSlotDropPtr(s);
876 :
877 286 : ConditionVariableBroadcast(&s->active_cv);
878 286 : goto restart;
879 : }
880 : else
881 27996 : SpinLockRelease(&s->mutex);
882 : }
883 :
884 88872 : LWLockRelease(ReplicationSlotControlLock);
885 88872 : }
886 :
887 : /*
888 : * Permanently drop replication slot identified by the passed in name.
889 : */
890 : void
891 810 : ReplicationSlotDrop(const char *name, bool nowait)
892 : {
893 : Assert(MyReplicationSlot == NULL);
894 :
895 810 : ReplicationSlotAcquire(name, nowait, false);
896 :
897 : /*
898 : * Do not allow users to drop the slots which are currently being synced
899 : * from the primary to the standby.
900 : */
901 794 : if (RecoveryInProgress() && MyReplicationSlot->data.synced)
902 2 : ereport(ERROR,
903 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
904 : errmsg("cannot drop replication slot \"%s\"", name),
905 : errdetail("This replication slot is being synchronized from the primary server."));
906 :
907 792 : ReplicationSlotDropAcquired();
908 792 : }
909 :
910 : /*
911 : * Change the definition of the slot identified by the specified name.
912 : */
913 : void
914 14 : ReplicationSlotAlter(const char *name, const bool *failover,
915 : const bool *two_phase)
916 : {
917 14 : bool update_slot = false;
918 :
919 : Assert(MyReplicationSlot == NULL);
920 : Assert(failover || two_phase);
921 :
922 14 : ReplicationSlotAcquire(name, false, true);
923 :
924 12 : if (SlotIsPhysical(MyReplicationSlot))
925 0 : ereport(ERROR,
926 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
927 : errmsg("cannot use %s with a physical replication slot",
928 : "ALTER_REPLICATION_SLOT"));
929 :
930 12 : if (RecoveryInProgress())
931 : {
932 : /*
933 : * Do not allow users to alter the slots which are currently being
934 : * synced from the primary to the standby.
935 : */
936 2 : if (MyReplicationSlot->data.synced)
937 2 : ereport(ERROR,
938 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
939 : errmsg("cannot alter replication slot \"%s\"", name),
940 : errdetail("This replication slot is being synchronized from the primary server."));
941 :
942 : /*
943 : * Do not allow users to enable failover on the standby as we do not
944 : * support sync to the cascading standby.
945 : */
946 0 : if (failover && *failover)
947 0 : ereport(ERROR,
948 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
949 : errmsg("cannot enable failover for a replication slot"
950 : " on the standby"));
951 : }
952 :
953 10 : if (failover)
954 : {
955 : /*
956 : * Do not allow users to enable failover for temporary slots as we do
957 : * not support syncing temporary slots to the standby.
958 : */
959 8 : if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
960 0 : ereport(ERROR,
961 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
962 : errmsg("cannot enable failover for a temporary replication slot"));
963 :
964 8 : if (MyReplicationSlot->data.failover != *failover)
965 : {
966 8 : SpinLockAcquire(&MyReplicationSlot->mutex);
967 8 : MyReplicationSlot->data.failover = *failover;
968 8 : SpinLockRelease(&MyReplicationSlot->mutex);
969 :
970 8 : update_slot = true;
971 : }
972 : }
973 :
974 10 : if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
975 : {
976 2 : SpinLockAcquire(&MyReplicationSlot->mutex);
977 2 : MyReplicationSlot->data.two_phase = *two_phase;
978 2 : SpinLockRelease(&MyReplicationSlot->mutex);
979 :
980 2 : update_slot = true;
981 : }
982 :
983 10 : if (update_slot)
984 : {
985 10 : ReplicationSlotMarkDirty();
986 10 : ReplicationSlotSave();
987 : }
988 :
989 10 : ReplicationSlotRelease();
990 10 : }
991 :
992 : /*
993 : * Permanently drop the currently acquired replication slot.
994 : */
995 : void
996 818 : ReplicationSlotDropAcquired(void)
997 : {
998 818 : ReplicationSlot *slot = MyReplicationSlot;
999 :
1000 : Assert(MyReplicationSlot != NULL);
1001 :
1002 : /* slot isn't acquired anymore */
1003 818 : MyReplicationSlot = NULL;
1004 :
1005 818 : ReplicationSlotDropPtr(slot);
1006 818 : }
1007 :
1008 : /*
1009 : * Permanently drop the replication slot which will be released by the point
1010 : * this function returns.
1011 : */
1012 : static void
1013 1104 : ReplicationSlotDropPtr(ReplicationSlot *slot)
1014 : {
1015 : char path[MAXPGPATH];
1016 : char tmppath[MAXPGPATH];
1017 :
1018 : /*
1019 : * If some other backend ran this code concurrently with us, we might try
1020 : * to delete a slot with a certain name while someone else was trying to
1021 : * create a slot with the same name.
1022 : */
1023 1104 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1024 :
1025 : /* Generate pathnames. */
1026 1104 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1027 1104 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1028 :
1029 : /*
1030 : * Rename the slot directory on disk, so that we'll no longer recognize
1031 : * this as a valid slot. Note that if this fails, we've got to mark the
1032 : * slot inactive before bailing out. If we're dropping an ephemeral or a
1033 : * temporary slot, we better never fail hard as the caller won't expect
1034 : * the slot to survive and this might get called during error handling.
1035 : */
1036 1104 : if (rename(path, tmppath) == 0)
1037 : {
1038 : /*
1039 : * We need to fsync() the directory we just renamed and its parent to
1040 : * make sure that our changes are on disk in a crash-safe fashion. If
1041 : * fsync() fails, we can't be sure whether the changes are on disk or
1042 : * not. For now, we handle that by panicking;
1043 : * StartupReplicationSlots() will try to straighten it out after
1044 : * restart.
1045 : */
1046 1104 : START_CRIT_SECTION();
1047 1104 : fsync_fname(tmppath, true);
1048 1104 : fsync_fname(PG_REPLSLOT_DIR, true);
1049 1104 : END_CRIT_SECTION();
1050 : }
1051 : else
1052 : {
1053 0 : bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1054 :
1055 0 : SpinLockAcquire(&slot->mutex);
1056 0 : slot->active_pid = 0;
1057 0 : SpinLockRelease(&slot->mutex);
1058 :
1059 : /* wake up anyone waiting on this slot */
1060 0 : ConditionVariableBroadcast(&slot->active_cv);
1061 :
1062 0 : ereport(fail_softly ? WARNING : ERROR,
1063 : (errcode_for_file_access(),
1064 : errmsg("could not rename file \"%s\" to \"%s\": %m",
1065 : path, tmppath)));
1066 : }
1067 :
1068 : /*
1069 : * The slot is definitely gone. Lock out concurrent scans of the array
1070 : * long enough to kill it. It's OK to clear the active PID here without
1071 : * grabbing the mutex because nobody else can be scanning the array here,
1072 : * and nobody can be attached to this slot and thus access it without
1073 : * scanning the array.
1074 : *
1075 : * Also wake up processes waiting for it.
1076 : */
1077 1104 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
1078 1104 : slot->active_pid = 0;
1079 1104 : slot->in_use = false;
1080 1104 : LWLockRelease(ReplicationSlotControlLock);
1081 1104 : ConditionVariableBroadcast(&slot->active_cv);
1082 :
1083 : /*
1084 : * Slot is dead and doesn't prevent resource removal anymore, recompute
1085 : * limits.
1086 : */
1087 1104 : ReplicationSlotsComputeRequiredXmin(false);
1088 1104 : ReplicationSlotsComputeRequiredLSN();
1089 :
1090 : /*
1091 : * If removing the directory fails, the worst thing that will happen is
1092 : * that the user won't be able to create a new slot with the same name
1093 : * until the next server restart. We warn about it, but that's all.
1094 : */
1095 1104 : if (!rmtree(tmppath, true))
1096 0 : ereport(WARNING,
1097 : (errmsg("could not remove directory \"%s\"", tmppath)));
1098 :
1099 : /*
1100 : * Drop the statistics entry for the replication slot. Do this while
1101 : * holding ReplicationSlotAllocationLock so that we don't drop a
1102 : * statistics entry for another slot with the same name just created in
1103 : * another session.
1104 : */
1105 1104 : if (SlotIsLogical(slot))
1106 796 : pgstat_drop_replslot(slot);
1107 :
1108 : /*
1109 : * We release this at the very end, so that nobody starts trying to create
1110 : * a slot while we're still cleaning up the detritus of the old one.
1111 : */
1112 1104 : LWLockRelease(ReplicationSlotAllocationLock);
1113 1104 : }
1114 :
1115 : /*
1116 : * Serialize the currently acquired slot's state from memory to disk, thereby
1117 : * guaranteeing the current state will survive a crash.
1118 : */
1119 : void
1120 2684 : ReplicationSlotSave(void)
1121 : {
1122 : char path[MAXPGPATH];
1123 :
1124 : Assert(MyReplicationSlot != NULL);
1125 :
1126 2684 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
1127 2684 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
1128 2684 : }
1129 :
1130 : /*
1131 : * Signal that it would be useful if the currently acquired slot would be
1132 : * flushed out to disk.
1133 : *
1134 : * Note that the actual flush to disk can be delayed for a long time, if
1135 : * required for correctness explicitly do a ReplicationSlotSave().
1136 : */
1137 : void
1138 45380 : ReplicationSlotMarkDirty(void)
1139 : {
1140 45380 : ReplicationSlot *slot = MyReplicationSlot;
1141 :
1142 : Assert(MyReplicationSlot != NULL);
1143 :
1144 45380 : SpinLockAcquire(&slot->mutex);
1145 45380 : MyReplicationSlot->just_dirtied = true;
1146 45380 : MyReplicationSlot->dirty = true;
1147 45380 : SpinLockRelease(&slot->mutex);
1148 45380 : }
1149 :
1150 : /*
1151 : * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1152 : * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1153 : */
1154 : void
1155 904 : ReplicationSlotPersist(void)
1156 : {
1157 904 : ReplicationSlot *slot = MyReplicationSlot;
1158 :
1159 : Assert(slot != NULL);
1160 : Assert(slot->data.persistency != RS_PERSISTENT);
1161 :
1162 904 : SpinLockAcquire(&slot->mutex);
1163 904 : slot->data.persistency = RS_PERSISTENT;
1164 904 : SpinLockRelease(&slot->mutex);
1165 :
1166 904 : ReplicationSlotMarkDirty();
1167 904 : ReplicationSlotSave();
1168 904 : }
1169 :
1170 : /*
1171 : * Compute the oldest xmin across all slots and store it in the ProcArray.
1172 : *
1173 : * If already_locked is true, ProcArrayLock has already been acquired
1174 : * exclusively.
1175 : */
1176 : void
1177 4684 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
1178 : {
1179 : int i;
1180 4684 : TransactionId agg_xmin = InvalidTransactionId;
1181 4684 : TransactionId agg_catalog_xmin = InvalidTransactionId;
1182 :
1183 : Assert(ReplicationSlotCtl != NULL);
1184 :
1185 4684 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1186 :
1187 47416 : for (i = 0; i < max_replication_slots; i++)
1188 : {
1189 42732 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1190 : TransactionId effective_xmin;
1191 : TransactionId effective_catalog_xmin;
1192 : bool invalidated;
1193 :
1194 42732 : if (!s->in_use)
1195 38298 : continue;
1196 :
1197 4434 : SpinLockAcquire(&s->mutex);
1198 4434 : effective_xmin = s->effective_xmin;
1199 4434 : effective_catalog_xmin = s->effective_catalog_xmin;
1200 4434 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1201 4434 : SpinLockRelease(&s->mutex);
1202 :
1203 : /* invalidated slots need not apply */
1204 4434 : if (invalidated)
1205 44 : continue;
1206 :
1207 : /* check the data xmin */
1208 4390 : if (TransactionIdIsValid(effective_xmin) &&
1209 38 : (!TransactionIdIsValid(agg_xmin) ||
1210 38 : TransactionIdPrecedes(effective_xmin, agg_xmin)))
1211 682 : agg_xmin = effective_xmin;
1212 :
1213 : /* check the catalog xmin */
1214 4390 : if (TransactionIdIsValid(effective_catalog_xmin) &&
1215 1838 : (!TransactionIdIsValid(agg_catalog_xmin) ||
1216 1838 : TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1217 2300 : agg_catalog_xmin = effective_catalog_xmin;
1218 : }
1219 :
1220 4684 : LWLockRelease(ReplicationSlotControlLock);
1221 :
1222 4684 : ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1223 4684 : }
1224 :
1225 : /*
1226 : * Compute the oldest restart LSN across all slots and inform xlog module.
1227 : *
1228 : * Note: while max_slot_wal_keep_size is theoretically relevant for this
1229 : * purpose, we don't try to account for that, because this module doesn't
1230 : * know what to compare against.
1231 : */
1232 : void
1233 46722 : ReplicationSlotsComputeRequiredLSN(void)
1234 : {
1235 : int i;
1236 46722 : XLogRecPtr min_required = InvalidXLogRecPtr;
1237 :
1238 : Assert(ReplicationSlotCtl != NULL);
1239 :
1240 46722 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1241 506484 : for (i = 0; i < max_replication_slots; i++)
1242 : {
1243 459762 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1244 : XLogRecPtr restart_lsn;
1245 : XLogRecPtr last_saved_restart_lsn;
1246 : bool invalidated;
1247 : ReplicationSlotPersistency persistency;
1248 :
1249 459762 : if (!s->in_use)
1250 413088 : continue;
1251 :
1252 46674 : SpinLockAcquire(&s->mutex);
1253 46674 : persistency = s->data.persistency;
1254 46674 : restart_lsn = s->data.restart_lsn;
1255 46674 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1256 46674 : last_saved_restart_lsn = s->last_saved_restart_lsn;
1257 46674 : SpinLockRelease(&s->mutex);
1258 :
1259 : /* invalidated slots need not apply */
1260 46674 : if (invalidated)
1261 46 : continue;
1262 :
1263 : /*
1264 : * For persistent slot use last_saved_restart_lsn to compute the
1265 : * oldest LSN for removal of WAL segments. The segments between
1266 : * last_saved_restart_lsn and restart_lsn might be needed by a
1267 : * persistent slot in the case of database crash. Non-persistent
1268 : * slots can't survive the database crash, so we don't care about
1269 : * last_saved_restart_lsn for them.
1270 : */
1271 46628 : if (persistency == RS_PERSISTENT)
1272 : {
1273 44904 : if (last_saved_restart_lsn != InvalidXLogRecPtr &&
1274 : restart_lsn > last_saved_restart_lsn)
1275 : {
1276 41092 : restart_lsn = last_saved_restart_lsn;
1277 : }
1278 : }
1279 :
1280 46628 : if (restart_lsn != InvalidXLogRecPtr &&
1281 2052 : (min_required == InvalidXLogRecPtr ||
1282 : restart_lsn < min_required))
1283 44698 : min_required = restart_lsn;
1284 : }
1285 46722 : LWLockRelease(ReplicationSlotControlLock);
1286 :
1287 46722 : XLogSetReplicationSlotMinimumLSN(min_required);
1288 46722 : }
1289 :
1290 : /*
1291 : * Compute the oldest WAL LSN required by *logical* decoding slots..
1292 : *
1293 : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1294 : * slots exist.
1295 : *
1296 : * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1297 : * ignores physical replication slots.
1298 : *
1299 : * The results aren't required frequently, so we don't maintain a precomputed
1300 : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1301 : */
1302 : XLogRecPtr
1303 6912 : ReplicationSlotsComputeLogicalRestartLSN(void)
1304 : {
1305 6912 : XLogRecPtr result = InvalidXLogRecPtr;
1306 : int i;
1307 :
1308 6912 : if (max_replication_slots <= 0)
1309 4 : return InvalidXLogRecPtr;
1310 :
1311 6908 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1312 :
1313 74896 : for (i = 0; i < max_replication_slots; i++)
1314 : {
1315 : ReplicationSlot *s;
1316 : XLogRecPtr restart_lsn;
1317 : XLogRecPtr last_saved_restart_lsn;
1318 : bool invalidated;
1319 : ReplicationSlotPersistency persistency;
1320 :
1321 67988 : s = &ReplicationSlotCtl->replication_slots[i];
1322 :
1323 : /* cannot change while ReplicationSlotCtlLock is held */
1324 67988 : if (!s->in_use)
1325 66496 : continue;
1326 :
1327 : /* we're only interested in logical slots */
1328 1492 : if (!SlotIsLogical(s))
1329 1064 : continue;
1330 :
1331 : /* read once, it's ok if it increases while we're checking */
1332 428 : SpinLockAcquire(&s->mutex);
1333 428 : persistency = s->data.persistency;
1334 428 : restart_lsn = s->data.restart_lsn;
1335 428 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1336 428 : last_saved_restart_lsn = s->last_saved_restart_lsn;
1337 428 : SpinLockRelease(&s->mutex);
1338 :
1339 : /* invalidated slots need not apply */
1340 428 : if (invalidated)
1341 8 : continue;
1342 :
1343 : /*
1344 : * For persistent slot use last_saved_restart_lsn to compute the
1345 : * oldest LSN for removal of WAL segments. The segments between
1346 : * last_saved_restart_lsn and restart_lsn might be needed by a
1347 : * persistent slot in the case of database crash. Non-persistent
1348 : * slots can't survive the database crash, so we don't care about
1349 : * last_saved_restart_lsn for them.
1350 : */
1351 420 : if (persistency == RS_PERSISTENT)
1352 : {
1353 416 : if (last_saved_restart_lsn != InvalidXLogRecPtr &&
1354 : restart_lsn > last_saved_restart_lsn)
1355 : {
1356 0 : restart_lsn = last_saved_restart_lsn;
1357 : }
1358 : }
1359 :
1360 420 : if (restart_lsn == InvalidXLogRecPtr)
1361 0 : continue;
1362 :
1363 420 : if (result == InvalidXLogRecPtr ||
1364 : restart_lsn < result)
1365 328 : result = restart_lsn;
1366 : }
1367 :
1368 6908 : LWLockRelease(ReplicationSlotControlLock);
1369 :
1370 6908 : return result;
1371 : }
1372 :
1373 : /*
1374 : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1375 : * passed database oid.
1376 : *
1377 : * Returns true if there are any slots referencing the database. *nslots will
1378 : * be set to the absolute number of slots in the database, *nactive to ones
1379 : * currently active.
1380 : */
1381 : bool
1382 92 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1383 : {
1384 : int i;
1385 :
1386 92 : *nslots = *nactive = 0;
1387 :
1388 92 : if (max_replication_slots <= 0)
1389 0 : return false;
1390 :
1391 92 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1392 954 : for (i = 0; i < max_replication_slots; i++)
1393 : {
1394 : ReplicationSlot *s;
1395 :
1396 862 : s = &ReplicationSlotCtl->replication_slots[i];
1397 :
1398 : /* cannot change while ReplicationSlotCtlLock is held */
1399 862 : if (!s->in_use)
1400 820 : continue;
1401 :
1402 : /* only logical slots are database specific, skip */
1403 42 : if (!SlotIsLogical(s))
1404 20 : continue;
1405 :
1406 : /* not our database, skip */
1407 22 : if (s->data.database != dboid)
1408 16 : continue;
1409 :
1410 : /* NB: intentionally counting invalidated slots */
1411 :
1412 : /* count slots with spinlock held */
1413 6 : SpinLockAcquire(&s->mutex);
1414 6 : (*nslots)++;
1415 6 : if (s->active_pid != 0)
1416 2 : (*nactive)++;
1417 6 : SpinLockRelease(&s->mutex);
1418 : }
1419 92 : LWLockRelease(ReplicationSlotControlLock);
1420 :
1421 92 : if (*nslots > 0)
1422 6 : return true;
1423 86 : return false;
1424 : }
1425 :
1426 : /*
1427 : * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1428 : * passed database oid. The caller should hold an exclusive lock on the
1429 : * pg_database oid for the database to prevent creation of new slots on the db
1430 : * or replay from existing slots.
1431 : *
1432 : * Another session that concurrently acquires an existing slot on the target DB
1433 : * (most likely to drop it) may cause this function to ERROR. If that happens
1434 : * it may have dropped some but not all slots.
1435 : *
1436 : * This routine isn't as efficient as it could be - but we don't drop
1437 : * databases often, especially databases with lots of slots.
1438 : */
1439 : void
1440 118 : ReplicationSlotsDropDBSlots(Oid dboid)
1441 : {
1442 : int i;
1443 :
1444 118 : if (max_replication_slots <= 0)
1445 0 : return;
1446 :
1447 118 : restart:
1448 128 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1449 1234 : for (i = 0; i < max_replication_slots; i++)
1450 : {
1451 : ReplicationSlot *s;
1452 : char *slotname;
1453 : int active_pid;
1454 :
1455 1116 : s = &ReplicationSlotCtl->replication_slots[i];
1456 :
1457 : /* cannot change while ReplicationSlotCtlLock is held */
1458 1116 : if (!s->in_use)
1459 1056 : continue;
1460 :
1461 : /* only logical slots are database specific, skip */
1462 60 : if (!SlotIsLogical(s))
1463 22 : continue;
1464 :
1465 : /* not our database, skip */
1466 38 : if (s->data.database != dboid)
1467 28 : continue;
1468 :
1469 : /* NB: intentionally including invalidated slots */
1470 :
1471 : /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1472 10 : SpinLockAcquire(&s->mutex);
1473 : /* can't change while ReplicationSlotControlLock is held */
1474 10 : slotname = NameStr(s->data.name);
1475 10 : active_pid = s->active_pid;
1476 10 : if (active_pid == 0)
1477 : {
1478 10 : MyReplicationSlot = s;
1479 10 : s->active_pid = MyProcPid;
1480 : }
1481 10 : SpinLockRelease(&s->mutex);
1482 :
1483 : /*
1484 : * Even though we hold an exclusive lock on the database object a
1485 : * logical slot for that DB can still be active, e.g. if it's
1486 : * concurrently being dropped by a backend connected to another DB.
1487 : *
1488 : * That's fairly unlikely in practice, so we'll just bail out.
1489 : *
1490 : * The slot sync worker holds a shared lock on the database before
1491 : * operating on synced logical slots to avoid conflict with the drop
1492 : * happening here. The persistent synced slots are thus safe but there
1493 : * is a possibility that the slot sync worker has created a temporary
1494 : * slot (which stays active even on release) and we are trying to drop
1495 : * that here. In practice, the chances of hitting this scenario are
1496 : * less as during slot synchronization, the temporary slot is
1497 : * immediately converted to persistent and thus is safe due to the
1498 : * shared lock taken on the database. So, we'll just bail out in such
1499 : * a case.
1500 : *
1501 : * XXX: We can consider shutting down the slot sync worker before
1502 : * trying to drop synced temporary slots here.
1503 : */
1504 10 : if (active_pid)
1505 0 : ereport(ERROR,
1506 : (errcode(ERRCODE_OBJECT_IN_USE),
1507 : errmsg("replication slot \"%s\" is active for PID %d",
1508 : slotname, active_pid)));
1509 :
1510 : /*
1511 : * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1512 : * holding ReplicationSlotControlLock over filesystem operations,
1513 : * release ReplicationSlotControlLock and use
1514 : * ReplicationSlotDropAcquired.
1515 : *
1516 : * As that means the set of slots could change, restart scan from the
1517 : * beginning each time we release the lock.
1518 : */
1519 10 : LWLockRelease(ReplicationSlotControlLock);
1520 10 : ReplicationSlotDropAcquired();
1521 10 : goto restart;
1522 : }
1523 118 : LWLockRelease(ReplicationSlotControlLock);
1524 : }
1525 :
1526 :
1527 : /*
1528 : * Check whether the server's configuration supports using replication
1529 : * slots.
1530 : */
1531 : void
1532 3460 : CheckSlotRequirements(void)
1533 : {
1534 : /*
1535 : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1536 : * needs the same check.
1537 : */
1538 :
1539 3460 : if (max_replication_slots == 0)
1540 0 : ereport(ERROR,
1541 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1542 : errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1543 :
1544 3460 : if (wal_level < WAL_LEVEL_REPLICA)
1545 0 : ereport(ERROR,
1546 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1547 : errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1548 3460 : }
1549 :
1550 : /*
1551 : * Check whether the user has privilege to use replication slots.
1552 : */
1553 : void
1554 1120 : CheckSlotPermissions(void)
1555 : {
1556 1120 : if (!has_rolreplication(GetUserId()))
1557 10 : ereport(ERROR,
1558 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1559 : errmsg("permission denied to use replication slots"),
1560 : errdetail("Only roles with the %s attribute may use replication slots.",
1561 : "REPLICATION")));
1562 1110 : }
1563 :
1564 : /*
1565 : * Reserve WAL for the currently active slot.
1566 : *
1567 : * Compute and set restart_lsn in a manner that's appropriate for the type of
1568 : * the slot and concurrency safe.
1569 : */
1570 : void
1571 1208 : ReplicationSlotReserveWal(void)
1572 : {
1573 1208 : ReplicationSlot *slot = MyReplicationSlot;
1574 :
1575 : Assert(slot != NULL);
1576 : Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
1577 : Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr);
1578 :
1579 : /*
1580 : * The replication slot mechanism is used to prevent removal of required
1581 : * WAL. As there is no interlock between this routine and checkpoints, WAL
1582 : * segments could concurrently be removed when a now stale return value of
1583 : * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1584 : * this happens we'll just retry.
1585 : */
1586 : while (true)
1587 0 : {
1588 : XLogSegNo segno;
1589 : XLogRecPtr restart_lsn;
1590 :
1591 : /*
1592 : * For logical slots log a standby snapshot and start logical decoding
1593 : * at exactly that position. That allows the slot to start up more
1594 : * quickly. But on a standby we cannot do WAL writes, so just use the
1595 : * replay pointer; effectively, an attempt to create a logical slot on
1596 : * standby will cause it to wait for an xl_running_xact record to be
1597 : * logged independently on the primary, so that a snapshot can be
1598 : * built using the record.
1599 : *
1600 : * None of this is needed (or indeed helpful) for physical slots as
1601 : * they'll start replay at the last logged checkpoint anyway. Instead
1602 : * return the location of the last redo LSN. While that slightly
1603 : * increases the chance that we have to retry, it's where a base
1604 : * backup has to start replay at.
1605 : */
1606 1208 : if (SlotIsPhysical(slot))
1607 306 : restart_lsn = GetRedoRecPtr();
1608 902 : else if (RecoveryInProgress())
1609 46 : restart_lsn = GetXLogReplayRecPtr(NULL);
1610 : else
1611 856 : restart_lsn = GetXLogInsertRecPtr();
1612 :
1613 1208 : SpinLockAcquire(&slot->mutex);
1614 1208 : slot->data.restart_lsn = restart_lsn;
1615 1208 : SpinLockRelease(&slot->mutex);
1616 :
1617 : /* prevent WAL removal as fast as possible */
1618 1208 : ReplicationSlotsComputeRequiredLSN();
1619 :
1620 : /*
1621 : * If all required WAL is still there, great, otherwise retry. The
1622 : * slot should prevent further removal of WAL, unless there's a
1623 : * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1624 : * the new restart_lsn above, so normally we should never need to loop
1625 : * more than twice.
1626 : */
1627 1208 : XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
1628 1208 : if (XLogGetLastRemovedSegno() < segno)
1629 1208 : break;
1630 : }
1631 :
1632 1208 : if (!RecoveryInProgress() && SlotIsLogical(slot))
1633 : {
1634 : XLogRecPtr flushptr;
1635 :
1636 : /* make sure we have enough information to start */
1637 856 : flushptr = LogStandbySnapshot();
1638 :
1639 : /* and make sure it's fsynced to disk */
1640 856 : XLogFlush(flushptr);
1641 : }
1642 1208 : }
1643 :
1644 : /*
1645 : * Report that replication slot needs to be invalidated
1646 : */
1647 : static void
1648 42 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1649 : bool terminating,
1650 : int pid,
1651 : NameData slotname,
1652 : XLogRecPtr restart_lsn,
1653 : XLogRecPtr oldestLSN,
1654 : TransactionId snapshotConflictHorizon,
1655 : long slot_idle_seconds)
1656 : {
1657 : StringInfoData err_detail;
1658 : StringInfoData err_hint;
1659 :
1660 42 : initStringInfo(&err_detail);
1661 42 : initStringInfo(&err_hint);
1662 :
1663 42 : switch (cause)
1664 : {
1665 12 : case RS_INVAL_WAL_REMOVED:
1666 : {
1667 12 : uint64 ex = oldestLSN - restart_lsn;
1668 :
1669 12 : appendStringInfo(&err_detail,
1670 12 : ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1671 : "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1672 : ex),
1673 12 : LSN_FORMAT_ARGS(restart_lsn),
1674 : ex);
1675 : /* translator: %s is a GUC variable name */
1676 12 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1677 : "max_slot_wal_keep_size");
1678 12 : break;
1679 : }
1680 24 : case RS_INVAL_HORIZON:
1681 24 : appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1682 : snapshotConflictHorizon);
1683 24 : break;
1684 :
1685 6 : case RS_INVAL_WAL_LEVEL:
1686 6 : appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
1687 6 : break;
1688 :
1689 0 : case RS_INVAL_IDLE_TIMEOUT:
1690 : {
1691 : /* translator: %s is a GUC variable name */
1692 0 : appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1693 : slot_idle_seconds, "idle_replication_slot_timeout",
1694 : idle_replication_slot_timeout_secs);
1695 : /* translator: %s is a GUC variable name */
1696 0 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1697 : "idle_replication_slot_timeout");
1698 0 : break;
1699 : }
1700 : case RS_INVAL_NONE:
1701 : pg_unreachable();
1702 : }
1703 :
1704 42 : ereport(LOG,
1705 : terminating ?
1706 : errmsg("terminating process %d to release replication slot \"%s\"",
1707 : pid, NameStr(slotname)) :
1708 : errmsg("invalidating obsolete replication slot \"%s\"",
1709 : NameStr(slotname)),
1710 : errdetail_internal("%s", err_detail.data),
1711 : err_hint.len ? errhint("%s", err_hint.data) : 0);
1712 :
1713 42 : pfree(err_detail.data);
1714 42 : pfree(err_hint.data);
1715 42 : }
1716 :
1717 : /*
1718 : * Can we invalidate an idle replication slot?
1719 : *
1720 : * Idle timeout invalidation is allowed only when:
1721 : *
1722 : * 1. Idle timeout is set
1723 : * 2. Slot has reserved WAL
1724 : * 3. Slot is inactive
1725 : * 4. The slot is not being synced from the primary while the server is in
1726 : * recovery. This is because synced slots are always considered to be
1727 : * inactive because they don't perform logical decoding to produce changes.
1728 : */
1729 : static inline bool
1730 708 : CanInvalidateIdleSlot(ReplicationSlot *s)
1731 : {
1732 708 : return (idle_replication_slot_timeout_secs != 0 &&
1733 0 : !XLogRecPtrIsInvalid(s->data.restart_lsn) &&
1734 708 : s->inactive_since > 0 &&
1735 0 : !(RecoveryInProgress() && s->data.synced));
1736 : }
1737 :
1738 : /*
1739 : * DetermineSlotInvalidationCause - Determine the cause for which a slot
1740 : * becomes invalid among the given possible causes.
1741 : *
1742 : * This function sequentially checks all possible invalidation causes and
1743 : * returns the first one for which the slot is eligible for invalidation.
1744 : */
1745 : static ReplicationSlotInvalidationCause
1746 774 : DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
1747 : XLogRecPtr oldestLSN, Oid dboid,
1748 : TransactionId snapshotConflictHorizon,
1749 : TimestampTz *inactive_since, TimestampTz now)
1750 : {
1751 : Assert(possible_causes != RS_INVAL_NONE);
1752 :
1753 774 : if (possible_causes & RS_INVAL_WAL_REMOVED)
1754 : {
1755 720 : XLogRecPtr restart_lsn = s->data.restart_lsn;
1756 :
1757 720 : if (restart_lsn != InvalidXLogRecPtr &&
1758 : restart_lsn < oldestLSN)
1759 12 : return RS_INVAL_WAL_REMOVED;
1760 : }
1761 :
1762 762 : if (possible_causes & RS_INVAL_HORIZON)
1763 : {
1764 : /* invalid DB oid signals a shared relation */
1765 48 : if (SlotIsLogical(s) &&
1766 38 : (dboid == InvalidOid || dboid == s->data.database))
1767 : {
1768 48 : TransactionId effective_xmin = s->effective_xmin;
1769 48 : TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
1770 :
1771 48 : if (TransactionIdIsValid(effective_xmin) &&
1772 0 : TransactionIdPrecedesOrEquals(effective_xmin,
1773 : snapshotConflictHorizon))
1774 0 : return RS_INVAL_HORIZON;
1775 96 : else if (TransactionIdIsValid(catalog_effective_xmin) &&
1776 48 : TransactionIdPrecedesOrEquals(catalog_effective_xmin,
1777 : snapshotConflictHorizon))
1778 24 : return RS_INVAL_HORIZON;
1779 : }
1780 : }
1781 :
1782 738 : if (possible_causes & RS_INVAL_WAL_LEVEL)
1783 : {
1784 6 : if (SlotIsLogical(s))
1785 6 : return RS_INVAL_WAL_LEVEL;
1786 : }
1787 :
1788 732 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1789 : {
1790 : Assert(now > 0);
1791 :
1792 708 : if (CanInvalidateIdleSlot(s))
1793 : {
1794 : /*
1795 : * Simulate the invalidation due to idle_timeout to test the
1796 : * timeout behavior promptly, without waiting for it to trigger
1797 : * naturally.
1798 : */
1799 : #ifdef USE_INJECTION_POINTS
1800 0 : if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1801 : {
1802 0 : *inactive_since = 0; /* since the beginning of time */
1803 0 : return RS_INVAL_IDLE_TIMEOUT;
1804 : }
1805 : #endif
1806 :
1807 : /*
1808 : * Check if the slot needs to be invalidated due to
1809 : * idle_replication_slot_timeout GUC.
1810 : */
1811 0 : if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
1812 : idle_replication_slot_timeout_secs))
1813 : {
1814 0 : *inactive_since = s->inactive_since;
1815 0 : return RS_INVAL_IDLE_TIMEOUT;
1816 : }
1817 : }
1818 : }
1819 :
1820 732 : return RS_INVAL_NONE;
1821 : }
1822 :
1823 : /*
1824 : * Helper for InvalidateObsoleteReplicationSlots
1825 : *
1826 : * Acquires the given slot and mark it invalid, if necessary and possible.
1827 : *
1828 : * Returns whether ReplicationSlotControlLock was released in the interim (and
1829 : * in that case we're not holding the lock at return, otherwise we are).
1830 : *
1831 : * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
1832 : *
1833 : * This is inherently racy, because we release the LWLock
1834 : * for syscalls, so caller must restart if we return true.
1835 : */
1836 : static bool
1837 838 : InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
1838 : ReplicationSlot *s,
1839 : XLogRecPtr oldestLSN,
1840 : Oid dboid, TransactionId snapshotConflictHorizon,
1841 : bool *invalidated)
1842 : {
1843 838 : int last_signaled_pid = 0;
1844 838 : bool released_lock = false;
1845 838 : TimestampTz inactive_since = 0;
1846 :
1847 : for (;;)
1848 14 : {
1849 : XLogRecPtr restart_lsn;
1850 : NameData slotname;
1851 852 : int active_pid = 0;
1852 852 : ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
1853 852 : TimestampTz now = 0;
1854 852 : long slot_idle_secs = 0;
1855 :
1856 : Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
1857 :
1858 852 : if (!s->in_use)
1859 : {
1860 0 : if (released_lock)
1861 0 : LWLockRelease(ReplicationSlotControlLock);
1862 0 : break;
1863 : }
1864 :
1865 852 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1866 : {
1867 : /*
1868 : * Assign the current time here to avoid system call overhead
1869 : * while holding the spinlock in subsequent code.
1870 : */
1871 740 : now = GetCurrentTimestamp();
1872 : }
1873 :
1874 : /*
1875 : * Check if the slot needs to be invalidated. If it needs to be
1876 : * invalidated, and is not currently acquired, acquire it and mark it
1877 : * as having been invalidated. We do this with the spinlock held to
1878 : * avoid race conditions -- for example the restart_lsn could move
1879 : * forward, or the slot could be dropped.
1880 : */
1881 852 : SpinLockAcquire(&s->mutex);
1882 :
1883 852 : restart_lsn = s->data.restart_lsn;
1884 :
1885 : /* we do nothing if the slot is already invalid */
1886 852 : if (s->data.invalidated == RS_INVAL_NONE)
1887 774 : invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
1888 : s, oldestLSN,
1889 : dboid,
1890 : snapshotConflictHorizon,
1891 : &inactive_since,
1892 : now);
1893 :
1894 : /* if there's no invalidation, we're done */
1895 852 : if (invalidation_cause == RS_INVAL_NONE)
1896 : {
1897 810 : SpinLockRelease(&s->mutex);
1898 810 : if (released_lock)
1899 0 : LWLockRelease(ReplicationSlotControlLock);
1900 810 : break;
1901 : }
1902 :
1903 42 : slotname = s->data.name;
1904 42 : active_pid = s->active_pid;
1905 :
1906 : /*
1907 : * If the slot can be acquired, do so and mark it invalidated
1908 : * immediately. Otherwise we'll signal the owning process, below, and
1909 : * retry.
1910 : *
1911 : * Note: Unlike other slot attributes, slot's inactive_since can't be
1912 : * changed until the acquired slot is released or the owning process
1913 : * is terminated. So, the inactive slot can only be invalidated
1914 : * immediately without being terminated.
1915 : */
1916 42 : if (active_pid == 0)
1917 : {
1918 28 : MyReplicationSlot = s;
1919 28 : s->active_pid = MyProcPid;
1920 28 : s->data.invalidated = invalidation_cause;
1921 :
1922 : /*
1923 : * XXX: We should consider not overwriting restart_lsn and instead
1924 : * just rely on .invalidated.
1925 : */
1926 28 : if (invalidation_cause == RS_INVAL_WAL_REMOVED)
1927 : {
1928 8 : s->data.restart_lsn = InvalidXLogRecPtr;
1929 8 : s->last_saved_restart_lsn = InvalidXLogRecPtr;
1930 : }
1931 :
1932 : /* Let caller know */
1933 28 : *invalidated = true;
1934 : }
1935 :
1936 42 : SpinLockRelease(&s->mutex);
1937 :
1938 : /*
1939 : * Calculate the idle time duration of the slot if slot is marked
1940 : * invalidated with RS_INVAL_IDLE_TIMEOUT.
1941 : */
1942 42 : if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
1943 : {
1944 : int slot_idle_usecs;
1945 :
1946 0 : TimestampDifference(inactive_since, now, &slot_idle_secs,
1947 : &slot_idle_usecs);
1948 : }
1949 :
1950 42 : if (active_pid != 0)
1951 : {
1952 : /*
1953 : * Prepare the sleep on the slot's condition variable before
1954 : * releasing the lock, to close a possible race condition if the
1955 : * slot is released before the sleep below.
1956 : */
1957 14 : ConditionVariablePrepareToSleep(&s->active_cv);
1958 :
1959 14 : LWLockRelease(ReplicationSlotControlLock);
1960 14 : released_lock = true;
1961 :
1962 : /*
1963 : * Signal to terminate the process that owns the slot, if we
1964 : * haven't already signalled it. (Avoidance of repeated
1965 : * signalling is the only reason for there to be a loop in this
1966 : * routine; otherwise we could rely on caller's restart loop.)
1967 : *
1968 : * There is the race condition that other process may own the slot
1969 : * after its current owner process is terminated and before this
1970 : * process owns it. To handle that, we signal only if the PID of
1971 : * the owning process has changed from the previous time. (This
1972 : * logic assumes that the same PID is not reused very quickly.)
1973 : */
1974 14 : if (last_signaled_pid != active_pid)
1975 : {
1976 14 : ReportSlotInvalidation(invalidation_cause, true, active_pid,
1977 : slotname, restart_lsn,
1978 : oldestLSN, snapshotConflictHorizon,
1979 : slot_idle_secs);
1980 :
1981 14 : if (MyBackendType == B_STARTUP)
1982 10 : (void) SendProcSignal(active_pid,
1983 : PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
1984 : INVALID_PROC_NUMBER);
1985 : else
1986 4 : (void) kill(active_pid, SIGTERM);
1987 :
1988 14 : last_signaled_pid = active_pid;
1989 : }
1990 :
1991 : /* Wait until the slot is released. */
1992 14 : ConditionVariableSleep(&s->active_cv,
1993 : WAIT_EVENT_REPLICATION_SLOT_DROP);
1994 :
1995 : /*
1996 : * Re-acquire lock and start over; we expect to invalidate the
1997 : * slot next time (unless another process acquires the slot in the
1998 : * meantime).
1999 : *
2000 : * Note: It is possible for a slot to advance its restart_lsn or
2001 : * xmin values sufficiently between when we release the mutex and
2002 : * when we recheck, moving from a conflicting state to a non
2003 : * conflicting state. This is intentional and safe: if the slot
2004 : * has caught up while we're busy here, the resources we were
2005 : * concerned about (WAL segments or tuples) have not yet been
2006 : * removed, and there's no reason to invalidate the slot.
2007 : */
2008 14 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2009 14 : continue;
2010 : }
2011 : else
2012 : {
2013 : /*
2014 : * We hold the slot now and have already invalidated it; flush it
2015 : * to ensure that state persists.
2016 : *
2017 : * Don't want to hold ReplicationSlotControlLock across file
2018 : * system operations, so release it now but be sure to tell caller
2019 : * to restart from scratch.
2020 : */
2021 28 : LWLockRelease(ReplicationSlotControlLock);
2022 28 : released_lock = true;
2023 :
2024 : /* Make sure the invalidated state persists across server restart */
2025 28 : ReplicationSlotMarkDirty();
2026 28 : ReplicationSlotSave();
2027 28 : ReplicationSlotRelease();
2028 :
2029 28 : ReportSlotInvalidation(invalidation_cause, false, active_pid,
2030 : slotname, restart_lsn,
2031 : oldestLSN, snapshotConflictHorizon,
2032 : slot_idle_secs);
2033 :
2034 : /* done with this slot for now */
2035 28 : break;
2036 : }
2037 : }
2038 :
2039 : Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
2040 :
2041 838 : return released_lock;
2042 : }
2043 :
2044 : /*
2045 : * Invalidate slots that require resources about to be removed.
2046 : *
2047 : * Returns true when any slot have got invalidated.
2048 : *
2049 : * Whether a slot needs to be invalidated depends on the invalidation cause.
2050 : * A slot is invalidated if it:
2051 : * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
2052 : * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2053 : * db; dboid may be InvalidOid for shared relations
2054 : * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient
2055 : * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2056 : * "idle_replication_slot_timeout" duration.
2057 : *
2058 : * Note: This function attempts to invalidate the slot for multiple possible
2059 : * causes in a single pass, minimizing redundant iterations. The "cause"
2060 : * parameter can be a MASK representing one or more of the defined causes.
2061 : *
2062 : * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2063 : */
2064 : bool
2065 3498 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
2066 : XLogSegNo oldestSegno, Oid dboid,
2067 : TransactionId snapshotConflictHorizon)
2068 : {
2069 : XLogRecPtr oldestLSN;
2070 3498 : bool invalidated = false;
2071 :
2072 : Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2073 : Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
2074 : Assert(possible_causes != RS_INVAL_NONE);
2075 :
2076 3498 : if (max_replication_slots == 0)
2077 2 : return invalidated;
2078 :
2079 3496 : XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
2080 :
2081 3524 : restart:
2082 3524 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2083 37768 : for (int i = 0; i < max_replication_slots; i++)
2084 : {
2085 34272 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2086 :
2087 34272 : if (!s->in_use)
2088 33420 : continue;
2089 :
2090 : /* Prevent invalidation of logical slots during binary upgrade */
2091 852 : if (SlotIsLogical(s) && IsBinaryUpgrade)
2092 14 : continue;
2093 :
2094 838 : if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid,
2095 : snapshotConflictHorizon,
2096 : &invalidated))
2097 : {
2098 : /* if the lock was released, start from scratch */
2099 28 : goto restart;
2100 : }
2101 : }
2102 3496 : LWLockRelease(ReplicationSlotControlLock);
2103 :
2104 : /*
2105 : * If any slots have been invalidated, recalculate the resource limits.
2106 : */
2107 3496 : if (invalidated)
2108 : {
2109 18 : ReplicationSlotsComputeRequiredXmin(false);
2110 18 : ReplicationSlotsComputeRequiredLSN();
2111 : }
2112 :
2113 3496 : return invalidated;
2114 : }
2115 :
2116 : /*
2117 : * Flush all replication slots to disk.
2118 : *
2119 : * It is convenient to flush dirty replication slots at the time of checkpoint.
2120 : * Additionally, in case of a shutdown checkpoint, we also identify the slots
2121 : * for which the confirmed_flush LSN has been updated since the last time it
2122 : * was saved and flush them.
2123 : */
2124 : void
2125 3456 : CheckPointReplicationSlots(bool is_shutdown)
2126 : {
2127 : int i;
2128 3456 : bool last_saved_restart_lsn_updated = false;
2129 :
2130 3456 : elog(DEBUG1, "performing replication slot checkpoint");
2131 :
2132 : /*
2133 : * Prevent any slot from being created/dropped while we're active. As we
2134 : * explicitly do *not* want to block iterating over replication_slots or
2135 : * acquiring a slot we cannot take the control lock - but that's OK,
2136 : * because holding ReplicationSlotAllocationLock is strictly stronger, and
2137 : * enough to guarantee that nobody can change the in_use bits on us.
2138 : */
2139 3456 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2140 :
2141 37450 : for (i = 0; i < max_replication_slots; i++)
2142 : {
2143 33994 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2144 : char path[MAXPGPATH];
2145 :
2146 33994 : if (!s->in_use)
2147 33248 : continue;
2148 :
2149 : /* save the slot to disk, locking is handled in SaveSlotToPath() */
2150 746 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2151 :
2152 : /*
2153 : * Slot's data is not flushed each time the confirmed_flush LSN is
2154 : * updated as that could lead to frequent writes. However, we decide
2155 : * to force a flush of all logical slot's data at the time of shutdown
2156 : * if the confirmed_flush LSN is changed since we last flushed it to
2157 : * disk. This helps in avoiding an unnecessary retreat of the
2158 : * confirmed_flush LSN after restart.
2159 : */
2160 746 : if (is_shutdown && SlotIsLogical(s))
2161 : {
2162 156 : SpinLockAcquire(&s->mutex);
2163 :
2164 156 : if (s->data.invalidated == RS_INVAL_NONE &&
2165 156 : s->data.confirmed_flush > s->last_saved_confirmed_flush)
2166 : {
2167 74 : s->just_dirtied = true;
2168 74 : s->dirty = true;
2169 : }
2170 156 : SpinLockRelease(&s->mutex);
2171 : }
2172 :
2173 : /*
2174 : * Track if we're going to update slot's last_saved_restart_lsn. We
2175 : * need this to know if we need to recompute the required LSN.
2176 : */
2177 746 : if (s->last_saved_restart_lsn != s->data.restart_lsn)
2178 378 : last_saved_restart_lsn_updated = true;
2179 :
2180 746 : SaveSlotToPath(s, path, LOG);
2181 : }
2182 3456 : LWLockRelease(ReplicationSlotAllocationLock);
2183 :
2184 : /*
2185 : * Recompute the required LSN if SaveSlotToPath() updated
2186 : * last_saved_restart_lsn for any slot.
2187 : */
2188 3456 : if (last_saved_restart_lsn_updated)
2189 378 : ReplicationSlotsComputeRequiredLSN();
2190 3456 : }
2191 :
2192 : /*
2193 : * Load all replication slots from disk into memory at server startup. This
2194 : * needs to be run before we start crash recovery.
2195 : */
2196 : void
2197 1914 : StartupReplicationSlots(void)
2198 : {
2199 : DIR *replication_dir;
2200 : struct dirent *replication_de;
2201 :
2202 1914 : elog(DEBUG1, "starting up replication slots");
2203 :
2204 : /* restore all slots by iterating over all on-disk entries */
2205 1914 : replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2206 5958 : while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2207 : {
2208 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2209 : PGFileType de_type;
2210 :
2211 4046 : if (strcmp(replication_de->d_name, ".") == 0 ||
2212 2134 : strcmp(replication_de->d_name, "..") == 0)
2213 3824 : continue;
2214 :
2215 222 : snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2216 222 : de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2217 :
2218 : /* we're only creating directories here, skip if it's not our's */
2219 222 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
2220 0 : continue;
2221 :
2222 : /* we crashed while a slot was being setup or deleted, clean up */
2223 222 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
2224 : {
2225 0 : if (!rmtree(path, true))
2226 : {
2227 0 : ereport(WARNING,
2228 : (errmsg("could not remove directory \"%s\"",
2229 : path)));
2230 0 : continue;
2231 : }
2232 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2233 0 : continue;
2234 : }
2235 :
2236 : /* looks like a slot in a normal state, restore */
2237 222 : RestoreSlotFromDisk(replication_de->d_name);
2238 : }
2239 1912 : FreeDir(replication_dir);
2240 :
2241 : /* currently no slots exist, we're done. */
2242 1912 : if (max_replication_slots <= 0)
2243 2 : return;
2244 :
2245 : /* Now that we have recovered all the data, compute replication xmin */
2246 1910 : ReplicationSlotsComputeRequiredXmin(false);
2247 1910 : ReplicationSlotsComputeRequiredLSN();
2248 : }
2249 :
2250 : /* ----
2251 : * Manipulation of on-disk state of replication slots
2252 : *
2253 : * NB: none of the routines below should take any notice whether a slot is the
2254 : * current one or not, that's all handled a layer above.
2255 : * ----
2256 : */
2257 : static void
2258 1298 : CreateSlotOnDisk(ReplicationSlot *slot)
2259 : {
2260 : char tmppath[MAXPGPATH];
2261 : char path[MAXPGPATH];
2262 : struct stat st;
2263 :
2264 : /*
2265 : * No need to take out the io_in_progress_lock, nobody else can see this
2266 : * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2267 : * takes out the lock, if we'd take the lock here, we'd deadlock.
2268 : */
2269 :
2270 1298 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2271 1298 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2272 :
2273 : /*
2274 : * It's just barely possible that some previous effort to create or drop a
2275 : * slot with this name left a temp directory lying around. If that seems
2276 : * to be the case, try to remove it. If the rmtree() fails, we'll error
2277 : * out at the MakePGDirectory() below, so we don't bother checking
2278 : * success.
2279 : */
2280 1298 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2281 0 : rmtree(tmppath, true);
2282 :
2283 : /* Create and fsync the temporary slot directory. */
2284 1298 : if (MakePGDirectory(tmppath) < 0)
2285 0 : ereport(ERROR,
2286 : (errcode_for_file_access(),
2287 : errmsg("could not create directory \"%s\": %m",
2288 : tmppath)));
2289 1298 : fsync_fname(tmppath, true);
2290 :
2291 : /* Write the actual state file. */
2292 1298 : slot->dirty = true; /* signal that we really need to write */
2293 1298 : SaveSlotToPath(slot, tmppath, ERROR);
2294 :
2295 : /* Rename the directory into place. */
2296 1298 : if (rename(tmppath, path) != 0)
2297 0 : ereport(ERROR,
2298 : (errcode_for_file_access(),
2299 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2300 : tmppath, path)));
2301 :
2302 : /*
2303 : * If we'd now fail - really unlikely - we wouldn't know whether this slot
2304 : * would persist after an OS crash or not - so, force a restart. The
2305 : * restart would try to fsync this again till it works.
2306 : */
2307 1298 : START_CRIT_SECTION();
2308 :
2309 1298 : fsync_fname(path, true);
2310 1298 : fsync_fname(PG_REPLSLOT_DIR, true);
2311 :
2312 1298 : END_CRIT_SECTION();
2313 1298 : }
2314 :
2315 : /*
2316 : * Shared functionality between saving and creating a replication slot.
2317 : */
2318 : static void
2319 4728 : SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
2320 : {
2321 : char tmppath[MAXPGPATH];
2322 : char path[MAXPGPATH];
2323 : int fd;
2324 : ReplicationSlotOnDisk cp;
2325 : bool was_dirty;
2326 :
2327 : /* first check whether there's something to write out */
2328 4728 : SpinLockAcquire(&slot->mutex);
2329 4728 : was_dirty = slot->dirty;
2330 4728 : slot->just_dirtied = false;
2331 4728 : SpinLockRelease(&slot->mutex);
2332 :
2333 : /* and don't do anything if there's nothing to write */
2334 4728 : if (!was_dirty)
2335 272 : return;
2336 :
2337 4456 : LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);
2338 :
2339 : /* silence valgrind :( */
2340 4456 : memset(&cp, 0, sizeof(ReplicationSlotOnDisk));
2341 :
2342 4456 : sprintf(tmppath, "%s/state.tmp", dir);
2343 4456 : sprintf(path, "%s/state", dir);
2344 :
2345 4456 : fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
2346 4456 : if (fd < 0)
2347 : {
2348 : /*
2349 : * If not an ERROR, then release the lock before returning. In case
2350 : * of an ERROR, the error recovery path automatically releases the
2351 : * lock, but no harm in explicitly releasing even in that case. Note
2352 : * that LWLockRelease() could affect errno.
2353 : */
2354 0 : int save_errno = errno;
2355 :
2356 0 : LWLockRelease(&slot->io_in_progress_lock);
2357 0 : errno = save_errno;
2358 0 : ereport(elevel,
2359 : (errcode_for_file_access(),
2360 : errmsg("could not create file \"%s\": %m",
2361 : tmppath)));
2362 0 : return;
2363 : }
2364 :
2365 4456 : cp.magic = SLOT_MAGIC;
2366 4456 : INIT_CRC32C(cp.checksum);
2367 4456 : cp.version = SLOT_VERSION;
2368 4456 : cp.length = ReplicationSlotOnDiskV2Size;
2369 :
2370 4456 : SpinLockAcquire(&slot->mutex);
2371 :
2372 4456 : memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));
2373 :
2374 4456 : SpinLockRelease(&slot->mutex);
2375 :
2376 4456 : COMP_CRC32C(cp.checksum,
2377 : (char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
2378 : ReplicationSlotOnDiskChecksummedSize);
2379 4456 : FIN_CRC32C(cp.checksum);
2380 :
2381 4456 : errno = 0;
2382 4456 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
2383 4456 : if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
2384 : {
2385 0 : int save_errno = errno;
2386 :
2387 0 : pgstat_report_wait_end();
2388 0 : CloseTransientFile(fd);
2389 0 : unlink(tmppath);
2390 0 : LWLockRelease(&slot->io_in_progress_lock);
2391 :
2392 : /* if write didn't set errno, assume problem is no disk space */
2393 0 : errno = save_errno ? save_errno : ENOSPC;
2394 0 : ereport(elevel,
2395 : (errcode_for_file_access(),
2396 : errmsg("could not write to file \"%s\": %m",
2397 : tmppath)));
2398 0 : return;
2399 : }
2400 4456 : pgstat_report_wait_end();
2401 :
2402 : /* fsync the temporary file */
2403 4456 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
2404 4456 : if (pg_fsync(fd) != 0)
2405 : {
2406 0 : int save_errno = errno;
2407 :
2408 0 : pgstat_report_wait_end();
2409 0 : CloseTransientFile(fd);
2410 0 : unlink(tmppath);
2411 0 : LWLockRelease(&slot->io_in_progress_lock);
2412 :
2413 0 : errno = save_errno;
2414 0 : ereport(elevel,
2415 : (errcode_for_file_access(),
2416 : errmsg("could not fsync file \"%s\": %m",
2417 : tmppath)));
2418 0 : return;
2419 : }
2420 4456 : pgstat_report_wait_end();
2421 :
2422 4456 : if (CloseTransientFile(fd) != 0)
2423 : {
2424 0 : int save_errno = errno;
2425 :
2426 0 : unlink(tmppath);
2427 0 : LWLockRelease(&slot->io_in_progress_lock);
2428 :
2429 0 : errno = save_errno;
2430 0 : ereport(elevel,
2431 : (errcode_for_file_access(),
2432 : errmsg("could not close file \"%s\": %m",
2433 : tmppath)));
2434 0 : return;
2435 : }
2436 :
2437 : /* rename to permanent file, fsync file and directory */
2438 4456 : if (rename(tmppath, path) != 0)
2439 : {
2440 0 : int save_errno = errno;
2441 :
2442 0 : unlink(tmppath);
2443 0 : LWLockRelease(&slot->io_in_progress_lock);
2444 :
2445 0 : errno = save_errno;
2446 0 : ereport(elevel,
2447 : (errcode_for_file_access(),
2448 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2449 : tmppath, path)));
2450 0 : return;
2451 : }
2452 :
2453 : /*
2454 : * Check CreateSlotOnDisk() for the reasoning of using a critical section.
2455 : */
2456 4456 : START_CRIT_SECTION();
2457 :
2458 4456 : fsync_fname(path, false);
2459 4456 : fsync_fname(dir, true);
2460 4456 : fsync_fname(PG_REPLSLOT_DIR, true);
2461 :
2462 4456 : END_CRIT_SECTION();
2463 :
2464 : /*
2465 : * Successfully wrote, unset dirty bit, unless somebody dirtied again
2466 : * already and remember the confirmed_flush LSN value.
2467 : */
2468 4456 : SpinLockAcquire(&slot->mutex);
2469 4456 : if (!slot->just_dirtied)
2470 4416 : slot->dirty = false;
2471 4456 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2472 4456 : slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2473 4456 : SpinLockRelease(&slot->mutex);
2474 :
2475 4456 : LWLockRelease(&slot->io_in_progress_lock);
2476 : }
2477 :
2478 : /*
2479 : * Load a single slot from disk into memory.
2480 : */
2481 : static void
2482 222 : RestoreSlotFromDisk(const char *name)
2483 : {
2484 : ReplicationSlotOnDisk cp;
2485 : int i;
2486 : char slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2487 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
2488 : int fd;
2489 222 : bool restored = false;
2490 : int readBytes;
2491 : pg_crc32c checksum;
2492 222 : TimestampTz now = 0;
2493 :
2494 : /* no need to lock here, no concurrent access allowed yet */
2495 :
2496 : /* delete temp file if it exists */
2497 222 : sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
2498 222 : sprintf(path, "%s/state.tmp", slotdir);
2499 222 : if (unlink(path) < 0 && errno != ENOENT)
2500 0 : ereport(PANIC,
2501 : (errcode_for_file_access(),
2502 : errmsg("could not remove file \"%s\": %m", path)));
2503 :
2504 222 : sprintf(path, "%s/state", slotdir);
2505 :
2506 222 : elog(DEBUG1, "restoring replication slot from \"%s\"", path);
2507 :
2508 : /* on some operating systems fsyncing a file requires O_RDWR */
2509 222 : fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
2510 :
2511 : /*
2512 : * We do not need to handle this as we are rename()ing the directory into
2513 : * place only after we fsync()ed the state file.
2514 : */
2515 222 : if (fd < 0)
2516 0 : ereport(PANIC,
2517 : (errcode_for_file_access(),
2518 : errmsg("could not open file \"%s\": %m", path)));
2519 :
2520 : /*
2521 : * Sync state file before we're reading from it. We might have crashed
2522 : * while it wasn't synced yet and we shouldn't continue on that basis.
2523 : */
2524 222 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
2525 222 : if (pg_fsync(fd) != 0)
2526 0 : ereport(PANIC,
2527 : (errcode_for_file_access(),
2528 : errmsg("could not fsync file \"%s\": %m",
2529 : path)));
2530 222 : pgstat_report_wait_end();
2531 :
2532 : /* Also sync the parent directory */
2533 222 : START_CRIT_SECTION();
2534 222 : fsync_fname(slotdir, true);
2535 222 : END_CRIT_SECTION();
2536 :
2537 : /* read part of statefile that's guaranteed to be version independent */
2538 222 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2539 222 : readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
2540 222 : pgstat_report_wait_end();
2541 222 : if (readBytes != ReplicationSlotOnDiskConstantSize)
2542 : {
2543 0 : if (readBytes < 0)
2544 0 : ereport(PANIC,
2545 : (errcode_for_file_access(),
2546 : errmsg("could not read file \"%s\": %m", path)));
2547 : else
2548 0 : ereport(PANIC,
2549 : (errcode(ERRCODE_DATA_CORRUPTED),
2550 : errmsg("could not read file \"%s\": read %d of %zu",
2551 : path, readBytes,
2552 : (Size) ReplicationSlotOnDiskConstantSize)));
2553 : }
2554 :
2555 : /* verify magic */
2556 222 : if (cp.magic != SLOT_MAGIC)
2557 0 : ereport(PANIC,
2558 : (errcode(ERRCODE_DATA_CORRUPTED),
2559 : errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
2560 : path, cp.magic, SLOT_MAGIC)));
2561 :
2562 : /* verify version */
2563 222 : if (cp.version != SLOT_VERSION)
2564 0 : ereport(PANIC,
2565 : (errcode(ERRCODE_DATA_CORRUPTED),
2566 : errmsg("replication slot file \"%s\" has unsupported version %u",
2567 : path, cp.version)));
2568 :
2569 : /* boundary check on length */
2570 222 : if (cp.length != ReplicationSlotOnDiskV2Size)
2571 0 : ereport(PANIC,
2572 : (errcode(ERRCODE_DATA_CORRUPTED),
2573 : errmsg("replication slot file \"%s\" has corrupted length %u",
2574 : path, cp.length)));
2575 :
2576 : /* Now that we know the size, read the entire file */
2577 222 : pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
2578 444 : readBytes = read(fd,
2579 : (char *) &cp + ReplicationSlotOnDiskConstantSize,
2580 222 : cp.length);
2581 222 : pgstat_report_wait_end();
2582 222 : if (readBytes != cp.length)
2583 : {
2584 0 : if (readBytes < 0)
2585 0 : ereport(PANIC,
2586 : (errcode_for_file_access(),
2587 : errmsg("could not read file \"%s\": %m", path)));
2588 : else
2589 0 : ereport(PANIC,
2590 : (errcode(ERRCODE_DATA_CORRUPTED),
2591 : errmsg("could not read file \"%s\": read %d of %zu",
2592 : path, readBytes, (Size) cp.length)));
2593 : }
2594 :
2595 222 : if (CloseTransientFile(fd) != 0)
2596 0 : ereport(PANIC,
2597 : (errcode_for_file_access(),
2598 : errmsg("could not close file \"%s\": %m", path)));
2599 :
2600 : /* now verify the CRC */
2601 222 : INIT_CRC32C(checksum);
2602 222 : COMP_CRC32C(checksum,
2603 : (char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
2604 : ReplicationSlotOnDiskChecksummedSize);
2605 222 : FIN_CRC32C(checksum);
2606 :
2607 222 : if (!EQ_CRC32C(checksum, cp.checksum))
2608 0 : ereport(PANIC,
2609 : (errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
2610 : path, checksum, cp.checksum)));
2611 :
2612 : /*
2613 : * If we crashed with an ephemeral slot active, don't restore but delete
2614 : * it.
2615 : */
2616 222 : if (cp.slotdata.persistency != RS_PERSISTENT)
2617 : {
2618 0 : if (!rmtree(slotdir, true))
2619 : {
2620 0 : ereport(WARNING,
2621 : (errmsg("could not remove directory \"%s\"",
2622 : slotdir)));
2623 : }
2624 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2625 0 : return;
2626 : }
2627 :
2628 : /*
2629 : * Verify that requirements for the specific slot type are met. That's
2630 : * important because if these aren't met we're not guaranteed to retain
2631 : * all the necessary resources for the slot.
2632 : *
2633 : * NB: We have to do so *after* the above checks for ephemeral slots,
2634 : * because otherwise a slot that shouldn't exist anymore could prevent
2635 : * restarts.
2636 : *
2637 : * NB: Changing the requirements here also requires adapting
2638 : * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
2639 : */
2640 222 : if (cp.slotdata.database != InvalidOid)
2641 : {
2642 146 : if (wal_level < WAL_LEVEL_LOGICAL)
2643 0 : ereport(FATAL,
2644 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2645 : errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
2646 : NameStr(cp.slotdata.name)),
2647 : errhint("Change \"wal_level\" to be \"logical\" or higher.")));
2648 :
2649 : /*
2650 : * In standby mode, the hot standby must be enabled. This check is
2651 : * necessary to ensure logical slots are invalidated when they become
2652 : * incompatible due to insufficient wal_level. Otherwise, if the
2653 : * primary reduces wal_level < logical while hot standby is disabled,
2654 : * logical slots would remain valid even after promotion.
2655 : */
2656 146 : if (StandbyMode && !EnableHotStandby)
2657 2 : ereport(FATAL,
2658 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2659 : errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
2660 : NameStr(cp.slotdata.name)),
2661 : errhint("Change \"hot_standby\" to be \"on\".")));
2662 : }
2663 76 : else if (wal_level < WAL_LEVEL_REPLICA)
2664 0 : ereport(FATAL,
2665 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2666 : errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
2667 : NameStr(cp.slotdata.name)),
2668 : errhint("Change \"wal_level\" to be \"replica\" or higher.")));
2669 :
2670 : /* nothing can be active yet, don't lock anything */
2671 318 : for (i = 0; i < max_replication_slots; i++)
2672 : {
2673 : ReplicationSlot *slot;
2674 :
2675 318 : slot = &ReplicationSlotCtl->replication_slots[i];
2676 :
2677 318 : if (slot->in_use)
2678 98 : continue;
2679 :
2680 : /* restore the entire set of persistent data */
2681 220 : memcpy(&slot->data, &cp.slotdata,
2682 : sizeof(ReplicationSlotPersistentData));
2683 :
2684 : /* initialize in memory state */
2685 220 : slot->effective_xmin = cp.slotdata.xmin;
2686 220 : slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
2687 220 : slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
2688 220 : slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
2689 :
2690 220 : slot->candidate_catalog_xmin = InvalidTransactionId;
2691 220 : slot->candidate_xmin_lsn = InvalidXLogRecPtr;
2692 220 : slot->candidate_restart_lsn = InvalidXLogRecPtr;
2693 220 : slot->candidate_restart_valid = InvalidXLogRecPtr;
2694 :
2695 220 : slot->in_use = true;
2696 220 : slot->active_pid = 0;
2697 :
2698 : /*
2699 : * Set the time since the slot has become inactive after loading the
2700 : * slot from the disk into memory. Whoever acquires the slot i.e.
2701 : * makes the slot active will reset it. Use the same inactive_since
2702 : * time for all the slots.
2703 : */
2704 220 : if (now == 0)
2705 220 : now = GetCurrentTimestamp();
2706 :
2707 220 : ReplicationSlotSetInactiveSince(slot, now, false);
2708 :
2709 220 : restored = true;
2710 220 : break;
2711 : }
2712 :
2713 220 : if (!restored)
2714 0 : ereport(FATAL,
2715 : (errmsg("too many replication slots active before shutdown"),
2716 : errhint("Increase \"max_replication_slots\" and try again.")));
2717 : }
2718 :
2719 : /*
2720 : * Maps an invalidation reason for a replication slot to
2721 : * ReplicationSlotInvalidationCause.
2722 : */
2723 : ReplicationSlotInvalidationCause
2724 0 : GetSlotInvalidationCause(const char *cause_name)
2725 : {
2726 : Assert(cause_name);
2727 :
2728 : /* Search lookup table for the cause having this name */
2729 0 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2730 : {
2731 0 : if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2732 0 : return SlotInvalidationCauses[i].cause;
2733 : }
2734 :
2735 : Assert(false);
2736 0 : return RS_INVAL_NONE; /* to keep compiler quiet */
2737 : }
2738 :
2739 : /*
2740 : * Maps an ReplicationSlotInvalidationCause to the invalidation
2741 : * reason for a replication slot.
2742 : */
2743 : const char *
2744 84 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
2745 : {
2746 : /* Search lookup table for the name of this cause */
2747 270 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2748 : {
2749 270 : if (SlotInvalidationCauses[i].cause == cause)
2750 84 : return SlotInvalidationCauses[i].cause_name;
2751 : }
2752 :
2753 : Assert(false);
2754 0 : return "none"; /* to keep compiler quiet */
2755 : }
2756 :
2757 : /*
2758 : * A helper function to validate slots specified in GUC synchronized_standby_slots.
2759 : *
2760 : * The rawname will be parsed, and the result will be saved into *elemlist.
2761 : */
2762 : static bool
2763 24 : validate_sync_standby_slots(char *rawname, List **elemlist)
2764 : {
2765 : /* Verify syntax and parse string into a list of identifiers */
2766 24 : if (!SplitIdentifierString(rawname, ',', elemlist))
2767 : {
2768 0 : GUC_check_errdetail("List syntax is invalid.");
2769 0 : return false;
2770 : }
2771 :
2772 : /* Iterate the list to validate each slot name */
2773 64 : foreach_ptr(char, name, *elemlist)
2774 : {
2775 : int err_code;
2776 24 : char *err_msg = NULL;
2777 24 : char *err_hint = NULL;
2778 :
2779 24 : if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
2780 : &err_msg, &err_hint))
2781 : {
2782 4 : GUC_check_errcode(err_code);
2783 4 : GUC_check_errdetail("%s", err_msg);
2784 4 : if (err_hint != NULL)
2785 4 : GUC_check_errhint("%s", err_hint);
2786 4 : return false;
2787 : }
2788 : }
2789 :
2790 20 : return true;
2791 : }
2792 :
2793 : /*
2794 : * GUC check_hook for synchronized_standby_slots
2795 : */
2796 : bool
2797 2310 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
2798 : {
2799 : char *rawname;
2800 : char *ptr;
2801 : List *elemlist;
2802 : int size;
2803 : bool ok;
2804 : SyncStandbySlotsConfigData *config;
2805 :
2806 2310 : if ((*newval)[0] == '\0')
2807 2286 : return true;
2808 :
2809 : /* Need a modifiable copy of the GUC string */
2810 24 : rawname = pstrdup(*newval);
2811 :
2812 : /* Now verify if the specified slots exist and have correct type */
2813 24 : ok = validate_sync_standby_slots(rawname, &elemlist);
2814 :
2815 24 : if (!ok || elemlist == NIL)
2816 : {
2817 4 : pfree(rawname);
2818 4 : list_free(elemlist);
2819 4 : return ok;
2820 : }
2821 :
2822 : /* Compute the size required for the SyncStandbySlotsConfigData struct */
2823 20 : size = offsetof(SyncStandbySlotsConfigData, slot_names);
2824 60 : foreach_ptr(char, slot_name, elemlist)
2825 20 : size += strlen(slot_name) + 1;
2826 :
2827 : /* GUC extra value must be guc_malloc'd, not palloc'd */
2828 20 : config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
2829 20 : if (!config)
2830 0 : return false;
2831 :
2832 : /* Transform the data into SyncStandbySlotsConfigData */
2833 20 : config->nslotnames = list_length(elemlist);
2834 :
2835 20 : ptr = config->slot_names;
2836 60 : foreach_ptr(char, slot_name, elemlist)
2837 : {
2838 20 : strcpy(ptr, slot_name);
2839 20 : ptr += strlen(slot_name) + 1;
2840 : }
2841 :
2842 20 : *extra = config;
2843 :
2844 20 : pfree(rawname);
2845 20 : list_free(elemlist);
2846 20 : return true;
2847 : }
2848 :
2849 : /*
2850 : * GUC assign_hook for synchronized_standby_slots
2851 : */
2852 : void
2853 2304 : assign_synchronized_standby_slots(const char *newval, void *extra)
2854 : {
2855 : /*
2856 : * The standby slots may have changed, so we must recompute the oldest
2857 : * LSN.
2858 : */
2859 2304 : ss_oldest_flush_lsn = InvalidXLogRecPtr;
2860 :
2861 2304 : synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
2862 2304 : }
2863 :
2864 : /*
2865 : * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
2866 : */
2867 : bool
2868 41792 : SlotExistsInSyncStandbySlots(const char *slot_name)
2869 : {
2870 : const char *standby_slot_name;
2871 :
2872 : /* Return false if there is no value in synchronized_standby_slots */
2873 41792 : if (synchronized_standby_slots_config == NULL)
2874 41778 : return false;
2875 :
2876 : /*
2877 : * XXX: We are not expecting this list to be long so a linear search
2878 : * shouldn't hurt but if that turns out not to be true then we can cache
2879 : * this information for each WalSender as well.
2880 : */
2881 14 : standby_slot_name = synchronized_standby_slots_config->slot_names;
2882 14 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2883 : {
2884 14 : if (strcmp(standby_slot_name, slot_name) == 0)
2885 14 : return true;
2886 :
2887 0 : standby_slot_name += strlen(standby_slot_name) + 1;
2888 : }
2889 :
2890 0 : return false;
2891 : }
2892 :
2893 : /*
2894 : * Return true if the slots specified in synchronized_standby_slots have caught up to
2895 : * the given WAL location, false otherwise.
2896 : *
2897 : * The elevel parameter specifies the error level used for logging messages
2898 : * related to slots that do not exist, are invalidated, or are inactive.
2899 : */
2900 : bool
2901 1342 : StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
2902 : {
2903 : const char *name;
2904 1342 : int caught_up_slot_num = 0;
2905 1342 : XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
2906 :
2907 : /*
2908 : * Don't need to wait for the standbys to catch up if there is no value in
2909 : * synchronized_standby_slots.
2910 : */
2911 1342 : if (synchronized_standby_slots_config == NULL)
2912 1300 : return true;
2913 :
2914 : /*
2915 : * Don't need to wait for the standbys to catch up if we are on a standby
2916 : * server, since we do not support syncing slots to cascading standbys.
2917 : */
2918 42 : if (RecoveryInProgress())
2919 0 : return true;
2920 :
2921 : /*
2922 : * Don't need to wait for the standbys to catch up if they are already
2923 : * beyond the specified WAL location.
2924 : */
2925 42 : if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
2926 20 : ss_oldest_flush_lsn >= wait_for_lsn)
2927 12 : return true;
2928 :
2929 : /*
2930 : * To prevent concurrent slot dropping and creation while filtering the
2931 : * slots, take the ReplicationSlotControlLock outside of the loop.
2932 : */
2933 30 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2934 :
2935 30 : name = synchronized_standby_slots_config->slot_names;
2936 40 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2937 : {
2938 : XLogRecPtr restart_lsn;
2939 : bool invalidated;
2940 : bool inactive;
2941 : ReplicationSlot *slot;
2942 :
2943 30 : slot = SearchNamedReplicationSlot(name, false);
2944 :
2945 : /*
2946 : * If a slot name provided in synchronized_standby_slots does not
2947 : * exist, report a message and exit the loop.
2948 : */
2949 30 : if (!slot)
2950 : {
2951 0 : ereport(elevel,
2952 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2953 : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
2954 : name, "synchronized_standby_slots"),
2955 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2956 : name),
2957 : errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
2958 : name, "synchronized_standby_slots"));
2959 0 : break;
2960 : }
2961 :
2962 : /* Same as above: if a slot is not physical, exit the loop. */
2963 30 : if (SlotIsLogical(slot))
2964 : {
2965 0 : ereport(elevel,
2966 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2967 : errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
2968 : name, "synchronized_standby_slots"),
2969 : errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
2970 : name),
2971 : errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
2972 : name, "synchronized_standby_slots"));
2973 0 : break;
2974 : }
2975 :
2976 30 : SpinLockAcquire(&slot->mutex);
2977 30 : restart_lsn = slot->data.restart_lsn;
2978 30 : invalidated = slot->data.invalidated != RS_INVAL_NONE;
2979 30 : inactive = slot->active_pid == 0;
2980 30 : SpinLockRelease(&slot->mutex);
2981 :
2982 30 : if (invalidated)
2983 : {
2984 : /* Specified physical slot has been invalidated */
2985 0 : ereport(elevel,
2986 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2987 : errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
2988 : name, "synchronized_standby_slots"),
2989 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
2990 : name),
2991 : errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
2992 : name, "synchronized_standby_slots"));
2993 0 : break;
2994 : }
2995 :
2996 30 : if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
2997 : {
2998 : /* Log a message if no active_pid for this physical slot */
2999 20 : if (inactive)
3000 16 : ereport(elevel,
3001 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3002 : errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
3003 : name, "synchronized_standby_slots"),
3004 : errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
3005 : name),
3006 : errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
3007 : name, "synchronized_standby_slots"));
3008 :
3009 : /* Continue if the current slot hasn't caught up. */
3010 20 : break;
3011 : }
3012 :
3013 : Assert(restart_lsn >= wait_for_lsn);
3014 :
3015 10 : if (XLogRecPtrIsInvalid(min_restart_lsn) ||
3016 : min_restart_lsn > restart_lsn)
3017 10 : min_restart_lsn = restart_lsn;
3018 :
3019 10 : caught_up_slot_num++;
3020 :
3021 10 : name += strlen(name) + 1;
3022 : }
3023 :
3024 30 : LWLockRelease(ReplicationSlotControlLock);
3025 :
3026 : /*
3027 : * Return false if not all the standbys have caught up to the specified
3028 : * WAL location.
3029 : */
3030 30 : if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
3031 20 : return false;
3032 :
3033 : /* The ss_oldest_flush_lsn must not retreat. */
3034 : Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
3035 : min_restart_lsn >= ss_oldest_flush_lsn);
3036 :
3037 10 : ss_oldest_flush_lsn = min_restart_lsn;
3038 :
3039 10 : return true;
3040 : }
3041 :
3042 : /*
3043 : * Wait for physical standbys to confirm receiving the given lsn.
3044 : *
3045 : * Used by logical decoding SQL functions. It waits for physical standbys
3046 : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
3047 : */
3048 : void
3049 446 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
3050 : {
3051 : /*
3052 : * Don't need to wait for the standby to catch up if the current acquired
3053 : * slot is not a logical failover slot, or there is no value in
3054 : * synchronized_standby_slots.
3055 : */
3056 446 : if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
3057 444 : return;
3058 :
3059 2 : ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
3060 :
3061 : for (;;)
3062 : {
3063 4 : CHECK_FOR_INTERRUPTS();
3064 :
3065 4 : if (ConfigReloadPending)
3066 : {
3067 2 : ConfigReloadPending = false;
3068 2 : ProcessConfigFile(PGC_SIGHUP);
3069 : }
3070 :
3071 : /* Exit if done waiting for every slot. */
3072 4 : if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
3073 2 : break;
3074 :
3075 : /*
3076 : * Wait for the slots in the synchronized_standby_slots to catch up,
3077 : * but use a timeout (1s) so we can also check if the
3078 : * synchronized_standby_slots has been changed.
3079 : */
3080 2 : ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
3081 : WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
3082 : }
3083 :
3084 2 : ConditionVariableCancelSleep();
3085 : }
|