Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * slot.c
4 : * Replication slot management.
5 : *
6 : *
7 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/replication/slot.c
12 : *
13 : * NOTES
14 : *
15 : * Replication slots are used to keep state about replication streams
16 : * originating from this cluster. Their primary purpose is to prevent the
17 : * premature removal of WAL or of old tuple versions in a manner that would
18 : * interfere with replication; they are also useful for monitoring purposes.
19 : * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20 : * on standbys (to support cascading setups). The requirement that slots be
21 : * usable on standbys precludes storing them in the system catalogs.
22 : *
23 : * Each replication slot gets its own directory inside the directory
24 : * $PGDATA / PG_REPLSLOT_DIR. Inside that directory the state file will
25 : * contain the slot's own data. Additional data can be stored alongside that
26 : * file if required. While the server is running, the state data is also
27 : * cached in memory for efficiency.
28 : *
29 : * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30 : * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31 : * to iterate over the slots, and in exclusive mode to change the in_use flag
32 : * of a slot. The remaining data in each slot is protected by its mutex.
33 : *
34 : *-------------------------------------------------------------------------
35 : */
36 :
37 : #include "postgres.h"
38 :
39 : #include <unistd.h>
40 : #include <sys/stat.h>
41 :
42 : #include "access/transam.h"
43 : #include "access/xlog_internal.h"
44 : #include "access/xlogrecovery.h"
45 : #include "common/file_utils.h"
46 : #include "common/string.h"
47 : #include "miscadmin.h"
48 : #include "pgstat.h"
49 : #include "postmaster/interrupt.h"
50 : #include "replication/logicallauncher.h"
51 : #include "replication/slotsync.h"
52 : #include "replication/slot.h"
53 : #include "replication/walsender_private.h"
54 : #include "storage/fd.h"
55 : #include "storage/ipc.h"
56 : #include "storage/proc.h"
57 : #include "storage/procarray.h"
58 : #include "utils/builtins.h"
59 : #include "utils/guc_hooks.h"
60 : #include "utils/injection_point.h"
61 : #include "utils/varlena.h"
62 :
/*
 * Replication slot on-disk data structure.
 *
 * Layout of the state file stored in each slot's directory.  The fields up
 * to (but not including) 'slotdata' form the version-independent prefix
 * (ReplicationSlotOnDiskConstantSize); 'slotdata' is the version-dependent
 * payload.  The checksum covers everything from 'version' to the end of the
 * struct (ReplicationSlotOnDiskNotChecksummedSize is offsetof(version)).
 *
 * NOTE(review): because this image is read back by RestoreSlotFromDisk(),
 * presumably any change to field order or types must be accompanied by a
 * SLOT_VERSION bump -- confirm against the save/restore code before editing.
 */
typedef struct ReplicationSlotOnDisk
{
	/* first part of this struct needs to be version independent */

	/* data not covered by checksum */
	uint32		magic;			/* format identifier, cf. SLOT_MAGIC */
	pg_crc32c	checksum;		/* CRC of the fields that follow */

	/* data covered by checksum */
	uint32		version;		/* on-disk format version, cf. SLOT_VERSION */
	uint32		length;			/* presumably the size of the
								 * version-dependent part; TODO confirm */

	/*
	 * The actual data in the slot that follows can differ based on the above
	 * 'version'.
	 */

	ReplicationSlotPersistentData slotdata;
} ReplicationSlotOnDisk;
85 :
/*
 * Struct for the configuration of synchronized_standby_slots.
 *
 * Note: this must be a flat representation that can be held in a single chunk
 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
 * synchronized_standby_slots GUC.  That is why the names are packed into a
 * trailing character array instead of being referenced through pointers.
 */
typedef struct
{
	/* Number of slot names in the slot_names[] */
	int			nslotnames;

	/*
	 * slot_names contains 'nslotnames' consecutive null-terminated C strings.
	 * The i-th name therefore starts right after the i-th terminator.
	 */
	char		slot_names[FLEXIBLE_ARRAY_MEMBER];
} SyncStandbySlotsConfigData;
103 :
104 : /*
105 : * Lookup table for slot invalidation causes.
106 : */
107 : typedef struct SlotInvalidationCauseMap
108 : {
109 : ReplicationSlotInvalidationCause cause;
110 : const char *cause_name;
111 : } SlotInvalidationCauseMap;
112 :
113 : static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
114 : {RS_INVAL_NONE, "none"},
115 : {RS_INVAL_WAL_REMOVED, "wal_removed"},
116 : {RS_INVAL_HORIZON, "rows_removed"},
117 : {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
118 : {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
119 : };
120 :
121 : /*
122 : * Ensure that the lookup table is up-to-date with the enums defined in
123 : * ReplicationSlotInvalidationCause.
124 : */
125 : StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
126 : "array length mismatch");
127 :
/*
 * Sizes of the pieces of ReplicationSlotOnDisk.
 *
 * Every expansion is fully parenthesized so that the macros behave as single
 * values when used inside larger expressions (multiplication, casts, unary
 * minus, ...).  Previously the two subtraction-based macros expanded to a
 * bare "a - b", so e.g. "2 * ReplicationSlotOnDiskChecksummedSize" evaluated
 * to "2 * sizeof(...) - offset" instead of the intended product.
 */

/* size of version independent data */
#define ReplicationSlotOnDiskConstantSize \
	(offsetof(ReplicationSlotOnDisk, slotdata))
/* size of the part of the slot not covered by the checksum */
#define ReplicationSlotOnDiskNotChecksummedSize \
	(offsetof(ReplicationSlotOnDisk, version))
/* size of the part covered by the checksum */
#define ReplicationSlotOnDiskChecksummedSize \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize)
/* size of the slot data that is version dependent */
#define ReplicationSlotOnDiskV2Size \
	(sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize)

#define SLOT_MAGIC		0x1051CA1	/* format identifier */
#define SLOT_VERSION	5		/* version for new files */
143 :
/*
 * Control array for replication slot management.  Points into shared memory
 * once ReplicationSlotsShmemInit() has run; stays NULL when
 * max_replication_slots is 0.
 */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;

/*
 * My backend's replication slot in the shared memory array, or NULL when
 * this backend has not acquired/created a slot.
 */
ReplicationSlot *MyReplicationSlot = NULL;

/* GUC variables */
int			max_replication_slots = 10; /* the maximum number of replication
										 * slots */

/*
 * Invalidate replication slots that have remained idle longer than this
 * duration (in seconds, per the variable name); '0' disables it.
 */
int			idle_replication_slot_timeout_secs = 0;

/*
 * This GUC lists streaming replication standby server slot names that
 * logical WAL sender processes will wait for.
 */
char	   *synchronized_standby_slots;

/* This is the parsed and cached configuration for synchronized_standby_slots */
static SyncStandbySlotsConfigData *synchronized_standby_slots_config;

/*
 * Oldest LSN that has been confirmed to be flushed to the standbys
 * corresponding to the physical slots specified in the
 * synchronized_standby_slots GUC.
 */
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;

static void ReplicationSlotShmemExit(int code, Datum arg);
static bool IsSlotForConflictCheck(const char *name);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);

/* internal persistency functions */
static void RestoreSlotFromDisk(const char *name);
static void CreateSlotOnDisk(ReplicationSlot *slot);
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
182 : static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
183 :
184 : /*
185 : * Report shared-memory space needed by ReplicationSlotsShmemInit.
186 : */
187 : Size
188 8512 : ReplicationSlotsShmemSize(void)
189 : {
190 8512 : Size size = 0;
191 :
192 8512 : if (max_replication_slots == 0)
193 4 : return size;
194 :
195 8508 : size = offsetof(ReplicationSlotCtlData, replication_slots);
196 8508 : size = add_size(size,
197 : mul_size(max_replication_slots, sizeof(ReplicationSlot)));
198 :
199 8508 : return size;
200 : }
201 :
202 : /*
203 : * Allocate and initialize shared memory for replication slots.
204 : */
205 : void
206 2204 : ReplicationSlotsShmemInit(void)
207 : {
208 : bool found;
209 :
210 2204 : if (max_replication_slots == 0)
211 2 : return;
212 :
213 2202 : ReplicationSlotCtl = (ReplicationSlotCtlData *)
214 2202 : ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
215 : &found);
216 :
217 2202 : if (!found)
218 : {
219 : int i;
220 :
221 : /* First time through, so initialize */
222 4052 : MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
223 :
224 23828 : for (i = 0; i < max_replication_slots; i++)
225 : {
226 21626 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
227 :
228 : /* everything else is zeroed by the memset above */
229 21626 : SpinLockInit(&slot->mutex);
230 21626 : LWLockInitialize(&slot->io_in_progress_lock,
231 : LWTRANCHE_REPLICATION_SLOT_IO);
232 21626 : ConditionVariableInit(&slot->active_cv);
233 : }
234 : }
235 : }
236 :
237 : /*
238 : * Register the callback for replication slot cleanup and releasing.
239 : */
240 : void
241 44518 : ReplicationSlotInitialize(void)
242 : {
243 44518 : before_shmem_exit(ReplicationSlotShmemExit, 0);
244 44518 : }
245 :
246 : /*
247 : * Release and cleanup replication slots.
248 : */
249 : static void
250 44518 : ReplicationSlotShmemExit(int code, Datum arg)
251 : {
252 : /* Make sure active replication slots are released */
253 44518 : if (MyReplicationSlot != NULL)
254 542 : ReplicationSlotRelease();
255 :
256 : /* Also cleanup all the temporary slots. */
257 44518 : ReplicationSlotCleanup(false);
258 44518 : }
259 :
260 : /*
261 : * Check whether the passed slot name is valid and report errors at elevel.
262 : *
263 : * See comments for ReplicationSlotValidateNameInternal().
264 : */
265 : bool
266 1572 : ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
267 : int elevel)
268 : {
269 : int err_code;
270 1572 : char *err_msg = NULL;
271 1572 : char *err_hint = NULL;
272 :
273 1572 : if (!ReplicationSlotValidateNameInternal(name, allow_reserved_name,
274 : &err_code, &err_msg, &err_hint))
275 : {
276 : /*
277 : * Use errmsg_internal() and errhint_internal() instead of errmsg()
278 : * and errhint(), since the messages from
279 : * ReplicationSlotValidateNameInternal() are already translated. This
280 : * avoids double translation.
281 : */
282 8 : ereport(elevel,
283 : errcode(err_code),
284 : errmsg_internal("%s", err_msg),
285 : (err_hint != NULL) ? errhint_internal("%s", err_hint) : 0);
286 :
287 0 : pfree(err_msg);
288 0 : if (err_hint != NULL)
289 0 : pfree(err_hint);
290 0 : return false;
291 : }
292 :
293 1564 : return true;
294 : }
295 :
296 : /*
297 : * Check whether the passed slot name is valid.
298 : *
299 : * An error will be reported for a reserved replication slot name if
300 : * allow_reserved_name is set to false.
301 : *
302 : * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
303 : * the name to be used as a directory name on every supported OS.
304 : *
305 : * Returns true if the slot name is valid. Otherwise, returns false and stores
306 : * the error code, error message, and optional hint in err_code, err_msg, and
307 : * err_hint, respectively. The caller is responsible for freeing err_msg and
308 : * err_hint, which are palloc'd.
309 : */
310 : bool
311 2004 : ReplicationSlotValidateNameInternal(const char *name, bool allow_reserved_name,
312 : int *err_code, char **err_msg, char **err_hint)
313 : {
314 : const char *cp;
315 :
316 2004 : if (strlen(name) == 0)
317 : {
318 6 : *err_code = ERRCODE_INVALID_NAME;
319 6 : *err_msg = psprintf(_("replication slot name \"%s\" is too short"), name);
320 6 : *err_hint = NULL;
321 6 : return false;
322 : }
323 :
324 1998 : if (strlen(name) >= NAMEDATALEN)
325 : {
326 0 : *err_code = ERRCODE_NAME_TOO_LONG;
327 0 : *err_msg = psprintf(_("replication slot name \"%s\" is too long"), name);
328 0 : *err_hint = NULL;
329 0 : return false;
330 : }
331 :
332 38500 : for (cp = name; *cp; cp++)
333 : {
334 36506 : if (!((*cp >= 'a' && *cp <= 'z')
335 17140 : || (*cp >= '0' && *cp <= '9')
336 3568 : || (*cp == '_')))
337 : {
338 4 : *err_code = ERRCODE_INVALID_NAME;
339 4 : *err_msg = psprintf(_("replication slot name \"%s\" contains invalid character"), name);
340 4 : *err_hint = psprintf(_("Replication slot names may only contain lower case letters, numbers, and the underscore character."));
341 4 : return false;
342 : }
343 : }
344 :
345 1994 : if (!allow_reserved_name && IsSlotForConflictCheck(name))
346 : {
347 2 : *err_code = ERRCODE_RESERVED_NAME;
348 2 : *err_msg = psprintf(_("replication slot name \"%s\" is reserved"), name);
349 2 : *err_hint = psprintf(_("The name \"%s\" is reserved for the conflict detection slot."),
350 : CONFLICT_DETECTION_SLOT);
351 2 : return false;
352 : }
353 :
354 1992 : return true;
355 : }
356 :
357 : /*
358 : * Return true if the replication slot name is "pg_conflict_detection".
359 : */
360 : static bool
361 4340 : IsSlotForConflictCheck(const char *name)
362 : {
363 4340 : return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
364 : }
365 :
/*
 * Create a new replication slot and mark it as used by this backend.
 *
 * name: Name of the slot
 * db_specific: logical decoding is db specific; if the slot is going to
 *	   be used for that pass true, otherwise false.
 * persistency: persistency of the new slot (RS_PERSISTENT, RS_EPHEMERAL or
 *	   RS_TEMPORARY).
 * two_phase: Allows decoding of prepared transactions. We allow this option
 *	   to be enabled only at the slot creation time. If we allow this option
 *	   to be changed during decoding then it is quite possible that we skip
 *	   prepare first time because this option was not enabled. Now next time
 *	   during getting changes, if the two_phase option is enabled it can skip
 *	   prepare because by that time start decoding point has been moved. So the
 *	   user will only get commit prepared.
 * failover: If enabled, allows the slot to be synced to standbys so
 *	   that logical replication can be resumed after failover.
 * synced: True if the slot is synchronized from the primary server.
 *
 * On return the slot is in use, its active_pid is MyProcPid and it is stored
 * in MyReplicationSlot.  ERRORs are raised for an invalid or duplicate name,
 * when no free slot is left, and for failover requests on a standby or for a
 * temporary slot (unless we are performing slot synchronization).
 */
