LCOV - code coverage report
Current view: top level - src/backend/replication/logical - origin.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 394 455 86.6 %
Date: 2025-01-18 04:15:08 Functions: 28 31 90.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * origin.c
       4             :  *    Logical replication progress tracking support.
       5             :  *
       6             :  * Copyright (c) 2013-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/logical/origin.c
      10             :  *
      11             :  * NOTES
      12             :  *
      13             :  * This file provides the following:
      14             :  * * An infrastructure to name nodes in a replication setup
      15             :  * * A facility to store and persist replication progress in an
      16             :  *   efficient and durable manner.
      17             :  *
      18             :  * A replication origin consists of a descriptive, user-defined, external
      19             :  * name and a short, thus space-efficient, internal 2-byte one. This split
      20             :  * exists because replication origins have to be stored in WAL and shared
      21             :  * memory, and long descriptors would be inefficient.  For now we only use 2
      22             :  * bytes for the internal id of a replication origin, as it seems unlikely that
      23             :  * there will soon be more than 65k nodes in one replication setup; and using
      24             :  * only two bytes allows us to be more space efficient.
      25             :  *
      26             :  * Replication progress is tracked in a shared memory table
      27             :  * (ReplicationState) that's dumped to disk every checkpoint. Entries
      28             :  * ('slots') in this table are identified by the internal id. That's the case
      29             :  * because it allows replication progress to be advanced during crash
      30             :  * recovery. To allow doing so we store the original LSN (from the originating
      31             :  * system) of a transaction in the commit record. That allows us to recover the
      32             :  * precise replayed state after crash recovery, without requiring synchronous
      33             :  * commits. Allowing logical replication to use asynchronous commit is
      34             :  * generally good for performance, but especially important as it allows a
      35             :  * single threaded replay process to keep up with a source that has multiple
      36             :  * backends generating changes concurrently.  For efficiency and simplicity
      37             :  * reasons a backend can set up one replication origin that is from then on used
      38             :  * as the source of changes produced by the backend, until reset again.
      39             :  *
      40             :  * This infrastructure is intended to be used in cooperation with logical
      41             :  * decoding. When replaying from a remote system the configured origin is
      42             :  * provided to output plugins, allowing prevention of replication loops and
      43             :  * other filtering.
      44             :  *
      45             :  * There are several levels of locking at work:
      46             :  *
      47             :  * * To create a replication origin an exclusive lock on pg_replication_origin
      48             :  *   is held while a free id is chosen and inserted, so new origins can be
      49             :  *   assigned conflict free using a dirty snapshot. Dropping locks the origin itself.
      50             :  *
      51             :  * * When creating an in-memory replication progress slot the ReplicationOrigin
      52             :  *   LWLock has to be held exclusively; when iterating over the replication
      53             :  *   progress a shared lock has to be held, the same when advancing the
      54             :  *   replication progress of an individual origin that has not been set up as
      55             :  *   the session's replication origin.
      56             :  *
      57             :  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
      58             :  *   replication progress slot that slot's lwlock has to be held. That's
      59             :  *   primarily because we do not assume 8 byte writes (the LSN) are atomic on
      60             :  *   all our platforms, but it also simplifies memory ordering concerns
      61             :  *   between the remote and local lsn. We use a lwlock instead of a spinlock
      62             :  *   so it's less harmful to hold the lock over a WAL write
      63             :  *   (cf. AdvanceReplicationProgress).
      64             :  *
      65             :  * ---------------------------------------------------------------------------
      66             :  */
      67             : 
      68             : #include "postgres.h"
      69             : 
      70             : #include <unistd.h>
      71             : #include <sys/stat.h>
      72             : 
      73             : #include "access/genam.h"
      74             : #include "access/htup_details.h"
      75             : #include "access/table.h"
      76             : #include "access/xact.h"
      77             : #include "access/xloginsert.h"
      78             : #include "catalog/catalog.h"
      79             : #include "catalog/indexing.h"
      80             : #include "catalog/pg_subscription.h"
      81             : #include "funcapi.h"
      82             : #include "miscadmin.h"
      83             : #include "nodes/execnodes.h"
      84             : #include "pgstat.h"
      85             : #include "replication/origin.h"
      86             : #include "replication/slot.h"
      87             : #include "storage/condition_variable.h"
      88             : #include "storage/fd.h"
      89             : #include "storage/ipc.h"
      90             : #include "storage/lmgr.h"
      91             : #include "utils/builtins.h"
      92             : #include "utils/fmgroids.h"
      93             : #include "utils/pg_lsn.h"
      94             : #include "utils/rel.h"
      95             : #include "utils/snapmgr.h"
      96             : #include "utils/syscache.h"
      97             : 
      98             : /* paths for replication origin checkpoint files */
      99             : #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
     100             : #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
     101             : 
     102             : /*
     103             :  * Replay progress of a single remote node.
     104             :  */
     105             : typedef struct ReplicationState
     106             : {
     107             :     /*
     108             :      * Local identifier for the remote node.
     109             :      */
     110             :     RepOriginId roident;
     111             : 
     112             :     /*
     113             :      * Location of the latest commit from the remote side.
     114             :      */
     115             :     XLogRecPtr  remote_lsn;
     116             : 
     117             :     /*
     118             :      * Remember the local lsn of the commit record so we can XLogFlush() to it
     119             :      * during a checkpoint so we know the commit record actually is safe on
     120             :      * disk.
     121             :      */
     122             :     XLogRecPtr  local_lsn;
     123             : 
     124             :     /*
     125             :      * PID of backend that's acquired slot, or 0 if none.
     126             :      */
     127             :     int         acquired_by;
     128             : 
     129             :     /*
     130             :      * Condition variable that's signaled when acquired_by changes.
     131             :      */
     132             :     ConditionVariable origin_cv;
     133             : 
     134             :     /*
     135             :      * Lock protecting remote_lsn and local_lsn.
     136             :      */
     137             :     LWLock      lock;
     138             : } ReplicationState;
     139             : 
     140             : /*
     141             :  * On disk version of ReplicationState.
     142             :  */
     143             : typedef struct ReplicationStateOnDisk
     144             : {
     145             :     RepOriginId roident;
     146             :     XLogRecPtr  remote_lsn;
     147             : } ReplicationStateOnDisk;
     148             : 
     149             : 
     150             : typedef struct ReplicationStateCtl
     151             : {
     152             :     /* Tranche to use for per-origin LWLocks */
     153             :     int         tranche_id;
     154             :     /* Array of length max_replication_slots */
     155             :     ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
     156             : } ReplicationStateCtl;
     157             : 
     158             : /* external variables */
     159             : RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
     160             : XLogRecPtr  replorigin_session_origin_lsn = InvalidXLogRecPtr;
     161             : TimestampTz replorigin_session_origin_timestamp = 0;
     162             : 
     163             : /*
     164             :  * Base address into a shared memory array of replication states of size
     165             :  * max_replication_slots.
     166             :  *
     167             :  * XXX: Should we use a separate variable to size this rather than
     168             :  * max_replication_slots?
     169             :  */
     170             : static ReplicationState *replication_states;
     171             : 
     172             : /*
     173             :  * Actual shared memory block (replication_states[] is now part of this).
     174             :  */
     175             : static ReplicationStateCtl *replication_states_ctl;
     176             : 
     177             : /*
     178             :  * We keep a pointer to this backend's ReplicationState to avoid having to
     179             :  * search the replication_states array in replorigin_session_advance for each
     180             :  * remote commit.  (Ownership of a backend's own entry can only be changed by
     181             :  * that backend.)
     182             :  */
     183             : static ReplicationState *session_replication_state = NULL;
     184             : 
     185             : /* Magic for on disk files. */
     186             : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
     187             : 
     188             : static void
     189          84 : replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
     190             : {
     191          84 :     if (check_slots && max_replication_slots == 0)
     192           0 :         ereport(ERROR,
     193             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     194             :                  errmsg("cannot query or manipulate replication origin when \"max_replication_slots\" is 0")));
     195             : 
     196          84 :     if (!recoveryOK && RecoveryInProgress())
     197           0 :         ereport(ERROR,
     198             :                 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
     199             :                  errmsg("cannot manipulate replication origins during recovery")));
     200          84 : }
     201             : 
     202             : 
     203             : /*
     204             :  * IsReservedOriginName
     205             :  *      True iff name is either "none" or "any".
     206             :  */
     207             : static bool
     208          16 : IsReservedOriginName(const char *name)
     209             : {
     210          30 :     return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
     211          14 :             (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
     212             : }
     213             : 
     214             : /* ---------------------------------------------------------------------------
     215             :  * Functions for working with replication origins themselves.
     216             :  * ---------------------------------------------------------------------------
     217             :  */
     218             : 
     219             : /*
     220             :  * Check for a persistent replication origin identified by name.
     221             :  *
     222             :  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
     223             :  */
     224             : RepOriginId
     225        1742 : replorigin_by_name(const char *roname, bool missing_ok)
     226             : {
     227             :     Form_pg_replication_origin ident;
     228        1742 :     Oid         roident = InvalidOid;
     229             :     HeapTuple   tuple;
     230             :     Datum       roname_d;
     231             : 
     232        1742 :     roname_d = CStringGetTextDatum(roname);
     233             : 
     234        1742 :     tuple = SearchSysCache1(REPLORIGNAME, roname_d);
     235        1742 :     if (HeapTupleIsValid(tuple))
     236             :     {
     237        1014 :         ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
     238        1014 :         roident = ident->roident;
     239        1014 :         ReleaseSysCache(tuple);
     240             :     }
     241         728 :     else if (!missing_ok)
     242           8 :         ereport(ERROR,
     243             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     244             :                  errmsg("replication origin \"%s\" does not exist",
     245             :                         roname)));
     246             : 
     247        1734 :     return roident;
     248             : }
     249             : 
     250             : /*
     251             :  * Create a replication origin.
     252             :  *
     253             :  * Needs to be called in a transaction.
     254             :  */
     255             : RepOriginId
     256         684 : replorigin_create(const char *roname)
     257             : {
     258             :     Oid         roident;
     259         684 :     HeapTuple   tuple = NULL;
     260             :     Relation    rel;
     261             :     Datum       roname_d;
     262             :     SnapshotData SnapshotDirty;
     263             :     SysScanDesc scan;
     264             :     ScanKeyData key;
     265             : 
     266         684 :     roname_d = CStringGetTextDatum(roname);
     267             : 
     268             :     Assert(IsTransactionState());
     269             : 
     270             :     /*
     271             :      * We need the numeric replication origin to be 16bit wide, so we cannot
     272             :      * rely on the normal oid allocation. Instead we simply scan
     273             :      * pg_replication_origin for the first unused id. That's not particularly
     274             :      * efficient, but this should be a fairly infrequent operation - we can
     275             :      * easily spend a bit more code on this when it turns out it needs to be
     276             :      * faster.
     277             :      *
     278             :      * We handle concurrency by taking an exclusive lock (allowing reads!)
     279             :      * over the table for the duration of the search. Because we use a "dirty
     280             :      * snapshot" we can read rows that other in-progress sessions have
     281             :      * written, even though they would be invisible with normal snapshots. Due
     282             :      * to the exclusive lock there's no danger that new rows can appear while
     283             :      * we're checking.
     284             :      */
     285         684 :     InitDirtySnapshot(SnapshotDirty);
     286             : 
     287         684 :     rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
     288             : 
     289        1200 :     for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
     290             :     {
     291             :         bool        nulls[Natts_pg_replication_origin];
     292             :         Datum       values[Natts_pg_replication_origin];
     293             :         bool        collides;
     294             : 
     295        1200 :         CHECK_FOR_INTERRUPTS();
     296             : 
     297        1200 :         ScanKeyInit(&key,
     298             :                     Anum_pg_replication_origin_roident,
     299             :                     BTEqualStrategyNumber, F_OIDEQ,
     300             :                     ObjectIdGetDatum(roident));
     301             : 
     302        1200 :         scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
     303             :                                   true /* indexOK */ ,
     304             :                                   &SnapshotDirty,
     305             :                                   1, &key);
     306             : 
     307        1200 :         collides = HeapTupleIsValid(systable_getnext(scan));
     308             : 
     309        1200 :         systable_endscan(scan);
     310             : 
     311        1200 :         if (!collides)
     312             :         {
     313             :             /*
     314             :              * Ok, found an unused roident, insert the new row and do a CCI,
     315             :              * so our callers can look it up if they want to.
     316             :              */
     317         684 :             memset(&nulls, 0, sizeof(nulls));
     318             : 
     319         684 :             values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
     320         684 :             values[Anum_pg_replication_origin_roname - 1] = roname_d;
     321             : 
     322         684 :             tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     323         684 :             CatalogTupleInsert(rel, tuple);
     324         682 :             CommandCounterIncrement();
     325         682 :             break;
     326             :         }
     327             :     }
     328             : 
     329             :     /* now release lock again,  */
     330         682 :     table_close(rel, ExclusiveLock);
     331             : 
     332         682 :     if (tuple == NULL)
     333           0 :         ereport(ERROR,
     334             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     335             :                  errmsg("could not find free replication origin ID")));
     336             : 
     337         682 :     heap_freetuple(tuple);
     338         682 :     return roident;
     339             : }
     340             : 
     341             : /*
     342             :  * Helper function to drop a replication origin.
     343             :  */
     344             : static void
     345         554 : replorigin_state_clear(RepOriginId roident, bool nowait)
     346             : {
     347             :     int         i;
     348             : 
     349             :     /*
     350             :      * Clean up the slot state info, if there is any matching slot.
     351             :      */
     352         554 : restart:
     353         554 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     354             : 
     355        1708 :     for (i = 0; i < max_replication_slots; i++)
     356             :     {
     357        1634 :         ReplicationState *state = &replication_states[i];
     358             : 
     359        1634 :         if (state->roident == roident)
     360             :         {
     361             :             /* found our slot, is it busy? */
     362         480 :             if (state->acquired_by != 0)
     363             :             {
     364             :                 ConditionVariable *cv;
     365             : 
     366           0 :                 if (nowait)
     367           0 :                     ereport(ERROR,
     368             :                             (errcode(ERRCODE_OBJECT_IN_USE),
     369             :                              errmsg("could not drop replication origin with ID %d, in use by PID %d",
     370             :                                     state->roident,
     371             :                                     state->acquired_by)));
     372             : 
     373             :                 /*
     374             :                  * We must wait and then retry.  Since we don't know which CV
     375             :                  * to wait on until here, we can't readily use
     376             :                  * ConditionVariablePrepareToSleep (calling it here would be
     377             :                  * wrong, since we could miss the signal if we did so); just
     378             :                  * use ConditionVariableSleep directly.
     379             :                  */
     380           0 :                 cv = &state->origin_cv;
     381             : 
     382           0 :                 LWLockRelease(ReplicationOriginLock);
     383             : 
     384           0 :                 ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
     385           0 :                 goto restart;
     386             :             }
     387             : 
     388             :             /* first make a WAL log entry */
     389             :             {
     390             :                 xl_replorigin_drop xlrec;
     391             : 
     392         480 :                 xlrec.node_id = roident;
     393         480 :                 XLogBeginInsert();
     394         480 :                 XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
     395         480 :                 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
     396             :             }
     397             : 
     398             :             /* then clear the in-memory slot */
     399         480 :             state->roident = InvalidRepOriginId;
     400         480 :             state->remote_lsn = InvalidXLogRecPtr;
     401         480 :             state->local_lsn = InvalidXLogRecPtr;
     402         480 :             break;
     403             :         }
     404             :     }
     405         554 :     LWLockRelease(ReplicationOriginLock);
     406         554 :     ConditionVariableCancelSleep();
     407         554 : }
     408             : 
     409             : /*
     410             :  * Drop replication origin (by name).
     411             :  *
     412             :  * Needs to be called in a transaction.
     413             :  */
     414             : void
     415         908 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
     416             : {
     417             :     RepOriginId roident;
     418             :     Relation    rel;
     419             :     HeapTuple   tuple;
     420             : 
     421             :     Assert(IsTransactionState());
     422             : 
     423         908 :     rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
     424             : 
     425         908 :     roident = replorigin_by_name(name, missing_ok);
     426             : 
     427             :     /* Lock the origin to prevent concurrent drops. */
     428         906 :     LockSharedObject(ReplicationOriginRelationId, roident, 0,
     429             :                      AccessExclusiveLock);
     430             : 
     431         906 :     tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
     432         906 :     if (!HeapTupleIsValid(tuple))
     433             :     {
     434         352 :         if (!missing_ok)
     435           0 :             elog(ERROR, "cache lookup failed for replication origin with ID %d",
     436             :                  roident);
     437             : 
     438             :         /*
     439             :          * We don't need to retain the locks if the origin is already dropped.
     440             :          */
     441         352 :         UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
     442             :                            AccessExclusiveLock);
     443         352 :         table_close(rel, RowExclusiveLock);
     444         352 :         return;
     445             :     }
     446             : 
     447         554 :     replorigin_state_clear(roident, nowait);
     448             : 
     449             :     /*
     450             :      * Now, we can delete the catalog entry.
     451             :      */
     452         554 :     CatalogTupleDelete(rel, &tuple->t_self);
     453         554 :     ReleaseSysCache(tuple);
     454             : 
     455         554 :     CommandCounterIncrement();
     456             : 
     457             :     /* We keep the lock on pg_replication_origin until commit */
     458         554 :     table_close(rel, NoLock);
     459             : }
     460             : 
     461             : /*
     462             :  * Lookup replication origin via its oid and return the name.
     463             :  *
     464             :  * The external name is palloc'd in the calling context.
     465             :  *
     466             :  * Returns true if the origin is known, false otherwise.
     467             :  */
     468             : bool
     469          40 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
     470             : {
     471             :     HeapTuple   tuple;
     472             :     Form_pg_replication_origin ric;
     473             : 
     474             :     Assert(OidIsValid((Oid) roident));
     475             :     Assert(roident != InvalidRepOriginId);
     476             :     Assert(roident != DoNotReplicateId);
     477             : 
     478          40 :     tuple = SearchSysCache1(REPLORIGIDENT,
     479             :                             ObjectIdGetDatum((Oid) roident));
     480             : 
     481          40 :     if (HeapTupleIsValid(tuple))
     482             :     {
     483          34 :         ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
     484          34 :         *roname = text_to_cstring(&ric->roname);
     485          34 :         ReleaseSysCache(tuple);
     486             : 
     487          34 :         return true;
     488             :     }
     489             :     else
     490             :     {
     491           6 :         *roname = NULL;
     492             : 
     493           6 :         if (!missing_ok)
     494           0 :             ereport(ERROR,
     495             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     496             :                      errmsg("replication origin with ID %d does not exist",
     497             :                             roident)));
     498             : 
     499           6 :         return false;
     500             :     }
     501             : }
     502             : 
     503             : 
     504             : /* ---------------------------------------------------------------------------
     505             :  * Functions for handling replication progress.
     506             :  * ---------------------------------------------------------------------------
     507             :  */
     508             : 
     509             : Size
     510        7398 : ReplicationOriginShmemSize(void)
     511             : {
     512        7398 :     Size        size = 0;
     513             : 
     514             :     /*
     515             :      * XXX: max_replication_slots is arguably the wrong thing to use, as here
     516             :      * we keep the replay state of *remote* transactions. But for now it seems
     517             :      * sufficient to reuse it, rather than introduce a separate GUC.
     518             :      */
     519        7398 :     if (max_replication_slots == 0)
     520           4 :         return size;
     521             : 
     522        7394 :     size = add_size(size, offsetof(ReplicationStateCtl, states));
     523             : 
     524        7394 :     size = add_size(size,
     525             :                     mul_size(max_replication_slots, sizeof(ReplicationState)));
     526        7394 :     return size;
     527             : }
     528             : 
     529             : void
     530        1918 : ReplicationOriginShmemInit(void)
     531             : {
     532             :     bool        found;
     533             : 
     534        1918 :     if (max_replication_slots == 0)
     535           2 :         return;
     536             : 
     537        1916 :     replication_states_ctl = (ReplicationStateCtl *)
     538        1916 :         ShmemInitStruct("ReplicationOriginState",
     539             :                         ReplicationOriginShmemSize(),
     540             :                         &found);
     541        1916 :     replication_states = replication_states_ctl->states;
     542             : 
     543        1916 :     if (!found)
     544             :     {
     545             :         int         i;
     546             : 
     547      135208 :         MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
     548             : 
     549        1916 :         replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
     550             : 
     551       20684 :         for (i = 0; i < max_replication_slots; i++)
     552             :         {
     553       18768 :             LWLockInitialize(&replication_states[i].lock,
     554       18768 :                              replication_states_ctl->tranche_id);
     555       18768 :             ConditionVariableInit(&replication_states[i].origin_cv);
     556             :         }
     557             :     }
     558             : }
     559             : 
     560             : /* ---------------------------------------------------------------------------
     561             :  * Perform a checkpoint of each replication origin's progress with respect to
     562             :  * the replayed remote_lsn. Make sure that all transactions we refer to in the
     563             :  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
     564             :  * if the transactions were originally committed asynchronously.
     565             :  *
     566             :  * We store checkpoints in the following format:
     567             :  * +-------+------------------------+------------------+-----+--------+
     568             :  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
     569             :  * +-------+------------------------+------------------+-----+--------+
     570             :  *
     571             :  * So it's just the magic, followed by the statically sized
     572             :  * ReplicationStateOnDisk structs and a trailing CRC32C. Note that the maximum
     573             :  * number of ReplicationStates is determined by max_replication_slots.
     574             :  * ---------------------------------------------------------------------------
     575             :  */
     576             : void
     577        2476 : CheckPointReplicationOrigin(void)
     578             : {
     579        2476 :     const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
     580        2476 :     const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     581             :     int         tmpfd;
     582             :     int         i;
     583        2476 :     uint32      magic = REPLICATION_STATE_MAGIC;
     584             :     pg_crc32c   crc;
     585             : 
     586        2476 :     if (max_replication_slots == 0)
     587           2 :         return;
     588             : 
     589        2474 :     INIT_CRC32C(crc);
     590             : 
     591             :     /* make sure no old temp file is remaining */
     592        2474 :     if (unlink(tmppath) < 0 && errno != ENOENT)
     593           0 :         ereport(PANIC,
     594             :                 (errcode_for_file_access(),
     595             :                  errmsg("could not remove file \"%s\": %m",
     596             :                         tmppath)));
     597             : 
     598             :     /*
     599             :      * no other backend can perform this at the same time; only one checkpoint
     600             :      * can happen at a time.
     601             :      */
     602        2474 :     tmpfd = OpenTransientFile(tmppath,
     603             :                               O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
     604        2474 :     if (tmpfd < 0)
     605           0 :         ereport(PANIC,
     606             :                 (errcode_for_file_access(),
     607             :                  errmsg("could not create file \"%s\": %m",
     608             :                         tmppath)));
     609             : 
     610             :     /* write magic */
     611        2474 :     errno = 0;
     612        2474 :     if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
     613             :     {
     614             :         /* if write didn't set errno, assume problem is no disk space */
     615           0 :         if (errno == 0)
     616           0 :             errno = ENOSPC;
     617           0 :         ereport(PANIC,
     618             :                 (errcode_for_file_access(),
     619             :                  errmsg("could not write to file \"%s\": %m",
     620             :                         tmppath)));
     621             :     }
     622        2474 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     623             : 
     624             :     /* prevent concurrent creations/drops */
     625        2474 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
     626             : 
     627             :     /* write actual data */
     628       26680 :     for (i = 0; i < max_replication_slots; i++)
     629             :     {
     630             :         ReplicationStateOnDisk disk_state;
     631       24206 :         ReplicationState *curstate = &replication_states[i];
     632             :         XLogRecPtr  local_lsn;
     633             : 
     634       24206 :         if (curstate->roident == InvalidRepOriginId)
     635       24116 :             continue;
     636             : 
     637             :         /* zero, to avoid uninitialized padding bytes */
     638          90 :         memset(&disk_state, 0, sizeof(disk_state));
     639             : 
     640          90 :         LWLockAcquire(&curstate->lock, LW_SHARED);
     641             : 
     642          90 :         disk_state.roident = curstate->roident;
     643             : 
     644          90 :         disk_state.remote_lsn = curstate->remote_lsn;
     645          90 :         local_lsn = curstate->local_lsn;
     646             : 
     647          90 :         LWLockRelease(&curstate->lock);
     648             : 
     649             :         /* make sure we only write out a commit that's persistent */
     650          90 :         XLogFlush(local_lsn);
     651             : 
     652          90 :         errno = 0;
     653          90 :         if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
     654             :             sizeof(disk_state))
     655             :         {
     656             :             /* if write didn't set errno, assume problem is no disk space */
     657           0 :             if (errno == 0)
     658           0 :                 errno = ENOSPC;
     659           0 :             ereport(PANIC,
     660             :                     (errcode_for_file_access(),
     661             :                      errmsg("could not write to file \"%s\": %m",
     662             :                             tmppath)));
     663             :         }
     664             : 
     665          90 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     666             :     }
     667             : 
     668        2474 :     LWLockRelease(ReplicationOriginLock);
     669             : 
     670             :     /* write out the CRC */
     671        2474 :     FIN_CRC32C(crc);
     672        2474 :     errno = 0;
     673        2474 :     if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
     674             :     {
     675             :         /* if write didn't set errno, assume problem is no disk space */
     676           0 :         if (errno == 0)
     677           0 :             errno = ENOSPC;
     678           0 :         ereport(PANIC,
     679             :                 (errcode_for_file_access(),
     680             :                  errmsg("could not write to file \"%s\": %m",
     681             :                         tmppath)));
     682             :     }
     683             : 
     684        2474 :     if (CloseTransientFile(tmpfd) != 0)
     685           0 :         ereport(PANIC,
     686             :                 (errcode_for_file_access(),
     687             :                  errmsg("could not close file \"%s\": %m",
     688             :                         tmppath)));
     689             : 
     690             :     /* fsync, rename to permanent file, fsync file and directory */
     691        2474 :     durable_rename(tmppath, path, PANIC);
     692             : }
     693             : 
     694             : /*
     695             :  * Recover replication replay status from checkpoint data saved earlier by
     696             :  * CheckPointReplicationOrigin.
     697             :  *
     698             :  * This only needs to be called at startup and *not* during every checkpoint
     699             :  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
     700             :  * state thereafter can be recovered by looking at commit records.
     701             :  */
     702             : void
     703        1650 : StartupReplicationOrigin(void)
     704             : {
     705        1650 :     const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     706             :     int         fd;
     707             :     int         readBytes;
     708        1650 :     uint32      magic = REPLICATION_STATE_MAGIC;
     709        1650 :     int         last_state = 0;
     710             :     pg_crc32c   file_crc;
     711             :     pg_crc32c   crc;
     712             : 
     713             :     /* don't want to overwrite already existing state */
     714             : #ifdef USE_ASSERT_CHECKING
     715             :     static bool already_started = false;
     716             : 
     717             :     Assert(!already_started);
     718             :     already_started = true;
     719             : #endif
     720             : 
     721        1650 :     if (max_replication_slots == 0)
     722          92 :         return;
     723             : 
     724        1648 :     INIT_CRC32C(crc);
     725             : 
     726        1648 :     elog(DEBUG2, "starting up replication origin progress state");
     727             : 
     728        1648 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
     729             : 
     730             :     /*
     731             :      * might have had max_replication_slots == 0 last run, or we just brought
     732             :      * up a standby.
     733             :      */
     734        1648 :     if (fd < 0 && errno == ENOENT)
     735          90 :         return;
     736        1558 :     else if (fd < 0)
     737           0 :         ereport(PANIC,
     738             :                 (errcode_for_file_access(),
     739             :                  errmsg("could not open file \"%s\": %m",
     740             :                         path)));
     741             : 
     742             :     /* verify magic, that is written even if nothing was active */
     743        1558 :     readBytes = read(fd, &magic, sizeof(magic));
     744        1558 :     if (readBytes != sizeof(magic))
     745             :     {
     746           0 :         if (readBytes < 0)
     747           0 :             ereport(PANIC,
     748             :                     (errcode_for_file_access(),
     749             :                      errmsg("could not read file \"%s\": %m",
     750             :                             path)));
     751             :         else
     752           0 :             ereport(PANIC,
     753             :                     (errcode(ERRCODE_DATA_CORRUPTED),
     754             :                      errmsg("could not read file \"%s\": read %d of %zu",
     755             :                             path, readBytes, sizeof(magic))));
     756             :     }
     757        1558 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     758             : 
     759        1558 :     if (magic != REPLICATION_STATE_MAGIC)
     760           0 :         ereport(PANIC,
     761             :                 (errmsg("replication checkpoint has wrong magic %u instead of %u",
     762             :                         magic, REPLICATION_STATE_MAGIC)));
     763             : 
     764             :     /* we can skip locking here, no other access is possible */
     765             : 
     766             :     /* recover individual states, until there are no more to be found */
     767             :     while (true)
     768          38 :     {
     769             :         ReplicationStateOnDisk disk_state;
     770             : 
     771        1596 :         readBytes = read(fd, &disk_state, sizeof(disk_state));
     772             : 
     773             :         /* no further data */
     774        1596 :         if (readBytes == sizeof(crc))
     775             :         {
     776             :             /* not pretty, but simple ... */
     777        1558 :             file_crc = *(pg_crc32c *) &disk_state;
     778        1558 :             break;
     779             :         }
     780             : 
     781          38 :         if (readBytes < 0)
     782             :         {
     783           0 :             ereport(PANIC,
     784             :                     (errcode_for_file_access(),
     785             :                      errmsg("could not read file \"%s\": %m",
     786             :                             path)));
     787             :         }
     788             : 
     789          38 :         if (readBytes != sizeof(disk_state))
     790             :         {
     791           0 :             ereport(PANIC,
     792             :                     (errcode_for_file_access(),
     793             :                      errmsg("could not read file \"%s\": read %d of %zu",
     794             :                             path, readBytes, sizeof(disk_state))));
     795             :         }
     796             : 
     797          38 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     798             : 
     799          38 :         if (last_state == max_replication_slots)
     800           0 :             ereport(PANIC,
     801             :                     (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     802             :                      errmsg("could not find free replication state, increase \"max_replication_slots\"")));
     803             : 
     804             :         /* copy data to shared memory */
     805          38 :         replication_states[last_state].roident = disk_state.roident;
     806          38 :         replication_states[last_state].remote_lsn = disk_state.remote_lsn;
     807          38 :         last_state++;
     808             : 
     809          38 :         ereport(LOG,
     810             :                 (errmsg("recovered replication state of node %d to %X/%X",
     811             :                         disk_state.roident,
     812             :                         LSN_FORMAT_ARGS(disk_state.remote_lsn))));
     813             :     }
     814             : 
     815             :     /* now check checksum */
     816        1558 :     FIN_CRC32C(crc);
     817        1558 :     if (file_crc != crc)
     818           0 :         ereport(PANIC,
     819             :                 (errcode(ERRCODE_DATA_CORRUPTED),
     820             :                  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
     821             :                         crc, file_crc)));
     822             : 
     823        1558 :     if (CloseTransientFile(fd) != 0)
     824           0 :         ereport(PANIC,
     825             :                 (errcode_for_file_access(),
     826             :                  errmsg("could not close file \"%s\": %m",
     827             :                         path)));
     828             : }
     829             : 
     830             : void
     831           8 : replorigin_redo(XLogReaderState *record)
     832             : {
     833           8 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     834             : 
     835           8 :     switch (info)
     836             :     {
     837           4 :         case XLOG_REPLORIGIN_SET:
     838             :             {
     839           4 :                 xl_replorigin_set *xlrec =
     840           4 :                     (xl_replorigin_set *) XLogRecGetData(record);
     841             : 
     842           4 :                 replorigin_advance(xlrec->node_id,
     843             :                                    xlrec->remote_lsn, record->EndRecPtr,
     844           4 :                                    xlrec->force /* backward */ ,
     845             :                                    false /* WAL log */ );
     846           4 :                 break;
     847             :             }
     848           4 :         case XLOG_REPLORIGIN_DROP:
     849             :             {
     850             :                 xl_replorigin_drop *xlrec;
     851             :                 int         i;
     852             : 
     853           4 :                 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
     854             : 
     855           6 :                 for (i = 0; i < max_replication_slots; i++)
     856             :                 {
     857           6 :                     ReplicationState *state = &replication_states[i];
     858             : 
     859             :                     /* found our slot */
     860           6 :                     if (state->roident == xlrec->node_id)
     861             :                     {
     862             :                         /* reset entry */
     863           4 :                         state->roident = InvalidRepOriginId;
     864           4 :                         state->remote_lsn = InvalidXLogRecPtr;
     865           4 :                         state->local_lsn = InvalidXLogRecPtr;
     866           4 :                         break;
     867             :                     }
     868             :                 }
     869           4 :                 break;
     870             :             }
     871           0 :         default:
     872           0 :             elog(PANIC, "replorigin_redo: unknown op code %u", info);
     873             :     }
     874           8 : }
     875             : 
     876             : 
     877             : /*
     878             :  * Tell the replication origin progress machinery that a commit from 'node'
     879             :  * that originated at the LSN remote_commit on the remote node was replayed
     880             :  * successfully and that we don't need to do so again. In combination with
     881             :  * setting up replorigin_session_origin_lsn and replorigin_session_origin
     882             :  * that ensures we won't lose knowledge about that after a crash if the
     883             :  * transaction had a persistent effect (think of asynchronous commits).
     884             :  *
     885             :  * local_commit needs to be a local LSN of the commit so that we can make sure
     886             :  * upon a checkpoint that enough WAL has been persisted to disk.
     887             :  *
     888             :  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
     889             :  * unless running in recovery.
     890             :  */
     891             : void
     892         454 : replorigin_advance(RepOriginId node,
     893             :                    XLogRecPtr remote_commit, XLogRecPtr local_commit,
     894             :                    bool go_backward, bool wal_log)
     895             : {
     896             :     int         i;
     897         454 :     ReplicationState *replication_state = NULL;
     898         454 :     ReplicationState *free_state = NULL;
     899             : 
     900             :     Assert(node != InvalidRepOriginId);
     901             : 
     902             :     /* we don't track DoNotReplicateId */
     903         454 :     if (node == DoNotReplicateId)
     904           0 :         return;
     905             : 
     906             :     /*
     907             :      * XXX: For the case where this is called by WAL replay, it'd be more
     908             :      * efficient to restore into a backend local hashtable and only dump into
     909             :      * shmem after recovery is finished. Let's wait with implementing that
     910             :      * till it's shown to be a measurable expense
     911             :      */
     912             : 
     913             :     /* Lock exclusively, as we may have to create a new table entry. */
     914         454 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     915             : 
     916             :     /*
     917             :      * Search for either an existing slot for the origin, or a free one we can
     918             :      * use.
     919             :      */
     920        4134 :     for (i = 0; i < max_replication_slots; i++)
     921             :     {
     922        3768 :         ReplicationState *curstate = &replication_states[i];
     923             : 
     924             :         /* remember where to insert if necessary */
     925        3768 :         if (curstate->roident == InvalidRepOriginId &&
     926             :             free_state == NULL)
     927             :         {
     928         366 :             free_state = curstate;
     929         366 :             continue;
     930             :         }
     931             : 
     932             :         /* not our slot */
     933        3402 :         if (curstate->roident != node)
     934             :         {
     935        3314 :             continue;
     936             :         }
     937             : 
     938             :         /* ok, found slot */
     939          88 :         replication_state = curstate;
     940             : 
     941          88 :         LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
     942             : 
     943             :         /* Make sure it's not used by somebody else */
     944          88 :         if (replication_state->acquired_by != 0)
     945             :         {
     946           0 :             ereport(ERROR,
     947             :                     (errcode(ERRCODE_OBJECT_IN_USE),
     948             :                      errmsg("replication origin with ID %d is already active for PID %d",
     949             :                             replication_state->roident,
     950             :                             replication_state->acquired_by)));
     951             :         }
     952             : 
     953          88 :         break;
     954             :     }
     955             : 
     956         454 :     if (replication_state == NULL && free_state == NULL)
     957           0 :         ereport(ERROR,
     958             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     959             :                  errmsg("could not find free replication state slot for replication origin with ID %d",
     960             :                         node),
     961             :                  errhint("Increase \"max_replication_slots\" and try again.")));
     962             : 
     963         454 :     if (replication_state == NULL)
     964             :     {
     965             :         /* initialize new slot */
     966         366 :         LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
     967         366 :         replication_state = free_state;
     968             :         Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
     969             :         Assert(replication_state->local_lsn == InvalidXLogRecPtr);
     970         366 :         replication_state->roident = node;
     971             :     }
     972             : 
     973             :     Assert(replication_state->roident != InvalidRepOriginId);
     974             : 
     975             :     /*
     976             :      * If somebody "forcefully" sets this slot, WAL log it, so it's durable
     977             :      * and the standby gets the message. Primarily this will be called during
     978             :      * WAL replay (of commit records) where no WAL logging is necessary.
     979             :      */
     980         454 :     if (wal_log)
     981             :     {
     982             :         xl_replorigin_set xlrec;
     983             : 
     984         372 :         xlrec.remote_lsn = remote_commit;
     985         372 :         xlrec.node_id = node;
     986         372 :         xlrec.force = go_backward;
     987             : 
     988         372 :         XLogBeginInsert();
     989         372 :         XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
     990             : 
     991         372 :         XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
     992             :     }
     993             : 
     994             :     /*
     995             :      * Due to - harmless - race conditions during a checkpoint we could see
     996             :      * values here that are older than the ones we already have in memory. We
     997             :      * could also see older values for prepared transactions when the prepare
     998             :      * is sent at a later point of time along with commit prepared and there
     999             :      * are other transactions commits between prepare and commit prepared. See
    1000             :      * ReorderBufferFinishPrepared. Don't overwrite those.
    1001             :      */
    1002         454 :     if (go_backward || replication_state->remote_lsn < remote_commit)
    1003         440 :         replication_state->remote_lsn = remote_commit;
    1004         454 :     if (local_commit != InvalidXLogRecPtr &&
    1005          76 :         (go_backward || replication_state->local_lsn < local_commit))
    1006          80 :         replication_state->local_lsn = local_commit;
    1007         454 :     LWLockRelease(&replication_state->lock);
    1008             : 
    1009             :     /*
    1010             :      * Release *after* changing the LSNs, slot isn't acquired and thus could
    1011             :      * otherwise be dropped anytime.
    1012             :      */
    1013         454 :     LWLockRelease(ReplicationOriginLock);
    1014             : }
    1015             : 
    1016             : 
    1017             : XLogRecPtr
    1018          16 : replorigin_get_progress(RepOriginId node, bool flush)
    1019             : {
    1020             :     int         i;
    1021          16 :     XLogRecPtr  local_lsn = InvalidXLogRecPtr;
    1022          16 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1023             : 
    1024             :     /* prevent slots from being concurrently dropped */
    1025          16 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1026             : 
    1027          76 :     for (i = 0; i < max_replication_slots; i++)
    1028             :     {
    1029             :         ReplicationState *state;
    1030             : 
    1031          70 :         state = &replication_states[i];
    1032             : 
    1033          70 :         if (state->roident == node)
    1034             :         {
    1035          10 :             LWLockAcquire(&state->lock, LW_SHARED);
    1036             : 
    1037          10 :             remote_lsn = state->remote_lsn;
    1038          10 :             local_lsn = state->local_lsn;
    1039             : 
    1040          10 :             LWLockRelease(&state->lock);
    1041             : 
    1042          10 :             break;
    1043             :         }
    1044             :     }
    1045             : 
    1046          16 :     LWLockRelease(ReplicationOriginLock);
    1047             : 
    1048          16 :     if (flush && local_lsn != InvalidXLogRecPtr)
    1049           2 :         XLogFlush(local_lsn);
    1050             : 
    1051          16 :     return remote_lsn;
    1052             : }
    1053             : 
    1054             : /*
    1055             :  * Tear down a (possibly) configured session replication origin during process
    1056             :  * exit.
    1057             :  */
    1058             : static void
    1059         800 : ReplicationOriginExitCleanup(int code, Datum arg)
    1060             : {
    1061         800 :     ConditionVariable *cv = NULL;
    1062             : 
    1063         800 :     if (session_replication_state == NULL)
    1064         352 :         return;
    1065             : 
    1066         448 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1067             : 
    1068         448 :     if (session_replication_state->acquired_by == MyProcPid)
    1069             :     {
    1070         428 :         cv = &session_replication_state->origin_cv;
    1071             : 
    1072         428 :         session_replication_state->acquired_by = 0;
    1073         428 :         session_replication_state = NULL;
    1074             :     }
    1075             : 
    1076         448 :     LWLockRelease(ReplicationOriginLock);
    1077             : 
    1078         448 :     if (cv)
    1079         428 :         ConditionVariableBroadcast(cv);
    1080             : }
    1081             : 
    1082             : /*
    1083             :  * Setup a replication origin in the shared memory struct if it doesn't
    1084             :  * already exist and cache access to the specific ReplicationSlot so the
    1085             :  * array doesn't have to be searched when calling
    1086             :  * replorigin_session_advance().
    1087             :  *
    1088             :  * Normally only one such cached origin can exist per process so the cached
    1089             :  * value can only be set again after the previous value is torn down with
    1090             :  * replorigin_session_reset(). For this normal case pass acquired_by = 0
    1091             :  * (meaning the slot is not allowed to be already acquired by another process).
    1092             :  *
    1093             :  * However, sometimes multiple processes can safely re-use the same origin slot
    1094             :  * (for example, multiple parallel apply processes can safely use the same
    1095             :  * origin, provided they maintain commit order by allowing only one process to
    1096             :  * commit at a time). For this case the first process must pass acquired_by =
    1097             :  * 0, and then the other processes sharing that same origin can pass
    1098             :  * acquired_by = PID of the first process.
    1099             :  */
    1100             : void
    1101         806 : replorigin_session_setup(RepOriginId node, int acquired_by)
    1102             : {
    1103             :     static bool registered_cleanup;
    1104             :     int         i;
    1105         806 :     int         free_slot = -1;
    1106             : 
    1107         806 :     if (!registered_cleanup)
    1108             :     {
    1109         800 :         on_shmem_exit(ReplicationOriginExitCleanup, 0);
    1110         800 :         registered_cleanup = true;
    1111             :     }
    1112             : 
    1113             :     Assert(max_replication_slots > 0);
    1114             : 
    1115         806 :     if (session_replication_state != NULL)
    1116           2 :         ereport(ERROR,
    1117             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1118             :                  errmsg("cannot setup replication origin when one is already setup")));
    1119             : 
    1120             :     /* Lock exclusively, as we may have to create a new table entry. */
    1121         804 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1122             : 
    1123             :     /*
    1124             :      * Search for either an existing slot for the origin, or a free one we can
    1125             :      * use.
    1126             :      */
    1127        3378 :     for (i = 0; i < max_replication_slots; i++)
    1128             :     {
    1129        3164 :         ReplicationState *curstate = &replication_states[i];
    1130             : 
    1131             :         /* remember where to insert if necessary */
    1132        3164 :         if (curstate->roident == InvalidRepOriginId &&
    1133             :             free_slot == -1)
    1134             :         {
    1135         220 :             free_slot = i;
    1136         220 :             continue;
    1137             :         }
    1138             : 
    1139             :         /* not our slot */
    1140        2944 :         if (curstate->roident != node)
    1141        2354 :             continue;
    1142             : 
    1143         590 :         else if (curstate->acquired_by != 0 && acquired_by == 0)
    1144             :         {
    1145           0 :             ereport(ERROR,
    1146             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1147             :                      errmsg("replication origin with ID %d is already active for PID %d",
    1148             :                             curstate->roident, curstate->acquired_by)));
    1149             :         }
    1150             : 
    1151             :         /* ok, found slot */
    1152         590 :         session_replication_state = curstate;
    1153         590 :         break;
    1154             :     }
    1155             : 
    1156             : 
    1157         804 :     if (session_replication_state == NULL && free_slot == -1)
    1158           0 :         ereport(ERROR,
    1159             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
    1160             :                  errmsg("could not find free replication state slot for replication origin with ID %d",
    1161             :                         node),
    1162             :                  errhint("Increase \"max_replication_slots\" and try again.")));
    1163         804 :     else if (session_replication_state == NULL)
    1164             :     {
    1165             :         /* initialize new slot */
    1166         214 :         session_replication_state = &replication_states[free_slot];
    1167             :         Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
    1168             :         Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
    1169         214 :         session_replication_state->roident = node;
    1170             :     }
    1171             : 
    1172             : 
    1173             :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1174             : 
    1175         804 :     if (acquired_by == 0)
    1176         784 :         session_replication_state->acquired_by = MyProcPid;
    1177          20 :     else if (session_replication_state->acquired_by != acquired_by)
    1178           0 :         elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
    1179             :              node, acquired_by);
    1180             : 
    1181         804 :     LWLockRelease(ReplicationOriginLock);
    1182             : 
    1183             :     /* probably this one is pointless */
    1184         804 :     ConditionVariableBroadcast(&session_replication_state->origin_cv);
    1185         804 : }
    1186             : 
    1187             : /*
    1188             :  * Reset replay state previously setup in this session.
    1189             :  *
    1190             :  * This function may only be called if an origin was setup with
    1191             :  * replorigin_session_setup().
    1192             :  */
    1193             : void
    1194         358 : replorigin_session_reset(void)
    1195             : {
    1196             :     ConditionVariable *cv;
    1197             : 
    1198             :     Assert(max_replication_slots != 0);
    1199             : 
    1200         358 :     if (session_replication_state == NULL)
    1201           2 :         ereport(ERROR,
    1202             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1203             :                  errmsg("no replication origin is configured")));
    1204             : 
    1205         356 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1206             : 
    1207         356 :     session_replication_state->acquired_by = 0;
    1208         356 :     cv = &session_replication_state->origin_cv;
    1209         356 :     session_replication_state = NULL;
    1210             : 
    1211         356 :     LWLockRelease(ReplicationOriginLock);
    1212             : 
    1213         356 :     ConditionVariableBroadcast(cv);
    1214         356 : }
    1215             : 
    1216             : /*
    1217             :  * Do the same work replorigin_advance() does, just on the session's
    1218             :  * configured origin.
    1219             :  *
    1220             :  * This is noticeably cheaper than using replorigin_advance().
    1221             :  */
    1222             : void
    1223        2082 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
    1224             : {
    1225             :     Assert(session_replication_state != NULL);
    1226             :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1227             : 
    1228        2082 :     LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
    1229        2082 :     if (session_replication_state->local_lsn < local_commit)
    1230        2082 :         session_replication_state->local_lsn = local_commit;
    1231        2082 :     if (session_replication_state->remote_lsn < remote_commit)
    1232        1000 :         session_replication_state->remote_lsn = remote_commit;
    1233        2082 :     LWLockRelease(&session_replication_state->lock);
    1234        2082 : }
    1235             : 
    1236             : /*
    1237             :  * Ask the machinery about the point up to which we successfully replayed
    1238             :  * changes from an already setup replication origin.
    1239             :  */
    1240             : XLogRecPtr
    1241         412 : replorigin_session_get_progress(bool flush)
    1242             : {
    1243             :     XLogRecPtr  remote_lsn;
    1244             :     XLogRecPtr  local_lsn;
    1245             : 
    1246             :     Assert(session_replication_state != NULL);
    1247             : 
    1248         412 :     LWLockAcquire(&session_replication_state->lock, LW_SHARED);
    1249         412 :     remote_lsn = session_replication_state->remote_lsn;
    1250         412 :     local_lsn = session_replication_state->local_lsn;
    1251         412 :     LWLockRelease(&session_replication_state->lock);
    1252             : 
    1253         412 :     if (flush && local_lsn != InvalidXLogRecPtr)
    1254           2 :         XLogFlush(local_lsn);
    1255             : 
    1256         412 :     return remote_lsn;
    1257             : }
    1258             : 
    1259             : 
    1260             : 
    1261             : /* ---------------------------------------------------------------------------
    1262             :  * SQL functions for working with replication origin.
    1263             :  *
    1264             :  * These mostly should be fairly short wrappers around more generic functions.
    1265             :  * ---------------------------------------------------------------------------
    1266             :  */
    1267             : 
    1268             : /*
    1269             :  * Create replication origin for the passed in name, and return the assigned
    1270             :  * oid.
    1271             :  */
    1272             : Datum
    1273          18 : pg_replication_origin_create(PG_FUNCTION_ARGS)
    1274             : {
    1275             :     char       *name;
    1276             :     RepOriginId roident;
    1277             : 
    1278          18 :     replorigin_check_prerequisites(false, false);
    1279             : 
    1280          18 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1281             : 
    1282             :     /*
    1283             :      * Replication origins "any and "none" are reserved for system options.
    1284             :      * The origins "pg_xxx" are reserved for internal use.
    1285             :      */
    1286          18 :     if (IsReservedName(name) || IsReservedOriginName(name))
    1287           6 :         ereport(ERROR,
    1288             :                 (errcode(ERRCODE_RESERVED_NAME),
    1289             :                  errmsg("replication origin name \"%s\" is reserved",
    1290             :                         name),
    1291             :                  errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
    1292             :                            LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
    1293             : 
    1294             :     /*
    1295             :      * If built with appropriate switch, whine when regression-testing
    1296             :      * conventions for replication origin names are violated.
    1297             :      */
    1298             : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1299             :     if (strncmp(name, "regress_", 8) != 0)
    1300             :         elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
    1301             : #endif
    1302             : 
    1303          12 :     roident = replorigin_create(name);
    1304             : 
    1305          10 :     pfree(name);
    1306             : 
    1307          10 :     PG_RETURN_OID(roident);
    1308             : }
    1309             : 
    1310             : /*
    1311             :  * Drop replication origin.
    1312             :  */
    1313             : Datum
    1314          14 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
    1315             : {
    1316             :     char       *name;
    1317             : 
    1318          14 :     replorigin_check_prerequisites(false, false);
    1319             : 
    1320          14 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1321             : 
    1322          14 :     replorigin_drop_by_name(name, false, true);
    1323             : 
    1324          12 :     pfree(name);
    1325             : 
    1326          12 :     PG_RETURN_VOID();
    1327             : }
    1328             : 
    1329             : /*
    1330             :  * Return oid of a replication origin.
    1331             :  */
    1332             : Datum
    1333           0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
    1334             : {
    1335             :     char       *name;
    1336             :     RepOriginId roident;
    1337             : 
    1338           0 :     replorigin_check_prerequisites(false, false);
    1339             : 
    1340           0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1341           0 :     roident = replorigin_by_name(name, true);
    1342             : 
    1343           0 :     pfree(name);
    1344             : 
    1345           0 :     if (OidIsValid(roident))
    1346           0 :         PG_RETURN_OID(roident);
    1347           0 :     PG_RETURN_NULL();
    1348             : }
    1349             : 
    1350             : /*
    1351             :  * Setup a replication origin for this session.
    1352             :  */
    1353             : Datum
    1354          12 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
    1355             : {
    1356             :     char       *name;
    1357             :     RepOriginId origin;
    1358             : 
    1359          12 :     replorigin_check_prerequisites(true, false);
    1360             : 
    1361          12 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1362          12 :     origin = replorigin_by_name(name, false);
    1363          10 :     replorigin_session_setup(origin, 0);
    1364             : 
    1365           8 :     replorigin_session_origin = origin;
    1366             : 
    1367           8 :     pfree(name);
    1368             : 
    1369           8 :     PG_RETURN_VOID();
    1370             : }
    1371             : 
    1372             : /*
    1373             :  * Reset previously setup origin in this session
    1374             :  */
    1375             : Datum
    1376          10 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
    1377             : {
    1378          10 :     replorigin_check_prerequisites(true, false);
    1379             : 
    1380          10 :     replorigin_session_reset();
    1381             : 
    1382           8 :     replorigin_session_origin = InvalidRepOriginId;
    1383           8 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1384           8 :     replorigin_session_origin_timestamp = 0;
    1385             : 
    1386           8 :     PG_RETURN_VOID();
    1387             : }
    1388             : 
    1389             : /*
    1390             :  * Has a replication origin been setup for this session.
    1391             :  */
    1392             : Datum
    1393           0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
    1394             : {
    1395           0 :     replorigin_check_prerequisites(false, false);
    1396             : 
    1397           0 :     PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
    1398             : }
    1399             : 
    1400             : 
    1401             : /*
    1402             :  * Return the replication progress for origin setup in the current session.
    1403             :  *
    1404             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1405             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1406             :  * commits are used when replaying replicated transactions.
    1407             :  */
    1408             : Datum
    1409           4 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
    1410             : {
    1411           4 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1412           4 :     bool        flush = PG_GETARG_BOOL(0);
    1413             : 
    1414           4 :     replorigin_check_prerequisites(true, false);
    1415             : 
    1416           4 :     if (session_replication_state == NULL)
    1417           0 :         ereport(ERROR,
    1418             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1419             :                  errmsg("no replication origin is configured")));
    1420             : 
    1421           4 :     remote_lsn = replorigin_session_get_progress(flush);
    1422             : 
    1423           4 :     if (remote_lsn == InvalidXLogRecPtr)
    1424           0 :         PG_RETURN_NULL();
    1425             : 
    1426           4 :     PG_RETURN_LSN(remote_lsn);
    1427             : }
    1428             : 
    1429             : Datum
    1430           2 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
    1431             : {
    1432           2 :     XLogRecPtr  location = PG_GETARG_LSN(0);
    1433             : 
    1434           2 :     replorigin_check_prerequisites(true, false);
    1435             : 
    1436           2 :     if (session_replication_state == NULL)
    1437           0 :         ereport(ERROR,
    1438             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1439             :                  errmsg("no replication origin is configured")));
    1440             : 
    1441           2 :     replorigin_session_origin_lsn = location;
    1442           2 :     replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
    1443             : 
    1444           2 :     PG_RETURN_VOID();
    1445             : }
    1446             : 
    1447             : Datum
    1448           0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
    1449             : {
    1450           0 :     replorigin_check_prerequisites(true, false);
    1451             : 
    1452           0 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1453           0 :     replorigin_session_origin_timestamp = 0;
    1454             : 
    1455           0 :     PG_RETURN_VOID();
    1456             : }
    1457             : 
    1458             : 
    1459             : Datum
    1460           6 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
    1461             : {
    1462           6 :     text       *name = PG_GETARG_TEXT_PP(0);
    1463           6 :     XLogRecPtr  remote_commit = PG_GETARG_LSN(1);
    1464             :     RepOriginId node;
    1465             : 
    1466           6 :     replorigin_check_prerequisites(true, false);
    1467             : 
    1468             :     /* lock to prevent the replication origin from vanishing */
    1469           6 :     LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1470             : 
    1471           6 :     node = replorigin_by_name(text_to_cstring(name), false);
    1472             : 
    1473             :     /*
    1474             :      * Can't sensibly pass a local commit to be flushed at checkpoint - this
    1475             :      * xact hasn't committed yet. This is why this function should be used to
    1476             :      * set up the initial replication state, but not for replay.
    1477             :      */
    1478           4 :     replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
    1479             :                        true /* go backward */ , true /* WAL log */ );
    1480             : 
    1481           4 :     UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1482             : 
    1483           4 :     PG_RETURN_VOID();
    1484             : }
    1485             : 
    1486             : 
    1487             : /*
    1488             :  * Return the replication progress for an individual replication origin.
    1489             :  *
    1490             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1491             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1492             :  * commits are used when replaying replicated transactions.
    1493             :  */
    1494             : Datum
    1495           6 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
    1496             : {
    1497             :     char       *name;
    1498             :     bool        flush;
    1499             :     RepOriginId roident;
    1500           6 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1501             : 
    1502           6 :     replorigin_check_prerequisites(true, true);
    1503             : 
    1504           6 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1505           6 :     flush = PG_GETARG_BOOL(1);
    1506             : 
    1507           6 :     roident = replorigin_by_name(name, false);
    1508             :     Assert(OidIsValid(roident));
    1509             : 
    1510           4 :     remote_lsn = replorigin_get_progress(roident, flush);
    1511             : 
    1512           4 :     if (remote_lsn == InvalidXLogRecPtr)
    1513           0 :         PG_RETURN_NULL();
    1514             : 
    1515           4 :     PG_RETURN_LSN(remote_lsn);
    1516             : }
    1517             : 
    1518             : 
    1519             : Datum
    1520          12 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
    1521             : {
    1522          12 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1523             :     int         i;
    1524             : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
    1525             : 
    1526             :     /* we want to return 0 rows if slot is set to zero */
    1527          12 :     replorigin_check_prerequisites(false, true);
    1528             : 
    1529          12 :     InitMaterializedSRF(fcinfo, 0);
    1530             : 
    1531             :     /* prevent slots from being concurrently dropped */
    1532          12 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1533             : 
    1534             :     /*
    1535             :      * Iterate through all possible replication_states, display if they are
    1536             :      * filled. Note that we do not take any locks, so slightly corrupted/out
    1537             :      * of date values are a possibility.
    1538             :      */
    1539         108 :     for (i = 0; i < max_replication_slots; i++)
    1540             :     {
    1541             :         ReplicationState *state;
    1542             :         Datum       values[REPLICATION_ORIGIN_PROGRESS_COLS];
    1543             :         bool        nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
    1544             :         char       *roname;
    1545             : 
    1546          96 :         state = &replication_states[i];
    1547             : 
    1548             :         /* unused slot, nothing to display */
    1549          96 :         if (state->roident == InvalidRepOriginId)
    1550          86 :             continue;
    1551             : 
    1552          10 :         memset(values, 0, sizeof(values));
    1553          10 :         memset(nulls, 1, sizeof(nulls));
    1554             : 
    1555          10 :         values[0] = ObjectIdGetDatum(state->roident);
    1556          10 :         nulls[0] = false;
    1557             : 
    1558             :         /*
    1559             :          * We're not preventing the origin to be dropped concurrently, so
    1560             :          * silently accept that it might be gone.
    1561             :          */
    1562          10 :         if (replorigin_by_oid(state->roident, true,
    1563             :                               &roname))
    1564             :         {
    1565          10 :             values[1] = CStringGetTextDatum(roname);
    1566          10 :             nulls[1] = false;
    1567             :         }
    1568             : 
    1569          10 :         LWLockAcquire(&state->lock, LW_SHARED);
    1570             : 
    1571          10 :         values[2] = LSNGetDatum(state->remote_lsn);
    1572          10 :         nulls[2] = false;
    1573             : 
    1574          10 :         values[3] = LSNGetDatum(state->local_lsn);
    1575          10 :         nulls[3] = false;
    1576             : 
    1577          10 :         LWLockRelease(&state->lock);
    1578             : 
    1579          10 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1580             :                              values, nulls);
    1581             :     }
    1582             : 
    1583          12 :     LWLockRelease(ReplicationOriginLock);
    1584             : 
    1585             : #undef REPLICATION_ORIGIN_PROGRESS_COLS
    1586             : 
    1587          12 :     return (Datum) 0;
    1588             : }

Generated by: LCOV version 1.14