LCOV - code coverage report
Current view: top level - src/backend/replication - slotfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 305 322 94.7 %
Date: 2025-12-02 21:18:45 Functions: 15 16 93.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * slotfuncs.c
       4             :  *     Support functions for replication slots
       5             :  *
       6             :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/slotfuncs.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/htup_details.h"
      16             : #include "access/xlog_internal.h"
      17             : #include "access/xlogrecovery.h"
      18             : #include "access/xlogutils.h"
      19             : #include "funcapi.h"
      20             : #include "replication/logical.h"
      21             : #include "replication/slot.h"
      22             : #include "replication/slotsync.h"
      23             : #include "utils/builtins.h"
      24             : #include "utils/guc.h"
      25             : #include "utils/pg_lsn.h"
      26             : 
      27             : /*
      28             :  * Map SlotSyncSkipReason enum values to human-readable names (indexed by value).
      29             :  */
      30             : static const char *SlotSyncSkipReasonNames[] = {
      31             :     [SS_SKIP_NONE] = "none",
      32             :     [SS_SKIP_WAL_NOT_FLUSHED] = "wal_not_flushed",
      33             :     [SS_SKIP_WAL_OR_ROWS_REMOVED] = "wal_or_rows_removed",
      34             :     [SS_SKIP_NO_CONSISTENT_SNAPSHOT] = "no_consistent_snapshot",
      35             :     [SS_SKIP_INVALID] = "slot_invalidated"
      36             : };
      37             : 
      38             : /*
      39             :  * Helper function for creating a new physical replication slot with
      40             :  * the given arguments. Note that this function doesn't release the created
      41             :  * slot; callers keep using it through MyReplicationSlot.
      42             :  *
      43             :  * WAL is reserved (or restart_lsn assigned) only if immediately_reserve is set.
      44             :  * A valid restart_lsn is used as-is, so the caller must guarantee WAL exists.
      45             :  */
      46             : static void
      47          86 : create_physical_replication_slot(char *name, bool immediately_reserve,
      48             :                                  bool temporary, XLogRecPtr restart_lsn)
      49             : {
      50             :     Assert(!MyReplicationSlot);
      51             : 
      52             :     /* acquire replication slot, this will check for conflicting names */
      53          86 :     ReplicationSlotCreate(name, false,
      54             :                           temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
      55             :                           false, false);
      56             : 
      57          86 :     if (immediately_reserve)
      58             :     {
      59             :         /* Reserve WAL as the user asked for it, unless an LSN was supplied */
      60          44 :         if (!XLogRecPtrIsValid(restart_lsn))
      61          34 :             ReplicationSlotReserveWal();
      62             :         else
      63          10 :             MyReplicationSlot->data.restart_lsn = restart_lsn;
      64             : 
      65             :         /* Write this slot to disk */
      66          44 :         ReplicationSlotMarkDirty();
      67          44 :         ReplicationSlotSave();
      68             :     }
      69          86 : }
      70             : 
      71             : /*
      72             :  * SQL function for creating a new physical (streaming replication)
      73             :  * replication slot.  Returns a row of (slot name, restart_lsn or NULL).
      74             :  */
      75             : Datum
      76          76 : pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
      77             : {
      78          76 :     Name        name = PG_GETARG_NAME(0);
      79          76 :     bool        immediately_reserve = PG_GETARG_BOOL(1);
      80          76 :     bool        temporary = PG_GETARG_BOOL(2);
      81             :     Datum       values[2];
      82             :     bool        nulls[2];
      83             :     TupleDesc   tupdesc;
      84             :     HeapTuple   tuple;
      85             :     Datum       result;
      86             : 
      87          76 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
      88           0 :         elog(ERROR, "return type must be a row type");
      89             : 
      90          76 :     CheckSlotPermissions();
      91             : 
      92          76 :     CheckSlotRequirements();
      93             : 
      94          76 :     create_physical_replication_slot(NameStr(*name),
      95             :                                      immediately_reserve,
      96             :                                      temporary,
      97             :                                      InvalidXLogRecPtr);    /* restart_lsn: none given */
      98             : 
      99          76 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     100          76 :     nulls[0] = false;
     101             : 
     102          76 :     if (immediately_reserve)
     103             :     {
     104          34 :         values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
     105          34 :         nulls[1] = false;
     106             :     }
     107             :     else
     108          42 :         nulls[1] = true;    /* no WAL was reserved, so there is no LSN to report */
     109             : 
     110          76 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     111          76 :     result = HeapTupleGetDatum(tuple);
     112             : 
     113          76 :     ReplicationSlotRelease();   /* result was built above, before releasing */
     114             : 
     115          76 :     PG_RETURN_DATUM(result);
     116             : }
     117             : 
     118             : 
     119             : /*
     120             :  * Helper function for creating a new logical replication slot with
     121             :  * the given arguments. Note that this function doesn't release the created
     122             :  * slot.
     123             :  *
     124             :  * When find_startpoint is false, the slot's confirmed_flush is not set; it is
     125             :  * the caller's responsibility to ensure it's set to something sensible.
     126             :  */
     127             : static void
     128         264 : create_logical_replication_slot(char *name, char *plugin,
     129             :                                 bool temporary, bool two_phase,
     130             :                                 bool failover,
     131             :                                 XLogRecPtr restart_lsn,
     132             :                                 bool find_startpoint)
     133             : {
     134         264 :     LogicalDecodingContext *ctx = NULL;
     135             : 
     136             :     Assert(!MyReplicationSlot);
     137             : 
     138             :     /*
     139             :      * Acquire a logical decoding slot, this will check for conflicting names.
     140             :      * Initially create persistent slot as ephemeral - that allows us to
     141             :      * nicely handle errors during initialization because it'll get dropped if
     142             :      * this transaction fails. This function never calls ReplicationSlotPersist();
     143             :      * that is left to the caller (see pg_create_logical_replication_slot). Temporary
     144             :      * slots can be created as temporary from the beginning as they get dropped on error as well.
     145             :      */
     146         264 :     ReplicationSlotCreate(name, true,
     147             :                           temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
     148             :                           failover, false);
     149             : 
     150             :     /*
     151             :      * Create logical decoding context to find start point or, if we don't
     152             :      * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
     153             :      *
     154             :      * Note: when !find_startpoint this is still important, because it's at
     155             :      * this point that the output plugin is validated.
     156             :      */
     157         254 :     ctx = CreateInitDecodingContext(plugin, NIL,
     158             :                                     false,  /* just catalogs is OK */
     159             :                                     restart_lsn,
     160         254 :                                     XL_ROUTINE(.page_read = read_local_xlog_page,
     161             :                                                .segment_open = wal_segment_open,
     162             :                                                .segment_close = wal_segment_close),
     163             :                                     NULL, NULL, NULL);
     164             : 
     165             :     /*
     166             :      * If caller needs us to determine the decoding start point, do so now.
     167             :      * This might take a while.
     168             :      */
     169         248 :     if (find_startpoint)
     170         234 :         DecodingContextFindStartpoint(ctx);
     171             : 
     172             :     /* don't need the decoding context anymore */
     173         244 :     FreeDecodingContext(ctx);
     174         244 : }
     175             : 
     176             : /*
     177             :  * SQL function for creating a new logical replication slot; returns (name, confirmed_flush).
     178             :  */
     179             : Datum
     180         250 : pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
     181             : {
     182         250 :     Name        name = PG_GETARG_NAME(0);
     183         250 :     Name        plugin = PG_GETARG_NAME(1);
     184         250 :     bool        temporary = PG_GETARG_BOOL(2);
     185         250 :     bool        two_phase = PG_GETARG_BOOL(3);
     186         250 :     bool        failover = PG_GETARG_BOOL(4);
     187             :     Datum       result;
     188             :     TupleDesc   tupdesc;
     189             :     HeapTuple   tuple;
     190             :     Datum       values[2];
     191             :     bool        nulls[2];
     192             : 
     193         250 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     194           0 :         elog(ERROR, "return type must be a row type");
     195             : 
     196         250 :     CheckSlotPermissions();
     197             : 
     198         248 :     CheckLogicalDecodingRequirements();
     199             : 
     200         248 :     create_logical_replication_slot(NameStr(*name),
     201         248 :                                     NameStr(*plugin),
     202             :                                     temporary,
     203             :                                     two_phase,
     204             :                                     failover,
     205             :                                     InvalidXLogRecPtr,  /* restart_lsn */
     206             :                                     true);  /* find_startpoint */
     207             : 
     208         230 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     209         230 :     values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
     210             : 
     211         230 :     memset(nulls, 0, sizeof(nulls));    /* both columns are always non-NULL */
     212             : 
     213         230 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     214         230 :     result = HeapTupleGetDatum(tuple);
     215             : 
     216             :     /* ok, slot is now fully created, mark it as persistent if needed */
     217         230 :     if (!temporary)
     218         220 :         ReplicationSlotPersist();
     219         230 :     ReplicationSlotRelease();
     220             : 
     221         230 :     PG_RETURN_DATUM(result);
     222             : }
     223             : 
     224             : 
     225             : /*
     226             :  * SQL function for dropping a replication slot, identified by name; returns void.
     227             :  */
     228             : Datum
     229         280 : pg_drop_replication_slot(PG_FUNCTION_ARGS)
     230             : {
     231         280 :     Name        name = PG_GETARG_NAME(0);
     232             : 
     233         280 :     CheckSlotPermissions();
     234             : 
     235         276 :     CheckSlotRequirements();
     236             : 
     237         276 :     ReplicationSlotDrop(NameStr(*name), true);
     238             : 
     239         264 :     PG_RETURN_VOID();
     240             : }
     241             : 
     242             : /*
     243             :  * pg_get_replication_slots - SQL SRF showing all replication slots
     244             :  * that currently exist on the database cluster, one row per in-use slot.
     245             :  */
     246             : Datum
     247         672 : pg_get_replication_slots(PG_FUNCTION_ARGS)
     248             : {
     249             : #define PG_GET_REPLICATION_SLOTS_COLS 21    /* columns in each result row */
     250         672 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     251             :     XLogRecPtr  currlsn;
     252             :     int         slotno;
     253             : 
     254             :     /*
     255             :      * We don't require any special permission to see this function's data
     256             :      * because nothing should be sensitive. The most critical being the slot
     257             :      * name, which shouldn't contain anything particularly sensitive.
     258             :      */
     259             : 
     260         672 :     InitMaterializedSRF(fcinfo, 0);
     261             : 
     262         672 :     currlsn = GetXLogWriteRecPtr();     /* reference point for safe_wal_size */
     263             : 
     264         672 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     265        5742 :     for (slotno = 0; slotno < max_replication_slots; slotno++)
     266             :     {
     267        5070 :         ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
     268             :         ReplicationSlot slot_contents;
     269             :         Datum       values[PG_GET_REPLICATION_SLOTS_COLS];
     270             :         bool        nulls[PG_GET_REPLICATION_SLOTS_COLS];
     271             :         WALAvailability walstate;
     272             :         int         i;
     273             :         ReplicationSlotInvalidationCause cause;
     274             : 
     275        5070 :         if (!slot->in_use)
     276        3960 :             continue;           /* unused array entry, emits no row */
     277             : 
     278             :         /* Copy slot contents while holding spinlock, then examine at leisure */
     279        1110 :         SpinLockAcquire(&slot->mutex);
     280        1110 :         slot_contents = *slot;
     281        1110 :         SpinLockRelease(&slot->mutex);
     282             : 
     283        1110 :         memset(values, 0, sizeof(values));
     284        1110 :         memset(nulls, 0, sizeof(nulls));
     285             : 
     286        1110 :         i = 0;                  /* next output column to fill */
     287        1110 :         values[i++] = NameGetDatum(&slot_contents.data.name);
     288             : 
     289        1110 :         if (slot_contents.data.database == InvalidOid)
     290         334 :             nulls[i++] = true;  /* no plugin when there is no database */
     291             :         else
     292         776 :             values[i++] = NameGetDatum(&slot_contents.data.plugin);
     293             : 
     294        1110 :         if (slot_contents.data.database == InvalidOid)
     295         334 :             values[i++] = CStringGetTextDatum("physical");
     296             :         else
     297         776 :             values[i++] = CStringGetTextDatum("logical");
     298             : 
     299        1110 :         if (slot_contents.data.database == InvalidOid)
     300         334 :             nulls[i++] = true;
     301             :         else
     302         776 :             values[i++] = ObjectIdGetDatum(slot_contents.data.database);
     303             : 
     304        1110 :         values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
     305        1110 :         values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
     306             : 
     307        1110 :         if (slot_contents.active_pid != 0)
     308         386 :             values[i++] = Int32GetDatum(slot_contents.active_pid);
     309             :         else
     310         724 :             nulls[i++] = true;
     311             : 
     312        1110 :         if (slot_contents.data.xmin != InvalidTransactionId)
     313         164 :             values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
     314             :         else
     315         946 :             nulls[i++] = true;
     316             : 
     317        1110 :         if (slot_contents.data.catalog_xmin != InvalidTransactionId)
     318         852 :             values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
     319             :         else
     320         258 :             nulls[i++] = true;
     321             : 
     322        1110 :         if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
     323        1036 :             values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
     324             :         else
     325          74 :             nulls[i++] = true;
     326             : 
     327        1110 :         if (XLogRecPtrIsValid(slot_contents.data.confirmed_flush))
     328         726 :             values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
     329             :         else
     330         384 :             nulls[i++] = true;
     331             : 
     332             :         /*
     333             :          * If the slot has not been invalidated, test availability from
     334             :          * restart_lsn; an invalidated slot starts out as WALAVAIL_REMOVED.
     335             :          */
     336        1110 :         if (slot_contents.data.invalidated != RS_INVAL_NONE)
     337          66 :             walstate = WALAVAIL_REMOVED;
     338             :         else
     339        1044 :             walstate = GetWALAvailability(slot_contents.data.restart_lsn);
     340             : 
     341        1110 :         switch (walstate)
     342             :         {
     343          68 :             case WALAVAIL_INVALID_LSN:
     344          68 :                 nulls[i++] = true;
     345          68 :                 break;
     346             : 
     347         970 :             case WALAVAIL_RESERVED:
     348         970 :                 values[i++] = CStringGetTextDatum("reserved");
     349         970 :                 break;
     350             : 
     351           4 :             case WALAVAIL_EXTENDED:
     352           4 :                 values[i++] = CStringGetTextDatum("extended");
     353           4 :                 break;
     354             : 
     355           2 :             case WALAVAIL_UNRESERVED:
     356           2 :                 values[i++] = CStringGetTextDatum("unreserved");
     357           2 :                 break;
     358             : 
     359          66 :             case WALAVAIL_REMOVED:
     360             : 
     361             :                 /*
     362             :                  * If we read the restart_lsn long enough ago, maybe that file
     363             :                  * has been removed by now.  However, the walsender could have
     364             :                  * moved forward enough that it jumped to another file after
     365             :                  * we looked.  If the checkpointer signalled the process to
     366             :                  * terminate, then it's definitely lost; but if a process is
     367             :                  * still alive, then "unreserved" seems more appropriate.
     368             :                  *
     369             :                  * If we do change it, save the state for safe_wal_size below.
     370             :                  */
     371          66 :                 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
     372             :                 {
     373             :                     int         pid;
     374             : 
     375          60 :                     SpinLockAcquire(&slot->mutex);
     376          60 :                     pid = slot->active_pid;
     377          60 :                     slot_contents.data.restart_lsn = slot->data.restart_lsn;
     378          60 :                     SpinLockRelease(&slot->mutex);
     379          60 :                     if (pid != 0)
     380             :                     {
     381           0 :                         values[i++] = CStringGetTextDatum("unreserved");
     382           0 :                         walstate = WALAVAIL_UNRESERVED;
     383           0 :                         break;
     384             :                     }
     385             :                 }
     386          66 :                 values[i++] = CStringGetTextDatum("lost");
     387          66 :                 break;
     388             :         }
     389             : 
     390             :         /*
     391             :          * safe_wal_size is only computed for slots that have not been lost,
     392             :          * and only if there's a configured maximum size.
     393             :          */
     394        1110 :         if (walstate == WALAVAIL_REMOVED || max_slot_wal_keep_size_mb < 0)
     395        1100 :             nulls[i++] = true;
     396             :         else
     397             :         {
     398             :             XLogSegNo   targetSeg;
     399             :             uint64      slotKeepSegs;
     400             :             uint64      keepSegs;
     401             :             XLogSegNo   failSeg;
     402             :             XLogRecPtr  failLSN;
     403             : 
     404          10 :             XLByteToSeg(slot_contents.data.restart_lsn, targetSeg, wal_segment_size);
     405             : 
     406             :             /* determine how many segments can be kept by slots */
     407          10 :             slotKeepSegs = XLogMBVarToSegs(max_slot_wal_keep_size_mb, wal_segment_size);
     408             :             /* ditto for wal_keep_size */
     409          10 :             keepSegs = XLogMBVarToSegs(wal_keep_size_mb, wal_segment_size);
     410             : 
     411             :             /* if currlsn reaches failLSN, we lose our segment */
     412          10 :             failSeg = targetSeg + Max(slotKeepSegs, keepSegs) + 1;
     413          10 :             XLogSegNoOffsetToRecPtr(failSeg, 0, wal_segment_size, failLSN);
     414             : 
     415          10 :             values[i++] = Int64GetDatum(failLSN - currlsn);
     416             :         }
     417             : 
     418        1110 :         values[i++] = BoolGetDatum(slot_contents.data.two_phase);
     419             : 
     420        1110 :         if (slot_contents.data.two_phase &&
     421          28 :             XLogRecPtrIsValid(slot_contents.data.two_phase_at))
     422          28 :             values[i++] = LSNGetDatum(slot_contents.data.two_phase_at);
     423             :         else
     424        1082 :             nulls[i++] = true;
     425             : 
     426        1110 :         if (slot_contents.inactive_since > 0)
     427         742 :             values[i++] = TimestampTzGetDatum(slot_contents.inactive_since);
     428             :         else
     429         368 :             nulls[i++] = true;
     430             : 
     431        1110 :         cause = slot_contents.data.invalidated;
     432             : 
     433        1110 :         if (SlotIsPhysical(&slot_contents))
     434         334 :             nulls[i++] = true;
     435             :         else
     436             :         {
     437             :             /*
     438             :              * rows_removed and wal_level_insufficient are the only two
     439             :              * reasons for the logical slot's conflict with recovery.
     440             :              */
     441         776 :             if (cause == RS_INVAL_HORIZON ||
     442             :                 cause == RS_INVAL_WAL_LEVEL)
     443          56 :                 values[i++] = BoolGetDatum(true);
     444             :             else
     445         720 :                 values[i++] = BoolGetDatum(false);
     446             :         }
     447             : 
     448        1110 :         if (cause == RS_INVAL_NONE)
     449        1044 :             nulls[i++] = true;
     450             :         else
     451          66 :             values[i++] = CStringGetTextDatum(GetSlotInvalidationCauseName(cause));
     452             : 
     453        1110 :         values[i++] = BoolGetDatum(slot_contents.data.failover);
     454             : 
     455        1110 :         values[i++] = BoolGetDatum(slot_contents.data.synced);
     456             : 
     457        1110 :         if (slot_contents.slotsync_skip_reason == SS_SKIP_NONE)
     458        1106 :             nulls[i++] = true;
     459             :         else
     460           4 :             values[i++] = CStringGetTextDatum(SlotSyncSkipReasonNames[slot_contents.slotsync_skip_reason]);
     461             : 
     462             :         Assert(i == PG_GET_REPLICATION_SLOTS_COLS);     /* every column was filled */
     463             : 
     464        1110 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
     465             :                              values, nulls);
     466             :     }
     467             : 
     468         672 :     LWLockRelease(ReplicationSlotControlLock);
     469             : 
     470         672 :     return (Datum) 0;
     471             : }
     472             : 
     473             : /*
     474             :  * Helper function for advancing our physical replication slot forward.
     475             :  * Returns moveto if the slot was advanced, else its unchanged restart_lsn.
     476             :  * The LSN position to move to is compared simply to the slot's restart_lsn,
     477             :  * knowing that any position older than that would be removed by successive
     478             :  * checkpoints.
     479             :  */
     480             : static XLogRecPtr
     481          14 : pg_physical_replication_slot_advance(XLogRecPtr moveto)
     482             : {
     483          14 :     XLogRecPtr  startlsn = MyReplicationSlot->data.restart_lsn;
     484          14 :     XLogRecPtr  retlsn = startlsn;
     485             : 
     486             :     Assert(XLogRecPtrIsValid(moveto));
     487             : 
     488          14 :     if (startlsn < moveto)
     489             :     {
     490          14 :         SpinLockAcquire(&MyReplicationSlot->mutex);
     491          14 :         MyReplicationSlot->data.restart_lsn = moveto;
     492          14 :         SpinLockRelease(&MyReplicationSlot->mutex);
     493          14 :         retlsn = moveto;
     494             : 
     495             :         /*
     496             :          * Dirty the slot so that it is written out at the next checkpoint. Note
     497             :          * that the LSN position advanced may still be lost in the event of a
     498             :          * crash, but this makes the data consistent after a clean shutdown.
     499             :          */
     500          14 :         ReplicationSlotMarkDirty();
     501             : 
     502             :         /*
     503             :          * Wake up logical walsenders holding logical failover slots after
     504             :          * updating the restart_lsn of the physical slot.
     505             :          */
     506          14 :         PhysicalWakeupLogicalWalSnd();
     507             :     }
     508             : 
     509          14 :     return retlsn;
     510             : }
     511             : 
     512             : /*
     513             :  * Advance our logical replication slot forward; thin wrapper that returns the
     514             :  * result of LogicalSlotAdvanceAndCheckSnapState (see there for details).
     515             :  */
     516             : static XLogRecPtr
     517          16 : pg_logical_replication_slot_advance(XLogRecPtr moveto)
     518             : {
     519          16 :     return LogicalSlotAdvanceAndCheckSnapState(moveto, NULL);
     520             : }
     521             : 
     522             : /*
     523             :  * SQL function for moving the position in a replication slot.
     524             :  */
     525             : Datum
     526          34 : pg_replication_slot_advance(PG_FUNCTION_ARGS)
     527             : {
     528          34 :     Name        slotname = PG_GETARG_NAME(0);
     529          34 :     XLogRecPtr  moveto = PG_GETARG_LSN(1);
     530             :     XLogRecPtr  endlsn;
     531             :     XLogRecPtr  minlsn;
     532             :     TupleDesc   tupdesc;
     533             :     Datum       values[2];
     534             :     bool        nulls[2];
     535             :     HeapTuple   tuple;
     536             :     Datum       result;
     537             : 
     538             :     Assert(!MyReplicationSlot);
     539             : 
     540          34 :     CheckSlotPermissions();
     541             : 
     542          34 :     if (!XLogRecPtrIsValid(moveto))
     543           2 :         ereport(ERROR,
     544             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     545             :                  errmsg("invalid target WAL LSN")));
     546             : 
     547             :     /* Build a tuple descriptor for our result type */
     548          32 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     549           0 :         elog(ERROR, "return type must be a row type");
     550             : 
     551             :     /*
     552             :      * We can't move slot past what's been flushed/replayed so clamp the
     553             :      * target position accordingly.
     554             :      */
     555          32 :     if (!RecoveryInProgress())
     556          32 :         moveto = Min(moveto, GetFlushRecPtr(NULL));
     557             :     else
     558           0 :         moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
     559             : 
     560             :     /* Acquire the slot so we "own" it */
     561          32 :     ReplicationSlotAcquire(NameStr(*slotname), true, true);
     562             : 
     563             :     /* A slot whose restart_lsn has never been reserved cannot be advanced */
     564          32 :     if (!XLogRecPtrIsValid(MyReplicationSlot->data.restart_lsn))
     565           2 :         ereport(ERROR,
     566             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     567             :                  errmsg("replication slot \"%s\" cannot be advanced",
     568             :                         NameStr(*slotname)),
     569             :                  errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
     570             : 
     571             :     /*
     572             :      * Check if the slot is not moving backwards.  Physical slots rely simply
     573             :      * on restart_lsn as a minimum point, while logical slots have confirmed
     574             :      * consumption up to confirmed_flush, meaning that in both cases data
     575             :      * older than that is not available anymore.
     576             :      */
     577          30 :     if (OidIsValid(MyReplicationSlot->data.database))
     578          16 :         minlsn = MyReplicationSlot->data.confirmed_flush;
     579             :     else
     580          14 :         minlsn = MyReplicationSlot->data.restart_lsn;
     581             : 
     582          30 :     if (moveto < minlsn)
     583           0 :         ereport(ERROR,
     584             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     585             :                  errmsg("cannot advance replication slot to %X/%08X, minimum is %X/%08X",
     586             :                         LSN_FORMAT_ARGS(moveto), LSN_FORMAT_ARGS(minlsn))));
     587             : 
     588             :     /* Do the actual slot update, depending on the slot type */
     589          30 :     if (OidIsValid(MyReplicationSlot->data.database))
     590          16 :         endlsn = pg_logical_replication_slot_advance(moveto);
     591             :     else
     592          14 :         endlsn = pg_physical_replication_slot_advance(moveto);
     593             : 
     594          30 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     595          30 :     nulls[0] = false;
     596             : 
     597             :     /*
     598             :      * Recompute the minimum LSN and xmin across all slots to adjust with the
     599             :      * advancing potentially done.
     600             :      */
     601          30 :     ReplicationSlotsComputeRequiredXmin(false);
     602          30 :     ReplicationSlotsComputeRequiredLSN();
     603             : 
     604          30 :     ReplicationSlotRelease();
     605             : 
     606             :     /* Return the reached position. */
     607          30 :     values[1] = LSNGetDatum(endlsn);
     608          30 :     nulls[1] = false;
     609             : 
     610          30 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     611          30 :     result = HeapTupleGetDatum(tuple);
     612             : 
     613          30 :     PG_RETURN_DATUM(result);
     614             : }
     615             : 
     616             : /* Helper for copying a replication slot, shared by the SQL-callable pg_copy_*_replication_slot_* wrappers.
     617             :  * Arguments: 0 = source slot name, 1 = new slot name, optional 2 = temporary flag, optional 3 = output plugin; "logical_slot" is the slot kind the caller expects.
     618             :  * Returns a (new slot name, confirmed_flush or NULL) row; raises an error if the source is missing, of the other kind, has no reserved WAL, or is invalidated. */
     619             : static Datum
     620          34 : copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
     621             : {
     622          34 :     Name        src_name = PG_GETARG_NAME(0);
     623          34 :     Name        dst_name = PG_GETARG_NAME(1);
     624          34 :     ReplicationSlot *src = NULL;
     625             :     ReplicationSlot first_slot_contents;
     626             :     ReplicationSlot second_slot_contents;
     627             :     XLogRecPtr  src_restart_lsn;
     628             :     bool        src_islogical;
     629             :     bool        temporary;
     630             :     char       *plugin;
     631             :     Datum       values[2];
     632             :     bool        nulls[2];
     633             :     Datum       result;
     634             :     TupleDesc   tupdesc;
     635             :     HeapTuple   tuple;
     636             : 
     637          34 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     638           0 :         elog(ERROR, "return type must be a row type");
     639             : 
     640          34 :     CheckSlotPermissions();
     641             : 
     642          34 :     if (logical_slot)
     643          20 :         CheckLogicalDecodingRequirements();
     644             :     else
     645          14 :         CheckSlotRequirements();
     646             : 
     647          34 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     648             : 
     649             :     /*
     650             :      * We need to prevent the source slot's reserved WAL from being removed,
     651             :      * but we don't want to lock that slot for very long, and it can advance
     652             :      * in the meantime.  So obtain the source slot's data, and create a new
     653             :      * slot using its restart_lsn.  Afterwards we lock the source slot again
     654             :      * and verify that the data we copied (name, type) has not changed
     655             :      * incompatibly.  No inconvenient WAL removal can occur once the new slot
     656             :      * is created -- but since WAL removal could have occurred before we
     657             :      * managed to create the new slot, we advance the new slot's restart_lsn
     658             :      * to the source slot's updated restart_lsn the second time we lock it.
     659             :      */
     660          40 :     for (int i = 0; i < max_replication_slots; i++)
     661             :     {
     662          40 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     663             : 
     664          40 :         if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
     665             :         {
     666             :             /* Copy the slot contents while holding spinlock */
     667          34 :             SpinLockAcquire(&s->mutex);
     668          34 :             first_slot_contents = *s;
     669          34 :             SpinLockRelease(&s->mutex);
     670          34 :             src = s;
     671          34 :             break;
     672             :         }
     673             :     }
     674             : 
     675          34 :     LWLockRelease(ReplicationSlotControlLock);
     676             : 
     677          34 :     if (src == NULL)
     678           0 :         ereport(ERROR,
     679             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     680             :                  errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
     681             : 
     682          34 :     src_islogical = SlotIsLogical(&first_slot_contents);
     683          34 :     src_restart_lsn = first_slot_contents.data.restart_lsn;
     684          34 :     temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);     /* default: inherit from source */
     685          34 :     plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;    /* default: source's plugin */
     686             : 
     687             :     /* Check type of replication slot; the message names the source slot's actual kind and the kind that was requested */
     688          34 :     if (src_islogical != logical_slot)
     689           4 :         ereport(ERROR,
     690             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     691             :                  src_islogical ?
     692             :                  errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
     693             :                         NameStr(*src_name)) :
     694             :                  errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
     695             :                         NameStr(*src_name))));
     696             : 
     697             :     /* Copying non-reserved slot doesn't make sense */
     698          30 :     if (!XLogRecPtrIsValid(src_restart_lsn))
     699           2 :         ereport(ERROR,
     700             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     701             :                  errmsg("cannot copy a replication slot that doesn't reserve WAL")));
     702             : 
     703             :     /* Cannot copy an invalidated replication slot */
     704          28 :     if (first_slot_contents.data.invalidated != RS_INVAL_NONE)
     705           2 :         ereport(ERROR,
     706             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     707             :                 errmsg("cannot copy invalidated replication slot \"%s\"",
     708             :                        NameStr(*src_name)));
     709             : 
     710             :     /* Overwrite params from optional arguments */
     711          26 :     if (PG_NARGS() >= 3)
     712          12 :         temporary = PG_GETARG_BOOL(2);
     713          26 :     if (PG_NARGS() >= 4)
     714             :     {
     715             :         Assert(logical_slot);
     716           8 :         plugin = NameStr(*(PG_GETARG_NAME(3)));
     717             :     }
     718             : 
     719             :     /* Create new slot and acquire it */
     720          26 :     if (logical_slot)
     721             :     {
     722             :         /*
     723             :          * We must not try to read WAL, since we haven't reserved it yet --
     724             :          * hence pass find_startpoint false.  confirmed_flush will be set
     725             :          * below, by copying from the source slot.
     726             :          *
     727             :          * We don't copy the failover option to prevent potential issues with
     728             :          * slot synchronization. For instance, if a slot was synchronized to
     729             :          * the standby, then dropped on the primary, and immediately recreated
     730             :          * by copying from another existing slot with much earlier restart_lsn
     731             :          * and confirmed_flush_lsn, the slot synchronization would only
     732             :          * observe the LSN of the same slot moving backward. As slot
     733             :          * synchronization does not copy the restart_lsn and
     734             :          * confirmed_flush_lsn backward (see update_local_synced_slot() for
     735             :          * details), if a failover happens before the primary's slot catches
     736             :          * up, logical replication cannot continue using the synchronized slot
     737             :          * on the promoted standby because the slot retains the restart_lsn
     738             :          * and confirmed_flush_lsn that are much later than expected.
     739             :          */
     740          16 :         create_logical_replication_slot(NameStr(*dst_name),
     741             :                                         plugin,
     742             :                                         temporary,
     743             :                                         false,
     744             :                                         false,
     745             :                                         src_restart_lsn,
     746             :                                         false);
     747             :     }
     748             :     else
     749          10 :         create_physical_replication_slot(NameStr(*dst_name),
     750             :                                          true,
     751             :                                          temporary,
     752             :                                          src_restart_lsn);
     753             : 
     754             :     /*
     755             :      * Update the destination slot to current values of the source slot;
     756             :      * recheck that the source slot is still the one we saw previously.
     757             :      */
     758             :     {
     759             :         TransactionId copy_effective_xmin;
     760             :         TransactionId copy_effective_catalog_xmin;
     761             :         TransactionId copy_xmin;
     762             :         TransactionId copy_catalog_xmin;
     763             :         XLogRecPtr  copy_restart_lsn;
     764             :         XLogRecPtr  copy_confirmed_flush;
     765             :         bool        copy_islogical;
     766             :         char       *copy_name;
     767             : 
     768             :         /* Copy data of source slot again */
     769          24 :         SpinLockAcquire(&src->mutex);
     770          24 :         second_slot_contents = *src;
     771          24 :         SpinLockRelease(&src->mutex);
     772             : 
     773          24 :         copy_effective_xmin = second_slot_contents.effective_xmin;
     774          24 :         copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
     775             : 
     776          24 :         copy_xmin = second_slot_contents.data.xmin;
     777          24 :         copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
     778          24 :         copy_restart_lsn = second_slot_contents.data.restart_lsn;
     779          24 :         copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
     780             : 
     781             :         /* for existence check */
     782          24 :         copy_name = NameStr(second_slot_contents.data.name);
     783          24 :         copy_islogical = SlotIsLogical(&second_slot_contents);
     784             : 
     785             :         /*
     786             :          * Check if the source slot still exists and is valid. We regard it as
     787             :          * invalid if the type of replication slot or name has been changed,
     788             :          * or the restart_lsn either is invalid or has gone backward. (The
     789             :          * restart_lsn could go backwards if the source slot is dropped and
     790             :          * copied from an older slot during installation.)
     791             :          *
     792             :          * Since erroring out will release and drop the destination slot we
     793             :          * don't need to release it here.
     794             :          */
     795          24 :         if (copy_restart_lsn < src_restart_lsn ||
     796          24 :             src_islogical != copy_islogical ||
     797          24 :             strcmp(copy_name, NameStr(*src_name)) != 0)
     798           0 :             ereport(ERROR,
     799             :                     (errmsg("could not copy replication slot \"%s\"",
     800             :                             NameStr(*src_name)),
     801             :                      errdetail("The source replication slot was modified incompatibly during the copy operation.")));
     802             : 
     803             :         /* The source slot must have a consistent snapshot */
     804          24 :         if (src_islogical && !XLogRecPtrIsValid(copy_confirmed_flush))
     805           0 :             ereport(ERROR,
     806             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     807             :                      errmsg("cannot copy unfinished logical replication slot \"%s\"",
     808             :                             NameStr(*src_name)),
     809             :                      errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
     810             : 
     811             :         /*
     812             :          * Copying an invalid slot doesn't make sense. Note that the source
     813             :          * slot can become invalid after we create the new slot and copy the
     814             :          * data of source slot. This is possible because the operations in
     815             :          * InvalidateObsoleteReplicationSlots() are not serialized with this
     816             :          * function. Even though we can't detect such a case here, the copied
     817             :          * slot will become invalid in the next checkpoint cycle.
     818             :          */
     819          24 :         if (second_slot_contents.data.invalidated != RS_INVAL_NONE)
     820           0 :             ereport(ERROR,
     821             :                     errmsg("cannot copy replication slot \"%s\"",
     822             :                            NameStr(*src_name)),
     823             :                     errdetail("The source replication slot was invalidated during the copy operation."));
     824             : 
     825             :         /* Install copied values again */
     826          24 :         SpinLockAcquire(&MyReplicationSlot->mutex);
     827          24 :         MyReplicationSlot->effective_xmin = copy_effective_xmin;
     828          24 :         MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
     829             : 
     830          24 :         MyReplicationSlot->data.xmin = copy_xmin;
     831          24 :         MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
     832          24 :         MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
     833          24 :         MyReplicationSlot->data.confirmed_flush = copy_confirmed_flush;
     834          24 :         SpinLockRelease(&MyReplicationSlot->mutex);
     835             : 
     836          24 :         ReplicationSlotMarkDirty();
     837          24 :         ReplicationSlotsComputeRequiredXmin(false);
     838          24 :         ReplicationSlotsComputeRequiredLSN();
     839          24 :         ReplicationSlotSave();
     840             : 
     841             : #ifdef USE_ASSERT_CHECKING
     842             :         /* Check that the restart_lsn is available */
     843             :         {
     844             :             XLogSegNo   segno;
     845             : 
     846             :             XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
     847             :             Assert(XLogGetLastRemovedSegno() < segno);
     848             :         }
     849             : #endif
     850             :     }
     851             : 
     852             :     /* target slot fully created, mark as persistent if needed */
     853          24 :     if (logical_slot && !temporary)
     854           8 :         ReplicationSlotPersist();
     855             : 
     856             :     /* All done.  Set up the return values: column 0 is the new slot's name, column 1 its confirmed_flush (NULL when that LSN is not valid) */
     857          24 :     values[0] = NameGetDatum(dst_name);
     858          24 :     nulls[0] = false;
     859          24 :     if (XLogRecPtrIsValid(MyReplicationSlot->data.confirmed_flush))
     860             :     {
     861          14 :         values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
     862          14 :         nulls[1] = false;
     863             :     }
     864             :     else
     865          10 :         nulls[1] = true;
     866             : 
     867          24 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     868          24 :     result = HeapTupleGetDatum(tuple);
     869             : 
     870          24 :     ReplicationSlotRelease();
     871             : 
     872          24 :     PG_RETURN_DATUM(result);
     873             : }
     874             : 
     875             : /* The wrappers below are all to appease opr_sanity; each one only forwards fcinfo to copy_replication_slot(), passing true for the logical variants and false for the physical ones. */
     876             : Datum
     877           8 : pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
     878             : {
     879           8 :     return copy_replication_slot(fcinfo, true);    /* logical copy */
     880             : }
     881             : 
     882             : Datum
     883           0 : pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
     884             : {
     885           0 :     return copy_replication_slot(fcinfo, true);    /* logical copy */
     886             : }
     887             : 
     888             : Datum
     889          12 : pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
     890             : {
     891          12 :     return copy_replication_slot(fcinfo, true);    /* logical copy */
     892             : }
     893             : 
     894             : Datum
     895           4 : pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
     896             : {
     897           4 :     return copy_replication_slot(fcinfo, false);   /* physical copy */
     898             : }
     899             : 
     900             : Datum
     901          10 : pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
     902             : {
     903          10 :     return copy_replication_slot(fcinfo, false);   /* physical copy */
     904             : }
     905             : 
     906             : /*
     907             :  * Synchronize failover enabled replication slots to a standby server
     908             :  * from the primary server.  Checks slot permissions, that recovery is in progress and the slot-sync parameters, then connects using PrimaryConnInfo, runs SyncReplicationSlots() on that connection, disconnects and returns void.
     909             :  */
     910             : Datum
     911          22 : pg_sync_replication_slots(PG_FUNCTION_ARGS)
     912             : {
     913             :     WalReceiverConn *wrconn;
     914             :     char       *err;
     915             :     StringInfoData app_name;
     916             : 
     917          22 :     CheckSlotPermissions();
     918             : 
     919          20 :     if (!RecoveryInProgress())
     920           2 :         ereport(ERROR,
     921             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     922             :                 errmsg("replication slots can only be synchronized to a standby server"));
     923             : 
     924          18 :     ValidateSlotSyncParams(ERROR);
     925             : 
     926             :     /* Load the libpq-specific functions */
     927          18 :     load_file("libpqwalreceiver", false);
     928             : 
     929          18 :     (void) CheckAndGetDbnameFromConninfo();    /* result deliberately discarded; NOTE(review): presumably called only to validate the connection string -- confirm */
     930             : 
     931          16 :     initStringInfo(&app_name);    /* application name is "<cluster_name>_slotsync", or plain "slotsync" when cluster_name is empty */
     932          16 :     if (cluster_name[0])
     933          16 :         appendStringInfo(&app_name, "%s_slotsync", cluster_name);
     934             :     else
     935           0 :         appendStringInfoString(&app_name, "slotsync");
     936             : 
     937             :     /* Connect to the primary server; on failure wrconn is NULL and the error text is in err. */
     938          16 :     wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
     939             :                             app_name.data, &err);
     940             : 
     941          16 :     if (!wrconn)
     942           0 :         ereport(ERROR,
     943             :                 errcode(ERRCODE_CONNECTION_FAILURE),
     944             :                 errmsg("synchronization worker \"%s\" could not connect to the primary server: %s",
     945             :                        app_name.data, err));
     946             : 
     947          16 :     pfree(app_name.data);    /* freed only here, because the error report above still reads it */
     948             : 
     949          16 :     SyncReplicationSlots(wrconn);
     950             : 
     951          14 :     walrcv_disconnect(wrconn);
     952             : 
     953          14 :     PG_RETURN_VOID();
     954             : }

Generated by: LCOV version 1.16