void
ReplicationSlotCreate(const char *name, bool db_specific,
					  ReplicationSlotPersistency persistency,
					  bool two_phase, bool failover, bool synced)
{
	ReplicationSlot *slot = NULL;
	int			i;

	Assert(MyReplicationSlot == NULL);

	/*
	 * The logical launcher or pg_upgrade may create or migrate an internal
	 * slot, so using a reserved name is allowed in these cases.
	 */
	ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
								ERROR);

	if (failover)
	{
		/*
		 * Do not allow users to create the failover enabled slots on the
		 * standby as we do not support sync to the cascading standby.
		 *
		 * However, failover enabled slots can be created during slot
		 * synchronization because we need to retain the same values as the
		 * remote slot.
		 */
		if (RecoveryInProgress() && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a replication slot created on the standby"));

		/*
		 * Do not allow users to create failover enabled temporary slots,
		 * because temporary slots will not be synced to the standby.
		 *
		 * However, failover enabled temporary slots can be created during
		 * slot synchronization. See the comments atop slotsync.c for details.
		 */
		if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
			ereport(ERROR,
					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot enable failover for a temporary replication slot"));
	}

	/*
	 * If some other backend ran this code concurrently with us, we'd likely
	 * both allocate the same slot, and that would be bad. We'd also be at
	 * risk of missing a name collision. Also, we don't want to try to create
	 * a new slot while somebody's busy cleaning up an old one, because we
	 * might both be monkeying with the same directory.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/*
	 * Check for name collision, and identify an allocatable slot. We need to
	 * hold ReplicationSlotControlLock in shared mode for this, so that nobody
	 * else can change the in_use flags while we're looking at them.
	 *
	 * The scan does not stop at the first free slot: every in-use slot must
	 * still be checked for a name collision.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
			ereport(ERROR,
					(errcode(ERRCODE_DUPLICATE_OBJECT),
					 errmsg("replication slot \"%s\" already exists", name)));
		if (!s->in_use && slot == NULL)
			slot = s;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* If all slots are in use, we're out of luck. */
	if (slot == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
				 errmsg("all replication slots are in use"),
				 errhint("Free one or increase \"max_replication_slots\".")));

	/*
	 * Since this slot is not in use, nobody should be looking at any part of
	 * it other than the in_use field unless they're trying to allocate it.
	 * And since we hold ReplicationSlotAllocationLock, nobody except us can
	 * be doing that. So it's safe to initialize the slot.
	 */
	Assert(!slot->in_use);
	Assert(slot->active_pid == 0);

	/* first initialize persistent data */
	memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
	namestrcpy(&slot->data.name, name);
	slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
	slot->data.persistency = persistency;
	slot->data.two_phase = two_phase;
	slot->data.two_phase_at = InvalidXLogRecPtr;
	slot->data.failover = failover;
	slot->data.synced = synced;

	/* and then data only present in shared memory */
	slot->just_dirtied = false;
	slot->dirty = false;
	slot->effective_xmin = InvalidTransactionId;
	slot->effective_catalog_xmin = InvalidTransactionId;
	slot->candidate_catalog_xmin = InvalidTransactionId;
	slot->candidate_xmin_lsn = InvalidXLogRecPtr;
	slot->candidate_restart_valid = InvalidXLogRecPtr;
	slot->candidate_restart_lsn = InvalidXLogRecPtr;
	slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
	slot->last_saved_restart_lsn = InvalidXLogRecPtr;
	slot->inactive_since = 0;
	slot->slotsync_skip_reason = SS_SKIP_NONE;

	/*
	 * Create the slot on disk. We haven't actually marked the slot allocated
	 * yet, so no special cleanup is required if this errors out.
	 */
	CreateSlotOnDisk(slot);

	/*
	 * We need to briefly prevent any other backend from iterating over the
	 * slots while we flip the in_use flag. We also need to set the active
	 * flag while holding the ControlLock as otherwise a concurrent
	 * ReplicationSlotAcquire() could acquire the slot as well.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);

	slot->in_use = true;

	/* We can now mark the slot active, and that makes it our slot. */
	SpinLockAcquire(&slot->mutex);
	Assert(slot->active_pid == 0);
	slot->active_pid = MyProcPid;
	SpinLockRelease(&slot->mutex);
	MyReplicationSlot = slot;

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Create statistics entry for the new logical slot. We don't collect any
	 * stats for physical slots, so no need to create an entry for the same.
	 * See ReplicationSlotDropPtr for why we need to do this before releasing
	 * ReplicationSlotAllocationLock.
	 */
	if (SlotIsLogical(slot))
		pgstat_create_replslot(slot);

	/*
	 * Now that the slot has been marked as in_use and active, it's safe to
	 * let somebody else try to allocate a slot.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&slot->active_cv);
}
539 :
540 : /*
541 : * Search for the named replication slot.
542 : *
543 : * Return the replication slot if found, otherwise NULL.
544 : */
545 : ReplicationSlot *
546 3836 : SearchNamedReplicationSlot(const char *name, bool need_lock)
547 : {
548 : int i;
549 3836 : ReplicationSlot *slot = NULL;
550 :
551 3836 : if (need_lock)
552 1088 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
553 :
554 14574 : for (i = 0; i < max_replication_slots; i++)
555 : {
556 13660 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
557 :
558 13660 : if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
559 : {
560 2922 : slot = s;
561 2922 : break;
562 : }
563 : }
564 :
565 3836 : if (need_lock)
566 1088 : LWLockRelease(ReplicationSlotControlLock);
567 :
568 3836 : return slot;
569 : }
570 :
571 : /*
572 : * Return the index of the replication slot in
573 : * ReplicationSlotCtl->replication_slots.
574 : *
575 : * This is mainly useful to have an efficient key for storing replication slot
576 : * stats.
577 : */
578 : int
579 14936 : ReplicationSlotIndex(ReplicationSlot *slot)
580 : {
581 : Assert(slot >= ReplicationSlotCtl->replication_slots &&
582 : slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
583 :
584 14936 : return slot - ReplicationSlotCtl->replication_slots;
585 : }
586 :
587 : /*
588 : * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
589 : * the slot's name and true is returned.
590 : *
591 : * This likely is only useful for pgstat_replslot.c during shutdown, in other
592 : * cases there are obvious TOCTOU issues.
593 : */
594 : bool
595 190 : ReplicationSlotName(int index, Name name)
596 : {
597 : ReplicationSlot *slot;
598 : bool found;
599 :
600 190 : slot = &ReplicationSlotCtl->replication_slots[index];
601 :
602 : /*
603 : * Ensure that the slot cannot be dropped while we copy the name. Don't
604 : * need the spinlock as the name of an existing slot cannot change.
605 : */
606 190 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
607 190 : found = slot->in_use;
608 190 : if (slot->in_use)
609 190 : namestrcpy(name, NameStr(slot->data.name));
610 190 : LWLockRelease(ReplicationSlotControlLock);
611 :
612 190 : return found;
613 : }
614 :
/*
 * Find a previously created slot and mark it as used by this process.
 *
 * An error is raised if nowait is true and the slot is currently in use. If
 * nowait is false, we sleep until the slot is released by the owning process.
 *
 * An error is raised if error_if_invalid is true and the slot is found to
 * be invalid. It should always be set to true, except when we are temporarily
 * acquiring the slot and don't intend to change it.
 *
 * Errors are also raised if no slot of that name exists, or if the slot is
 * the reserved conflict detection slot and we are not the logical
 * replication launcher.  On success, MyReplicationSlot points to the slot
 * and its active_pid is MyProcPid.
 */
void
ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
{
	ReplicationSlot *s;
	int			active_pid;

	Assert(name != NULL);

	/* We come back here after waiting for another owner to release the slot. */
retry:
	Assert(MyReplicationSlot == NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	/*
	 * Check if the slot exists with the given name.  (SearchNamedReplicationSlot
	 * only returns in-use slots, so the in_use test below is just a safety
	 * check.)
	 */
	s = SearchNamedReplicationSlot(name, false);
	if (s == NULL || !s->in_use)
	{
		LWLockRelease(ReplicationSlotControlLock);

		ereport(ERROR,
				(errcode(ERRCODE_UNDEFINED_OBJECT),
				 errmsg("replication slot \"%s\" does not exist",
						name)));
	}

	/*
	 * Do not allow users to acquire the reserved slot. This scenario may
	 * occur if the launcher that owns the slot has terminated unexpectedly
	 * due to an error, and a backend process attempts to reuse the slot.
	 */
	if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
		ereport(ERROR,
				errcode(ERRCODE_UNDEFINED_OBJECT),
				errmsg("cannot acquire replication slot \"%s\"", name),
				errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));

	/*
	 * This is the slot we want; check if it's active under some other
	 * process. In single user mode, we don't need this check.
	 */
	if (IsUnderPostmaster)
	{
		/*
		 * Get ready to sleep on the slot in case it is active. (We may end
		 * up not sleeping, but we don't want to do this while holding the
		 * spinlock.)
		 */
		if (!nowait)
			ConditionVariablePrepareToSleep(&s->active_cv);

		/*
		 * It is important to reset the inactive_since under spinlock here to
		 * avoid race conditions with slot invalidation. See comments related
		 * to inactive_since in InvalidatePossiblyObsoleteSlot.
		 */
		SpinLockAcquire(&s->mutex);
		if (s->active_pid == 0)
			s->active_pid = MyProcPid;
		active_pid = s->active_pid;
		ReplicationSlotSetInactiveSince(s, 0, false);
		SpinLockRelease(&s->mutex);
	}
	else
	{
		/* single-user mode: no other process can own the slot */
		s->active_pid = active_pid = MyProcPid;
		ReplicationSlotSetInactiveSince(s, 0, true);
	}
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * If we found the slot but it's already active in another process, we
	 * wait until the owning process signals us that it's been released, or
	 * error out.
	 */
	if (active_pid != MyProcPid)
	{
		if (!nowait)
		{
			/* Wait here until we get signaled, and then restart */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);
			ConditionVariableCancelSleep();
			goto retry;
		}

		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_IN_USE),
				 errmsg("replication slot \"%s\" is active for PID %d",
						NameStr(s->data.name), active_pid)));
	}
	else if (!nowait)
		ConditionVariableCancelSleep(); /* no sleep needed after all */

	/* We made this slot active, so it's ours now. */
	MyReplicationSlot = s;

	/*
	 * We need to check for invalidation after making the slot ours to avoid
	 * the possible race condition with the checkpointer that can otherwise
	 * invalidate the slot immediately after the check.
	 */
	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
		ereport(ERROR,
				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				errmsg("can no longer access replication slot \"%s\"",
					   NameStr(s->data.name)),
				errdetail("This replication slot has been invalidated due to \"%s\".",
						  GetSlotInvalidationCauseName(s->data.invalidated)));

	/* Let everybody know we've modified this slot */
	ConditionVariableBroadcast(&s->active_cv);

	/*
	 * The call to pgstat_acquire_replslot() protects against stats for a
	 * different slot, from before a restart or such, being present during
	 * pgstat_report_replslot().
	 */
	if (SlotIsLogical(s))
		pgstat_acquire_replslot(s);

	/* WAL senders log the acquisition (at LOG if log_replication_commands). */
	if (am_walsender)
	{
		ereport(log_replication_commands ? LOG : DEBUG1,
				SlotIsLogical(s)
				? errmsg("acquired logical replication slot \"%s\"",
						 NameStr(s->data.name))
				: errmsg("acquired physical replication slot \"%s\"",
						 NameStr(s->data.name)));
	}
}
756 :
/*
 * Release the replication slot that this backend considers to own.
 *
 * This or another backend can re-acquire the slot later.
 * Resources this slot requires will be preserved.
 *
 * RS_EPHEMERAL slots are dropped here.  RS_PERSISTENT slots get
 * active_pid reset to 0 and waiters are woken.  Other (temporary) slots keep
 * active_pid == MyProcPid, which is how ReplicationSlotCleanup() later finds
 * and drops them.  In every case MyReplicationSlot is reset to NULL and
 * PROC_IN_LOGICAL_DECODING is cleared.
 */
