Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * slotfuncs.c
4 : * Support functions for replication slots
5 : *
6 : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/replication/slotfuncs.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/htup_details.h"
16 : #include "access/xlog_internal.h"
17 : #include "access/xlogrecovery.h"
18 : #include "access/xlogutils.h"
19 : #include "funcapi.h"
20 : #include "replication/logical.h"
21 : #include "replication/slot.h"
22 : #include "replication/slotsync.h"
23 : #include "storage/proc.h"
24 : #include "utils/builtins.h"
25 : #include "utils/guc.h"
26 : #include "utils/pg_lsn.h"
27 :
28 : /*
29 : * Map SlotSyncSkipReason enum values to human-readable names.
30 : */
31 : static const char *SlotSyncSkipReasonNames[] = {
32 : [SS_SKIP_NONE] = "none",
33 : [SS_SKIP_WAL_NOT_FLUSHED] = "wal_not_flushed",
34 : [SS_SKIP_WAL_OR_ROWS_REMOVED] = "wal_or_rows_removed",
35 : [SS_SKIP_NO_CONSISTENT_SNAPSHOT] = "no_consistent_snapshot",
36 : [SS_SKIP_INVALID] = "slot_invalidated"
37 : };
38 :
39 : /*
40 : * Helper function for creating a new physical replication slot with
41 : * given arguments. Note that this function doesn't release the created
42 : * slot.
43 : *
44 : * If restart_lsn is a valid value, we use it without WAL reservation
45 : * routine. So the caller must guarantee that WAL is available.
46 : */
47 : static void
48 47 : create_physical_replication_slot(char *name, bool immediately_reserve,
49 : bool temporary, XLogRecPtr restart_lsn)
50 : {
51 : Assert(!MyReplicationSlot);
52 :
53 : /* acquire replication slot, this will check for conflicting names */
54 47 : ReplicationSlotCreate(name, false,
55 : temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
56 : false, false, false);
57 :
58 47 : if (immediately_reserve)
59 : {
60 : /* Reserve WAL as the user asked for it */
61 22 : if (!XLogRecPtrIsValid(restart_lsn))
62 17 : ReplicationSlotReserveWal();
63 : else
64 5 : MyReplicationSlot->data.restart_lsn = restart_lsn;
65 :
66 : /* Write this slot to disk */
67 22 : ReplicationSlotMarkDirty();
68 22 : ReplicationSlotSave();
69 : }
70 47 : }
71 :
72 : /*
73 : * SQL function for creating a new physical (streaming replication)
74 : * replication slot.
75 : */
76 : Datum
77 42 : pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
78 : {
79 42 : Name name = PG_GETARG_NAME(0);
80 42 : bool immediately_reserve = PG_GETARG_BOOL(1);
81 42 : bool temporary = PG_GETARG_BOOL(2);
82 : Datum values[2];
83 : bool nulls[2];
84 : TupleDesc tupdesc;
85 : HeapTuple tuple;
86 : Datum result;
87 :
88 42 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
89 0 : elog(ERROR, "return type must be a row type");
90 :
91 42 : CheckSlotPermissions();
92 :
93 42 : CheckSlotRequirements(false);
94 :
95 42 : create_physical_replication_slot(NameStr(*name),
96 : immediately_reserve,
97 : temporary,
98 : InvalidXLogRecPtr);
99 :
100 42 : values[0] = NameGetDatum(&MyReplicationSlot->data.name);
101 42 : nulls[0] = false;
102 :
103 42 : if (immediately_reserve)
104 : {
105 17 : values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
106 17 : nulls[1] = false;
107 : }
108 : else
109 25 : nulls[1] = true;
110 :
111 42 : tuple = heap_form_tuple(tupdesc, values, nulls);
112 42 : result = HeapTupleGetDatum(tuple);
113 :
114 42 : ReplicationSlotRelease();
115 :
116 42 : PG_RETURN_DATUM(result);
117 : }
118 :
119 :
120 : /*
121 : * Helper function for creating a new logical replication slot with
122 : * given arguments. Note that this function doesn't release the created
123 : * slot.
124 : *
125 : * When find_startpoint is false, the slot's confirmed_flush is not set; it's
126 : * caller's responsibility to ensure it's set to something sensible.
127 : */
128 : static void
129 142 : create_logical_replication_slot(char *name, char *plugin,
130 : bool temporary, bool two_phase,
131 : bool failover,
132 : XLogRecPtr restart_lsn,
133 : bool find_startpoint)
134 : {
135 142 : LogicalDecodingContext *ctx = NULL;
136 :
137 : Assert(!MyReplicationSlot);
138 :
139 : /*
140 : * Acquire a logical decoding slot, this will check for conflicting names.
141 : * Initially create persistent slot as ephemeral - that allows us to
142 : * nicely handle errors during initialization because it'll get dropped if
143 : * this transaction fails. We'll make it persistent at the end. Temporary
144 : * slots can be created as temporary from beginning as they get dropped on
145 : * error as well.
146 : */
147 142 : ReplicationSlotCreate(name, true,
148 : temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
149 : false, failover, false);
150 :
151 : /*
152 : * Ensure the logical decoding is enabled before initializing the logical
153 : * decoding context.
154 : */
155 137 : EnsureLogicalDecodingEnabled();
156 : Assert(IsLogicalDecodingEnabled());
157 :
158 : /*
159 : * Create logical decoding context to find start point or, if we don't
160 : * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
161 : *
162 : * Note: when !find_startpoint this is still important, because it's at
163 : * this point that the output plugin is validated.
164 : */
165 136 : ctx = CreateInitDecodingContext(plugin, NIL,
166 : false, /* just catalogs is OK */
167 : false, /* not repack */
168 : restart_lsn,
169 136 : XL_ROUTINE(.page_read = read_local_xlog_page,
170 : .segment_open = wal_segment_open,
171 : .segment_close = wal_segment_close),
172 : NULL, NULL, NULL);
173 :
174 : /*
175 : * If caller needs us to determine the decoding start point, do so now.
176 : * This might take a while.
177 : */
178 133 : if (find_startpoint)
179 126 : DecodingContextFindStartpoint(ctx);
180 :
181 : /* don't need the decoding context anymore */
182 131 : FreeDecodingContext(ctx);
183 131 : }
184 :
185 : /*
186 : * SQL function for creating a new logical replication slot.
187 : */
188 : Datum
189 135 : pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
190 : {
191 135 : Name name = PG_GETARG_NAME(0);
192 135 : Name plugin = PG_GETARG_NAME(1);
193 135 : bool temporary = PG_GETARG_BOOL(2);
194 135 : bool two_phase = PG_GETARG_BOOL(3);
195 135 : bool failover = PG_GETARG_BOOL(4);
196 : Datum result;
197 : TupleDesc tupdesc;
198 : HeapTuple tuple;
199 : Datum values[2];
200 : bool nulls[2];
201 :
202 135 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
203 0 : elog(ERROR, "return type must be a row type");
204 :
205 135 : CheckSlotPermissions();
206 :
207 134 : CheckLogicalDecodingRequirements(false);
208 :
209 134 : create_logical_replication_slot(NameStr(*name),
210 134 : NameStr(*plugin),
211 : temporary,
212 : two_phase,
213 : failover,
214 : InvalidXLogRecPtr,
215 : true);
216 :
217 124 : values[0] = NameGetDatum(&MyReplicationSlot->data.name);
218 124 : values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
219 :
220 124 : memset(nulls, 0, sizeof(nulls));
221 :
222 124 : tuple = heap_form_tuple(tupdesc, values, nulls);
223 124 : result = HeapTupleGetDatum(tuple);
224 :
225 : /* ok, slot is now fully created, mark it as persistent if needed */
226 124 : if (!temporary)
227 118 : ReplicationSlotPersist();
228 124 : ReplicationSlotRelease();
229 :
230 124 : PG_RETURN_DATUM(result);
231 : }
232 :
233 :
234 : /*
235 : * SQL function for dropping a replication slot.
236 : */
237 : Datum
238 149 : pg_drop_replication_slot(PG_FUNCTION_ARGS)
239 : {
240 149 : Name name = PG_GETARG_NAME(0);
241 :
242 149 : CheckSlotPermissions();
243 :
244 147 : CheckSlotRequirements(false);
245 :
246 147 : ReplicationSlotDrop(NameStr(*name), true);
247 :
248 141 : PG_RETURN_VOID();
249 : }
250 :
251 : /*
252 : * pg_get_replication_slots - SQL SRF showing all replication slots
253 : * that currently exist on the database cluster.
254 : */
255 : Datum
256 411 : pg_get_replication_slots(PG_FUNCTION_ARGS)
257 : {
258 : #define PG_GET_REPLICATION_SLOTS_COLS 21
259 411 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
260 : XLogRecPtr currlsn;
261 : int slotno;
262 :
263 : /*
264 : * We don't require any special permission to see this function's data
265 : * because nothing should be sensitive. The most critical being the slot
266 : * name, which shouldn't contain anything particularly sensitive.
267 : */
268 :
269 411 : InitMaterializedSRF(fcinfo, 0);
270 :
271 411 : currlsn = GetXLogWriteRecPtr();
272 :
273 411 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
274 5500 : for (slotno = 0; slotno < max_replication_slots + max_repack_replication_slots; slotno++)
275 : {
276 5089 : ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
277 : ReplicationSlot slot_contents;
278 : Datum values[PG_GET_REPLICATION_SLOTS_COLS];
279 : bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
280 : WALAvailability walstate;
281 : int i;
282 : ReplicationSlotInvalidationCause cause;
283 :
284 5089 : if (!slot->in_use)
285 4388 : continue;
286 :
287 : /* Copy slot contents while holding spinlock, then examine at leisure */
288 701 : SpinLockAcquire(&slot->mutex);
289 701 : slot_contents = *slot;
290 701 : SpinLockRelease(&slot->mutex);
291 :
292 701 : memset(values, 0, sizeof(values));
293 701 : memset(nulls, 0, sizeof(nulls));
294 :
295 701 : i = 0;
296 701 : values[i++] = NameGetDatum(&slot_contents.data.name);
297 :
298 701 : if (slot_contents.data.database == InvalidOid)
299 188 : nulls[i++] = true;
300 : else
301 513 : values[i++] = NameGetDatum(&slot_contents.data.plugin);
302 :
303 701 : if (slot_contents.data.database == InvalidOid)
304 188 : values[i++] = CStringGetTextDatum("physical");
305 : else
306 513 : values[i++] = CStringGetTextDatum("logical");
307 :
308 701 : if (slot_contents.data.database == InvalidOid)
309 188 : nulls[i++] = true;
310 : else
311 513 : values[i++] = ObjectIdGetDatum(slot_contents.data.database);
312 :
313 701 : values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
314 701 : values[i++] = BoolGetDatum(slot_contents.active_proc != INVALID_PROC_NUMBER);
315 :
316 701 : if (slot_contents.active_proc != INVALID_PROC_NUMBER)
317 242 : values[i++] = Int32GetDatum(GetPGProcByNumber(slot_contents.active_proc)->pid);
318 : else
319 459 : nulls[i++] = true;
320 :
321 701 : if (slot_contents.data.xmin != InvalidTransactionId)
322 102 : values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
323 : else
324 599 : nulls[i++] = true;
325 :
326 701 : if (slot_contents.data.catalog_xmin != InvalidTransactionId)
327 559 : values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
328 : else
329 142 : nulls[i++] = true;
330 :
331 701 : if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
332 667 : values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
333 : else
334 34 : nulls[i++] = true;
335 :
336 701 : if (XLogRecPtrIsValid(slot_contents.data.confirmed_flush))
337 483 : values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
338 : else
339 218 : nulls[i++] = true;
340 :
341 : /*
342 : * If the slot has not been invalidated, test availability from
343 : * restart_lsn.
344 : */
345 701 : if (slot_contents.data.invalidated != RS_INVAL_NONE)
346 37 : walstate = WALAVAIL_REMOVED;
347 : else
348 664 : walstate = GetWALAvailability(slot_contents.data.restart_lsn);
349 :
350 701 : switch (walstate)
351 : {
352 30 : case WALAVAIL_INVALID_LSN:
353 30 : nulls[i++] = true;
354 30 : break;
355 :
356 631 : case WALAVAIL_RESERVED:
357 631 : values[i++] = CStringGetTextDatum("reserved");
358 631 : break;
359 :
360 2 : case WALAVAIL_EXTENDED:
361 2 : values[i++] = CStringGetTextDatum("extended");
362 2 : break;
363 :
364 1 : case WALAVAIL_UNRESERVED:
365 1 : values[i++] = CStringGetTextDatum("unreserved");
366 1 : break;
367 :
368 37 : case WALAVAIL_REMOVED:
369 :
370 : /*
371 : * If we read the restart_lsn long enough ago, maybe that file
372 : * has been removed by now. However, the walsender could have
373 : * moved forward enough that it jumped to another file after
374 : * we looked. If checkpointer signalled the process to
375 : * termination, then it's definitely lost; but if a process is
376 : * still alive, then "unreserved" seems more appropriate.
377 : *
378 : * If we do change it, save the state for safe_wal_size below.
379 : */
380 37 : if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
381 : {
382 : ProcNumber procno;
383 :
384 33 : SpinLockAcquire(&slot->mutex);
385 33 : procno = slot->active_proc;
386 33 : slot_contents.data.restart_lsn = slot->data.restart_lsn;
387 33 : SpinLockRelease(&slot->mutex);
388 33 : if (procno != INVALID_PROC_NUMBER)
389 : {
390 0 : values[i++] = CStringGetTextDatum("unreserved");
391 0 : walstate = WALAVAIL_UNRESERVED;
392 0 : break;
393 : }
394 : }
395 37 : values[i++] = CStringGetTextDatum("lost");
396 37 : break;
397 : }
398 :
399 : /*
400 : * safe_wal_size is only computed for slots that have not been lost,
401 : * and only if there's a configured maximum size.
402 : */
403 701 : if (walstate == WALAVAIL_REMOVED || max_slot_wal_keep_size_mb < 0)
404 696 : nulls[i++] = true;
405 : else
406 : {
407 : XLogSegNo targetSeg;
408 : uint64 slotKeepSegs;
409 : uint64 keepSegs;
410 : XLogSegNo failSeg;
411 : XLogRecPtr failLSN;
412 :
413 5 : XLByteToSeg(slot_contents.data.restart_lsn, targetSeg, wal_segment_size);
414 :
415 : /* determine how many segments can be kept by slots */
416 5 : slotKeepSegs = XLogMBVarToSegs(max_slot_wal_keep_size_mb, wal_segment_size);
417 : /* ditto for wal_keep_size */
418 5 : keepSegs = XLogMBVarToSegs(wal_keep_size_mb, wal_segment_size);
419 :
420 : /* if currpos reaches failLSN, we lose our segment */
421 5 : failSeg = targetSeg + Max(slotKeepSegs, keepSegs) + 1;
422 5 : XLogSegNoOffsetToRecPtr(failSeg, 0, wal_segment_size, failLSN);
423 :
424 5 : values[i++] = Int64GetDatum(failLSN - currlsn);
425 : }
426 :
427 701 : values[i++] = BoolGetDatum(slot_contents.data.two_phase);
428 :
429 701 : if (slot_contents.data.two_phase &&
430 16 : XLogRecPtrIsValid(slot_contents.data.two_phase_at))
431 16 : values[i++] = LSNGetDatum(slot_contents.data.two_phase_at);
432 : else
433 685 : nulls[i++] = true;
434 :
435 701 : if (slot_contents.inactive_since > 0)
436 470 : values[i++] = TimestampTzGetDatum(slot_contents.inactive_since);
437 : else
438 231 : nulls[i++] = true;
439 :
440 701 : cause = slot_contents.data.invalidated;
441 :
442 701 : if (SlotIsPhysical(&slot_contents))
443 188 : nulls[i++] = true;
444 : else
445 : {
446 : /*
447 : * rows_removed and wal_level_insufficient are the only two
448 : * reasons for the logical slot's conflict with recovery.
449 : */
450 513 : if (cause == RS_INVAL_HORIZON ||
451 : cause == RS_INVAL_WAL_LEVEL)
452 29 : values[i++] = BoolGetDatum(true);
453 : else
454 484 : values[i++] = BoolGetDatum(false);
455 : }
456 :
457 701 : if (cause == RS_INVAL_NONE)
458 664 : nulls[i++] = true;
459 : else
460 37 : values[i++] = CStringGetTextDatum(GetSlotInvalidationCauseName(cause));
461 :
462 701 : values[i++] = BoolGetDatum(slot_contents.data.failover);
463 :
464 701 : values[i++] = BoolGetDatum(slot_contents.data.synced);
465 :
466 701 : if (slot_contents.slotsync_skip_reason == SS_SKIP_NONE)
467 697 : nulls[i++] = true;
468 : else
469 4 : values[i++] = CStringGetTextDatum(SlotSyncSkipReasonNames[slot_contents.slotsync_skip_reason]);
470 :
471 : Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
472 :
473 701 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
474 : values, nulls);
475 : }
476 :
477 411 : LWLockRelease(ReplicationSlotControlLock);
478 :
479 411 : return (Datum) 0;
480 : }
481 :
482 : /*
483 : * Helper function for advancing our physical replication slot forward.
484 : *
485 : * The LSN position to move to is compared simply to the slot's restart_lsn,
486 : * knowing that any position older than that would be removed by successive
487 : * checkpoints.
488 : */
489 : static XLogRecPtr
490 7 : pg_physical_replication_slot_advance(XLogRecPtr moveto)
491 : {
492 7 : XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn;
493 7 : XLogRecPtr retlsn = startlsn;
494 :
495 : Assert(XLogRecPtrIsValid(moveto));
496 :
497 7 : if (startlsn < moveto)
498 : {
499 7 : SpinLockAcquire(&MyReplicationSlot->mutex);
500 7 : MyReplicationSlot->data.restart_lsn = moveto;
501 7 : SpinLockRelease(&MyReplicationSlot->mutex);
502 7 : retlsn = moveto;
503 :
504 : /*
505 : * Dirty the slot so as it is written out at the next checkpoint. Note
506 : * that the LSN position advanced may still be lost in the event of a
507 : * crash, but this makes the data consistent after a clean shutdown.
508 : */
509 7 : ReplicationSlotMarkDirty();
510 :
511 : /*
512 : * Wake up logical walsenders holding logical failover slots after
513 : * updating the restart_lsn of the physical slot.
514 : */
515 7 : PhysicalWakeupLogicalWalSnd();
516 : }
517 :
518 7 : return retlsn;
519 : }
520 :
521 : /*
522 : * Advance our logical replication slot forward. See
523 : * LogicalSlotAdvanceAndCheckSnapState for details.
524 : */
525 : static XLogRecPtr
526 11 : pg_logical_replication_slot_advance(XLogRecPtr moveto)
527 : {
528 11 : return LogicalSlotAdvanceAndCheckSnapState(moveto, NULL);
529 : }
530 :
531 : /*
532 : * SQL function for moving the position in a replication slot.
533 : */
534 : Datum
535 21 : pg_replication_slot_advance(PG_FUNCTION_ARGS)
536 : {
537 21 : Name slotname = PG_GETARG_NAME(0);
538 21 : XLogRecPtr moveto = PG_GETARG_LSN(1);
539 : XLogRecPtr endlsn;
540 : XLogRecPtr minlsn;
541 : TupleDesc tupdesc;
542 : Datum values[2];
543 : bool nulls[2];
544 : HeapTuple tuple;
545 : Datum result;
546 :
547 : Assert(!MyReplicationSlot);
548 :
549 21 : CheckSlotPermissions();
550 :
551 21 : if (!XLogRecPtrIsValid(moveto))
552 1 : ereport(ERROR,
553 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
554 : errmsg("invalid target WAL LSN")));
555 :
556 : /* Build a tuple descriptor for our result type */
557 20 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
558 0 : elog(ERROR, "return type must be a row type");
559 :
560 : /*
561 : * We can't move slot past what's been flushed/replayed so clamp the
562 : * target position accordingly.
563 : */
564 20 : if (!RecoveryInProgress())
565 20 : moveto = Min(moveto, GetFlushRecPtr(NULL));
566 : else
567 0 : moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
568 :
569 : /* Acquire the slot so we "own" it */
570 20 : ReplicationSlotAcquire(NameStr(*slotname), true, true);
571 :
572 : /* A slot whose restart_lsn has never been reserved cannot be advanced */
573 19 : if (!XLogRecPtrIsValid(MyReplicationSlot->data.restart_lsn))
574 1 : ereport(ERROR,
575 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
576 : errmsg("replication slot \"%s\" cannot be advanced",
577 : NameStr(*slotname)),
578 : errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
579 :
580 : /*
581 : * Check if the slot is not moving backwards. Physical slots rely simply
582 : * on restart_lsn as a minimum point, while logical slots have confirmed
583 : * consumption up to confirmed_flush, meaning that in both cases data
584 : * older than that is not available anymore.
585 : */
586 18 : if (OidIsValid(MyReplicationSlot->data.database))
587 11 : minlsn = MyReplicationSlot->data.confirmed_flush;
588 : else
589 7 : minlsn = MyReplicationSlot->data.restart_lsn;
590 :
591 18 : if (moveto < minlsn)
592 0 : ereport(ERROR,
593 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
594 : errmsg("cannot advance replication slot to %X/%08X, minimum is %X/%08X",
595 : LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
596 :
597 : /* Do the actual slot update, depending on the slot type */
598 18 : if (OidIsValid(MyReplicationSlot->data.database))
599 11 : endlsn = pg_logical_replication_slot_advance(moveto);
600 : else
601 7 : endlsn = pg_physical_replication_slot_advance(moveto);
602 :
603 18 : values[0] = NameGetDatum(&MyReplicationSlot->data.name);
604 18 : nulls[0] = false;
605 :
606 : /*
607 : * Recompute the minimum LSN and xmin across all slots to adjust with the
608 : * advancing potentially done.
609 : */
610 18 : ReplicationSlotsComputeRequiredXmin(false);
611 18 : ReplicationSlotsComputeRequiredLSN();
612 :
613 18 : ReplicationSlotRelease();
614 :
615 : /* Return the reached position. */
616 18 : values[1] = LSNGetDatum(endlsn);
617 18 : nulls[1] = false;
618 :
619 18 : tuple = heap_form_tuple(tupdesc, values, nulls);
620 18 : result = HeapTupleGetDatum(tuple);
621 :
622 18 : PG_RETURN_DATUM(result);
623 : }
624 :
625 : /*
626 : * Helper function of copying a replication slot.
627 : */
628 : static Datum
629 17 : copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
630 : {
631 17 : Name src_name = PG_GETARG_NAME(0);
632 17 : Name dst_name = PG_GETARG_NAME(1);
633 17 : ReplicationSlot *src = NULL;
634 : ReplicationSlot first_slot_contents;
635 : ReplicationSlot second_slot_contents;
636 : XLogRecPtr src_restart_lsn;
637 : bool src_islogical;
638 : bool temporary;
639 : char *plugin;
640 : Datum values[2];
641 : bool nulls[2];
642 : Datum result;
643 : TupleDesc tupdesc;
644 : HeapTuple tuple;
645 :
646 17 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
647 0 : elog(ERROR, "return type must be a row type");
648 :
649 17 : CheckSlotPermissions();
650 :
651 17 : if (logical_slot)
652 10 : CheckLogicalDecodingRequirements(false);
653 : else
654 7 : CheckSlotRequirements(false);
655 :
656 17 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
657 :
658 : /*
659 : * We need to prevent the source slot's reserved WAL from being removed,
660 : * but we don't want to lock that slot for very long, and it can advance
661 : * in the meantime. So obtain the source slot's data, and create a new
662 : * slot using its restart_lsn. Afterwards we lock the source slot again
663 : * and verify that the data we copied (name, type) has not changed
664 : * incompatibly. No inconvenient WAL removal can occur once the new slot
665 : * is created -- but since WAL removal could have occurred before we
666 : * managed to create the new slot, we advance the new slot's restart_lsn
667 : * to the source slot's updated restart_lsn the second time we lock it.
668 : */
669 20 : for (int i = 0; i < max_replication_slots + max_repack_replication_slots; i++)
670 : {
671 20 : ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
672 :
673 20 : if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
674 : {
675 : /* Copy the slot contents while holding spinlock */
676 17 : SpinLockAcquire(&s->mutex);
677 17 : first_slot_contents = *s;
678 17 : SpinLockRelease(&s->mutex);
679 17 : src = s;
680 17 : break;
681 : }
682 : }
683 :
684 17 : LWLockRelease(ReplicationSlotControlLock);
685 :
686 17 : if (src == NULL)
687 0 : ereport(ERROR,
688 : (errcode(ERRCODE_UNDEFINED_OBJECT),
689 : errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
690 :
691 17 : src_islogical = SlotIsLogical(&first_slot_contents);
692 17 : src_restart_lsn = first_slot_contents.data.restart_lsn;
693 17 : temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
694 17 : plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
695 :
696 : /* Check type of replication slot */
697 17 : if (src_islogical != logical_slot)
698 2 : ereport(ERROR,
699 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
700 : src_islogical ?
701 : errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
702 : NameStr(*src_name)) :
703 : errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
704 : NameStr(*src_name))));
705 :
706 : /* Copying non-reserved slot doesn't make sense */
707 15 : if (!XLogRecPtrIsValid(src_restart_lsn))
708 1 : ereport(ERROR,
709 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
710 : errmsg("cannot copy a replication slot that doesn't reserve WAL")));
711 :
712 : /* Cannot copy an invalidated replication slot */
713 14 : if (first_slot_contents.data.invalidated != RS_INVAL_NONE)
714 1 : ereport(ERROR,
715 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
716 : errmsg("cannot copy invalidated replication slot \"%s\"",
717 : NameStr(*src_name)));
718 :
719 : /* Overwrite params from optional arguments */
720 13 : if (PG_NARGS() >= 3)
721 6 : temporary = PG_GETARG_BOOL(2);
722 13 : if (PG_NARGS() >= 4)
723 : {
724 : Assert(logical_slot);
725 4 : plugin = NameStr(*(PG_GETARG_NAME(3)));
726 : }
727 :
728 : /* Create new slot and acquire it */
729 13 : if (logical_slot)
730 : {
731 : /*
732 : * We must not try to read WAL, since we haven't reserved it yet --
733 : * hence pass find_startpoint false. confirmed_flush will be set
734 : * below, by copying from the source slot.
735 : *
736 : * We don't copy the failover option to prevent potential issues with
737 : * slot synchronization. For instance, if a slot was synchronized to
738 : * the standby, then dropped on the primary, and immediately recreated
739 : * by copying from another existing slot with much earlier restart_lsn
740 : * and confirmed_flush_lsn, the slot synchronization would only
741 : * observe the LSN of the same slot moving backward. As slot
742 : * synchronization does not copy the restart_lsn and
743 : * confirmed_flush_lsn backward (see update_local_synced_slot() for
744 : * details), if a failover happens before the primary's slot catches
745 : * up, logical replication cannot continue using the synchronized slot
746 : * on the promoted standby because the slot retains the restart_lsn
747 : * and confirmed_flush_lsn that are much later than expected.
748 : */
749 8 : create_logical_replication_slot(NameStr(*dst_name),
750 : plugin,
751 : temporary,
752 : false,
753 : false,
754 : src_restart_lsn,
755 : false);
756 : }
757 : else
758 5 : create_physical_replication_slot(NameStr(*dst_name),
759 : true,
760 : temporary,
761 : src_restart_lsn);
762 :
763 : /*
764 : * Update the destination slot to current values of the source slot;
765 : * recheck that the source slot is still the one we saw previously.
766 : */
767 : {
768 : TransactionId copy_effective_xmin;
769 : TransactionId copy_effective_catalog_xmin;
770 : TransactionId copy_xmin;
771 : TransactionId copy_catalog_xmin;
772 : XLogRecPtr copy_restart_lsn;
773 : XLogRecPtr copy_confirmed_flush;
774 : bool copy_islogical;
775 : char *copy_name;
776 :
777 : /* Copy data of source slot again */
778 12 : SpinLockAcquire(&src->mutex);
779 12 : second_slot_contents = *src;
780 12 : SpinLockRelease(&src->mutex);
781 :
782 12 : copy_effective_xmin = second_slot_contents.effective_xmin;
783 12 : copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
784 :
785 12 : copy_xmin = second_slot_contents.data.xmin;
786 12 : copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
787 12 : copy_restart_lsn = second_slot_contents.data.restart_lsn;
788 12 : copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
789 :
790 : /* for existence check */
791 12 : copy_name = NameStr(second_slot_contents.data.name);
792 12 : copy_islogical = SlotIsLogical(&second_slot_contents);
793 :
794 : /*
795 : * Check if the source slot still exists and is valid. We regard it as
796 : * invalid if the type of replication slot or name has been changed,
797 : * or the restart_lsn either is invalid or has gone backward. (The
798 : * restart_lsn could go backwards if the source slot is dropped and
799 : * copied from an older slot during installation.)
800 : *
801 : * Since erroring out will release and drop the destination slot we
802 : * don't need to release it here.
803 : */
804 12 : if (copy_restart_lsn < src_restart_lsn ||
805 12 : src_islogical != copy_islogical ||
806 12 : strcmp(copy_name, NameStr(*src_name)) != 0)
807 0 : ereport(ERROR,
808 : (errmsg("could not copy replication slot \"%s\"",
809 : NameStr(*src_name)),
810 : errdetail("The source replication slot was modified incompatibly during the copy operation.")));
811 :
812 : /* The source slot must have a consistent snapshot */
813 12 : if (src_islogical && !XLogRecPtrIsValid(copy_confirmed_flush))
814 0 : ereport(ERROR,
815 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
816 : errmsg("cannot copy unfinished logical replication slot \"%s\"",
817 : NameStr(*src_name)),
818 : errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
819 :
820 : /*
821 : * Copying an invalid slot doesn't make sense. Note that the source
822 : * slot can become invalid after we create the new slot and copy the
823 : * data of source slot. This is possible because the operations in
824 : * InvalidateObsoleteReplicationSlots() are not serialized with this
825 : * function. Even though we can't detect such a case here, the copied
826 : * slot will become invalid in the next checkpoint cycle.
827 : */
828 12 : if (second_slot_contents.data.invalidated != RS_INVAL_NONE)
829 0 : ereport(ERROR,
830 : errmsg("cannot copy replication slot \"%s\"",
831 : NameStr(*src_name)),
832 : errdetail("The source replication slot was invalidated during the copy operation."));
833 :
834 : /* Install copied values again */
835 12 : SpinLockAcquire(&MyReplicationSlot->mutex);
836 12 : MyReplicationSlot->effective_xmin = copy_effective_xmin;
837 12 : MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
838 :
839 12 : MyReplicationSlot->data.xmin = copy_xmin;
840 12 : MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
841 12 : MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
842 12 : MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
843 12 : SpinLockRelease(&MyReplicationSlot->mutex);
844 :
845 12 : ReplicationSlotMarkDirty();
846 12 : ReplicationSlotsComputeRequiredXmin(false);
847 12 : ReplicationSlotsComputeRequiredLSN();
848 12 : ReplicationSlotSave();
849 :
850 : #ifdef USE_ASSERT_CHECKING
851 : /* Check that the restart_lsn is available */
852 : {
853 : XLogSegNo segno;
854 :
855 : XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
856 : Assert(XLogGetLastRemovedSegno() < segno);
857 : }
858 : #endif
859 : }
860 :
861 : /* target slot fully created, mark as persistent if needed */
862 12 : if (logical_slot && !temporary)
863 4 : ReplicationSlotPersist();
864 :
865 : /* All done. Set up the return values */
866 12 : values[0] = NameGetDatum(dst_name);
867 12 : nulls[0] = false;
868 12 : if (XLogRecPtrIsValid(MyReplicationSlot->data.confirmed_flush))
869 : {
870 7 : values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
871 7 : nulls[1] = false;
872 : }
873 : else
874 5 : nulls[1] = true;
875 :
876 12 : tuple = heap_form_tuple(tupdesc, values, nulls);
877 12 : result = HeapTupleGetDatum(tuple);
878 :
879 12 : ReplicationSlotRelease();
880 :
881 12 : PG_RETURN_DATUM(result);
882 : }
883 :
884 : /* The wrappers below are all to appease opr_sanity */
885 : Datum
886 4 : pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
887 : {
888 4 : return copy_replication_slot(fcinfo, true);
889 : }
890 :
891 : Datum
892 0 : pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
893 : {
894 0 : return copy_replication_slot(fcinfo, true);
895 : }
896 :
897 : Datum
898 6 : pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
899 : {
900 6 : return copy_replication_slot(fcinfo, true);
901 : }
902 :
903 : Datum
904 2 : pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
905 : {
906 2 : return copy_replication_slot(fcinfo, false);
907 : }
908 :
909 : Datum
910 5 : pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
911 : {
912 5 : return copy_replication_slot(fcinfo, false);
913 : }
914 :
915 : /*
916 : * Synchronize failover enabled replication slots to a standby server
917 : * from the primary server.
918 : */
919 : Datum
920 12 : pg_sync_replication_slots(PG_FUNCTION_ARGS)
921 : {
922 : WalReceiverConn *wrconn;
923 : char *err;
924 : StringInfoData app_name;
925 :
926 12 : CheckSlotPermissions();
927 :
928 11 : if (!RecoveryInProgress())
929 1 : ereport(ERROR,
930 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
931 : errmsg("replication slots can only be synchronized to a standby server"));
932 :
933 10 : ValidateSlotSyncParams(ERROR);
934 :
935 : /* Load the libpq-specific functions */
936 10 : load_file("libpqwalreceiver", false);
937 :
938 10 : (void) CheckAndGetDbnameFromConninfo();
939 :
940 9 : initStringInfo(&app_name);
941 9 : if (cluster_name[0])
942 9 : appendStringInfo(&app_name, "%s_slotsync", cluster_name);
943 : else
944 0 : appendStringInfoString(&app_name, "slotsync");
945 :
946 : /* Connect to the primary server. */
947 9 : wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
948 : app_name.data, &err);
949 :
950 9 : if (!wrconn)
951 0 : ereport(ERROR,
952 : errcode(ERRCODE_CONNECTION_FAILURE),
953 : errmsg("synchronization worker \"%s\" could not connect to the primary server: %s",
954 : app_name.data, err));
955 :
956 9 : pfree(app_name.data);
957 :
958 9 : SyncReplicationSlots(wrconn);
959 :
960 8 : walrcv_disconnect(wrconn);
961 :
962 8 : PG_RETURN_VOID();
963 : }
|