1 : /*-------------------------------------------------------------------------
2 : * slotsync.c
3 : * Functionality for synchronizing slots to a standby server from the
4 : * primary server.
5 : *
6 : * Copyright (c) 2024, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/logical/slotsync.c
10 : *
11 : * This file contains the code for slot synchronization on a physical standby
12 : * to fetch logical failover slots information from the primary server, create
13 : * the slots on the standby and synchronize them periodically.
14 : *
15 : * Slot synchronization can be performed either automatically by enabling slot
16 : * sync worker or manually by calling SQL function pg_sync_replication_slots().
17 : *
18 : * If the WAL corresponding to the remote's restart_lsn is not available on the
19 : * physical standby or the remote's catalog_xmin precedes the oldest xid for
20 : * which it is guaranteed that rows wouldn't have been removed, then we cannot
21 : * create the local standby slot because that would mean moving the local slot
22 : * backward and decoding won't be possible via such a slot. In this case, the
23 : * slot will be marked as RS_TEMPORARY. Once the primary server catches up,
24 : * the slot will be marked as RS_PERSISTENT (which means sync-ready) after
25 : * which the slot sync worker can perform the sync periodically, or the user
26 : * can call pg_sync_replication_slots() periodically to perform the syncs.
27 : *
28 : * If synchronized slots fail to build a consistent snapshot from the
29 : * restart_lsn before reaching confirmed_flush_lsn, they would become
30 : * unreliable after promotion due to potential data loss from changes
31 : * before reaching a consistent point. This can happen because the slots can
32 : * be synced at an arbitrary time and we may not reach the consistent point
33 : * at the same WAL location as the primary. So, we mark such slots as
34 : * RS_TEMPORARY. Once the decoding from corresponding LSNs can reach a
35 : * consistent point, they will be marked as RS_PERSISTENT.
36 : *
37 : * The slot sync worker waits for some time before the next synchronization,
38 : * with the duration varying based on whether any slots were updated during
39 : * the last cycle. Refer to the comments above wait_for_slot_activity() for
40 : * more details.
41 : *
42 : * Any standby synchronized slots will be dropped if they no longer need
43 : * to be synchronized. See comment atop drop_local_obsolete_slots() for more
44 : * details.
45 : *---------------------------------------------------------------------------
46 : */
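 :
 : /*
 : * Illustrative standby configuration for the automatic mode described
 : * above. This is only a sketch; the slot and connection values are
 : * placeholders and must match the actual primary:
 : *
 : *	wal_level = logical
 : *	sync_replication_slots = on
 : *	hot_standby_feedback = on
 : *	primary_slot_name = 'sb1_slot'
 : *	primary_conninfo = 'host=primary user=repluser dbname=postgres'
 : *
 : * Alternatively, a one-off manual sync can be run on the standby with:
 : *
 : *	SELECT pg_sync_replication_slots();
 : */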
47 :
48 : #include "postgres.h"
49 :
50 : #include <time.h>
51 :
52 : #include "access/xlog_internal.h"
53 : #include "access/xlogrecovery.h"
54 : #include "catalog/pg_database.h"
55 : #include "commands/dbcommands.h"
56 : #include "libpq/pqsignal.h"
57 : #include "pgstat.h"
58 : #include "postmaster/fork_process.h"
59 : #include "postmaster/interrupt.h"
60 : #include "postmaster/postmaster.h"
61 : #include "replication/logical.h"
62 : #include "replication/slotsync.h"
63 : #include "replication/snapbuild.h"
64 : #include "storage/ipc.h"
65 : #include "storage/lmgr.h"
66 : #include "storage/proc.h"
67 : #include "storage/procarray.h"
68 : #include "tcop/tcopprot.h"
69 : #include "utils/builtins.h"
70 : #include "utils/pg_lsn.h"
71 : #include "utils/ps_status.h"
72 : #include "utils/timeout.h"
73 :
74 : /*
75 : * Struct for sharing information to control slot synchronization.
76 : *
77 : * The slot sync worker's pid is needed by the startup process to shut it
78 : * down during promotion. The startup process shuts down the slot sync worker
79 : * and also sets stopSignaled=true to handle the race condition when the
80 : * postmaster has not noticed the promotion yet and thus may end up restarting
81 : * the slot sync worker. If stopSignaled is set, the worker will exit in such a
82 : * case. The SQL function pg_sync_replication_slots() will also error out if
83 : * this flag is set. Note that we don't need to reset this variable as after
84 : * promotion the slot sync worker won't be restarted because the pmState
85 : * changes from PM_HOT_STANDBY to PM_RUN, and we don't support demoting
86 : * the primary without restarting the server. See MaybeStartSlotSyncWorker.
87 : *
88 : * The 'syncing' flag is needed to prevent concurrent slot syncs and thus
89 : * avoid slot overwrites.
90 : *
91 : * The 'last_start_time' is needed by postmaster to start the slot sync worker
92 : * once per SLOTSYNC_RESTART_INTERVAL_SEC. In cases where an immediate restart
93 : * is expected (e.g., slot sync GUCs change), slot sync worker will reset
94 : * last_start_time before exiting, so that postmaster can start the worker
95 : * without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
96 : */
97 : typedef struct SlotSyncCtxStruct
98 : {
99 : pid_t pid;
100 : bool stopSignaled;
101 : bool syncing;
102 : time_t last_start_time;
103 : slock_t mutex;
104 : } SlotSyncCtxStruct;
105 :
106 : SlotSyncCtxStruct *SlotSyncCtx = NULL;
107 :
108 : /* GUC variable */
109 : bool sync_replication_slots = false;
110 :
111 : /*
112 : * The sleep time (ms) between slot-sync cycles varies dynamically
113 : * (within a MIN/MAX range) according to slot activity. See
114 : * wait_for_slot_activity() for details.
115 : */
116 : #define MIN_SLOTSYNC_WORKER_NAPTIME_MS 200
117 : #define MAX_SLOTSYNC_WORKER_NAPTIME_MS 30000 /* 30s */
118 :
119 : static long sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
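 :
 : /*
 : * For example, with no slot activity, successive naps are 200ms, 400ms,
 : * 800ms, ..., doubling until capped at 30s; any slot update resets the
 : * nap to 200ms (see wait_for_slot_activity()).
 : */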
120 :
121 : /* The restart interval for the slot sync worker, used by postmaster */
122 : #define SLOTSYNC_RESTART_INTERVAL_SEC 10
123 :
124 : /*
125 : * Flag to tell if we are syncing replication slots. Unlike the 'syncing' flag
126 : * in SlotSyncCtxStruct, this flag is true only if the current process is
127 : * performing slot synchronization.
128 : */
129 : static bool syncing_slots = false;
130 :
131 : /*
132 : * Structure to hold information fetched from the primary server about a logical
133 : * replication slot.
134 : */
135 : typedef struct RemoteSlot
136 : {
137 : char *name;
138 : char *plugin;
139 : char *database;
140 : bool two_phase;
141 : bool failover;
142 : XLogRecPtr restart_lsn;
143 : XLogRecPtr confirmed_lsn;
144 : TransactionId catalog_xmin;
145 :
146 : /* RS_INVAL_NONE if valid, or the reason for invalidation */
147 : ReplicationSlotInvalidationCause invalidated;
148 : } RemoteSlot;
149 :
150 : static void slotsync_failure_callback(int code, Datum arg);
151 : static void update_synced_slots_inactive_since(void);
152 :
153 : /*
154 : * If necessary, update the local synced slot's metadata based on the data
155 : * from the remote slot.
156 : *
157 : * If no update was needed (the data of the remote slot is the same as the
158 : * local slot) return false, otherwise true.
159 : *
160 : * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
161 : * modified, and decoding from the corresponding LSN can reach a
162 : * consistent snapshot.
163 : *
164 : * *remote_slot_precedes will be true if the remote slot's LSN or xmin
165 : * precedes locally reserved position.
166 : */
167 : static bool
168 56 : update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
169 : bool *found_consistent_snapshot,
170 : bool *remote_slot_precedes)
171 : {
172 56 : ReplicationSlot *slot = MyReplicationSlot;
173 56 : bool updated_xmin_or_lsn = false;
174 56 : bool updated_config = false;
175 :
176 : Assert(slot->data.invalidated == RS_INVAL_NONE);
177 :
178 56 : if (found_consistent_snapshot)
179 8 : *found_consistent_snapshot = false;
180 :
181 56 : if (remote_slot_precedes)
182 8 : *remote_slot_precedes = false;
183 :
184 : /*
185 : * Don't overwrite if we already have a newer catalog_xmin and
186 : * restart_lsn.
187 : */
188 112 : if (remote_slot->restart_lsn < slot->data.restart_lsn ||
189 56 : TransactionIdPrecedes(remote_slot->catalog_xmin,
190 : slot->data.catalog_xmin))
191 : {
192 : /*
193 : * This can happen in the following situations:
194 : *
195 : * If the slot is temporary, it means either the initial WAL location
196 : * reserved for the local slot is ahead of the remote slot's
197 : * restart_lsn or the initial xmin_horizon computed for the local slot
198 : * is ahead of the remote slot.
199 : *
200 : * If the slot is persistent, restart_lsn of the synced slot could
201 : * still be ahead of the remote slot. Since we use slot advance
202 : * functionality to keep snapbuild/slot updated, it is possible that
203 : * the restart_lsn is advanced to a later position than it has on the
204 : * primary. This can happen when slot advancing machinery finds
205 : * running xacts record after reaching the consistent state at a later
206 : * point than the primary where it serializes the snapshot and updates
207 : * the restart_lsn.
208 : *
209 : * We LOG the message if the slot is temporary as it can help the user
210 : * to understand why the slot is not sync-ready. In the case of a
211 : * persistent slot, this is a more common case and won't directly
212 : * impact the users, so we log the message at DEBUG1 level.
213 : */
214 0 : ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
215 : errmsg("could not sync slot \"%s\" as remote slot precedes local slot",
216 : remote_slot->name),
217 : errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.",
218 : LSN_FORMAT_ARGS(remote_slot->restart_lsn),
219 : remote_slot->catalog_xmin,
220 : LSN_FORMAT_ARGS(slot->data.restart_lsn),
221 : slot->data.catalog_xmin));
222 :
223 0 : if (remote_slot_precedes)
224 0 : *remote_slot_precedes = true;
225 : }
226 :
227 : /*
228 : * Attempt to sync LSNs and xmins only if remote slot is ahead of local
229 : * slot.
230 : */
231 56 : else if (remote_slot->confirmed_lsn > slot->data.confirmed_flush ||
232 74 : remote_slot->restart_lsn > slot->data.restart_lsn ||
233 36 : TransactionIdFollows(remote_slot->catalog_xmin,
234 : slot->data.catalog_xmin))
235 : {
236 : /*
237 : * We can't directly copy the remote slot's LSN or xmin unless there
238 : * exists a consistent snapshot at that point. Otherwise, after
239 : * promotion, the slots may not reach a consistent point before the
240 : * confirmed_flush_lsn, which can lead to data loss. To avoid data
241 : * loss, we let slot machinery advance the slot which ensures that
242 : * snapbuilder/slot statuses are updated properly.
243 : */
244 20 : if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
245 : {
246 : /*
247 : * Update the slot info directly if there is a serialized snapshot
248 : * at the restart_lsn, as the slot can quickly reach consistency
249 : * at restart_lsn by restoring the snapshot.
250 : */
251 4 : SpinLockAcquire(&slot->mutex);
252 4 : slot->data.restart_lsn = remote_slot->restart_lsn;
253 4 : slot->data.confirmed_flush = remote_slot->confirmed_lsn;
254 4 : slot->data.catalog_xmin = remote_slot->catalog_xmin;
255 4 : SpinLockRelease(&slot->mutex);
256 :
257 4 : if (found_consistent_snapshot)
258 0 : *found_consistent_snapshot = true;
259 : }
260 : else
261 : {
262 16 : LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
263 : found_consistent_snapshot);
264 :
265 : /* Sanity check */
266 16 : if (slot->data.confirmed_flush != remote_slot->confirmed_lsn)
267 0 : ereport(ERROR,
268 : errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot",
269 : remote_slot->name),
270 : errdetail_internal("Remote slot has LSN %X/%X but local slot has LSN %X/%X.",
271 : LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
272 : LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
273 : }
274 :
275 20 : updated_xmin_or_lsn = true;
276 : }
277 :
278 56 : if (remote_dbid != slot->data.database ||
279 56 : remote_slot->two_phase != slot->data.two_phase ||
280 56 : remote_slot->failover != slot->data.failover ||
281 56 : strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0)
282 : {
283 : NameData plugin_name;
284 :
285 : /* Avoid expensive operations while holding a spinlock. */
286 0 : namestrcpy(&plugin_name, remote_slot->plugin);
287 :
288 0 : SpinLockAcquire(&slot->mutex);
289 0 : slot->data.plugin = plugin_name;
290 0 : slot->data.database = remote_dbid;
291 0 : slot->data.two_phase = remote_slot->two_phase;
292 0 : slot->data.failover = remote_slot->failover;
293 0 : SpinLockRelease(&slot->mutex);
294 :
295 0 : updated_config = true;
296 : }
297 :
298 : /*
299 : * We have to write the changed xmin to disk *before* we change the
300 : * in-memory value, otherwise after a crash we wouldn't know that some
301 : * catalog tuples might have been removed already.
302 : */
303 56 : if (updated_config || updated_xmin_or_lsn)
304 : {
305 20 : ReplicationSlotMarkDirty();
306 20 : ReplicationSlotSave();
307 : }
308 :
309 : /*
310 : * Now the new xmin is safely on disk, we can let the global value
311 : * advance. We do not take ProcArrayLock or similar since we only advance
312 : * xmin here and there's not much harm done by a concurrent computation
313 : * missing that.
314 : */
315 56 : if (updated_xmin_or_lsn)
316 : {
317 20 : SpinLockAcquire(&slot->mutex);
318 20 : slot->effective_catalog_xmin = remote_slot->catalog_xmin;
319 20 : SpinLockRelease(&slot->mutex);
320 :
321 20 : ReplicationSlotsComputeRequiredXmin(false);
322 20 : ReplicationSlotsComputeRequiredLSN();
323 : }
324 :
325 56 : return updated_config || updated_xmin_or_lsn;
326 : }
327 :
328 : /*
329 : * Get the list of local logical slots that are synchronized from the
330 : * primary server.
331 : */
332 : static List *
333 30 : get_local_synced_slots(void)
334 : {
335 30 : List *local_slots = NIL;
336 :
337 30 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
338 :
339 330 : for (int i = 0; i < max_replication_slots; i++)
340 : {
341 300 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
342 :
343 : /* Check if it is a synchronized slot */
344 300 : if (s->in_use && s->data.synced)
345 : {
346 : Assert(SlotIsLogical(s));
347 52 : local_slots = lappend(local_slots, s);
348 : }
349 : }
350 :
351 30 : LWLockRelease(ReplicationSlotControlLock);
352 :
353 30 : return local_slots;
354 : }
355 :
356 : /*
357 : * Helper function to check if local_slot is required to be retained.
358 : *
359 : * Return false if local_slot either does not exist in the remote_slots list
360 : * or is invalidated while the corresponding remote slot is still valid,
361 : * otherwise true.
362 : */
363 : static bool
364 52 : local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots)
365 : {
366 52 : bool remote_exists = false;
367 52 : bool locally_invalidated = false;
368 :
369 128 : foreach_ptr(RemoteSlot, remote_slot, remote_slots)
370 : {
371 74 : if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0)
372 : {
373 50 : remote_exists = true;
374 :
375 : /*
376 : * If remote slot is not invalidated but local slot is marked as
377 : * invalidated, then set locally_invalidated flag.
378 : */
379 50 : SpinLockAcquire(&local_slot->mutex);
380 50 : locally_invalidated =
381 100 : (remote_slot->invalidated == RS_INVAL_NONE) &&
382 50 : (local_slot->data.invalidated != RS_INVAL_NONE);
383 50 : SpinLockRelease(&local_slot->mutex);
384 :
385 50 : break;
386 : }
387 : }
388 :
389 52 : return (remote_exists && !locally_invalidated);
390 : }
391 :
392 : /*
393 : * Drop local obsolete slots.
394 : *
395 : * Drop the local slots that no longer need to be synced, i.e., those that
396 : * either do not exist on the primary or are no longer enabled for failover.
397 : *
398 : * Additionally, drop any slots that are valid on the primary but got
399 : * invalidated on the standby. This situation may occur due to the following
400 : * reasons:
401 : * - The 'max_slot_wal_keep_size' on the standby is insufficient to retain WAL
402 : * records from the restart_lsn of the slot.
403 : * - 'primary_slot_name' is temporarily reset to null and the physical slot is
404 : * removed.
405 : * These dropped slots will get recreated in the next sync-cycle and it is okay
406 : * to drop and recreate such slots as long as they are not consumable on the
407 : * standby (which is the case currently).
408 : *
409 : * Note: Change of 'wal_level' on the primary server to a level lower than
410 : * logical may also result in slot invalidation and removal on the standby.
411 : * This is because such 'wal_level' change is only possible if the logical
412 : * slots are removed on the primary server, so it's expected to see the
413 : * slots being invalidated and removed on the standby too (and re-created
414 : * if they are re-created on the primary server).
415 : */
416 : static void
417 30 : drop_local_obsolete_slots(List *remote_slot_list)
418 : {
419 30 : List *local_slots = get_local_synced_slots();
420 :
421 112 : foreach_ptr(ReplicationSlot, local_slot, local_slots)
422 : {
423 : /* Drop the local slot if it is not required to be retained. */
424 52 : if (!local_sync_slot_required(local_slot, remote_slot_list))
425 : {
426 : bool synced_slot;
427 :
428 : /*
429 : * Use shared lock to prevent a conflict with
430 : * ReplicationSlotsDropDBSlots(), trying to drop the same slot
431 : * during a drop-database operation.
432 : */
433 4 : LockSharedObject(DatabaseRelationId, local_slot->data.database,
434 : 0, AccessShareLock);
435 :
436 : /*
437 : * In the small window between getting the slot to drop and
438 : * locking the database, there is a possibility of a parallel
439 : * database drop by the startup process and the creation of a new
440 : * slot by the user. This new user-created slot may end up using
441 : * the same shared memory as that of 'local_slot'. Thus check if
442 : * local_slot is still the synced one before performing actual
443 : * drop.
444 : */
445 4 : SpinLockAcquire(&local_slot->mutex);
446 4 : synced_slot = local_slot->in_use && local_slot->data.synced;
447 4 : SpinLockRelease(&local_slot->mutex);
448 :
449 4 : if (synced_slot)
450 : {
451 4 : ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
452 4 : ReplicationSlotDropAcquired();
453 : }
454 :
455 4 : UnlockSharedObject(DatabaseRelationId, local_slot->data.database,
456 : 0, AccessShareLock);
457 :
458 4 : ereport(LOG,
459 : errmsg("dropped replication slot \"%s\" of dbid %d",
460 : NameStr(local_slot->data.name),
461 : local_slot->data.database));
462 : }
463 : }
464 30 : }
465 :
466 : /*
467 : * Reserve WAL for the currently active local slot using the specified WAL
468 : * location (restart_lsn).
469 : *
470 : * If the given WAL location has been removed, reserve WAL using the oldest
471 : * existing WAL segment.
472 : */
473 : static void
474 8 : reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
475 : {
476 : XLogSegNo oldest_segno;
477 : XLogSegNo segno;
478 8 : ReplicationSlot *slot = MyReplicationSlot;
479 :
480 : Assert(slot != NULL);
481 : Assert(XLogRecPtrIsInvalid(slot->data.restart_lsn));
482 :
483 : while (true)
484 : {
485 8 : SpinLockAcquire(&slot->mutex);
486 8 : slot->data.restart_lsn = restart_lsn;
487 8 : SpinLockRelease(&slot->mutex);
488 :
489 : /* Prevent WAL removal as fast as possible */
490 8 : ReplicationSlotsComputeRequiredLSN();
491 :
492 8 : XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
493 :
494 : /*
495 : * Find the oldest existing WAL segment file.
496 : *
497 : * Normally, we can determine it by using the last removed segment
498 : * number. However, if no WAL segment files have been removed by a
499 : * checkpoint since startup, we need to search for the oldest segment
500 : * file from the current timeline existing in XLOGDIR.
501 : *
502 : * XXX: Currently, we are searching for the oldest segment in the
503 : * current timeline as there is less chance of the slot's restart_lsn
504 : * being on some prior timeline, and even if that happens, in the
505 : * worst case, we will wait to sync till the slot's restart_lsn moves
506 : * to the current timeline.
507 : */
508 8 : oldest_segno = XLogGetLastRemovedSegno() + 1;
509 :
510 8 : if (oldest_segno == 1)
511 : {
512 : TimeLineID cur_timeline;
513 :
514 4 : GetWalRcvFlushRecPtr(NULL, &cur_timeline);
515 4 : oldest_segno = XLogGetOldestSegno(cur_timeline);
516 : }
517 :
518 8 : elog(DEBUG1, "segno: " UINT64_FORMAT " of purposed restart_lsn for the synced slot, oldest_segno: " UINT64_FORMAT " available",
519 : segno, oldest_segno);
520 :
521 : /*
522 : * If all required WAL is still there, great, otherwise retry. The
523 : * slot should prevent further removal of WAL, unless there's a
524 : * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
525 : * the new restart_lsn above, so normally we should never need to loop
526 : * more than twice.
527 : */
528 8 : if (segno >= oldest_segno)
529 8 : break;
530 :
531 : /* Retry using the location of the oldest wal segment */
532 0 : XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn);
533 : }
534 8 : }
535 :
536 : /*
537 : * If the remote restart_lsn and catalog_xmin have caught up with the
538 : * local ones, then update the LSNs and persist the local synced slot for
539 : * future synchronization; otherwise, do nothing.
540 : *
541 : * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise
542 : * false.
543 : */
544 : static bool
545 8 : update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
546 : {
547 8 : ReplicationSlot *slot = MyReplicationSlot;
548 8 : bool found_consistent_snapshot = false;
549 8 : bool remote_slot_precedes = false;
550 :
551 8 : (void) update_local_synced_slot(remote_slot, remote_dbid,
552 : &found_consistent_snapshot,
553 : &remote_slot_precedes);
554 :
555 : /*
556 : * Check if the primary server has caught up. Refer to the comment atop
557 : * the file for details on this check.
558 : */
559 8 : if (remote_slot_precedes)
560 : {
561 : /*
562 : * The remote slot didn't catch up to locally reserved position.
563 : *
564 : * We do not drop the slot because the restart_lsn can be ahead of the
565 : * current location when recreating the slot in the next cycle. It may
566 : * take more time to create such a slot. Therefore, we keep this slot
567 : * and attempt the synchronization in the next cycle.
568 : */
569 0 : return false;
570 : }
571 :
572 : /*
573 : * Don't persist the slot if it cannot reach the consistent point from the
574 : * restart_lsn. See comments atop this file.
575 : */
576 8 : if (!found_consistent_snapshot)
577 : {
578 0 : ereport(LOG,
579 : errmsg("could not sync slot \"%s\"", remote_slot->name),
580 : errdetail("Logical decoding cannot find consistent point from local slot's LSN %X/%X.",
581 : LSN_FORMAT_ARGS(slot->data.restart_lsn)));
582 :
583 0 : return false;
584 : }
585 :
586 8 : ReplicationSlotPersist();
587 :
588 8 : ereport(LOG,
589 : errmsg("newly created slot \"%s\" is sync-ready now",
590 : remote_slot->name));
591 :
592 8 : return true;
593 : }
594 :
595 : /*
596 : * Synchronize a single slot to the given position.
597 : *
598 : * This creates a new slot if there is no existing one and updates the
599 : * metadata of the slot as per the data received from the primary server.
600 : *
601 : * The slot is created as a temporary slot and stays in the same state until the
602 : * remote_slot catches up with the locally reserved position and the local
603 : * slot is updated. The slot is then persisted and considered sync-ready for
604 : * periodic syncs.
605 : *
606 : * Returns TRUE if the local slot is updated.
607 : */
608 : static bool
609 56 : synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
610 : {
611 : ReplicationSlot *slot;
612 : XLogRecPtr latestFlushPtr;
613 56 : bool slot_updated = false;
614 :
615 : /*
616 : * Make sure that the concerned WAL is received and flushed before syncing
617 : * the slot to the target LSN received from the primary server.
618 : */
619 56 : latestFlushPtr = GetStandbyFlushRecPtr(NULL);
620 56 : if (remote_slot->confirmed_lsn > latestFlushPtr)
621 : {
622 : /*
623 : * Can get here only if GUC 'standby_slot_names' on the primary server
624 : * was not configured correctly.
625 : */
626 0 : ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
627 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
628 : errmsg("skipping slot synchronization as the received slot sync"
629 : " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
630 : LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
631 : remote_slot->name,
632 : LSN_FORMAT_ARGS(latestFlushPtr)));
633 :
634 0 : return false;
635 : }
636 :
637 : /* Search for the named slot */
638 56 : if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
639 : {
640 : bool synced;
641 :
642 48 : SpinLockAcquire(&slot->mutex);
643 48 : synced = slot->data.synced;
644 48 : SpinLockRelease(&slot->mutex);
645 :
646 : /* User-created slot with the same name exists, raise ERROR. */
647 48 : if (!synced)
648 0 : ereport(ERROR,
649 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
650 : errmsg("exiting from slot synchronization because same"
651 : " name slot \"%s\" already exists on the standby",
652 : remote_slot->name));
653 :
654 : /*
655 : * The slot has been synchronized before.
656 : *
657 : * It is important to acquire the slot here before checking
658 : * invalidation. If we don't acquire the slot first, there could be a
659 : * race condition that the local slot could be invalidated just after
660 : * checking the 'invalidated' flag here and we could end up
661 : * overwriting 'invalidated' flag to remote_slot's value. See
662 : * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
663 : * if the slot is not acquired by other processes.
664 : *
665 : * XXX: If it ever turns out that slot acquire/release is costly for
666 : * cases when none of the slot properties is changed then we can do a
667 : * pre-check to ensure that at least one of the slot properties is
668 : * changed before acquiring the slot.
669 : */
670 48 : ReplicationSlotAcquire(remote_slot->name, true);
671 :
672 : Assert(slot == MyReplicationSlot);
673 :
674 : /*
675 : * Copy the invalidation cause from remote only if local slot is not
676 : * invalidated locally; we don't want to overwrite the existing one.
677 : */
678 48 : if (slot->data.invalidated == RS_INVAL_NONE &&
679 48 : remote_slot->invalidated != RS_INVAL_NONE)
680 : {
681 0 : SpinLockAcquire(&slot->mutex);
682 0 : slot->data.invalidated = remote_slot->invalidated;
683 0 : SpinLockRelease(&slot->mutex);
684 :
685 : /* Make sure the invalidated state persists across server restart */
686 0 : ReplicationSlotMarkDirty();
687 0 : ReplicationSlotSave();
688 :
689 0 : slot_updated = true;
690 : }
691 :
692 : /* Skip the sync of an invalidated slot */
693 48 : if (slot->data.invalidated != RS_INVAL_NONE)
694 : {
695 0 : ReplicationSlotRelease();
696 0 : return slot_updated;
697 : }
698 :
699 : /* Slot not ready yet, let's attempt to make it sync-ready now. */
700 48 : if (slot->data.persistency == RS_TEMPORARY)
701 : {
702 0 : slot_updated = update_and_persist_local_synced_slot(remote_slot,
703 : remote_dbid);
704 : }
705 :
706 : /* Slot ready for sync, so sync it. */
707 : else
708 : {
709 : /*
710 : * Sanity check: As long as the invalidations are handled
711 : * appropriately as above, this should never happen.
712 : *
713 : * We don't need to check restart_lsn here. See the comments in
714 : * update_local_synced_slot() for details.
715 : */
716 48 : if (remote_slot->confirmed_lsn < slot->data.confirmed_flush)
717 0 : ereport(ERROR,
718 : errmsg_internal("cannot synchronize local slot \"%s\"",
719 : remote_slot->name),
720 : errdetail_internal("Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X).",
721 : LSN_FORMAT_ARGS(slot->data.confirmed_flush),
722 : LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
723 :
724 48 : slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
725 : NULL, NULL);
726 : }
727 : }
728 : /* Otherwise create the slot first. */
729 : else
730 : {
731 : NameData plugin_name;
732 8 : TransactionId xmin_horizon = InvalidTransactionId;
733 :
734 : /* Skip creating the local slot if remote_slot is invalidated already */
735 8 : if (remote_slot->invalidated != RS_INVAL_NONE)
736 0 : return false;
737 :
738 : /*
739 : * We create temporary slots instead of ephemeral slots here because
740 : * we want the slots to survive after releasing them. This is done to
741 : * avoid dropping and re-creating the slots in each synchronization
742 : * cycle if the restart_lsn or catalog_xmin of the remote slot has not
743 : * caught up.
744 : */
745 8 : ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
746 8 : remote_slot->two_phase,
747 8 : remote_slot->failover,
748 : true);
749 :
750 : /* For shorter lines. */
751 8 : slot = MyReplicationSlot;
752 :
753 : /* Avoid expensive operations while holding a spinlock. */
754 8 : namestrcpy(&plugin_name, remote_slot->plugin);
755 :
756 8 : SpinLockAcquire(&slot->mutex);
757 8 : slot->data.database = remote_dbid;
758 8 : slot->data.plugin = plugin_name;
759 8 : SpinLockRelease(&slot->mutex);
760 :
761 8 : reserve_wal_for_local_slot(remote_slot->restart_lsn);
762 :
763 8 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
764 8 : xmin_horizon = GetOldestSafeDecodingTransactionId(true);
765 8 : SpinLockAcquire(&slot->mutex);
766 8 : slot->effective_catalog_xmin = xmin_horizon;
767 8 : slot->data.catalog_xmin = xmin_horizon;
768 8 : SpinLockRelease(&slot->mutex);
769 8 : ReplicationSlotsComputeRequiredXmin(true);
770 8 : LWLockRelease(ProcArrayLock);
771 :
772 8 : update_and_persist_local_synced_slot(remote_slot, remote_dbid);
773 :
774 8 : slot_updated = true;
775 : }
776 :
777 56 : ReplicationSlotRelease();
778 :
779 56 : return slot_updated;
780 : }
781 :
782 : /*
783 : * Synchronize slots.
784 : *
785 : * Gets the failover logical slots info from the primary server and updates
786 : * the slots locally. Creates the slots if not present on the standby.
787 : *
788 : * Returns TRUE if any of the slots gets updated in this sync-cycle.
789 : */
790 : static bool
791 30 : synchronize_slots(WalReceiverConn *wrconn)
792 : {
793 : #define SLOTSYNC_COLUMN_COUNT 9
794 30 : Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
795 : LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
796 :
797 : WalRcvExecResult *res;
798 : TupleTableSlot *tupslot;
799 30 : List *remote_slot_list = NIL;
800 30 : bool some_slot_updated = false;
801 30 : bool started_tx = false;
802 30 : const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
803 : " restart_lsn, catalog_xmin, two_phase, failover,"
804 : " database, invalidation_reason"
805 : " FROM pg_catalog.pg_replication_slots"
806 : " WHERE failover and NOT temporary";
807 :
808 : /* The syscache access in walrcv_exec() needs a transaction env. */
809 30 : if (!IsTransactionState())
810 : {
811 18 : StartTransactionCommand();
812 18 : started_tx = true;
813 : }
814 :
815 : /* Execute the query */
816 30 : res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
817 30 : if (res->status != WALRCV_OK_TUPLES)
818 0 : ereport(ERROR,
819 : errmsg("could not fetch failover logical slots info from the primary server: %s",
820 : res->err));
821 :
822 : /* Construct the remote_slot tuple and synchronize each slot locally */
823 30 : tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
824 86 : while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
825 : {
826 : bool isnull;
827 56 : RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot));
828 : Datum d;
829 56 : int col = 0;
830 :
831 56 : remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col,
832 : &isnull));
833 : Assert(!isnull);
834 :
835 56 : remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, ++col,
836 : &isnull));
837 : Assert(!isnull);
838 :
839 : /*
840 : * It is possible to get null values for LSN and Xmin if the slot is
841 : * invalidated on the primary server, so handle accordingly.
842 : */
843 56 : d = slot_getattr(tupslot, ++col, &isnull);
844 56 : remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr :
845 56 : DatumGetLSN(d);
846 :
847 56 : d = slot_getattr(tupslot, ++col, &isnull);
848 56 : remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
849 :
850 56 : d = slot_getattr(tupslot, ++col, &isnull);
851 56 : remote_slot->catalog_xmin = isnull ? InvalidTransactionId :
852 56 : DatumGetTransactionId(d);
853 :
854 56 : remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col,
855 : &isnull));
856 : Assert(!isnull);
857 :
858 56 : remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
859 : &isnull));
860 : Assert(!isnull);
861 :
862 56 : remote_slot->database = TextDatumGetCString(slot_getattr(tupslot,
863 : ++col, &isnull));
864 : Assert(!isnull);
865 :
866 56 : d = slot_getattr(tupslot, ++col, &isnull);
867 56 : remote_slot->invalidated = isnull ? RS_INVAL_NONE :
868 0 : GetSlotInvalidationCause(TextDatumGetCString(d));
869 :
870 : /* Sanity check */
871 : Assert(col == SLOTSYNC_COLUMN_COUNT);
872 :
873 : /*
874 : * If restart_lsn, confirmed_lsn or catalog_xmin is invalid but the
875 : * slot is valid, that means we have fetched the remote_slot in its
876 : * RS_EPHEMERAL state. In such a case, don't sync it; we can always
877 : * sync it in the next sync cycle when the remote_slot is persisted
878 : * and has valid lsn(s) and xmin values.
879 : *
880 : * XXX: In future, if we plan to expose 'slot->data.persistency' in
881 : * pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL
882 : * slots in the first place.
883 : */
884 56 : if ((XLogRecPtrIsInvalid(remote_slot->restart_lsn) ||
885 56 : XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) ||
886 56 : !TransactionIdIsValid(remote_slot->catalog_xmin)) &&
887 0 : remote_slot->invalidated == RS_INVAL_NONE)
888 0 : pfree(remote_slot);
889 : else
890 : /* Create list of remote slots */
891 56 : remote_slot_list = lappend(remote_slot_list, remote_slot);
892 :
893 56 : ExecClearTuple(tupslot);
894 : }
895 :
896 : /* Drop local slots that no longer need to be synced. */
897 30 : drop_local_obsolete_slots(remote_slot_list);
898 :
899 : /* Now sync the slots locally */
900 116 : foreach_ptr(RemoteSlot, remote_slot, remote_slot_list)
901 : {
902 56 : Oid remote_dbid = get_database_oid(remote_slot->database, false);
903 :
904 : /*
905 : * Use shared lock to prevent a conflict with
906 : * ReplicationSlotsDropDBSlots(), trying to drop the same slot during
907 : * a drop-database operation.
908 : */
909 56 : LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
910 :
911 56 : some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
912 :
913 56 : UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
914 : }
915 :
916 : /* We are done, free remote_slot_list elements */
917 30 : list_free_deep(remote_slot_list);
918 :
919 30 : walrcv_clear_result(res);
920 :
921 30 : if (started_tx)
922 18 : CommitTransactionCommand();
923 :
924 30 : return some_slot_updated;
925 : }
926 :
927 : /*
928 : * Checks the remote server info.
929 : *
930 : * We ensure that the 'primary_slot_name' exists on the remote server and the
931 : * remote server is not a standby node.
932 : */
933 : static void
934 22 : validate_remote_info(WalReceiverConn *wrconn)
935 : {
936 : #define PRIMARY_INFO_OUTPUT_COL_COUNT 2
937 : WalRcvExecResult *res;
938 22 : Oid slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID};
939 : StringInfoData cmd;
940 : bool isnull;
941 : TupleTableSlot *tupslot;
942 : bool remote_in_recovery;
943 : bool primary_slot_valid;
944 22 : bool started_tx = false;
945 :
946 22 : initStringInfo(&cmd);
947 22 : appendStringInfo(&cmd,
948 : "SELECT pg_is_in_recovery(), count(*) = 1"
949 : " FROM pg_catalog.pg_replication_slots"
950 : " WHERE slot_type='physical' AND slot_name=%s",
951 : quote_literal_cstr(PrimarySlotName));
952 :
953 : /* The syscache access in walrcv_exec() needs a transaction env. */
954 22 : if (!IsTransactionState())
955 : {
956 8 : StartTransactionCommand();
957 8 : started_tx = true;
958 : }
959 :
960 22 : res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow);
961 22 : pfree(cmd.data);
962 :
963 22 : if (res->status != WALRCV_OK_TUPLES)
964 0 : ereport(ERROR,
965 : errmsg("could not fetch primary_slot_name \"%s\" info from the primary server: %s",
966 : PrimarySlotName, res->err),
967 : errhint("Check if primary_slot_name is configured correctly."));
968 :
969 22 : tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
970 22 : if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
971 0 : elog(ERROR,
972 : "failed to fetch tuple for the primary server slot specified by primary_slot_name");
973 :
974 22 : remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull));
975 : Assert(!isnull);
976 :
977 : /*
978 : * Slot sync is currently not supported on a cascading standby. This is
979 : * because if we allow it, the primary server needs to wait for all the
980 : * cascading standbys, otherwise, logical subscribers can still be ahead
981 : * of one of the cascading standbys which we plan to promote. Thus, to
982 : * avoid this additional complexity, we restrict it for the time being.
983 : */
984 22 : if (remote_in_recovery)
985 2 : ereport(ERROR,
986 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
987 : errmsg("cannot synchronize replication slots from a standby server"));
988 :
989 20 : primary_slot_valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull));
990 : Assert(!isnull);
991 :
992 20 : if (!primary_slot_valid)
993 0 : ereport(ERROR,
994 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
995 : errmsg("slot synchronization requires valid primary_slot_name"),
996 : /* translator: second %s is a GUC variable name */
997 : errdetail("The replication slot \"%s\" specified by %s does not exist on the primary server.",
998 : PrimarySlotName, "primary_slot_name"));
999 :
1000 20 : ExecClearTuple(tupslot);
1001 20 : walrcv_clear_result(res);
1002 :
1003 20 : if (started_tx)
1004 8 : CommitTransactionCommand();
1005 20 : }
1006 :
1007 : /*
1008 : * Checks if dbname is specified in 'primary_conninfo'.
1009 : *
1010 : * Error out if it is not specified, otherwise return it.
1011 : */
1012 : char *
1013 24 : CheckAndGetDbnameFromConninfo(void)
1014 : {
1015 : char *dbname;
1016 :
1017 : /*
1018 : * The slot synchronization needs a database connection for walrcv_exec to
1019 : * work.
1020 : */
1021 24 : dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
1022 24 : if (dbname == NULL)
1023 2 : ereport(ERROR,
1024 :
1025 : /*
1026 : * translator: dbname is a specific option; %s is a GUC variable name
1027 : */
1028 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1029 : errmsg("slot synchronization requires dbname to be specified in %s",
1030 : "primary_conninfo"));
1031 22 : return dbname;
1032 : }
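 :
 : /*
 : * For instance (illustrative values only), a primary_conninfo usable for
 : * slot synchronization must carry a dbname:
 : *
 : *	primary_conninfo = 'host=primary port=5432 user=repluser dbname=postgres'
 : *
 : * A conninfo without dbname makes the function above error out.
 : */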
1033 :
1034 : /*
1035 : * Return true if all necessary GUCs for slot synchronization are set
1036 : * appropriately, otherwise, return false.
1037 : */
1038 : bool
1039 28 : ValidateSlotSyncParams(int elevel)
1040 : {
1041 : /*
1042 : * Logical slot sync/creation requires wal_level >= logical.
1043 : *
1044 : * Since altering the wal_level requires a server restart, error out
1045 : * in this case regardless of the elevel provided by the caller.
1046 : */
1047 28 : if (wal_level < WAL_LEVEL_LOGICAL)
1048 0 : ereport(ERROR,
1049 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1050 : errmsg("slot synchronization requires wal_level >= \"logical\""));
1051 :
1052 : /*
1053 : * A physical replication slot (primary_slot_name) is required on the
1054 : * primary to ensure that the rows needed by the standby are not removed
1055 : * after restarting, so that the synchronized slot on the standby will not
1056 : * be invalidated.
1057 : */
1058 28 : if (PrimarySlotName == NULL || *PrimarySlotName == '\0')
1059 : {
1060 0 : ereport(elevel,
1061 : /* translator: %s is a GUC variable name */
1062 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1063 : errmsg("slot synchronization requires %s to be defined", "primary_slot_name"));
1064 0 : return false;
1065 : }
1066 :
1067 : /*
1068 : * hot_standby_feedback must be enabled to cooperate with the physical
1069 : * replication slot, which allows informing the primary about the xmin and
1070 : * catalog_xmin values on the standby.
1071 : */
1072 28 : if (!hot_standby_feedback)
1073 : {
1074 2 : ereport(elevel,
1075 : /* translator: %s is a GUC variable name */
1076 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1077 : errmsg("slot synchronization requires %s to be enabled",
1078 : "hot_standby_feedback"));
1079 2 : return false;
1080 : }
1081 :
1082 : /*
1083 : * The primary_conninfo is required to make a connection to the primary
1084 : * for getting slot information.
1085 : */
1086 26 : if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0')
1087 : {
1088 0 : ereport(elevel,
1089 : /* translator: %s is a GUC variable name */
1090 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1091 : errmsg("slot synchronization requires %s to be defined",
1092 : "primary_conninfo"));
1093 0 : return false;
1094 : }
1095 :
1096 26 : return true;
1097 : }
1098 :
1099 : /*
1100 : * Re-read the config file.
1101 : *
1102 : * Exit if any of the slot sync GUCs have changed. The postmaster will
1103 : * restart it.
1104 : */
1105 : static void
1106 2 : slotsync_reread_config(void)
1107 : {
1108 2 : char *old_primary_conninfo = pstrdup(PrimaryConnInfo);
1109 2 : char *old_primary_slotname = pstrdup(PrimarySlotName);
1110 2 : bool old_sync_replication_slots = sync_replication_slots;
1111 2 : bool old_hot_standby_feedback = hot_standby_feedback;
1112 : bool conninfo_changed;
1113 : bool primary_slotname_changed;
1114 :
1115 : Assert(sync_replication_slots);
1116 :
1117 2 : ConfigReloadPending = false;
1118 2 : ProcessConfigFile(PGC_SIGHUP);
1119 :
1120 2 : conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0;
1121 2 : primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0;
1122 2 : pfree(old_primary_conninfo);
1123 2 : pfree(old_primary_slotname);
1124 :
1125 2 : if (old_sync_replication_slots != sync_replication_slots)
1126 : {
1127 0 : ereport(LOG,
1128 : /* translator: %s is a GUC variable name */
1129 : errmsg("slot sync worker will shutdown because %s is disabled", "sync_replication_slots"));
1130 0 : proc_exit(0);
1131 : }
1132 :
1133 2 : if (conninfo_changed ||
1134 2 : primary_slotname_changed ||
1135 2 : (old_hot_standby_feedback != hot_standby_feedback))
1136 : {
1137 2 : ereport(LOG,
1138 : errmsg("slot sync worker will restart because of a parameter change"));
1139 :
1140 : /*
1141 : * Reset the last-start time for this worker so that the postmaster
1142 : * can restart it without waiting for SLOTSYNC_RESTART_INTERVAL_SEC.
1143 : */
1144 2 : SlotSyncCtx->last_start_time = 0;
1145 :
1146 2 : proc_exit(0);
1147 : }
1148 :
1149 0 : }
1150 :
1151 : /*
1152 : * Interrupt handler for main loop of slot sync worker.
1153 : */
1154 : static void
1155 26 : ProcessSlotSyncInterrupts(WalReceiverConn *wrconn)
1156 : {
1157 26 : CHECK_FOR_INTERRUPTS();
1158 :
1159 22 : if (ShutdownRequestPending)
1160 : {
1161 2 : ereport(LOG,
1162 : errmsg("slot sync worker is shutting down on receiving SIGINT"));
1163 :
1164 2 : proc_exit(0);
1165 : }
1166 :
1167 20 : if (ConfigReloadPending)
1168 2 : slotsync_reread_config();
1169 18 : }
1170 :
1171 : /*
1172 : * Connection cleanup function for slotsync worker.
1173 : *
1174 : * Called on slotsync worker exit.
1175 : */
1176 : static void
1177 8 : slotsync_worker_disconnect(int code, Datum arg)
1178 : {
1179 8 : WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
1180 :
1181 8 : walrcv_disconnect(wrconn);
1182 8 : }
1183 :
1184 : /*
1185 : * Cleanup function for slotsync worker.
1186 : *
1187 : * Called on slotsync worker exit.
1188 : */
1189 : static void
1190 8 : slotsync_worker_onexit(int code, Datum arg)
1191 : {
1192 : /*
1193 : * We need to do slots cleanup here just like WalSndErrorCleanup() does.
1194 : *
1195 : * The startup process during promotion invokes ShutDownSlotSync() which
1196 : * waits for slot sync to finish and it does that by checking the
1197 : * 'syncing' flag. Thus the slot sync worker must be done with slots'
1198 : * release and cleanup to avoid any dangling temporary slots or active
1199 : * slots before it marks itself as finished syncing.
1200 : */
1201 :
1202 : /* Make sure active replication slots are released */
1203 8 : if (MyReplicationSlot != NULL)
1204 0 : ReplicationSlotRelease();
1205 :
1206 : /* Also cleanup the temporary slots. */
1207 8 : ReplicationSlotCleanup(false);
1208 :
1209 8 : SpinLockAcquire(&SlotSyncCtx->mutex);
1210 :
1211 8 : SlotSyncCtx->pid = InvalidPid;
1212 :
1213 : /*
1214 : * If syncing_slots is true, it indicates that the process errored out
1215 : * without resetting the flag. So, we need to clean up shared memory and
1216 : * reset the flag here.
1217 : */
1218 8 : if (syncing_slots)
1219 : {
1220 8 : SlotSyncCtx->syncing = false;
1221 8 : syncing_slots = false;
1222 : }
1223 :
1224 8 : SpinLockRelease(&SlotSyncCtx->mutex);
1225 8 : }
1226 :
1227 : /*
1228 : * Sleep for long enough that we believe it's likely that the slots on the
1229 : * primary get updated.
1230 : *
1231 : * If there is no slot activity, the wait time between sync-cycles doubles
1232 : * (to a maximum of 30s). If there is some slot activity, the wait time
1233 : * between sync-cycles is reset to the minimum (200ms).
1234 : */
1235 : static void
1236 18 : wait_for_slot_activity(bool some_slot_updated)
1237 : {
1238 : int rc;
1239 :
1240 18 : if (!some_slot_updated)
1241 : {
1242 : /*
1243 : * No slots were updated, so double the sleep time, but not beyond the
1244 : * maximum allowable value.
1245 : */
1246 8 : sleep_ms = Min(sleep_ms * 2, MAX_SLOTSYNC_WORKER_NAPTIME_MS);
1247 : }
1248 : else
1249 : {
1250 : /*
1251 : * Some slots were updated since the last sleep, so reset the sleep
1252 : * time.
1253 : */
1254 10 : sleep_ms = MIN_SLOTSYNC_WORKER_NAPTIME_MS;
1255 : }
1256 :
1257 18 : rc = WaitLatch(MyLatch,
1258 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1259 : sleep_ms,
1260 : WAIT_EVENT_REPLICATION_SLOTSYNC_MAIN);
1261 :
1262 18 : if (rc & WL_LATCH_SET)
1263 8 : ResetLatch(MyLatch);
1264 18 : }
1265 :
1266 : /*
1267 : * Emit an error if a promotion or a concurrent sync call is in progress.
1268 : * Otherwise, advertise that a sync is in progress.
1269 : */
1270 : static void
1271 22 : check_and_set_sync_info(pid_t worker_pid)
1272 : {
1273 22 : SpinLockAcquire(&SlotSyncCtx->mutex);
1274 :
1275 : /* The worker pid must not be already assigned in SlotSyncCtx */
1276 : Assert(worker_pid == InvalidPid || SlotSyncCtx->pid == InvalidPid);
1277 :
1278 : /*
1279 : * Emit an error if startup process signaled the slot sync machinery to
1280 : * stop. See comments atop SlotSyncCtxStruct.
1281 : */
1282 22 : if (SlotSyncCtx->stopSignaled)
1283 : {
1284 0 : SpinLockRelease(&SlotSyncCtx->mutex);
1285 0 : ereport(ERROR,
1286 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1287 : errmsg("cannot synchronize replication slots when standby promotion is ongoing"));
1288 : }
1289 :
1290 22 : if (SlotSyncCtx->syncing)
1291 : {
1292 0 : SpinLockRelease(&SlotSyncCtx->mutex);
1293 0 : ereport(ERROR,
1294 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1295 : errmsg("cannot synchronize replication slots concurrently"));
1296 : }
1297 :
1298 22 : SlotSyncCtx->syncing = true;
1299 :
1300 : /*
1301 : * Advertise the required PID so that the startup process can kill the
1302 : * slot sync worker on promotion.
1303 : */
1304 22 : SlotSyncCtx->pid = worker_pid;
1305 :
1306 22 : SpinLockRelease(&SlotSyncCtx->mutex);
1307 :
1308 22 : syncing_slots = true;
1309 22 : }
1310 :
1311 : /*
1312 : * Reset syncing flag.
1313 : */
1314 : static void
1315 14 : reset_syncing_flag(void)
1316 : {
1317 14 : SpinLockAcquire(&SlotSyncCtx->mutex);
1318 14 : SlotSyncCtx->syncing = false;
1319 14 : SpinLockRelease(&SlotSyncCtx->mutex);
1320 :
1321 14 : syncing_slots = false;
1322 14 : }
1323 :
1324 : /*
1325 : * The main loop of our worker process.
1326 : *
1327 : * It connects to the primary server, fetches logical failover slots
1328 : * information periodically in order to create and sync the slots.
1329 : */
1330 : void
1331 8 : ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
1332 : {
1333 8 : WalReceiverConn *wrconn = NULL;
1334 : char *dbname;
1335 : char *err;
1336 : sigjmp_buf local_sigjmp_buf;
1337 : StringInfoData app_name;
1338 :
1339 : Assert(startup_data_len == 0);
1340 :
1341 8 : MyBackendType = B_SLOTSYNC_WORKER;
1342 :
1343 8 : init_ps_display(NULL);
1344 :
1345 8 : SetProcessingMode(InitProcessing);
1346 :
1347 : /*
1348 : * Create a per-backend PGPROC struct in shared memory. We must do this
1349 : * before we access any shared memory.
1350 : */
1351 8 : InitProcess();
1352 :
1353 : /*
1354 : * Early initialization.
1355 : */
1356 8 : BaseInit();
1357 :
1358 : Assert(SlotSyncCtx != NULL);
1359 :
1360 : /*
1361 : * If an exception is encountered, processing resumes here.
1362 : *
1363 : * We just need to clean up, report the error, and go away.
1364 : *
1365 : * If we do not have this handling here, then since this worker process
1366 : * operates at the bottom of the exception stack, ERRORs turn into FATALs.
1367 : * Therefore, we create our own exception handler to catch ERRORs.
1368 : */
1369 8 : if (sigsetjmp(local_sigjmp_buf, 1) != 0)
1370 : {
1371 : /* since not using PG_TRY, must reset error stack by hand */
1372 0 : error_context_stack = NULL;
1373 :
1374 : /* Prevents interrupts while cleaning up */
1375 0 : HOLD_INTERRUPTS();
1376 :
1377 : /* Report the error to the server log */
1378 0 : EmitErrorReport();
1379 :
1380 : /*
1381 : * We can now go away. Note that because we called InitProcess, a
1382 : * callback was registered to do ProcKill, which will clean up
1383 : * necessary state.
1384 : */
1385 0 : proc_exit(0);
1386 : }
1387 :
1388 : /* We can now handle ereport(ERROR) */
1389 8 : PG_exception_stack = &local_sigjmp_buf;
1390 :
1391 : /* Setup signal handling */
1392 8 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
1393 8 : pqsignal(SIGINT, SignalHandlerForShutdownRequest);
1394 8 : pqsignal(SIGTERM, die);
1395 8 : pqsignal(SIGFPE, FloatExceptionHandler);
1396 8 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
1397 8 : pqsignal(SIGUSR2, SIG_IGN);
1398 8 : pqsignal(SIGPIPE, SIG_IGN);
1399 8 : pqsignal(SIGCHLD, SIG_DFL);
1400 :
1401 8 : check_and_set_sync_info(MyProcPid);
1402 :
1403 8 : ereport(LOG, errmsg("slot sync worker started"));
1404 :
1405 : /* Register it as soon as SlotSyncCtx->pid is initialized. */
1406 8 : before_shmem_exit(slotsync_worker_onexit, (Datum) 0);
1407 :
1408 : /*
1409 : * Establish the SIGALRM handler and initialize the timeout module. This is
1410 : * needed by InitPostgres to register different timeouts.
1411 : */
1412 8 : InitializeTimeouts();
1413 :
1414 : /* Load the libpq-specific functions */
1415 8 : load_file("libpqwalreceiver", false);
1416 :
1417 : /*
1418 : * Unblock signals (they were blocked when the postmaster forked us)
1419 : */
1420 8 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
1421 :
1422 : /*
1423 : * Set always-secure search path, so malicious users can't redirect user
1424 : * code (e.g. operators).
1425 : *
1426 : * It's not strictly necessary since we won't be scanning or writing to
1427 : * any user table locally, but it's good to retain it here for added
1428 : * precaution.
1429 : */
1430 8 : SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
1431 :
1432 8 : dbname = CheckAndGetDbnameFromConninfo();
1433 :
1434 : /*
1435 : * Connect to the database specified by the user in primary_conninfo. We
1436 : * need a database connection for walrcv_exec to work which we use to
1437 : * fetch slot information from the remote node. See comments atop
1438 : * libpqrcv_exec.
1439 : *
1440 : * We do not specify a specific user here since the slot sync worker will
1441 : * operate as a superuser. This is safe because the slot sync worker does
1442 : * not interact with user tables, eliminating the risk of executing
1443 : * arbitrary code within triggers.
1444 : */
1445 8 : InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL);
1446 :
1447 8 : SetProcessingMode(NormalProcessing);
1448 :
1449 8 : initStringInfo(&app_name);
1450 8 : if (cluster_name[0])
1451 8 : appendStringInfo(&app_name, "%s_%s", cluster_name, "slotsync worker");
1452 : else
1453 0 : appendStringInfoString(&app_name, "slotsync worker");
1454 :
1455 : /*
1456 : * Establish the connection to the primary server for slot
1457 : * synchronization.
1458 : */
1459 8 : wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
1460 : app_name.data, &err);
1461 8 : pfree(app_name.data);
1462 :
1463 8 : if (!wrconn)
1464 0 : ereport(ERROR,
1465 : errcode(ERRCODE_CONNECTION_FAILURE),
1466 : errmsg("could not connect to the primary server: %s", err));
1467 :
1468 : /*
1469 : * Register the disconnection callback.
1470 : *
1471 : * XXX: This can be combined with previous cleanup registration of
1472 : * slotsync_worker_onexit() but that will need the connection to be made
1473 : * global and we want to avoid introducing global for this purpose.
1474 : */
1475 8 : before_shmem_exit(slotsync_worker_disconnect, PointerGetDatum(wrconn));
1476 :
1477 : /*
1478 : * Using the specified primary server connection, check that we are not a
1479 : * cascading standby and slot configured in 'primary_slot_name' exists on
1480 : * the primary server.
1481 : */
1482 8 : validate_remote_info(wrconn);
1483 :
1484 : /* Main loop to synchronize slots */
1485 : for (;;)
1486 18 : {
1487 26 : bool some_slot_updated = false;
1488 :
1489 26 : ProcessSlotSyncInterrupts(wrconn);
1490 :
1491 18 : some_slot_updated = synchronize_slots(wrconn);
1492 :
1493 18 : wait_for_slot_activity(some_slot_updated);
1494 : }
1495 :
1496 : /*
1497 : * The slot sync worker can't get here because it will only stop when it
1498 : * receives a SIGINT from the startup process, or when there is an error.
1499 : */
1500 : Assert(false);
1501 : }
1502 :
1503 : /*
1504 : * Update the inactive_since property for synced slots.
1505 : *
1506 : * Note that this function is currently called when we shut down the slot
1507 : * sync machinery.
1508 : */
1509 : static void
1510 1420 : update_synced_slots_inactive_since(void)
1511 : {
1512 1420 : TimestampTz now = 0;
1513 :
1514 : /*
1515 : * We need to update inactive_since only when we are promoting the standby,
1516 : * to correctly interpret the inactive_since if the standby gets promoted
1517 : * without a restart. We don't want the slots to appear inactive for a
1518 : * long time after promotion if they haven't been synchronized recently.
1519 : * Whoever acquires the slot, i.e., makes the slot active, will reset it.
1520 : */
1521 1420 : if (!StandbyMode)
1522 1336 : return;
1523 :
1524 : /* The slot sync worker or SQL function mustn't be running by now */
1525 : Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
1526 :
1527 84 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1528 :
1529 900 : for (int i = 0; i < max_replication_slots; i++)
1530 : {
1531 816 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1532 :
1533 : /* Check if it is a synchronized slot */
1534 816 : if (s->in_use && s->data.synced)
1535 : {
1536 : Assert(SlotIsLogical(s));
1537 :
1538 : /* The slot must not be acquired by any process */
1539 : Assert(s->active_pid == 0);
1540 :
1541 : /* Use the same inactive_since time for all the slots. */
1542 4 : if (now == 0)
1543 2 : now = GetCurrentTimestamp();
1544 :
1545 4 : SpinLockAcquire(&s->mutex);
1546 4 : s->inactive_since = now;
1547 4 : SpinLockRelease(&s->mutex);
1548 : }
1549 : }
1550 :
1551 84 : LWLockRelease(ReplicationSlotControlLock);
1552 : }
1553 :
1554 : /*
1555 : * Shut down the slot sync worker.
1556 : *
1557 : * This function sends a signal to shut down the slot sync worker, if
1558 : * required. It also waits till the slot sync worker has exited or
1559 : * pg_sync_replication_slots() has finished.
1560 : */
1561 : void
1562 1420 : ShutDownSlotSync(void)
1563 : {
1564 : pid_t worker_pid;
1565 :
1566 1420 : SpinLockAcquire(&SlotSyncCtx->mutex);
1567 :
1568 1420 : SlotSyncCtx->stopSignaled = true;
1569 :
1570 : /*
1571 : * Return if neither the slot sync worker is running nor the function
1572 : * pg_sync_replication_slots() is executing.
1573 : */
1574 1420 : if (!SlotSyncCtx->syncing)
1575 : {
1576 1418 : SpinLockRelease(&SlotSyncCtx->mutex);
1577 1418 : update_synced_slots_inactive_since();
1578 1418 : return;
1579 : }
1580 :
1581 2 : worker_pid = SlotSyncCtx->pid;
1582 :
1583 2 : SpinLockRelease(&SlotSyncCtx->mutex);
1584 :
1585 2 : if (worker_pid != InvalidPid)
1586 2 : kill(worker_pid, SIGINT);
1587 :
1588 : /* Wait for slot sync to end */
1589 : for (;;)
1590 0 : {
1591 : int rc;
1592 :
1593 : /* Wait a bit, we don't expect to have to wait long */
1594 2 : rc = WaitLatch(MyLatch,
1595 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1596 : 10L, WAIT_EVENT_REPLICATION_SLOTSYNC_SHUTDOWN);
1597 :
1598 2 : if (rc & WL_LATCH_SET)
1599 : {
1600 0 : ResetLatch(MyLatch);
1601 0 : CHECK_FOR_INTERRUPTS();
1602 : }
1603 :
1604 2 : SpinLockAcquire(&SlotSyncCtx->mutex);
1605 :
1606 : /* Ensure that no process is syncing the slots. */
1607 2 : if (!SlotSyncCtx->syncing)
1608 2 : break;
1609 :
1610 0 : SpinLockRelease(&SlotSyncCtx->mutex);
1611 : }
1612 :
1613 2 : SpinLockRelease(&SlotSyncCtx->mutex);
1614 :
1615 2 : update_synced_slots_inactive_since();
1616 : }
1617 :
1618 : /*
1619 : * SlotSyncWorkerCanRestart
1620 : *
1621 : * Returns true if enough time (SLOTSYNC_RESTART_INTERVAL_SEC) has passed
1622 : * since the worker was last launched. Otherwise returns false.
1623 : *
1624 : * This is a safety valve to protect against continuous respawn attempts if the
1625 : * worker is dying immediately at launch. Note that since we will retry to
1626 : * launch the worker from the postmaster main loop, we will get another
1627 : * chance later.
1628 : */
1629 : bool
1630 10 : SlotSyncWorkerCanRestart(void)
1631 : {
1632 10 : time_t curtime = time(NULL);
1633 :
1634 : /* Return false if too soon since last start. */
1635 10 : if ((unsigned int) (curtime - SlotSyncCtx->last_start_time) <
1636 : (unsigned int) SLOTSYNC_RESTART_INTERVAL_SEC)
1637 2 : return false;
1638 :
1639 8 : SlotSyncCtx->last_start_time = curtime;
1640 :
1641 8 : return true;
1642 : }
1643 :
1644 : /*
1645 : * Is the current process syncing replication slots?
1646 : *
1647 : * It could be either a backend executing the SQL function or the slot sync worker.
1648 : */
1649 : bool
1650 36 : IsSyncingReplicationSlots(void)
1651 : {
1652 36 : return syncing_slots;
1653 : }
1654 :
1655 : /*
1656 : * Amount of shared memory required for slot synchronization.
1657 : */
1658 : Size
1659 5066 : SlotSyncShmemSize(void)
1660 : {
1661 5066 : return sizeof(SlotSyncCtxStruct);
1662 : }
1663 :
1664 : /*
1665 : * Allocate and initialize the shared memory of slot synchronization.
1666 : */
1667 : void
1668 1768 : SlotSyncShmemInit(void)
1669 : {
1670 1768 : Size size = SlotSyncShmemSize();
1671 : bool found;
1672 :
1673 1768 : SlotSyncCtx = (SlotSyncCtxStruct *)
1674 1768 : ShmemInitStruct("Slot Sync Data", size, &found);
1675 :
1676 1768 : if (!found)
1677 : {
1678 1768 : memset(SlotSyncCtx, 0, size);
1679 1768 : SlotSyncCtx->pid = InvalidPid;
1680 1768 : SpinLockInit(&SlotSyncCtx->mutex);
1681 : }
1682 1768 : }
1683 :
1684 : /*
1685 : * Error cleanup callback for slot sync SQL function.
1686 : */
1687 : static void
1688 2 : slotsync_failure_callback(int code, Datum arg)
1689 : {
1690 2 : WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
1691 :
1692 : /*
1693 : * We need to do slots cleanup here just like WalSndErrorCleanup() does.
1694 : *
1695 : * The startup process during promotion invokes ShutDownSlotSync() which
1696 : * waits for slot sync to finish and it does that by checking the
1697 : * 'syncing' flag. Thus the SQL function must be done with slots' release
1698 : * and cleanup to avoid any dangling temporary slots or active slots
1699 : * before it marks itself as finished syncing.
1700 : */
1701 :
1702 : /* Make sure active replication slots are released */
1703 2 : if (MyReplicationSlot != NULL)
1704 0 : ReplicationSlotRelease();
1705 :
1706 : /* Also cleanup the synced temporary slots. */
1707 2 : ReplicationSlotCleanup(true);
1708 :
1709 : /*
1710 : * If syncing_slots is set, it indicates that the process errored out without
1711 : * resetting the flag. So, we need to clean up shared memory and reset the
1712 : * flag here.
1713 : */
1714 2 : if (syncing_slots)
1715 2 : reset_syncing_flag();
1716 :
1717 2 : walrcv_disconnect(wrconn);
1718 2 : }
1719 :
1720 : /*
1721 : * Synchronize the failover enabled replication slots using the specified
1722 : * primary server connection.
1723 : */
1724 : void
1725 14 : SyncReplicationSlots(WalReceiverConn *wrconn)
1726 : {
1727 14 : PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
1728 : {
1729 14 : check_and_set_sync_info(InvalidPid);
1730 :
1731 14 : validate_remote_info(wrconn);
1732 :
1733 12 : synchronize_slots(wrconn);
1734 :
1735 : /* Cleanup the synced temporary slots */
1736 12 : ReplicationSlotCleanup(true);
1737 :
1738 : /* We are done with sync, so reset sync flag */
1739 12 : reset_syncing_flag();
1740 : }
1741 14 : PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
1742 12 : }