void
ReplicationSlotRelease(void)
{
	ReplicationSlot *slot = MyReplicationSlot;
	char	   *slotname = NULL;	/* keep compiler quiet */
	bool		is_logical = false; /* keep compiler quiet */
	TimestampTz now = 0;

	Assert(slot != NULL && slot->active_pid != 0);

	/*
	 * Capture what the final log message needs up front; presumably because
	 * an ephemeral slot is dropped below before the message is emitted.
	 */
	if (am_walsender)
	{
		slotname = pstrdup(NameStr(slot->data.name));
		is_logical = SlotIsLogical(slot);
	}

	if (slot->data.persistency == RS_EPHEMERAL)
	{
		/*
		 * Delete the slot. There is no !PANIC case where this is allowed to
		 * fail, all that may happen is an incomplete cleanup of the on-disk
		 * data.
		 *
		 * NOTE(review): execution continues below and still reads and writes
		 * 'slot' (effective_xmin, inactive_since) after the drop.  Confirm
		 * that this cannot race with another backend re-allocating the same
		 * array entry.
		 */
		ReplicationSlotDropAcquired();
	}

	/*
	 * If slot needed to temporarily restrain both data and catalog xmin to
	 * create the catalog snapshot, remove that temporary constraint.
	 * Snapshots can only be exported while the initial snapshot is still
	 * acquired.
	 */
	if (!TransactionIdIsValid(slot->data.xmin) &&
		TransactionIdIsValid(slot->effective_xmin))
	{
		SpinLockAcquire(&slot->mutex);
		slot->effective_xmin = InvalidTransactionId;
		SpinLockRelease(&slot->mutex);
		ReplicationSlotsComputeRequiredXmin(false);
	}

	/*
	 * Set the time since the slot has become inactive. We get the current
	 * time beforehand to avoid system call while holding the spinlock.
	 */
	now = GetCurrentTimestamp();

	if (slot->data.persistency == RS_PERSISTENT)
	{
		/*
		 * Mark persistent slot inactive. We're not freeing it, just
		 * disconnecting, but wake up others that may be waiting for it.
		 */
		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		ReplicationSlotSetInactiveSince(slot, now, false);
		SpinLockRelease(&slot->mutex);
		ConditionVariableBroadcast(&slot->active_cv);
	}
	else
		ReplicationSlotSetInactiveSince(slot, now, true);

	MyReplicationSlot = NULL;

	/* might not have been set when we've been a plain slot */
	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
	MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
	ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
	LWLockRelease(ProcArrayLock);

	if (am_walsender)
	{
		ereport(log_replication_commands ? LOG : DEBUG1,
				is_logical
				? errmsg("released logical replication slot \"%s\"",
						 slotname)
				: errmsg("released physical replication slot \"%s\"",
						 slotname));

		pfree(slotname);
	}
}
845 :
/*
 * Cleanup temporary slots created in current session.
 *
 * Cleanup only synced temporary slots if 'synced_only' is true, else
 * cleanup all temporary slots.
 *
 * A slot is considered ours when its active_pid is MyProcPid; such slots are
 * expected to be RS_TEMPORARY (asserted below).  The caller must not
 * currently hold an acquired slot.
 */
