LCOV - code coverage report
Current view: top level - src/backend/replication - slotfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 306 323 94.7 %
Date: 2025-12-27 00:18:33 Functions: 15 16 93.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * slotfuncs.c
       4             :  *     Support functions for replication slots
       5             :  *
       6             :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/slotfuncs.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/htup_details.h"
      16             : #include "access/xlog_internal.h"
      17             : #include "access/xlogrecovery.h"
      18             : #include "access/xlogutils.h"
      19             : #include "funcapi.h"
      20             : #include "replication/logical.h"
      21             : #include "replication/slot.h"
      22             : #include "replication/slotsync.h"
      23             : #include "utils/builtins.h"
      24             : #include "utils/guc.h"
      25             : #include "utils/pg_lsn.h"
      26             : 
      27             : /*
      28             :  * Map SlotSyncSkipReason enum values to human-readable names (indexed by enum value).
      29             :  */
      30             : static const char *SlotSyncSkipReasonNames[] = {
      31             :     [SS_SKIP_NONE] = "none",
      32             :     [SS_SKIP_WAL_NOT_FLUSHED] = "wal_not_flushed",
      33             :     [SS_SKIP_WAL_OR_ROWS_REMOVED] = "wal_or_rows_removed",
      34             :     [SS_SKIP_NO_CONSISTENT_SNAPSHOT] = "no_consistent_snapshot",
      35             :     [SS_SKIP_INVALID] = "slot_invalidated"
      36             : };
      37             : 
      38             : /*
      39             :  * Helper function for creating a new physical replication slot with the
      40             :  * given arguments. Note that this function doesn't release the created
      41             :  * slot; the caller must do that.
      42             :  *
      43             :  * WAL is reserved only if immediately_reserve is true. In that case a valid restart_lsn is used
      44             :  * as-is, skipping ReplicationSlotReserveWal(), so the caller must guarantee that WAL is available.
      45             :  */
      46             : static void
      47          88 : create_physical_replication_slot(char *name, bool immediately_reserve,
      48             :                                  bool temporary, XLogRecPtr restart_lsn)
      49             : {
      50             :     Assert(!MyReplicationSlot);
      51             : 
      52             :     /* acquire replication slot; this will check for conflicting names */
      53          88 :     ReplicationSlotCreate(name, false,
      54             :                           temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
      55             :                           false, false);
      56             : 
      57          88 :     if (immediately_reserve)
      58             :     {
      59             :         /* Reserve WAL as the user asked for it, or adopt the caller-supplied LSN */
      60          44 :         if (!XLogRecPtrIsValid(restart_lsn))
      61          34 :             ReplicationSlotReserveWal();
      62             :         else
      63          10 :             MyReplicationSlot->data.restart_lsn = restart_lsn;
      64             : 
      65             :         /* Write this slot to disk */
      66          44 :         ReplicationSlotMarkDirty();
      67          44 :         ReplicationSlotSave();
      68             :     }
      69          88 : }
      70             : 
      71             : /*
      72             :  * SQL function for creating a new physical (streaming replication)
      73             :  * replication slot.  Returns a row of (slot name, restart_lsn); the LSN is NULL unless immediately_reserve is true.
      74             :  */
      75             : Datum
      76          78 : pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
      77             : {
      78          78 :     Name        name = PG_GETARG_NAME(0);
      79          78 :     bool        immediately_reserve = PG_GETARG_BOOL(1);
      80          78 :     bool        temporary = PG_GETARG_BOOL(2);
      81             :     Datum       values[2];
      82             :     bool        nulls[2];
      83             :     TupleDesc   tupdesc;
      84             :     HeapTuple   tuple;
      85             :     Datum       result;
      86             : 
      87          78 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
      88           0 :         elog(ERROR, "return type must be a row type");
      89             : 
      90          78 :     CheckSlotPermissions();
      91             : 
      92          78 :     CheckSlotRequirements();
      93             : 
      94          78 :     create_physical_replication_slot(NameStr(*name),
      95             :                                      immediately_reserve,
      96             :                                      temporary,
      97             :                                      InvalidXLogRecPtr);
      98             : 
      99          78 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     100          78 :     nulls[0] = false;
     101             : 
     102          78 :     if (immediately_reserve)
     103             :     {
     104          34 :         values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
     105          34 :         nulls[1] = false;
     106             :     }
     107             :     else
     108          44 :         nulls[1] = true;    /* no WAL was reserved, so there is no LSN to report */
     109             : 
     110          78 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     111          78 :     result = HeapTupleGetDatum(tuple);
     112             : 
     113          78 :     ReplicationSlotRelease();   /* the helper above leaves the slot unreleased */
     114             : 
     115          78 :     PG_RETURN_DATUM(result);
     116             : }
     117             : 
     118             : 
     119             : /*
     120             :  * Helper function for creating a new logical replication slot with the
     121             :  * given arguments. Note that this function doesn't release the created
     122             :  * slot.
     123             :  *
     124             :  * When find_startpoint is false, the slot's confirmed_flush is not set; it's
     125             :  * the caller's responsibility to ensure it's set to something sensible.
     126             :  */
     127             : static void
     128         278 : create_logical_replication_slot(char *name, char *plugin,
     129             :                                 bool temporary, bool two_phase,
     130             :                                 bool failover,
     131             :                                 XLogRecPtr restart_lsn,
     132             :                                 bool find_startpoint)
     133             : {
     134         278 :     LogicalDecodingContext *ctx = NULL;
     135             : 
     136             :     Assert(!MyReplicationSlot);
     137             : 
     138             :     /*
     139             :      * Acquire a logical decoding slot; this will check for conflicting names.
     140             :      * Initially create persistent slot as ephemeral - that allows us to
     141             :      * nicely handle errors during initialization because it'll get dropped if
     142             :      * this transaction fails. We'll make it persistent at the end. Temporary
     143             :      * slots can be created as temporary from beginning as they get dropped on
     144             :      * error as well.
     145             :      */
     146         278 :     ReplicationSlotCreate(name, true,
     147             :                           temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
     148             :                           failover, false);
     149             : 
     150             :     /*
     151             :      * Ensure that logical decoding is enabled before initializing the logical
     152             :      * decoding context.
     153             :      */
     154         268 :     EnsureLogicalDecodingEnabled();
     155             :     Assert(IsLogicalDecodingEnabled());
     156             : 
     157             :     /*
     158             :      * Create logical decoding context to find start point or, if we don't
     159             :      * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
     160             :      *
     161             :      * Note: when !find_startpoint this is still important, because it's at
     162             :      * this point that the output plugin is validated.
     163             :      */
     164         266 :     ctx = CreateInitDecodingContext(plugin, NIL,
     165             :                                     false,  /* just catalogs is OK */
     166             :                                     restart_lsn,
     167         266 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
     168             :                                                .segment_open = wal_segment_open,
     169             :                                                .segment_close = wal_segment_close),
     170             :                                     NULL, NULL, NULL);
     171             : 
     172             :     /*
     173             :      * If caller needs us to determine the decoding start point, do so now.
     174             :      * This might take a while.
     175             :      */
     176         260 :     if (find_startpoint)
     177         246 :         DecodingContextFindStartpoint(ctx);
     178             : 
     179             :     /* the decoding context is no longer needed; the slot itself stays acquired */
     180         256 :     FreeDecodingContext(ctx);
     181         256 : }
     182             : 
     183             : /*
     184             :  * SQL function for creating a new logical replication slot.  Returns a row of (slot name, confirmed_flush LSN).
     185             :  */
     186             : Datum
     187         264 : pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
     188             : {
     189         264 :     Name        name = PG_GETARG_NAME(0);
     190         264 :     Name        plugin = PG_GETARG_NAME(1);
     191         264 :     bool        temporary = PG_GETARG_BOOL(2);
     192         264 :     bool        two_phase = PG_GETARG_BOOL(3);
     193         264 :     bool        failover = PG_GETARG_BOOL(4);
     194             :     Datum       result;
     195             :     TupleDesc   tupdesc;
     196             :     HeapTuple   tuple;
     197             :     Datum       values[2];
     198             :     bool        nulls[2];
     199             : 
     200         264 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     201           0 :         elog(ERROR, "return type must be a row type");
     202             : 
     203         264 :     CheckSlotPermissions();
     204             : 
     205         262 :     CheckLogicalDecodingRequirements();
     206             : 
     207         262 :     create_logical_replication_slot(NameStr(*name),
     208         262 :                                     NameStr(*plugin),
     209             :                                     temporary,
     210             :                                     two_phase,
     211             :                                     failover,
     212             :                                     InvalidXLogRecPtr,
     213             :                                     true);
     214             : 
     215         242 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     216         242 :     values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
     217             : 
     218         242 :     memset(nulls, 0, sizeof(nulls));    /* neither output column is ever NULL */
     219             : 
     220         242 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     221         242 :     result = HeapTupleGetDatum(tuple);
     222             : 
     223             :     /* ok, slot is now fully created, mark it as persistent if needed */
     224         242 :     if (!temporary)
     225         230 :         ReplicationSlotPersist();
     226         242 :     ReplicationSlotRelease();
     227             : 
     228         242 :     PG_RETURN_DATUM(result);
     229             : }
     230             : 
     231             : 
     232             : /*
     233             :  * SQL function for dropping a replication slot, identified by name.  Checks slot permissions and requirements first; returns void.
     234             :  */
     235             : Datum
     236         296 : pg_drop_replication_slot(PG_FUNCTION_ARGS)
     237             : {
     238         296 :     Name        name = PG_GETARG_NAME(0);
     239             : 
     240         296 :     CheckSlotPermissions();
     241             : 
     242         292 :     CheckSlotRequirements();
     243             : 
     244         292 :     ReplicationSlotDrop(NameStr(*name), true);
     245             : 
     246         280 :     PG_RETURN_VOID();
     247             : }
     248             : 
     249             : /*
     250             :  * pg_get_replication_slots - SQL SRF showing all replication slots
     251             :  * that currently exist on the database cluster.
     252             :  */
     253             : Datum
     254         706 : pg_get_replication_slots(PG_FUNCTION_ARGS)
     255             : {
     256             : #define PG_GET_REPLICATION_SLOTS_COLS 21 /* output columns per row; checked by the Assert below */
     257         706 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     258             :     XLogRecPtr  currlsn;
     259             :     int         slotno;
     260             : 
     261             :     /*
     262             :      * We don't require any special permission to see this function's data
     263             :      * because nothing should be sensitive. The most critical item is the slot
     264             :      * name, which shouldn't contain anything particularly sensitive.
     265             :      */
     266             : 
     267         706 :     InitMaterializedSRF(fcinfo, 0);
     268             : 
     269         706 :     currlsn = GetXLogWriteRecPtr();     /* used for the safe_wal_size computation below */
     270             : 
     271         706 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     272        6110 :     for (slotno = 0; slotno < max_replication_slots; slotno++)
     273             :     {
     274        5404 :         ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
     275             :         ReplicationSlot slot_contents;
     276             :         Datum       values[PG_GET_REPLICATION_SLOTS_COLS];
     277             :         bool        nulls[PG_GET_REPLICATION_SLOTS_COLS];
     278             :         WALAvailability walstate;
     279             :         int         i;
     280             :         ReplicationSlotInvalidationCause cause;
     281             : 
     282        5404 :         if (!slot->in_use)
     283        4260 :             continue;
     284             : 
     285             :         /* Copy slot contents while holding spinlock, then examine at leisure */
     286        1144 :         SpinLockAcquire(&slot->mutex);
     287        1144 :         slot_contents = *slot;
     288        1144 :         SpinLockRelease(&slot->mutex);
     289             : 
     290        1144 :         memset(values, 0, sizeof(values));
     291        1144 :         memset(nulls, 0, sizeof(nulls));
     292             : 
     293        1144 :         i = 0;                  /* next output column to fill */
     294        1144 :         values[i++] = NameGetDatum(&slot_contents.data.name);
     295             : 
     296        1144 :         if (slot_contents.data.database == InvalidOid)  /* physical slot: no plugin */
     297         340 :             nulls[i++] = true;
     298             :         else
     299         804 :             values[i++] = NameGetDatum(&slot_contents.data.plugin);
     300             : 
     301        1144 :         if (slot_contents.data.database == InvalidOid)
     302         340 :             values[i++] = CStringGetTextDatum("physical");
     303             :         else
     304         804 :             values[i++] = CStringGetTextDatum("logical");
     305             : 
     306        1144 :         if (slot_contents.data.database == InvalidOid)
     307         340 :             nulls[i++] = true;
     308             :         else
     309         804 :             values[i++] = ObjectIdGetDatum(slot_contents.data.database);
     310             : 
     311        1144 :         values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
     312        1144 :         values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
     313             : 
     314        1144 :         if (slot_contents.active_pid != 0)
     315         406 :             values[i++] = Int32GetDatum(slot_contents.active_pid);
     316             :         else
     317         738 :             nulls[i++] = true;
     318             : 
     319        1144 :         if (slot_contents.data.xmin != InvalidTransactionId)
     320         184 :             values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
     321             :         else
     322         960 :             nulls[i++] = true;
     323             : 
     324        1144 :         if (slot_contents.data.catalog_xmin != InvalidTransactionId)
     325         882 :             values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
     326             :         else
     327         262 :             nulls[i++] = true;
     328             : 
     329        1144 :         if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
     330        1066 :             values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
     331             :         else
     332          78 :             nulls[i++] = true;
     333             : 
     334        1144 :         if (XLogRecPtrIsValid(slot_contents.data.confirmed_flush))
     335         746 :             values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
     336             :         else
     337         398 :             nulls[i++] = true;
     338             : 
     339             :         /*
     340             :          * If the slot has not been invalidated, test availability from
     341             :          * restart_lsn; an invalidated slot is treated as WALAVAIL_REMOVED.
     342             :          */
     343        1144 :         if (slot_contents.data.invalidated != RS_INVAL_NONE)
     344          70 :             walstate = WALAVAIL_REMOVED;
     345             :         else
     346        1074 :             walstate = GetWALAvailability(slot_contents.data.restart_lsn);
     347             : 
     348        1144 :         switch (walstate)
     349             :         {
     350          70 :             case WALAVAIL_INVALID_LSN:
     351          70 :                 nulls[i++] = true;
     352          70 :                 break;
     353             : 
     354         998 :             case WALAVAIL_RESERVED:
     355         998 :                 values[i++] = CStringGetTextDatum("reserved");
     356         998 :                 break;
     357             : 
     358           4 :             case WALAVAIL_EXTENDED:
     359           4 :                 values[i++] = CStringGetTextDatum("extended");
     360           4 :                 break;
     361             : 
     362           2 :             case WALAVAIL_UNRESERVED:
     363           2 :                 values[i++] = CStringGetTextDatum("unreserved");
     364           2 :                 break;
     365             : 
     366          70 :             case WALAVAIL_REMOVED:
     367             : 
     368             :                 /*
     369             :                  * If we read the restart_lsn long enough ago, maybe that file
     370             :                  * has been removed by now.  However, the walsender could have
     371             :                  * moved forward enough that it jumped to another file after
     372             :                  * we looked.  If the checkpointer signalled the process to
     373             :                  * terminate, then it's definitely lost; but if a process is
     374             :                  * still alive, then "unreserved" seems more appropriate.
     375             :                  *
     376             :                  * If we do change it, save the state for safe_wal_size below.
     377             :                  */
     378          70 :                 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
     379             :                 {
     380             :                     int         pid;
     381             : 
     382          62 :                     SpinLockAcquire(&slot->mutex);  /* re-read the live slot, not our copy */
     383          62 :                     pid = slot->active_pid;
     384          62 :                     slot_contents.data.restart_lsn = slot->data.restart_lsn;
     385          62 :                     SpinLockRelease(&slot->mutex);
     386          62 :                     if (pid != 0)
     387             :                     {
     388           0 :                         values[i++] = CStringGetTextDatum("unreserved");
     389           0 :                         walstate = WALAVAIL_UNRESERVED;
     390           0 :                         break;
     391             :                     }
     392             :                 }
     393          70 :                 values[i++] = CStringGetTextDatum("lost");
     394          70 :                 break;
     395             :         }
     396             : 
     397             :         /*
     398             :          * safe_wal_size is only computed for slots that have not been lost,
     399             :          * and only if there's a configured maximum size.
     400             :          */
     401        1144 :         if (walstate == WALAVAIL_REMOVED || max_slot_wal_keep_size_mb < 0)
     402        1134 :             nulls[i++] = true;
     403             :         else
     404             :         {
     405             :             XLogSegNo   targetSeg;
     406             :             uint64      slotKeepSegs;
     407             :             uint64      keepSegs;
     408             :             XLogSegNo   failSeg;
     409             :             XLogRecPtr  failLSN;
     410             : 
     411          10 :             XLByteToSeg(slot_contents.data.restart_lsn, targetSeg, wal_segment_size);
     412             : 
     413             :             /* determine how many segments can be kept by slots */
     414          10 :             slotKeepSegs = XLogMBVarToSegs(max_slot_wal_keep_size_mb, wal_segment_size);
     415             :             /* ditto for wal_keep_size */
     416          10 :             keepSegs = XLogMBVarToSegs(wal_keep_size_mb, wal_segment_size);
     417             : 
     418             :             /* if currlsn reaches failLSN, we lose our segment */
     419          10 :             failSeg = targetSeg + Max(slotKeepSegs, keepSegs) + 1;
     420          10 :             XLogSegNoOffsetToRecPtr(failSeg, 0, wal_segment_size, failLSN);
     421             : 
     422          10 :             values[i++] = Int64GetDatum(failLSN - currlsn);
     423             :         }
     424             : 
     425        1144 :         values[i++] = BoolGetDatum(slot_contents.data.two_phase);
     426             : 
     427        1144 :         if (slot_contents.data.two_phase &&
     428          28 :             XLogRecPtrIsValid(slot_contents.data.two_phase_at))
     429          28 :             values[i++] = LSNGetDatum(slot_contents.data.two_phase_at);
     430             :         else
     431        1116 :             nulls[i++] = true;
     432             : 
     433        1144 :         if (slot_contents.inactive_since > 0)
     434         756 :             values[i++] = TimestampTzGetDatum(slot_contents.inactive_since);
     435             :         else
     436         388 :             nulls[i++] = true;
     437             : 
     438        1144 :         cause = slot_contents.data.invalidated;
     439             : 
     440        1144 :         if (SlotIsPhysical(&slot_contents))
     441         340 :             nulls[i++] = true;
     442             :         else
     443             :         {
     444             :             /*
     445             :              * rows_removed and wal_level_insufficient are the only two
     446             :              * reasons for the logical slot's conflict with recovery.
     447             :              */
     448         804 :             if (cause == RS_INVAL_HORIZON ||
     449             :                 cause == RS_INVAL_WAL_LEVEL)
     450          58 :                 values[i++] = BoolGetDatum(true);
     451             :             else
     452         746 :                 values[i++] = BoolGetDatum(false);
     453             :         }
     454             : 
     455        1144 :         if (cause == RS_INVAL_NONE)
     456        1074 :             nulls[i++] = true;
     457             :         else
     458          70 :             values[i++] = CStringGetTextDatum(GetSlotInvalidationCauseName(cause));
     459             : 
     460        1144 :         values[i++] = BoolGetDatum(slot_contents.data.failover);
     461             : 
     462        1144 :         values[i++] = BoolGetDatum(slot_contents.data.synced);
     463             : 
     464        1144 :         if (slot_contents.slotsync_skip_reason == SS_SKIP_NONE)
     465        1140 :             nulls[i++] = true;
     466             :         else
     467           4 :             values[i++] = CStringGetTextDatum(SlotSyncSkipReasonNames[slot_contents.slotsync_skip_reason]);
     468             : 
     469             :         Assert(i == PG_GET_REPLICATION_SLOTS_COLS);     /* every column was filled exactly once */
     470             : 
     471        1144 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
     472             :                              values, nulls);
     473             :     }
     474             : 
     475         706 :     LWLockRelease(ReplicationSlotControlLock);
     476             : 
     477         706 :     return (Datum) 0;           /* rows were stored in rsinfo->setResult above */
     478             : }
     479             : 
     480             : /*
     481             :  * Helper function for advancing our physical replication slot forward; returns the slot's resulting restart_lsn.
     482             :  *
     483             :  * The LSN position to move to is compared simply to the slot's restart_lsn,
     484             :  * knowing that any position older than that would be removed by successive
     485             :  * checkpoints.
     486             :  */
     487             : static XLogRecPtr
     488          14 : pg_physical_replication_slot_advance(XLogRecPtr moveto)
     489             : {
     490          14 :     XLogRecPtr  startlsn = MyReplicationSlot->data.restart_lsn;
     491          14 :     XLogRecPtr  retlsn = startlsn;
     492             : 
     493             :     Assert(XLogRecPtrIsValid(moveto));
     494             : 
     495          14 :     if (startlsn < moveto)      /* only ever move forward; otherwise leave the slot untouched */
     496             :     {
     497          14 :         SpinLockAcquire(&MyReplicationSlot->mutex);
     498          14 :         MyReplicationSlot->data.restart_lsn = moveto;
     499          14 :         SpinLockRelease(&MyReplicationSlot->mutex);
     500          14 :         retlsn = moveto;
     501             : 
     502             :         /*
     503             :          * Dirty the slot so that it is written out at the next checkpoint. Note
     504             :          * that the LSN position advanced may still be lost in the event of a
     505             :          * crash, but this makes the data consistent after a clean shutdown.
     506             :          */
     507          14 :         ReplicationSlotMarkDirty();
     508             : 
     509             :         /*
     510             :          * Wake up logical walsenders holding logical failover slots after
     511             :          * updating the restart_lsn of the physical slot.
     512             :          */
     513          14 :         PhysicalWakeupLogicalWalSnd();
     514             :     }
     515             : 
     516          14 :     return retlsn;
     517             : }
     518             : 
     519             : /*
     520             :  * Advance our logical replication slot forward. This is a thin wrapper: see
     521             :  * LogicalSlotAdvanceAndCheckSnapState for details; its result is returned as-is.
     522             :  */
     523             : static XLogRecPtr
     524          16 : pg_logical_replication_slot_advance(XLogRecPtr moveto)
     525             : {
     526          16 :     return LogicalSlotAdvanceAndCheckSnapState(moveto, NULL);
     527             : }
     528             : 
     529             : /*
     530             :  * SQL function for moving the position in a replication slot.
     531             :  */
     532             : Datum
     533          34 : pg_replication_slot_advance(PG_FUNCTION_ARGS)
     534             : {
     535          34 :     Name        slotname = PG_GETARG_NAME(0);
     536          34 :     XLogRecPtr  moveto = PG_GETARG_LSN(1);
     537             :     XLogRecPtr  endlsn;
     538             :     XLogRecPtr  minlsn;
     539             :     TupleDesc   tupdesc;
     540             :     Datum       values[2];
     541             :     bool        nulls[2];
     542             :     HeapTuple   tuple;
     543             :     Datum       result;
     544             : 
     545             :     Assert(!MyReplicationSlot);
     546             : 
     547          34 :     CheckSlotPermissions();
     548             : 
     549          34 :     if (!XLogRecPtrIsValid(moveto))
     550           2 :         ereport(ERROR,
     551             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     552             :                  errmsg("invalid target WAL LSN")));
     553             : 
     554             :     /* Build a tuple descriptor for our result type */
     555          32 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     556           0 :         elog(ERROR, "return type must be a row type");
     557             : 
     558             :     /*
     559             :      * We can't move slot past what's been flushed/replayed so clamp the
     560             :      * target position accordingly.
     561             :      */
     562          32 :     if (!RecoveryInProgress())
     563          32 :         moveto = Min(moveto, GetFlushRecPtr(NULL));
     564             :     else
     565           0 :         moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
     566             : 
     567             :     /* Acquire the slot so we "own" it */
     568          32 :     ReplicationSlotAcquire(NameStr(*slotname), true, true);
     569             : 
     570             :     /* A slot whose restart_lsn has never been reserved cannot be advanced */
     571          32 :     if (!XLogRecPtrIsValid(MyReplicationSlot->data.restart_lsn))
     572           2 :         ereport(ERROR,
     573             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     574             :                  errmsg("replication slot \"%s\" cannot be advanced",
     575             :                         NameStr(*slotname)),
     576             :                  errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
     577             : 
     578             :     /*
     579             :      * Check if the slot is not moving backwards.  Physical slots rely simply
     580             :      * on restart_lsn as a minimum point, while logical slots have confirmed
     581             :      * consumption up to confirmed_flush, meaning that in both cases data
     582             :      * older than that is not available anymore.
     583             :      */
     584          30 :     if (OidIsValid(MyReplicationSlot->data.database))
     585          16 :         minlsn = MyReplicationSlot->data.confirmed_flush;
     586             :     else
     587          14 :         minlsn = MyReplicationSlot->data.restart_lsn;
     588             : 
     589          30 :     if (moveto < minlsn)
     590           0 :         ereport(ERROR,
     591             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     592             :                  errmsg("cannot advance replication slot to %X/%08X, minimum is %X/%08X",
     593             :                         LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
     594             : 
     595             :     /* Do the actual slot update, depending on the slot type */
     596          30 :     if (OidIsValid(MyReplicationSlot->data.database))
     597          16 :         endlsn = pg_logical_replication_slot_advance(moveto);
     598             :     else
     599          14 :         endlsn = pg_physical_replication_slot_advance(moveto);
     600             : 
     601          30 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     602          30 :     nulls[0] = false;
     603             : 
     604             :     /*
     605             :      * Recompute the minimum LSN and xmin across all slots to adjust with the
     606             :      * advancing potentially done.
     607             :      */
     608          30 :     ReplicationSlotsComputeRequiredXmin(false);
     609          30 :     ReplicationSlotsComputeRequiredLSN();
     610             : 
     611          30 :     ReplicationSlotRelease();
     612             : 
     613             :     /* Return the reached position. */
     614          30 :     values[1] = LSNGetDatum(endlsn);
     615          30 :     nulls[1] = false;
     616             : 
     617          30 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     618          30 :     result = HeapTupleGetDatum(tuple);
     619             : 
     620          30 :     PG_RETURN_DATUM(result);
     621             : }
     622             : 
     623             : /*
     624             :  * Helper function for copying a replication slot.
                     :  *
                     :  * fcinfo carries the SQL arguments: source slot name (0), destination slot
                     :  * name (1) and, optionally, the "temporary" flag (2) and -- for logical
                     :  * slots only -- the output plugin name (3).  When the optional arguments
                     :  * are absent, the source slot's persistency and plugin are used instead.
                     :  * logical_slot tells which kind of copy was requested; the source slot
                     :  * must be of that same kind.
                     :  *
                     :  * Returns a composite datum holding the destination slot name and its
                     :  * confirmed_flush LSN (NULL when that LSN is not valid).  The destination
                     :  * slot is released before returning.
     625             :  */
     626             : static Datum
     627          34 : copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
     628             : {
     629          34 :     Name        src_name = PG_GETARG_NAME(0);
     630          34 :     Name        dst_name = PG_GETARG_NAME(1);
     631          34 :     ReplicationSlot *src = NULL;
     632             :     ReplicationSlot first_slot_contents;
     633             :     ReplicationSlot second_slot_contents;
     634             :     XLogRecPtr  src_restart_lsn;
     635             :     bool        src_islogical;
     636             :     bool        temporary;
     637             :     char       *plugin;
     638             :     Datum       values[2];
     639             :     bool        nulls[2];
     640             :     Datum       result;
     641             :     TupleDesc   tupdesc;
     642             :     HeapTuple   tuple;
     643             : 
     644          34 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     645           0 :         elog(ERROR, "return type must be a row type");
     646             : 
     647          34 :     CheckSlotPermissions();
     648             : 
     649          34 :     if (logical_slot)
     650          20 :         CheckLogicalDecodingRequirements();
     651             :     else
     652          14 :         CheckSlotRequirements();
     653             : 
     654          34 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     655             : 
     656             :     /*
     657             :      * We need to prevent the source slot's reserved WAL from being removed,
     658             :      * but we don't want to lock that slot for very long, and it can advance
     659             :      * in the meantime.  So obtain the source slot's data, and create a new
     660             :      * slot using its restart_lsn.  Afterwards we lock the source slot again
     661             :      * and verify that the data we copied (name, type) has not changed
     662             :      * incompatibly.  No inconvenient WAL removal can occur once the new slot
     663             :      * is created -- but since WAL removal could have occurred before we
     664             :      * managed to create the new slot, we advance the new slot's restart_lsn
     665             :      * to the source slot's updated restart_lsn the second time we lock it.
     666             :      */
     667          40 :     for (int i = 0; i < max_replication_slots; i++)
     668             :     {
     669          40 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     670             : 
     671          40 :         if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
     672             :         {
     673             :             /* Copy the slot contents while holding spinlock */
     674          34 :             SpinLockAcquire(&s->mutex);
     675          34 :             first_slot_contents = *s;
     676          34 :             SpinLockRelease(&s->mutex);
     677          34 :             src = s;
     678          34 :             break;
     679             :         }
     680             :     }
     681             : 
     682          34 :     LWLockRelease(ReplicationSlotControlLock);
     683             : 
     684          34 :     if (src == NULL)
     685           0 :         ereport(ERROR,
     686             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     687             :                  errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
     688             : 
     689          34 :     src_islogical = SlotIsLogical(&first_slot_contents);
     690          34 :     src_restart_lsn = first_slot_contents.data.restart_lsn;
     691          34 :     temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
     692          34 :     plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
     693             : 
     694             :     /*
                     :      * Check type of replication slot.  The message names the source slot's
                     :      * actual type first: when src_islogical is true the caller asked for a
                     :      * physical copy of a logical slot, and vice versa.
                     :      */
     695          34 :     if (src_islogical != logical_slot)
     696           4 :         ereport(ERROR,
     697             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     698             :                  src_islogical ?
     699             :                  errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
     700             :                         NameStr(*src_name)) :
     701             :                  errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
     702             :                         NameStr(*src_name))));
     703             : 
     704             :     /* Copying non-reserved slot doesn't make sense */
     705          30 :     if (!XLogRecPtrIsValid(src_restart_lsn))
     706           2 :         ereport(ERROR,
     707             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     708             :                  errmsg("cannot copy a replication slot that doesn't reserve WAL")));
     709             : 
     710             :     /* Cannot copy an invalidated replication slot */
     711          28 :     if (first_slot_contents.data.invalidated != RS_INVAL_NONE)
     712           2 :         ereport(ERROR,
     713             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     714             :                 errmsg("cannot copy invalidated replication slot \"%s\"",
     715             :                        NameStr(*src_name)));
     716             : 
     717             :     /* Overwrite params from optional arguments */
     718          26 :     if (PG_NARGS() >= 3)
     719          12 :         temporary = PG_GETARG_BOOL(2);
     720          26 :     if (PG_NARGS() >= 4)
     721             :     {
     722             :         Assert(logical_slot);
     723           8 :         plugin = NameStr(*(PG_GETARG_NAME(3)));
     724             :     }
     725             : 
     726             :     /* Create new slot and acquire it */
     727          26 :     if (logical_slot)
     728             :     {
     729             :         /*
     730             :          * We must not try to read WAL, since we haven't reserved it yet --
     731             :          * hence pass find_startpoint false.  confirmed_flush will be set
     732             :          * below, by copying from the source slot.
     733             :          *
     734             :          * We don't copy the failover option to prevent potential issues with
     735             :          * slot synchronization. For instance, if a slot was synchronized to
     736             :          * the standby, then dropped on the primary, and immediately recreated
     737             :          * by copying from another existing slot with much earlier restart_lsn
     738             :          * and confirmed_flush_lsn, the slot synchronization would only
     739             :          * observe the LSN of the same slot moving backward. As slot
     740             :          * synchronization does not copy the restart_lsn and
     741             :          * confirmed_flush_lsn backward (see update_local_synced_slot() for
     742             :          * details), if a failover happens before the primary's slot catches
     743             :          * up, logical replication cannot continue using the synchronized slot
     744             :          * on the promoted standby because the slot retains the restart_lsn
     745             :          * and confirmed_flush_lsn that are much later than expected.
     746             :          */
     747          16 :         create_logical_replication_slot(NameStr(*dst_name),
     748             :                                         plugin,
     749             :                                         temporary,
     750             :                                         false,
     751             :                                         false,
     752             :                                         src_restart_lsn,
     753             :                                         false);
     754             :     }
     755             :     else
     756          10 :         create_physical_replication_slot(NameStr(*dst_name),
     757             :                                          true,
     758             :                                          temporary,
     759             :                                          src_restart_lsn);
     760             : 
     761             :     /*
     762             :      * Update the destination slot to current values of the source slot;
     763             :      * recheck that the source slot is still the one we saw previously.
     764             :      */
     765             :     {
     766             :         TransactionId copy_effective_xmin;
     767             :         TransactionId copy_effective_catalog_xmin;
     768             :         TransactionId copy_xmin;
     769             :         TransactionId copy_catalog_xmin;
     770             :         XLogRecPtr  copy_restart_lsn;
     771             :         XLogRecPtr  copy_confirmed_flush;
     772             :         bool        copy_islogical;
     773             :         char       *copy_name;
     774             : 
     775             :         /* Copy data of source slot again */
     776          24 :         SpinLockAcquire(&src->mutex);
     777          24 :         second_slot_contents = *src;
     778          24 :         SpinLockRelease(&src->mutex);
     779             : 
     780          24 :         copy_effective_xmin = second_slot_contents.effective_xmin;
     781          24 :         copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
     782             : 
     783          24 :         copy_xmin = second_slot_contents.data.xmin;
     784          24 :         copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
     785          24 :         copy_restart_lsn = second_slot_contents.data.restart_lsn;
     786          24 :         copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
     787             : 
     788             :         /* for existence check */
     789          24 :         copy_name = NameStr(second_slot_contents.data.name);
     790          24 :         copy_islogical = SlotIsLogical(&second_slot_contents);
     791             : 
     792             :         /*
     793             :          * Check if the source slot still exists and is valid. We regard it as
     794             :          * invalid if the type of replication slot or name has been changed,
     795             :          * or the restart_lsn either is invalid or has gone backward. (The
     796             :          * restart_lsn could go backwards if the source slot is dropped and
     797             :          * copied from an older slot during installation.)
     798             :          *
     799             :          * Since erroring out will release and drop the destination slot we
     800             :          * don't need to release it here.
     801             :          */
     802          24 :         if (copy_restart_lsn < src_restart_lsn ||
     803          24 :             src_islogical != copy_islogical ||
     804          24 :             strcmp(copy_name, NameStr(*src_name)) != 0)
     805           0 :             ereport(ERROR,
     806             :                     (errmsg("could not copy replication slot \"%s\"",
     807             :                             NameStr(*src_name)),
     808             :                      errdetail("The source replication slot was modified incompatibly during the copy operation.")));
     809             : 
     810             :         /* The source slot must have a consistent snapshot */
     811          24 :         if (src_islogical && !XLogRecPtrIsValid(copy_confirmed_flush))
     812           0 :             ereport(ERROR,
     813             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     814             :                      errmsg("cannot copy unfinished logical replication slot \"%s\"",
     815             :                             NameStr(*src_name)),
     816             :                      errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
     817             : 
     818             :         /*
     819             :          * Copying an invalid slot doesn't make sense. Note that the source
     820             :          * slot can become invalid after we create the new slot and copy the
     821             :          * data of source slot. This is possible because the operations in
     822             :          * InvalidateObsoleteReplicationSlots() are not serialized with this
     823             :          * function. Even though we can't detect such a case here, the copied
     824             :          * slot will become invalid in the next checkpoint cycle.
     825             :          */
     826          24 :         if (second_slot_contents.data.invalidated != RS_INVAL_NONE)
     827           0 :             ereport(ERROR,
     828             :                     errmsg("cannot copy replication slot \"%s\"",
     829             :                            NameStr(*src_name)),
     830             :                     errdetail("The source replication slot was invalidated during the copy operation."));
     831             : 
     832             :         /* Install copied values again */
     833          24 :         SpinLockAcquire(&MyReplicationSlot->mutex);
     834          24 :         MyReplicationSlot->effective_xmin = copy_effective_xmin;
     835          24 :         MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
     836             : 
     837          24 :         MyReplicationSlot->data.xmin = copy_xmin;
     838          24 :         MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
     839          24 :         MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
     840          24 :         MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
     841          24 :         SpinLockRelease(&MyReplicationSlot->mutex);
     842             : 
     843          24 :         ReplicationSlotMarkDirty();
     844          24 :         ReplicationSlotsComputeRequiredXmin(false);
     845          24 :         ReplicationSlotsComputeRequiredLSN();
     846          24 :         ReplicationSlotSave();
     847             : 
     848             : #ifdef USE_ASSERT_CHECKING
     849             :         /* Check that the restart_lsn is available */
     850             :         {
     851             :             XLogSegNo   segno;
     852             : 
     853             :             XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
     854             :             Assert(XLogGetLastRemovedSegno() < segno);
     855             :         }
     856             : #endif
     857             :     }
     858             : 
     859             :     /* target slot fully created, mark as persistent if needed */
     860          24 :     if (logical_slot && !temporary)
     861           8 :         ReplicationSlotPersist();
     862             : 
     863             :     /* All done.  Set up the return values */
     864          24 :     values[0] = NameGetDatum(dst_name);
     865          24 :     nulls[0] = false;
     866          24 :     if (XLogRecPtrIsValid(MyReplicationSlot->data.confirmed_flush))
     867             :     {
     868          14 :         values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
     869          14 :         nulls[1] = false;
     870             :     }
     871             :     else
     872          10 :         nulls[1] = true;
     873             : 
     874          24 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     875          24 :     result = HeapTupleGetDatum(tuple);
     876             : 
     877          24 :     ReplicationSlotRelease();
     878             : 
     879          24 :     PG_RETURN_DATUM(result);
     880             : }
     881             : 
     882             : /*
                     :  * The wrappers below are all to appease opr_sanity.
                     :  *
                     :  * Each one simply forwards its fcinfo to copy_replication_slot(): the
                     :  * pg_copy_logical_replication_slot_* variants pass logical_slot = true,
                     :  * the pg_copy_physical_replication_slot_* variants pass logical_slot =
                     :  * false.  copy_replication_slot() itself looks at PG_NARGS() to decide
                     :  * which optional arguments were supplied.
                     :  */
     883             : Datum
     884           8 : pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
     885             : {
     886           8 :     return copy_replication_slot(fcinfo, true);
     887             : }
     888             : 
     889             : Datum
     890           0 : pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
     891             : {
     892           0 :     return copy_replication_slot(fcinfo, true);
     893             : }
     894             : 
     895             : Datum
     896          12 : pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
     897             : {
     898          12 :     return copy_replication_slot(fcinfo, true);
     899             : }
     900             : 
     901             : Datum
     902           4 : pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
     903             : {
     904           4 :     return copy_replication_slot(fcinfo, false);
     905             : }
     906             : 
     907             : Datum
     908          10 : pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
     909             : {
     910          10 :     return copy_replication_slot(fcinfo, false);
     911             : }
     912             : 
     913             : /*
     914             :  * Synchronize failover enabled replication slots to a standby server
     915             :  * from the primary server.
                     :  *
                     :  * Takes no SQL arguments that are read here and returns void.  After the
                     :  * slot permission check, an error is raised unless recovery is in
                     :  * progress.  The slot-sync parameters are then validated (at ERROR level),
                     :  * the libpqwalreceiver library is loaded, and a connection to the primary
                     :  * is opened using PrimaryConnInfo.  The application name used for that
                     :  * connection is "<cluster_name>_slotsync", or just "slotsync" when
                     :  * cluster_name is empty.  SyncReplicationSlots() does the actual work on
                     :  * that connection, which is closed again before returning.
                     :  *
                     :  * The result of CheckAndGetDbnameFromConninfo() is discarded on purpose.
                     :  * NOTE(review): presumably it is called only for the checks it performs --
                     :  * confirm against its definition.
     916             :  */
     917             : Datum
     918          24 : pg_sync_replication_slots(PG_FUNCTION_ARGS)
     919             : {
     920             :     WalReceiverConn *wrconn;
     921             :     char       *err;
     922             :     StringInfoData app_name;
     923             : 
     924          24 :     CheckSlotPermissions();
     925             : 
     926          22 :     if (!RecoveryInProgress())
     927           2 :         ereport(ERROR,
     928             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     929             :                 errmsg("replication slots can only be synchronized to a standby server"));
     930             : 
     931          20 :     ValidateSlotSyncParams(ERROR);
     932             : 
     933             :     /* Load the libpq-specific functions */
     934          20 :     load_file("libpqwalreceiver", false);
     935             : 
     936          20 :     (void) CheckAndGetDbnameFromConninfo();
     937             : 
     938          18 :     initStringInfo(&app_name);
     939          18 :     if (cluster_name[0])
     940          18 :         appendStringInfo(&app_name, "%s_slotsync", cluster_name);
     941             :     else
     942           0 :         appendStringInfoString(&app_name, "slotsync");
     943             : 
     944             :     /* Connect to the primary server. */
     945          18 :     wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
     946             :                             app_name.data, &err);
     947             : 
     948          18 :     if (!wrconn)
     949           0 :         ereport(ERROR,
     950             :                 errcode(ERRCODE_CONNECTION_FAILURE),
     951             :                 errmsg("synchronization worker \"%s\" could not connect to the primary server: %s",
     952             :                        app_name.data, err));
     953             : 
     954          18 :     pfree(app_name.data);
     955             : 
     956          18 :     SyncReplicationSlots(wrconn);
     957             : 
     958          16 :     walrcv_disconnect(wrconn);
     959             : 
     960          16 :     PG_RETURN_VOID();
     961             : }

Generated by: LCOV version 1.16