LCOV - code coverage report
Current view: top level - src/backend/replication - slotfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 306 323 94.7 %
Date: 2026-02-10 21:16:50 Functions: 15 16 93.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * slotfuncs.c
       4             :  *     Support functions for replication slots
       5             :  *
       6             :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/slotfuncs.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/htup_details.h"
      16             : #include "access/xlog_internal.h"
      17             : #include "access/xlogrecovery.h"
      18             : #include "access/xlogutils.h"
      19             : #include "funcapi.h"
      20             : #include "replication/logical.h"
      21             : #include "replication/slot.h"
      22             : #include "replication/slotsync.h"
      23             : #include "storage/proc.h"
      24             : #include "utils/builtins.h"
      25             : #include "utils/guc.h"
      26             : #include "utils/pg_lsn.h"
      27             : 
      28             : /*
      29             :  * Map SlotSyncSkipReason enum values to human-readable names.
      30             :  */
      31             : static const char *SlotSyncSkipReasonNames[] = {
      32             :     [SS_SKIP_NONE] = "none",
      33             :     [SS_SKIP_WAL_NOT_FLUSHED] = "wal_not_flushed",
      34             :     [SS_SKIP_WAL_OR_ROWS_REMOVED] = "wal_or_rows_removed",
      35             :     [SS_SKIP_NO_CONSISTENT_SNAPSHOT] = "no_consistent_snapshot",
      36             :     [SS_SKIP_INVALID] = "slot_invalidated"
      37             : };
      38             : 
      39             : /*
      40             :  * Helper function for creating a new physical replication slot with
      41             :  * given arguments. Note that this function doesn't release the created
      42             :  * slot.
      43             :  *
      44             :  * If restart_lsn is a valid value, we use it without WAL reservation
      45             :  * routine. So the caller must guarantee that WAL is available.
      46             :  */
      47             : static void
      48          92 : create_physical_replication_slot(char *name, bool immediately_reserve,
      49             :                                  bool temporary, XLogRecPtr restart_lsn)
      50             : {
      51             :     Assert(!MyReplicationSlot);
      52             : 
      53             :     /* acquire replication slot, this will check for conflicting names */
      54          92 :     ReplicationSlotCreate(name, false,
      55             :                           temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
      56             :                           false, false);
      57             : 
      58          92 :     if (immediately_reserve)
      59             :     {
      60             :         /* Reserve WAL as the user asked for it */
      61          44 :         if (!XLogRecPtrIsValid(restart_lsn))
      62          34 :             ReplicationSlotReserveWal();
      63             :         else
      64          10 :             MyReplicationSlot->data.restart_lsn = restart_lsn;
      65             : 
      66             :         /* Write this slot to disk */
      67          44 :         ReplicationSlotMarkDirty();
      68          44 :         ReplicationSlotSave();
      69             :     }
      70          92 : }
      71             : 
      72             : /*
      73             :  * SQL function for creating a new physical (streaming replication)
      74             :  * replication slot.
      75             :  */
      76             : Datum
      77          82 : pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
      78             : {
      79          82 :     Name        name = PG_GETARG_NAME(0);
      80          82 :     bool        immediately_reserve = PG_GETARG_BOOL(1);
      81          82 :     bool        temporary = PG_GETARG_BOOL(2);
      82             :     Datum       values[2];
      83             :     bool        nulls[2];
      84             :     TupleDesc   tupdesc;
      85             :     HeapTuple   tuple;
      86             :     Datum       result;
      87             : 
      88          82 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
      89           0 :         elog(ERROR, "return type must be a row type");
      90             : 
      91          82 :     CheckSlotPermissions();
      92             : 
      93          82 :     CheckSlotRequirements();
      94             : 
      95          82 :     create_physical_replication_slot(NameStr(*name),
      96             :                                      immediately_reserve,
      97             :                                      temporary,
      98             :                                      InvalidXLogRecPtr);
      99             : 
     100          82 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     101          82 :     nulls[0] = false;
     102             : 
     103          82 :     if (immediately_reserve)
     104             :     {
     105          34 :         values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
     106          34 :         nulls[1] = false;
     107             :     }
     108             :     else
     109          48 :         nulls[1] = true;
     110             : 
     111          82 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     112          82 :     result = HeapTupleGetDatum(tuple);
     113             : 
     114          82 :     ReplicationSlotRelease();
     115             : 
     116          82 :     PG_RETURN_DATUM(result);
     117             : }
     118             : 
     119             : 
     120             : /*
     121             :  * Helper function for creating a new logical replication slot with
     122             :  * given arguments. Note that this function doesn't release the created
     123             :  * slot.
     124             :  *
     125             :  * When find_startpoint is false, the slot's confirmed_flush is not set; it's
     126             :  * caller's responsibility to ensure it's set to something sensible.
     127             :  */
     128             : static void
     129         282 : create_logical_replication_slot(char *name, char *plugin,
     130             :                                 bool temporary, bool two_phase,
     131             :                                 bool failover,
     132             :                                 XLogRecPtr restart_lsn,
     133             :                                 bool find_startpoint)
     134             : {
     135         282 :     LogicalDecodingContext *ctx = NULL;
     136             : 
     137             :     Assert(!MyReplicationSlot);
     138             : 
     139             :     /*
     140             :      * Acquire a logical decoding slot, this will check for conflicting names.
     141             :      * Initially create persistent slot as ephemeral - that allows us to
     142             :      * nicely handle errors during initialization because it'll get dropped if
     143             :      * this transaction fails. We'll make it persistent at the end. Temporary
     144             :      * slots can be created as temporary from beginning as they get dropped on
     145             :      * error as well.
     146             :      */
     147         282 :     ReplicationSlotCreate(name, true,
     148             :                           temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
     149             :                           failover, false);
     150             : 
     151             :     /*
     152             :      * Ensure the logical decoding is enabled before initializing the logical
     153             :      * decoding context.
     154             :      */
     155         272 :     EnsureLogicalDecodingEnabled();
     156             :     Assert(IsLogicalDecodingEnabled());
     157             : 
     158             :     /*
     159             :      * Create logical decoding context to find start point or, if we don't
     160             :      * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
     161             :      *
     162             :      * Note: when !find_startpoint this is still important, because it's at
     163             :      * this point that the output plugin is validated.
     164             :      */
     165         270 :     ctx = CreateInitDecodingContext(plugin, NIL,
     166             :                                     false,  /* just catalogs is OK */
     167             :                                     restart_lsn,
     168         270 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
     169             :                                                .segment_open = wal_segment_open,
     170             :                                                .segment_close = wal_segment_close),
     171             :                                     NULL, NULL, NULL);
     172             : 
     173             :     /*
     174             :      * If caller needs us to determine the decoding start point, do so now.
     175             :      * This might take a while.
     176             :      */
     177         264 :     if (find_startpoint)
     178         250 :         DecodingContextFindStartpoint(ctx);
     179             : 
     180             :     /* don't need the decoding context anymore */
     181         260 :     FreeDecodingContext(ctx);
     182         260 : }
     183             : 
     184             : /*
     185             :  * SQL function for creating a new logical replication slot.
     186             :  */
     187             : Datum
     188         268 : pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
     189             : {
     190         268 :     Name        name = PG_GETARG_NAME(0);
     191         268 :     Name        plugin = PG_GETARG_NAME(1);
     192         268 :     bool        temporary = PG_GETARG_BOOL(2);
     193         268 :     bool        two_phase = PG_GETARG_BOOL(3);
     194         268 :     bool        failover = PG_GETARG_BOOL(4);
     195             :     Datum       result;
     196             :     TupleDesc   tupdesc;
     197             :     HeapTuple   tuple;
     198             :     Datum       values[2];
     199             :     bool        nulls[2];
     200             : 
     201         268 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     202           0 :         elog(ERROR, "return type must be a row type");
     203             : 
     204         268 :     CheckSlotPermissions();
     205             : 
     206         266 :     CheckLogicalDecodingRequirements();
     207             : 
     208         266 :     create_logical_replication_slot(NameStr(*name),
     209         266 :                                     NameStr(*plugin),
     210             :                                     temporary,
     211             :                                     two_phase,
     212             :                                     failover,
     213             :                                     InvalidXLogRecPtr,
     214             :                                     true);
     215             : 
     216         246 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     217         246 :     values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
     218             : 
     219         246 :     memset(nulls, 0, sizeof(nulls));
     220             : 
     221         246 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     222         246 :     result = HeapTupleGetDatum(tuple);
     223             : 
     224             :     /* ok, slot is now fully created, mark it as persistent if needed */
     225         246 :     if (!temporary)
     226         234 :         ReplicationSlotPersist();
     227         246 :     ReplicationSlotRelease();
     228             : 
     229         246 :     PG_RETURN_DATUM(result);
     230             : }
     231             : 
     232             : 
     233             : /*
     234             :  * SQL function for dropping a replication slot.
     235             :  */
     236             : Datum
     237         298 : pg_drop_replication_slot(PG_FUNCTION_ARGS)
     238             : {
     239         298 :     Name        name = PG_GETARG_NAME(0);
     240             : 
     241         298 :     CheckSlotPermissions();
     242             : 
     243         294 :     CheckSlotRequirements();
     244             : 
     245         294 :     ReplicationSlotDrop(NameStr(*name), true);
     246             : 
     247         282 :     PG_RETURN_VOID();
     248             : }
     249             : 
     250             : /*
     251             :  * pg_get_replication_slots - SQL SRF showing all replication slots
     252             :  * that currently exist on the database cluster.
     253             :  */
     254             : Datum
     255         816 : pg_get_replication_slots(PG_FUNCTION_ARGS)
     256             : {
     257             : #define PG_GET_REPLICATION_SLOTS_COLS 21
     258         816 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     259             :     XLogRecPtr  currlsn;
     260             :     int         slotno;
     261             : 
     262             :     /*
     263             :      * We don't require any special permission to see this function's data
     264             :      * because nothing should be sensitive. The most critical being the slot
     265             :      * name, which shouldn't contain anything particularly sensitive.
     266             :      */
     267             : 
     268         816 :     InitMaterializedSRF(fcinfo, 0);
     269             : 
     270         816 :     currlsn = GetXLogWriteRecPtr();
     271             : 
     272         816 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     273        6776 :     for (slotno = 0; slotno < max_replication_slots; slotno++)
     274             :     {
     275        5960 :         ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
     276             :         ReplicationSlot slot_contents;
     277             :         Datum       values[PG_GET_REPLICATION_SLOTS_COLS];
     278             :         bool        nulls[PG_GET_REPLICATION_SLOTS_COLS];
     279             :         WALAvailability walstate;
     280             :         int         i;
     281             :         ReplicationSlotInvalidationCause cause;
     282             : 
     283        5960 :         if (!slot->in_use)
     284        4586 :             continue;
     285             : 
     286             :         /* Copy slot contents while holding spinlock, then examine at leisure */
     287        1374 :         SpinLockAcquire(&slot->mutex);
     288        1374 :         slot_contents = *slot;
     289        1374 :         SpinLockRelease(&slot->mutex);
     290             : 
     291        1374 :         memset(values, 0, sizeof(values));
     292        1374 :         memset(nulls, 0, sizeof(nulls));
     293             : 
     294        1374 :         i = 0;
     295        1374 :         values[i++] = NameGetDatum(&slot_contents.data.name);
     296             : 
     297        1374 :         if (slot_contents.data.database == InvalidOid)
     298         376 :             nulls[i++] = true;
     299             :         else
     300         998 :             values[i++] = NameGetDatum(&slot_contents.data.plugin);
     301             : 
     302        1374 :         if (slot_contents.data.database == InvalidOid)
     303         376 :             values[i++] = CStringGetTextDatum("physical");
     304             :         else
     305         998 :             values[i++] = CStringGetTextDatum("logical");
     306             : 
     307        1374 :         if (slot_contents.data.database == InvalidOid)
     308         376 :             nulls[i++] = true;
     309             :         else
     310         998 :             values[i++] = ObjectIdGetDatum(slot_contents.data.database);
     311             : 
     312        1374 :         values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
     313        1374 :         values[i++] = BoolGetDatum(slot_contents.active_proc != INVALID_PROC_NUMBER);
     314             : 
     315        1374 :         if (slot_contents.active_proc != INVALID_PROC_NUMBER)
     316         448 :             values[i++] = Int32GetDatum(GetPGProcByNumber(slot_contents.active_proc)->pid);
     317             :         else
     318         926 :             nulls[i++] = true;
     319             : 
     320        1374 :         if (slot_contents.data.xmin != InvalidTransactionId)
     321         206 :             values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
     322             :         else
     323        1168 :             nulls[i++] = true;
     324             : 
     325        1374 :         if (slot_contents.data.catalog_xmin != InvalidTransactionId)
     326        1076 :             values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
     327             :         else
     328         298 :             nulls[i++] = true;
     329             : 
     330        1374 :         if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
     331        1282 :             values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
     332             :         else
     333          92 :             nulls[i++] = true;
     334             : 
     335        1374 :         if (XLogRecPtrIsValid(slot_contents.data.confirmed_flush))
     336         938 :             values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
     337             :         else
     338         436 :             nulls[i++] = true;
     339             : 
     340             :         /*
     341             :          * If the slot has not been invalidated, test availability from
     342             :          * restart_lsn.
     343             :          */
     344        1374 :         if (slot_contents.data.invalidated != RS_INVAL_NONE)
     345          70 :             walstate = WALAVAIL_REMOVED;
     346             :         else
     347        1304 :             walstate = GetWALAvailability(slot_contents.data.restart_lsn);
     348             : 
     349        1374 :         switch (walstate)
     350             :         {
     351          84 :             case WALAVAIL_INVALID_LSN:
     352          84 :                 nulls[i++] = true;
     353          84 :                 break;
     354             : 
     355        1214 :             case WALAVAIL_RESERVED:
     356        1214 :                 values[i++] = CStringGetTextDatum("reserved");
     357        1214 :                 break;
     358             : 
     359           4 :             case WALAVAIL_EXTENDED:
     360           4 :                 values[i++] = CStringGetTextDatum("extended");
     361           4 :                 break;
     362             : 
     363           2 :             case WALAVAIL_UNRESERVED:
     364           2 :                 values[i++] = CStringGetTextDatum("unreserved");
     365           2 :                 break;
     366             : 
     367          70 :             case WALAVAIL_REMOVED:
     368             : 
     369             :                 /*
     370             :                  * If we read the restart_lsn long enough ago, maybe that file
     371             :                  * has been removed by now.  However, the walsender could have
     372             :                  * moved forward enough that it jumped to another file after
     373             :                  * we looked.  If checkpointer signalled the process to
     374             :                  * termination, then it's definitely lost; but if a process is
     375             :                  * still alive, then "unreserved" seems more appropriate.
     376             :                  *
     377             :                  * If we do change it, save the state for safe_wal_size below.
     378             :                  */
     379          70 :                 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
     380             :                 {
     381             :                     ProcNumber  procno;
     382             : 
     383          62 :                     SpinLockAcquire(&slot->mutex);
     384          62 :                     procno = slot->active_proc;
     385          62 :                     slot_contents.data.restart_lsn = slot->data.restart_lsn;
     386          62 :                     SpinLockRelease(&slot->mutex);
     387          62 :                     if (procno != INVALID_PROC_NUMBER)
     388             :                     {
     389           0 :                         values[i++] = CStringGetTextDatum("unreserved");
     390           0 :                         walstate = WALAVAIL_UNRESERVED;
     391           0 :                         break;
     392             :                     }
     393             :                 }
     394          70 :                 values[i++] = CStringGetTextDatum("lost");
     395          70 :                 break;
     396             :         }
     397             : 
     398             :         /*
     399             :          * safe_wal_size is only computed for slots that have not been lost,
     400             :          * and only if there's a configured maximum size.
     401             :          */
     402        1374 :         if (walstate == WALAVAIL_REMOVED || max_slot_wal_keep_size_mb < 0)
     403        1364 :             nulls[i++] = true;
     404             :         else
     405             :         {
     406             :             XLogSegNo   targetSeg;
     407             :             uint64      slotKeepSegs;
     408             :             uint64      keepSegs;
     409             :             XLogSegNo   failSeg;
     410             :             XLogRecPtr  failLSN;
     411             : 
     412          10 :             XLByteToSeg(slot_contents.data.restart_lsn, targetSeg, wal_segment_size);
     413             : 
     414             :             /* determine how many segments can be kept by slots */
     415          10 :             slotKeepSegs = XLogMBVarToSegs(max_slot_wal_keep_size_mb, wal_segment_size);
     416             :             /* ditto for wal_keep_size */
     417          10 :             keepSegs = XLogMBVarToSegs(wal_keep_size_mb, wal_segment_size);
     418             : 
     419             :             /* if currpos reaches failLSN, we lose our segment */
     420          10 :             failSeg = targetSeg + Max(slotKeepSegs, keepSegs) + 1;
     421          10 :             XLogSegNoOffsetToRecPtr(failSeg, 0, wal_segment_size, failLSN);
     422             : 
     423          10 :             values[i++] = Int64GetDatum(failLSN - currlsn);
     424             :         }
     425             : 
     426        1374 :         values[i++] = BoolGetDatum(slot_contents.data.two_phase);
     427             : 
     428        1374 :         if (slot_contents.data.two_phase &&
     429          30 :             XLogRecPtrIsValid(slot_contents.data.two_phase_at))
     430          30 :             values[i++] = LSNGetDatum(slot_contents.data.two_phase_at);
     431             :         else
     432        1344 :             nulls[i++] = true;
     433             : 
     434        1374 :         if (slot_contents.inactive_since > 0)
     435         948 :             values[i++] = TimestampTzGetDatum(slot_contents.inactive_since);
     436             :         else
     437         426 :             nulls[i++] = true;
     438             : 
     439        1374 :         cause = slot_contents.data.invalidated;
     440             : 
     441        1374 :         if (SlotIsPhysical(&slot_contents))
     442         376 :             nulls[i++] = true;
     443             :         else
     444             :         {
     445             :             /*
     446             :              * rows_removed and wal_level_insufficient are the only two
     447             :              * reasons for the logical slot's conflict with recovery.
     448             :              */
     449         998 :             if (cause == RS_INVAL_HORIZON ||
     450             :                 cause == RS_INVAL_WAL_LEVEL)
     451          58 :                 values[i++] = BoolGetDatum(true);
     452             :             else
     453         940 :                 values[i++] = BoolGetDatum(false);
     454             :         }
     455             : 
     456        1374 :         if (cause == RS_INVAL_NONE)
     457        1304 :             nulls[i++] = true;
     458             :         else
     459          70 :             values[i++] = CStringGetTextDatum(GetSlotInvalidationCauseName(cause));
     460             : 
     461        1374 :         values[i++] = BoolGetDatum(slot_contents.data.failover);
     462             : 
     463        1374 :         values[i++] = BoolGetDatum(slot_contents.data.synced);
     464             : 
     465        1374 :         if (slot_contents.slotsync_skip_reason == SS_SKIP_NONE)
     466        1366 :             nulls[i++] = true;
     467             :         else
     468           8 :             values[i++] = CStringGetTextDatum(SlotSyncSkipReasonNames[slot_contents.slotsync_skip_reason]);
     469             : 
     470             :         Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
     471             : 
     472        1374 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
     473             :                              values, nulls);
     474             :     }
     475             : 
     476         816 :     LWLockRelease(ReplicationSlotControlLock);
     477             : 
     478         816 :     return (Datum) 0;
     479             : }
     480             : 
     481             : /*
     482             :  * Helper function for advancing our physical replication slot forward.
     483             :  *
     484             :  * The LSN position to move to is compared simply to the slot's restart_lsn,
     485             :  * knowing that any position older than that would be removed by successive
     486             :  * checkpoints.
     487             :  */
     488             : static XLogRecPtr
     489          14 : pg_physical_replication_slot_advance(XLogRecPtr moveto)
     490             : {
     491          14 :     XLogRecPtr  startlsn = MyReplicationSlot->data.restart_lsn;
     492          14 :     XLogRecPtr  retlsn = startlsn;
     493             : 
     494             :     Assert(XLogRecPtrIsValid(moveto));
     495             : 
     496          14 :     if (startlsn < moveto)
     497             :     {
     498          14 :         SpinLockAcquire(&MyReplicationSlot->mutex);
     499          14 :         MyReplicationSlot->data.restart_lsn = moveto;
     500          14 :         SpinLockRelease(&MyReplicationSlot->mutex);
     501          14 :         retlsn = moveto;
     502             : 
     503             :         /*
     504             :          * Dirty the slot so as it is written out at the next checkpoint. Note
     505             :          * that the LSN position advanced may still be lost in the event of a
     506             :          * crash, but this makes the data consistent after a clean shutdown.
     507             :          */
     508          14 :         ReplicationSlotMarkDirty();
     509             : 
     510             :         /*
     511             :          * Wake up logical walsenders holding logical failover slots after
     512             :          * updating the restart_lsn of the physical slot.
     513             :          */
     514          14 :         PhysicalWakeupLogicalWalSnd();
     515             :     }
     516             : 
     517          14 :     return retlsn;
     518             : }
     519             : 
     520             : /*
     521             :  * Advance our logical replication slot forward. See
     522             :  * LogicalSlotAdvanceAndCheckSnapState for details.
     523             :  */
     524             : static XLogRecPtr
     525          18 : pg_logical_replication_slot_advance(XLogRecPtr moveto)
     526             : {
     527          18 :     return LogicalSlotAdvanceAndCheckSnapState(moveto, NULL);
     528             : }
     529             : 
     530             : /*
     531             :  * SQL function for moving the position in a replication slot.
     532             :  */
     533             : Datum
     534          36 : pg_replication_slot_advance(PG_FUNCTION_ARGS)
     535             : {
     536          36 :     Name        slotname = PG_GETARG_NAME(0);
     537          36 :     XLogRecPtr  moveto = PG_GETARG_LSN(1);
     538             :     XLogRecPtr  endlsn;
     539             :     XLogRecPtr  minlsn;
     540             :     TupleDesc   tupdesc;
     541             :     Datum       values[2];
     542             :     bool        nulls[2];
     543             :     HeapTuple   tuple;
     544             :     Datum       result;
     545             : 
     546             :     Assert(!MyReplicationSlot);
     547             : 
     548          36 :     CheckSlotPermissions();
     549             : 
     550          36 :     if (!XLogRecPtrIsValid(moveto))
     551           2 :         ereport(ERROR,
     552             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     553             :                  errmsg("invalid target WAL LSN")));
     554             : 
     555             :     /* Build a tuple descriptor for our result type */
     556          34 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     557           0 :         elog(ERROR, "return type must be a row type");
     558             : 
     559             :     /*
     560             :      * We can't move slot past what's been flushed/replayed so clamp the
     561             :      * target position accordingly.
     562             :      */
     563          34 :     if (!RecoveryInProgress())
     564          34 :         moveto = Min(moveto, GetFlushRecPtr(NULL));
     565             :     else
     566           0 :         moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
     567             : 
     568             :     /* Acquire the slot so we "own" it */
     569          34 :     ReplicationSlotAcquire(NameStr(*slotname), true, true);
     570             : 
     571             :     /* A slot whose restart_lsn has never been reserved cannot be advanced */
     572          34 :     if (!XLogRecPtrIsValid(MyReplicationSlot->data.restart_lsn))
     573           2 :         ereport(ERROR,
     574             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     575             :                  errmsg("replication slot \"%s\" cannot be advanced",
     576             :                         NameStr(*slotname)),
     577             :                  errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
     578             : 
     579             :     /*
     580             :      * Check if the slot is not moving backwards.  Physical slots rely simply
     581             :      * on restart_lsn as a minimum point, while logical slots have confirmed
     582             :      * consumption up to confirmed_flush, meaning that in both cases data
     583             :      * older than that is not available anymore.
     584             :      */
     585          32 :     if (OidIsValid(MyReplicationSlot->data.database))
     586          18 :         minlsn = MyReplicationSlot->data.confirmed_flush;
     587             :     else
     588          14 :         minlsn = MyReplicationSlot->data.restart_lsn;
     589             : 
     590          32 :     if (moveto < minlsn)
     591           0 :         ereport(ERROR,
     592             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     593             :                  errmsg("cannot advance replication slot to %X/%08X, minimum is %X/%08X",
     594             :                         LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
     595             : 
     596             :     /* Do the actual slot update, depending on the slot type */
     597          32 :     if (OidIsValid(MyReplicationSlot->data.database))
     598          18 :         endlsn = pg_logical_replication_slot_advance(moveto);
     599             :     else
     600          14 :         endlsn = pg_physical_replication_slot_advance(moveto);
     601             : 
     602          32 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     603          32 :     nulls[0] = false;
     604             : 
     605             :     /*
     606             :      * Recompute the minimum LSN and xmin across all slots to adjust with the
     607             :      * advancing potentially done.
     608             :      */
     609          32 :     ReplicationSlotsComputeRequiredXmin(false);
     610          32 :     ReplicationSlotsComputeRequiredLSN();
     611             : 
     612          32 :     ReplicationSlotRelease();
     613             : 
     614             :     /* Return the reached position. */
     615          32 :     values[1] = LSNGetDatum(endlsn);
     616          32 :     nulls[1] = false;
     617             : 
     618          32 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     619          32 :     result = HeapTupleGetDatum(tuple);
     620             : 
     621          32 :     PG_RETURN_DATUM(result);
     622             : }
     623             : 
     624             : /*
     625             :  * Helper function of copying a replication slot.
     626             :  */
     627             : static Datum
     628          34 : copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
     629             : {
     630          34 :     Name        src_name = PG_GETARG_NAME(0);
     631          34 :     Name        dst_name = PG_GETARG_NAME(1);
     632          34 :     ReplicationSlot *src = NULL;
     633             :     ReplicationSlot first_slot_contents;
     634             :     ReplicationSlot second_slot_contents;
     635             :     XLogRecPtr  src_restart_lsn;
     636             :     bool        src_islogical;
     637             :     bool        temporary;
     638             :     char       *plugin;
     639             :     Datum       values[2];
     640             :     bool        nulls[2];
     641             :     Datum       result;
     642             :     TupleDesc   tupdesc;
     643             :     HeapTuple   tuple;
     644             : 
     645          34 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     646           0 :         elog(ERROR, "return type must be a row type");
     647             : 
     648          34 :     CheckSlotPermissions();
     649             : 
     650          34 :     if (logical_slot)
     651          20 :         CheckLogicalDecodingRequirements();
     652             :     else
     653          14 :         CheckSlotRequirements();
     654             : 
     655          34 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     656             : 
     657             :     /*
     658             :      * We need to prevent the source slot's reserved WAL from being removed,
     659             :      * but we don't want to lock that slot for very long, and it can advance
     660             :      * in the meantime.  So obtain the source slot's data, and create a new
     661             :      * slot using its restart_lsn.  Afterwards we lock the source slot again
     662             :      * and verify that the data we copied (name, type) has not changed
     663             :      * incompatibly.  No inconvenient WAL removal can occur once the new slot
     664             :      * is created -- but since WAL removal could have occurred before we
     665             :      * managed to create the new slot, we advance the new slot's restart_lsn
     666             :      * to the source slot's updated restart_lsn the second time we lock it.
     667             :      */
     668          40 :     for (int i = 0; i < max_replication_slots; i++)
     669             :     {
     670          40 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     671             : 
     672          40 :         if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
     673             :         {
     674             :             /* Copy the slot contents while holding spinlock */
     675          34 :             SpinLockAcquire(&s->mutex);
     676          34 :             first_slot_contents = *s;
     677          34 :             SpinLockRelease(&s->mutex);
     678          34 :             src = s;
     679          34 :             break;
     680             :         }
     681             :     }
     682             : 
     683          34 :     LWLockRelease(ReplicationSlotControlLock);
     684             : 
     685          34 :     if (src == NULL)
     686           0 :         ereport(ERROR,
     687             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     688             :                  errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
     689             : 
     690          34 :     src_islogical = SlotIsLogical(&first_slot_contents);
     691          34 :     src_restart_lsn = first_slot_contents.data.restart_lsn;
     692          34 :     temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
     693          34 :     plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
     694             : 
     695             :     /* Check type of replication slot */
     696          34 :     if (src_islogical != logical_slot)
     697           4 :         ereport(ERROR,
     698             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     699             :                  src_islogical ?
     700             :                  errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
     701             :                         NameStr(*src_name)) :
     702             :                  errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
     703             :                         NameStr(*src_name))));
     704             : 
     705             :     /* Copying non-reserved slot doesn't make sense */
     706          30 :     if (!XLogRecPtrIsValid(src_restart_lsn))
     707           2 :         ereport(ERROR,
     708             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     709             :                  errmsg("cannot copy a replication slot that doesn't reserve WAL")));
     710             : 
     711             :     /* Cannot copy an invalidated replication slot */
     712          28 :     if (first_slot_contents.data.invalidated != RS_INVAL_NONE)
     713           2 :         ereport(ERROR,
     714             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     715             :                 errmsg("cannot copy invalidated replication slot \"%s\"",
     716             :                        NameStr(*src_name)));
     717             : 
     718             :     /* Overwrite params from optional arguments */
     719          26 :     if (PG_NARGS() >= 3)
     720          12 :         temporary = PG_GETARG_BOOL(2);
     721          26 :     if (PG_NARGS() >= 4)
     722             :     {
     723             :         Assert(logical_slot);
     724           8 :         plugin = NameStr(*(PG_GETARG_NAME(3)));
     725             :     }
     726             : 
     727             :     /* Create new slot and acquire it */
     728          26 :     if (logical_slot)
     729             :     {
     730             :         /*
     731             :          * We must not try to read WAL, since we haven't reserved it yet --
     732             :          * hence pass find_startpoint false.  confirmed_flush will be set
     733             :          * below, by copying from the source slot.
     734             :          *
     735             :          * We don't copy the failover option to prevent potential issues with
     736             :          * slot synchronization. For instance, if a slot was synchronized to
     737             :          * the standby, then dropped on the primary, and immediately recreated
     738             :          * by copying from another existing slot with much earlier restart_lsn
     739             :          * and confirmed_flush_lsn, the slot synchronization would only
     740             :          * observe the LSN of the same slot moving backward. As slot
     741             :          * synchronization does not copy the restart_lsn and
     742             :          * confirmed_flush_lsn backward (see update_local_synced_slot() for
     743             :          * details), if a failover happens before the primary's slot catches
     744             :          * up, logical replication cannot continue using the synchronized slot
     745             :          * on the promoted standby because the slot retains the restart_lsn
     746             :          * and confirmed_flush_lsn that are much later than expected.
     747             :          */
     748          16 :         create_logical_replication_slot(NameStr(*dst_name),
     749             :                                         plugin,
     750             :                                         temporary,
     751             :                                         false,
     752             :                                         false,
     753             :                                         src_restart_lsn,
     754             :                                         false);
     755             :     }
     756             :     else
     757          10 :         create_physical_replication_slot(NameStr(*dst_name),
     758             :                                          true,
     759             :                                          temporary,
     760             :                                          src_restart_lsn);
     761             : 
     762             :     /*
     763             :      * Update the destination slot to current values of the source slot;
     764             :      * recheck that the source slot is still the one we saw previously.
     765             :      */
     766             :     {
     767             :         TransactionId copy_effective_xmin;
     768             :         TransactionId copy_effective_catalog_xmin;
     769             :         TransactionId copy_xmin;
     770             :         TransactionId copy_catalog_xmin;
     771             :         XLogRecPtr  copy_restart_lsn;
     772             :         XLogRecPtr  copy_confirmed_flush;
     773             :         bool        copy_islogical;
     774             :         char       *copy_name;
     775             : 
     776             :         /* Copy data of source slot again */
     777          24 :         SpinLockAcquire(&src->mutex);
     778          24 :         second_slot_contents = *src;
     779          24 :         SpinLockRelease(&src->mutex);
     780             : 
     781          24 :         copy_effective_xmin = second_slot_contents.effective_xmin;
     782          24 :         copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
     783             : 
     784          24 :         copy_xmin = second_slot_contents.data.xmin;
     785          24 :         copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
     786          24 :         copy_restart_lsn = second_slot_contents.data.restart_lsn;
     787          24 :         copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
     788             : 
     789             :         /* for existence check */
     790          24 :         copy_name = NameStr(second_slot_contents.data.name);
     791          24 :         copy_islogical = SlotIsLogical(&second_slot_contents);
     792             : 
     793             :         /*
     794             :          * Check if the source slot still exists and is valid. We regard it as
     795             :          * invalid if the type of replication slot or name has been changed,
     796             :          * or the restart_lsn either is invalid or has gone backward. (The
     797             :          * restart_lsn could go backwards if the source slot is dropped and
     798             :          * copied from an older slot during installation.)
     799             :          *
     800             :          * Since erroring out will release and drop the destination slot we
     801             :          * don't need to release it here.
     802             :          */
     803          24 :         if (copy_restart_lsn < src_restart_lsn ||
     804          24 :             src_islogical != copy_islogical ||
     805          24 :             strcmp(copy_name, NameStr(*src_name)) != 0)
     806           0 :             ereport(ERROR,
     807             :                     (errmsg("could not copy replication slot \"%s\"",
     808             :                             NameStr(*src_name)),
     809             :                      errdetail("The source replication slot was modified incompatibly during the copy operation.")));
     810             : 
     811             :         /* The source slot must have a consistent snapshot */
     812          24 :         if (src_islogical && !XLogRecPtrIsValid(copy_confirmed_flush))
     813           0 :             ereport(ERROR,
     814             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     815             :                      errmsg("cannot copy unfinished logical replication slot \"%s\"",
     816             :                             NameStr(*src_name)),
     817             :                      errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
     818             : 
     819             :         /*
     820             :          * Copying an invalid slot doesn't make sense. Note that the source
     821             :          * slot can become invalid after we create the new slot and copy the
     822             :          * data of source slot. This is possible because the operations in
     823             :          * InvalidateObsoleteReplicationSlots() are not serialized with this
     824             :          * function. Even though we can't detect such a case here, the copied
     825             :          * slot will become invalid in the next checkpoint cycle.
     826             :          */
     827          24 :         if (second_slot_contents.data.invalidated != RS_INVAL_NONE)
     828           0 :             ereport(ERROR,
     829             :                     errmsg("cannot copy replication slot \"%s\"",
     830             :                            NameStr(*src_name)),
     831             :                     errdetail("The source replication slot was invalidated during the copy operation."));
     832             : 
     833             :         /* Install copied values again */
     834          24 :         SpinLockAcquire(&MyReplicationSlot->mutex);
     835          24 :         MyReplicationSlot->effective_xmin = copy_effective_xmin;
     836          24 :         MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
     837             : 
     838          24 :         MyReplicationSlot->data.xmin = copy_xmin;
     839          24 :         MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
     840          24 :         MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
     841          24 :         MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
     842          24 :         SpinLockRelease(&MyReplicationSlot->mutex);
     843             : 
     844          24 :         ReplicationSlotMarkDirty();
     845          24 :         ReplicationSlotsComputeRequiredXmin(false);
     846          24 :         ReplicationSlotsComputeRequiredLSN();
     847          24 :         ReplicationSlotSave();
     848             : 
     849             : #ifdef USE_ASSERT_CHECKING
     850             :         /* Check that the restart_lsn is available */
     851             :         {
     852             :             XLogSegNo   segno;
     853             : 
     854             :             XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
     855             :             Assert(XLogGetLastRemovedSegno() < segno);
     856             :         }
     857             : #endif
     858             :     }
     859             : 
     860             :     /* target slot fully created, mark as persistent if needed */
     861          24 :     if (logical_slot && !temporary)
     862           8 :         ReplicationSlotPersist();
     863             : 
     864             :     /* All done.  Set up the return values */
     865          24 :     values[0] = NameGetDatum(dst_name);
     866          24 :     nulls[0] = false;
     867          24 :     if (XLogRecPtrIsValid(MyReplicationSlot->data.confirmed_flush))
     868             :     {
     869          14 :         values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
     870          14 :         nulls[1] = false;
     871             :     }
     872             :     else
     873          10 :         nulls[1] = true;
     874             : 
     875          24 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     876          24 :     result = HeapTupleGetDatum(tuple);
     877             : 
     878          24 :     ReplicationSlotRelease();
     879             : 
     880          24 :     PG_RETURN_DATUM(result);
     881             : }
     882             : 
     883             : /* The wrappers below are all to appease opr_sanity */
     884             : Datum
     885           8 : pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
     886             : {
     887           8 :     return copy_replication_slot(fcinfo, true);
     888             : }
     889             : 
     890             : Datum
     891           0 : pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
     892             : {
     893           0 :     return copy_replication_slot(fcinfo, true);
     894             : }
     895             : 
     896             : Datum
     897          12 : pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
     898             : {
     899          12 :     return copy_replication_slot(fcinfo, true);
     900             : }
     901             : 
     902             : Datum
     903           4 : pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
     904             : {
     905           4 :     return copy_replication_slot(fcinfo, false);
     906             : }
     907             : 
     908             : Datum
     909          10 : pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
     910             : {
     911          10 :     return copy_replication_slot(fcinfo, false);
     912             : }
     913             : 
     914             : /*
     915             :  * Synchronize failover enabled replication slots to a standby server
     916             :  * from the primary server.
     917             :  */
     918             : Datum
     919          24 : pg_sync_replication_slots(PG_FUNCTION_ARGS)
     920             : {
     921             :     WalReceiverConn *wrconn;
     922             :     char       *err;
     923             :     StringInfoData app_name;
     924             : 
     925          24 :     CheckSlotPermissions();
     926             : 
     927          22 :     if (!RecoveryInProgress())
     928           2 :         ereport(ERROR,
     929             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     930             :                 errmsg("replication slots can only be synchronized to a standby server"));
     931             : 
     932          20 :     ValidateSlotSyncParams(ERROR);
     933             : 
     934             :     /* Load the libpq-specific functions */
     935          20 :     load_file("libpqwalreceiver", false);
     936             : 
     937          20 :     (void) CheckAndGetDbnameFromConninfo();
     938             : 
     939          18 :     initStringInfo(&app_name);
     940          18 :     if (cluster_name[0])
     941          18 :         appendStringInfo(&app_name, "%s_slotsync", cluster_name);
     942             :     else
     943           0 :         appendStringInfoString(&app_name, "slotsync");
     944             : 
     945             :     /* Connect to the primary server. */
     946          18 :     wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
     947             :                             app_name.data, &err);
     948             : 
     949          18 :     if (!wrconn)
     950           0 :         ereport(ERROR,
     951             :                 errcode(ERRCODE_CONNECTION_FAILURE),
     952             :                 errmsg("synchronization worker \"%s\" could not connect to the primary server: %s",
     953             :                        app_name.data, err));
     954             : 
     955          18 :     pfree(app_name.data);
     956             : 
     957          18 :     SyncReplicationSlots(wrconn);
     958             : 
     959          16 :     walrcv_disconnect(wrconn);
     960             : 
     961          16 :     PG_RETURN_VOID();
     962             : }

Generated by: LCOV version 1.16