void
ReplicationSlotCleanup(bool synced_only)
{
	int			i;

	Assert(MyReplicationSlot == NULL);

	/*
	 * The control lock is released around each drop, so the array may have
	 * changed; every drop therefore restarts the scan from the beginning.
	 */
restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (!s->in_use)
			continue;

		SpinLockAcquire(&s->mutex);
		if ((s->active_pid == MyProcPid &&
			 (!synced_only || s->data.synced)))
		{
			Assert(s->data.persistency == RS_TEMPORARY);
			SpinLockRelease(&s->mutex);
			LWLockRelease(ReplicationSlotControlLock);	/* avoid deadlock */

			ReplicationSlotDropPtr(s);

			ConditionVariableBroadcast(&s->active_cv);
			goto restart;
		}
		else
			SpinLockRelease(&s->mutex);
	}

	LWLockRelease(ReplicationSlotControlLock);
}
887 :
888 : /*
889 : * Permanently drop replication slot identified by the passed in name.
890 : */
891 : void
892 826 : ReplicationSlotDrop(const char *name, bool nowait)
893 : {
894 : Assert(MyReplicationSlot == NULL);
895 :
896 826 : ReplicationSlotAcquire(name, nowait, false);
897 :
898 : /*
899 : * Do not allow users to drop the slots which are currently being synced
900 : * from the primary to the standby.
901 : */
902 810 : if (RecoveryInProgress() && MyReplicationSlot->data.synced)
903 2 : ereport(ERROR,
904 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
905 : errmsg("cannot drop replication slot \"%s\"", name),
906 : errdetail("This replication slot is being synchronized from the primary server."));
907 :
908 808 : ReplicationSlotDropAcquired();
909 808 : }
910 :
911 : /*
912 : * Change the definition of the slot identified by the specified name.
913 : */
914 : void
915 14 : ReplicationSlotAlter(const char *name, const bool *failover,
916 : const bool *two_phase)
917 : {
918 14 : bool update_slot = false;
919 :
920 : Assert(MyReplicationSlot == NULL);
921 : Assert(failover || two_phase);
922 :
923 14 : ReplicationSlotAcquire(name, false, true);
924 :
925 12 : if (SlotIsPhysical(MyReplicationSlot))
926 0 : ereport(ERROR,
927 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
928 : errmsg("cannot use %s with a physical replication slot",
929 : "ALTER_REPLICATION_SLOT"));
930 :
931 12 : if (RecoveryInProgress())
932 : {
933 : /*
934 : * Do not allow users to alter the slots which are currently being
935 : * synced from the primary to the standby.
936 : */
937 2 : if (MyReplicationSlot->data.synced)
938 2 : ereport(ERROR,
939 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
940 : errmsg("cannot alter replication slot \"%s\"", name),
941 : errdetail("This replication slot is being synchronized from the primary server."));
942 :
943 : /*
944 : * Do not allow users to enable failover on the standby as we do not
945 : * support sync to the cascading standby.
946 : */
947 0 : if (failover && *failover)
948 0 : ereport(ERROR,
949 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
950 : errmsg("cannot enable failover for a replication slot"
951 : " on the standby"));
952 : }
953 :
954 10 : if (failover)
955 : {
956 : /*
957 : * Do not allow users to enable failover for temporary slots as we do
958 : * not support syncing temporary slots to the standby.
959 : */
960 8 : if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
961 0 : ereport(ERROR,
962 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
963 : errmsg("cannot enable failover for a temporary replication slot"));
964 :
965 8 : if (MyReplicationSlot->data.failover != *failover)
966 : {
967 8 : SpinLockAcquire(&MyReplicationSlot->mutex);
968 8 : MyReplicationSlot->data.failover = *failover;
969 8 : SpinLockRelease(&MyReplicationSlot->mutex);
970 :
971 8 : update_slot = true;
972 : }
973 : }
974 :
975 10 : if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
976 : {
977 2 : SpinLockAcquire(&MyReplicationSlot->mutex);
978 2 : MyReplicationSlot->data.two_phase = *two_phase;
979 2 : SpinLockRelease(&MyReplicationSlot->mutex);
980 :
981 2 : update_slot = true;
982 : }
983 :
984 10 : if (update_slot)
985 : {
986 10 : ReplicationSlotMarkDirty();
987 10 : ReplicationSlotSave();
988 : }
989 :
990 10 : ReplicationSlotRelease();
991 10 : }
992 :
993 : /*
994 : * Permanently drop the currently acquired replication slot.
995 : */
996 : void
997 834 : ReplicationSlotDropAcquired(void)
998 : {
999 834 : ReplicationSlot *slot = MyReplicationSlot;
1000 :
1001 : Assert(MyReplicationSlot != NULL);
1002 :
1003 : /* slot isn't acquired anymore */
1004 834 : MyReplicationSlot = NULL;
1005 :
1006 834 : ReplicationSlotDropPtr(slot);
1007 834 : }
1008 :
1009 : /*
1010 : * Permanently drop the replication slot which will be released by the point
1011 : * this function returns.
1012 : */
1013 : static void
1014 1122 : ReplicationSlotDropPtr(ReplicationSlot *slot)
1015 : {
1016 : char path[MAXPGPATH];
1017 : char tmppath[MAXPGPATH];
1018 :
1019 : /*
1020 : * If some other backend ran this code concurrently with us, we might try
1021 : * to delete a slot with a certain name while someone else was trying to
1022 : * create a slot with the same name.
1023 : */
1024 1122 : LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
1025 :
1026 : /* Generate pathnames. */
1027 1122 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1028 1122 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
1029 :
1030 : /*
1031 : * Rename the slot directory on disk, so that we'll no longer recognize
1032 : * this as a valid slot. Note that if this fails, we've got to mark the
1033 : * slot inactive before bailing out. If we're dropping an ephemeral or a
1034 : * temporary slot, we better never fail hard as the caller won't expect
1035 : * the slot to survive and this might get called during error handling.
1036 : */
1037 1122 : if (rename(path, tmppath) == 0)
1038 : {
1039 : /*
1040 : * We need to fsync() the directory we just renamed and its parent to
1041 : * make sure that our changes are on disk in a crash-safe fashion. If
1042 : * fsync() fails, we can't be sure whether the changes are on disk or
1043 : * not. For now, we handle that by panicking;
1044 : * StartupReplicationSlots() will try to straighten it out after
1045 : * restart.
1046 : */
1047 1122 : START_CRIT_SECTION();
1048 1122 : fsync_fname(tmppath, true);
1049 1122 : fsync_fname(PG_REPLSLOT_DIR, true);
1050 1122 : END_CRIT_SECTION();
1051 : }
1052 : else
1053 : {
1054 0 : bool fail_softly = slot->data.persistency != RS_PERSISTENT;
1055 :
1056 0 : SpinLockAcquire(&slot->mutex);
1057 0 : slot->active_pid = 0;
1058 0 : SpinLockRelease(&slot->mutex);
1059 :
1060 : /* wake up anyone waiting on this slot */
1061 0 : ConditionVariableBroadcast(&slot->active_cv);
1062 :
1063 0 : ereport(fail_softly ? WARNING : ERROR,
1064 : (errcode_for_file_access(),
1065 : errmsg("could not rename file \"%s\" to \"%s\": %m",
1066 : path, tmppath)));
1067 : }
1068 :
1069 : /*
1070 : * The slot is definitely gone. Lock out concurrent scans of the array
1071 : * long enough to kill it. It's OK to clear the active PID here without
1072 : * grabbing the mutex because nobody else can be scanning the array here,
1073 : * and nobody can be attached to this slot and thus access it without
1074 : * scanning the array.
1075 : *
1076 : * Also wake up processes waiting for it.
1077 : */
1078 1122 : LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
1079 1122 : slot->active_pid = 0;
1080 1122 : slot->in_use = false;
1081 1122 : LWLockRelease(ReplicationSlotControlLock);
1082 1122 : ConditionVariableBroadcast(&slot->active_cv);
1083 :
1084 : /*
1085 : * Slot is dead and doesn't prevent resource removal anymore, recompute
1086 : * limits.
1087 : */
1088 1122 : ReplicationSlotsComputeRequiredXmin(false);
1089 1122 : ReplicationSlotsComputeRequiredLSN();
1090 :
1091 : /*
1092 : * If removing the directory fails, the worst thing that will happen is
1093 : * that the user won't be able to create a new slot with the same name
1094 : * until the next server restart. We warn about it, but that's all.
1095 : */
1096 1122 : if (!rmtree(tmppath, true))
1097 0 : ereport(WARNING,
1098 : (errmsg("could not remove directory \"%s\"", tmppath)));
1099 :
1100 : /*
1101 : * Drop the statistics entry for the replication slot. Do this while
1102 : * holding ReplicationSlotAllocationLock so that we don't drop a
1103 : * statistics entry for another slot with the same name just created in
1104 : * another session.
1105 : */
1106 1122 : if (SlotIsLogical(slot))
1107 812 : pgstat_drop_replslot(slot);
1108 :
1109 : /*
1110 : * We release this at the very end, so that nobody starts trying to create
1111 : * a slot while we're still cleaning up the detritus of the old one.
1112 : */
1113 1122 : LWLockRelease(ReplicationSlotAllocationLock);
1114 1122 : }
1115 :
1116 : /*
1117 : * Serialize the currently acquired slot's state from memory to disk, thereby
1118 : * guaranteeing the current state will survive a crash.
1119 : */
1120 : void
1121 2708 : ReplicationSlotSave(void)
1122 : {
1123 : char path[MAXPGPATH];
1124 :
1125 : Assert(MyReplicationSlot != NULL);
1126 :
1127 2708 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
1128 2708 : SaveSlotToPath(MyReplicationSlot, path, ERROR);
1129 2708 : }
1130 :
1131 : /*
1132 : * Signal that it would be useful if the currently acquired slot would be
1133 : * flushed out to disk.
1134 : *
1135 : * Note that the actual flush to disk can be delayed for a long time, if
1136 : * required for correctness explicitly do a ReplicationSlotSave().
1137 : */
1138 : void
1139 55800 : ReplicationSlotMarkDirty(void)
1140 : {
1141 55800 : ReplicationSlot *slot = MyReplicationSlot;
1142 :
1143 : Assert(MyReplicationSlot != NULL);
1144 :
1145 55800 : SpinLockAcquire(&slot->mutex);
1146 55800 : MyReplicationSlot->just_dirtied = true;
1147 55800 : MyReplicationSlot->dirty = true;
1148 55800 : SpinLockRelease(&slot->mutex);
1149 55800 : }
1150 :
1151 : /*
1152 : * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1153 : * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1154 : */
1155 : void
1156 910 : ReplicationSlotPersist(void)
1157 : {
1158 910 : ReplicationSlot *slot = MyReplicationSlot;
1159 :
1160 : Assert(slot != NULL);
1161 : Assert(slot->data.persistency != RS_PERSISTENT);
1162 :
1163 910 : SpinLockAcquire(&slot->mutex);
1164 910 : slot->data.persistency = RS_PERSISTENT;
1165 910 : SpinLockRelease(&slot->mutex);
1166 :
1167 910 : ReplicationSlotMarkDirty();
1168 910 : ReplicationSlotSave();
1169 910 : }
1170 :
1171 : /*
1172 : * Compute the oldest xmin across all slots and store it in the ProcArray.
1173 : *
1174 : * If already_locked is true, ProcArrayLock has already been acquired
1175 : * exclusively.
1176 : */
1177 : void
1178 4752 : ReplicationSlotsComputeRequiredXmin(bool already_locked)
1179 : {
1180 : int i;
1181 4752 : TransactionId agg_xmin = InvalidTransactionId;
1182 4752 : TransactionId agg_catalog_xmin = InvalidTransactionId;
1183 :
1184 : Assert(ReplicationSlotCtl != NULL);
1185 :
1186 4752 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1187 :
1188 48152 : for (i = 0; i < max_replication_slots; i++)
1189 : {
1190 43400 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1191 : TransactionId effective_xmin;
1192 : TransactionId effective_catalog_xmin;
1193 : bool invalidated;
1194 :
1195 43400 : if (!s->in_use)
1196 38912 : continue;
1197 :
1198 4488 : SpinLockAcquire(&s->mutex);
1199 4488 : effective_xmin = s->effective_xmin;
1200 4488 : effective_catalog_xmin = s->effective_catalog_xmin;
1201 4488 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1202 4488 : SpinLockRelease(&s->mutex);
1203 :
1204 : /* invalidated slots need not apply */
1205 4488 : if (invalidated)
1206 44 : continue;
1207 :
1208 : /* check the data xmin */
1209 4444 : if (TransactionIdIsValid(effective_xmin) &&
1210 20 : (!TransactionIdIsValid(agg_xmin) ||
1211 20 : TransactionIdPrecedes(effective_xmin, agg_xmin)))
1212 680 : agg_xmin = effective_xmin;
1213 :
1214 : /* check the catalog xmin */
1215 4444 : if (TransactionIdIsValid(effective_catalog_xmin) &&
1216 1802 : (!TransactionIdIsValid(agg_catalog_xmin) ||
1217 1802 : TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1218 2370 : agg_catalog_xmin = effective_catalog_xmin;
1219 : }
1220 :
1221 4752 : LWLockRelease(ReplicationSlotControlLock);
1222 :
1223 4752 : ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1224 4752 : }
1225 :
1226 : /*
1227 : * Compute the oldest restart LSN across all slots and inform xlog module.
1228 : *
1229 : * Note: while max_slot_wal_keep_size is theoretically relevant for this
1230 : * purpose, we don't try to account for that, because this module doesn't
1231 : * know what to compare against.
1232 : */
1233 : void
1234 57150 : ReplicationSlotsComputeRequiredLSN(void)
1235 : {
1236 : int i;
1237 57150 : XLogRecPtr min_required = InvalidXLogRecPtr;
1238 :
1239 : Assert(ReplicationSlotCtl != NULL);
1240 :
1241 57150 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1242 620586 : for (i = 0; i < max_replication_slots; i++)
1243 : {
1244 563436 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1245 : XLogRecPtr restart_lsn;
1246 : XLogRecPtr last_saved_restart_lsn;
1247 : bool invalidated;
1248 : ReplicationSlotPersistency persistency;
1249 :
1250 563436 : if (!s->in_use)
1251 506260 : continue;
1252 :
1253 57176 : SpinLockAcquire(&s->mutex);
1254 57176 : persistency = s->data.persistency;
1255 57176 : restart_lsn = s->data.restart_lsn;
1256 57176 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1257 57176 : last_saved_restart_lsn = s->last_saved_restart_lsn;
1258 57176 : SpinLockRelease(&s->mutex);
1259 :
1260 : /* invalidated slots need not apply */
1261 57176 : if (invalidated)
1262 46 : continue;
1263 :
1264 : /*
1265 : * For persistent slot use last_saved_restart_lsn to compute the
1266 : * oldest LSN for removal of WAL segments. The segments between
1267 : * last_saved_restart_lsn and restart_lsn might be needed by a
1268 : * persistent slot in the case of database crash. Non-persistent
1269 : * slots can't survive the database crash, so we don't care about
1270 : * last_saved_restart_lsn for them.
1271 : */
1272 57130 : if (persistency == RS_PERSISTENT)
1273 : {
1274 55450 : if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1275 : restart_lsn > last_saved_restart_lsn)
1276 : {
1277 51222 : restart_lsn = last_saved_restart_lsn;
1278 : }
1279 : }
1280 :
1281 57130 : if (XLogRecPtrIsValid(restart_lsn) &&
1282 2158 : (!XLogRecPtrIsValid(min_required) ||
1283 : restart_lsn < min_required))
1284 55142 : min_required = restart_lsn;
1285 : }
1286 57150 : LWLockRelease(ReplicationSlotControlLock);
1287 :
1288 57150 : XLogSetReplicationSlotMinimumLSN(min_required);
1289 57150 : }
1290 :
1291 : /*
1292 : * Compute the oldest WAL LSN required by *logical* decoding slots..
1293 : *
1294 : * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1295 : * slots exist.
1296 : *
1297 : * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1298 : * ignores physical replication slots.
1299 : *
1300 : * The results aren't required frequently, so we don't maintain a precomputed
1301 : * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1302 : */
1303 : XLogRecPtr
1304 6932 : ReplicationSlotsComputeLogicalRestartLSN(void)
1305 : {
1306 6932 : XLogRecPtr result = InvalidXLogRecPtr;
1307 : int i;
1308 :
1309 6932 : if (max_replication_slots <= 0)
1310 4 : return InvalidXLogRecPtr;
1311 :
1312 6928 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1313 :
1314 75116 : for (i = 0; i < max_replication_slots; i++)
1315 : {
1316 : ReplicationSlot *s;
1317 : XLogRecPtr restart_lsn;
1318 : XLogRecPtr last_saved_restart_lsn;
1319 : bool invalidated;
1320 : ReplicationSlotPersistency persistency;
1321 :
1322 68188 : s = &ReplicationSlotCtl->replication_slots[i];
1323 :
1324 : /* cannot change while ReplicationSlotCtlLock is held */
1325 68188 : if (!s->in_use)
1326 66692 : continue;
1327 :
1328 : /* we're only interested in logical slots */
1329 1496 : if (!SlotIsLogical(s))
1330 1068 : continue;
1331 :
1332 : /* read once, it's ok if it increases while we're checking */
1333 428 : SpinLockAcquire(&s->mutex);
1334 428 : persistency = s->data.persistency;
1335 428 : restart_lsn = s->data.restart_lsn;
1336 428 : invalidated = s->data.invalidated != RS_INVAL_NONE;
1337 428 : last_saved_restart_lsn = s->last_saved_restart_lsn;
1338 428 : SpinLockRelease(&s->mutex);
1339 :
1340 : /* invalidated slots need not apply */
1341 428 : if (invalidated)
1342 8 : continue;
1343 :
1344 : /*
1345 : * For persistent slot use last_saved_restart_lsn to compute the
1346 : * oldest LSN for removal of WAL segments. The segments between
1347 : * last_saved_restart_lsn and restart_lsn might be needed by a
1348 : * persistent slot in the case of database crash. Non-persistent
1349 : * slots can't survive the database crash, so we don't care about
1350 : * last_saved_restart_lsn for them.
1351 : */
1352 420 : if (persistency == RS_PERSISTENT)
1353 : {
1354 416 : if (XLogRecPtrIsValid(last_saved_restart_lsn) &&
1355 : restart_lsn > last_saved_restart_lsn)
1356 : {
1357 0 : restart_lsn = last_saved_restart_lsn;
1358 : }
1359 : }
1360 :
1361 420 : if (!XLogRecPtrIsValid(restart_lsn))
1362 0 : continue;
1363 :
1364 420 : if (!XLogRecPtrIsValid(result) ||
1365 : restart_lsn < result)
1366 328 : result = restart_lsn;
1367 : }
1368 :
1369 6928 : LWLockRelease(ReplicationSlotControlLock);
1370 :
1371 6928 : return result;
1372 : }
1373 :
1374 : /*
1375 : * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1376 : * passed database oid.
1377 : *
1378 : * Returns true if there are any slots referencing the database. *nslots will
1379 : * be set to the absolute number of slots in the database, *nactive to ones
1380 : * currently active.
1381 : */
1382 : bool
1383 92 : ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1384 : {
1385 : int i;
1386 :
1387 92 : *nslots = *nactive = 0;
1388 :
1389 92 : if (max_replication_slots <= 0)
1390 0 : return false;
1391 :
1392 92 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1393 954 : for (i = 0; i < max_replication_slots; i++)
1394 : {
1395 : ReplicationSlot *s;
1396 :
1397 862 : s = &ReplicationSlotCtl->replication_slots[i];
1398 :
1399 : /* cannot change while ReplicationSlotCtlLock is held */
1400 862 : if (!s->in_use)
1401 820 : continue;
1402 :
1403 : /* only logical slots are database specific, skip */
1404 42 : if (!SlotIsLogical(s))
1405 20 : continue;
1406 :
1407 : /* not our database, skip */
1408 22 : if (s->data.database != dboid)
1409 16 : continue;
1410 :
1411 : /* NB: intentionally counting invalidated slots */
1412 :
1413 : /* count slots with spinlock held */
1414 6 : SpinLockAcquire(&s->mutex);
1415 6 : (*nslots)++;
1416 6 : if (s->active_pid != 0)
1417 2 : (*nactive)++;
1418 6 : SpinLockRelease(&s->mutex);
1419 : }
1420 92 : LWLockRelease(ReplicationSlotControlLock);
1421 :
1422 92 : if (*nslots > 0)
1423 6 : return true;
1424 86 : return false;
1425 : }
1426 :
1427 : /*
1428 : * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
1429 : * passed database oid. The caller should hold an exclusive lock on the
1430 : * pg_database oid for the database to prevent creation of new slots on the db
1431 : * or replay from existing slots.
1432 : *
1433 : * Another session that concurrently acquires an existing slot on the target DB
1434 : * (most likely to drop it) may cause this function to ERROR. If that happens
1435 : * it may have dropped some but not all slots.
1436 : *
1437 : * This routine isn't as efficient as it could be - but we don't drop
1438 : * databases often, especially databases with lots of slots.
1439 : */
1440 : void
1441 118 : ReplicationSlotsDropDBSlots(Oid dboid)
1442 : {
1443 : int i;
1444 :
1445 118 : if (max_replication_slots <= 0)
1446 0 : return;
1447 :
1448 118 : restart:
1449 128 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1450 1234 : for (i = 0; i < max_replication_slots; i++)
1451 : {
1452 : ReplicationSlot *s;
1453 : char *slotname;
1454 : int active_pid;
1455 :
1456 1116 : s = &ReplicationSlotCtl->replication_slots[i];
1457 :
1458 : /* cannot change while ReplicationSlotCtlLock is held */
1459 1116 : if (!s->in_use)
1460 1056 : continue;
1461 :
1462 : /* only logical slots are database specific, skip */
1463 60 : if (!SlotIsLogical(s))
1464 22 : continue;
1465 :
1466 : /* not our database, skip */
1467 38 : if (s->data.database != dboid)
1468 28 : continue;
1469 :
1470 : /* NB: intentionally including invalidated slots */
1471 :
1472 : /* acquire slot, so ReplicationSlotDropAcquired can be reused */
1473 10 : SpinLockAcquire(&s->mutex);
1474 : /* can't change while ReplicationSlotControlLock is held */
1475 10 : slotname = NameStr(s->data.name);
1476 10 : active_pid = s->active_pid;
1477 10 : if (active_pid == 0)
1478 : {
1479 10 : MyReplicationSlot = s;
1480 10 : s->active_pid = MyProcPid;
1481 : }
1482 10 : SpinLockRelease(&s->mutex);
1483 :
1484 : /*
1485 : * Even though we hold an exclusive lock on the database object a
1486 : * logical slot for that DB can still be active, e.g. if it's
1487 : * concurrently being dropped by a backend connected to another DB.
1488 : *
1489 : * That's fairly unlikely in practice, so we'll just bail out.
1490 : *
1491 : * The slot sync worker holds a shared lock on the database before
1492 : * operating on synced logical slots to avoid conflict with the drop
1493 : * happening here. The persistent synced slots are thus safe but there
1494 : * is a possibility that the slot sync worker has created a temporary
1495 : * slot (which stays active even on release) and we are trying to drop
1496 : * that here. In practice, the chances of hitting this scenario are
1497 : * less as during slot synchronization, the temporary slot is
1498 : * immediately converted to persistent and thus is safe due to the
1499 : * shared lock taken on the database. So, we'll just bail out in such
1500 : * a case.
1501 : *
1502 : * XXX: We can consider shutting down the slot sync worker before
1503 : * trying to drop synced temporary slots here.
1504 : */
1505 10 : if (active_pid)
1506 0 : ereport(ERROR,
1507 : (errcode(ERRCODE_OBJECT_IN_USE),
1508 : errmsg("replication slot \"%s\" is active for PID %d",
1509 : slotname, active_pid)));
1510 :
1511 : /*
1512 : * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
1513 : * holding ReplicationSlotControlLock over filesystem operations,
1514 : * release ReplicationSlotControlLock and use
1515 : * ReplicationSlotDropAcquired.
1516 : *
1517 : * As that means the set of slots could change, restart scan from the
1518 : * beginning each time we release the lock.
1519 : */
1520 10 : LWLockRelease(ReplicationSlotControlLock);
1521 10 : ReplicationSlotDropAcquired();
1522 10 : goto restart;
1523 : }
1524 118 : LWLockRelease(ReplicationSlotControlLock);
1525 : }
1526 :
1527 :
1528 : /*
1529 : * Check whether the server's configuration supports using replication
1530 : * slots.
1531 : */
1532 : void
1533 3514 : CheckSlotRequirements(void)
1534 : {
1535 : /*
1536 : * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1537 : * needs the same check.
1538 : */
1539 :
1540 3514 : if (max_replication_slots == 0)
1541 0 : ereport(ERROR,
1542 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1543 : errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1544 :
1545 3514 : if (wal_level < WAL_LEVEL_REPLICA)
1546 0 : ereport(ERROR,
1547 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1548 : errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1549 3514 : }
1550 :
1551 : /*
1552 : * Check whether the user has privilege to use replication slots.
1553 : */
1554 : void
1555 1128 : CheckSlotPermissions(void)
1556 : {
1557 1128 : if (!has_rolreplication(GetUserId()))
1558 10 : ereport(ERROR,
1559 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1560 : errmsg("permission denied to use replication slots"),
1561 : errdetail("Only roles with the %s attribute may use replication slots.",
1562 : "REPLICATION")));
1563 1118 : }
1564 :
1565 : /*
1566 : * Reserve WAL for the currently active slot.
1567 : *
1568 : * Compute and set restart_lsn in a manner that's appropriate for the type of
1569 : * the slot and concurrency safe.
1570 : */
1571 : void
1572 1214 : ReplicationSlotReserveWal(void)
1573 : {
1574 1214 : ReplicationSlot *slot = MyReplicationSlot;
1575 :
1576 : Assert(slot != NULL);
1577 : Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
1578 : Assert(!XLogRecPtrIsValid(slot->last_saved_restart_lsn));
1579 :
1580 : /*
1581 : * The replication slot mechanism is used to prevent removal of required
1582 : * WAL. As there is no interlock between this routine and checkpoints, WAL
1583 : * segments could concurrently be removed when a now stale return value of
1584 : * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
1585 : * this happens we'll just retry.
1586 : */
1587 : while (true)
1588 0 : {
1589 : XLogSegNo segno;
1590 : XLogRecPtr restart_lsn;
1591 :
1592 : /*
1593 : * For logical slots log a standby snapshot and start logical decoding
1594 : * at exactly that position. That allows the slot to start up more
1595 : * quickly. But on a standby we cannot do WAL writes, so just use the
1596 : * replay pointer; effectively, an attempt to create a logical slot on
1597 : * standby will cause it to wait for an xl_running_xact record to be
1598 : * logged independently on the primary, so that a snapshot can be
1599 : * built using the record.
1600 : *
1601 : * None of this is needed (or indeed helpful) for physical slots as
1602 : * they'll start replay at the last logged checkpoint anyway. Instead
1603 : * return the location of the last redo LSN. While that slightly
1604 : * increases the chance that we have to retry, it's where a base
1605 : * backup has to start replay at.
1606 : */
1607 1214 : if (SlotIsPhysical(slot))
1608 306 : restart_lsn = GetRedoRecPtr();
1609 908 : else if (RecoveryInProgress())
1610 46 : restart_lsn = GetXLogReplayRecPtr(NULL);
1611 : else
1612 862 : restart_lsn = GetXLogInsertRecPtr();
1613 :
1614 1214 : SpinLockAcquire(&slot->mutex);
1615 1214 : slot->data.restart_lsn = restart_lsn;
1616 1214 : SpinLockRelease(&slot->mutex);
1617 :
1618 : /* prevent WAL removal as fast as possible */
1619 1214 : ReplicationSlotsComputeRequiredLSN();
1620 :
1621 : /*
1622 : * If all required WAL is still there, great, otherwise retry. The
1623 : * slot should prevent further removal of WAL, unless there's a
1624 : * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
1625 : * the new restart_lsn above, so normally we should never need to loop
1626 : * more than twice.
1627 : */
1628 1214 : XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
1629 1214 : if (XLogGetLastRemovedSegno() < segno)
1630 1214 : break;
1631 : }
1632 :
1633 1214 : if (!RecoveryInProgress() && SlotIsLogical(slot))
1634 : {
1635 : XLogRecPtr flushptr;
1636 :
1637 : /* make sure we have enough information to start */
1638 862 : flushptr = LogStandbySnapshot();
1639 :
1640 : /* and make sure it's fsynced to disk */
1641 862 : XLogFlush(flushptr);
1642 : }
1643 1214 : }
1644 :
1645 : /*
1646 : * Report that replication slot needs to be invalidated
1647 : */
1648 : static void
1649 42 : ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
1650 : bool terminating,
1651 : int pid,
1652 : NameData slotname,
1653 : XLogRecPtr restart_lsn,
1654 : XLogRecPtr oldestLSN,
1655 : TransactionId snapshotConflictHorizon,
1656 : long slot_idle_seconds)
1657 : {
1658 : StringInfoData err_detail;
1659 : StringInfoData err_hint;
1660 :
1661 42 : initStringInfo(&err_detail);
1662 42 : initStringInfo(&err_hint);
1663 :
1664 42 : switch (cause)
1665 : {
1666 12 : case RS_INVAL_WAL_REMOVED:
1667 : {
1668 12 : uint64 ex = oldestLSN - restart_lsn;
1669 :
1670 12 : appendStringInfo(&err_detail,
1671 12 : ngettext("The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " byte.",
1672 : "The slot's restart_lsn %X/%08X exceeds the limit by %" PRIu64 " bytes.",
1673 : ex),
1674 12 : LSN_FORMAT_ARGS(restart_lsn),
1675 : ex);
1676 : /* translator: %s is a GUC variable name */
1677 12 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1678 : "max_slot_wal_keep_size");
1679 12 : break;
1680 : }
1681 24 : case RS_INVAL_HORIZON:
1682 24 : appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
1683 : snapshotConflictHorizon);
1684 24 : break;
1685 :
1686 6 : case RS_INVAL_WAL_LEVEL:
1687 6 : appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
1688 6 : break;
1689 :
1690 0 : case RS_INVAL_IDLE_TIMEOUT:
1691 : {
1692 : /* translator: %s is a GUC variable name */
1693 0 : appendStringInfo(&err_detail, _("The slot's idle time of %lds exceeds the configured \"%s\" duration of %ds."),
1694 : slot_idle_seconds, "idle_replication_slot_timeout",
1695 : idle_replication_slot_timeout_secs);
1696 : /* translator: %s is a GUC variable name */
1697 0 : appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
1698 : "idle_replication_slot_timeout");
1699 0 : break;
1700 : }
1701 : case RS_INVAL_NONE:
1702 : pg_unreachable();
1703 : }
1704 :
1705 42 : ereport(LOG,
1706 : terminating ?
1707 : errmsg("terminating process %d to release replication slot \"%s\"",
1708 : pid, NameStr(slotname)) :
1709 : errmsg("invalidating obsolete replication slot \"%s\"",
1710 : NameStr(slotname)),
1711 : errdetail_internal("%s", err_detail.data),
1712 : err_hint.len ? errhint("%s", err_hint.data) : 0);
1713 :
1714 42 : pfree(err_detail.data);
1715 42 : pfree(err_hint.data);
1716 42 : }
1717 :
1718 : /*
1719 : * Can we invalidate an idle replication slot?
1720 : *
1721 : * Idle timeout invalidation is allowed only when:
1722 : *
1723 : * 1. Idle timeout is set
1724 : * 2. Slot has reserved WAL
1725 : * 3. Slot is inactive
1726 : * 4. The slot is not being synced from the primary while the server is in
1727 : * recovery. This is because synced slots are always considered to be
1728 : * inactive because they don't perform logical decoding to produce changes.
1729 : */
1730 : static inline bool
1731 712 : CanInvalidateIdleSlot(ReplicationSlot *s)
1732 : {
1733 712 : return (idle_replication_slot_timeout_secs != 0 &&
1734 0 : XLogRecPtrIsValid(s->data.restart_lsn) &&
1735 712 : s->inactive_since > 0 &&
1736 0 : !(RecoveryInProgress() && s->data.synced));
1737 : }
1738 :
1739 : /*
1740 : * DetermineSlotInvalidationCause - Determine the cause for which a slot
1741 : * becomes invalid among the given possible causes.
1742 : *
1743 : * This function sequentially checks all possible invalidation causes and
1744 : * returns the first one for which the slot is eligible for invalidation.
1745 : */
1746 : static ReplicationSlotInvalidationCause
1747 774 : DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
1748 : XLogRecPtr oldestLSN, Oid dboid,
1749 : TransactionId snapshotConflictHorizon,
1750 : TimestampTz *inactive_since, TimestampTz now)
1751 : {
1752 : Assert(possible_causes != RS_INVAL_NONE);
1753 :
1754 774 : if (possible_causes & RS_INVAL_WAL_REMOVED)
1755 : {
1756 724 : XLogRecPtr restart_lsn = s->data.restart_lsn;
1757 :
1758 724 : if (XLogRecPtrIsValid(restart_lsn) &&
1759 : restart_lsn < oldestLSN)
1760 12 : return RS_INVAL_WAL_REMOVED;
1761 : }
1762 :
1763 762 : if (possible_causes & RS_INVAL_HORIZON)
1764 : {
1765 : /* invalid DB oid signals a shared relation */
1766 44 : if (SlotIsLogical(s) &&
1767 34 : (dboid == InvalidOid || dboid == s->data.database))
1768 : {
1769 44 : TransactionId effective_xmin = s->effective_xmin;
1770 44 : TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
1771 :
1772 44 : if (TransactionIdIsValid(effective_xmin) &&
1773 0 : TransactionIdPrecedesOrEquals(effective_xmin,
1774 : snapshotConflictHorizon))
1775 0 : return RS_INVAL_HORIZON;
1776 88 : else if (TransactionIdIsValid(catalog_effective_xmin) &&
1777 44 : TransactionIdPrecedesOrEquals(catalog_effective_xmin,
1778 : snapshotConflictHorizon))
1779 24 : return RS_INVAL_HORIZON;
1780 : }
1781 : }
1782 :
1783 738 : if (possible_causes & RS_INVAL_WAL_LEVEL)
1784 : {
1785 6 : if (SlotIsLogical(s))
1786 6 : return RS_INVAL_WAL_LEVEL;
1787 : }
1788 :
1789 732 : if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
1790 : {
1791 : Assert(now > 0);
1792 :
1793 712 : if (CanInvalidateIdleSlot(s))
1794 : {
1795 : /*
1796 : * Simulate the invalidation due to idle_timeout to test the
1797 : * timeout behavior promptly, without waiting for it to trigger
1798 : * naturally.
1799 : */
1800 : #ifdef USE_INJECTION_POINTS
1801 0 : if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
1802 : {
1803 0 : *inactive_since = 0; /* since the beginning of time */
1804 0 : return RS_INVAL_IDLE_TIMEOUT;
1805 : }
1806 : #endif
1807 :
1808 : /*
1809 : * Check if the slot needs to be invalidated due to
1810 : * idle_replication_slot_timeout GUC.
1811 : */
1812 0 : if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
1813 : idle_replication_slot_timeout_secs))
1814 : {
1815 0 : *inactive_since = s->inactive_since;
1816 0 : return RS_INVAL_IDLE_TIMEOUT;
1817 : }
1818 : }
1819 : }
1820 :
1821 732 : return RS_INVAL_NONE;
1822 : }
1823 :
/*
 * Helper for InvalidateObsoleteReplicationSlots
 *
 * Acquires the given slot and marks it invalid, if necessary and possible.
 *
 * possible_causes is a bitmask of RS_INVAL_* causes to check for; oldestLSN,
 * dboid and snapshotConflictHorizon are handed to
 * DetermineSlotInvalidationCause() and are also used when reporting the
 * invalidation.
 *
 * The caller must hold ReplicationSlotControlLock in shared mode on entry.
 * Returns whether ReplicationSlotControlLock was released in the interim (and
 * in that case we're not holding the lock at return, otherwise we are).
 *
 * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
 *
 * If the slot needs invalidating but is currently owned by another process,
 * that process is signalled (a recovery-conflict procsignal when we are the
 * startup process, SIGTERM otherwise) and we wait on the slot's condition
 * variable before re-checking the slot.
 *
 * This is inherently racy, because we release the LWLock
 * for syscalls, so caller must restart if we return true.
 */
