LCOV - code coverage report
Current view: top level - src/backend/replication - slotfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 254 286 88.8 %
Date: 2019-09-19 02:07:14 Functions: 14 16 87.5 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * slotfuncs.c
       4             :  *     Support functions for replication slots
       5             :  *
       6             :  * Copyright (c) 2012-2019, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/slotfuncs.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/htup_details.h"
      16             : #include "access/xlog_internal.h"
      17             : #include "funcapi.h"
      18             : #include "miscadmin.h"
      19             : #include "replication/decode.h"
      20             : #include "replication/slot.h"
      21             : #include "replication/logical.h"
      22             : #include "replication/logicalfuncs.h"
      23             : #include "utils/builtins.h"
      24             : #include "utils/inval.h"
      25             : #include "utils/pg_lsn.h"
      26             : #include "utils/resowner.h"
      27             : 
      28             : static void
      29         342 : check_permissions(void)  /* raise ERROR unless the current user is a superuser or has_rolreplication() accepts it */
      30             : {
      31         342 :     if (!superuser() && !has_rolreplication(GetUserId()))
      32           6 :         ereport(ERROR,
      33             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
      34             :                  (errmsg("must be superuser or replication role to use replication slots"))));
      35         336 : }
      36             : 
      37             : /*
      38             :  * Helper function creating a physical replication slot named "name" and
      39             :  * leaving it acquired: this function doesn't release the created slot.
      40             :  * "temporary" selects RS_TEMPORARY over RS_PERSISTENT.
      41             :  *
      42             :  * With immediately_reserve, a valid restart_lsn is stored as-is, skipping the
      43             :  * WAL reservation routine, so the caller must guarantee that WAL is available.
      44             :  */
      45             : static void
      46          26 : create_physical_replication_slot(char *name, bool immediately_reserve,
      47             :                                  bool temporary, XLogRecPtr restart_lsn)
      48             : {
      49             :     Assert(!MyReplicationSlot);
      50             : 
      51             :     /* acquire replication slot, this will check for conflicting names */
      52          26 :     ReplicationSlotCreate(name, false,
      53             :                           temporary ? RS_TEMPORARY : RS_PERSISTENT);
      54             : 
      55          26 :     if (immediately_reserve)  /* when false, restart_lsn is ignored and nothing is saved here */
      56             :     {
      57             :         /* Reserve WAL as the user asked for it */
      58          12 :         if (XLogRecPtrIsInvalid(restart_lsn))
      59           4 :             ReplicationSlotReserveWal();
      60             :         else
      61           8 :             MyReplicationSlot->data.restart_lsn = restart_lsn;  /* caller-supplied LSN, stored unchecked */
      62             : 
      63             :         /* Write this slot to disk */
      64          12 :         ReplicationSlotMarkDirty();
      65          12 :         ReplicationSlotSave();
      66             :     }
      67          26 : }
      68             : 
      69             : /*
      70             :  * SQL function creating a physical (streaming replication) slot.  Returns a
      71             :  * two-column row: slot name, and restart_lsn (NULL unless immediately_reserve).
      72             :  */
      73             : Datum
      74          18 : pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
      75             : {
      76          18 :     Name        name = PG_GETARG_NAME(0);
      77          18 :     bool        immediately_reserve = PG_GETARG_BOOL(1);
      78          18 :     bool        temporary = PG_GETARG_BOOL(2);
      79             :     Datum       values[2];
      80             :     bool        nulls[2];
      81             :     TupleDesc   tupdesc;
      82             :     HeapTuple   tuple;
      83             :     Datum       result;
      84             : 
      85          18 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
      86           0 :         elog(ERROR, "return type must be a row type");
      87             : 
      88          18 :     check_permissions();
      89             : 
      90          18 :     CheckSlotRequirements();
      91             : 
      92          18 :     create_physical_replication_slot(NameStr(*name),
      93             :                                      immediately_reserve,
      94             :                                      temporary,
      95             :                                      InvalidXLogRecPtr);  /* no caller-chosen LSN: any reservation goes through ReplicationSlotReserveWal() */
      96             : 
      97          18 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);  /* points at the slot's own name field */
      98          18 :     nulls[0] = false;
      99             : 
     100          18 :     if (immediately_reserve)
     101             :     {
     102           4 :         values[1] = LSNGetDatum(MyReplicationSlot->data.restart_lsn);
     103           4 :         nulls[1] = false;
     104             :     }
     105             :     else
     106          14 :         nulls[1] = true;  /* values[1] is left unset; only the null flag matters */
     107             : 
     108          18 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     109          18 :     result = HeapTupleGetDatum(tuple);
     110             : 
     111          18 :     ReplicationSlotRelease();  /* the helper left the slot acquired; the tuple is already formed, so the slot is not read after this */
     112             : 
     113          18 :     PG_RETURN_DATUM(result);
     114             : }
     115             : 
     116             : 
     117             : /*
     118             :  * Helper function creating a logical replication slot "name" for "plugin" and
     119             :  * running DecodingContextFindStartpoint() on a throwaway decoding context.  The
     120             :  * slot is not released, and a non-temporary slot is still RS_EPHEMERAL on return.
     121             :  */
     122             : static void
     123         156 : create_logical_replication_slot(char *name, char *plugin,
     124             :                                 bool temporary, XLogRecPtr restart_lsn)
     125             : {
     126         156 :     LogicalDecodingContext *ctx = NULL;
     127             : 
     128             :     Assert(!MyReplicationSlot);
     129             : 
     130             :     /*
     131             :      * Acquire a logical decoding slot, this will check for conflicting names.
     132             :      * Initially create persistent slot as ephemeral - that allows us to
     133             :      * nicely handle errors during initialization because it'll get dropped if
     134             :      * this transaction fails. We'll make it persistent at the end. Temporary
     135             :      * slots can be created as temporary from beginning as they get dropped on
     136             :      * error as well.
     137             :      */
     138         156 :     ReplicationSlotCreate(name, true,
     139             :                           temporary ? RS_TEMPORARY : RS_EPHEMERAL);
     140             : 
     141             :     /*
     142             :      * Create logical decoding context, to build the initial snapshot.
     143             :      */
     144         148 :     ctx = CreateInitDecodingContext(plugin, NIL,
     145             :                                     false,  /* do not build snapshot */
     146             :                                     restart_lsn,
     147             :                                     logical_read_local_xlog_page, NULL, NULL,
     148             :                                     NULL);
     149             : 
     150             :     /* build initial snapshot, might take a while */
     151         142 :     DecodingContextFindStartpoint(ctx);
     152             : 
     153             :     /* don't need the decoding context anymore */
     154         142 :     FreeDecodingContext(ctx);
     155         142 : }  /* making the slot persistent is left to the caller */
     156             : 
     157             : /*
     158             :  * SQL function creating a logical slot; returns (slot name, confirmed_flush).
     159             :  */
     160             : Datum
     161         144 : pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
     162             : {
     163         144 :     Name        name = PG_GETARG_NAME(0);
     164         144 :     Name        plugin = PG_GETARG_NAME(1);
     165         144 :     bool        temporary = PG_GETARG_BOOL(2);
     166             :     Datum       result;
     167             :     TupleDesc   tupdesc;
     168             :     HeapTuple   tuple;
     169             :     Datum       values[2];
     170             :     bool        nulls[2];
     171             : 
     172         144 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     173           0 :         elog(ERROR, "return type must be a row type");
     174             : 
     175         144 :     check_permissions();
     176             : 
     177         142 :     CheckLogicalDecodingRequirements();
     178             : 
     179         284 :     create_logical_replication_slot(NameStr(*name),
     180         142 :                                     NameStr(*plugin),
     181             :                                     temporary,
     182             :                                     InvalidXLogRecPtr);  /* no caller-chosen restart_lsn */
     183             : 
     184         130 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);
     185         130 :     values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
     186             : 
     187         130 :     memset(nulls, 0, sizeof(nulls));  /* neither column is ever returned as NULL */
     188             : 
     189         130 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     190         130 :     result = HeapTupleGetDatum(tuple);
     191             : 
     192             :     /* ok, slot is now fully created, mark it as persistent if needed */
     193         130 :     if (!temporary)
     194         120 :         ReplicationSlotPersist();  /* the helper created non-temporary slots as RS_EPHEMERAL */
     195         130 :     ReplicationSlotRelease();
     196             : 
     197         130 :     PG_RETURN_DATUM(result);
     198             : }
     199             : 
     200             : 
     201             : /*
     202             :  * SQL function for dropping the replication slot named by argument 0.
     203             :  */
     204             : Datum
     205         144 : pg_drop_replication_slot(PG_FUNCTION_ARGS)
     206             : {
     207         144 :     Name        name = PG_GETARG_NAME(0);
     208             : 
     209         144 :     check_permissions();
     210             : 
     211         140 :     CheckSlotRequirements();
     212             : 
     213         140 :     ReplicationSlotDrop(NameStr(*name), true);  /* NOTE(review): the bare "true" is presumably a nowait flag -- confirm against slot.h */
     214             : 
     215         130 :     PG_RETURN_VOID();
     216             : }
     217             : 
     218             : /*
     219             :  * pg_get_replication_slots - SQL SRF returning one row per in-use slot.
     220             :  */
     221             : Datum
     222          98 : pg_get_replication_slots(PG_FUNCTION_ARGS)
     223             : {
     224             : #define PG_GET_REPLICATION_SLOTS_COLS 11  /* must equal the number of values[i++]/nulls[i++] steps taken per row below */
     225          98 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     226             :     TupleDesc   tupdesc;
     227             :     Tuplestorestate *tupstore;
     228             :     MemoryContext per_query_ctx;
     229             :     MemoryContext oldcontext;
     230             :     int         slotno;
     231             : 
     232             :     /* check to see if caller supports us returning a tuplestore */
     233          98 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
     234           0 :         ereport(ERROR,
     235             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     236             :                  errmsg("set-valued function called in context that cannot accept a set")));
     237          98 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
     238           0 :         ereport(ERROR,
     239             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     240             :                  errmsg("materialize mode required, but it is not " \
     241             :                         "allowed in this context")));
     242             : 
     243             :     /* Build a tuple descriptor for our result type */
     244          98 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     245           0 :         elog(ERROR, "return type must be a row type");
     246             : 
     247             :     /*
     248             :      * We don't require any special permission to see this function's data
     249             :      * because nothing should be sensitive. The most critical being the slot
     250             :      * name, which shouldn't contain anything particularly sensitive.
     251             :      */
     252             : 
     253          98 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
     254          98 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
     255             : 
     256          98 :     tupstore = tuplestore_begin_heap(true, false, work_mem);  /* created while per_query_ctx is the current context */
     257          98 :     rsinfo->returnMode = SFRM_Materialize;
     258          98 :     rsinfo->setResult = tupstore;
     259          98 :     rsinfo->setDesc = tupdesc;
     260             : 
     261          98 :     MemoryContextSwitchTo(oldcontext);
     262             : 
     263          98 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);  /* held in shared mode across the whole array scan */
     264         548 :     for (slotno = 0; slotno < max_replication_slots; slotno++)
     265             :     {
     266         450 :         ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
     267             :         Datum       values[PG_GET_REPLICATION_SLOTS_COLS];
     268             :         bool        nulls[PG_GET_REPLICATION_SLOTS_COLS];
     269             : 
     270             :         ReplicationSlotPersistency persistency;
     271             :         TransactionId xmin;
     272             :         TransactionId catalog_xmin;
     273             :         XLogRecPtr  restart_lsn;
     274             :         XLogRecPtr  confirmed_flush_lsn;
     275             :         pid_t       active_pid;
     276             :         Oid         database;
     277             :         NameData    slot_name;
     278             :         NameData    plugin;
     279             :         int         i;
     280             : 
     281         450 :         if (!slot->in_use)
     282         302 :             continue;  /* unused array entry: emits no row */
     283             : 
     284         148 :         SpinLockAcquire(&slot->mutex);  /* only plain copies into locals happen under the spinlock */
     285             : 
     286         148 :         xmin = slot->data.xmin;
     287         148 :         catalog_xmin = slot->data.catalog_xmin;
     288         148 :         database = slot->data.database;
     289         148 :         restart_lsn = slot->data.restart_lsn;
     290         148 :         confirmed_flush_lsn = slot->data.confirmed_flush;
     291         148 :         namecpy(&slot_name, &slot->data.name);
     292         148 :         namecpy(&plugin, &slot->data.plugin);
     293         148 :         active_pid = slot->active_pid;
     294         148 :         persistency = slot->data.persistency;
     295             : 
     296         148 :         SpinLockRelease(&slot->mutex);
     297             : 
     298         148 :         memset(nulls, 0, sizeof(nulls));
     299             : 
     300         148 :         i = 0;  /* column cursor: values[] and nulls[] are filled positionally, one step per column */
     301         148 :         values[i++] = NameGetDatum(&slot_name);
     302             : 
     303         148 :         if (database == InvalidOid)  /* plugin column: NULL when the slot has no database (reported as "physical" below) */
     304          76 :             nulls[i++] = true;
     305             :         else
     306          72 :             values[i++] = NameGetDatum(&plugin);
     307             : 
     308         148 :         if (database == InvalidOid)  /* slot type column, derived solely from the database Oid */
     309          76 :             values[i++] = CStringGetTextDatum("physical");
     310             :         else
     311          72 :             values[i++] = CStringGetTextDatum("logical");
     312             : 
     313         148 :         if (database == InvalidOid)  /* database column: NULL for the same slots */
     314          76 :             nulls[i++] = true;
     315             :         else
     316          72 :             values[i++] = database;  /* NOTE(review): raw Oid assigned without an ObjectIdGetDatum() wrapper, unlike the other columns -- confirm intended */
     317             : 
     318         148 :         values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
     319         148 :         values[i++] = BoolGetDatum(active_pid != 0);
     320             : 
     321         148 :         if (active_pid != 0)  /* pid column is NULL whenever the previous boolean column is false */
     322          70 :             values[i++] = Int32GetDatum(active_pid);
     323             :         else
     324          78 :             nulls[i++] = true;
     325             : 
     326         148 :         if (xmin != InvalidTransactionId)
     327          26 :             values[i++] = TransactionIdGetDatum(xmin);
     328             :         else
     329         122 :             nulls[i++] = true;
     330             : 
     331         148 :         if (catalog_xmin != InvalidTransactionId)
     332          78 :             values[i++] = TransactionIdGetDatum(catalog_xmin);
     333             :         else
     334          70 :             nulls[i++] = true;
     335             : 
     336         148 :         if (restart_lsn != InvalidXLogRecPtr)
     337         142 :             values[i++] = LSNGetDatum(restart_lsn);
     338             :         else
     339           6 :             nulls[i++] = true;
     340             : 
     341         148 :         if (confirmed_flush_lsn != InvalidXLogRecPtr)
     342          72 :             values[i++] = LSNGetDatum(confirmed_flush_lsn);
     343             :         else
     344          76 :             nulls[i++] = true;
     345             : 
     346         148 :         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
     347             :     }
     348          98 :     LWLockRelease(ReplicationSlotControlLock);
     349             : 
     350             :     tuplestore_donestoring(tupstore);
     351             : 
     352          98 :     return (Datum) 0;  /* the rows themselves were handed over through rsinfo->setResult */
     353             : }
     354             : 
     355             : /*
     356             :  * Helper function for advancing our physical replication slot forward.
     357             :  *
     358             :  * restart_lsn is overwritten with moveto only when moveto is strictly ahead
     359             :  * of it.  Returns moveto if the slot advanced, the unchanged restart_lsn
     360             :  * otherwise.  Nothing in here marks the slot dirty or saves it.
     361             :  */
     362             : static XLogRecPtr
     363           0 : pg_physical_replication_slot_advance(XLogRecPtr moveto)
     364             : {
     365           0 :     XLogRecPtr  startlsn = MyReplicationSlot->data.restart_lsn;  /* read without the spinlock; NOTE(review): presumably fine because this backend holds the slot -- confirm */
     366           0 :     XLogRecPtr  retlsn = startlsn;
     367             : 
     368           0 :     if (startlsn < moveto)
     369             :     {
     370           0 :         SpinLockAcquire(&MyReplicationSlot->mutex);
     371           0 :         MyReplicationSlot->data.restart_lsn = moveto;
     372           0 :         SpinLockRelease(&MyReplicationSlot->mutex);
     373           0 :         retlsn = moveto;
     374             :     }
     375             : 
     376           0 :     return retlsn;
     377             : }
     378             : 
     379             : /*
     380             :  * Helper function for advancing our logical replication slot forward.
     381             :  *
     382             :  * The slot's restart_lsn is used as start point for reading records,
     383             :  * while confirmed_flush is used as base point for the decoding context.
     384             :  *
     385             :  * We cannot just call LogicalConfirmReceivedLocation to update confirmed_flush,
     386             :  * because WAL must be digested to advance restart_lsn, which allows WAL to be
     387             :  * recycled and old catalog tuples to be removed.  Decoding runs in fast_forward
     388             :  * mode, so no changes are generated.  Returns the slot's confirmed_flush.
     389             :  */
     390             : static XLogRecPtr
     391           4 : pg_logical_replication_slot_advance(XLogRecPtr moveto)
     392             : {
     393             :     LogicalDecodingContext *ctx;
     394           4 :     ResourceOwner old_resowner = CurrentResourceOwner;  /* saved so it can be put back once decoding is done */
     395             :     XLogRecPtr  startlsn;
     396             :     XLogRecPtr  retlsn;
     397             : 
     398           4 :     PG_TRY();
     399             :     {
     400             :         /*
     401             :          * Create our decoding context in fast_forward mode, passing start_lsn
     402             :          * as InvalidXLogRecPtr, so that we start processing from my slot's
     403             :          * confirmed_flush.
     404             :          */
     405           4 :         ctx = CreateDecodingContext(InvalidXLogRecPtr,
     406             :                                     NIL,
     407             :                                     true,   /* fast_forward */
     408             :                                     logical_read_local_xlog_page,
     409             :                                     NULL, NULL, NULL);
     410             : 
     411             :         /*
     412             :          * Start reading at the slot's restart_lsn, which we know to point to
     413             :          * a valid record.
     414             :          */
     415           4 :         startlsn = MyReplicationSlot->data.restart_lsn;
     416             : 
     417             :         /* Initialize our return value in case we don't do anything */
     418           4 :         retlsn = MyReplicationSlot->data.confirmed_flush;
     419             : 
     420             :         /* invalidate non-timetravel entries */
     421           4 :         InvalidateSystemCaches();
     422             : 
     423             :         /* Decode at least one record, until we run out of records */
     424          50 :         while ((!XLogRecPtrIsInvalid(startlsn) &&
     425          42 :                 startlsn < moveto) ||
     426          84 :                (!XLogRecPtrIsInvalid(ctx->reader->EndRecPtr) &&
     427          42 :                 ctx->reader->EndRecPtr < moveto))
     428             :         {
     429          46 :             char       *errm = NULL;
     430             :             XLogRecord *record;
     431             : 
     432             :             /*
     433             :              * Read records.  No changes are generated in fast_forward mode,
     434             :              * but snapbuilder/slot statuses are updated properly.
     435             :              */
     436          46 :             record = XLogReadRecord(ctx->reader, startlsn, &errm);
     437          46 :             if (errm)
     438           0 :                 elog(ERROR, "%s", errm);
     439             : 
     440             :             /* Read sequentially from now on */
     441          46 :             startlsn = InvalidXLogRecPtr;  /* after this, only the EndRecPtr half of the loop condition can hold */
     442             : 
     443             :             /*
     444             :              * Process the record.  Storage-level changes are ignored in
     445             :              * fast_forward mode, but other modules (such as snapbuilder)
     446             :              * might still have critical updates to do.
     447             :              */
     448          46 :             if (record)  /* a NULL record with no errm is skipped rather than treated as an error */
     449          46 :                 LogicalDecodingProcessRecord(ctx, ctx->reader);
     450             : 
     451             :             /* Stop once the requested target has been reached */
     452          46 :             if (moveto <= ctx->reader->EndRecPtr)
     453           4 :                 break;
     454             : 
     455          42 :             CHECK_FOR_INTERRUPTS();
     456             :         }
     457             : 
     458             :         /*
     459             :          * Logical decoding could have clobbered CurrentResourceOwner during
     460             :          * transaction management, so restore the executor's value.  (This is
     461             :          * a kluge, but it's not worth cleaning up right now.)
     462             :          */
     463           4 :         CurrentResourceOwner = old_resowner;
     464             : 
     465           4 :         if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
     466             :         {
     467           4 :             LogicalConfirmReceivedLocation(moveto);  /* confirms moveto itself, not the reader's EndRecPtr */
     468             : 
     469             :             /*
     470             :              * If only the confirmed_flush LSN has changed the slot won't get
     471             :              * marked as dirty by the above. Callers on the walsender
     472             :              * interface are expected to keep track of their own progress and
     473             :              * don't need it written out. But SQL-interface users cannot
     474             :              * specify their own start positions and it's harder for them to
     475             :              * keep track of their progress, so we should make more of an
     476             :              * effort to save it for them.
     477             :              *
     478             :              * Dirty the slot so it's written out at the next checkpoint.
     479             :              * We'll still lose its position on crash, as documented, but it's
     480             :              * better than always losing the position even on clean restart.
     481             :              */
     482           4 :             ReplicationSlotMarkDirty();
     483             :         }
     484             : 
     485           4 :         retlsn = MyReplicationSlot->data.confirmed_flush;  /* re-read after the (possible) confirmation above */
     486             : 
     487             :         /* free context, call shutdown callback */
     488           4 :         FreeDecodingContext(ctx);
     489             : 
     490           4 :         InvalidateSystemCaches();
     491             :     }
     492           0 :     PG_CATCH();  /* NOTE(review): this path neither frees ctx nor restores CurrentResourceOwner -- presumably left to error cleanup; confirm */
     493             :     {
     494             :         /* clear all timetravel entries */
     495           0 :         InvalidateSystemCaches();
     496             : 
     497           0 :         PG_RE_THROW();
     498             :     }
     499           4 :     PG_END_TRY();
     500             : 
     501           4 :     return retlsn;
     502             : }
     503             : 
     504             : /*
     505             :  * SQL function moving a slot forward to at most the given LSN; returns (slot name, LSN reached).
     506             :  */
     507             : Datum
     508           8 : pg_replication_slot_advance(PG_FUNCTION_ARGS)
     509             : {
     510           8 :     Name        slotname = PG_GETARG_NAME(0);
     511           8 :     XLogRecPtr  moveto = PG_GETARG_LSN(1);
     512             :     XLogRecPtr  endlsn;
     513             :     XLogRecPtr  minlsn;
     514             :     TupleDesc   tupdesc;
     515             :     Datum       values[2];
     516             :     bool        nulls[2];
     517             :     HeapTuple   tuple;
     518             :     Datum       result;
     519             : 
     520             :     Assert(!MyReplicationSlot);
     521             : 
     522           8 :     check_permissions();
     523             : 
     524           8 :     if (XLogRecPtrIsInvalid(moveto))
     525           2 :         ereport(ERROR,
     526             :                 (errmsg("invalid target wal lsn")));
     527             : 
     528             :     /* Build a tuple descriptor for our result type */
     529           6 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     530           0 :         elog(ERROR, "return type must be a row type");
     531             : 
     532             :     /*
     533             :      * We can't move slot past what's been flushed/replayed so clamp the
     534             :      * target position accordingly.
     535             :      */
     536           6 :     if (!RecoveryInProgress())
     537           6 :         moveto = Min(moveto, GetFlushRecPtr());  /* not in recovery: cap at the flush pointer */
     538             :     else
     539           0 :         moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));  /* in recovery: cap at the replay pointer */
     540             : 
     541             :     /* Acquire the slot so we "own" it */
     542           6 :     ReplicationSlotAcquire(NameStr(*slotname), true);  /* NOTE(review): the bare "true" is presumably a nowait flag -- confirm against slot.h */
     543             : 
     544             :     /* A slot whose restart_lsn has never been reserved cannot be advanced */
     545           6 :     if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
     546           2 :         ereport(ERROR,  /* NOTE(review): the slot is still acquired when this fires; release is presumably left to error cleanup -- confirm */
     547             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     548             :                  errmsg("cannot advance replication slot that has not previously reserved WAL")));
     549             : 
     550             :     /*
     551             :      * Check if the slot is not moving backwards.  Physical slots rely simply
     552             :      * on restart_lsn as a minimum point, while logical slots have confirmed
     553             :      * consumption up to confirmed_flush, meaning that in both cases data older
     554             :      * than that is not available anymore.
     555             :      */
     556           4 :     if (OidIsValid(MyReplicationSlot->data.database))
     557           4 :         minlsn = MyReplicationSlot->data.confirmed_flush;
     558             :     else
     559           0 :         minlsn = MyReplicationSlot->data.restart_lsn;
     560             : 
     561           4 :     if (moveto < minlsn)
     562           0 :         ereport(ERROR,
     563             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     564             :                  errmsg("cannot advance replication slot to %X/%X, minimum is %X/%X",
     565             :                         (uint32) (moveto >> 32), (uint32) moveto,
     566             :                         (uint32) (minlsn >> 32), (uint32) minlsn)));
     567             : 
     568             :     /* Do the actual slot update, depending on the slot type */
     569           4 :     if (OidIsValid(MyReplicationSlot->data.database))
     570           4 :         endlsn = pg_logical_replication_slot_advance(moveto);
     571             :     else
     572           0 :         endlsn = pg_physical_replication_slot_advance(moveto);
     573             : 
     574           4 :     values[0] = NameGetDatum(&MyReplicationSlot->data.name);  /* a pointer into the slot, not a copy */
     575           4 :     nulls[0] = false;
     576             : 
     577             :     /* Update the on disk state when lsn was updated. */
     578           4 :     if (XLogRecPtrIsInvalid(endlsn))  /* NOTE(review): fires only for an INVALID endlsn, which contradicts the comment above -- condition looks inverted; confirm */
     579             :     {
     580           0 :         ReplicationSlotMarkDirty();  /* NOTE(review): 0 hits; the helpers return restart_lsn, confirmed_flush or moveto, which appear never invalid here, so this branch looks unreachable */
     581           0 :         ReplicationSlotsComputeRequiredXmin(false);
     582           0 :         ReplicationSlotsComputeRequiredLSN();
     583           0 :         ReplicationSlotSave();
     584             :     }
     585             : 
     586           4 :     ReplicationSlotRelease();  /* NOTE(review): values[0] still points at the slot's name but the tuple is formed only after this release (the create functions form it before) -- confirm safe */
     587             : 
     588             :     /* Return the reached position. */
     589           4 :     values[1] = LSNGetDatum(endlsn);
     590           4 :     nulls[1] = false;
     591             : 
     592           4 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     593           4 :     result = HeapTupleGetDatum(tuple);
     594             : 
     595           4 :     PG_RETURN_DATUM(result);
     596             : }
     597             : 
     598             : /*
     599             :  * Helper function of copying a replication slot.
     600             :  */
     601             : static Datum
     602          28 : copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
     603             : {
     604          28 :     Name        src_name = PG_GETARG_NAME(0);
     605          28 :     Name        dst_name = PG_GETARG_NAME(1);
     606          28 :     ReplicationSlot *src = NULL;
     607             :     XLogRecPtr  src_restart_lsn;
     608             :     bool        src_islogical;
     609             :     bool        temporary;
     610             :     char       *plugin;
     611             :     Datum       values[2];
     612             :     bool        nulls[2];
     613             :     Datum       result;
     614             :     TupleDesc   tupdesc;
     615             :     HeapTuple   tuple;
     616             : 
     617          28 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     618           0 :         elog(ERROR, "return type must be a row type");
     619             : 
     620          28 :     check_permissions();
     621             : 
     622          28 :     if (logical_slot)
     623          16 :         CheckLogicalDecodingRequirements();
     624             :     else
     625          12 :         CheckSlotRequirements();
     626             : 
     627          28 :     LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
     628             : 
     629             :     /*
     630             :      * We need to prevent the source slot's reserved WAL from being removed,
     631             :      * but we don't want to lock that slot for very long, and it can advance
     632             :      * in the meantime.  So obtain the source slot's data, and create a new
     633             :      * slot using its restart_lsn.  Afterwards we lock the source slot again
     634             :      * and verify that the data we copied (name, type) has not changed
     635             :      * incompatibly.  No inconvenient WAL removal can occur once the new slot
     636             :      * is created -- but since WAL removal could have occurred before we
     637             :      * managed to create the new slot, we advance the new slot's restart_lsn
     638             :      * to the source slot's updated restart_lsn the second time we lock it.
     639             :      */
     640          30 :     for (int i = 0; i < max_replication_slots; i++)
     641             :     {
     642          30 :         ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
     643             : 
     644          30 :         if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
     645             :         {
     646          28 :             SpinLockAcquire(&s->mutex);
     647          28 :             src_islogical = SlotIsLogical(s);
     648          28 :             src_restart_lsn = s->data.restart_lsn;
     649          28 :             temporary = s->data.persistency == RS_TEMPORARY;
     650          28 :             plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL;
     651          28 :             SpinLockRelease(&s->mutex);
     652             : 
     653          28 :             src = s;
     654          28 :             break;
     655             :         }
     656             :     }
     657             : 
     658          28 :     LWLockRelease(ReplicationSlotControlLock);
     659             : 
     660          28 :     if (src == NULL)
     661           0 :         ereport(ERROR,
     662             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     663             :                  errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
     664             : 
     665             :     /* Check type of replication slot */
     666          28 :     if (src_islogical != logical_slot)
     667           4 :         ereport(ERROR,
     668             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     669             :                  src_islogical ?
     670             :                  errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
     671             :                         NameStr(*src_name)) :
     672             :                  errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
     673             :                         NameStr(*src_name))));
     674             : 
     675             :     /* Copying non-reserved slot doesn't make sense */
     676          24 :     if (XLogRecPtrIsInvalid(src_restart_lsn))
     677             :     {
     678             :         Assert(!logical_slot);
     679           2 :         ereport(ERROR,
     680             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     681             :                  (errmsg("cannot copy a replication slot that doesn't reserve WAL"))));
     682             :     }
     683             : 
     684             :     /* Overwrite params from optional arguments */
     685          22 :     if (PG_NARGS() >= 3)
     686          12 :         temporary = PG_GETARG_BOOL(2);
     687          22 :     if (PG_NARGS() >= 4)
     688             :     {
     689             :         Assert(logical_slot);
     690           8 :         plugin = NameStr(*(PG_GETARG_NAME(3)));
     691             :     }
     692             : 
     693             :     /* Create new slot and acquire it */
     694          22 :     if (logical_slot)
     695          14 :         create_logical_replication_slot(NameStr(*dst_name),
     696             :                                         plugin,
     697             :                                         temporary,
     698             :                                         src_restart_lsn);
     699             :     else
     700           8 :         create_physical_replication_slot(NameStr(*dst_name),
     701             :                                          true,
     702             :                                          temporary,
     703             :                                          src_restart_lsn);
     704             : 
     705             :     /*
     706             :      * Update the destination slot to current values of the source slot;
     707             :      * recheck that the source slot is still the one we saw previously.
     708             :      */
     709             :     {
     710             :         TransactionId copy_effective_xmin;
     711             :         TransactionId copy_effective_catalog_xmin;
     712             :         TransactionId copy_xmin;
     713             :         TransactionId copy_catalog_xmin;
     714             :         XLogRecPtr  copy_restart_lsn;
     715             :         bool        copy_islogical;
     716             :         char       *copy_name;
     717             : 
     718             :         /* Copy data of source slot again */
     719          20 :         SpinLockAcquire(&src->mutex);
     720          20 :         copy_effective_xmin = src->effective_xmin;
     721          20 :         copy_effective_catalog_xmin = src->effective_catalog_xmin;
     722             : 
     723          20 :         copy_xmin = src->data.xmin;
     724          20 :         copy_catalog_xmin = src->data.catalog_xmin;
     725          20 :         copy_restart_lsn = src->data.restart_lsn;
     726             : 
     727             :         /* for existence check */
     728          20 :         copy_name = pstrdup(NameStr(src->data.name));
     729          20 :         copy_islogical = SlotIsLogical(src);
     730          20 :         SpinLockRelease(&src->mutex);
     731             : 
     732             :         /*
     733             :          * Check if the source slot still exists and is valid. We regard it as
     734             :          * invalid if the type of replication slot or name has been changed,
     735             :          * or the restart_lsn either is invalid or has gone backward. (The
     736             :          * restart_lsn could go backwards if the source slot is dropped and
     737             :          * copied from an older slot during installation.)
     738             :          *
     739             :          * Since erroring out will release and drop the destination slot we
     740             :          * don't need to release it here.
     741             :          */
     742          20 :         if (copy_restart_lsn < src_restart_lsn ||
     743          20 :             src_islogical != copy_islogical ||
     744          20 :             strcmp(copy_name, NameStr(*src_name)) != 0)
     745           0 :             ereport(ERROR,
     746             :                     (errmsg("could not copy replication slot \"%s\"",
     747             :                             NameStr(*src_name)),
     748             :                      errdetail("The source replication slot was modified incompatibly during the copy operation.")));
     749             : 
     750             :         /* Install copied values again */
     751          20 :         SpinLockAcquire(&MyReplicationSlot->mutex);
     752          20 :         MyReplicationSlot->effective_xmin = copy_effective_xmin;
     753          20 :         MyReplicationSlot->effective_catalog_xmin = copy_effective_catalog_xmin;
     754             : 
     755          20 :         MyReplicationSlot->data.xmin = copy_xmin;
     756          20 :         MyReplicationSlot->data.catalog_xmin = copy_catalog_xmin;
     757          20 :         MyReplicationSlot->data.restart_lsn = copy_restart_lsn;
     758          20 :         SpinLockRelease(&MyReplicationSlot->mutex);
     759             : 
     760          20 :         ReplicationSlotMarkDirty();
     761          20 :         ReplicationSlotsComputeRequiredXmin(false);
     762          20 :         ReplicationSlotsComputeRequiredLSN();
     763          20 :         ReplicationSlotSave();
     764             : 
     765             : #ifdef USE_ASSERT_CHECKING
     766             :         /* Check that the restart_lsn is available */
     767             :         {
     768             :             XLogSegNo   segno;
     769             : 
     770             :             XLByteToSeg(copy_restart_lsn, segno, wal_segment_size);
     771             :             Assert(XLogGetLastRemovedSegno() < segno);
     772             :         }
     773             : #endif
     774             :     }
     775             : 
     776             :     /* target slot fully created, mark as persistent if needed */
     777          20 :     if (logical_slot && !temporary)
     778           6 :         ReplicationSlotPersist();
     779             : 
     780             :     /* All done.  Set up the return values */
     781          20 :     values[0] = NameGetDatum(dst_name);
     782          20 :     nulls[0] = false;
     783          20 :     if (!XLogRecPtrIsInvalid(MyReplicationSlot->data.confirmed_flush))
     784             :     {
     785          12 :         values[1] = LSNGetDatum(MyReplicationSlot->data.confirmed_flush);
     786          12 :         nulls[1] = false;
     787             :     }
     788             :     else
     789           8 :         nulls[1] = true;
     790             : 
     791          20 :     tuple = heap_form_tuple(tupdesc, values, nulls);
     792          20 :     result = HeapTupleGetDatum(tuple);
     793             : 
     794          20 :     ReplicationSlotRelease();
     795             : 
     796          20 :     PG_RETURN_DATUM(result);
     797             : }
     798             : 
     799             : /* The wrappers below are all to appease opr_sanity */
     800             : Datum
     801           8 : pg_copy_logical_replication_slot_a(PG_FUNCTION_ARGS)
     802             : {
     803           8 :     return copy_replication_slot(fcinfo, true);
     804             : }
     805             : 
     806             : Datum
     807           0 : pg_copy_logical_replication_slot_b(PG_FUNCTION_ARGS)
     808             : {
     809           0 :     return copy_replication_slot(fcinfo, true);
     810             : }
     811             : 
     812             : Datum
     813           8 : pg_copy_logical_replication_slot_c(PG_FUNCTION_ARGS)
     814             : {
     815           8 :     return copy_replication_slot(fcinfo, true);
     816             : }
     817             : 
     818             : Datum
     819           4 : pg_copy_physical_replication_slot_a(PG_FUNCTION_ARGS)
     820             : {
     821           4 :     return copy_replication_slot(fcinfo, false);
     822             : }
     823             : 
     824             : Datum
     825           8 : pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
     826             : {
     827           8 :     return copy_replication_slot(fcinfo, false);
     828             : }

Generated by: LCOV version 1.13