LCOV - code coverage report
Current view: top level - src/backend/replication/logical - origin.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 391 455 85.9 %
Date: 2024-05-08 14:10:47 Functions: 28 31 90.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * origin.c
       4             :  *    Logical replication progress tracking support.
       5             :  *
       6             :  * Copyright (c) 2013-2024, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/logical/origin.c
      10             :  *
      11             :  * NOTES
      12             :  *
      13             :  * This file provides the following:
      14             :  * * An infrastructure to name nodes in a replication setup
      15             :  * * A facility to store and persist replication progress in an efficient
      16             :  *   and durable manner.
      17             :  *
      18             :  * A replication origin consists of a descriptive, user defined, external
      19             :  * name and a short, thus space efficient, internal 2 byte one. This split
      20             :  * exists because replication origins have to be stored in WAL and shared
      21             :  * memory and long descriptors would be inefficient.  For now only use 2 bytes
      22             :  * for the internal id of a replication origin as it seems unlikely that there
      23             :  * soon will be more than 65k nodes in one replication setup; and using only
      24             :  * two bytes allows us to be more space efficient.
      25             :  *
      26             :  * Replication progress is tracked in a shared memory table
      27             :  * (ReplicationState) that's dumped to disk every checkpoint. Entries
      28             :  * ('slots') in this table are identified by the internal id. That's the case
      29             :  * because it allows us to increase replication progress during crash
      30             :  * recovery. To allow doing so we store the original LSN (from the originating
      31             :  * system) of a transaction in the commit record. That lets us recover the
      32             :  * precise replayed state after crash recovery; without requiring synchronous
      33             :  * commits. Allowing logical replication to use asynchronous commit is
      34             :  * generally good for performance, but especially important as it allows a
      35             :  * single threaded replay process to keep up with a source that has multiple
      36             :  * backends generating changes concurrently.  For efficiency and simplicity
      37             :  * reasons a backend can set up one replication origin that's from then on
      38             :  * used as the source of changes produced by the backend, until reset again.
      39             :  *
      40             :  * This infrastructure is intended to be used in cooperation with logical
      41             :  * decoding. When replaying from a remote system the configured origin is
      42             :  * provided to output plugins, allowing prevention of replication loops and
      43             :  * other filtering.
      44             :  *
      45             :  * There are several levels of locking at work:
      46             :  *
      47             :  * * To create and drop replication origins an exclusive lock on
      48             :  *   pg_replication_origin is required for the duration. That allows us to
      49             :  *   safely and conflict free assign new origins using a dirty snapshot.
      50             :  *
      51             :  * * When creating an in-memory replication progress slot the ReplicationOrigin
      52             :  *   LWLock has to be held exclusively; when iterating over the replication
      53             :  *   progress a shared lock has to be held, the same applies when advancing
      54             :  *   the replication progress of an individual backend that has not been set
      55             :  *   up as the session's replication origin.
      56             :  *
      57             :  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
      58             :  *   replication progress slot that slot's lwlock has to be held. That's
      59             :  *   primarily because we do not assume 8 byte writes (the LSN) are atomic on
      60             :  *   all our platforms, but it also simplifies memory ordering concerns
      61             :  *   between the remote and local lsn. We use a lwlock instead of a spinlock
      62             :  *   so it's less harmful to hold the lock over a WAL write
      63             :  *   (cf. AdvanceReplicationProgress).
      64             :  *
      65             :  * ---------------------------------------------------------------------------
      66             :  */
      67             : 
      68             : #include "postgres.h"
      69             : 
      70             : #include <unistd.h>
      71             : #include <sys/stat.h>
      72             : 
      73             : #include "access/genam.h"
      74             : #include "access/htup_details.h"
      75             : #include "access/table.h"
      76             : #include "access/xact.h"
      77             : #include "access/xloginsert.h"
      78             : #include "catalog/catalog.h"
      79             : #include "catalog/indexing.h"
      80             : #include "catalog/pg_subscription.h"
      81             : #include "funcapi.h"
      82             : #include "miscadmin.h"
      83             : #include "nodes/execnodes.h"
      84             : #include "pgstat.h"
      85             : #include "replication/origin.h"
      86             : #include "replication/slot.h"
      87             : #include "storage/condition_variable.h"
      88             : #include "storage/fd.h"
      89             : #include "storage/ipc.h"
      90             : #include "storage/lmgr.h"
      91             : #include "utils/builtins.h"
      92             : #include "utils/fmgroids.h"
      93             : #include "utils/pg_lsn.h"
      94             : #include "utils/rel.h"
      95             : #include "utils/snapmgr.h"
      96             : #include "utils/syscache.h"
      97             : 
      98             : /*
      99             :  * Replay progress of a single remote node; one element of the shared array.
     100             :  */
     101             : typedef struct ReplicationState
     102             : {
     103             :     /*
     104             :      * Local identifier for the remote node (InvalidRepOriginId once cleared).
     105             :      */
     106             :     RepOriginId roident;
     107             : 
     108             :     /*
     109             :      * Location of the latest commit from the remote side.
     110             :      */
     111             :     XLogRecPtr  remote_lsn;
     112             : 
     113             :     /*
     114             :      * Remember the local lsn of the commit record so we can XLogFlush() to it
     115             :      * during a checkpoint so we know the commit record actually is safe on
     116             :      * disk.
     117             :      */
     118             :     XLogRecPtr  local_lsn;
     119             : 
     120             :     /*
     121             :      * PID of the backend that has acquired this slot, or 0 if none.
     122             :      */
     123             :     int         acquired_by;
     124             : 
     125             :     /*
     126             :      * Condition variable that's signaled when acquired_by changes.
     127             :      */
     128             :     ConditionVariable origin_cv;
     129             : 
     130             :     /*
     131             :      * Lock protecting remote_lsn and local_lsn.
     132             :      */
     133             :     LWLock      lock;
     134             : } ReplicationState;
     135             : 
     136             : /*
     137             :  * On-disk version of ReplicationState: just roident and remote_lsn.
     138             :  */
     139             : typedef struct ReplicationStateOnDisk
     140             : {
     141             :     RepOriginId roident;
     142             :     XLogRecPtr  remote_lsn;
     143             : } ReplicationStateOnDisk;
     144             : 
     145             : /* Shared memory control block holding all replication states. */
     146             : typedef struct ReplicationStateCtl
     147             : {
     148             :     /* Tranche to use for per-origin LWLocks */
     149             :     int         tranche_id;
     150             :     /* Array of length max_replication_slots */
     151             :     ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
     152             : } ReplicationStateCtl;
     153             : 
     154             : /* external variables describing the current session's replication origin */
     155             : RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
     156             : XLogRecPtr  replorigin_session_origin_lsn = InvalidXLogRecPtr;
     157             : TimestampTz replorigin_session_origin_timestamp = 0;
     158             : 
     159             : /*
     160             :  * Base address into a shared memory array of replication states of size
     161             :  * max_replication_slots.
     162             :  *
     163             :  * XXX: Should we use a separate variable to size this rather than
     164             :  * max_replication_slots?
     165             :  */
     166             : static ReplicationState *replication_states;
     167             : 
     168             : /*
     169             :  * Actual shared memory block; replication_states points at its states[].
     170             :  */
     171             : static ReplicationStateCtl *replication_states_ctl;
     172             : 
     173             : /*
     174             :  * We keep a pointer to this backend's ReplicationState to avoid having to
     175             :  * search the replication_states array in replorigin_session_advance for each
     176             :  * remote commit.  (Ownership of a backend's own entry can only be changed by
     177             :  * that backend.)
     178             :  */
     179             : static ReplicationState *session_replication_state = NULL;
     180             : 
     181             : /* Magic number written at the start of the on-disk checkpoint file. */
     182             : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
     183             : /* Raise an ERROR if replication origins cannot be used right now. */
     184             : static void
     185          84 : replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
     186             : {
     187          84 :     if (check_slots && max_replication_slots == 0)
     188           0 :         ereport(ERROR,
     189             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     190             :                  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
     191             :     /* unless recoveryOK, refuse to run while recovery is in progress */
     192          84 :     if (!recoveryOK && RecoveryInProgress())
     193           0 :         ereport(ERROR,
     194             :                 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
     195             :                  errmsg("cannot manipulate replication origins during recovery")));
     196          84 : }
     197             : 
     198             : 
     199             : /*
     200             :  * IsReservedOriginName
     201             :  *      True iff name is "none" or "any", compared case-insensitively.
     202             :  */
     203             : static bool
     204          16 : IsReservedOriginName(const char *name)
     205             : {
     206          30 :     return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
     207          14 :             (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
     208             : }
     209             : 
     210             : /* ---------------------------------------------------------------------------
     211             :  * Functions for working with replication origins themselves.
     212             :  * ---------------------------------------------------------------------------
     213             :  */
     214             : 
     215             : /*
     216             :  * Look up a persistent replication origin by name and return its roident.
     217             :  * Returns InvalidOid if the origin isn't known and missing_ok is true;
     218             :  * raises an ERROR if it isn't known and missing_ok is false.
     219             :  */
     220             : RepOriginId
     221        1588 : replorigin_by_name(const char *roname, bool missing_ok)
     222             : {
     223             :     Form_pg_replication_origin ident;
     224        1588 :     Oid         roident = InvalidOid;
     225             :     HeapTuple   tuple;
     226             :     Datum       roname_d;
     227             : 
     228        1588 :     roname_d = CStringGetTextDatum(roname);
     229             : 
     230        1588 :     tuple = SearchSysCache1(REPLORIGNAME, roname_d);
     231        1588 :     if (HeapTupleIsValid(tuple))
     232             :     {
     233         922 :         ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
     234         922 :         roident = ident->roident;
     235         922 :         ReleaseSysCache(tuple);
     236             :     }
     237         666 :     else if (!missing_ok)
     238           8 :         ereport(ERROR,
     239             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     240             :                  errmsg("replication origin \"%s\" does not exist",
     241             :                         roname)));
     242             : 
     243        1580 :     return roident;
     244             : }
     245             : 
     246             : /*
     247             :  * Create a replication origin named roname and return its new ID.
     248             :  *
     249             :  * Needs to be called in a transaction.  ERRORs if no ID is free.
     250             :  */
     251             : RepOriginId
     252         634 : replorigin_create(const char *roname)
     253             : {
     254             :     Oid         roident;
     255         634 :     HeapTuple   tuple = NULL;
     256             :     Relation    rel;
     257             :     Datum       roname_d;
     258             :     SnapshotData SnapshotDirty;
     259             :     SysScanDesc scan;
     260             :     ScanKeyData key;
     261             : 
     262         634 :     roname_d = CStringGetTextDatum(roname);
     263             : 
     264             :     Assert(IsTransactionState());
     265             : 
     266             :     /*
     267             :      * We need the numeric replication origin to be 16bit wide, so we cannot
     268             :      * rely on the normal oid allocation. Instead we simply scan
     269             :      * pg_replication_origin for the first unused id. That's not particularly
     270             :      * efficient, but this should be a fairly infrequent operation - we can
     271             :      * easily spend a bit more code on this when it turns out it needs to be
     272             :      * faster.
     273             :      *
     274             :      * We handle concurrency by taking an exclusive lock (allowing reads!)
     275             :      * over the table for the duration of the search. Because we use a "dirty
     276             :      * snapshot" we can read rows that other in-progress sessions have
     277             :      * written, even though they would be invisible with normal snapshots. Due
     278             :      * to the exclusive lock there's no danger that new rows can appear while
     279             :      * we're checking.
     280             :      */
     281         634 :     InitDirtySnapshot(SnapshotDirty);
     282             : 
     283         634 :     rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
     284             :     /* candidate IDs run from InvalidOid + 1 up to PG_UINT16_MAX - 1 */
     285        1096 :     for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
     286             :     {
     287             :         bool        nulls[Natts_pg_replication_origin];
     288             :         Datum       values[Natts_pg_replication_origin];
     289             :         bool        collides;
     290             : 
     291        1096 :         CHECK_FOR_INTERRUPTS();
     292             : 
     293        1096 :         ScanKeyInit(&key,
     294             :                     Anum_pg_replication_origin_roident,
     295             :                     BTEqualStrategyNumber, F_OIDEQ,
     296             :                     ObjectIdGetDatum(roident));
     297             : 
     298        1096 :         scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
     299             :                                   true /* indexOK */ ,
     300             :                                   &SnapshotDirty,
     301             :                                   1, &key);
     302             : 
     303        1096 :         collides = HeapTupleIsValid(systable_getnext(scan));
     304             : 
     305        1096 :         systable_endscan(scan);
     306             : 
     307        1096 :         if (!collides)
     308             :         {
     309             :             /*
     310             :              * Ok, found an unused roident, insert the new row and do a CCI,
     311             :              * so our callers can look it up if they want to.
     312             :              */
     313         634 :             memset(&nulls, 0, sizeof(nulls));
     314             : 
     315         634 :             values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
     316         634 :             values[Anum_pg_replication_origin_roname - 1] = roname_d;
     317             : 
     318         634 :             tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     319         634 :             CatalogTupleInsert(rel, tuple);
     320         632 :             CommandCounterIncrement();
     321         632 :             break;
     322             :         }
     323             :     }
     324             : 
     325             :     /* now release the lock again */
     326         632 :     table_close(rel, ExclusiveLock);
     327             : 
     328         632 :     if (tuple == NULL)
     329           0 :         ereport(ERROR,
     330             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     331             :                  errmsg("could not find free replication origin ID")));
     332             : 
     333         632 :     heap_freetuple(tuple);
     334         632 :     return roident;
     335             : }
     336             : 
     337             : /*
     338             :  * Drop helper: if the origin has an in-memory slot, WAL-log and clear it.
     339             :  */
     340             : static void
     341         502 : replorigin_state_clear(RepOriginId roident, bool nowait)
     342             : {
     343             :     int         i;
     344             : 
     345             :     /*
     346             :      * Clean up the slot state info, if there is any matching slot.
     347             :      */
     348         502 : restart:
     349         502 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     350             : 
     351        1590 :     for (i = 0; i < max_replication_slots; i++)
     352             :     {
     353        1518 :         ReplicationState *state = &replication_states[i];
     354             : 
     355        1518 :         if (state->roident == roident)
     356             :         {
     357             :             /* found our slot; if busy, ERROR out (nowait) or wait and retry */
     358         430 :             if (state->acquired_by != 0)
     359             :             {
     360             :                 ConditionVariable *cv;
     361             : 
     362           0 :                 if (nowait)
     363           0 :                     ereport(ERROR,
     364             :                             (errcode(ERRCODE_OBJECT_IN_USE),
     365             :                              errmsg("could not drop replication origin with ID %d, in use by PID %d",
     366             :                                     state->roident,
     367             :                                     state->acquired_by)));
     368             : 
     369             :                 /*
     370             :                  * We must wait and then retry.  Since we don't know which CV
     371             :                  * to wait on until here, we can't readily use
     372             :                  * ConditionVariablePrepareToSleep (calling it here would be
     373             :                  * wrong, since we could miss the signal if we did so); just
     374             :                  * use ConditionVariableSleep directly.
     375             :                  */
     376           0 :                 cv = &state->origin_cv;
     377             : 
     378           0 :                 LWLockRelease(ReplicationOriginLock);
     379             : 
     380           0 :                 ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
     381           0 :                 goto restart;
     382             :             }
     383             : 
     384             :             /* first make a WAL log entry */
     385             :             {
     386             :                 xl_replorigin_drop xlrec;
     387             : 
     388         430 :                 xlrec.node_id = roident;
     389         430 :                 XLogBeginInsert();
     390         430 :                 XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
     391         430 :                 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
     392             :             }
     393             : 
     394             :             /* then clear the in-memory slot */
     395         430 :             state->roident = InvalidRepOriginId;
     396         430 :             state->remote_lsn = InvalidXLogRecPtr;
     397         430 :             state->local_lsn = InvalidXLogRecPtr;
     398         430 :             break;
     399             :         }
     400             :     }
     401         502 :     LWLockRelease(ReplicationOriginLock);
     402         502 :     ConditionVariableCancelSleep();
     403         502 : }
     404             : 
     405             : /*
     406             :  * Drop replication origin (by name), along with its in-memory state.
     407             :  * missing_ok: ignore an unknown origin.  nowait: ERROR if it is in use.
     408             :  * Needs to be called in a transaction.
     409             :  */
     410             : void
     411         824 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
     412             : {
     413             :     RepOriginId roident;
     414             :     Relation    rel;
     415             :     HeapTuple   tuple;
     416             : 
     417             :     Assert(IsTransactionState());
     418             : 
     419         824 :     rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
     420             : 
     421         824 :     roident = replorigin_by_name(name, missing_ok);
     422             : 
     423             :     /* Lock the origin to prevent concurrent drops. */
     424         822 :     LockSharedObject(ReplicationOriginRelationId, roident, 0,
     425             :                      AccessExclusiveLock);
     426             : 
     427         822 :     tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
     428         822 :     if (!HeapTupleIsValid(tuple))
     429             :     {
     430         320 :         if (!missing_ok)
     431           0 :             elog(ERROR, "cache lookup failed for replication origin with ID %d",
     432             :                  roident);
     433             : 
     434             :         /*
     435             :          * We don't need to retain the locks if the origin is already dropped.
     436             :          */
     437         320 :         UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
     438             :                            AccessExclusiveLock);
     439         320 :         table_close(rel, RowExclusiveLock);
     440         320 :         return;
     441             :     }
     442             : 
     443         502 :     replorigin_state_clear(roident, nowait);
     444             : 
     445             :     /*
     446             :      * Now, we can delete the catalog entry.
     447             :      */
     448         502 :     CatalogTupleDelete(rel, &tuple->t_self);
     449         502 :     ReleaseSysCache(tuple);
     450             : 
     451         502 :     CommandCounterIncrement();
     452             : 
     453             :     /* We keep the lock on pg_replication_origin until commit */
     454         502 :     table_close(rel, NoLock);
     455             : }
     456             : 
     457             : /*
     458             :  * Look up a replication origin by its ID and return its name in *roname.
     459             :  *
     460             :  * The external name is palloc'd in the calling context; *roname is set to
     461             :  * NULL if the origin is unknown.
     462             :  * Returns true if known; otherwise false, or ERROR if !missing_ok.
     463             :  */
     464             : bool
     465          28 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
     466             : {
     467             :     HeapTuple   tuple;
     468             :     Form_pg_replication_origin ric;
     469             : 
     470             :     Assert(OidIsValid((Oid) roident));
     471             :     Assert(roident != InvalidRepOriginId);
     472             :     Assert(roident != DoNotReplicateId);
     473             : 
     474          28 :     tuple = SearchSysCache1(REPLORIGIDENT,
     475             :                             ObjectIdGetDatum((Oid) roident));
     476             : 
     477          28 :     if (HeapTupleIsValid(tuple))
     478             :     {
     479          28 :         ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
     480          28 :         *roname = text_to_cstring(&ric->roname);
     481          28 :         ReleaseSysCache(tuple);
     482             : 
     483          28 :         return true;
     484             :     }
     485             :     else
     486             :     {
     487           0 :         *roname = NULL;
     488             : 
     489           0 :         if (!missing_ok)
     490           0 :             ereport(ERROR,
     491             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     492             :                      errmsg("replication origin with ID %d does not exist",
     493             :                             roident)));
     494             : 
     495           0 :         return false;
     496             :     }
     497             : }
     498             : 
     499             : 
     500             : /* ---------------------------------------------------------------------------
     501             :  * Functions for handling replication progress.
     502             :  * ---------------------------------------------------------------------------
     503             :  */
     504             : 
     505             : Size
     506        6830 : ReplicationOriginShmemSize(void)
     507             : {
     508        6830 :     Size        size = 0;
     509             : 
     510             :     /*
     511             :      * XXX: max_replication_slots is arguably the wrong thing to use, as here
     512             :      * we keep the replay state of *remote* transactions. But for now it seems
     513             :      * sufficient to reuse it, rather than introduce a separate GUC.
     514             :      */
     515        6830 :     if (max_replication_slots == 0)
     516           4 :         return size;
     517             :     /* control struct header, then one ReplicationState per slot */
     518        6826 :     size = add_size(size, offsetof(ReplicationStateCtl, states));
     519             : 
     520        6826 :     size = add_size(size,
     521             :                     mul_size(max_replication_slots, sizeof(ReplicationState)));
     522        6826 :     return size;
     523             : }
     524             : 
     525             : void
     526        1768 : ReplicationOriginShmemInit(void)
     527             : {
     528             :     bool        found;
     529             :     /* nothing to set up when max_replication_slots is 0 */
     530        1768 :     if (max_replication_slots == 0)
     531           2 :         return;
     532             : 
     533        1766 :     replication_states_ctl = (ReplicationStateCtl *)
     534        1766 :         ShmemInitStruct("ReplicationOriginState",
     535             :                         ReplicationOriginShmemSize(),
     536             :                         &found);
     537        1766 :     replication_states = replication_states_ctl->states;
     538             :     /* !found: zero the block and set up every slot's LWLock and CV */
     539        1766 :     if (!found)
     540             :     {
     541             :         int         i;
     542             : 
     543      124520 :         MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
     544             : 
     545        1766 :         replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
     546             : 
     547       19050 :         for (i = 0; i < max_replication_slots; i++)
     548             :         {
     549       17284 :             LWLockInitialize(&replication_states[i].lock,
     550       17284 :                              replication_states_ctl->tranche_id);
     551       17284 :             ConditionVariableInit(&replication_states[i].origin_cv);
     552             :         }
     553             :     }
     554             : }
     555             : 
     556             : /* ---------------------------------------------------------------------------
     557             :  * Perform a checkpoint of each replication origin's progress with respect to
     558             :  * the replayed remote_lsn. Make sure that all transactions we refer to in the
     559             :  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
     560             :  * if the transactions were originally committed asynchronously.
     561             :  *
     562             :  * We store checkpoints in the following format:
     563             :  * +-------+------------------------+------------------+-----+--------+
     564             :  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
     565             :  * +-------+------------------------+------------------+-----+--------+
     566             :  *
     567             :  * So it's just the magic, followed by the statically sized
     568             :  * ReplicationStateOnDisk structs. Note that the maximum number of
     569             :  * ReplicationState is determined by max_replication_slots.
     570             :  * ---------------------------------------------------------------------------
     571             :  */
     572             : void
     573        1704 : CheckPointReplicationOrigin(void)
     574             : {
     575        1704 :     const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
     576        1704 :     const char *path = "pg_logical/replorigin_checkpoint";
     577             :     int         tmpfd;
     578             :     int         i;
     579        1704 :     uint32      magic = REPLICATION_STATE_MAGIC;
     580             :     pg_crc32c   crc;
     581             : 
     582        1704 :     if (max_replication_slots == 0)
     583           2 :         return;
     584             : 
     585        1702 :     INIT_CRC32C(crc);
     586             : 
     587             :     /* make sure no old temp file is remaining */
     588        1702 :     if (unlink(tmppath) < 0 && errno != ENOENT)
     589           0 :         ereport(PANIC,
     590             :                 (errcode_for_file_access(),
     591             :                  errmsg("could not remove file \"%s\": %m",
     592             :                         tmppath)));
     593             : 
     594             :     /*
     595             :      * no other backend can perform this at the same time; only one checkpoint
     596             :      * can happen at a time.
     597             :      */
     598        1702 :     tmpfd = OpenTransientFile(tmppath,
     599             :                               O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
     600        1702 :     if (tmpfd < 0)
     601           0 :         ereport(PANIC,
     602             :                 (errcode_for_file_access(),
     603             :                  errmsg("could not create file \"%s\": %m",
     604             :                         tmppath)));
     605             : 
     606             :     /* write magic */
     607        1702 :     errno = 0;
     608        1702 :     if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
     609             :     {
     610             :         /* if write didn't set errno, assume problem is no disk space */
     611           0 :         if (errno == 0)
     612           0 :             errno = ENOSPC;
     613           0 :         ereport(PANIC,
     614             :                 (errcode_for_file_access(),
     615             :                  errmsg("could not write to file \"%s\": %m",
     616             :                         tmppath)));
     617             :     }
     618        1702 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     619             : 
     620             :     /* prevent concurrent creations/drops */
     621        1702 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
     622             : 
     623             :     /* write actual data */
     624       18244 :     for (i = 0; i < max_replication_slots; i++)
     625             :     {
     626             :         ReplicationStateOnDisk disk_state;
     627       16542 :         ReplicationState *curstate = &replication_states[i];
     628             :         XLogRecPtr  local_lsn;
     629             : 
     630       16542 :         if (curstate->roident == InvalidRepOriginId)
     631       16462 :             continue;
     632             : 
     633             :         /* zero, to avoid uninitialized padding bytes */
     634          80 :         memset(&disk_state, 0, sizeof(disk_state));
     635             : 
     636          80 :         LWLockAcquire(&curstate->lock, LW_SHARED);
     637             : 
     638          80 :         disk_state.roident = curstate->roident;
     639             : 
     640          80 :         disk_state.remote_lsn = curstate->remote_lsn;
     641          80 :         local_lsn = curstate->local_lsn;
     642             : 
     643          80 :         LWLockRelease(&curstate->lock);
     644             : 
     645             :         /* make sure we only write out a commit that's persistent */
     646          80 :         XLogFlush(local_lsn);
     647             : 
     648          80 :         errno = 0;
     649          80 :         if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
     650             :             sizeof(disk_state))
     651             :         {
     652             :             /* if write didn't set errno, assume problem is no disk space */
     653           0 :             if (errno == 0)
     654           0 :                 errno = ENOSPC;
     655           0 :             ereport(PANIC,
     656             :                     (errcode_for_file_access(),
     657             :                      errmsg("could not write to file \"%s\": %m",
     658             :                             tmppath)));
     659             :         }
     660             : 
     661          80 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     662             :     }
     663             : 
     664        1702 :     LWLockRelease(ReplicationOriginLock);
     665             : 
     666             :     /* write out the CRC */
     667        1702 :     FIN_CRC32C(crc);
     668        1702 :     errno = 0;
     669        1702 :     if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
     670             :     {
     671             :         /* if write didn't set errno, assume problem is no disk space */
     672           0 :         if (errno == 0)
     673           0 :             errno = ENOSPC;
     674           0 :         ereport(PANIC,
     675             :                 (errcode_for_file_access(),
     676             :                  errmsg("could not write to file \"%s\": %m",
     677             :                         tmppath)));
     678             :     }
     679             : 
     680        1702 :     if (CloseTransientFile(tmpfd) != 0)
     681           0 :         ereport(PANIC,
     682             :                 (errcode_for_file_access(),
     683             :                  errmsg("could not close file \"%s\": %m",
     684             :                         tmppath)));
     685             : 
     686             :     /* fsync, rename to permanent file, fsync file and directory */
     687        1702 :     durable_rename(tmppath, path, PANIC);
     688             : }
     689             : 
     690             : /*
     691             :  * Recover replication replay status from checkpoint data saved earlier by
     692             :  * CheckPointReplicationOrigin.
     693             :  *
     694             :  * This only needs to be called at startup and *not* during every checkpoint
     695             :  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
     696             :  * state thereafter can be recovered by looking at commit records.
     697             :  */
     698             : void
     699        1520 : StartupReplicationOrigin(void)
     700             : {
     701        1520 :     const char *path = "pg_logical/replorigin_checkpoint";
     702             :     int         fd;
     703             :     int         readBytes;
     704        1520 :     uint32      magic = REPLICATION_STATE_MAGIC;
     705        1520 :     int         last_state = 0;
     706             :     pg_crc32c   file_crc;
     707             :     pg_crc32c   crc;
     708             : 
     709             :     /* don't want to overwrite already existing state */
     710             : #ifdef USE_ASSERT_CHECKING
     711             :     static bool already_started = false;
     712             : 
     713             :     Assert(!already_started);
     714             :     already_started = true;
     715             : #endif
     716             : 
     717        1520 :     if (max_replication_slots == 0)
     718          82 :         return;
     719             : 
     720        1518 :     INIT_CRC32C(crc);
     721             : 
     722        1518 :     elog(DEBUG2, "starting up replication origin progress state");
     723             : 
     724        1518 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
     725             : 
     726             :     /*
     727             :      * might have had max_replication_slots == 0 last run, or we just brought
     728             :      * up a standby.
     729             :      */
     730        1518 :     if (fd < 0 && errno == ENOENT)
     731          80 :         return;
     732        1438 :     else if (fd < 0)
     733           0 :         ereport(PANIC,
     734             :                 (errcode_for_file_access(),
     735             :                  errmsg("could not open file \"%s\": %m",
     736             :                         path)));
     737             : 
     738             :     /* verify magic, which is written even if nothing was active */
     739        1438 :     readBytes = read(fd, &magic, sizeof(magic));
     740        1438 :     if (readBytes != sizeof(magic))
     741             :     {
     742           0 :         if (readBytes < 0)
     743           0 :             ereport(PANIC,
     744             :                     (errcode_for_file_access(),
     745             :                      errmsg("could not read file \"%s\": %m",
     746             :                             path)));
     747             :         else
     748           0 :             ereport(PANIC,
     749             :                     (errcode(ERRCODE_DATA_CORRUPTED),
     750             :                      errmsg("could not read file \"%s\": read %d of %zu",
     751             :                             path, readBytes, sizeof(magic))));
     752             :     }
     753        1438 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     754             : 
     755        1438 :     if (magic != REPLICATION_STATE_MAGIC)
     756           0 :         ereport(PANIC,
     757             :                 (errmsg("replication checkpoint has wrong magic %u instead of %u",
     758             :                         magic, REPLICATION_STATE_MAGIC)));
     759             : 
     760             :     /* we can skip locking here, no other access is possible */
     761             : 
     762             :     /* recover individual states, until there are no more to be found */
     763             :     while (true)
     764          26 :     {
     765             :         ReplicationStateOnDisk disk_state;
     766             : 
     767        1464 :         readBytes = read(fd, &disk_state, sizeof(disk_state));
     768             : 
     769             :         /* no further data */
     770        1464 :         if (readBytes == sizeof(crc))
     771             :         {
     772             :             /* not pretty, but simple ... */
     773        1438 :             file_crc = *(pg_crc32c *) &disk_state;
     774        1438 :             break;
     775             :         }
     776             : 
     777          26 :         if (readBytes < 0)
     778             :         {
     779           0 :             ereport(PANIC,
     780             :                     (errcode_for_file_access(),
     781             :                      errmsg("could not read file \"%s\": %m",
     782             :                             path)));
     783             :         }
     784             : 
     785          26 :         if (readBytes != sizeof(disk_state))
     786             :         {
     787           0 :             ereport(PANIC,
     788             :                     (errcode_for_file_access(),
     789             :                      errmsg("could not read file \"%s\": read %d of %zu",
     790             :                             path, readBytes, sizeof(disk_state))));
     791             :         }
     792             : 
     793          26 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     794             : 
     795          26 :         if (last_state == max_replication_slots)
     796           0 :             ereport(PANIC,
     797             :                     (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     798             :                      errmsg("could not find free replication state, increase max_replication_slots")));
     799             : 
     800             :         /* copy data to shared memory */
     801          26 :         replication_states[last_state].roident = disk_state.roident;
     802          26 :         replication_states[last_state].remote_lsn = disk_state.remote_lsn;
     803          26 :         last_state++;
     804             : 
     805          26 :         ereport(LOG,
     806             :                 (errmsg("recovered replication state of node %d to %X/%X",
     807             :                         disk_state.roident,
     808             :                         LSN_FORMAT_ARGS(disk_state.remote_lsn))));
     809             :     }
     810             : 
     811             :     /* now check checksum */
     812        1438 :     FIN_CRC32C(crc);
     813        1438 :     if (file_crc != crc)
     814           0 :         ereport(PANIC,
     815             :                 (errcode(ERRCODE_DATA_CORRUPTED),
     816             :                  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
     817             :                         crc, file_crc)));
     818             : 
     819        1438 :     if (CloseTransientFile(fd) != 0)
     820           0 :         ereport(PANIC,
     821             :                 (errcode_for_file_access(),
     822             :                  errmsg("could not close file \"%s\": %m",
     823             :                         path)));
     824             : }
     825             : 
     826             : void
     827          12 : replorigin_redo(XLogReaderState *record)
     828             : {
     829          12 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     830             : 
     831          12 :     switch (info)
     832             :     {
     833           6 :         case XLOG_REPLORIGIN_SET:
     834             :             {
     835           6 :                 xl_replorigin_set *xlrec =
     836           6 :                     (xl_replorigin_set *) XLogRecGetData(record);
     837             : 
     838           6 :                 replorigin_advance(xlrec->node_id,
     839             :                                    xlrec->remote_lsn, record->EndRecPtr,
     840           6 :                                    xlrec->force /* backward */ ,
     841             :                                    false /* WAL log */ );
     842           6 :                 break;
     843             :             }
     844           6 :         case XLOG_REPLORIGIN_DROP:
     845             :             {
     846             :                 xl_replorigin_drop *xlrec;
     847             :                 int         i;
     848             : 
     849           6 :                 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
     850             : 
     851           8 :                 for (i = 0; i < max_replication_slots; i++)
     852             :                 {
     853           8 :                     ReplicationState *state = &replication_states[i];
     854             : 
     855             :                     /* found our slot */
     856           8 :                     if (state->roident == xlrec->node_id)
     857             :                     {
     858             :                         /* reset entry */
     859           6 :                         state->roident = InvalidRepOriginId;
     860           6 :                         state->remote_lsn = InvalidXLogRecPtr;
     861           6 :                         state->local_lsn = InvalidXLogRecPtr;
     862           6 :                         break;
     863             :                     }
     864             :                 }
     865           6 :                 break;
     866             :             }
     867           0 :         default:
     868           0 :             elog(PANIC, "replorigin_redo: unknown op code %u", info);
     869             :     }
     870          12 : }
     871             : 
     872             : 
     873             : /*
     874             :  * Tell the replication origin progress machinery that a commit from 'node'
     875             :  * that originated at the LSN remote_commit on the remote node was replayed
     876             :  * successfully and that we don't need to do so again. In combination with
     877             :  * setting up replorigin_session_origin_lsn and replorigin_session_origin
     878             :  * that ensures we won't lose knowledge about that after a crash if the
     879             :  * transaction had a persistent effect (think of asynchronous commits).
     880             :  *
     881             :  * local_commit needs to be a local LSN of the commit so that we can make sure
     882             :  * upon a checkpoint that enough WAL has been persisted to disk.
     883             :  *
     884             :  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
     885             :  * unless running in recovery.
     886             :  */
     887             : void
     888         434 : replorigin_advance(RepOriginId node,
     889             :                    XLogRecPtr remote_commit, XLogRecPtr local_commit,
     890             :                    bool go_backward, bool wal_log)
     891             : {
     892             :     int         i;
     893         434 :     ReplicationState *replication_state = NULL;
     894         434 :     ReplicationState *free_state = NULL;
     895             : 
     896             :     Assert(node != InvalidRepOriginId);
     897             : 
     898             :     /* we don't track DoNotReplicateId */
     899         434 :     if (node == DoNotReplicateId)
     900           0 :         return;
     901             : 
     902             :     /*
     903             :      * XXX: For the case where this is called by WAL replay, it'd be more
     904             :      * efficient to restore into a backend local hashtable and only dump into
     905             :      * shmem after recovery is finished. Let's hold off on implementing that
     906             :      * until it's shown to be a measurable expense.
     907             :      */
     908             : 
     909             :     /* Lock exclusively, as we may have to create a new table entry. */
     910         434 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     911             : 
     912             :     /*
     913             :      * Search for either an existing slot for the origin, or a free one we can
     914             :      * use.
     915             :      */
     916        3836 :     for (i = 0; i < max_replication_slots; i++)
     917             :     {
     918        3498 :         ReplicationState *curstate = &replication_states[i];
     919             : 
     920             :         /* remember where to insert if necessary */
     921        3498 :         if (curstate->roident == InvalidRepOriginId &&
     922             :             free_state == NULL)
     923             :         {
     924         338 :             free_state = curstate;
     925         338 :             continue;
     926             :         }
     927             : 
     928             :         /* not our slot */
     929        3160 :         if (curstate->roident != node)
     930             :         {
     931        3064 :             continue;
     932             :         }
     933             : 
     934             :         /* ok, found slot */
     935          96 :         replication_state = curstate;
     936             : 
     937          96 :         LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
     938             : 
     939             :         /* Make sure it's not used by somebody else */
     940          96 :         if (replication_state->acquired_by != 0)
     941             :         {
     942           0 :             ereport(ERROR,
     943             :                     (errcode(ERRCODE_OBJECT_IN_USE),
     944             :                      errmsg("replication origin with ID %d is already active for PID %d",
     945             :                             replication_state->roident,
     946             :                             replication_state->acquired_by)));
     947             :         }
     948             : 
     949          96 :         break;
     950             :     }
     951             : 
     952         434 :     if (replication_state == NULL && free_state == NULL)
     953           0 :         ereport(ERROR,
     954             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     955             :                  errmsg("could not find free replication state slot for replication origin with ID %d",
     956             :                         node),
     957             :                  errhint("Increase max_replication_slots and try again.")));
     958             : 
     959         434 :     if (replication_state == NULL)
     960             :     {
     961             :         /* initialize new slot */
     962         338 :         LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
     963         338 :         replication_state = free_state;
     964             :         Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
     965             :         Assert(replication_state->local_lsn == InvalidXLogRecPtr);
     966         338 :         replication_state->roident = node;
     967             :     }
     968             : 
     969             :     Assert(replication_state->roident != InvalidRepOriginId);
     970             : 
     971             :     /*
     972             :      * If somebody "forcefully" sets this slot, WAL log it, so it's durable
     973             :      * and the standby gets the message. Primarily this will be called during
     974             :      * WAL replay (of commit records) where no WAL logging is necessary.
     975             :      */
     976         434 :     if (wal_log)
     977             :     {
     978             :         xl_replorigin_set xlrec;
     979             : 
     980         342 :         xlrec.remote_lsn = remote_commit;
     981         342 :         xlrec.node_id = node;
     982         342 :         xlrec.force = go_backward;
     983             : 
     984         342 :         XLogBeginInsert();
     985         342 :         XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
     986             : 
     987         342 :         XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
     988             :     }
     989             : 
     990             :     /*
     991             :      * Due to - harmless - race conditions during a checkpoint we could see
     992             :      * values here that are older than the ones we already have in memory. We
     993             :      * could also see older values for prepared transactions when the prepare
     994             :      * is sent at a later point in time along with commit prepared and there
     995             :      * are other transaction commits between prepare and commit prepared. See
     996             :      * ReorderBufferFinishPrepared. Don't overwrite the newer in-memory values.
     997             :      */
     998         434 :     if (go_backward || replication_state->remote_lsn < remote_commit)
     999         412 :         replication_state->remote_lsn = remote_commit;
    1000         434 :     if (local_commit != InvalidXLogRecPtr &&
    1001          84 :         (go_backward || replication_state->local_lsn < local_commit))
    1002          90 :         replication_state->local_lsn = local_commit;
    1003         434 :     LWLockRelease(&replication_state->lock);
    1004             : 
    1005             :     /*
    1006             :      * Release *after* changing the LSNs, slot isn't acquired and thus could
    1007             :      * otherwise be dropped anytime.
    1008             :      */
    1009         434 :     LWLockRelease(ReplicationOriginLock);
    1010             : }
    1011             : 
    1012             : 
    1013             : XLogRecPtr
    1014          16 : replorigin_get_progress(RepOriginId node, bool flush)
    1015             : {
    1016             :     int         i;
    1017          16 :     XLogRecPtr  local_lsn = InvalidXLogRecPtr;
    1018          16 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1019             : 
    1020             :     /* prevent slots from being concurrently dropped */
    1021          16 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1022             : 
    1023          76 :     for (i = 0; i < max_replication_slots; i++)
    1024             :     {
    1025             :         ReplicationState *state;
    1026             : 
    1027          70 :         state = &replication_states[i];
    1028             : 
    1029          70 :         if (state->roident == node)
    1030             :         {
    1031          10 :             LWLockAcquire(&state->lock, LW_SHARED);
    1032             : 
    1033          10 :             remote_lsn = state->remote_lsn;
    1034          10 :             local_lsn = state->local_lsn;
    1035             : 
    1036          10 :             LWLockRelease(&state->lock);
    1037             : 
    1038          10 :             break;
    1039             :         }
    1040             :     }
    1041             : 
    1042          16 :     LWLockRelease(ReplicationOriginLock);
    1043             : 
    1044          16 :     if (flush && local_lsn != InvalidXLogRecPtr)
    1045           2 :         XLogFlush(local_lsn);
    1046             : 
    1047          16 :     return remote_lsn;
    1048             : }
    1049             : 
    1050             : /*
    1051             :  * Tear down a (possibly) configured session replication origin during process
    1052             :  * exit.
    1053             :  */
    1054             : static void
    1055         730 : ReplicationOriginExitCleanup(int code, Datum arg)
    1056             : {
    1057         730 :     ConditionVariable *cv = NULL;
    1058             : 
    1059         730 :     if (session_replication_state == NULL)
    1060         322 :         return;
    1061             : 
    1062         408 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1063             : 
    1064         408 :     if (session_replication_state->acquired_by == MyProcPid)
    1065             :     {
    1066         388 :         cv = &session_replication_state->origin_cv;
    1067             : 
    1068         388 :         session_replication_state->acquired_by = 0;
    1069         388 :         session_replication_state = NULL;
    1070             :     }
    1071             : 
    1072         408 :     LWLockRelease(ReplicationOriginLock);
    1073             : 
    1074         408 :     if (cv)
    1075         388 :         ConditionVariableBroadcast(cv);
    1076             : }
    1077             : 
    1078             : /*
    1079             :  * Set up a replication origin in the shared memory struct if it doesn't
    1080             :  * already exist and cache access to the specific ReplicationState so the
    1081             :  * array doesn't have to be searched when calling
    1082             :  * replorigin_session_advance().
    1083             :  *
    1084             :  * Normally only one such cached origin can exist per process so the cached
    1085             :  * value can only be set again after the previous value is torn down with
    1086             :  * replorigin_session_reset(). For this normal case pass acquired_by = 0
    1087             :  * (meaning the slot is not allowed to be already acquired by another process).
    1088             :  *
    1089             :  * However, sometimes multiple processes can safely re-use the same origin slot
    1090             :  * (for example, multiple parallel apply processes can safely use the same
    1091             :  * origin, provided they maintain commit order by allowing only one process to
    1092             :  * commit at a time). For this case the first process must pass acquired_by =
    1093             :  * 0, and then the other processes sharing that same origin can pass
    1094             :  * acquired_by = PID of the first process.
    1095             :  */
    1096             : void
    1097         736 : replorigin_session_setup(RepOriginId node, int acquired_by)
    1098             : {
    1099             :     static bool registered_cleanup;
    1100             :     int         i;
    1101         736 :     int         free_slot = -1;
    1102             : 
    1103         736 :     if (!registered_cleanup)
    1104             :     {
    1105         730 :         on_shmem_exit(ReplicationOriginExitCleanup, 0);
    1106         730 :         registered_cleanup = true;
    1107             :     }
    1108             : 
    1109             :     Assert(max_replication_slots > 0);
    1110             : 
    1111         736 :     if (session_replication_state != NULL)
    1112           2 :         ereport(ERROR,
    1113             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1114             :                  errmsg("cannot setup replication origin when one is already setup")));
    1115             : 
    1116             :     /* Lock exclusively, as we may have to create a new table entry. */
    1117         734 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1118             : 
    1119             :     /*
    1120             :      * Search for either an existing slot for the origin, or a free one we can
    1121             :      * use.
    1122             :      */
    1123        3072 :     for (i = 0; i < max_replication_slots; i++)
    1124             :     {
    1125        2876 :         ReplicationState *curstate = &replication_states[i];
    1126             : 
    1127             :         /* remember where to insert if necessary */
    1128        2876 :         if (curstate->roident == InvalidRepOriginId &&
    1129             :             free_slot == -1)
    1130             :         {
    1131         196 :             free_slot = i;
    1132         196 :             continue;
    1133             :         }
    1134             : 
    1135             :         /* not our slot */
    1136        2680 :         if (curstate->roident != node)
    1137        2142 :             continue;
    1138             : 
    1139         538 :         else if (curstate->acquired_by != 0 && acquired_by == 0)
    1140             :         {
    1141           0 :             ereport(ERROR,
    1142             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1143             :                      errmsg("replication origin with ID %d is already active for PID %d",
    1144             :                             curstate->roident, curstate->acquired_by)));
    1145             :         }
    1146             : 
    1147             :         /* ok, found slot */
    1148         538 :         session_replication_state = curstate;
    1149         538 :         break;
    1150             :     }
    1151             : 
    1152             : 
    1153         734 :     if (session_replication_state == NULL && free_slot == -1)
    1154           0 :         ereport(ERROR,
    1155             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
    1156             :                  errmsg("could not find free replication state slot for replication origin with ID %d",
    1157             :                         node),
    1158             :                  errhint("Increase max_replication_slots and try again.")));
    1159         734 :     else if (session_replication_state == NULL)
    1160             :     {
    1161             :         /* initialize new slot */
    1162         196 :         session_replication_state = &replication_states[free_slot];
    1163             :         Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
    1164             :         Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
    1165         196 :         session_replication_state->roident = node;
    1166             :     }
    1167             : 
    1168             : 
    1169             :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1170             : 
    1171         734 :     if (acquired_by == 0)
    1172         714 :         session_replication_state->acquired_by = MyProcPid;
    1173          20 :     else if (session_replication_state->acquired_by != acquired_by)
    1174           0 :         elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
    1175             :              node, acquired_by);
    1176             : 
    1177         734 :     LWLockRelease(ReplicationOriginLock);
    1178             : 
    1179             :     /* probably this one is pointless */
    1180         734 :     ConditionVariableBroadcast(&session_replication_state->origin_cv);
    1181         734 : }
    1182             : 
    1183             : /*
    1184             :  * Release the replication origin previously set up in this session: clear
    1185             :  * the state's acquired_by marker and drop the session's pointer to it.
    1186             :  * This function may only be called if an origin was set up with
    1187             :  * replorigin_session_setup(); otherwise an ERROR is raised.
    1188             :  */
    1189             : void
    1190         328 : replorigin_session_reset(void)
    1191             : {
    1192             :     ConditionVariable *cv;
    1193             : 
    1194             :     Assert(max_replication_slots != 0);
    1195             : 
    1196         328 :     if (session_replication_state == NULL)
    1197           2 :         ereport(ERROR,
    1198             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1199             :                  errmsg("no replication origin is configured")));
    1200             :     /* acquired_by is cleared while holding ReplicationOriginLock exclusively */
    1201         326 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1202             : 
    1203         326 :     session_replication_state->acquired_by = 0;
    1204         326 :     cv = &session_replication_state->origin_cv;
    1205         326 :     session_replication_state = NULL;
    1206             : 
    1207         326 :     LWLockRelease(ReplicationOriginLock);
    1208             :     /* use the saved cv: session_replication_state is already NULL here */
    1209         326 :     ConditionVariableBroadcast(cv);
    1210         326 : }
    1211             : 
    1212             : /*
    1213             :  * Do the same work replorigin_advance() does, just on the session's
    1214             :  * configured origin; this is noticeably cheaper than replorigin_advance().
    1215             :  * Each of local_commit and remote_commit is stored only if it is greater
    1216             :  * than the currently recorded value, so progress never moves backward here.
    1217             :  */
    1218             : void
    1219        1942 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
    1220             : {
    1221             :     Assert(session_replication_state != NULL);
    1222             :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1223             :     /* both LSN fields are updated under the state's own lock */
    1224        1942 :     LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
    1225        1942 :     if (session_replication_state->local_lsn < local_commit)
    1226        1942 :         session_replication_state->local_lsn = local_commit;
    1227        1942 :     if (session_replication_state->remote_lsn < remote_commit)
    1228         928 :         session_replication_state->remote_lsn = remote_commit;
    1229        1942 :     LWLockRelease(&session_replication_state->lock);
    1230        1942 : }
    1231             : 
    1232             : /*
    1233             :  * Return the remote LSN recorded for the session's already set up replication
    1234             :  * origin, i.e. the point up to which its changes were successfully replayed.
    1235             :  */
    1236             : XLogRecPtr
    1237         372 : replorigin_session_get_progress(bool flush)
    1238             : {
    1239             :     XLogRecPtr  remote_lsn;
    1240             :     XLogRecPtr  local_lsn;
    1241             : 
    1242             :     Assert(session_replication_state != NULL);
    1243             :     /* snapshot both LSNs while holding the state's lock in shared mode */
    1244         372 :     LWLockAcquire(&session_replication_state->lock, LW_SHARED);
    1245         372 :     remote_lsn = session_replication_state->remote_lsn;
    1246         372 :     local_lsn = session_replication_state->local_lsn;
    1247         372 :     LWLockRelease(&session_replication_state->lock);
    1248             :     /* if requested, flush WAL up to the recorded local LSN before returning */
    1249         372 :     if (flush && local_lsn != InvalidXLogRecPtr)
    1250           2 :         XLogFlush(local_lsn);
    1251             : 
    1252         372 :     return remote_lsn;
    1253             : }
    1254             : 
    1255             : 
    1256             : 
    1257             : /* ---------------------------------------------------------------------------
    1258             :  * SQL functions for working with replication origin.
    1259             :  *
    1260             :  * These mostly should be fairly short wrappers around more generic functions.
    1261             :  * ---------------------------------------------------------------------------
    1262             :  */
    1263             : 
    1264             : /*
    1265             :  * Create a replication origin for the passed-in name (argument 0), and
    1266             :  * return the assigned oid. Reserved names are rejected with an ERROR.
    1267             :  */
    1268             : Datum
    1269          18 : pg_replication_origin_create(PG_FUNCTION_ARGS)
    1270             : {
    1271             :     char       *name;
    1272             :     RepOriginId roident;
    1273             : 
    1274          18 :     replorigin_check_prerequisites(false, false);
    1275             : 
    1276          18 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1277             : 
    1278             :     /*
    1279             :      * Replication origins "any" and "none" are reserved for system options.
    1280             :      * The origins "pg_xxx" are reserved for internal use.
    1281             :      */
    1282          18 :     if (IsReservedName(name) || IsReservedOriginName(name))
    1283           6 :         ereport(ERROR,
    1284             :                 (errcode(ERRCODE_RESERVED_NAME),
    1285             :                  errmsg("replication origin name \"%s\" is reserved",
    1286             :                         name),
    1287             :                  errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
    1288             :                            LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
    1289             : 
    1290             :     /*
    1291             :      * If built with the appropriate switch, warn when the regression-testing
    1292             :      * conventions for replication origin names are violated.
    1293             :      */
    1294             : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1295             :     if (strncmp(name, "regress_", 8) != 0)
    1296             :         elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
    1297             : #endif
    1298             : 
    1299          12 :     roident = replorigin_create(name);
    1300             : 
    1301          10 :     pfree(name);
    1302             : 
    1303          10 :     PG_RETURN_OID(roident);
    1304             : }
    1305             : 
    1306             : /*
    1307             :  * Drop the replication origin whose name is passed as argument 0.
    1308             :  */
    1309             : Datum
    1310          14 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
    1311             : {
    1312             :     char       *name;
    1313             : 
    1314          14 :     replorigin_check_prerequisites(false, false);
    1315             : 
    1316          14 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1317             :     /* NOTE(review): presumably missing_ok = false, nowait = true -- confirm */
    1318          14 :     replorigin_drop_by_name(name, false, true);
    1319             : 
    1320          12 :     pfree(name);
    1321             : 
    1322          12 :     PG_RETURN_VOID();
    1323             : }
    1324             : 
    1325             : /*
    1326             :  * Return the oid of the replication origin named by argument 0, or NULL.
    1327             :  */
    1328             : Datum
    1329           0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
    1330             : {
    1331             :     char       *name;
    1332             :     RepOriginId roident;
    1333             : 
    1334           0 :     replorigin_check_prerequisites(false, false);
    1335             : 
    1336           0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1337           0 :     roident = replorigin_by_name(name, true);
    1338             : 
    1339           0 :     pfree(name);
    1340             :     /* an invalid id from the lookup is reported to the caller as NULL */
    1341           0 :     if (OidIsValid(roident))
    1342           0 :         PG_RETURN_OID(roident);
    1343           0 :     PG_RETURN_NULL();
    1344             : }
    1345             : 
    1346             : /*
    1347             :  * Set up the replication origin named by argument 0 for this session.
    1348             :  */
    1349             : Datum
    1350          12 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
    1351             : {
    1352             :     char       *name;
    1353             :     RepOriginId origin;
    1354             : 
    1355          12 :     replorigin_check_prerequisites(true, false);
    1356             : 
    1357          12 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1358          12 :     origin = replorigin_by_name(name, false);
    1359          10 :     replorigin_session_setup(origin, 0);
    1360             :     /* remember the origin id in the session-level variable as well */
    1361           8 :     replorigin_session_origin = origin;
    1362             : 
    1363           8 :     pfree(name);
    1364             : 
    1365           8 :     PG_RETURN_VOID();
    1366             : }
    1367             : 
    1368             : /*
    1369             :  * Reset the origin previously set up in this session.
    1370             :  */
    1371             : Datum
    1372          10 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
    1373             : {
    1374          10 :     replorigin_check_prerequisites(true, false);
    1375             : 
    1376          10 :     replorigin_session_reset();
    1377             :     /* also clear the session-level origin id, LSN and timestamp */
    1378           8 :     replorigin_session_origin = InvalidRepOriginId;
    1379           8 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1380           8 :     replorigin_session_origin_timestamp = 0;
    1381             : 
    1382           8 :     PG_RETURN_VOID();
    1383             : }
    1384             : 
    1385             : /*
    1386             :  * Report whether a replication origin has been set up for this session.
    1387             :  */
    1388             : Datum
    1389           0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
    1390             : {
    1391           0 :     replorigin_check_prerequisites(false, false);
    1392             :     /* true iff replorigin_session_origin holds a valid origin id */
    1393           0 :     PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
    1394             : }
    1395             : 
    1396             : 
    1397             : /*
    1398             :  * Return the replication progress for the origin set up in the current session.
    1399             :  *
    1400             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1401             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1402             :  * commits are used when replaying replicated transactions.
    1403             :  */
    1404             : Datum
    1405           4 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
    1406             : {
    1407           4 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1408           4 :     bool        flush = PG_GETARG_BOOL(0);
    1409             : 
    1410           4 :     replorigin_check_prerequisites(true, false);
    1411             :     /* the session must have an origin set up, otherwise raise an ERROR */
    1412           4 :     if (session_replication_state == NULL)
    1413           0 :         ereport(ERROR,
    1414             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1415             :                  errmsg("no replication origin is configured")));
    1416             : 
    1417           4 :     remote_lsn = replorigin_session_get_progress(flush);
    1418             :     /* an invalid LSN is reported to the caller as NULL */
    1419           4 :     if (remote_lsn == InvalidXLogRecPtr)
    1420           0 :         PG_RETURN_NULL();
    1421             : 
    1422           4 :     PG_RETURN_LSN(remote_lsn);
    1423             : }
    1424             : /* Store the origin LSN (argument 0) and origin timestamp (argument 1) in the session-level variables. */
    1425             : Datum
    1426           2 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
    1427             : {
    1428           2 :     XLogRecPtr  location = PG_GETARG_LSN(0);
    1429             : 
    1430           2 :     replorigin_check_prerequisites(true, false);
    1431             :     /* requires an origin to have been set up in this session */
    1432           2 :     if (session_replication_state == NULL)
    1433           0 :         ereport(ERROR,
    1434             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1435             :                  errmsg("no replication origin is configured")));
    1436             : 
    1437           2 :     replorigin_session_origin_lsn = location;
    1438           2 :     replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
    1439             : 
    1440           2 :     PG_RETURN_VOID();
    1441             : }
    1442             : /* Clear the origin LSN and timestamp that pg_replication_origin_xact_setup() stores. */
    1443             : Datum
    1444           0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
    1445             : {
    1446           0 :     replorigin_check_prerequisites(true, false);
    1447             : 
    1448           0 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1449           0 :     replorigin_session_origin_timestamp = 0;
    1450             : 
    1451           0 :     PG_RETURN_VOID();
    1452             : }
    1453             : 
    1454             : /* Set the progress of the origin named by argument 0 to the remote LSN in argument 1; moving backward is allowed. */
    1455             : Datum
    1456           6 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
    1457             : {
    1458           6 :     text       *name = PG_GETARG_TEXT_PP(0);
    1459           6 :     XLogRecPtr  remote_commit = PG_GETARG_LSN(1);
    1460             :     RepOriginId node;
    1461             : 
    1462           6 :     replorigin_check_prerequisites(true, false);
    1463             : 
    1464             :     /* lock to prevent the replication origin from vanishing */
    1465           6 :     LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1466             : 
    1467           6 :     node = replorigin_by_name(text_to_cstring(name), false);
    1468             : 
    1469             :     /*
    1470             :      * Can't sensibly pass a local commit to be flushed at checkpoint - this
    1471             :      * xact hasn't committed yet. This is why this function should be used to
    1472             :      * set up the initial replication state, but not for replay.
    1473             :      */
    1474           4 :     replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
    1475             :                        true /* go backward */ , true /* WAL log */ );
    1476             : 
    1477           4 :     UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1478             : 
    1479           4 :     PG_RETURN_VOID();
    1480             : }
    1481             : 
    1482             : 
    1483             : /*
    1484             :  * Return the replication progress for the origin named by argument 0.
    1485             :  *
    1486             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1487             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1488             :  * commits are used when replaying replicated transactions.
    1489             :  */
    1490             : Datum
    1491           6 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
    1492             : {
    1493             :     char       *name;
    1494             :     bool        flush;
    1495             :     RepOriginId roident;
    1496           6 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1497             : 
    1498           6 :     replorigin_check_prerequisites(true, true);
    1499             : 
    1500           6 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1501           6 :     flush = PG_GETARG_BOOL(1);
    1502             :     /* NOTE(review): the lookup presumably errors out for an unknown name -- confirm */
    1503           6 :     roident = replorigin_by_name(name, false);
    1504             :     Assert(OidIsValid(roident));
    1505             : 
    1506           4 :     remote_lsn = replorigin_get_progress(roident, flush);
    1507             :     /* an invalid LSN is reported to the caller as NULL */
    1508           4 :     if (remote_lsn == InvalidXLogRecPtr)
    1509           0 :         PG_RETURN_NULL();
    1510             : 
    1511           4 :     PG_RETURN_LSN(remote_lsn);
    1512             : }
    1513             : 
    1514             : /* Emit one row per in-use replication state: origin id, origin name, remote LSN, local LSN. */
    1515             : Datum
    1516          12 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
    1517             : {
    1518          12 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1519             :     int         i;
    1520             : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
    1521             : 
    1522             :     /* we want to return 0 rows if max_replication_slots is set to zero */
    1523          12 :     replorigin_check_prerequisites(false, true);
    1524             : 
    1525          12 :     InitMaterializedSRF(fcinfo, 0);
    1526             : 
    1527             :     /* prevent slots from being concurrently dropped */
    1528          12 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1529             : 
    1530             :     /*
    1531             :      * Iterate through all possible replication_states, display if they are
    1532             :      * filled. ReplicationOriginLock is held in shared mode for the whole scan,
    1533             :      * and each state's own lock is taken while its LSNs are read.
    1534             :      */
    1535         108 :     for (i = 0; i < max_replication_slots; i++)
    1536             :     {
    1537             :         ReplicationState *state;
    1538             :         Datum       values[REPLICATION_ORIGIN_PROGRESS_COLS];
    1539             :         bool        nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
    1540             :         char       *roname;
    1541             : 
    1542          96 :         state = &replication_states[i];
    1543             : 
    1544             :         /* unused slot, nothing to display */
    1545          96 :         if (state->roident == InvalidRepOriginId)
    1546          86 :             continue;
    1547             :         /* start with every column NULL; each flag is cleared as its value is filled in */
    1548          10 :         memset(values, 0, sizeof(values));
    1549          10 :         memset(nulls, 1, sizeof(nulls));
    1550             : 
    1551          10 :         values[0] = ObjectIdGetDatum(state->roident);
    1552          10 :         nulls[0] = false;
    1553             : 
    1554             :         /*
    1555             :          * We're not preventing the origin from being dropped concurrently, so
    1556             :          * silently accept that it might be gone; the name column then stays NULL.
    1557             :          */
    1558          10 :         if (replorigin_by_oid(state->roident, true,
    1559             :                               &roname))
    1560             :         {
    1561          10 :             values[1] = CStringGetTextDatum(roname);
    1562          10 :             nulls[1] = false;
    1563             :         }
    1564             :         /* read both LSNs under the state's own lock */
    1565          10 :         LWLockAcquire(&state->lock, LW_SHARED);
    1566             : 
    1567          10 :         values[2] = LSNGetDatum(state->remote_lsn);
    1568          10 :         nulls[2] = false;
    1569             : 
    1570          10 :         values[3] = LSNGetDatum(state->local_lsn);
    1571          10 :         nulls[3] = false;
    1572             : 
    1573          10 :         LWLockRelease(&state->lock);
    1574             : 
    1575          10 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1576             :                              values, nulls);
    1577             :     }
    1578             : 
    1579          12 :     LWLockRelease(ReplicationOriginLock);
    1580             : 
    1581             : #undef REPLICATION_ORIGIN_PROGRESS_COLS
    1582             :     /* the result rows were put into rsinfo->setResult above */
    1583          12 :     return (Datum) 0;
    1584             : }

Generated by: LCOV version 1.14