static bool
InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
							   ReplicationSlot *s,
							   XLogRecPtr oldestLSN,
							   Oid dboid, TransactionId snapshotConflictHorizon,
							   bool *invalidated)
{
	/* PID we last signalled, so that each distinct owner is signalled once */
	int			last_signaled_pid = 0;
	bool		released_lock = false;

	/* set by DetermineSlotInvalidationCause() for idle-timeout invalidation */
	TimestampTz inactive_since = 0;

	for (;;)
	{
		XLogRecPtr	restart_lsn;
		NameData	slotname;
		int			active_pid = 0;
		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
		TimestampTz now = 0;
		long		slot_idle_secs = 0;

		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));

		/*
		 * The slot is no longer in use (it may have been dropped while we had
		 * the lock released), so there is nothing to do.  If we re-acquired
		 * the lock in an earlier iteration, release it so that the lock state
		 * matches the value we return.
		 */
		if (!s->in_use)
		{
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
		{
			/*
			 * Assign the current time here to avoid system call overhead
			 * while holding the spinlock in subsequent code.
			 */
			now = GetCurrentTimestamp();
		}

		/*
		 * Check if the slot needs to be invalidated. If it needs to be
		 * invalidated, and is not currently acquired, acquire it and mark it
		 * as having been invalidated. We do this with the spinlock held to
		 * avoid race conditions -- for example the restart_lsn could move
		 * forward, or the slot could be dropped.
		 */
		SpinLockAcquire(&s->mutex);

		/*
		 * Remember restart_lsn for the invalidation report; the slot's own
		 * copy may be reset below for RS_INVAL_WAL_REMOVED.
		 */
		restart_lsn = s->data.restart_lsn;

		/* we do nothing if the slot is already invalid */
		if (s->data.invalidated == RS_INVAL_NONE)
			invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
																s, oldestLSN,
																dboid,
																snapshotConflictHorizon,
																&inactive_since,
																now);

		/* if there's no invalidation, we're done */
		if (invalidation_cause == RS_INVAL_NONE)
		{
			SpinLockRelease(&s->mutex);
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		slotname = s->data.name;
		active_pid = s->active_pid;

		/*
		 * If the slot can be acquired, do so and mark it invalidated
		 * immediately. Otherwise we'll signal the owning process, below, and
		 * retry.
		 *
		 * Note: Unlike other slot attributes, slot's inactive_since can't be
		 * changed until the acquired slot is released or the owning process
		 * is terminated. So, the inactive slot can only be invalidated
		 * immediately without being terminated.
		 */
		if (active_pid == 0)
		{
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
			s->data.invalidated = invalidation_cause;

			/*
			 * XXX: We should consider not overwriting restart_lsn and instead
			 * just rely on .invalidated.
			 */
			if (invalidation_cause == RS_INVAL_WAL_REMOVED)
			{
				s->data.restart_lsn = InvalidXLogRecPtr;
				s->last_saved_restart_lsn = InvalidXLogRecPtr;
			}

			/* Let caller know */
			*invalidated = true;
		}

		SpinLockRelease(&s->mutex);

		/*
		 * Calculate the idle time duration of the slot if slot is marked
		 * invalidated with RS_INVAL_IDLE_TIMEOUT.  Done after releasing the
		 * spinlock; the result is only used for reporting.
		 */
		if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
		{
			int			slot_idle_usecs;

			TimestampDifference(inactive_since, now, &slot_idle_secs,
								&slot_idle_usecs);
		}

		if (active_pid != 0)
		{
			/*
			 * Prepare the sleep on the slot's condition variable before
			 * releasing the lock, to close a possible race condition if the
			 * slot is released before the sleep below.
			 */
			ConditionVariablePrepareToSleep(&s->active_cv);

			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/*
			 * Signal to terminate the process that owns the slot, if we
			 * haven't already signalled it. (Avoidance of repeated
			 * signalling is the only reason for there to be a loop in this
			 * routine; otherwise we could rely on caller's restart loop.)
			 *
			 * There is the race condition that other process may own the slot
			 * after its current owner process is terminated and before this
			 * process owns it. To handle that, we signal only if the PID of
			 * the owning process has changed from the previous time. (This
			 * logic assumes that the same PID is not reused very quickly.)
			 */
			if (last_signaled_pid != active_pid)
			{
				ReportSlotInvalidation(invalidation_cause, true, active_pid,
									   slotname, restart_lsn,
									   oldestLSN, snapshotConflictHorizon,
									   slot_idle_secs);

				/* In recovery, use a recovery-conflict signal instead of SIGTERM */
				if (MyBackendType == B_STARTUP)
					(void) SendProcSignal(active_pid,
										  PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
										  INVALID_PROC_NUMBER);
				else
					(void) kill(active_pid, SIGTERM);

				last_signaled_pid = active_pid;
			}

			/* Wait until the slot is released. */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);

			/*
			 * Re-acquire lock and start over; we expect to invalidate the
			 * slot next time (unless another process acquires the slot in the
			 * meantime).
			 *
			 * Note: It is possible for a slot to advance its restart_lsn or
			 * xmin values sufficiently between when we release the mutex and
			 * when we recheck, moving from a conflicting state to a non
			 * conflicting state. This is intentional and safe: if the slot
			 * has caught up while we're busy here, the resources we were
			 * concerned about (WAL segments or tuples) have not yet been
			 * removed, and there's no reason to invalidate the slot.
			 */
			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
			continue;
		}
		else
		{
			/*
			 * We hold the slot now and have already invalidated it; flush it
			 * to ensure that state persists.
			 *
			 * Don't want to hold ReplicationSlotControlLock across file
			 * system operations, so release it now but be sure to tell caller
			 * to restart from scratch.
			 */
			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/* Make sure the invalidated state persists across server restart */
			ReplicationSlotMarkDirty();
			ReplicationSlotSave();
			ReplicationSlotRelease();

			ReportSlotInvalidation(invalidation_cause, false, active_pid,
								   slotname, restart_lsn,
								   oldestLSN, snapshotConflictHorizon,
								   slot_idle_secs);

			/* done with this slot for now */
			break;
		}
	}

	Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));

	return released_lock;
}
2044 :
2045 : /*
2046 : * Invalidate slots that require resources about to be removed.
2047 : *
2048 : * Returns true when any slot have got invalidated.
2049 : *
2050 : * Whether a slot needs to be invalidated depends on the invalidation cause.
2051 : * A slot is invalidated if it:
2052 : * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
2053 : * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
2054 : * db; dboid may be InvalidOid for shared relations
2055 : * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient
2056 : * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
2057 : * "idle_replication_slot_timeout" duration.
2058 : *
2059 : * Note: This function attempts to invalidate the slot for multiple possible
2060 : * causes in a single pass, minimizing redundant iterations. The "cause"
2061 : * parameter can be a MASK representing one or more of the defined causes.
2062 : *
2063 : * NB - this runs as part of checkpoint, so avoid raising errors if possible.
2064 : */
2065 : bool
2066 3506 : InvalidateObsoleteReplicationSlots(uint32 possible_causes,
2067 : XLogSegNo oldestSegno, Oid dboid,
2068 : TransactionId snapshotConflictHorizon)
2069 : {
2070 : XLogRecPtr oldestLSN;
2071 3506 : bool invalidated = false;
2072 :
2073 : Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
2074 : Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
2075 : Assert(possible_causes != RS_INVAL_NONE);
2076 :
2077 3506 : if (max_replication_slots == 0)
2078 2 : return invalidated;
2079 :
2080 3504 : XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
2081 :
2082 3532 : restart:
2083 3532 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
2084 37866 : for (int i = 0; i < max_replication_slots; i++)
2085 : {
2086 34362 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2087 :
2088 34362 : if (!s->in_use)
2089 33510 : continue;
2090 :
2091 : /* Prevent invalidation of logical slots during binary upgrade */
2092 852 : if (SlotIsLogical(s) && IsBinaryUpgrade)
2093 14 : continue;
2094 :
2095 838 : if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid,
2096 : snapshotConflictHorizon,
2097 : &invalidated))
2098 : {
2099 : /* if the lock was released, start from scratch */
2100 28 : goto restart;
2101 : }
2102 : }
2103 3504 : LWLockRelease(ReplicationSlotControlLock);
2104 :
2105 : /*
2106 : * If any slots have been invalidated, recalculate the resource limits.
2107 : */
2108 3504 : if (invalidated)
2109 : {
2110 18 : ReplicationSlotsComputeRequiredXmin(false);
2111 18 : ReplicationSlotsComputeRequiredLSN();
2112 : }
2113 :
2114 3504 : return invalidated;
2115 : }
2116 :
2117 : /*
2118 : * Flush all replication slots to disk.
2119 : *
2120 : * It is convenient to flush dirty replication slots at the time of checkpoint.
2121 : * Additionally, in case of a shutdown checkpoint, we also identify the slots
2122 : * for which the confirmed_flush LSN has been updated since the last time it
2123 : * was saved and flush them.
2124 : */
2125 : void
2126 3466 : CheckPointReplicationSlots(bool is_shutdown)
2127 : {
2128 : int i;
2129 3466 : bool last_saved_restart_lsn_updated = false;
2130 :
2131 3466 : elog(DEBUG1, "performing replication slot checkpoint");
2132 :
2133 : /*
2134 : * Prevent any slot from being created/dropped while we're active. As we
2135 : * explicitly do *not* want to block iterating over replication_slots or
2136 : * acquiring a slot we cannot take the control lock - but that's OK,
2137 : * because holding ReplicationSlotAllocationLock is strictly stronger, and
2138 : * enough to guarantee that nobody can change the in_use bits on us.
2139 : */
2140 3466 : LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);
2141 :
2142 37560 : for (i = 0; i < max_replication_slots; i++)
2143 : {
2144 34094 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
2145 : char path[MAXPGPATH];
2146 :
2147 34094 : if (!s->in_use)
2148 33346 : continue;
2149 :
2150 : /* save the slot to disk, locking is handled in SaveSlotToPath() */
2151 748 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));
2152 :
2153 : /*
2154 : * Slot's data is not flushed each time the confirmed_flush LSN is
2155 : * updated as that could lead to frequent writes. However, we decide
2156 : * to force a flush of all logical slot's data at the time of shutdown
2157 : * if the confirmed_flush LSN is changed since we last flushed it to
2158 : * disk. This helps in avoiding an unnecessary retreat of the
2159 : * confirmed_flush LSN after restart.
2160 : */
2161 748 : if (is_shutdown && SlotIsLogical(s))
2162 : {
2163 156 : SpinLockAcquire(&s->mutex);
2164 :
2165 156 : if (s->data.invalidated == RS_INVAL_NONE &&
2166 156 : s->data.confirmed_flush > s->last_saved_confirmed_flush)
2167 : {
2168 74 : s->just_dirtied = true;
2169 74 : s->dirty = true;
2170 : }
2171 156 : SpinLockRelease(&s->mutex);
2172 : }
2173 :
2174 : /*
2175 : * Track if we're going to update slot's last_saved_restart_lsn. We
2176 : * need this to know if we need to recompute the required LSN.
2177 : */
2178 748 : if (s->last_saved_restart_lsn != s->data.restart_lsn)
2179 386 : last_saved_restart_lsn_updated = true;
2180 :
2181 748 : SaveSlotToPath(s, path, LOG);
2182 : }
2183 3466 : LWLockRelease(ReplicationSlotAllocationLock);
2184 :
2185 : /*
2186 : * Recompute the required LSN if SaveSlotToPath() updated
2187 : * last_saved_restart_lsn for any slot.
2188 : */
2189 3466 : if (last_saved_restart_lsn_updated)
2190 386 : ReplicationSlotsComputeRequiredLSN();
2191 3466 : }
2192 :
2193 : /*
2194 : * Load all replication slots from disk into memory at server startup. This
2195 : * needs to be run before we start crash recovery.
2196 : */
2197 : void
2198 1922 : StartupReplicationSlots(void)
2199 : {
2200 : DIR *replication_dir;
2201 : struct dirent *replication_de;
2202 :
2203 1922 : elog(DEBUG1, "starting up replication slots");
2204 :
2205 : /* restore all slots by iterating over all on-disk entries */
2206 1922 : replication_dir = AllocateDir(PG_REPLSLOT_DIR);
2207 5982 : while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
2208 : {
2209 : char path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
2210 : PGFileType de_type;
2211 :
2212 4062 : if (strcmp(replication_de->d_name, ".") == 0 ||
2213 2142 : strcmp(replication_de->d_name, "..") == 0)
2214 3840 : continue;
2215 :
2216 222 : snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
2217 222 : de_type = get_dirent_type(path, replication_de, false, DEBUG1);
2218 :
2219 : /* we're only creating directories here, skip if it's not our's */
2220 222 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
2221 0 : continue;
2222 :
2223 : /* we crashed while a slot was being setup or deleted, clean up */
2224 222 : if (pg_str_endswith(replication_de->d_name, ".tmp"))
2225 : {
2226 0 : if (!rmtree(path, true))
2227 : {
2228 0 : ereport(WARNING,
2229 : (errmsg("could not remove directory \"%s\"",
2230 : path)));
2231 0 : continue;
2232 : }
2233 0 : fsync_fname(PG_REPLSLOT_DIR, true);
2234 0 : continue;
2235 : }
2236 :
2237 : /* looks like a slot in a normal state, restore */
2238 222 : RestoreSlotFromDisk(replication_de->d_name);
2239 : }
2240 1920 : FreeDir(replication_dir);
2241 :
2242 : /* currently no slots exist, we're done. */
2243 1920 : if (max_replication_slots <= 0)
2244 2 : return;
2245 :
2246 : /* Now that we have recovered all the data, compute replication xmin */
2247 1918 : ReplicationSlotsComputeRequiredXmin(false);
2248 1918 : ReplicationSlotsComputeRequiredLSN();
2249 : }
2250 :
2251 : /* ----
2252 : * Manipulation of on-disk state of replication slots
2253 : *
2254 : * NB: none of the routines below should take any notice whether a slot is the
2255 : * current one or not, that's all handled a layer above.
2256 : * ----
2257 : */
2258 : static void
2259 1306 : CreateSlotOnDisk(ReplicationSlot *slot)
2260 : {
2261 : char tmppath[MAXPGPATH];
2262 : char path[MAXPGPATH];
2263 : struct stat st;
2264 :
2265 : /*
2266 : * No need to take out the io_in_progress_lock, nobody else can see this
2267 : * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
2268 : * takes out the lock, if we'd take the lock here, we'd deadlock.
2269 : */
2270 :
2271 1306 : sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2272 1306 : sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));
2273 :
2274 : /*
2275 : * It's just barely possible that some previous effort to create or drop a
2276 : * slot with this name left a temp directory lying around. If that seems
2277 : * to be the case, try to remove it. If the rmtree() fails, we'll error
2278 : * out at the MakePGDirectory() below, so we don't bother checking
2279 : * success.
2280 : */
2281 1306 : if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
2282 0 : rmtree(tmppath, true);
2283 :
2284 : /* Create and fsync the temporary slot directory. */
2285 1306 : if (MakePGDirectory(tmppath) < 0)
2286 0 : ereport(ERROR,
2287 : (errcode_for_file_access(),
2288 : errmsg("could not create directory \"%s\": %m",
2289 : tmppath)));
2290 1306 : fsync_fname(tmppath, true);
2291 :
2292 : /* Write the actual state file. */
2293 1306 : slot->dirty = true; /* signal that we really need to write */
2294 1306 : SaveSlotToPath(slot, tmppath, ERROR);
2295 :
2296 : /* Rename the directory into place. */
2297 1306 : if (rename(tmppath, path) != 0)
2298 0 : ereport(ERROR,
2299 : (errcode_for_file_access(),
2300 : errmsg("could not rename file \"%s\" to \"%s\": %m",
2301 : tmppath, path)));
2302 :
2303 : /*
2304 : * If we'd now fail - really unlikely - we wouldn't know whether this slot
2305 : * would persist after an OS crash or not - so, force a restart. The
2306 : * restart would try to fsync this again till it works.
2307 : */
2308 1306 : START_CRIT_SECTION();
2309 :
2310 1306 : fsync_fname(path, true);
2311 1306 : fsync_fname(PG_REPLSLOT_DIR, true);
2312 :
2313 1306 : END_CRIT_SECTION();
2314 1306 : }
2315 :
/*
 * Shared functionality between saving and creating a replication slot.
 *
 * If the slot is marked dirty, writes its persistent data to "<dir>/state";
 * otherwise does nothing.  The data is written to "<dir>/state.tmp",
 * fsynced, closed and renamed over "<dir>/state".  The new file, dir and
 * PG_REPLSLOT_DIR are then fsynced inside a critical section.
 *
 * elevel is the level used to report failures up to and including the
 * rename.  The visible callers pass LOG (checkpoint) or ERROR (slot
 * creation).  When elevel does not throw, the function returns after
 * reporting, and the slot stays dirty.  After a write, fsync, close or
 * rename failure the temporary file is unlinked first.
 *
 * On success, clears the dirty flag unless the slot was dirtied again while
 * we were writing.  It also records the saved confirmed_flush and
 * restart_lsn in last_saved_confirmed_flush and last_saved_restart_lsn.
 */
