LCOV - code coverage report
Current view: top level - src/backend/replication/logical - origin.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 408 466 87.6 %
Date: 2026-01-31 01:17:55 Functions: 31 33 93.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * origin.c
       4             :  *    Logical replication progress tracking support.
       5             :  *
       6             :  * Copyright (c) 2013-2026, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/logical/origin.c
      10             :  *
      11             :  * NOTES
      12             :  *
      13             :  * This file provides the following:
      14             :  * * An infrastructure to name nodes in a replication setup
      15             :  * * A facility to store and persist replication progress in an
      16             :  *   efficient and durable manner.
      17             :  *
      18             :  * A replication origin consists of a descriptive, user defined, external
      19             :  * name and a short, thus space efficient, internal 2 byte one. This split
      20             :  * exists because replication origins have to be stored in WAL and shared
      21             :  * memory and long descriptors would be inefficient.  For now we only use 2 bytes
      22             :  * for the internal id of a replication origin as it seems unlikely that there
      23             :  * soon will be more than 65k nodes in one replication setup; and using only
      24             :  * two bytes allows us to be more space efficient.
      25             :  *
      26             :  * Replication progress is tracked in a shared memory table
      27             :  * (ReplicationState) that's dumped to disk every checkpoint. Entries
      28             :  * ('slots') in this table are identified by the internal id. That's the case
      29             :  * because it allows replication progress to be advanced during crash
      30             :  * recovery. To allow doing so we store the original LSN (from the originating
      31             :  * system) of a transaction in the commit record. That allows us to recover the
      32             :  * precise replayed state after crash recovery, without requiring synchronous
      33             :  * commits. Allowing logical replication to use asynchronous commit is
      34             :  * generally good for performance, but especially important as it allows a
      35             :  * single threaded replay process to keep up with a source that has multiple
      36             :  * backends generating changes concurrently.  For efficiency and simplicity
      37             :  * reasons a backend can set up one replication origin that's from then on used
      38             :  * as the source of changes produced by the backend, until reset again.
      39             :  *
      40             :  * This infrastructure is intended to be used in cooperation with logical
      41             :  * decoding. When replaying from a remote system the configured origin is
      42             :  * provided to output plugins, allowing prevention of replication loops and
      43             :  * other filtering.
      44             :  *
      45             :  * There are several levels of locking at work:
      46             :  *
      47             :  * * To create and drop replication origins an exclusive lock on
      48             :  *   pg_replication_origin is required for the duration. That allows us to
      49             :  *   safely and conflict free assign new origins using a dirty snapshot.
      50             :  *
      51             :  * * When creating an in-memory replication progress slot the ReplicationOrigin
      52             :  *   LWLock has to be held exclusively; when iterating over the replication
      53             :  *   progress a shared lock has to be held, and the same applies when advancing
      54             :  *   the replication progress of an origin that has not been set up as the
      55             :  *   session's replication origin.
      56             :  *
      57             :  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
      58             :  *   replication progress slot that slot's lwlock has to be held. That's
      59             :  *   primarily because we do not assume 8 byte writes (the LSN) are atomic on
      60             :  *   all our platforms, but it also simplifies memory ordering concerns
      61             :  *   between the remote and local lsn. We use a lwlock instead of a spinlock
      62             :  *   so it's less harmful to hold the lock over a WAL write
      63             :  *   (cf. AdvanceReplicationProgress).
      64             :  *
      65             :  * ---------------------------------------------------------------------------
      66             :  */
      67             : 
      68             : #include "postgres.h"
      69             : 
      70             : #include <unistd.h>
      71             : #include <sys/stat.h>
      72             : 
      73             : #include "access/genam.h"
      74             : #include "access/htup_details.h"
      75             : #include "access/table.h"
      76             : #include "access/xact.h"
      77             : #include "access/xloginsert.h"
      78             : #include "catalog/catalog.h"
      79             : #include "catalog/indexing.h"
      80             : #include "catalog/pg_subscription.h"
      81             : #include "funcapi.h"
      82             : #include "miscadmin.h"
      83             : #include "nodes/execnodes.h"
      84             : #include "pgstat.h"
      85             : #include "replication/origin.h"
      86             : #include "replication/slot.h"
      87             : #include "storage/condition_variable.h"
      88             : #include "storage/fd.h"
      89             : #include "storage/ipc.h"
      90             : #include "storage/lmgr.h"
      91             : #include "utils/builtins.h"
      92             : #include "utils/fmgroids.h"
      93             : #include "utils/guc.h"
      94             : #include "utils/pg_lsn.h"
      95             : #include "utils/rel.h"
      96             : #include "utils/snapmgr.h"
      97             : #include "utils/syscache.h"
      98             : 
      99             : /* Paths of the replication origin checkpoint file and of the temporary file named after it */
     100             : #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
     101             : #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
     102             : 
     103             : /* GUC variables.  This one is the length of the shared replication_states array; no array is allocated when it is 0. */
     104             : int         max_active_replication_origins = 10;
     105             : 
     106             : /*
     107             :  * Replay progress of a single remote node; one element of the shared replication_states array.
     108             :  */
     109             : typedef struct ReplicationState
     110             : {
     111             :     /*
     112             :      * Local identifier for the remote node; reset to InvalidReplOriginId when the origin is dropped.
     113             :      */
     114             :     ReplOriginId roident;
     115             : 
     116             :     /*
     117             :      * Location of the latest commit from the remote side.
     118             :      */
     119             :     XLogRecPtr  remote_lsn;
     120             : 
     121             :     /*
     122             :      * Remember the local lsn of the commit record so we can XLogFlush() to it
     123             :      * during a checkpoint so we know the commit record actually is safe on
     124             :      * disk.
     125             :      */
     126             :     XLogRecPtr  local_lsn;
     127             : 
     128             :     /*
     129             :      * PID of the backend that has acquired this slot, or 0 if none.
     130             :      */
     131             :     int         acquired_by;
     132             : 
     133             :     /* Count of processes that are currently using this origin; it cannot be dropped while this is above zero. */
     134             :     int         refcount;
     135             : 
     136             :     /*
     137             :      * Condition variable that's signaled when acquired_by changes; replorigin_state_clear sleeps on it while the origin is busy.
     138             :      */
     139             :     ConditionVariable origin_cv;
     140             : 
     141             :     /*
     142             :      * Lock protecting remote_lsn and local_lsn.
     143             :      */
     144             :     LWLock      lock;
     145             : } ReplicationState;
     146             : 
     147             : /*
     148             :  * On-disk version of ReplicationState: only the origin id and its remote_lsn are stored.
     149             :  */
     150             : typedef struct ReplicationStateOnDisk
     151             : {
     152             :     ReplOriginId roident;
     153             :     XLogRecPtr  remote_lsn;
     154             : } ReplicationStateOnDisk;
     155             : 
     156             : /* Layout of the shared memory block: a small control header followed by the per-origin state array. */
     157             : typedef struct ReplicationStateCtl
     158             : {
     159             :     /* Tranche to use for per-origin LWLocks */
     160             :     int         tranche_id;
     161             :     /* Array of length max_active_replication_origins */
     162             :     ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
     163             : } ReplicationStateCtl;
     164             : 
     165             : /* Global variable for per-transaction replication origin state; initially no origin, LSN or timestamp is set */
     166             : ReplOriginXactState replorigin_xact_state = {
     167             :     .origin = InvalidReplOriginId,  /* assumed identity */
     168             :     .origin_lsn = InvalidXLogRecPtr,
     169             :     .origin_timestamp = 0
     170             : };
     171             : 
     172             : /*
     173             :  * Base address of the shared memory array of replication states, which has
     174             :  * max_active_replication_origins entries.  Set in ReplicationOriginShmemInit to replication_states_ctl->states.
     175             :  */
     176             : static ReplicationState *replication_states;
     177             : 
     178             : /*
     179             :  * Actual shared memory block (replication_states[] is part of this).
     180             :  */
     181             : static ReplicationStateCtl *replication_states_ctl;
     182             : 
     183             : /*
     184             :  * We keep a pointer to this backend's ReplicationState to avoid having to
     185             :  * search the replication_states array in replorigin_session_advance for each
     186             :  * remote commit.  (Ownership of a backend's own entry can only be changed by
     187             :  * that backend.)
     188             :  */
     189             : static ReplicationState *session_replication_state = NULL;
     190             : 
     191             : /* Magic number written at the start of the on-disk checkpoint file. */
     192             : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
     193             : /* Raise an ERROR if origins are unavailable (only tested when check_origins) or if we are in recovery (unless recoveryOK). */
     194             : static void
     195         136 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
     196             : {
     197         136 :     if (check_origins && max_active_replication_origins == 0)
     198           0 :         ereport(ERROR,
     199             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     200             :                  errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
     201             : /* unless the caller said recovery is acceptable, refuse to proceed while recovery is in progress */
     202         136 :     if (!recoveryOK && RecoveryInProgress())
     203           0 :         ereport(ERROR,
     204             :                 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
     205             :                  errmsg("cannot manipulate replication origins during recovery")));
     206         136 : }
     207             : 
     208             : 
     209             : /*
     210             :  * IsReservedOriginName
     211             :  *      True iff name equals, ignoring case, LOGICALREP_ORIGIN_NONE or LOGICALREP_ORIGIN_ANY.
     212             :  */
     213             : static bool
     214          26 : IsReservedOriginName(const char *name)
     215             : {
     216          50 :     return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
     217          24 :             (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
     218             : }
     219             : 
     220             : /* ---------------------------------------------------------------------------
     221             :  * Functions for working with replication origins themselves.
     222             :  * ---------------------------------------------------------------------------
     223             :  */
     224             : 
     225             : /*
     226             :  * Look up a persistent replication origin by name (REPLORIGNAME syscache) and return its id.
     227             :  *
     228             :  * Returns InvalidOid if no origin with that name exists and missing_ok is true;
     229             :  * raises an ERROR in that case if missing_ok is false. */
     230             : ReplOriginId
     231        2024 : replorigin_by_name(const char *roname, bool missing_ok)
     232             : {
     233             :     Form_pg_replication_origin ident;
     234        2024 :     Oid         roident = InvalidOid;
     235             :     HeapTuple   tuple;
     236             :     Datum       roname_d;
     237             : /* the cache lookup takes a text datum, so convert the C string first */
     238        2024 :     roname_d = CStringGetTextDatum(roname);
     239             : 
     240        2024 :     tuple = SearchSysCache1(REPLORIGNAME, roname_d);
     241        2024 :     if (HeapTupleIsValid(tuple))
     242             :     {
     243        1264 :         ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
     244        1264 :         roident = ident->roident;
     245        1264 :         ReleaseSysCache(tuple);
     246             :     }
     247         760 :     else if (!missing_ok)
     248           8 :         ereport(ERROR,
     249             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     250             :                  errmsg("replication origin \"%s\" does not exist",
     251             :                         roname)));
     252             : 
     253        2016 :     return roident;
     254             : }
     255             : 
     256             : /*
     257             :  * Create a replication origin named roname and return its newly assigned id.
     258             :  * Raises an ERROR if roname is longer than MAX_RONAME_LEN bytes or if no free id is found.
     259             :  * Needs to be called in a transaction.
     260             :  */
     261             : ReplOriginId
     262         742 : replorigin_create(const char *roname)
     263             : {
     264             :     Oid         roident;
     265         742 :     HeapTuple   tuple = NULL;
     266             :     Relation    rel;
     267             :     Datum       roname_d;
     268             :     SnapshotData SnapshotDirty;
     269             :     SysScanDesc scan;
     270             :     ScanKeyData key;
     271             : 
     272             :     /*
     273             :      * To avoid needing a TOAST table for pg_replication_origin, we limit
     274             :      * replication origin names to 512 bytes.  This should be more than enough
     275             :      * for all practical use.
     276             :      */
     277         742 :     if (strlen(roname) > MAX_RONAME_LEN)
     278           6 :         ereport(ERROR,
     279             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     280             :                  errmsg("replication origin name is too long"),
     281             :                  errdetail("Replication origin names must be no longer than %d bytes.",
     282             :                            MAX_RONAME_LEN)));
     283             : 
     284         736 :     roname_d = CStringGetTextDatum(roname);
     285             : 
     286             :     Assert(IsTransactionState());
     287             : 
     288             :     /*
     289             :      * We need the numeric replication origin to be 16bit wide, so we cannot
     290             :      * rely on the normal oid allocation. Instead we simply scan
     291             :      * pg_replication_origin for the first unused id. That's not particularly
     292             :      * efficient, but this should be a fairly infrequent operation - we can
     293             :      * easily spend a bit more code on this when it turns out it needs to be
     294             :      * faster.
     295             :      *
     296             :      * We handle concurrency by taking an exclusive lock (allowing reads!)
     297             :      * over the table for the duration of the search. Because we use a "dirty
     298             :      * snapshot" we can read rows that other in-progress sessions have
     299             :      * written, even though they would be invisible with normal snapshots. Due
     300             :      * to the exclusive lock there's no danger that new rows can appear while
     301             :      * we're checking.
     302             :      */
     303         736 :     InitDirtySnapshot(SnapshotDirty);
     304             : 
     305         736 :     rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
     306             : 
     307             :     /*
     308             :      * We want to be able to access pg_replication_origin without setting up a
     309             :      * snapshot.  To make that safe, it needs to not have a TOAST table, since
     310             :      * TOASTed data cannot be fetched without a snapshot.  As of this writing,
     311             :      * its only varlena column is roname, which we limit to 512 bytes to avoid
     312             :      * needing out-of-line storage.  If you add a TOAST table to this catalog,
     313             :      * be sure to set up a snapshot everywhere it might be needed.  For more
     314             :      * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
     315             :      */
     316             :     Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
     317             : /* probe candidate ids in ascending order, from InvalidOid + 1 up to but not including PG_UINT16_MAX */
     318        1332 :     for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
     319             :     {
     320             :         bool        nulls[Natts_pg_replication_origin];
     321             :         Datum       values[Natts_pg_replication_origin];
     322             :         bool        collides;
     323             : 
     324        1332 :         CHECK_FOR_INTERRUPTS();
     325             : 
     326        1332 :         ScanKeyInit(&key,
     327             :                     Anum_pg_replication_origin_roident,
     328             :                     BTEqualStrategyNumber, F_OIDEQ,
     329             :                     ObjectIdGetDatum(roident));
     330             : 
     331        1332 :         scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
     332             :                                   true /* indexOK */ ,
     333             :                                   &SnapshotDirty,
     334             :                                   1, &key);
     335             : 
     336        1332 :         collides = HeapTupleIsValid(systable_getnext(scan));
     337             : 
     338        1332 :         systable_endscan(scan);
     339             : 
     340        1332 :         if (!collides)
     341             :         {
     342             :             /*
     343             :              * Ok, found an unused roident, insert the new row and do a CCI,
     344             :              * so our callers can look it up if they want to.
     345             :              */
     346         736 :             memset(&nulls, 0, sizeof(nulls));
     347             : 
     348         736 :             values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
     349         736 :             values[Anum_pg_replication_origin_roname - 1] = roname_d;
     350             : 
     351         736 :             tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     352         736 :             CatalogTupleInsert(rel, tuple);
     353         734 :             CommandCounterIncrement();
     354         734 :             break;
     355             :         }
     356             :     }
     357             : 
     358             :     /* now release the lock again */
     359         734 :     table_close(rel, ExclusiveLock);
     360             : /* tuple is still NULL only if the loop ran out of candidate ids without inserting a row */
     361         734 :     if (tuple == NULL)
     362           0 :         ereport(ERROR,
     363             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     364             :                  errmsg("could not find free replication origin ID")));
     365             : 
     366         734 :     heap_freetuple(tuple);
     367         734 :     return roident;
     368             : }
     369             : 
     370             : /*
     371             :  * Helper function to drop a replication origin: WAL-logs the drop and clears the origin's in-memory slot, if any.
     372             :  * If the slot is in use (refcount > 0): ERROR when nowait is true, else sleep on its CV and retry from the top. */
     373             : static void
     374         626 : replorigin_state_clear(ReplOriginId roident, bool nowait)
     375             : {
     376             :     int         i;
     377             : 
     378             :     /*
     379             :      * Clean up the slot state info, if there is any matching slot.
     380             :      */
     381         626 : restart:
     382         626 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     383             : 
     384        2066 :     for (i = 0; i < max_active_replication_origins; i++)
     385             :     {
     386        1974 :         ReplicationState *state = &replication_states[i];
     387             : 
     388        1974 :         if (state->roident == roident)
     389             :         {
     390             :             /* found our slot, is it busy? */
     391         534 :             if (state->refcount > 0)
     392             :             {
     393             :                 ConditionVariable *cv;
     394             : 
     395           0 :                 if (nowait)
     396           0 :                     ereport(ERROR,
     397             :                             (errcode(ERRCODE_OBJECT_IN_USE),
     398             :                              (state->acquired_by != 0)
     399             :                              ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
     400             :                                       state->roident,
     401             :                                       state->acquired_by)
     402             :                              : errmsg("could not drop replication origin with ID %d, in use by another process",
     403             :                                       state->roident)));
     404             : 
     405             :                 /*
     406             :                  * We must wait and then retry.  Since we don't know which CV
     407             :                  * to wait on until here, we can't readily use
     408             :                  * ConditionVariablePrepareToSleep (calling it here would be
     409             :                  * wrong, since we could miss the signal if we did so); just
     410             :                  * use ConditionVariableSleep directly.
     411             :                  */
     412           0 :                 cv = &state->origin_cv;
     413             : /* release the array lock before sleeping; the scan restarts and re-acquires it afterwards */
     414           0 :                 LWLockRelease(ReplicationOriginLock);
     415             : 
     416           0 :                 ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
     417           0 :                 goto restart;
     418             :             }
     419             : 
     420             :             /* first make a WAL log entry recording the drop of this origin id */
     421             :             {
     422             :                 xl_replorigin_drop xlrec;
     423             : 
     424         534 :                 xlrec.node_id = roident;
     425         534 :                 XLogBeginInsert();
     426         534 :                 XLogRegisterData(&xlrec, sizeof(xlrec));
     427         534 :                 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
     428             :             }
     429             : 
     430             :             /* then clear the in-memory slot */
     431         534 :             state->roident = InvalidReplOriginId;
     432         534 :             state->remote_lsn = InvalidXLogRecPtr;
     433         534 :             state->local_lsn = InvalidXLogRecPtr;
     434         534 :             break;
     435             :         }
     436             :     }
     437         626 :     LWLockRelease(ReplicationOriginLock);
     438         626 :     ConditionVariableCancelSleep();
     439         626 : }
     440             : 
     441             : /*
     442             :  * Drop the replication origin called name: clear its in-memory state, then delete its catalog row.
     443             :  * If the origin turns out not to exist and missing_ok is true, do nothing; nowait is passed on to replorigin_state_clear.
     444             :  * Needs to be called in a transaction.
     445             :  */
     446             : void
     447        1000 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
     448             : {
     449             :     ReplOriginId roident;
     450             :     Relation    rel;
     451             :     HeapTuple   tuple;
     452             : 
     453             :     Assert(IsTransactionState());
     454             : 
     455        1000 :     rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
     456             : 
     457        1000 :     roident = replorigin_by_name(name, missing_ok);
     458             : 
     459             :     /* Lock the origin to prevent concurrent drops. */
     460         998 :     LockSharedObject(ReplicationOriginRelationId, roident, 0,
     461             :                      AccessExclusiveLock);
     462             : /* look the origin up again by id now that the object lock is held; it may no longer exist */
     463         998 :     tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
     464         998 :     if (!HeapTupleIsValid(tuple))
     465             :     {
     466         372 :         if (!missing_ok)
     467           0 :             elog(ERROR, "cache lookup failed for replication origin with ID %d",
     468             :                  roident);
     469             : 
     470             :         /*
     471             :          * We don't need to retain the locks if the origin is already dropped.
     472             :          */
     473         372 :         UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
     474             :                            AccessExclusiveLock);
     475         372 :         table_close(rel, RowExclusiveLock);
     476         372 :         return;
     477             :     }
     478             : 
     479         626 :     replorigin_state_clear(roident, nowait);
     480             : 
     481             :     /*
     482             :      * Now, we can delete the catalog entry.
     483             :      */
     484         626 :     CatalogTupleDelete(rel, &tuple->t_self);
     485         626 :     ReleaseSysCache(tuple);
     486             : 
     487         626 :     CommandCounterIncrement();
     488             : 
     489             :     /* We keep the lock on pg_replication_origin until commit */
     490         626 :     table_close(rel, NoLock);
     491             : }
     492             : 
     493             : /*
     494             :  * Look up a replication origin by its id and return its name in *roname.
     495             :  *
     496             :  * The external name is palloc'd in the calling context.
     497             :  * Returns true if the origin is known.  Otherwise *roname is set to NULL, and
     498             :  * false is returned if missing_ok is true or an ERROR is raised if it is not.
     499             :  */
     500             : bool
     501          56 : replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
     502             : {
     503             :     HeapTuple   tuple;
     504             :     Form_pg_replication_origin ric;
     505             : 
     506             :     Assert(OidIsValid((Oid) roident));
     507             :     Assert(roident != InvalidReplOriginId);
     508             :     Assert(roident != DoNotReplicateId);
     509             : 
     510          56 :     tuple = SearchSysCache1(REPLORIGIDENT,
     511             :                             ObjectIdGetDatum((Oid) roident));
     512             : 
     513          56 :     if (HeapTupleIsValid(tuple))
     514             :     {
     515          50 :         ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
     516          50 :         *roname = text_to_cstring(&ric->roname);
     517          50 :         ReleaseSysCache(tuple);
     518             : 
     519          50 :         return true;
     520             :     }
     521             :     else
     522             :     {
     523           6 :         *roname = NULL;
     524             : 
     525           6 :         if (!missing_ok)
     526           0 :             ereport(ERROR,
     527             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     528             :                      errmsg("replication origin with ID %d does not exist",
     529             :                             roident)));
     530             : 
     531           6 :         return false;
     532             :     }
     533             : }
     534             : 
     535             : 
     536             : /* ---------------------------------------------------------------------------
     537             :  * Functions for handling replication progress.
     538             :  * ---------------------------------------------------------------------------
     539             :  */
     540             : /* Report the amount of shared memory needed for the replication origin state. */
     541             : Size
     542        8810 : ReplicationOriginShmemSize(void)
     543             : {
     544        8810 :     Size        size = 0;
     545             : /* no memory is needed when no origins can be active */
     546        8810 :     if (max_active_replication_origins == 0)
     547           4 :         return size;
     548             : /* the part of ReplicationStateCtl that precedes the states[] array */
     549        8806 :     size = add_size(size, offsetof(ReplicationStateCtl, states));
     550             : /* plus one ReplicationState per allowed origin */
     551        8806 :     size = add_size(size,
     552             :                     mul_size(max_active_replication_origins, sizeof(ReplicationState)));
     553        8806 :     return size;
     554             : }
     555             : /* Attach to the shared replication origin state, initializing it if it was not found already set up. */
     556             : void
     557        2280 : ReplicationOriginShmemInit(void)
     558             : {
     559             :     bool        found;
     560             : /* nothing to set up when no origins can be active; the state pointers stay unset */
     561        2280 :     if (max_active_replication_origins == 0)
     562           2 :         return;
     563             : 
     564        2278 :     replication_states_ctl = (ReplicationStateCtl *)
     565        2278 :         ShmemInitStruct("ReplicationOriginState",
     566             :                         ReplicationOriginShmemSize(),
     567             :                         &found);
     568        2278 :     replication_states = replication_states_ctl->states;
     569             : /* not found: zero the whole block, then set up each entry's lock and condition variable */
     570        2278 :     if (!found)
     571             :     {
     572             :         int         i;
     573             : 
     574      186652 :         MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
     575             : 
     576        2278 :         replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
     577             : 
     578       25040 :         for (i = 0; i < max_active_replication_origins; i++)
     579             :         {
     580       22762 :             LWLockInitialize(&replication_states[i].lock,
     581       22762 :                              replication_states_ctl->tranche_id);
     582       22762 :             ConditionVariableInit(&replication_states[i].origin_cv);
     583             :         }
     584             :     }
     585             : }
     586             : 
     587             : /* ---------------------------------------------------------------------------
     588             :  * Perform a checkpoint of each replication origin's progress with respect to
     589             :  * the replayed remote_lsn. Make sure that all transactions we refer to in the
     590             :  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
     591             :  * if the transactions were originally committed asynchronously.
     592             :  *
     593             :  * We store checkpoints in the following format:
     594             :  * +-------+------------------------+------------------+-----+--------+
     595             :  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
     596             :  * +-------+------------------------+------------------+-----+--------+
     597             :  *
     598             :  * So it's just the magic, followed by the statically sized
     599             :  * ReplicationStateOnDisk structs. Note that the maximum number of
     600             :  * ReplicationState is determined by max_active_replication_origins.
     601             :  * ---------------------------------------------------------------------------
     602             :  */
     603             : void
     604        3576 : CheckPointReplicationOrigin(void)
     605             : {
     606        3576 :     const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
     607        3576 :     const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     608             :     int         tmpfd;
     609             :     int         i;
     610        3576 :     uint32      magic = REPLICATION_STATE_MAGIC;
     611             :     pg_crc32c   crc;
     612             : 
     613        3576 :     if (max_active_replication_origins == 0)
     614           2 :         return;
     615             : 
     616        3574 :     INIT_CRC32C(crc);
     617             : 
     618             :     /* make sure no old temp file is remaining */
     619        3574 :     if (unlink(tmppath) < 0 && errno != ENOENT)
     620           0 :         ereport(PANIC,
     621             :                 (errcode_for_file_access(),
     622             :                  errmsg("could not remove file \"%s\": %m",
     623             :                         tmppath)));
     624             : 
     625             :     /*
     626             :      * no other backend can perform this at the same time; only one checkpoint
     627             :      * can happen at a time.
     628             :      */
     629        3574 :     tmpfd = OpenTransientFile(tmppath,
     630             :                               O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
     631        3574 :     if (tmpfd < 0)
     632           0 :         ereport(PANIC,
     633             :                 (errcode_for_file_access(),
     634             :                  errmsg("could not create file \"%s\": %m",
     635             :                         tmppath)));
     636             : 
     637             :     /* write magic */
     638        3574 :     errno = 0;
     639        3574 :     if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
     640             :     {
     641             :         /* if write didn't set errno, assume problem is no disk space */
     642           0 :         if (errno == 0)
     643           0 :             errno = ENOSPC;
     644           0 :         ereport(PANIC,
     645             :                 (errcode_for_file_access(),
     646             :                  errmsg("could not write to file \"%s\": %m",
     647             :                         tmppath)));
     648             :     }
     649        3574 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     650             : 
     651             :     /* prevent concurrent creations/drops */
     652        3574 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
     653             : 
     654             :     /* write actual data */
     655       39314 :     for (i = 0; i < max_active_replication_origins; i++)
     656             :     {
     657             :         ReplicationStateOnDisk disk_state;
     658       35740 :         ReplicationState *curstate = &replication_states[i];
     659             :         XLogRecPtr  local_lsn;
     660             : 
     661       35740 :         if (curstate->roident == InvalidReplOriginId)
     662       35634 :             continue;
     663             : 
     664             :         /* zero, to avoid uninitialized padding bytes */
     665         106 :         memset(&disk_state, 0, sizeof(disk_state));
     666             : 
     667         106 :         LWLockAcquire(&curstate->lock, LW_SHARED);
     668             : 
     669         106 :         disk_state.roident = curstate->roident;
     670             : 
     671         106 :         disk_state.remote_lsn = curstate->remote_lsn;
     672         106 :         local_lsn = curstate->local_lsn;
     673             : 
     674         106 :         LWLockRelease(&curstate->lock);
     675             : 
     676             :         /* make sure we only write out a commit that's persistent */
     677         106 :         XLogFlush(local_lsn);
     678             : 
     679         106 :         errno = 0;
     680         106 :         if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
     681             :             sizeof(disk_state))
     682             :         {
     683             :             /* if write didn't set errno, assume problem is no disk space */
     684           0 :             if (errno == 0)
     685           0 :                 errno = ENOSPC;
     686           0 :             ereport(PANIC,
     687             :                     (errcode_for_file_access(),
     688             :                      errmsg("could not write to file \"%s\": %m",
     689             :                             tmppath)));
     690             :         }
     691             : 
     692         106 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     693             :     }
     694             : 
     695        3574 :     LWLockRelease(ReplicationOriginLock);
     696             : 
     697             :     /* write out the CRC */
     698        3574 :     FIN_CRC32C(crc);
     699        3574 :     errno = 0;
     700        3574 :     if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
     701             :     {
     702             :         /* if write didn't set errno, assume problem is no disk space */
     703           0 :         if (errno == 0)
     704           0 :             errno = ENOSPC;
     705           0 :         ereport(PANIC,
     706             :                 (errcode_for_file_access(),
     707             :                  errmsg("could not write to file \"%s\": %m",
     708             :                         tmppath)));
     709             :     }
     710             : 
     711        3574 :     if (CloseTransientFile(tmpfd) != 0)
     712           0 :         ereport(PANIC,
     713             :                 (errcode_for_file_access(),
     714             :                  errmsg("could not close file \"%s\": %m",
     715             :                         tmppath)));
     716             : 
     717             :     /* fsync, rename to permanent file, fsync file and directory */
     718        3574 :     durable_rename(tmppath, path, PANIC);
     719             : }
     720             : 
     721             : /*
     722             :  * Recover replication replay status from checkpoint data saved earlier by
     723             :  * CheckPointReplicationOrigin.
     724             :  *
     725             :  * This only needs to be called at startup and *not* during every checkpoint
     726             :  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
     727             :  * state thereafter can be recovered by looking at commit records.
     728             :  */
     729             : void
     730        1984 : StartupReplicationOrigin(void)
     731             : {
     732        1984 :     const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     733             :     int         fd;
     734             :     int         readBytes;
     735        1984 :     uint32      magic = REPLICATION_STATE_MAGIC;
     736        1984 :     int         last_state = 0;
     737             :     pg_crc32c   file_crc;
     738             :     pg_crc32c   crc;
     739             : 
     740             :     /* don't want to overwrite already existing state */
     741             : #ifdef USE_ASSERT_CHECKING
     742             :     static bool already_started = false;
     743             : 
     744             :     Assert(!already_started);
     745             :     already_started = true;
     746             : #endif
     747             : 
     748        1984 :     if (max_active_replication_origins == 0)
     749         104 :         return;
     750             : 
     751        1982 :     INIT_CRC32C(crc);
     752             : 
     753        1982 :     elog(DEBUG2, "starting up replication origin progress state");
     754             : 
     755        1982 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
     756             : 
     757             :     /*
     758             :      * might have had max_active_replication_origins == 0 last run, or we just
     759             :      * brought up a standby.
     760             :      */
     761        1982 :     if (fd < 0 && errno == ENOENT)
     762         102 :         return;
     763        1880 :     else if (fd < 0)
     764           0 :         ereport(PANIC,
     765             :                 (errcode_for_file_access(),
     766             :                  errmsg("could not open file \"%s\": %m",
     767             :                         path)));
     768             : 
     769             :     /* verify magic, that is written even if nothing was active */
     770        1880 :     readBytes = read(fd, &magic, sizeof(magic));
     771        1880 :     if (readBytes != sizeof(magic))
     772             :     {
     773           0 :         if (readBytes < 0)
     774           0 :             ereport(PANIC,
     775             :                     (errcode_for_file_access(),
     776             :                      errmsg("could not read file \"%s\": %m",
     777             :                             path)));
     778             :         else
     779           0 :             ereport(PANIC,
     780             :                     (errcode(ERRCODE_DATA_CORRUPTED),
     781             :                      errmsg("could not read file \"%s\": read %d of %zu",
     782             :                             path, readBytes, sizeof(magic))));
     783             :     }
     784        1880 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     785             : 
     786        1880 :     if (magic != REPLICATION_STATE_MAGIC)
     787           0 :         ereport(PANIC,
     788             :                 (errmsg("replication checkpoint has wrong magic %u instead of %u",
     789             :                         magic, REPLICATION_STATE_MAGIC)));
     790             : 
     791             :     /* we can skip locking here, no other access is possible */
     792             : 
     793             :     /* recover individual states, until there are no more to be found */
     794             :     while (true)
     795          62 :     {
     796             :         ReplicationStateOnDisk disk_state;
     797             : 
     798        1942 :         readBytes = read(fd, &disk_state, sizeof(disk_state));
     799             : 
     800        1942 :         if (readBytes < 0)
     801             :         {
     802           0 :             ereport(PANIC,
     803             :                     (errcode_for_file_access(),
     804             :                      errmsg("could not read file \"%s\": %m",
     805             :                             path)));
     806             :         }
     807             : 
     808             :         /* no further data */
     809        1942 :         if (readBytes == sizeof(crc))
     810             :         {
     811        1880 :             memcpy(&file_crc, &disk_state, sizeof(file_crc));
     812        1880 :             break;
     813             :         }
     814             : 
     815          62 :         if (readBytes != sizeof(disk_state))
     816             :         {
     817           0 :             ereport(PANIC,
     818             :                     (errcode_for_file_access(),
     819             :                      errmsg("could not read file \"%s\": read %d of %zu",
     820             :                             path, readBytes, sizeof(disk_state))));
     821             :         }
     822             : 
     823          62 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     824             : 
     825          62 :         if (last_state == max_active_replication_origins)
     826           0 :             ereport(PANIC,
     827             :                     (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     828             :                      errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
     829             : 
     830             :         /* copy data to shared memory */
     831          62 :         replication_states[last_state].roident = disk_state.roident;
     832          62 :         replication_states[last_state].remote_lsn = disk_state.remote_lsn;
     833          62 :         last_state++;
     834             : 
     835          62 :         ereport(LOG,
     836             :                 errmsg("recovered replication state of node %d to %X/%08X",
     837             :                        disk_state.roident,
     838             :                        LSN_FORMAT_ARGS(disk_state.remote_lsn)));
     839             :     }
     840             : 
     841             :     /* now check checksum */
     842        1880 :     FIN_CRC32C(crc);
     843        1880 :     if (file_crc != crc)
     844           0 :         ereport(PANIC,
     845             :                 (errcode(ERRCODE_DATA_CORRUPTED),
     846             :                  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
     847             :                         crc, file_crc)));
     848             : 
     849        1880 :     if (CloseTransientFile(fd) != 0)
     850           0 :         ereport(PANIC,
     851             :                 (errcode_for_file_access(),
     852             :                  errmsg("could not close file \"%s\": %m",
     853             :                         path)));
     854             : }
     855             : 
     856             : void
     857           8 : replorigin_redo(XLogReaderState *record)
     858             : {
     859           8 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     860             : 
     861           8 :     switch (info)
     862             :     {
     863           4 :         case XLOG_REPLORIGIN_SET:
     864             :             {
     865           4 :                 xl_replorigin_set *xlrec =
     866           4 :                     (xl_replorigin_set *) XLogRecGetData(record);
     867             : 
     868           4 :                 replorigin_advance(xlrec->node_id,
     869             :                                    xlrec->remote_lsn, record->EndRecPtr,
     870           4 :                                    xlrec->force /* backward */ ,
     871             :                                    false /* WAL log */ );
     872           4 :                 break;
     873             :             }
     874           4 :         case XLOG_REPLORIGIN_DROP:
     875             :             {
     876             :                 xl_replorigin_drop *xlrec;
     877             :                 int         i;
     878             : 
     879           4 :                 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
     880             : 
     881           6 :                 for (i = 0; i < max_active_replication_origins; i++)
     882             :                 {
     883           6 :                     ReplicationState *state = &replication_states[i];
     884             : 
     885             :                     /* found our slot */
     886           6 :                     if (state->roident == xlrec->node_id)
     887             :                     {
     888             :                         /* reset entry */
     889           4 :                         state->roident = InvalidReplOriginId;
     890           4 :                         state->remote_lsn = InvalidXLogRecPtr;
     891           4 :                         state->local_lsn = InvalidXLogRecPtr;
     892           4 :                         break;
     893             :                     }
     894             :                 }
     895           4 :                 break;
     896             :             }
     897           0 :         default:
     898           0 :             elog(PANIC, "replorigin_redo: unknown op code %u", info);
     899             :     }
     900           8 : }
     901             : 
     902             : 
     903             : /*
     904             :  * Tell the replication origin progress machinery that a commit from 'node'
     905             :  * that originated at the LSN remote_commit on the remote node was replayed
     906             :  * successfully and that we don't need to do so again. In combination with
     907             :  * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp}
     908             :  * that ensures we won't lose knowledge about that after a crash if the
     909             :  * transaction had a persistent effect (think of asynchronous commits).
     910             :  *
     911             :  * local_commit needs to be a local LSN of the commit so that we can make sure
     912             :  * upon a checkpoint that enough WAL has been persisted to disk.
     913             :  *
     914             :  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
     915             :  * unless running in recovery.
     916             :  */
     917             : void
     918         484 : replorigin_advance(ReplOriginId node,
     919             :                    XLogRecPtr remote_commit, XLogRecPtr local_commit,
     920             :                    bool go_backward, bool wal_log)
     921             : {
     922             :     int         i;
     923         484 :     ReplicationState *replication_state = NULL;
     924         484 :     ReplicationState *free_state = NULL;
     925             : 
     926             :     Assert(node != InvalidReplOriginId);
     927             : 
     928             :     /* we don't track DoNotReplicateId */
     929         484 :     if (node == DoNotReplicateId)
     930           0 :         return;
     931             : 
     932             :     /*
     933             :      * XXX: For the case where this is called by WAL replay, it'd be more
     934             :      * efficient to restore into a backend local hashtable and only dump into
     935             :      * shmem after recovery is finished. Let's hold off on implementing that
     936             :      * until it's shown to be a measurable expense.
     937             :      */
     938             : 
     939             :     /* Lock exclusively, as we may have to create a new table entry. */
     940         484 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     941             : 
     942             :     /*
     943             :      * Search for either an existing slot for the origin, or a free one we can
     944             :      * use.
     945             :      */
     946        4430 :     for (i = 0; i < max_active_replication_origins; i++)
     947             :     {
     948        4038 :         ReplicationState *curstate = &replication_states[i];
     949             : 
     950             :         /* remember where to insert if necessary */
     951        4038 :         if (curstate->roident == InvalidReplOriginId &&
     952             :             free_state == NULL)
     953             :         {
     954         392 :             free_state = curstate;
     955         392 :             continue;
     956             :         }
     957             : 
     958             :         /* not our slot */
     959        3646 :         if (curstate->roident != node)
     960             :         {
     961        3554 :             continue;
     962             :         }
     963             : 
     964             :         /* ok, found slot */
     965          92 :         replication_state = curstate;
     966             : 
     967          92 :         LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
     968             : 
     969             :         /* Make sure it's not used by somebody else */
     970          92 :         if (replication_state->refcount > 0)
     971             :         {
     972           0 :             ereport(ERROR,
     973             :                     (errcode(ERRCODE_OBJECT_IN_USE),
     974             :                      (replication_state->acquired_by != 0)
     975             :                      ? errmsg("replication origin with ID %d is already active for PID %d",
     976             :                               replication_state->roident,
     977             :                               replication_state->acquired_by)
     978             :                      : errmsg("replication origin with ID %d is already active in another process",
     979             :                               replication_state->roident)));
     980             :         }
     981             : 
     982          92 :         break;
     983             :     }
     984             : 
     985         484 :     if (replication_state == NULL && free_state == NULL)
     986           0 :         ereport(ERROR,
     987             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     988             :                  errmsg("could not find free replication state slot for replication origin with ID %d",
     989             :                         node),
     990             :                  errhint("Increase \"max_active_replication_origins\" and try again.")));
     991             : 
     992         484 :     if (replication_state == NULL)
     993             :     {
     994             :         /* initialize new slot */
     995         392 :         LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
     996         392 :         replication_state = free_state;
     997             :         Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
     998             :         Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
     999         392 :         replication_state->roident = node;
    1000             :     }
    1001             : 
    1002             :     Assert(replication_state->roident != InvalidReplOriginId);
    1003             : 
    1004             :     /*
    1005             :      * If somebody "forcefully" sets this slot, WAL log it, so it's durable
    1006             :      * and the standby gets the message. Primarily this will be called during
    1007             :      * WAL replay (of commit records) where no WAL logging is necessary.
    1008             :      */
    1009         484 :     if (wal_log)
    1010             :     {
    1011             :         xl_replorigin_set xlrec;
    1012             : 
    1013         402 :         xlrec.remote_lsn = remote_commit;
    1014         402 :         xlrec.node_id = node;
    1015         402 :         xlrec.force = go_backward;
    1016             : 
    1017         402 :         XLogBeginInsert();
    1018         402 :         XLogRegisterData(&xlrec, sizeof(xlrec));
    1019             : 
    1020         402 :         XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
    1021             :     }
    1022             : 
    1023             :     /*
    1024             :      * Due to - harmless - race conditions during a checkpoint we could see
    1025             :      * values here that are older than the ones we already have in memory. We
    1026             :      * could also see older values for prepared transactions when the prepare
    1027             :      * is sent at a later point of time along with commit prepared and there
    1028             :      * are other transactions commits between prepare and commit prepared. See
    1029             :      * ReorderBufferFinishPrepared. Don't overwrite those.
    1030             :      */
    1031         484 :     if (go_backward || replication_state->remote_lsn < remote_commit)
    1032         470 :         replication_state->remote_lsn = remote_commit;
    1033         484 :     if (XLogRecPtrIsValid(local_commit) &&
    1034          76 :         (go_backward || replication_state->local_lsn < local_commit))
    1035          80 :         replication_state->local_lsn = local_commit;
    1036         484 :     LWLockRelease(&replication_state->lock);
    1037             : 
    1038             :     /*
    1039             :      * Release *after* changing the LSNs, slot isn't acquired and thus could
    1040             :      * otherwise be dropped anytime.
    1041             :      */
    1042         484 :     LWLockRelease(ReplicationOriginLock);
    1043             : }
    1044             : 
    1045             : 
    1046             : XLogRecPtr
    1047          16 : replorigin_get_progress(ReplOriginId node, bool flush)
    1048             : {
    1049             :     int         i;
    1050          16 :     XLogRecPtr  local_lsn = InvalidXLogRecPtr;
    1051          16 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1052             : 
    1053             :     /* prevent slots from being concurrently dropped */
    1054          16 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1055             : 
    1056          76 :     for (i = 0; i < max_active_replication_origins; i++)
    1057             :     {
    1058             :         ReplicationState *state;
    1059             : 
    1060          70 :         state = &replication_states[i];
    1061             : 
    1062          70 :         if (state->roident == node)
    1063             :         {
    1064          10 :             LWLockAcquire(&state->lock, LW_SHARED);
    1065             : 
    1066          10 :             remote_lsn = state->remote_lsn;
    1067          10 :             local_lsn = state->local_lsn;
    1068             : 
    1069          10 :             LWLockRelease(&state->lock);
    1070             : 
    1071          10 :             break;
    1072             :         }
    1073             :     }
    1074             : 
    1075          16 :     LWLockRelease(ReplicationOriginLock);
    1076             : 
    1077          16 :     if (flush && XLogRecPtrIsValid(local_lsn))
    1078           2 :         XLogFlush(local_lsn);
    1079             : 
    1080          16 :     return remote_lsn;
    1081             : }
    1082             : 
    1083             : /* Helper function to reset the session replication origin */
    1084             : static void
    1085         992 : replorigin_session_reset_internal(void)
    1086             : {
    1087             :     ConditionVariable *cv;
    1088             : 
    1089             :     Assert(session_replication_state != NULL);
    1090             : 
    1091         992 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1092             : 
    1093             :     /* The origin must be held by at least one process at this point. */
    1094             :     Assert(session_replication_state->refcount > 0);
    1095             : 
    1096             :     /*
    1097             :      * Reset the PID only if the current session is the first to set up this
    1098             :      * origin. This avoids clearing the first process's PID when any other
    1099             :      * session releases the origin.
    1100             :      */
    1101         992 :     if (session_replication_state->acquired_by == MyProcPid)
    1102         964 :         session_replication_state->acquired_by = 0;
    1103             : 
    1104         992 :     session_replication_state->refcount--;
    1105             : 
    1106         992 :     cv = &session_replication_state->origin_cv;
    1107         992 :     session_replication_state = NULL;
    1108             : 
    1109         992 :     LWLockRelease(ReplicationOriginLock);
    1110             : 
    1111         992 :     ConditionVariableBroadcast(cv);
    1112         992 : }
    1113             : 
    1114             : /*
    1115             :  * Tear down a (possibly) configured session replication origin during process
    1116             :  * exit.
    1117             :  */
    1118             : static void
    1119         984 : ReplicationOriginExitCleanup(int code, Datum arg)
    1120             : {
    1121         984 :     if (session_replication_state == NULL)
    1122         380 :         return;
    1123             : 
    1124         604 :     replorigin_session_reset_internal();
    1125             : }
    1126             : 
    1127             : /*
    1128             :  * Set up a replication origin in the shared memory struct if it doesn't
    1129             :  * already exist and cache access to the specific ReplicationState so the
    1130             :  * array doesn't have to be searched when calling
    1131             :  * replorigin_session_advance().
    1132             :  *
    1133             :  * Normally only one such cached origin can exist per process so the cached
    1134             :  * value can only be set again after the previous value is torn down with
    1135             :  * replorigin_session_reset(). For this normal case pass acquired_by = 0
    1136             :  * (meaning the slot is not allowed to be already acquired by another process).
    1137             :  *
    1138             :  * However, sometimes multiple processes can safely re-use the same origin slot
    1139             :  * (for example, multiple parallel apply processes can safely use the same
    1140             :  * origin, provided they maintain commit order by allowing only one process to
    1141             :  * commit at a time). For this case the first process must pass acquired_by =
    1142             :  * 0, and then the other processes sharing that same origin can pass
    1143             :  * acquired_by = PID of the first process.
    1144             :  */
    1145             : void
    1146         996 : replorigin_session_setup(ReplOriginId node, int acquired_by)
    1147             : {
    1148             :     static bool registered_cleanup;
    1149             :     int         i;
    1150         996 :     int         free_slot = -1;
    1151             : 
    1152         996 :     if (!registered_cleanup)
    1153             :     {
    1154         984 :         on_shmem_exit(ReplicationOriginExitCleanup, 0);
    1155         984 :         registered_cleanup = true;
    1156             :     }
    1157             : 
    1158             :     Assert(max_active_replication_origins > 0);
    1159             : 
    1160         996 :     if (session_replication_state != NULL)
    1161           2 :         ereport(ERROR,
    1162             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1163             :                  errmsg("cannot setup replication origin when one is already setup")));
    1164             : 
    1165             :     /* Lock exclusively, as we may have to create a new table entry. */
    1166         994 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1167             : 
    1168             :     /*
    1169             :      * Search for either an existing slot for the origin, or a free one we can
    1170             :      * use.
    1171             :      */
    1172        3972 :     for (i = 0; i < max_active_replication_origins; i++)
    1173             :     {
    1174        3732 :         ReplicationState *curstate = &replication_states[i];
    1175             : 
    1176             :         /* remember where to insert if necessary */
    1177        3732 :         if (curstate->roident == InvalidReplOriginId &&
    1178             :             free_slot == -1)
    1179             :         {
    1180         244 :             free_slot = i;
    1181         244 :             continue;
    1182             :         }
    1183             : 
    1184             :         /* not our slot */
    1185        3488 :         if (curstate->roident != node)
    1186        2734 :             continue;
    1187             : 
    1188         754 :         else if (curstate->acquired_by != 0 && acquired_by == 0)
    1189             :         {
    1190           0 :             ereport(ERROR,
    1191             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1192             :                      errmsg("replication origin with ID %d is already active for PID %d",
    1193             :                             curstate->roident, curstate->acquired_by)));
    1194             :         }
    1195             : 
    1196         754 :         else if (curstate->acquired_by != acquired_by)
    1197             :         {
    1198           0 :             ereport(ERROR,
    1199             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1200             :                      errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
    1201             :                             node, acquired_by)));
    1202             :         }
    1203             : 
    1204             :         /*
    1205             :          * The origin is in use, but PID is not recorded. This can happen if
    1206             :          * the process that originally acquired the origin exited without
    1207             :          * releasing it. To ensure correctness, other processes cannot acquire
    1208             :          * the origin until all processes currently using it have released it.
    1209             :          */
    1210         754 :         else if (curstate->acquired_by == 0 && curstate->refcount > 0)
    1211           0 :             ereport(ERROR,
    1212             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1213             :                      errmsg("replication origin with ID %d is already active in another process",
    1214             :                             curstate->roident)));
    1215             : 
    1216             :         /* ok, found slot */
    1217         754 :         session_replication_state = curstate;
    1218         754 :         break;
    1219             :     }
    1220             : 
    1221             : 
    1222         994 :     if (session_replication_state == NULL && free_slot == -1)
    1223           0 :         ereport(ERROR,
    1224             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
    1225             :                  errmsg("could not find free replication state slot for replication origin with ID %d",
    1226             :                         node),
    1227             :                  errhint("Increase \"max_active_replication_origins\" and try again.")));
    1228         994 :     else if (session_replication_state == NULL)
    1229             :     {
    1230         240 :         if (acquired_by)
    1231           2 :             ereport(ERROR,
    1232             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1233             :                      errmsg("cannot use PID %d for inactive replication origin with ID %d",
    1234             :                             acquired_by, node)));
    1235             : 
    1236             :         /* initialize new slot */
    1237         238 :         session_replication_state = &replication_states[free_slot];
    1238             :         Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
    1239             :         Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
    1240         238 :         session_replication_state->roident = node;
    1241             :     }
    1242             : 
    1243             : 
    1244             :     Assert(session_replication_state->roident != InvalidReplOriginId);
    1245             : 
    1246         992 :     if (acquired_by == 0)
    1247             :     {
    1248         964 :         session_replication_state->acquired_by = MyProcPid;
    1249             :         Assert(session_replication_state->refcount == 0);
    1250             :     }
    1251             :     else
    1252             :     {
    1253             :         /*
    1254             :          * Sanity check: the origin must already be acquired by the process
    1255             :          * passed as input, and at least one process must be using it.
    1256             :          */
    1257             :         Assert(session_replication_state->acquired_by == acquired_by);
    1258             :         Assert(session_replication_state->refcount > 0);
    1259             :     }
    1260             : 
    1261         992 :     session_replication_state->refcount++;
    1262             : 
    1263         992 :     LWLockRelease(ReplicationOriginLock);
    1264             : 
    1265             :     /* probably this one is pointless */
    1266         992 :     ConditionVariableBroadcast(&session_replication_state->origin_cv);
    1267         992 : }
    1268             : 
    1269             : /*
    1270             :  * Tear down the replication origin previously setup in this session.
    1271             :  *
    1272             :  * This function may only be called if an origin was setup with
    1273             :  * replorigin_session_setup(); otherwise an ERROR is raised.  An ERROR is also raised if this process acquired the origin first and other processes still share it.
    1274             :  */
    1275             : void
    1276         392 : replorigin_session_reset(void)
    1277             : {
    1278             :     Assert(max_active_replication_origins != 0);
    1279             : 
    1280         392 :     if (session_replication_state == NULL)
    1281           2 :         ereport(ERROR,
    1282             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1283             :                  errmsg("no replication origin is configured")));
    1284             : 
    1285             :     /*
    1286             :      * Restrict explicit resetting of the replication origin if it was first
    1287             :      * acquired by this process and others are still using it. While the
    1288             :      * system handles this safely (as happens if the first session exits
    1289             :      * without calling reset), it is best to avoid doing so.
    1290             :      */
    1291         390 :     if (session_replication_state->acquired_by == MyProcPid &&
    1292         386 :         session_replication_state->refcount > 1)
    1293           2 :         ereport(ERROR,
    1294             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1295             :                  errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
    1296             :                         session_replication_state->roident),
    1297             :                  errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
    1298             :                  errhint("Reset the replication origin in all other processes before retrying.")));
    1299             : 
    1300         388 :     replorigin_session_reset_internal();
    1301         388 : }
    1302             : 
    1303             : /*
    1304             :  * Do the same work replorigin_advance() does, just on the session's
    1305             :  * configured origin.  While holding the entry's lock exclusively, local_lsn
    1306             :  * and remote_lsn are each raised only if the supplied value is larger.
    1307             :  * This is noticeably cheaper than using replorigin_advance().
    1308             :  */
    1309             : void
    1310        2198 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
    1311             : {
    1312             :     Assert(session_replication_state != NULL);
    1313             :     Assert(session_replication_state->roident != InvalidReplOriginId);
    1314             : 
    1315        2198 :     LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
    1316        2198 :     if (session_replication_state->local_lsn < local_commit)
    1317        2198 :         session_replication_state->local_lsn = local_commit;
    1318        2198 :     if (session_replication_state->remote_lsn < remote_commit)
    1319        1034 :         session_replication_state->remote_lsn = remote_commit;
    1320        2198 :     LWLockRelease(&session_replication_state->lock);
    1321        2198 : }
    1322             : 
    1323             : /*
    1324             :  * Return the remote LSN up to which we successfully replayed changes from
    1325             :  * an already setup replication origin.  If 'flush' is true and a valid local LSN is recorded, XLogFlush() is called on that local LSN before returning.
    1326             :  */
    1327             : XLogRecPtr
    1328         558 : replorigin_session_get_progress(bool flush)
    1329             : {
    1330             :     XLogRecPtr  remote_lsn;
    1331             :     XLogRecPtr  local_lsn;
    1332             : 
    1333             :     Assert(session_replication_state != NULL);
    1334             : 
    1335         558 :     LWLockAcquire(&session_replication_state->lock, LW_SHARED);
    1336         558 :     remote_lsn = session_replication_state->remote_lsn;
    1337         558 :     local_lsn = session_replication_state->local_lsn;
    1338         558 :     LWLockRelease(&session_replication_state->lock);
    1339             : 
    1340         558 :     if (flush && XLogRecPtrIsValid(local_lsn))
    1341           2 :         XLogFlush(local_lsn);
    1342             : 
    1343         558 :     return remote_lsn;
    1344             : }
    1345             : 
    1346             : /*
    1347             :  * Clear the per-transaction replication origin state (origin_lsn and origin_timestamp).
    1348             :  *
    1349             :  * replorigin_xact_state.origin is also cleared if clear_origin is set.
    1350             :  */
    1351             : void
    1352        1550 : replorigin_xact_clear(bool clear_origin)
    1353             : {
    1354        1550 :     replorigin_xact_state.origin_lsn = InvalidXLogRecPtr;
    1355        1550 :     replorigin_xact_state.origin_timestamp = 0;
    1356        1550 :     if (clear_origin)
    1357        1550 :         replorigin_xact_state.origin = InvalidReplOriginId;
    1358        1550 : }
    1359             : 
    1360             : 
    1361             : /* ---------------------------------------------------------------------------
    1362             :  * SQL functions for working with replication origin.
    1363             :  *
    1364             :  * These mostly should be fairly short wrappers around more generic functions.
    1365             :  * ---------------------------------------------------------------------------
    1366             :  */
    1367             : 
    1368             : /*
    1369             :  * Create replication origin for the passed in name, and return the assigned
    1370             :  * oid.  Reserved names are rejected with an ERROR.
    1371             :  */
    1372             : Datum
    1373          28 : pg_replication_origin_create(PG_FUNCTION_ARGS)
    1374             : {
    1375             :     char       *name;
    1376             :     ReplOriginId roident;
    1377             : 
    1378          28 :     replorigin_check_prerequisites(false, false);
    1379             : 
    1380          28 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1381             : 
    1382             :     /*
    1383             :      * Replication origins "any" and "none" are reserved for system options.
    1384             :      * The origins "pg_xxx" are reserved for internal use.
    1385             :      */
    1386          28 :     if (IsReservedName(name) || IsReservedOriginName(name))
    1387           6 :         ereport(ERROR,
    1388             :                 (errcode(ERRCODE_RESERVED_NAME),
    1389             :                  errmsg("replication origin name \"%s\" is reserved",
    1390             :                         name),
    1391             :                  errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
    1392             :                            LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
    1393             : 
    1394             :     /*
    1395             :      * If built with appropriate switch, whine when regression-testing
    1396             :      * conventions for replication origin names are violated.
    1397             :      */
    1398             : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1399             :     if (strncmp(name, "regress_", 8) != 0)
    1400             :         elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
    1401             : #endif
    1402             : 
    1403          22 :     roident = replorigin_create(name);
    1404             : 
    1405          14 :     pfree(name);
    1406             : 
    1407          14 :     PG_RETURN_OID(roident);
    1408             : }
    1409             : 
    1410             : /*
    1411             :  * Drop the replication origin whose name is passed as the first argument.
    1412             :  */
    1413             : Datum
    1414          18 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
    1415             : {
    1416             :     char       *name;
    1417             : 
    1418          18 :     replorigin_check_prerequisites(false, false);
    1419             : 
    1420          18 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1421             : 
    1422          18 :     replorigin_drop_by_name(name, false, true);
    1423             : 
    1424          16 :     pfree(name);
    1425             : 
    1426          16 :     PG_RETURN_VOID();
    1427             : }
    1428             : 
    1429             : /*
    1430             :  * Return the oid of the named replication origin, or NULL if the lookup did not yield a valid oid.
    1431             :  */
    1432             : Datum
    1433           0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
    1434             : {
    1435             :     char       *name;
    1436             :     ReplOriginId roident;
    1437             : 
    1438           0 :     replorigin_check_prerequisites(false, false);
    1439             : 
    1440           0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1441           0 :     roident = replorigin_by_name(name, true);
    1442             : 
    1443           0 :     pfree(name);
    1444             : 
    1445           0 :     if (OidIsValid(roident))
    1446           0 :         PG_RETURN_OID(roident);
    1447           0 :     PG_RETURN_NULL();
    1448             : }
    1449             : 
    1450             : /*
    1451             :  * Setup the named replication origin for this session, passing the second argument as acquired_by, and record it as the per-transaction origin.
    1452             :  */
    1453             : Datum
    1454          22 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
    1455             : {
    1456             :     char       *name;
    1457             :     ReplOriginId origin;
    1458             :     int         pid;
    1459             : 
    1460          22 :     replorigin_check_prerequisites(true, false);
    1461             : 
    1462          22 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1463          22 :     origin = replorigin_by_name(name, false);
    1464          20 :     pid = PG_GETARG_INT32(1);
    1465          20 :     replorigin_session_setup(origin, pid);
    1466             : 
    1467          16 :     replorigin_xact_state.origin = origin;
    1468             : 
    1469          16 :     pfree(name);
    1470             : 
    1471          16 :     PG_RETURN_VOID();
    1472             : }
    1473             : 
    1474             : /*
    1475             :  * Reset the previously setup origin in this session, and clear the per-transaction origin state including the origin itself.
    1476             :  */
    1477             : Datum
    1478          20 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
    1479             : {
    1480          20 :     replorigin_check_prerequisites(true, false);
    1481             : 
    1482          20 :     replorigin_session_reset();
    1483             : 
    1484          16 :     replorigin_xact_clear(true);
    1485             : 
    1486          16 :     PG_RETURN_VOID();
    1487             : }
    1488             : 
    1489             : /*
    1490             :  * Has a replication origin been setup for this session?  Reports whether replorigin_xact_state.origin is a valid origin id.
    1491             :  */
    1492             : Datum
    1493           8 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
    1494             : {
    1495           8 :     replorigin_check_prerequisites(false, false);
    1496             : 
    1497           8 :     PG_RETURN_BOOL(replorigin_xact_state.origin != InvalidReplOriginId);
    1498             : }
    1499             : 
    1500             : 
    1501             : /*
    1502             :  * Return the replication progress for origin setup in the current session.
    1503             :  *
    1504             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1505             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1506             :  * commits are used when replaying replicated transactions.  Returns NULL if the recorded remote LSN is invalid; raises an ERROR if no origin is configured.
    1507             :  */
    1508             : Datum
    1509           4 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
    1510             : {
    1511           4 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1512           4 :     bool        flush = PG_GETARG_BOOL(0);
    1513             : 
    1514           4 :     replorigin_check_prerequisites(true, false);
    1515             : 
    1516           4 :     if (session_replication_state == NULL)
    1517           0 :         ereport(ERROR,
    1518             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1519             :                  errmsg("no replication origin is configured")));
    1520             : 
    1521           4 :     remote_lsn = replorigin_session_get_progress(flush);
    1522             : 
    1523           4 :     if (!XLogRecPtrIsValid(remote_lsn))
    1524           0 :         PG_RETURN_NULL();
    1525             : 
    1526           4 :     PG_RETURN_LSN(remote_lsn);
    1527             : }
    1528             : /* Record the origin LSN (first argument) and timestamp (second argument) for the current transaction; raises an ERROR if no session origin is configured. */
    1529             : Datum
    1530           2 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
    1531             : {
    1532           2 :     XLogRecPtr  location = PG_GETARG_LSN(0);
    1533             : 
    1534           2 :     replorigin_check_prerequisites(true, false);
    1535             : 
    1536           2 :     if (session_replication_state == NULL)
    1537           0 :         ereport(ERROR,
    1538             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1539             :                  errmsg("no replication origin is configured")));
    1540             : 
    1541           2 :     replorigin_xact_state.origin_lsn = location;
    1542           2 :     replorigin_xact_state.origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
    1543             : 
    1544           2 :     PG_RETURN_VOID();
    1545             : }
    1546             : /* Clear the per-transaction origin LSN and timestamp while leaving the session's origin in place. */
    1547             : Datum
    1548           0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
    1549             : {
    1550           0 :     replorigin_check_prerequisites(true, false);
    1551             : 
    1552             :     /* Do not clear the session origin */
    1553           0 :     replorigin_xact_clear(false);
    1554             : 
    1555           0 :     PG_RETURN_VOID();
    1556             : }
    1557             : 
    1558             : /* Advance the named origin (first argument) to the given remote LSN (second argument); moving backward is permitted and the change is WAL-logged. */
    1559             : Datum
    1560           6 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
    1561             : {
    1562           6 :     text       *name = PG_GETARG_TEXT_PP(0);
    1563           6 :     XLogRecPtr  remote_commit = PG_GETARG_LSN(1);
    1564             :     ReplOriginId node;
    1565             : 
    1566           6 :     replorigin_check_prerequisites(true, false);
    1567             : 
    1568             :     /* lock to prevent the replication origin from vanishing */
    1569           6 :     LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1570             : 
    1571           6 :     node = replorigin_by_name(text_to_cstring(name), false);
    1572             : 
    1573             :     /*
    1574             :      * Can't sensibly pass a local commit to be flushed at checkpoint - this
    1575             :      * xact hasn't committed yet. This is why this function should be used to
    1576             :      * set up the initial replication state, but not for replay.
    1577             :      */
    1578           4 :     replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
    1579             :                        true /* go backward */ , true /* WAL log */ );
    1580             : 
    1581           4 :     UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1582             : 
    1583           4 :     PG_RETURN_VOID();
    1584             : }
    1585             : 
    1586             : 
    1587             : /*
    1588             :  * Return the replication progress for an individual replication origin.
    1589             :  *
    1590             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1591             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1592             :  * commits are used when replaying replicated transactions.  Returns NULL if the recorded remote LSN is invalid.
    1593             :  */
    1594             : Datum
    1595           6 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
    1596             : {
    1597             :     char       *name;
    1598             :     bool        flush;
    1599             :     ReplOriginId roident;
    1600           6 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1601             : 
    1602           6 :     replorigin_check_prerequisites(true, true);
    1603             : 
    1604           6 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1605           6 :     flush = PG_GETARG_BOOL(1);
    1606             : 
    1607           6 :     roident = replorigin_by_name(name, false);
    1608             :     Assert(OidIsValid(roident));
    1609             : 
    1610           4 :     remote_lsn = replorigin_get_progress(roident, flush);
    1611             : 
    1612           4 :     if (!XLogRecPtrIsValid(remote_lsn))
    1613           0 :         PG_RETURN_NULL();
    1614             : 
    1615           4 :     PG_RETURN_LSN(remote_lsn);
    1616             : }
    1617             : 
    1618             : /* Set-returning function: emit one row (origin id, name, remote LSN, local LSN) for every in-use entry of replication_states. */
    1619             : Datum
    1620          22 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
    1621             : {
    1622          22 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1623             :     int         i;
    1624             : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
    1625             : 
    1626             :     /* we want to return 0 rows if slot is set to zero */
    1627          22 :     replorigin_check_prerequisites(false, true);
    1628             : 
    1629          22 :     InitMaterializedSRF(fcinfo, 0);
    1630             : 
    1631             :     /* prevent slots from being concurrently dropped */
    1632          22 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1633             : 
    1634             :     /*
    1635             :      * Iterate through all possible replication_states, display if they are
    1636             :      * filled. Each entry's LSNs are read while holding that entry's lock in
    1637             :      * shared mode.
    1638             :      */
    1639         242 :     for (i = 0; i < max_active_replication_origins; i++)
    1640             :     {
    1641             :         ReplicationState *state;
    1642             :         Datum       values[REPLICATION_ORIGIN_PROGRESS_COLS];
    1643             :         bool        nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
    1644             :         char       *roname;
    1645             : 
    1646         220 :         state = &replication_states[i];
    1647             : 
    1648             :         /* unused slot, nothing to display */
    1649         220 :         if (state->roident == InvalidReplOriginId)
    1650         194 :             continue;
    1651             : 
    1652          26 :         memset(values, 0, sizeof(values));
    1653          26 :         memset(nulls, 1, sizeof(nulls));
    1654             : 
    1655          26 :         values[0] = ObjectIdGetDatum(state->roident);
    1656          26 :         nulls[0] = false;
    1657             : 
    1658             :         /*
    1659             :          * We're not preventing the origin from being dropped concurrently, so
    1660             :          * silently accept that it might be gone; the name column then stays NULL.
    1661             :          */
    1662          26 :         if (replorigin_by_oid(state->roident, true,
    1663             :                               &roname))
    1664             :         {
    1665          26 :             values[1] = CStringGetTextDatum(roname);
    1666          26 :             nulls[1] = false;
    1667             :         }
    1668             : 
    1669          26 :         LWLockAcquire(&state->lock, LW_SHARED);
    1670             : 
    1671          26 :         values[2] = LSNGetDatum(state->remote_lsn);
    1672          26 :         nulls[2] = false;
    1673             : 
    1674          26 :         values[3] = LSNGetDatum(state->local_lsn);
    1675          26 :         nulls[3] = false;
    1676             : 
    1677          26 :         LWLockRelease(&state->lock);
    1678             : 
    1679          26 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1680             :                              values, nulls);
    1681             :     }
    1682             : 
    1683          22 :     LWLockRelease(ReplicationOriginLock);
    1684             : 
    1685             : #undef REPLICATION_ORIGIN_PROGRESS_COLS
    1686             : 
    1687          22 :     return (Datum) 0;
    1688             : }

Generated by: LCOV version 1.16