LCOV - code coverage report
Current view: top level - src/backend/replication/logical - origin.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 409 463 88.3 %
Date: 2026-01-24 01:17:11 Functions: 30 32 93.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * origin.c
       4             :  *    Logical replication progress tracking support.
       5             :  *
       6             :  * Copyright (c) 2013-2026, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/logical/origin.c
      10             :  *
      11             :  * NOTES
      12             :  *
      13             :  * This file provides the following:
      14             :  * * An infrastructure to name nodes in a replication setup
      15             :  * * A facility to store and persist replication progress in an
      16             :  *   efficient and durable manner.
      17             :  *
      18             :  * A replication origin consists of a descriptive, user-defined, external
      19             :  * name and a short, thus space-efficient, internal 2 byte one. This split
      20             :  * exists because replication origins have to be stored in WAL and shared
      21             :  * memory and long descriptors would be inefficient.  For now we only use 2
      22             :  * bytes for the internal id of a replication origin as it seems unlikely that
      23             :  * there soon will be more than 65k nodes in one replication setup; and using
      24             :  * only two bytes allows us to be more space efficient.
      25             :  *
      26             :  * Replication progress is tracked in a shared memory table
      27             :  * (ReplicationState) that's dumped to disk every checkpoint. Entries
      28             :  * ('slots') in this table are identified by the internal id. That's the case
      29             :  * because it allows replication progress to be increased during crash
      30             :  * recovery. To allow doing so we store the original LSN (from the originating
      31             :  * system) of a transaction in the commit record. That allows recovering the
      32             :  * precise replayed state after crash recovery, without requiring synchronous
      33             :  * commits. Allowing logical replication to use asynchronous commit is
      34             :  * generally good for performance, but especially important as it allows a
      35             :  * single threaded replay process to keep up with a source that has multiple
      36             :  * backends generating changes concurrently.  For efficiency and simplicity
      37             :  * reasons a backend can set up one replication origin that's from then on used
      38             :  * as the source of changes produced by the backend, until reset again.
      39             :  *
      40             :  * This infrastructure is intended to be used in cooperation with logical
      41             :  * decoding. When replaying from a remote system the configured origin is
      42             :  * provided to output plugins, allowing prevention of replication loops and
      43             :  * other filtering.
      44             :  *
      45             :  * There are several levels of locking at work:
      46             :  *
      47             :  * * To create and drop replication origins an exclusive lock on
      48             :  *   pg_replication_origin is required for the duration. That allows us to
      49             :  *   safely and conflict-free assign new origins using a dirty snapshot.
      50             :  *
      51             :  * * When creating an in-memory replication progress slot the ReplicationOrigin
      52             :  *   LWLock has to be held exclusively; when iterating over the replication
      53             :  *   progress a shared lock has to be held. The same applies when advancing
      54             :  *   the replication progress of an individual backend that has not been set
      55             :  *   up as the session's replication origin.
      56             :  *
      57             :  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
      58             :  *   replication progress slot that slot's lwlock has to be held. That's
      59             :  *   primarily because we do not assume 8 byte writes (the LSN) are atomic on
      60             :  *   all our platforms, but it also simplifies memory ordering concerns
      61             :  *   between the remote and local lsn. We use a lwlock instead of a spinlock
      62             :  *   so it's less harmful to hold the lock over a WAL write
      63             :  *   (cf. AdvanceReplicationProgress).
      64             :  *
      65             :  * ---------------------------------------------------------------------------
      66             :  */
      67             : 
      68             : #include "postgres.h"
      69             : 
      70             : #include <unistd.h>
      71             : #include <sys/stat.h>
      72             : 
      73             : #include "access/genam.h"
      74             : #include "access/htup_details.h"
      75             : #include "access/table.h"
      76             : #include "access/xact.h"
      77             : #include "access/xloginsert.h"
      78             : #include "catalog/catalog.h"
      79             : #include "catalog/indexing.h"
      80             : #include "catalog/pg_subscription.h"
      81             : #include "funcapi.h"
      82             : #include "miscadmin.h"
      83             : #include "nodes/execnodes.h"
      84             : #include "pgstat.h"
      85             : #include "replication/origin.h"
      86             : #include "replication/slot.h"
      87             : #include "storage/condition_variable.h"
      88             : #include "storage/fd.h"
      89             : #include "storage/ipc.h"
      90             : #include "storage/lmgr.h"
      91             : #include "utils/builtins.h"
      92             : #include "utils/fmgroids.h"
      93             : #include "utils/guc.h"
      94             : #include "utils/pg_lsn.h"
      95             : #include "utils/rel.h"
      96             : #include "utils/snapmgr.h"
      97             : #include "utils/syscache.h"
      98             : 
      99             : /* paths for replication origin checkpoint files */
     100             : #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
     101             : #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
     102             : 
     103             : /* GUC variables */
     104             : int         max_active_replication_origins = 10;
     105             : 
     106             : /*
     107             :  * Replay progress of a single remote node.
     108             :  */
     109             : typedef struct ReplicationState
     110             : {
     111             :     /*
     112             :      * Local identifier for the remote node.
     113             :      */
     114             :     RepOriginId roident;
     115             : 
     116             :     /*
     117             :      * Location of the latest commit from the remote side.
     118             :      */
     119             :     XLogRecPtr  remote_lsn;
     120             : 
     121             :     /*
     122             :      * Remember the local lsn of the commit record so we can XLogFlush() to it
     123             :      * during a checkpoint so we know the commit record actually is safe on
     124             :      * disk.
     125             :      */
     126             :     XLogRecPtr  local_lsn;
     127             : 
     128             :     /*
     129             :      * PID of backend that's acquired slot, or 0 if none.
     130             :      */
     131             :     int         acquired_by;
     132             : 
     133             :     /* Count of processes that are currently using this origin. */
     134             :     int         refcount;
     135             : 
     136             :     /*
     137             :      * Condition variable that's signaled when acquired_by changes.
     138             :      */
     139             :     ConditionVariable origin_cv;
     140             : 
     141             :     /*
     142             :      * Lock protecting remote_lsn and local_lsn.
     143             :      */
     144             :     LWLock      lock;
     145             : } ReplicationState;
     146             : 
     147             : /*
     148             :  * On disk version of ReplicationState.
     149             :  */
     150             : typedef struct ReplicationStateOnDisk
     151             : {
     152             :     RepOriginId roident;
     153             :     XLogRecPtr  remote_lsn;
     154             : } ReplicationStateOnDisk;
     155             : 
     156             : 
     157             : typedef struct ReplicationStateCtl
     158             : {
     159             :     /* Tranche to use for per-origin LWLocks */
     160             :     int         tranche_id;
     161             :     /* Array of length max_active_replication_origins */
     162             :     ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
     163             : } ReplicationStateCtl;
     164             : 
     165             : /* external variables */
     166             : RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
     167             : XLogRecPtr  replorigin_session_origin_lsn = InvalidXLogRecPtr;
     168             : TimestampTz replorigin_session_origin_timestamp = 0;
     169             : 
     170             : /*
     171             :  * Base address into a shared memory array of replication states of size
     172             :  * max_active_replication_origins.
     173             :  */
     174             : static ReplicationState *replication_states;
     175             : 
     176             : /*
     177             :  * Actual shared memory block (replication_states[] is now part of this).
     178             :  */
     179             : static ReplicationStateCtl *replication_states_ctl;
     180             : 
     181             : /*
     182             :  * We keep a pointer to this backend's ReplicationState to avoid having to
     183             :  * search the replication_states array in replorigin_session_advance for each
     184             :  * remote commit.  (Ownership of a backend's own entry can only be changed by
     185             :  * that backend.)
     186             :  */
     187             : static ReplicationState *session_replication_state = NULL;
     188             : 
     189             : /* Magic for on disk files. */
     190             : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
     191             : 
     192             : static void
     193         136 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
     194             : {
     195         136 :     if (check_origins && max_active_replication_origins == 0)
     196           0 :         ereport(ERROR,
     197             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     198             :                  errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
     199             : 
     200         136 :     if (!recoveryOK && RecoveryInProgress())
     201           0 :         ereport(ERROR,
     202             :                 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
     203             :                  errmsg("cannot manipulate replication origins during recovery")));
     204         136 : }
     205             : 
     206             : 
     207             : /*
     208             :  * IsReservedOriginName
     209             :  *      True iff name is "none" or "any", compared case-insensitively.
     210             :  */
     211             : static bool
     212          26 : IsReservedOriginName(const char *name)
     213             : {
     214          50 :     return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
     215          24 :             (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
     216             : }
     217             : 
     218             : /* ---------------------------------------------------------------------------
     219             :  * Functions for working with replication origins themselves.
     220             :  * ---------------------------------------------------------------------------
     221             :  */
     222             : 
     223             : /*
     224             :  * Check for a persistent replication origin identified by name.
     225             :  *
     226             :  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
     227             :  */
     228             : RepOriginId
     229        1974 : replorigin_by_name(const char *roname, bool missing_ok)
     230             : {
     231             :     Form_pg_replication_origin ident;
     232        1974 :     Oid         roident = InvalidOid;
     233             :     HeapTuple   tuple;
     234             :     Datum       roname_d;
     235             : 
     236        1974 :     roname_d = CStringGetTextDatum(roname);
     237             : 
     238        1974 :     tuple = SearchSysCache1(REPLORIGNAME, roname_d);
     239        1974 :     if (HeapTupleIsValid(tuple))
     240             :     {
     241        1214 :         ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
     242        1214 :         roident = ident->roident;
     243        1214 :         ReleaseSysCache(tuple);
     244             :     }
     245         760 :     else if (!missing_ok)
     246           8 :         ereport(ERROR,
     247             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     248             :                  errmsg("replication origin \"%s\" does not exist",
     249             :                         roname)));
     250             : 
     251        1966 :     return roident;
     252             : }
     253             : 
     254             : /*
     255             :  * Create a replication origin.
     256             :  *
     257             :  * Needs to be called in a transaction.
     258             :  */
     259             : RepOriginId
     260         742 : replorigin_create(const char *roname)
     261             : {
     262             :     Oid         roident;
     263         742 :     HeapTuple   tuple = NULL;
     264             :     Relation    rel;
     265             :     Datum       roname_d;
     266             :     SnapshotData SnapshotDirty;
     267             :     SysScanDesc scan;
     268             :     ScanKeyData key;
     269             : 
     270             :     /*
     271             :      * To avoid needing a TOAST table for pg_replication_origin, we limit
     272             :      * replication origin names to 512 bytes.  This should be more than enough
     273             :      * for all practical use.
     274             :      */
     275         742 :     if (strlen(roname) > MAX_RONAME_LEN)
     276           6 :         ereport(ERROR,
     277             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     278             :                  errmsg("replication origin name is too long"),
     279             :                  errdetail("Replication origin names must be no longer than %d bytes.",
     280             :                            MAX_RONAME_LEN)));
     281             : 
     282         736 :     roname_d = CStringGetTextDatum(roname);
     283             : 
     284             :     Assert(IsTransactionState());
     285             : 
     286             :     /*
     287             :      * We need the numeric replication origin to be 16bit wide, so we cannot
     288             :      * rely on the normal oid allocation. Instead we simply scan
     289             :      * pg_replication_origin for the first unused id. That's not particularly
     290             :      * efficient, but this should be a fairly infrequent operation - we can
     291             :      * easily spend a bit more code on this when it turns out it needs to be
     292             :      * faster.
     293             :      *
     294             :      * We handle concurrency by taking an exclusive lock (allowing reads!)
     295             :      * over the table for the duration of the search. Because we use a "dirty
     296             :      * snapshot" we can read rows that other in-progress sessions have
     297             :      * written, even though they would be invisible with normal snapshots. Due
     298             :      * to the exclusive lock there's no danger that new rows can appear while
     299             :      * we're checking.
     300             :      */
     301         736 :     InitDirtySnapshot(SnapshotDirty);
     302             : 
     303         736 :     rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
     304             : 
     305             :     /*
     306             :      * We want to be able to access pg_replication_origin without setting up a
     307             :      * snapshot.  To make that safe, it needs to not have a TOAST table, since
     308             :      * TOASTed data cannot be fetched without a snapshot.  As of this writing,
     309             :      * its only varlena column is roname, which we limit to 512 bytes to avoid
     310             :      * needing out-of-line storage.  If you add a TOAST table to this catalog,
     311             :      * be sure to set up a snapshot everywhere it might be needed.  For more
     312             :      * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
     313             :      */
     314             :     Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
     315             : 
     316        1332 :     for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
     317             :     {
     318             :         bool        nulls[Natts_pg_replication_origin];
     319             :         Datum       values[Natts_pg_replication_origin];
     320             :         bool        collides;
     321             : 
     322        1332 :         CHECK_FOR_INTERRUPTS();
     323             : 
     324        1332 :         ScanKeyInit(&key,
     325             :                     Anum_pg_replication_origin_roident,
     326             :                     BTEqualStrategyNumber, F_OIDEQ,
     327             :                     ObjectIdGetDatum(roident));
     328             : 
     329        1332 :         scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
     330             :                                   true /* indexOK */ ,
     331             :                                   &SnapshotDirty,
     332             :                                   1, &key);
     333             : 
     334        1332 :         collides = HeapTupleIsValid(systable_getnext(scan));
     335             : 
     336        1332 :         systable_endscan(scan);
     337             : 
     338        1332 :         if (!collides)
     339             :         {
     340             :             /*
     341             :              * Ok, found an unused roident, insert the new row and do a CCI,
     342             :              * so our callers can look it up if they want to.
     343             :              */
     344         736 :             memset(&nulls, 0, sizeof(nulls));
     345             : 
     346         736 :             values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
     347         736 :             values[Anum_pg_replication_origin_roname - 1] = roname_d;
     348             : 
     349         736 :             tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     350         736 :             CatalogTupleInsert(rel, tuple);
     351         734 :             CommandCounterIncrement();
     352         734 :             break;
     353             :         }
     354             :     }
     355             : 
     356             :     /* now release the lock on the table again */
     357         734 :     table_close(rel, ExclusiveLock);
     358             : 
     359         734 :     if (tuple == NULL)
     360           0 :         ereport(ERROR,
     361             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     362             :                  errmsg("could not find free replication origin ID")));
     363             : 
     364         734 :     heap_freetuple(tuple);
     365         734 :     return roident;
     366             : }
     367             : 
     368             : /*
     369             :  * Helper function to drop a replication origin.
     370             :  */
     371             : static void
     372         626 : replorigin_state_clear(RepOriginId roident, bool nowait)
     373             : {
     374             :     int         i;
     375             : 
     376             :     /*
     377             :      * Clean up the slot state info, if there is any matching slot.
     378             :      */
     379         630 : restart:
     380         630 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     381             : 
     382        2064 :     for (i = 0; i < max_active_replication_origins; i++)
     383             :     {
     384        1972 :         ReplicationState *state = &replication_states[i];
     385             : 
     386        1972 :         if (state->roident == roident)
     387             :         {
     388             :             /* found our slot, is it busy? */
     389         538 :             if (state->refcount > 0)
     390             :             {
     391             :                 ConditionVariable *cv;
     392             : 
     393           4 :                 if (nowait)
     394           0 :                     ereport(ERROR,
     395             :                             (errcode(ERRCODE_OBJECT_IN_USE),
     396             :                              (state->acquired_by != 0)
     397             :                              ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
     398             :                                       state->roident,
     399             :                                       state->acquired_by)
     400             :                              : errmsg("could not drop replication origin with ID %d, in use by another process",
     401             :                                       state->roident)));
     402             : 
     403             :                 /*
     404             :                  * We must wait and then retry.  Since we don't know which CV
     405             :                  * to wait on until here, we can't readily use
     406             :                  * ConditionVariablePrepareToSleep (calling it here would be
     407             :                  * wrong, since we could miss the signal if we did so); just
     408             :                  * use ConditionVariableSleep directly.
     409             :                  */
     410           4 :                 cv = &state->origin_cv;
     411             : 
     412           4 :                 LWLockRelease(ReplicationOriginLock);
     413             : 
     414           4 :                 ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
     415           4 :                 goto restart;
     416             :             }
     417             : 
     418             :             /* first make a WAL log entry */
     419             :             {
     420             :                 xl_replorigin_drop xlrec;
     421             : 
     422         534 :                 xlrec.node_id = roident;
     423         534 :                 XLogBeginInsert();
     424         534 :                 XLogRegisterData(&xlrec, sizeof(xlrec));
     425         534 :                 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
     426             :             }
     427             : 
     428             :             /* then clear the in-memory slot */
     429         534 :             state->roident = InvalidRepOriginId;
     430         534 :             state->remote_lsn = InvalidXLogRecPtr;
     431         534 :             state->local_lsn = InvalidXLogRecPtr;
     432         534 :             break;
     433             :         }
     434             :     }
     435         626 :     LWLockRelease(ReplicationOriginLock);
     436         626 :     ConditionVariableCancelSleep();
     437         626 : }
     438             : 
     439             : /*
     440             :  * Drop replication origin (by name).
     441             :  *
     442             :  * Needs to be called in a transaction.
     443             :  */
     444             : void
     445        1000 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
     446             : {
     447             :     RepOriginId roident;
     448             :     Relation    rel;
     449             :     HeapTuple   tuple;
     450             : 
     451             :     Assert(IsTransactionState());
     452             : 
     453        1000 :     rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
     454             : 
     455        1000 :     roident = replorigin_by_name(name, missing_ok);
     456             : 
     457             :     /* Lock the origin to prevent concurrent drops. */
     458         998 :     LockSharedObject(ReplicationOriginRelationId, roident, 0,
     459             :                      AccessExclusiveLock);
     460             : 
     461         998 :     tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
     462         998 :     if (!HeapTupleIsValid(tuple))
     463             :     {
     464         372 :         if (!missing_ok)
     465           0 :             elog(ERROR, "cache lookup failed for replication origin with ID %d",
     466             :                  roident);
     467             : 
     468             :         /*
     469             :          * We don't need to retain the locks if the origin is already dropped.
     470             :          */
     471         372 :         UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
     472             :                            AccessExclusiveLock);
     473         372 :         table_close(rel, RowExclusiveLock);
     474         372 :         return;
     475             :     }
     476             : 
     477         626 :     replorigin_state_clear(roident, nowait);
     478             : 
     479             :     /*
     480             :      * Now, we can delete the catalog entry.
     481             :      */
     482         626 :     CatalogTupleDelete(rel, &tuple->t_self);
     483         626 :     ReleaseSysCache(tuple);
     484             : 
     485         626 :     CommandCounterIncrement();
     486             : 
     487             :     /* We keep the lock on pg_replication_origin until commit */
     488         626 :     table_close(rel, NoLock);
     489             : }
     490             : 
     491             : /*
     492             :  * Look up a replication origin via its oid and return the name.
     493             :  *
     494             :  * The external name is palloc'd in the calling context.  Returns true if the
     495             :  * origin is known.  Otherwise *roname is set to NULL and, depending on
     496             :  * missing_ok, either false is returned or an error is raised.
     497             :  */
     498             : bool
     499          58 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
     500             : {
     501             :     HeapTuple   tuple;
     502             :     Form_pg_replication_origin ric;
     503             : 
     504             :     Assert(OidIsValid((Oid) roident));
     505             :     Assert(roident != InvalidRepOriginId);
     506             :     Assert(roident != DoNotReplicateId);
     507             : 
     508          58 :     tuple = SearchSysCache1(REPLORIGIDENT,
     509             :                             ObjectIdGetDatum((Oid) roident));
     510             : 
     511          58 :     if (HeapTupleIsValid(tuple))
     512             :     {
     513          52 :         ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
     514          52 :         *roname = text_to_cstring(&ric->roname);
     515          52 :         ReleaseSysCache(tuple);
     516             : 
     517          52 :         return true;
     518             :     }
     519             :     else
     520             :     {
     521           6 :         *roname = NULL;
     522             : 
     523           6 :         if (!missing_ok)
     524           0 :             ereport(ERROR,
     525             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     526             :                      errmsg("replication origin with ID %d does not exist",
     527             :                             roident)));
     528             : 
     529           6 :         return false;
     530             :     }
     531             : }
     532             : 
     533             : 
     534             : /* ---------------------------------------------------------------------------
     535             :  * Functions for handling replication progress.
     536             :  * ---------------------------------------------------------------------------
     537             :  */
     538             : 
     539             : Size
     540        8802 : ReplicationOriginShmemSize(void)
     541             : {
     542        8802 :     Size        size = 0;
     543             : 
     544        8802 :     if (max_active_replication_origins == 0)
     545           4 :         return size;
     546             : 
     547        8798 :     size = add_size(size, offsetof(ReplicationStateCtl, states));
     548             : 
     549        8798 :     size = add_size(size,
     550             :                     mul_size(max_active_replication_origins, sizeof(ReplicationState)));
     551        8798 :     return size;
     552             : }
     553             : 
                       /*
                        * Point replication_states_ctl and replication_states at the shared
                        * "ReplicationOriginState" area and, if ShmemInitStruct() reports that it
                        * was not found, initialize it.
                        *
                        * Does nothing when max_active_replication_origins is 0.
                        */
     554             : void
     555        2278 : ReplicationOriginShmemInit(void)
     556             : {
     557             :     bool        found;
     558             : 
     559        2278 :     if (max_active_replication_origins == 0)
     560           2 :         return;
     561             : 
     562        2276 :     replication_states_ctl = (ReplicationStateCtl *)
     563        2276 :         ShmemInitStruct("ReplicationOriginState",
     564             :                         ReplicationOriginShmemSize(),
     565             :                         &found);
     566        2276 :     replication_states = replication_states_ctl->states;
     567             : 
                           /* not found: zero the area, then set up each entry's lock and CV */
     568        2276 :     if (!found)
     569             :     {
     570             :         int         i;
     571             : 
     572      186488 :         MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
     573             : 
     574        2276 :         replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
     575             : 
     576       25018 :         for (i = 0; i < max_active_replication_origins; i++)
     577             :         {
     578       22742 :             LWLockInitialize(&replication_states[i].lock,
     579       22742 :                              replication_states_ctl->tranche_id);
     580       22742 :             ConditionVariableInit(&replication_states[i].origin_cv);
     581             :         }
     582             :     }
     583             : }
     584             : 
     585             : /* ---------------------------------------------------------------------------
     586             :  * Perform a checkpoint of each replication origin's progress with respect to
     587             :  * the replayed remote_lsn. Make sure that all transactions we refer to in the
     588             :  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
     589             :  * if the transactions were originally committed asynchronously.
     590             :  *
     591             :  * We store checkpoints in the following format:
     592             :  * +-------+------------------------+------------------+-----+--------+
     593             :  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
     594             :  * +-------+------------------------+------------------+-----+--------+
     595             :  *
     596             :  * So it's just the magic, followed by the statically sized
     597             :  * ReplicationStateOnDisk structs. Note that the maximum number of
     598             :  * ReplicationState is determined by max_active_replication_origins.
                        *
                        * The data is first written to a temporary file, which is then moved over
                        * the permanent file with durable_rename(). Every failure along the way
                        * is reported at PANIC level. Entries whose roident is InvalidRepOriginId
                        * are not written. Nothing is done if max_active_replication_origins is 0.
     599             :  * ---------------------------------------------------------------------------
     600             :  */
     601             : void
     602        3558 : CheckPointReplicationOrigin(void)
     603             : {
     604        3558 :     const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
     605        3558 :     const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     606             :     int         tmpfd;
     607             :     int         i;
     608        3558 :     uint32      magic = REPLICATION_STATE_MAGIC;
     609             :     pg_crc32c   crc;
     610             : 
     611        3558 :     if (max_active_replication_origins == 0)
     612           2 :         return;
     613             : 
     614        3556 :     INIT_CRC32C(crc);
     615             : 
     616             :     /* make sure no old temp file is remaining */
     617        3556 :     if (unlink(tmppath) < 0 && errno != ENOENT)
     618           0 :         ereport(PANIC,
     619             :                 (errcode_for_file_access(),
     620             :                  errmsg("could not remove file \"%s\": %m",
     621             :                         tmppath)));
     622             : 
     623             :     /*
     624             :      * no other backend can perform this at the same time; only one checkpoint
     625             :      * can happen at a time.
     626             :      */
     627        3556 :     tmpfd = OpenTransientFile(tmppath,
     628             :                               O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
     629        3556 :     if (tmpfd < 0)
     630           0 :         ereport(PANIC,
     631             :                 (errcode_for_file_access(),
     632             :                  errmsg("could not create file \"%s\": %m",
     633             :                         tmppath)));
     634             : 
     635             :     /* write magic */
     636        3556 :     errno = 0;
     637        3556 :     if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
     638             :     {
     639             :         /* if write didn't set errno, assume problem is no disk space */
     640           0 :         if (errno == 0)
     641           0 :             errno = ENOSPC;
     642           0 :         ereport(PANIC,
     643             :                 (errcode_for_file_access(),
     644             :                  errmsg("could not write to file \"%s\": %m",
     645             :                         tmppath)));
     646             :     }
                           /* the CRC covers the magic and every state entry written below */
     647        3556 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     648             : 
     649             :     /* prevent concurrent creations/drops */
     650        3556 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
     651             : 
     652             :     /* write actual data */
     653       39116 :     for (i = 0; i < max_active_replication_origins; i++)
     654             :     {
     655             :         ReplicationStateOnDisk disk_state;
     656       35560 :         ReplicationState *curstate = &replication_states[i];
     657             :         XLogRecPtr  local_lsn;
     658             : 
                               /* unused entry, nothing to save */
     659       35560 :         if (curstate->roident == InvalidRepOriginId)
     660       35454 :             continue;
     661             : 
     662             :         /* zero, to avoid uninitialized padding bytes */
     663         106 :         memset(&disk_state, 0, sizeof(disk_state));
     664             : 
                               /* copy the entry's values while holding its own lock */
     665         106 :         LWLockAcquire(&curstate->lock, LW_SHARED);
     666             : 
     667         106 :         disk_state.roident = curstate->roident;
     668             : 
     669         106 :         disk_state.remote_lsn = curstate->remote_lsn;
     670         106 :         local_lsn = curstate->local_lsn;
     671             : 
     672         106 :         LWLockRelease(&curstate->lock);
     673             : 
     674             :         /* make sure we only write out a commit that's persistent */
     675         106 :         XLogFlush(local_lsn);
     676             : 
     677         106 :         errno = 0;
     678         106 :         if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
     679             :             sizeof(disk_state))
     680             :         {
     681             :             /* if write didn't set errno, assume problem is no disk space */
     682           0 :             if (errno == 0)
     683           0 :                 errno = ENOSPC;
     684           0 :             ereport(PANIC,
     685             :                     (errcode_for_file_access(),
     686             :                      errmsg("could not write to file \"%s\": %m",
     687             :                             tmppath)));
     688             :         }
     689             : 
     690         106 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     691             :     }
     692             : 
     693        3556 :     LWLockRelease(ReplicationOriginLock);
     694             : 
     695             :     /* write out the CRC */
     696        3556 :     FIN_CRC32C(crc);
     697        3556 :     errno = 0;
     698        3556 :     if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
     699             :     {
     700             :         /* if write didn't set errno, assume problem is no disk space */
     701           0 :         if (errno == 0)
     702           0 :             errno = ENOSPC;
     703           0 :         ereport(PANIC,
     704             :                 (errcode_for_file_access(),
     705             :                  errmsg("could not write to file \"%s\": %m",
     706             :                         tmppath)));
     707             :     }
     708             : 
     709        3556 :     if (CloseTransientFile(tmpfd) != 0)
     710           0 :         ereport(PANIC,
     711             :                 (errcode_for_file_access(),
     712             :                  errmsg("could not close file \"%s\": %m",
     713             :                         tmppath)));
     714             : 
     715             :     /* fsync, rename to permanent file, fsync file and directory */
     716        3556 :     durable_rename(tmppath, path, PANIC);
     717             : }
     718             : 
     719             : /*
     720             :  * Recover replication replay status from checkpoint data saved earlier by
     721             :  * CheckPointReplicationOrigin.
     722             :  *
     723             :  * This only needs to be called at startup and *not* during every checkpoint
     724             :  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
     725             :  * state thereafter can be recovered by looking at commit records.
                        *
                        * A missing checkpoint file is not an error; nothing is restored in that
                        * case. Any other open/read/close failure, a wrong magic number, a
                        * truncated entry, a checksum mismatch, or more saved entries than
                        * max_active_replication_origins is reported at PANIC level. Only roident
                        * and remote_lsn are copied into the shared-memory entries.
     726             :  */
     727             : void
     728        1986 : StartupReplicationOrigin(void)
     729             : {
     730        1986 :     const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     731             :     int         fd;
     732             :     int         readBytes;
     733        1986 :     uint32      magic = REPLICATION_STATE_MAGIC;
     734        1986 :     int         last_state = 0;
     735             :     pg_crc32c   file_crc;
     736             :     pg_crc32c   crc;
     737             : 
     738             :     /* don't want to overwrite already existing state */
     739             : #ifdef USE_ASSERT_CHECKING
     740             :     static bool already_started = false;
     741             : 
     742             :     Assert(!already_started);
     743             :     already_started = true;
     744             : #endif
     745             : 
     746        1986 :     if (max_active_replication_origins == 0)
     747         104 :         return;
     748             : 
     749        1984 :     INIT_CRC32C(crc);
     750             : 
     751        1984 :     elog(DEBUG2, "starting up replication origin progress state");
     752             : 
     753        1984 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
     754             : 
     755             :     /*
     756             :      * might have had max_active_replication_origins == 0 last run, or we just
     757             :      * brought up a standby.
     758             :      */
     759        1984 :     if (fd < 0 && errno == ENOENT)
     760         102 :         return;
     761        1882 :     else if (fd < 0)
     762           0 :         ereport(PANIC,
     763             :                 (errcode_for_file_access(),
     764             :                  errmsg("could not open file \"%s\": %m",
     765             :                         path)));
     766             : 
     767             :     /* verify magic, that is written even if nothing was active */
     768        1882 :     readBytes = read(fd, &magic, sizeof(magic));
     769        1882 :     if (readBytes != sizeof(magic))
     770             :     {
     771           0 :         if (readBytes < 0)
     772           0 :             ereport(PANIC,
     773             :                     (errcode_for_file_access(),
     774             :                      errmsg("could not read file \"%s\": %m",
     775             :                             path)));
     776             :         else
     777           0 :             ereport(PANIC,
     778             :                     (errcode(ERRCODE_DATA_CORRUPTED),
     779             :                      errmsg("could not read file \"%s\": read %d of %zu",
     780             :                             path, readBytes, sizeof(magic))));
     781             :     }
     782        1882 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     783             : 
     784        1882 :     if (magic != REPLICATION_STATE_MAGIC)
     785           0 :         ereport(PANIC,
     786             :                 (errmsg("replication checkpoint has wrong magic %u instead of %u",
     787             :                         magic, REPLICATION_STATE_MAGIC)));
     788             : 
     789             :     /* we can skip locking here, no other access is possible */
     790             : 
     791             :     /* recover individual states, until there are no more to be found */
     792             :     while (true)
     793          62 :     {
     794             :         ReplicationStateOnDisk disk_state;
     795             : 
     796        1944 :         readBytes = read(fd, &disk_state, sizeof(disk_state));
     797             : 
     798        1944 :         if (readBytes < 0)
     799             :         {
     800           0 :             ereport(PANIC,
     801             :                     (errcode_for_file_access(),
     802             :                      errmsg("could not read file \"%s\": %m",
     803             :                             path)));
     804             :         }
     805             : 
     806             :         /* no further data: what was just read is the trailing CRC */
     807        1944 :         if (readBytes == sizeof(crc))
     808             :         {
     809        1882 :             memcpy(&file_crc, &disk_state, sizeof(file_crc));
     810        1882 :             break;
     811             :         }
     812             : 
                               /*
                                * A short read sets no errno, so report it as corruption, the same
                                * way the short read of the magic above is reported.
                                */
     813          62 :         if (readBytes != sizeof(disk_state))
     814             :         {
     815           0 :             ereport(PANIC,
     816             :                     (errcode(ERRCODE_DATA_CORRUPTED),
     817             :                      errmsg("could not read file \"%s\": read %d of %zu",
     818             :                             path, readBytes, sizeof(disk_state))));
     819             :         }
     820             : 
     821          62 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     822             : 
                               /* every shared-memory entry is already taken by an earlier record */
     823          62 :         if (last_state == max_active_replication_origins)
     824           0 :             ereport(PANIC,
     825             :                     (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     826             :                      errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
     827             : 
     828             :         /* copy data to shared memory */
     829          62 :         replication_states[last_state].roident = disk_state.roident;
     830          62 :         replication_states[last_state].remote_lsn = disk_state.remote_lsn;
     831          62 :         last_state++;
     832             : 
     833          62 :         ereport(LOG,
     834             :                 errmsg("recovered replication state of node %d to %X/%08X",
     835             :                        disk_state.roident,
     836             :                        LSN_FORMAT_ARGS(disk_state.remote_lsn)));
     837             :     }
     838             : 
     839             :     /* now check checksum */
     840        1882 :     FIN_CRC32C(crc);
     841        1882 :     if (file_crc != crc)
     842           0 :         ereport(PANIC,
     843             :                 (errcode(ERRCODE_DATA_CORRUPTED),
     844             :                  errmsg("replication origin checkpoint has wrong checksum %u, expected %u",
     845             :                         crc, file_crc)));
     846             : 
     847        1882 :     if (CloseTransientFile(fd) != 0)
     848           0 :         ereport(PANIC,
     849             :                 (errcode_for_file_access(),
     850             :                  errmsg("could not close file \"%s\": %m",
     851             :                         path)));
     852             : }
     853             : 
                       /*
                        * Redo routine for replication origin WAL records.
                        *
                        * XLOG_REPLORIGIN_SET re-applies the recorded advance through
                        * replorigin_advance(), without WAL-logging it again.
                        * XLOG_REPLORIGIN_DROP resets the in-memory entry whose roident matches
                        * the record's node_id, if there is one. Any other op code is a PANIC.
                        */
     854             : void
     855           8 : replorigin_redo(XLogReaderState *record)
     856             : {
     857           8 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     858             : 
     859           8 :     switch (info)
     860             :     {
     861           4 :         case XLOG_REPLORIGIN_SET:
     862             :             {
     863           4 :                 xl_replorigin_set *xlrec =
     864           4 :                     (xl_replorigin_set *) XLogRecGetData(record);
     865             : 
                                       /* the end of this record serves as the local LSN */
     866           4 :                 replorigin_advance(xlrec->node_id,
     867             :                                    xlrec->remote_lsn, record->EndRecPtr,
     868           4 :                                    xlrec->force /* backward */ ,
     869             :                                    false /* WAL log */ );
     870           4 :                 break;
     871             :             }
     872           4 :         case XLOG_REPLORIGIN_DROP:
     873             :             {
     874             :                 xl_replorigin_drop *xlrec;
     875             :                 int         i;
     876             : 
     877           4 :                 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
     878             : 
     879           6 :                 for (i = 0; i < max_active_replication_origins; i++)
     880             :                 {
     881           6 :                     ReplicationState *state = &replication_states[i];
     882             : 
     883             :                     /* found our slot */
     884           6 :                     if (state->roident == xlrec->node_id)
     885             :                     {
     886             :                         /* reset entry */
     887           4 :                         state->roident = InvalidRepOriginId;
     888           4 :                         state->remote_lsn = InvalidXLogRecPtr;
     889           4 :                         state->local_lsn = InvalidXLogRecPtr;
     890           4 :                         break;
     891             :                     }
     892             :                 }
     893           4 :                 break;
     894             :             }
     895           0 :         default:
     896           0 :             elog(PANIC, "replorigin_redo: unknown op code %u", info);
     897             :     }
     898           8 : }
     899             : 
     900             : 
     901             : /*
     902             :  * Tell the replication origin progress machinery that a commit from 'node'
     903             :  * that originated at the LSN remote_commit on the remote node was replayed
     904             :  * successfully and that we don't need to do so again. In combination with
     905             :  * setting up replorigin_session_origin_lsn and replorigin_session_origin
     906             :  * that ensures we won't lose knowledge about that after a crash if the
     907             :  * transaction had a persistent effect (think of asynchronous commits).
     908             :  *
     909             :  * local_commit needs to be a local LSN of the commit so that we can make sure
     910             :  * upon a checkpoint that enough WAL has been persisted to disk.
     911             :  *
                        * If go_backward is true the stored LSNs are overwritten unconditionally;
                        * otherwise they are only ever moved forward. An invalid local_commit
                        * leaves the stored local LSN untouched. If wal_log is true an
                        * XLOG_REPLORIGIN_SET record is written for the change.
                        *
                        * Nothing is done for DoNotReplicateId. An ERROR is raised if the
                        * origin's entry is in use (refcount > 0), or if the origin has no entry
                        * yet and no free one is left.
                        *
     912             :  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
     913             :  * unless running in recovery.
     914             :  */
     915             : void
     916         484 : replorigin_advance(RepOriginId node,
     917             :                    XLogRecPtr remote_commit, XLogRecPtr local_commit,
     918             :                    bool go_backward, bool wal_log)
     919             : {
     920             :     int         i;
     921         484 :     ReplicationState *replication_state = NULL;
     922         484 :     ReplicationState *free_state = NULL;
     923             : 
     924             :     Assert(node != InvalidRepOriginId);
     925             : 
     926             :     /* we don't track DoNotReplicateId */
     927         484 :     if (node == DoNotReplicateId)
     928           0 :         return;
     929             : 
     930             :     /*
     931             :      * XXX: For the case where this is called by WAL replay, it'd be more
     932             :      * efficient to restore into a backend local hashtable and only dump into
     933             :      * shmem after recovery is finished. Let's wait with implementing that
     934             :      * till it's shown to be a measurable expense
     935             :      */
     936             : 
     937             :     /* Lock exclusively, as we may have to create a new table entry. */
     938         484 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     939             : 
     940             :     /*
     941             :      * Search for either an existing slot for the origin, or a free one we can
     942             :      * use.
     943             :      */
     944        4430 :     for (i = 0; i < max_active_replication_origins; i++)
     945             :     {
     946        4038 :         ReplicationState *curstate = &replication_states[i];
     947             : 
     948             :         /* remember where to insert if necessary */
     949        4038 :         if (curstate->roident == InvalidRepOriginId &&
     950             :             free_state == NULL)
     951             :         {
     952         392 :             free_state = curstate;
     953         392 :             continue;
     954             :         }
     955             : 
     956             :         /* not our slot */
     957        3646 :         if (curstate->roident != node)
     958             :         {
     959        3554 :             continue;
     960             :         }
     961             : 
     962             :         /* ok, found slot */
     963          92 :         replication_state = curstate;
     964             : 
                               /* held until the LSNs have been updated at the bottom */
     965          92 :         LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
     966             : 
     967             :         /* Make sure it's not used by somebody else */
     968          92 :         if (replication_state->refcount > 0)
     969             :         {
     970           0 :             ereport(ERROR,
     971             :                     (errcode(ERRCODE_OBJECT_IN_USE),
     972             :                      (replication_state->acquired_by != 0)
     973             :                      ? errmsg("replication origin with ID %d is already active for PID %d",
     974             :                               replication_state->roident,
     975             :                               replication_state->acquired_by)
     976             :                      : errmsg("replication origin with ID %d is already active in another process",
     977             :                               replication_state->roident)));
     978             :         }
     979             : 
     980          92 :         break;
     981             :     }
     982             : 
     983         484 :     if (replication_state == NULL && free_state == NULL)
     984           0 :         ereport(ERROR,
     985             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     986             :                  errmsg("could not find free replication state slot for replication origin with ID %d",
     987             :                         node),
     988             :                  errhint("Increase \"max_active_replication_origins\" and try again.")));
     989             : 
     990         484 :     if (replication_state == NULL)
     991             :     {
     992             :         /* initialize new slot */
     993         392 :         LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
     994         392 :         replication_state = free_state;
     995             :         Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
     996             :         Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
     997         392 :         replication_state->roident = node;
     998             :     }
     999             : 
    1000             :     Assert(replication_state->roident != InvalidRepOriginId);
    1001             : 
    1002             :     /*
    1003             :      * If somebody "forcefully" sets this slot, WAL log it, so it's durable
    1004             :      * and the standby gets the message. Primarily this will be called during
    1005             :      * WAL replay (of commit records) where no WAL logging is necessary.
    1006             :      */
    1007         484 :     if (wal_log)
    1008             :     {
    1009             :         xl_replorigin_set xlrec;
    1010             : 
    1011         402 :         xlrec.remote_lsn = remote_commit;
    1012         402 :         xlrec.node_id = node;
    1013         402 :         xlrec.force = go_backward;
    1014             : 
    1015         402 :         XLogBeginInsert();
    1016         402 :         XLogRegisterData(&xlrec, sizeof(xlrec));
    1017             : 
    1018         402 :         XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
    1019             :     }
    1020             : 
    1021             :     /*
    1022             :      * Due to - harmless - race conditions during a checkpoint we could see
    1023             :      * values here that are older than the ones we already have in memory. We
    1024             :      * could also see older values for prepared transactions when the prepare
    1025             :      * is sent at a later point of time along with commit prepared and there
    1026             :      * are other transactions commits between prepare and commit prepared. See
    1027             :      * ReorderBufferFinishPrepared. Don't overwrite those.
    1028             :      */
    1029         484 :     if (go_backward || replication_state->remote_lsn < remote_commit)
    1030         470 :         replication_state->remote_lsn = remote_commit;
    1031         484 :     if (XLogRecPtrIsValid(local_commit) &&
    1032          76 :         (go_backward || replication_state->local_lsn < local_commit))
    1033          80 :         replication_state->local_lsn = local_commit;
    1034         484 :     LWLockRelease(&replication_state->lock);
    1035             : 
    1036             :     /*
    1037             :      * Release *after* changing the LSNs, slot isn't acquired and thus could
    1038             :      * otherwise be dropped anytime.
    1039             :      */
    1040         484 :     LWLockRelease(ReplicationOriginLock);
    1041             : }
    1042             : 
    1043             : 
                       /*
                        * Look up the replay progress recorded for 'node'.
                        *
                        * Returns the entry's remote_lsn, or InvalidXLogRecPtr if no entry for
                        * 'node' is found. If 'flush' is true and the entry's local_lsn is valid,
                        * XLogFlush() is called on that local_lsn before returning.
                        */
    1044             : XLogRecPtr
    1045          16 : replorigin_get_progress(RepOriginId node, bool flush)
    1046             : {
    1047             :     int         i;
    1048          16 :     XLogRecPtr  local_lsn = InvalidXLogRecPtr;
    1049          16 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1050             : 
    1051             :     /* prevent slots from being concurrently dropped */
    1052          16 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1053             : 
    1054          76 :     for (i = 0; i < max_active_replication_origins; i++)
    1055             :     {
    1056             :         ReplicationState *state;
    1057             : 
    1058          70 :         state = &replication_states[i];
    1059             : 
    1060          70 :         if (state->roident == node)
    1061             :         {
                                   /* read both LSNs under the entry's own lock */
    1062          10 :             LWLockAcquire(&state->lock, LW_SHARED);
    1063             : 
    1064          10 :             remote_lsn = state->remote_lsn;
    1065          10 :             local_lsn = state->local_lsn;
    1066             : 
    1067          10 :             LWLockRelease(&state->lock);
    1068             : 
    1069          10 :             break;
    1070             :         }
    1071             :     }
    1072             : 
    1073          16 :     LWLockRelease(ReplicationOriginLock);
    1074             : 
                           /* the flush happens only after both locks have been released */
    1075          16 :     if (flush && XLogRecPtrIsValid(local_lsn))
    1076           2 :         XLogFlush(local_lsn);
    1077             : 
    1078          16 :     return remote_lsn;
    1079             : }
    1080             : 
    1081             : /*
                        * Helper function to reset the session replication origin.
                        *
                        * session_replication_state must be set (asserted). While holding
                        * ReplicationOriginLock exclusively, this clears the entry's acquired_by
                        * if it holds MyProcPid, decrements its refcount and forgets the
                        * session's pointer to it. After releasing the lock it broadcasts on the
                        * entry's condition variable.
                        */
    1082             : static void
    1083         942 : replorigin_session_reset_internal(void)
    1084             : {
    1085             :     ConditionVariable *cv;
    1086             : 
    1087             :     Assert(session_replication_state != NULL);
    1088             : 
    1089         942 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1090             : 
    1091             :     /* The origin must be held by at least one process at this point. */
    1092             :     Assert(session_replication_state->refcount > 0);
    1093             : 
    1094             :     /*
    1095             :      * Reset the PID only if the current session is the first to set up this
    1096             :      * origin. This avoids clearing the first process's PID when any other
    1097             :      * session releases the origin.
    1098             :      */
    1099         942 :     if (session_replication_state->acquired_by == MyProcPid)
    1100         914 :         session_replication_state->acquired_by = 0;
    1101             : 
    1102         942 :     session_replication_state->refcount--;
    1103             : 
                           /* remember the CV, the session pointer is cleared before it is used */
    1104         942 :     cv = &session_replication_state->origin_cv;
    1105         942 :     session_replication_state = NULL;
    1106             : 
    1107         942 :     LWLockRelease(ReplicationOriginLock);
    1108             : 
    1109         942 :     ConditionVariableBroadcast(cv);
    1110         942 : }
    1111             : 
    1112             : /*
    1113             :  * Tear down a (possibly) configured session replication origin during process
    1114             :  * exit.
                        *
                        * Registered through on_shmem_exit() by replorigin_session_setup(). The
                        * 'code' and 'arg' parameters are not used. Does nothing if no session
                        * origin is currently set up.
    1115             :  */
    1116             : static void
    1117         934 : ReplicationOriginExitCleanup(int code, Datum arg)
    1118             : {
    1119         934 :     if (session_replication_state == NULL)
    1120         380 :         return;
    1121             : 
    1122         554 :     replorigin_session_reset_internal();
    1123             : }
    1124             : 
    1125             : /*
    1126             :  * Setup a replication origin in the shared memory struct if it doesn't
    1127             :  * already exist and cache access to the specific ReplicationSlot so the
    1128             :  * array doesn't have to be searched when calling
    1129             :  * replorigin_session_advance().
    1130             :  *
    1131             :  * Normally only one such cached origin can exist per process so the cached
    1132             :  * value can only be set again after the previous value is torn down with
    1133             :  * replorigin_session_reset(). For this normal case pass acquired_by = 0
    1134             :  * (meaning the slot is not allowed to be already acquired by another process).
    1135             :  *
    1136             :  * However, sometimes multiple processes can safely re-use the same origin slot
    1137             :  * (for example, multiple parallel apply processes can safely use the same
    1138             :  * origin, provided they maintain commit order by allowing only one process to
    1139             :  * commit at a time). For this case the first process must pass acquired_by =
    1140             :  * 0, and then the other processes sharing that same origin can pass
    1141             :  * acquired_by = PID of the first process.
    1142             :  */
    1143             : void
    1144         946 : replorigin_session_setup(RepOriginId node, int acquired_by)
    1145             : {
    1146             :     static bool registered_cleanup;
    1147             :     int         i;
    1148         946 :     int         free_slot = -1;
    1149             : 
    1150         946 :     if (!registered_cleanup)
    1151             :     {
    1152         934 :         on_shmem_exit(ReplicationOriginExitCleanup, 0);
    1153         934 :         registered_cleanup = true;
    1154             :     }
    1155             : 
    1156             :     Assert(max_active_replication_origins > 0);
    1157             : 
    1158         946 :     if (session_replication_state != NULL)
    1159           2 :         ereport(ERROR,
    1160             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1161             :                  errmsg("cannot setup replication origin when one is already setup")));
    1162             : 
    1163             :     /* Lock exclusively, as we may have to create a new table entry. */
    1164         944 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1165             : 
    1166             :     /*
    1167             :      * Search for either an existing slot for the origin, or a free one we can
    1168             :      * use.
    1169             :      */
    1170        3908 :     for (i = 0; i < max_active_replication_origins; i++)
    1171             :     {
    1172        3668 :         ReplicationState *curstate = &replication_states[i];
    1173             : 
    1174             :         /* remember where to insert if necessary */
    1175        3668 :         if (curstate->roident == InvalidRepOriginId &&
    1176             :             free_slot == -1)
    1177             :         {
    1178         244 :             free_slot = i;
    1179         244 :             continue;
    1180             :         }
    1181             : 
    1182             :         /* not our slot */
    1183        3424 :         if (curstate->roident != node)
    1184        2720 :             continue;
    1185             : 
    1186         704 :         else if (curstate->acquired_by != 0 && acquired_by == 0)
    1187             :         {
    1188           0 :             ereport(ERROR,
    1189             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1190             :                      errmsg("replication origin with ID %d is already active for PID %d",
    1191             :                             curstate->roident, curstate->acquired_by)));
    1192             :         }
    1193             : 
    1194         704 :         else if (curstate->acquired_by != acquired_by)
    1195             :         {
    1196           0 :             ereport(ERROR,
    1197             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1198             :                      errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
    1199             :                             node, acquired_by)));
    1200             :         }
    1201             : 
    1202             :         /*
    1203             :          * The origin is in use, but PID is not recorded. This can happen if
    1204             :          * the process that originally acquired the origin exited without
    1205             :          * releasing it. To ensure correctness, other processes cannot acquire
    1206             :          * the origin until all processes currently using it have released it.
    1207             :          */
    1208         704 :         else if (curstate->acquired_by == 0 && curstate->refcount > 0)
    1209           0 :             ereport(ERROR,
    1210             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1211             :                      errmsg("replication origin with ID %d is already active in another process",
    1212             :                             curstate->roident)));
    1213             : 
    1214             :         /* ok, found slot */
    1215         704 :         session_replication_state = curstate;
    1216         704 :         break;
    1217             :     }
    1218             : 
    1219             : 
    1220         944 :     if (session_replication_state == NULL && free_slot == -1)
    1221           0 :         ereport(ERROR,
    1222             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
    1223             :                  errmsg("could not find free replication state slot for replication origin with ID %d",
    1224             :                         node),
    1225             :                  errhint("Increase \"max_active_replication_origins\" and try again.")));
    1226         944 :     else if (session_replication_state == NULL)
    1227             :     {
    1228         240 :         if (acquired_by)
    1229           2 :             ereport(ERROR,
    1230             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1231             :                      errmsg("cannot use PID %d for inactive replication origin with ID %d",
    1232             :                             acquired_by, node)));
    1233             : 
    1234             :         /* initialize new slot */
    1235         238 :         session_replication_state = &replication_states[free_slot];
    1236             :         Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
    1237             :         Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
    1238         238 :         session_replication_state->roident = node;
    1239             :     }
    1240             : 
    1241             : 
    1242             :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1243             : 
    1244         942 :     if (acquired_by == 0)
    1245             :     {
    1246         914 :         session_replication_state->acquired_by = MyProcPid;
    1247             :         Assert(session_replication_state->refcount == 0);
    1248             :     }
    1249             :     else
    1250             :     {
    1251             :         /*
    1252             :          * Sanity check: the origin must already be acquired by the process
    1253             :          * passed as input, and at least one process must be using it.
    1254             :          */
    1255             :         Assert(session_replication_state->acquired_by == acquired_by);
    1256             :         Assert(session_replication_state->refcount > 0);
    1257             :     }
    1258             : 
    1259         942 :     session_replication_state->refcount++;
    1260             : 
    1261         942 :     LWLockRelease(ReplicationOriginLock);
    1262             : 
    1263             :     /* probably this one is pointless */
    1264         942 :     ConditionVariableBroadcast(&session_replication_state->origin_cv);
    1265         942 : }
    1266             : 
    1267             : /*
    1268             :  * Reset replay state previously setup in this session.
    1269             :  *
    1270             :  * This function may only be called if an origin was setup with
    1271             :  * replorigin_session_setup().
    1272             :  */
    1273             : void
    1274         392 : replorigin_session_reset(void)
    1275             : {
    1276             :     Assert(max_active_replication_origins != 0);
    1277             : 
    1278         392 :     if (session_replication_state == NULL)
    1279           2 :         ereport(ERROR,
    1280             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1281             :                  errmsg("no replication origin is configured")));
    1282             : 
    1283             :     /*
    1284             :      * Restrict explicit resetting of the replication origin if it was first
    1285             :      * acquired by this process and others are still using it. While the
    1286             :      * system handles this safely (as happens if the first session exits
    1287             :      * without calling reset), it is best to avoid doing so.
    1288             :      */
    1289         390 :     if (session_replication_state->acquired_by == MyProcPid &&
    1290         386 :         session_replication_state->refcount > 1)
    1291           2 :         ereport(ERROR,
    1292             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1293             :                  errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
    1294             :                         session_replication_state->roident),
    1295             :                  errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
    1296             :                  errhint("Reset the replication origin in all other processes before retrying.")));
    1297             : 
    1298         388 :     replorigin_session_reset_internal();
    1299         388 : }
    1300             : 
    1301             : /*
    1302             :  * Do the same work replorigin_advance() does, just on the session's
    1303             :  * configured origin.
    1304             :  *
    1305             :  * This is noticeably cheaper than using replorigin_advance().
    1306             :  */
    1307             : void
    1308        2196 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
    1309             : {
    1310             :     Assert(session_replication_state != NULL);
    1311             :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1312             : 
    1313        2196 :     LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
    1314        2196 :     if (session_replication_state->local_lsn < local_commit)
    1315        2196 :         session_replication_state->local_lsn = local_commit;
    1316        2196 :     if (session_replication_state->remote_lsn < remote_commit)
    1317        1040 :         session_replication_state->remote_lsn = remote_commit;
    1318        2196 :     LWLockRelease(&session_replication_state->lock);
    1319        2196 : }
    1320             : 
    1321             : /*
    1322             :  * Ask the machinery about the point up to which we successfully replayed
    1323             :  * changes from an already setup replication origin.
    1324             :  */
    1325             : XLogRecPtr
    1326         508 : replorigin_session_get_progress(bool flush)
    1327             : {
    1328             :     XLogRecPtr  remote_lsn;
    1329             :     XLogRecPtr  local_lsn;
    1330             : 
    1331             :     Assert(session_replication_state != NULL);
    1332             : 
    1333         508 :     LWLockAcquire(&session_replication_state->lock, LW_SHARED);
    1334         508 :     remote_lsn = session_replication_state->remote_lsn;
    1335         508 :     local_lsn = session_replication_state->local_lsn;
    1336         508 :     LWLockRelease(&session_replication_state->lock);
    1337             : 
    1338         508 :     if (flush && XLogRecPtrIsValid(local_lsn))
    1339           2 :         XLogFlush(local_lsn);
    1340             : 
    1341         508 :     return remote_lsn;
    1342             : }
    1343             : 
    1344             : 
    1345             : 
    1346             : /* ---------------------------------------------------------------------------
    1347             :  * SQL functions for working with replication origin.
    1348             :  *
    1349             :  * These mostly should be fairly short wrappers around more generic functions.
    1350             :  * ---------------------------------------------------------------------------
    1351             :  */
    1352             : 
    1353             : /*
    1354             :  * Create replication origin for the passed in name, and return the assigned
    1355             :  * oid.
    1356             :  */
    1357             : Datum
    1358          28 : pg_replication_origin_create(PG_FUNCTION_ARGS)
    1359             : {
    1360             :     char       *name;
    1361             :     RepOriginId roident;
    1362             : 
    1363          28 :     replorigin_check_prerequisites(false, false);
    1364             : 
    1365          28 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1366             : 
    1367             :     /*
    1368             :      * Replication origins "any" and "none" are reserved for system options.
    1369             :      * The origins "pg_xxx" are reserved for internal use.
    1370             :      */
    1371          28 :     if (IsReservedName(name) || IsReservedOriginName(name))
    1372           6 :         ereport(ERROR,
    1373             :                 (errcode(ERRCODE_RESERVED_NAME),
    1374             :                  errmsg("replication origin name \"%s\" is reserved",
    1375             :                         name),
    1376             :                  errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
    1377             :                            LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
    1378             : 
    1379             :     /*
    1380             :      * If built with appropriate switch, whine when regression-testing
    1381             :      * conventions for replication origin names are violated.
    1382             :      */
    1383             : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1384             :     if (strncmp(name, "regress_", 8) != 0)
    1385             :         elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
    1386             : #endif
    1387             : 
    1388          22 :     roident = replorigin_create(name);
    1389             : 
    1390          14 :     pfree(name);
    1391             : 
    1392          14 :     PG_RETURN_OID(roident);
    1393             : }
    1394             : 
    1395             : /*
    1396             :  * Drop replication origin.
    1397             :  */
    1398             : Datum
    1399          18 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
    1400             : {
    1401             :     char       *name;
    1402             : 
    1403          18 :     replorigin_check_prerequisites(false, false);
    1404             : 
    1405          18 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1406             : 
    1407          18 :     replorigin_drop_by_name(name, false, true);
    1408             : 
    1409          16 :     pfree(name);
    1410             : 
    1411          16 :     PG_RETURN_VOID();
    1412             : }
    1413             : 
    1414             : /*
    1415             :  * Return oid of a replication origin.
    1416             :  */
    1417             : Datum
    1418           0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
    1419             : {
    1420             :     char       *name;
    1421             :     RepOriginId roident;
    1422             : 
    1423           0 :     replorigin_check_prerequisites(false, false);
    1424             : 
    1425           0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1426           0 :     roident = replorigin_by_name(name, true);
    1427             : 
    1428           0 :     pfree(name);
    1429             : 
    1430           0 :     if (OidIsValid(roident))
    1431           0 :         PG_RETURN_OID(roident);
    1432           0 :     PG_RETURN_NULL();
    1433             : }
    1434             : 
    1435             : /*
    1436             :  * Setup a replication origin for this session.
    1437             :  */
    1438             : Datum
    1439          22 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
    1440             : {
    1441             :     char       *name;
    1442             :     RepOriginId origin;
    1443             :     int         pid;
    1444             : 
    1445          22 :     replorigin_check_prerequisites(true, false);
    1446             : 
    1447          22 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1448          22 :     origin = replorigin_by_name(name, false);
    1449          20 :     pid = PG_GETARG_INT32(1);
    1450          20 :     replorigin_session_setup(origin, pid);
    1451             : 
    1452          16 :     replorigin_session_origin = origin;
    1453             : 
    1454          16 :     pfree(name);
    1455             : 
    1456          16 :     PG_RETURN_VOID();
    1457             : }
    1458             : 
    1459             : /*
    1460             :  * Reset previously setup origin in this session
    1461             :  */
    1462             : Datum
    1463          20 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
    1464             : {
    1465          20 :     replorigin_check_prerequisites(true, false);
    1466             : 
    1467          20 :     replorigin_session_reset();
    1468             : 
    1469          16 :     replorigin_session_origin = InvalidRepOriginId;
    1470          16 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1471          16 :     replorigin_session_origin_timestamp = 0;
    1472             : 
    1473          16 :     PG_RETURN_VOID();
    1474             : }
    1475             : 
    1476             : /*
    1477             :  * Has a replication origin been setup for this session.
    1478             :  */
    1479             : Datum
    1480           8 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
    1481             : {
    1482           8 :     replorigin_check_prerequisites(false, false);
    1483             : 
    1484           8 :     PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
    1485             : }
    1486             : 
    1487             : 
    1488             : /*
    1489             :  * Return the replication progress for origin setup in the current session.
    1490             :  *
    1491             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1492             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1493             :  * commits are used when replaying replicated transactions.
    1494             :  */
    1495             : Datum
    1496           4 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
    1497             : {
    1498           4 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1499           4 :     bool        flush = PG_GETARG_BOOL(0);
    1500             : 
    1501           4 :     replorigin_check_prerequisites(true, false);
    1502             : 
    1503           4 :     if (session_replication_state == NULL)
    1504           0 :         ereport(ERROR,
    1505             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1506             :                  errmsg("no replication origin is configured")));
    1507             : 
    1508           4 :     remote_lsn = replorigin_session_get_progress(flush);
    1509             : 
    1510           4 :     if (!XLogRecPtrIsValid(remote_lsn))
    1511           0 :         PG_RETURN_NULL();
    1512             : 
    1513           4 :     PG_RETURN_LSN(remote_lsn);
    1514             : }
    1515             : 
    1516             : Datum
    1517           2 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
    1518             : {
    1519           2 :     XLogRecPtr  location = PG_GETARG_LSN(0);
    1520             : 
    1521           2 :     replorigin_check_prerequisites(true, false);
    1522             : 
    1523           2 :     if (session_replication_state == NULL)
    1524           0 :         ereport(ERROR,
    1525             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1526             :                  errmsg("no replication origin is configured")));
    1527             : 
    1528           2 :     replorigin_session_origin_lsn = location;
    1529           2 :     replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
    1530             : 
    1531           2 :     PG_RETURN_VOID();
    1532             : }
    1533             : 
    1534             : Datum
    1535           0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
    1536             : {
    1537           0 :     replorigin_check_prerequisites(true, false);
    1538             : 
    1539           0 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1540           0 :     replorigin_session_origin_timestamp = 0;
    1541             : 
    1542           0 :     PG_RETURN_VOID();
    1543             : }
    1544             : 
    1545             : 
    1546             : Datum
    1547           6 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
    1548             : {
    1549           6 :     text       *name = PG_GETARG_TEXT_PP(0);
    1550           6 :     XLogRecPtr  remote_commit = PG_GETARG_LSN(1);
    1551             :     RepOriginId node;
    1552             : 
    1553           6 :     replorigin_check_prerequisites(true, false);
    1554             : 
    1555             :     /* lock to prevent the replication origin from vanishing */
    1556           6 :     LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1557             : 
    1558           6 :     node = replorigin_by_name(text_to_cstring(name), false);
    1559             : 
    1560             :     /*
    1561             :      * Can't sensibly pass a local commit to be flushed at checkpoint - this
    1562             :      * xact hasn't committed yet. This is why this function should be used to
    1563             :      * set up the initial replication state, but not for replay.
    1564             :      */
    1565           4 :     replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
    1566             :                        true /* go backward */ , true /* WAL log */ );
    1567             : 
    1568           4 :     UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1569             : 
    1570           4 :     PG_RETURN_VOID();
    1571             : }
    1572             : 
    1573             : 
    1574             : /*
    1575             :  * Return the replication progress for an individual replication origin.
    1576             :  *
    1577             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1578             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1579             :  * commits are used when replaying replicated transactions.
    1580             :  */
    1581             : Datum
    1582           6 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
    1583             : {
    1584             :     char       *name;
    1585             :     bool        flush;
    1586             :     RepOriginId roident;
    1587           6 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1588             : 
    1589           6 :     replorigin_check_prerequisites(true, true);
    1590             : 
    1591           6 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1592           6 :     flush = PG_GETARG_BOOL(1);
    1593             : 
    1594           6 :     roident = replorigin_by_name(name, false);
    1595             :     Assert(OidIsValid(roident));
    1596             : 
    1597           4 :     remote_lsn = replorigin_get_progress(roident, flush);
    1598             : 
    1599           4 :     if (!XLogRecPtrIsValid(remote_lsn))
    1600           0 :         PG_RETURN_NULL();
    1601             : 
    1602           4 :     PG_RETURN_LSN(remote_lsn);
    1603             : }
    1604             : 
    1605             : 
    1606             : Datum
    1607          22 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
    1608             : {
    1609          22 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1610             :     int         i;
    1611             : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
    1612             : 
    1613             :     /* we want to return 0 rows if slot is set to zero */
    1614          22 :     replorigin_check_prerequisites(false, true);
    1615             : 
    1616          22 :     InitMaterializedSRF(fcinfo, 0);
    1617             : 
    1618             :     /* prevent slots from being concurrently dropped */
    1619          22 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1620             : 
    1621             :     /*
    1622             :      * Iterate through all possible replication_states, display if they are
    1623             :      * filled. Each entry's LSNs are read under that entry's lock, but the
    1624             :      * values may already be out of date by the time they are returned.
    1625             :      */
    1626         242 :     for (i = 0; i < max_active_replication_origins; i++)
    1627             :     {
    1628             :         ReplicationState *state;
    1629             :         Datum       values[REPLICATION_ORIGIN_PROGRESS_COLS];
    1630             :         bool        nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
    1631             :         char       *roname;
    1632             : 
    1633         220 :         state = &replication_states[i];
    1634             : 
    1635             :         /* unused slot, nothing to display */
    1636         220 :         if (state->roident == InvalidRepOriginId)
    1637         194 :             continue;
    1638             : 
    1639          26 :         memset(values, 0, sizeof(values));
    1640          26 :         memset(nulls, 1, sizeof(nulls));
    1641             : 
    1642          26 :         values[0] = ObjectIdGetDatum(state->roident);
    1643          26 :         nulls[0] = false;
    1644             : 
    1645             :         /*
    1646             :          * We're not preventing the origin from being dropped concurrently, so
    1647             :          * silently accept that it might be gone.
    1648             :          */
    1649          26 :         if (replorigin_by_oid(state->roident, true,
    1650             :                               &roname))
    1651             :         {
    1652          26 :             values[1] = CStringGetTextDatum(roname);
    1653          26 :             nulls[1] = false;
    1654             :         }
    1655             : 
    1656          26 :         LWLockAcquire(&state->lock, LW_SHARED);
    1657             : 
    1658          26 :         values[2] = LSNGetDatum(state->remote_lsn);
    1659          26 :         nulls[2] = false;
    1660             : 
    1661          26 :         values[3] = LSNGetDatum(state->local_lsn);
    1662          26 :         nulls[3] = false;
    1663             : 
    1664          26 :         LWLockRelease(&state->lock);
    1665             : 
    1666          26 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1667             :                              values, nulls);
    1668             :     }
    1669             : 
    1670          22 :     LWLockRelease(ReplicationOriginLock);
    1671             : 
    1672             : #undef REPLICATION_ORIGIN_PROGRESS_COLS
    1673             : 
    1674          22 :     return (Datum) 0;
    1675             : }

Generated by: LCOV version 1.16