static void
SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	int			fd;
	ReplicationSlotOnDisk cp;
	bool		was_dirty;

	/* first check whether there's something to write out */
	SpinLockAcquire(&slot->mutex);
	was_dirty = slot->dirty;

	/*
	 * Reset just_dirtied: if it is set again while we write, the dirty flag
	 * is left set at the end (see below).
	 */
	slot->just_dirtied = false;
	SpinLockRelease(&slot->mutex);

	/* and don't do anything if there's nothing to write */
	if (!was_dirty)
		return;

	/* held exclusively for the whole write/rename/fsync sequence */
	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);

	/* silence valgrind :( */
	memset(&cp, 0, sizeof(ReplicationSlotOnDisk));

	sprintf(tmppath, "%s/state.tmp", dir);
	sprintf(path, "%s/state", dir);

	/* O_EXCL: fail rather than silently reuse a pre-existing state.tmp */
	fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
	if (fd < 0)
	{
		/*
		 * If not an ERROR, then release the lock before returning. In case
		 * of an ERROR, the error recovery path automatically releases the
		 * lock, but no harm in explicitly releasing even in that case. Note
		 * that LWLockRelease() could affect errno.
		 */
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m",
						tmppath)));
		return;
	}

	/* fill in the version-independent header */
	cp.magic = SLOT_MAGIC;
	INIT_CRC32C(cp.checksum);
	cp.version = SLOT_VERSION;
	cp.length = ReplicationSlotOnDiskV2Size;

	/* snapshot the persistent data consistently under the slot mutex */
	SpinLockAcquire(&slot->mutex);

	memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));

	SpinLockRelease(&slot->mutex);

	/* checksum covers everything after the non-checksummed prefix */
	COMP_CRC32C(cp.checksum,
				(char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
				ReplicationSlotOnDiskChecksummedSize);
	FIN_CRC32C(cp.checksum);

	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
	if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		unlink(tmppath);
		LWLockRelease(&slot->io_in_progress_lock);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	/* fsync the temporary file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
	if (pg_fsync(fd) != 0)
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		unlink(tmppath);
		LWLockRelease(&slot->io_in_progress_lock);

		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	if (CloseTransientFile(fd) != 0)
	{
		int			save_errno = errno;

		unlink(tmppath);
		LWLockRelease(&slot->io_in_progress_lock);

		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m",
						tmppath)));
		return;
	}

	/* rename to permanent file, fsync file and directory */
	if (rename(tmppath, path) != 0)
	{
		int			save_errno = errno;

		unlink(tmppath);
		LWLockRelease(&slot->io_in_progress_lock);

		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
		return;
	}

	/*
	 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
	 */
	START_CRIT_SECTION();

	fsync_fname(path, false);
	fsync_fname(dir, true);
	fsync_fname(PG_REPLSLOT_DIR, true);

	END_CRIT_SECTION();

	/*
	 * Successfully wrote, unset dirty bit, unless somebody dirtied again
	 * already and remember the confirmed_flush LSN value.  The recorded
	 * values come from the copy we wrote (cp), not from the live slot.
	 */
	SpinLockAcquire(&slot->mutex);
	if (!slot->just_dirtied)
		slot->dirty = false;
	slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
	slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
	SpinLockRelease(&slot->mutex);

	LWLockRelease(&slot->io_in_progress_lock);
}
2478 :
/*
 * Load a single slot from disk into memory.
 *
 * name is the slot's directory name under PG_REPLSLOT_DIR.  No concurrent
 * access to the slot array is possible yet, so no locks are taken.
 *
 * Any problem reading or validating the state file is a PANIC.  That covers
 * unlink of a stale state.tmp, open, fsync, short read, bad magic, version
 * or length, close, and checksum mismatch.  A slot that was not
 * RS_PERSISTENT when written is removed from disk instead of being
 * restored.  FATAL errors are raised if the current wal_level/hot_standby
 * settings cannot support the slot, or if no free entry is left in the slot
 * array.
 */
static void
RestoreSlotFromDisk(const char *name)
{
	ReplicationSlotOnDisk cp;
	int			i;
	char		slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
	char		path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
	int			fd;
	bool		restored = false;
	int			readBytes;
	pg_crc32c	checksum;
	TimestampTz now = 0;

	/* no need to lock here, no concurrent access allowed yet */

	/* delete temp file if it exists */
	sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
	sprintf(path, "%s/state.tmp", slotdir);
	if (unlink(path) < 0 && errno != ENOENT)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", path)));

	sprintf(path, "%s/state", slotdir);

	elog(DEBUG1, "restoring replication slot from \"%s\"", path);

	/* on some operating systems fsyncing a file requires O_RDWR */
	fd = OpenTransientFile(path, O_RDWR | PG_BINARY);

	/*
	 * We do not need to handle this as we are rename()ing the directory into
	 * place only after we fsync()ed the state file.
	 */
	if (fd < 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	/*
	 * Sync state file before we're reading from it. We might have crashed
	 * while it wasn't synced yet and we shouldn't continue on that basis.
	 */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
	if (pg_fsync(fd) != 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						path)));
	pgstat_report_wait_end();

	/* Also sync the parent directory */
	START_CRIT_SECTION();
	fsync_fname(slotdir, true);
	END_CRIT_SECTION();

	/* read part of statefile that's guaranteed to be version independent */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
	pgstat_report_wait_end();
	if (readBytes != ReplicationSlotOnDiskConstantSize)
	{
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes,
							(Size) ReplicationSlotOnDiskConstantSize)));
	}

	/* verify magic */
	if (cp.magic != SLOT_MAGIC)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
						path, cp.magic, SLOT_MAGIC)));

	/* verify version */
	if (cp.version != SLOT_VERSION)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has unsupported version %u",
						path, cp.version)));

	/*
	 * boundary check on length; this also guarantees that the read below
	 * cannot overrun cp
	 */
	if (cp.length != ReplicationSlotOnDiskV2Size)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has corrupted length %u",
						path, cp.length)));

	/* Now that we know the size, read the entire file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd,
					 (char *) &cp + ReplicationSlotOnDiskConstantSize,
					 cp.length);
	pgstat_report_wait_end();
	if (readBytes != cp.length)
	{
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes, (Size) cp.length)));
	}

	if (CloseTransientFile(fd) != 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));

	/* now verify the CRC */
	INIT_CRC32C(checksum);
	COMP_CRC32C(checksum,
				(char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
				ReplicationSlotOnDiskChecksummedSize);
	FIN_CRC32C(checksum);

	if (!EQ_CRC32C(checksum, cp.checksum))
		ereport(PANIC,
				(errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
						path, checksum, cp.checksum)));

	/*
	 * If we crashed with an ephemeral slot active, don't restore but delete
	 * it.  Failure to remove the directory is only a WARNING.
	 */
	if (cp.slotdata.persistency != RS_PERSISTENT)
	{
		if (!rmtree(slotdir, true))
		{
			ereport(WARNING,
					(errmsg("could not remove directory \"%s\"",
							slotdir)));
		}
		fsync_fname(PG_REPLSLOT_DIR, true);
		return;
	}

	/*
	 * Verify that requirements for the specific slot type are met. That's
	 * important because if these aren't met we're not guaranteed to retain
	 * all the necessary resources for the slot.
	 *
	 * NB: We have to do so *after* the above checks for ephemeral slots,
	 * because otherwise a slot that shouldn't exist anymore could prevent
	 * restarts.
	 *
	 * NB: Changing the requirements here also requires adapting
	 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
	 */
	if (cp.slotdata.database != InvalidOid)
	{
		if (wal_level < WAL_LEVEL_LOGICAL)
			ereport(FATAL,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
							NameStr(cp.slotdata.name)),
					 errhint("Change \"wal_level\" to be \"logical\" or higher.")));

		/*
		 * In standby mode, the hot standby must be enabled. This check is
		 * necessary to ensure logical slots are invalidated when they become
		 * incompatible due to insufficient wal_level. Otherwise, if the
		 * primary reduces wal_level < logical while hot standby is disabled,
		 * logical slots would remain valid even after promotion.
		 */
		if (StandbyMode && !EnableHotStandby)
			ereport(FATAL,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
							NameStr(cp.slotdata.name)),
					 errhint("Change \"hot_standby\" to be \"on\".")));
	}
	else if (wal_level < WAL_LEVEL_REPLICA)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
						NameStr(cp.slotdata.name)),
				 errhint("Change \"wal_level\" to be \"replica\" or higher.")));

	/* nothing can be active yet, don't lock anything */
	/* Take the first free entry of the slot array for this slot. */
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *slot;

		slot = &ReplicationSlotCtl->replication_slots[i];

		if (slot->in_use)
			continue;

		/* restore the entire set of persistent data */
		memcpy(&slot->data, &cp.slotdata,
			   sizeof(ReplicationSlotPersistentData));

		/* initialize in memory state */
		slot->effective_xmin = cp.slotdata.xmin;
		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
		slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
		slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;

		slot->candidate_catalog_xmin = InvalidTransactionId;
		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_valid = InvalidXLogRecPtr;

		slot->in_use = true;
		slot->active_pid = 0;

		/*
		 * Set the time since the slot has become inactive after loading the
		 * slot from the disk into memory. Whoever acquires the slot i.e.
		 * makes the slot active will reset it.
		 *
		 * Note that 'now' is a local of this function, and each call
		 * restores a single slot.  The timestamp is therefore taken afresh
		 * for every restored slot, and slots do not share one
		 * inactive_since value.  Sharing one would require obtaining the
		 * timestamp in the caller.
		 */
		if (now == 0)
			now = GetCurrentTimestamp();

		ReplicationSlotSetInactiveSince(slot, now, false);

		restored = true;
		break;
	}

	if (!restored)
		ereport(FATAL,
				(errmsg("too many replication slots active before shutdown"),
				 errhint("Increase \"max_replication_slots\" and try again.")));
}
2719 :
2720 : /*
2721 : * Maps an invalidation reason for a replication slot to
2722 : * ReplicationSlotInvalidationCause.
2723 : */
2724 : ReplicationSlotInvalidationCause
2725 0 : GetSlotInvalidationCause(const char *cause_name)
2726 : {
2727 : Assert(cause_name);
2728 :
2729 : /* Search lookup table for the cause having this name */
2730 0 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2731 : {
2732 0 : if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2733 0 : return SlotInvalidationCauses[i].cause;
2734 : }
2735 :
2736 : Assert(false);
2737 0 : return RS_INVAL_NONE; /* to keep compiler quiet */
2738 : }
2739 :
2740 : /*
2741 : * Maps an ReplicationSlotInvalidationCause to the invalidation
2742 : * reason for a replication slot.
2743 : */
2744 : const char *
2745 80 : GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
2746 : {
2747 : /* Search lookup table for the name of this cause */
2748 250 : for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2749 : {
2750 250 : if (SlotInvalidationCauses[i].cause == cause)
2751 80 : return SlotInvalidationCauses[i].cause_name;
2752 : }
2753 :
2754 : Assert(false);
2755 0 : return "none"; /* to keep compiler quiet */
2756 : }
2757 :
2758 : /*
2759 : * A helper function to validate slots specified in GUC synchronized_standby_slots.
2760 : *
2761 : * The rawname will be parsed, and the result will be saved into *elemlist.
2762 : */
2763 : static bool
2764 36 : validate_sync_standby_slots(char *rawname, List **elemlist)
2765 : {
2766 : /* Verify syntax and parse string into a list of identifiers */
2767 36 : if (!SplitIdentifierString(rawname, ',', elemlist))
2768 : {
2769 0 : GUC_check_errdetail("List syntax is invalid.");
2770 0 : return false;
2771 : }
2772 :
2773 : /* Iterate the list to validate each slot name */
2774 100 : foreach_ptr(char, name, *elemlist)
2775 : {
2776 : int err_code;
2777 36 : char *err_msg = NULL;
2778 36 : char *err_hint = NULL;
2779 :
2780 36 : if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
2781 : &err_msg, &err_hint))
2782 : {
2783 4 : GUC_check_errcode(err_code);
2784 4 : GUC_check_errdetail("%s", err_msg);
2785 4 : if (err_hint != NULL)
2786 4 : GUC_check_errhint("%s", err_hint);
2787 4 : return false;
2788 : }
2789 : }
2790 :
2791 32 : return true;
2792 : }
2793 :
2794 : /*
2795 : * GUC check_hook for synchronized_standby_slots
2796 : */
2797 : bool
2798 2338 : check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
2799 : {
2800 : char *rawname;
2801 : char *ptr;
2802 : List *elemlist;
2803 : int size;
2804 : bool ok;
2805 : SyncStandbySlotsConfigData *config;
2806 :
2807 2338 : if ((*newval)[0] == '\0')
2808 2302 : return true;
2809 :
2810 : /* Need a modifiable copy of the GUC string */
2811 36 : rawname = pstrdup(*newval);
2812 :
2813 : /* Now verify if the specified slots exist and have correct type */
2814 36 : ok = validate_sync_standby_slots(rawname, &elemlist);
2815 :
2816 36 : if (!ok || elemlist == NIL)
2817 : {
2818 4 : pfree(rawname);
2819 4 : list_free(elemlist);
2820 4 : return ok;
2821 : }
2822 :
2823 : /* Compute the size required for the SyncStandbySlotsConfigData struct */
2824 32 : size = offsetof(SyncStandbySlotsConfigData, slot_names);
2825 96 : foreach_ptr(char, slot_name, elemlist)
2826 32 : size += strlen(slot_name) + 1;
2827 :
2828 : /* GUC extra value must be guc_malloc'd, not palloc'd */
2829 32 : config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
2830 32 : if (!config)
2831 0 : return false;
2832 :
2833 : /* Transform the data into SyncStandbySlotsConfigData */
2834 32 : config->nslotnames = list_length(elemlist);
2835 :
2836 32 : ptr = config->slot_names;
2837 96 : foreach_ptr(char, slot_name, elemlist)
2838 : {
2839 32 : strcpy(ptr, slot_name);
2840 32 : ptr += strlen(slot_name) + 1;
2841 : }
2842 :
2843 32 : *extra = config;
2844 :
2845 32 : pfree(rawname);
2846 32 : list_free(elemlist);
2847 32 : return true;
2848 : }
2849 :
2850 : /*
2851 : * GUC assign_hook for synchronized_standby_slots
2852 : */
2853 : void
2854 2332 : assign_synchronized_standby_slots(const char *newval, void *extra)
2855 : {
2856 : /*
2857 : * The standby slots may have changed, so we must recompute the oldest
2858 : * LSN.
2859 : */
2860 2332 : ss_oldest_flush_lsn = InvalidXLogRecPtr;
2861 :
2862 2332 : synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
2863 2332 : }
2864 :
2865 : /*
2866 : * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
2867 : */
2868 : bool
2869 52178 : SlotExistsInSyncStandbySlots(const char *slot_name)
2870 : {
2871 : const char *standby_slot_name;
2872 :
2873 : /* Return false if there is no value in synchronized_standby_slots */
2874 52178 : if (synchronized_standby_slots_config == NULL)
2875 52148 : return false;
2876 :
2877 : /*
2878 : * XXX: We are not expecting this list to be long so a linear search
2879 : * shouldn't hurt but if that turns out not to be true then we can cache
2880 : * this information for each WalSender as well.
2881 : */
2882 30 : standby_slot_name = synchronized_standby_slots_config->slot_names;
2883 48 : for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2884 : {
2885 30 : if (strcmp(standby_slot_name, slot_name) == 0)
2886 12 : return true;
2887 :
2888 18 : standby_slot_name += strlen(standby_slot_name) + 1;
2889 : }
2890 :
2891 18 : return false;
2892 : }
2893 :
/*
 * Return true if the slots specified in synchronized_standby_slots have caught up to
 * the given WAL location, false otherwise.
 *
 * "Caught up" means every configured slot is an existing, non-invalidated
 * physical slot whose restart_lsn is valid and >= wait_for_lsn.
 *
 * The function returns true early, without examining any slot, in three
 * cases:
 *  - synchronized_standby_slots has no value;
 *  - the server is in recovery;
 *  - the cached ss_oldest_flush_lsn already covers wait_for_lsn.
 *
 * When all slots have caught up, ss_oldest_flush_lsn is advanced to the
 * minimum restart_lsn seen across them.
 *
 * The scan stops at the first slot that is missing, logical, invalidated, or
 * not yet caught up.  The first three cases are always reported at elevel.
 * The not-caught-up case is reported at elevel only when the slot has no
 * active process (active_pid == 0).
 *
 * The elevel parameter specifies the error level used for logging messages
 * related to slots that do not exist, are invalidated, or are inactive.
 */
bool
StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
{
	const char *name;
	int			caught_up_slot_num = 0;
	XLogRecPtr	min_restart_lsn = InvalidXLogRecPtr;

	/*
	 * Don't need to wait for the standbys to catch up if there is no value in
	 * synchronized_standby_slots.
	 */
	if (synchronized_standby_slots_config == NULL)
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if we are on a standby
	 * server, since we do not support syncing slots to cascading standbys.
	 */
	if (RecoveryInProgress())
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if they are already
	 * beyond the specified WAL location.  ss_oldest_flush_lsn holds the
	 * minimum restart_lsn from the last fully successful check.  The GUC
	 * assign hook resets it whenever the slot list changes.
	 */
	if (XLogRecPtrIsValid(ss_oldest_flush_lsn) &&
		ss_oldest_flush_lsn >= wait_for_lsn)
		return true;

	/*
	 * To prevent concurrent slot dropping and creation while filtering the
	 * slots, take the ReplicationSlotControlLock outside of the loop.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	/* The configured names are packed back to back as NUL-terminated strings */
	name = synchronized_standby_slots_config->slot_names;
	for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
	{
		XLogRecPtr	restart_lsn;
		bool		invalidated;
		bool		inactive;
		ReplicationSlot *slot;

		slot = SearchNamedReplicationSlot(name, false);

		/*
		 * If a slot name provided in synchronized_standby_slots does not
		 * exist, report a message and exit the loop.
		 */
		if (!slot)
		{
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
							  name),
					errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		/* Same as above: if a slot is not physical, exit the loop. */
		if (SlotIsLogical(slot))
		{
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
							  name),
					errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		/*
		 * Snapshot the fields we need under the slot's mutex.  The mutex
		 * protects per-slot data, and the ereport() calls below must not run
		 * while it is held.
		 */
		SpinLockAcquire(&slot->mutex);
		restart_lsn = slot->data.restart_lsn;
		invalidated = slot->data.invalidated != RS_INVAL_NONE;
		inactive = slot->active_pid == 0;
		SpinLockRelease(&slot->mutex);

		if (invalidated)
		{
			/* Specified physical slot has been invalidated */
			ereport(elevel,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
							  name),
					errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
		{
			/* Log a message if no active_pid for this physical slot */
			if (inactive)
				ereport(elevel,
						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
							   name, "synchronized_standby_slots"),
						errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
								  name),
						errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
								name, "synchronized_standby_slots"));

			/*
			 * This slot has not caught up yet, so the overall result is
			 * already false.  Stop scanning; the remaining slots need not be
			 * examined.
			 */
			break;
		}

		Assert(restart_lsn >= wait_for_lsn);

		/* Track the oldest restart_lsn among the slots that have caught up */
		if (!XLogRecPtrIsValid(min_restart_lsn) ||
			min_restart_lsn > restart_lsn)
			min_restart_lsn = restart_lsn;

		caught_up_slot_num++;

		/* Step past this name and its NUL terminator to the next one */
		name += strlen(name) + 1;
	}

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Return false if not all the standbys have caught up to the specified
	 * WAL location.  This includes every case where the loop broke early.
	 */
	if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
		return false;

	/* The ss_oldest_flush_lsn must not retreat. */
	Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
		   min_restart_lsn >= ss_oldest_flush_lsn);

	/* Cache the result so later calls with an older LSN can return early */
	ss_oldest_flush_lsn = min_restart_lsn;

	return true;
}
3042 :
3043 : /*
3044 : * Wait for physical standbys to confirm receiving the given lsn.
3045 : *
3046 : * Used by logical decoding SQL functions. It waits for physical standbys
3047 : * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
3048 : */
3049 : void
3050 446 : WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
3051 : {
3052 : /*
3053 : * Don't need to wait for the standby to catch up if the current acquired
3054 : * slot is not a logical failover slot, or there is no value in
3055 : * synchronized_standby_slots.
3056 : */
3057 446 : if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
3058 444 : return;
3059 :
3060 2 : ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
3061 :
3062 : for (;;)
3063 : {
3064 4 : CHECK_FOR_INTERRUPTS();
3065 :
3066 4 : if (ConfigReloadPending)
3067 : {
3068 2 : ConfigReloadPending = false;
3069 2 : ProcessConfigFile(PGC_SIGHUP);
3070 : }
3071 :
3072 : /* Exit if done waiting for every slot. */
3073 4 : if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
3074 2 : break;
3075 :
3076 : /*
3077 : * Wait for the slots in the synchronized_standby_slots to catch up,
3078 : * but use a timeout (1s) so we can also check if the
3079 : * synchronized_standby_slots has been changed.
3080 : */
3081 2 : ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
3082 : WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
3083 : }
3084 :
3085 2 : ConditionVariableCancelSleep();
3086 : }
|