LCOV - code coverage report
Current view: top level - src/backend/replication/logical - origin.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 87.6 % 466 408
Test Date: 2026-02-28 15:14:49 Functions: 93.9 % 33 31
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * origin.c
       4              :  *    Logical replication progress tracking support.
       5              :  *
       6              :  * Copyright (c) 2013-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *    src/backend/replication/logical/origin.c
      10              :  *
      11              :  * NOTES
      12              :  *
      13              :  * This file provides the following:
      14              :  * * An infrastructure to name nodes in a replication setup
      15              :  * * A facility to store and persist replication progress in an efficient
      16              :  *   and durable manner.
      17              :  *
      18              :  * A replication origin consists of a descriptive, user defined, external
      19              :  * name and a short, thus space efficient, internal 2 byte one. This split
      20              :  * exists because replication origins have to be stored in WAL and shared
      21              :  * memory and long descriptors would be inefficient.  For now we only use 2
      22              :  * bytes for the internal id of a replication origin as it seems unlikely that
      23              :  * there soon will be more than 65k nodes in one replication setup; and using
      24              :  * only two bytes allows us to be more space efficient.
      25              :  *
      26              :  * Replication progress is tracked in a shared memory table
      27              :  * (ReplicationState) that's dumped to disk every checkpoint. Entries
      28              :  * ('slots') in this table are identified by the internal id. That's the case
      29              :  * because it allows replication progress to be advanced during crash
      30              :  * recovery. To allow doing so we store the original LSN (from the originating
      31              :  * system) of a transaction in the commit record. That allows recovering the
      32              :  * precise replayed state after crash recovery; without requiring synchronous
      33              :  * commits. Allowing logical replication to use asynchronous commit is
      34              :  * generally good for performance, but especially important as it allows a
      35              :  * single threaded replay process to keep up with a source that has multiple
      36              :  * backends generating changes concurrently.  For efficiency and simplicity
      37              :  * reasons a backend can set up one replication origin that is from then on
      38              :  * used as the source of changes produced by the backend, until reset again.
      39              :  *
      40              :  * This infrastructure is intended to be used in cooperation with logical
      41              :  * decoding. When replaying from a remote system the configured origin is
      42              :  * provided to output plugins, allowing prevention of replication loops and
      43              :  * other filtering.
      44              :  *
      45              :  * There are several levels of locking at work:
      46              :  *
      47              :  * * To create and drop replication origins an exclusive lock on
      48              :  *   pg_replication_origin is required for the duration. That allows us to
      49              :  *   safely and conflict free assign new origins using a dirty snapshot.
      50              :  *
      51              :  * * When creating an in-memory replication progress slot the ReplicationOrigin
      52              :  *   LWLock has to be held exclusively; when iterating over the replication
      53              :  *   progress a shared lock has to be held, the same when advancing the
      54              :  *   replication progress of an individual backend that has not setup as the
      55              :  *   session's replication origin.
      56              :  *
      57              :  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
      58              :  *   replication progress slot that slot's lwlock has to be held. That's
      59              :  *   primarily because we do not assume 8 byte writes (the LSN) are atomic on
      60              :  *   all our platforms, but it also simplifies memory ordering concerns
      61              :  *   between the remote and local lsn. We use a lwlock instead of a spinlock
      62              :  *   so it's less harmful to hold the lock over a WAL write
      63              :  *   (cf. AdvanceReplicationProgress).
      64              :  *
      65              :  * ---------------------------------------------------------------------------
      66              :  */
      67              : 
      68              : #include "postgres.h"
      69              : 
      70              : #include <unistd.h>
      71              : #include <sys/stat.h>
      72              : 
      73              : #include "access/genam.h"
      74              : #include "access/htup_details.h"
      75              : #include "access/table.h"
      76              : #include "access/xact.h"
      77              : #include "access/xloginsert.h"
      78              : #include "catalog/catalog.h"
      79              : #include "catalog/indexing.h"
      80              : #include "catalog/pg_subscription.h"
      81              : #include "funcapi.h"
      82              : #include "miscadmin.h"
      83              : #include "nodes/execnodes.h"
      84              : #include "pgstat.h"
      85              : #include "replication/origin.h"
      86              : #include "replication/slot.h"
      87              : #include "storage/condition_variable.h"
      88              : #include "storage/fd.h"
      89              : #include "storage/ipc.h"
      90              : #include "storage/lmgr.h"
      91              : #include "utils/builtins.h"
      92              : #include "utils/fmgroids.h"
      93              : #include "utils/guc.h"
      94              : #include "utils/pg_lsn.h"
      95              : #include "utils/rel.h"
      96              : #include "utils/snapmgr.h"
      97              : #include "utils/syscache.h"
      98              : 
      99              : /* Paths of the replication origin checkpoint file and its ".tmp" sibling */
     100              : #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
     101              : #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
     102              : 
     103              : /*
                      :  * GUC variables
                      :  *
                      :  * max_active_replication_origins is the length of the shared
                      :  * replication_states array.  When it is 0, ReplicationOriginShmemInit()
                      :  * sets up no shared state and replorigin_check_prerequisites() can reject
                      :  * origin operations.
                      :  */
     104              : int         max_active_replication_origins = 10;
     105              : 
     106              : /*
     107              :  * Replay progress of a single remote node.
                      :  *
                      :  * Instances are the elements of the shared memory array reachable through
                      :  * replication_states (the states[] member of ReplicationStateCtl).  When an
                      :  * origin is dropped, replorigin_state_clear() resets roident to
                      :  * InvalidReplOriginId and both LSNs to InvalidXLogRecPtr.
     108              :  */
     109              : typedef struct ReplicationState
     110              : {
     111              :     /*
     112              :      * Local identifier for the remote node.
     113              :      */
     114              :     ReplOriginId roident;
     115              : 
     116              :     /*
     117              :      * Location of the latest commit from the remote side.
     118              :      */
     119              :     XLogRecPtr  remote_lsn;
     120              : 
     121              :     /*
     122              :      * Remember the local lsn of the commit record so we can XLogFlush() to it
     123              :      * during a checkpoint so we know the commit record actually is safe on
     124              :      * disk.
     125              :      */
     126              :     XLogRecPtr  local_lsn;
     127              : 
     128              :     /*
     129              :      * PID of backend that's acquired slot, or 0 if none.
     130              :      */
     131              :     int         acquired_by;
     132              : 
     133              :     /*
                      :      * Count of processes that are currently using this origin.
                      :      * replorigin_state_clear() refuses to clear (or waits) while this is > 0.
                      :      */
     134              :     int         refcount;
     135              : 
     136              :     /*
     137              :      * Condition variable that's signaled when acquired_by changes.
                      :      *
                      :      * NOTE(review): replorigin_state_clear() sleeps on this while refcount
                      :      * is > 0 -- confirm it is also signaled when refcount drops.
     138              :      */
     139              :     ConditionVariable origin_cv;
     140              : 
     141              :     /*
     142              :      * Lock protecting remote_lsn and local_lsn.
                      :      * Initialized in ReplicationOriginShmemInit().
     143              :      */
     144              :     LWLock      lock;
     145              : } ReplicationState;
     146              : 
     147              : /*
     148              :  * On disk version of ReplicationState.
                      :  *
                      :  * Carries only the origin id and its remote_lsn; the remaining
                      :  * ReplicationState members (local_lsn, acquired_by, refcount, origin_cv and
                      :  * lock) have no counterpart here.
     149              :  */
     150              : typedef struct ReplicationStateOnDisk
     151              : {
     152              :     ReplOriginId roident;
     153              :     XLogRecPtr  remote_lsn;
     154              : } ReplicationStateOnDisk;
     155              : 
     156              : /*
                      :  * Shared memory control block: a small header followed by the array of
                      :  * per-origin states.  Its size is computed by ReplicationOriginShmemSize()
                      :  * and it is set up by ReplicationOriginShmemInit().
                      :  */
     157              : typedef struct ReplicationStateCtl
     158              : {
     159              :     /* Tranche to use for per-origin LWLocks */
     160              :     int         tranche_id;
     161              :     /* Array of length max_active_replication_origins */
     162              :     ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
     163              : } ReplicationStateCtl;
     164              : 
     165              : /*
                      :  * Global variable for per-transaction replication origin state.
                      :  *
                      :  * Starts out with no origin (InvalidReplOriginId), no origin LSN
                      :  * (InvalidXLogRecPtr) and a zero origin timestamp.
                      :  */
     166              : ReplOriginXactState replorigin_xact_state = {
     167              :     .origin = InvalidReplOriginId,  /* assumed identity */
     168              :     .origin_lsn = InvalidXLogRecPtr,
     169              :     .origin_timestamp = 0
     170              : };
     171              : 
     172              : /*
     173              :  * Base address into a shared memory array of replication states of size
     174              :  * max_active_replication_origins.
                      :  *
                      :  * Set by ReplicationOriginShmemInit() to replication_states_ctl->states; it
                      :  * is left unset when max_active_replication_origins is 0.
     175              :  */
     176              : static ReplicationState *replication_states;
     177              : 
     178              : /*
     179              :  * Actual shared memory block (replication_states[] is now part of this).
                      :  * Obtained from ShmemInitStruct() in ReplicationOriginShmemInit().
     180              :  */
     181              : static ReplicationStateCtl *replication_states_ctl;
     182              : 
     183              : /*
     184              :  * We keep a pointer to this backend's ReplicationState to avoid having to
     185              :  * search the replication_states array in replorigin_session_advance for each
     186              :  * remote commit.  (Ownership of a backend's own entry can only be changed by
     187              :  * that backend.)
                      :  *
                      :  * NULL until the backend has set up a session origin.
     188              :  */
     189              : static ReplicationState *session_replication_state = NULL;
     190              : 
     191              : /* Magic for on disk files. */
     192              : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
     193              : /*
                      :  * Verify that replication origin functionality may be used right now.
                      :  *
                      :  * check_origins: if true, raise an ERROR when
                      :  *      max_active_replication_origins is 0.
                      :  * recoveryOK: if false, raise an ERROR when RecoveryInProgress() returns
                      :  *      true.
                      :  *
                      :  * Returns normally only if every requested check passes.
                      :  */
     194              : static void
     195           68 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
     196              : {
     197           68 :     if (check_origins && max_active_replication_origins == 0)
     198            0 :         ereport(ERROR,
     199              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     200              :                  errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
     201              :     /* The recovery check is skipped entirely when the caller passes recoveryOK. */
     202           68 :     if (!recoveryOK && RecoveryInProgress())
     203            0 :         ereport(ERROR,
     204              :                 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
     205              :                  errmsg("cannot manipulate replication origins during recovery")));
     206           68 : }
     207              : 
     208              : 
     209              : /*
     210              :  * IsReservedOriginName
     211              :  *      True iff name is either "none" or "any".
                      :  *
                      :  * The name is compared against LOGICALREP_ORIGIN_NONE and
                      :  * LOGICALREP_ORIGIN_ANY with pg_strcasecmp(), i.e. without regard to case.
     212              :  */
     213              : static bool
     214           13 : IsReservedOriginName(const char *name)
     215              : {
     216           25 :     return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
     217           12 :             (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
     218              : }
     219              : 
     220              : /* ---------------------------------------------------------------------------
     221              :  * Functions for working with replication origins themselves.
     222              :  * ---------------------------------------------------------------------------
     223              :  */
     224              : 
     225              : /*
     226              :  * Check for a persistent replication origin identified by name.
     227              :  *
                      :  * roname: external name of the origin, as a C string.
                      :  * missing_ok: if false, an unknown name raises an ERROR
                      :  *      (ERRCODE_UNDEFINED_OBJECT) instead of returning.
                      :  *
                      :  * Returns the roident stored in the matching syscache entry.
     228              :  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
                      :  *
                      :  * NOTE(review): the text datum built from roname is not freed here;
                      :  * presumably it is left to the current memory context -- confirm.
     229              :  */
     230              : ReplOriginId
     231         1006 : replorigin_by_name(const char *roname, bool missing_ok)
     232              : {
     233              :     Form_pg_replication_origin ident;
     234         1006 :     Oid         roident = InvalidOid;
     235              :     HeapTuple   tuple;
     236              :     Datum       roname_d;
     237              :     /* The syscache lookup below takes the name as a text datum. */
     238         1006 :     roname_d = CStringGetTextDatum(roname);
     239              : 
     240         1006 :     tuple = SearchSysCache1(REPLORIGNAME, roname_d);
     241         1006 :     if (HeapTupleIsValid(tuple))
     242              :     {
     243          626 :         ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
     244          626 :         roident = ident->roident;
     245          626 :         ReleaseSysCache(tuple);
     246              :     }
     247          380 :     else if (!missing_ok)
     248            4 :         ereport(ERROR,
     249              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     250              :                  errmsg("replication origin \"%s\" does not exist",
     251              :                         roname)));
     252              :     /* Still InvalidOid here if nothing was found and missing_ok is true. */
     253         1002 :     return roident;
     254              : }
     255              : 
     256              : /*
     257              :  * Create a replication origin.
     258              :  *
     259              :  * Needs to be called in a transaction.
                      :  *
                      :  * roname: external name for the new origin; at most MAX_RONAME_LEN bytes.
                      :  *
                      :  * Picks the lowest ID, starting at InvalidOid + 1 and staying below
                      :  * PG_UINT16_MAX, for which a dirty-snapshot index scan of
                      :  * pg_replication_origin finds no row.  It then inserts a (roident, roname)
                      :  * row and does a CommandCounterIncrement().
                      :  *
                      :  * Returns the newly assigned ID.  Raises an ERROR if the name is too long
                      :  * or if no free ID could be found.
     260              :  */
     261              : ReplOriginId
     262          371 : replorigin_create(const char *roname)
     263              : {
     264              :     Oid         roident;
     265          371 :     HeapTuple   tuple = NULL;
     266              :     Relation    rel;
     267              :     Datum       roname_d;
     268              :     SnapshotData SnapshotDirty;
     269              :     SysScanDesc scan;
     270              :     ScanKeyData key;
     271              : 
     272              :     /*
     273              :      * To avoid needing a TOAST table for pg_replication_origin, we limit
     274              :      * replication origin names to 512 bytes.  This should be more than enough
     275              :      * for all practical use.
     276              :      */
     277          371 :     if (strlen(roname) > MAX_RONAME_LEN)
     278            3 :         ereport(ERROR,
     279              :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     280              :                  errmsg("replication origin name is too long"),
     281              :                  errdetail("Replication origin names must be no longer than %d bytes.",
     282              :                            MAX_RONAME_LEN)));
     283              : 
     284          368 :     roname_d = CStringGetTextDatum(roname);
     285              : 
     286              :     Assert(IsTransactionState());
     287              : 
     288              :     /*
     289              :      * We need the numeric replication origin to be 16bit wide, so we cannot
     290              :      * rely on the normal oid allocation. Instead we simply scan
     291              :      * pg_replication_origin for the first unused id. That's not particularly
     292              :      * efficient, but this should be a fairly infrequent operation - we can
     293              :      * easily spend a bit more code on this when it turns out it needs to be
     294              :      * faster.
     295              :      *
     296              :      * We handle concurrency by taking an exclusive lock (allowing reads!)
     297              :      * over the table for the duration of the search. Because we use a "dirty
     298              :      * snapshot" we can read rows that other in-progress sessions have
     299              :      * written, even though they would be invisible with normal snapshots. Due
     300              :      * to the exclusive lock there's no danger that new rows can appear while
     301              :      * we're checking.
     302              :      */
     303          368 :     InitDirtySnapshot(SnapshotDirty);
     304              : 
     305          368 :     rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
     306              : 
     307              :     /*
     308              :      * We want to be able to access pg_replication_origin without setting up a
     309              :      * snapshot.  To make that safe, it needs to not have a TOAST table, since
     310              :      * TOASTed data cannot be fetched without a snapshot.  As of this writing,
     311              :      * its only varlena column is roname, which we limit to 512 bytes to avoid
     312              :      * needing out-of-line storage.  If you add a TOAST table to this catalog,
     313              :      * be sure to set up a snapshot everywhere it might be needed.  For more
     314              :      * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
     315              :      */
     316              :     Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
     317              :     /* Try IDs in ascending order; PG_UINT16_MAX itself is never tried. */
     318          669 :     for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
     319              :     {
     320              :         bool        nulls[Natts_pg_replication_origin];
     321              :         Datum       values[Natts_pg_replication_origin];
     322              :         bool        collides;
     323              : 
     324          669 :         CHECK_FOR_INTERRUPTS();
     325              :         /* Probe the roident index for exactly this candidate ID. */
     326          669 :         ScanKeyInit(&key,
     327              :                     Anum_pg_replication_origin_roident,
     328              :                     BTEqualStrategyNumber, F_OIDEQ,
     329              :                     ObjectIdGetDatum(roident));
     330              : 
     331          669 :         scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
     332              :                                   true /* indexOK */ ,
     333              :                                   &SnapshotDirty,
     334              :                                   1, &key);
     335              :         /* Any row returned under the dirty snapshot means the ID is taken. */
     336          669 :         collides = HeapTupleIsValid(systable_getnext(scan));
     337              : 
     338          669 :         systable_endscan(scan);
     339              : 
     340          669 :         if (!collides)
     341              :         {
     342              :             /*
     343              :              * Ok, found an unused roident, insert the new row and do a CCI,
     344              :              * so our callers can look it up if they want to.
     345              :              */
     346          368 :             memset(&nulls, 0, sizeof(nulls));
     347              : 
     348          368 :             values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
     349          368 :             values[Anum_pg_replication_origin_roname - 1] = roname_d;
     350              :             /*
                      :              * NOTE(review): no duplicate-name check is visible in this
                      :              * function; presumably a unique index on roname makes
                      :              * CatalogTupleInsert() fail for an existing name -- confirm.
                      :              */
     351          368 :             tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     352          368 :             CatalogTupleInsert(rel, tuple);
     353          367 :             CommandCounterIncrement();
     354          367 :             break;
     355              :         }
     356              :     }
     357              : 
     358              :     /* now release lock again */
     359          367 :     table_close(rel, ExclusiveLock);
     360              :     /* tuple is still NULL only if the loop ran out of candidate IDs. */
     361          367 :     if (tuple == NULL)
     362            0 :         ereport(ERROR,
     363              :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     364              :                  errmsg("could not find free replication origin ID")));
     365              : 
     366          367 :     heap_freetuple(tuple);
     367          367 :     return roident;
     368              : }
     369              : 
     370              : /*
     371              :  * Helper function to drop a replication origin.
                      :  *
                      :  * Clears the shared-memory progress entry for roident, if one exists.
                      :  *
                      :  * roident: internal ID of the origin being dropped.
                      :  * nowait: if true, raise an ERROR (ERRCODE_OBJECT_IN_USE) when the entry's
                      :  *      refcount is > 0.  If false, release ReplicationOriginLock, sleep on
                      :  *      the entry's condition variable and rescan from the start.
                      :  *
                      :  * Once the matching entry is unused, an XLOG_REPLORIGIN_DROP record is
                      :  * inserted and the entry's roident, remote_lsn and local_lsn are reset.
                      :  * If no entry matches, nothing is WAL-logged or changed.
                      :  *
                      :  * ConditionVariableCancelSleep() is called on every exit path through the
                      :  * end of the function, whether or not a sleep happened.
     372              :  */
     373              : static void
     374          313 : replorigin_state_clear(ReplOriginId roident, bool nowait)
     375              : {
     376              :     int         i;
     377              : 
     378              :     /*
     379              :      * Clean up the slot state info, if there is any matching slot.
     380              :      */
     381          313 : restart:
     382          313 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     383              : 
     384         1032 :     for (i = 0; i < max_active_replication_origins; i++)
     385              :     {
     386          986 :         ReplicationState *state = &replication_states[i];
     387              : 
     388          986 :         if (state->roident == roident)
     389              :         {
     390              :             /* found our slot, is it busy? */
     391          267 :             if (state->refcount > 0)
     392              :             {
     393              :                 ConditionVariable *cv;
     394              : 
     395            0 :                 if (nowait)
     396            0 :                     ereport(ERROR,
     397              :                             (errcode(ERRCODE_OBJECT_IN_USE),
     398              :                              (state->acquired_by != 0)
     399              :                              ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
     400              :                                       state->roident,
     401              :                                       state->acquired_by)
     402              :                              : errmsg("could not drop replication origin with ID %d, in use by another process",
     403              :                                       state->roident)));
     404              : 
     405              :                 /*
     406              :                  * We must wait and then retry.  Since we don't know which CV
     407              :                  * to wait on until here, we can't readily use
     408              :                  * ConditionVariablePrepareToSleep (calling it here would be
     409              :                  * wrong, since we could miss the signal if we did so); just
     410              :                  * use ConditionVariableSleep directly.
     411              :                  */
     412            0 :                 cv = &state->origin_cv;
     413              :                 /* The lock is dropped before sleeping and re-taken at restart. */
     414            0 :                 LWLockRelease(ReplicationOriginLock);
     415              : 
     416            0 :                 ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
     417            0 :                 goto restart;
     418              :             }
     419              : 
     420              :             /* first make a WAL log entry */
     421              :             {
     422              :                 xl_replorigin_drop xlrec;
     423              : 
     424          267 :                 xlrec.node_id = roident;
     425          267 :                 XLogBeginInsert();
     426          267 :                 XLogRegisterData(&xlrec, sizeof(xlrec));
     427          267 :                 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
     428              :             }
     429              : 
     430              :             /* then clear the in-memory slot */
     431          267 :             state->roident = InvalidReplOriginId;
     432          267 :             state->remote_lsn = InvalidXLogRecPtr;
     433          267 :             state->local_lsn = InvalidXLogRecPtr;
     434          267 :             break;
     435              :         }
     436              :     }
     437          313 :     LWLockRelease(ReplicationOriginLock);
     438          313 :     ConditionVariableCancelSleep();
     439          313 : }
     440              : 
     441              : /*
     442              :  * Drop replication origin (by name).
     443              :  *
     444              :  * Needs to be called in a transaction.
                      :  *
                      :  * name: external name of the origin to drop.
                      :  * missing_ok: if true, return silently when the origin does not exist
                      :  *      (or has already been dropped); otherwise an ERROR is raised.
                      :  * nowait: passed on to replorigin_state_clear(); if true, an origin that
                      :  *      is still in use causes an ERROR instead of a wait.
                      :  *
                      :  * The name is resolved first, then the origin is locked with
                      :  * LockSharedObject() and looked up again by ID, so that a concurrent drop
                      :  * that finished in between is noticed.  Note that with missing_ok the
                      :  * resolved ID may be InvalidOid; the shared-object lock is then taken on
                      :  * that value and released again once the ID lookup finds no tuple.
     445              :  */
     446              : void
     447          500 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
     448              : {
     449              :     ReplOriginId roident;
     450              :     Relation    rel;
     451              :     HeapTuple   tuple;
     452              : 
     453              :     Assert(IsTransactionState());
     454              : 
     455          500 :     rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
     456              : 
     457          500 :     roident = replorigin_by_name(name, missing_ok);
     458              : 
     459              :     /* Lock the origin to prevent concurrent drops. */
     460          499 :     LockSharedObject(ReplicationOriginRelationId, roident, 0,
     461              :                      AccessExclusiveLock);
     462              :     /* Re-check by ID now that the origin lock is held. */
     463          499 :     tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
     464          499 :     if (!HeapTupleIsValid(tuple))
     465              :     {
     466          186 :         if (!missing_ok)
     467            0 :             elog(ERROR, "cache lookup failed for replication origin with ID %d",
     468              :                  roident);
     469              : 
     470              :         /*
     471              :          * We don't need to retain the locks if the origin is already dropped.
     472              :          */
     473          186 :         UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
     474              :                            AccessExclusiveLock);
     475          186 :         table_close(rel, RowExclusiveLock);
     476          186 :         return;
     477              :     }
     478              :     /* Clear the shared-memory progress entry before touching the catalog. */
     479          313 :     replorigin_state_clear(roident, nowait);
     480              : 
     481              :     /*
     482              :      * Now, we can delete the catalog entry.
     483              :      */
     484          313 :     CatalogTupleDelete(rel, &tuple->t_self);
     485          313 :     ReleaseSysCache(tuple);
     486              : 
     487          313 :     CommandCounterIncrement();
     488              : 
     489              :     /* We keep the lock on pg_replication_origin until commit */
     490          313 :     table_close(rel, NoLock);
     491              : }
     492              : 
     493              : /*
     494              :  * Lookup replication origin via its oid and return the name.
     495              :  *
                      :  * roident: internal ID to look up; asserted to be neither
                      :  *      InvalidReplOriginId nor DoNotReplicateId.
                      :  * missing_ok: if false, an unknown ID raises an ERROR
                      :  *      (ERRCODE_UNDEFINED_OBJECT) instead of returning false.
                      :  * roname: output parameter.  Set to a copy of the name on success and to
                      :  *      NULL when the origin is not found.
                      :  *
     496              :  * The external name is palloc'd in the calling context.
     497              :  *
     498              :  * Returns true if the origin is known, false otherwise.
     499              :  */
     500              : bool
     501           28 : replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
     502              : {
     503              :     HeapTuple   tuple;
     504              :     Form_pg_replication_origin ric;
     505              : 
     506              :     Assert(OidIsValid((Oid) roident));
     507              :     Assert(roident != InvalidReplOriginId);
     508              :     Assert(roident != DoNotReplicateId);
     509              : 
     510           28 :     tuple = SearchSysCache1(REPLORIGIDENT,
     511              :                             ObjectIdGetDatum((Oid) roident));
     512              : 
     513           28 :     if (HeapTupleIsValid(tuple))
     514              :     {
     515           25 :         ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
     516           25 :         *roname = text_to_cstring(&ric->roname);
     517           25 :         ReleaseSysCache(tuple);
     518              : 
     519           25 :         return true;
     520              :     }
     521              :     else
     522              :     {
     523            3 :         *roname = NULL;
     524              :         /* The output is cleared first, so it is NULL even on the error path. */
     525            3 :         if (!missing_ok)
     526            0 :             ereport(ERROR,
     527              :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     528              :                      errmsg("replication origin with ID %d does not exist",
     529              :                             roident)));
     530              : 
     531            3 :         return false;
     532              :     }
     533              : }
     534              : 
     535              : 
     536              : /* ---------------------------------------------------------------------------
     537              :  * Functions for handling replication progress.
     538              :  * ---------------------------------------------------------------------------
     539              :  */
     540              : /*
                      :  * Compute the amount of shared memory needed for replication origin state.
                      :  *
                      :  * Returns 0 when max_active_replication_origins is 0.  Otherwise returns
                      :  * the size of the ReplicationStateCtl header (everything before states[])
                      :  * plus max_active_replication_origins ReplicationState entries.
                      :  *
                      :  * The arithmetic goes through add_size()/mul_size() (presumably
                      :  * overflow-checked -- confirm).
                      :  */
     541              : Size
     542         4445 : ReplicationOriginShmemSize(void)
     543              : {
     544         4445 :     Size        size = 0;
     545              : 
     546         4445 :     if (max_active_replication_origins == 0)
     547            2 :         return size;
     548              : 
     549         4443 :     size = add_size(size, offsetof(ReplicationStateCtl, states));
     550              : 
     551         4443 :     size = add_size(size,
     552              :                     mul_size(max_active_replication_origins, sizeof(ReplicationState)));
     553         4443 :     return size;
     554              : }
     555              : /*
                      :  * Attach to, and on first use initialize, the shared replication origin
                      :  * state.
                      :  *
                      :  * Does nothing when max_active_replication_origins is 0.  Otherwise it
                      :  * obtains the "ReplicationOriginState" struct from ShmemInitStruct() and
                      :  * points replication_states at its states[] array.  If the struct was not
                      :  * found already, it is zeroed, the LWLock tranche id is recorded, and each
                      :  * entry's lock and condition variable are initialized.
                      :  */
     556              : void
     557         1150 : ReplicationOriginShmemInit(void)
     558              : {
     559              :     bool        found;
     560              : 
     561         1150 :     if (max_active_replication_origins == 0)
     562            1 :         return;
     563              : 
     564         1149 :     replication_states_ctl = (ReplicationStateCtl *)
     565         1149 :         ShmemInitStruct("ReplicationOriginState",
     566              :                         ReplicationOriginShmemSize(),
     567              :                         &found);
     568         1149 :     replication_states = replication_states_ctl->states;
     569              :     /* Only the caller that did not find an existing struct initializes it. */
     570         1149 :     if (!found)
     571              :     {
     572              :         int         i;
     573              : 
     574        94146 :         MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
     575              : 
     576         1149 :         replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
     577              : 
     578        12630 :         for (i = 0; i < max_active_replication_origins; i++)
     579              :         {
     580        11481 :             LWLockInitialize(&replication_states[i].lock,
     581        11481 :                              replication_states_ctl->tranche_id);
     582        11481 :             ConditionVariableInit(&replication_states[i].origin_cv);
     583              :         }
     584              :     }
     585              : }
     586              : 
     587              : /* ---------------------------------------------------------------------------
     588              :  * Perform a checkpoint of each replication origin's progress with respect to
     589              :  * the replayed remote_lsn. Make sure that all transactions we refer to in the
     590              :  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
     591              :  * if the transactions were originally committed asynchronously.
     592              :  *
     593              :  * We store checkpoints in the following format:
     594              :  * +-------+------------------------+------------------+-----+--------+
     595              :  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
     596              :  * +-------+------------------------+------------------+-----+--------+
     597              :  *
     598              :  * So its just the magic, followed by the statically sized
     599              :  * ReplicationStateOnDisk structs. Note that the maximum number of
     600              :  * ReplicationState is determined by max_active_replication_origins.
     601              :  * ---------------------------------------------------------------------------
     602              :  */
     603              : void
     604         1795 : CheckPointReplicationOrigin(void)
     605              : {
     606         1795 :     const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
     607         1795 :     const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     608              :     int         tmpfd;
     609              :     int         i;
     610         1795 :     uint32      magic = REPLICATION_STATE_MAGIC;
     611              :     pg_crc32c   crc;
     612              : 
     613         1795 :     if (max_active_replication_origins == 0)
     614            1 :         return;
     615              : 
     616         1794 :     INIT_CRC32C(crc);
     617              : 
     618              :     /* make sure no old temp file is remaining */
     619         1794 :     if (unlink(tmppath) < 0 && errno != ENOENT)
     620            0 :         ereport(PANIC,
     621              :                 (errcode_for_file_access(),
     622              :                  errmsg("could not remove file \"%s\": %m",
     623              :                         tmppath)));
     624              : 
     625              :     /*
     626              :      * no other backend can perform this at the same time; only one checkpoint
     627              :      * can happen at a time.
     628              :      */
     629         1794 :     tmpfd = OpenTransientFile(tmppath,
     630              :                               O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
     631         1794 :     if (tmpfd < 0)
     632            0 :         ereport(PANIC,
     633              :                 (errcode_for_file_access(),
     634              :                  errmsg("could not create file \"%s\": %m",
     635              :                         tmppath)));
     636              : 
     637              :     /* write magic */
     638         1794 :     errno = 0;
     639         1794 :     if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
     640              :     {
     641              :         /* if write didn't set errno, assume problem is no disk space */
     642            0 :         if (errno == 0)
     643            0 :             errno = ENOSPC;
     644            0 :         ereport(PANIC,
     645              :                 (errcode_for_file_access(),
     646              :                  errmsg("could not write to file \"%s\": %m",
     647              :                         tmppath)));
     648              :     }
     649         1794 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     650              : 
     651              :     /* prevent concurrent creations/drops */
     652         1794 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
     653              : 
     654              :     /* write actual data */
     655        19734 :     for (i = 0; i < max_active_replication_origins; i++)
     656              :     {
     657              :         ReplicationStateOnDisk disk_state;
     658        17940 :         ReplicationState *curstate = &replication_states[i];
     659              :         XLogRecPtr  local_lsn;
     660              : 
     661        17940 :         if (curstate->roident == InvalidReplOriginId)
     662        17887 :             continue;
     663              : 
     664              :         /* zero, to avoid uninitialized padding bytes */
     665           53 :         memset(&disk_state, 0, sizeof(disk_state));
     666              : 
     667           53 :         LWLockAcquire(&curstate->lock, LW_SHARED);
     668              : 
     669           53 :         disk_state.roident = curstate->roident;
     670              : 
     671           53 :         disk_state.remote_lsn = curstate->remote_lsn;
     672           53 :         local_lsn = curstate->local_lsn;
     673              : 
     674           53 :         LWLockRelease(&curstate->lock);
     675              : 
     676              :         /* make sure we only write out a commit that's persistent */
     677           53 :         XLogFlush(local_lsn);
     678              : 
     679           53 :         errno = 0;
     680           53 :         if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
     681              :             sizeof(disk_state))
     682              :         {
     683              :             /* if write didn't set errno, assume problem is no disk space */
     684            0 :             if (errno == 0)
     685            0 :                 errno = ENOSPC;
     686            0 :             ereport(PANIC,
     687              :                     (errcode_for_file_access(),
     688              :                      errmsg("could not write to file \"%s\": %m",
     689              :                             tmppath)));
     690              :         }
     691              : 
     692           53 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     693              :     }
     694              : 
     695         1794 :     LWLockRelease(ReplicationOriginLock);
     696              : 
     697              :     /* write out the CRC */
     698         1794 :     FIN_CRC32C(crc);
     699         1794 :     errno = 0;
     700         1794 :     if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
     701              :     {
     702              :         /* if write didn't set errno, assume problem is no disk space */
     703            0 :         if (errno == 0)
     704            0 :             errno = ENOSPC;
     705            0 :         ereport(PANIC,
     706              :                 (errcode_for_file_access(),
     707              :                  errmsg("could not write to file \"%s\": %m",
     708              :                         tmppath)));
     709              :     }
     710              : 
     711         1794 :     if (CloseTransientFile(tmpfd) != 0)
     712            0 :         ereport(PANIC,
     713              :                 (errcode_for_file_access(),
     714              :                  errmsg("could not close file \"%s\": %m",
     715              :                         tmppath)));
     716              : 
     717              :     /* fsync, rename to permanent file, fsync file and directory */
     718         1794 :     durable_rename(tmppath, path, PANIC);
     719              : }
     720              : 
     721              : /*
     722              :  * Recover replication replay status from checkpoint data saved earlier by
     723              :  * CheckPointReplicationOrigin.
     724              :  *
     725              :  * This only needs to be called at startup and *not* during every checkpoint
     726              :  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
     727              :  * state thereafter can be recovered by looking at commit records.
     728              :  */
     729              : void
     730         1002 : StartupReplicationOrigin(void)
     731              : {
     732         1002 :     const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     733              :     int         fd;
     734              :     int         readBytes;
     735         1002 :     uint32      magic = REPLICATION_STATE_MAGIC;
     736         1002 :     int         last_state = 0;
     737              :     pg_crc32c   file_crc;
     738              :     pg_crc32c   crc;
     739              : 
     740              :     /* don't want to overwrite already existing state */
     741              : #ifdef USE_ASSERT_CHECKING
     742              :     static bool already_started = false;
     743              : 
     744              :     Assert(!already_started);
     745              :     already_started = true;
     746              : #endif
     747              : 
     748         1002 :     if (max_active_replication_origins == 0)
     749           52 :         return;
     750              : 
     751         1001 :     INIT_CRC32C(crc);
     752              : 
     753         1001 :     elog(DEBUG2, "starting up replication origin progress state");
     754              : 
     755         1001 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
     756              : 
     757              :     /*
     758              :      * might have had max_active_replication_origins == 0 last run, or we just
     759              :      * brought up a standby.
     760              :      */
     761         1001 :     if (fd < 0 && errno == ENOENT)
     762           51 :         return;
     763          950 :     else if (fd < 0)
     764            0 :         ereport(PANIC,
     765              :                 (errcode_for_file_access(),
     766              :                  errmsg("could not open file \"%s\": %m",
     767              :                         path)));
     768              : 
     769              :     /* verify magic, that is written even if nothing was active */
     770          950 :     readBytes = read(fd, &magic, sizeof(magic));
     771          950 :     if (readBytes != sizeof(magic))
     772              :     {
     773            0 :         if (readBytes < 0)
     774            0 :             ereport(PANIC,
     775              :                     (errcode_for_file_access(),
     776              :                      errmsg("could not read file \"%s\": %m",
     777              :                             path)));
     778              :         else
     779            0 :             ereport(PANIC,
     780              :                     (errcode(ERRCODE_DATA_CORRUPTED),
     781              :                      errmsg("could not read file \"%s\": read %d of %zu",
     782              :                             path, readBytes, sizeof(magic))));
     783              :     }
     784          950 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     785              : 
     786          950 :     if (magic != REPLICATION_STATE_MAGIC)
     787            0 :         ereport(PANIC,
     788              :                 (errmsg("replication checkpoint has wrong magic %u instead of %u",
     789              :                         magic, REPLICATION_STATE_MAGIC)));
     790              : 
     791              :     /* we can skip locking here, no other access is possible */
     792              : 
     793              :     /* recover individual states, until there are no more to be found */
     794              :     while (true)
     795           31 :     {
     796              :         ReplicationStateOnDisk disk_state;
     797              : 
     798          981 :         readBytes = read(fd, &disk_state, sizeof(disk_state));
     799              : 
     800          981 :         if (readBytes < 0)
     801              :         {
     802            0 :             ereport(PANIC,
     803              :                     (errcode_for_file_access(),
     804              :                      errmsg("could not read file \"%s\": %m",
     805              :                             path)));
     806              :         }
     807              : 
     808              :         /* no further data */
     809          981 :         if (readBytes == sizeof(crc))
     810              :         {
     811          950 :             memcpy(&file_crc, &disk_state, sizeof(file_crc));
     812          950 :             break;
     813              :         }
     814              : 
     815           31 :         if (readBytes != sizeof(disk_state))
     816              :         {
     817            0 :             ereport(PANIC,
     818              :                     (errcode_for_file_access(),
     819              :                      errmsg("could not read file \"%s\": read %d of %zu",
     820              :                             path, readBytes, sizeof(disk_state))));
     821              :         }
     822              : 
     823           31 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     824              : 
     825           31 :         if (last_state == max_active_replication_origins)
     826            0 :             ereport(PANIC,
     827              :                     (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     828              :                      errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
     829              : 
     830              :         /* copy data to shared memory */
     831           31 :         replication_states[last_state].roident = disk_state.roident;
     832           31 :         replication_states[last_state].remote_lsn = disk_state.remote_lsn;
     833           31 :         last_state++;
     834              : 
     835           31 :         ereport(LOG,
     836              :                 errmsg("recovered replication state of node %d to %X/%08X",
     837              :                        disk_state.roident,
     838              :                        LSN_FORMAT_ARGS(disk_state.remote_lsn)));
     839              :     }
     840              : 
     841              :     /* now check checksum */
     842          950 :     FIN_CRC32C(crc);
     843          950 :     if (file_crc != crc)
     844            0 :         ereport(PANIC,
     845              :                 (errcode(ERRCODE_DATA_CORRUPTED),
     846              :                  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
     847              :                         crc, file_crc)));
     848              : 
     849          950 :     if (CloseTransientFile(fd) != 0)
     850            0 :         ereport(PANIC,
     851              :                 (errcode_for_file_access(),
     852              :                  errmsg("could not close file \"%s\": %m",
     853              :                         path)));
     854              : }
     855              : 
     856              : void
     857            4 : replorigin_redo(XLogReaderState *record)
     858              : {
     859            4 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     860              : 
     861            4 :     switch (info)
     862              :     {
     863            2 :         case XLOG_REPLORIGIN_SET:
     864              :             {
     865            2 :                 xl_replorigin_set *xlrec =
     866            2 :                     (xl_replorigin_set *) XLogRecGetData(record);
     867              : 
     868            2 :                 replorigin_advance(xlrec->node_id,
     869              :                                    xlrec->remote_lsn, record->EndRecPtr,
     870            2 :                                    xlrec->force /* backward */ ,
     871              :                                    false /* WAL log */ );
     872            2 :                 break;
     873              :             }
     874            2 :         case XLOG_REPLORIGIN_DROP:
     875              :             {
     876              :                 xl_replorigin_drop *xlrec;
     877              :                 int         i;
     878              : 
     879            2 :                 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
     880              : 
     881            3 :                 for (i = 0; i < max_active_replication_origins; i++)
     882              :                 {
     883            3 :                     ReplicationState *state = &replication_states[i];
     884              : 
     885              :                     /* found our slot */
     886            3 :                     if (state->roident == xlrec->node_id)
     887              :                     {
     888              :                         /* reset entry */
     889            2 :                         state->roident = InvalidReplOriginId;
     890            2 :                         state->remote_lsn = InvalidXLogRecPtr;
     891            2 :                         state->local_lsn = InvalidXLogRecPtr;
     892            2 :                         break;
     893              :                     }
     894              :                 }
     895            2 :                 break;
     896              :             }
     897            0 :         default:
     898            0 :             elog(PANIC, "replorigin_redo: unknown op code %u", info);
     899              :     }
     900            4 : }
     901              : 
     902              : 
     903              : /*
     904              :  * Tell the replication origin progress machinery that a commit from 'node'
     905              :  * that originated at the LSN remote_commit on the remote node was replayed
     906              :  * successfully and that we don't need to do so again. In combination with
     907              :  * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp}
     908              :  * that ensures we won't lose knowledge about that after a crash if the
     909              :  * transaction had a persistent effect (think of asynchronous commits).
     910              :  *
     911              :  * local_commit needs to be a local LSN of the commit so that we can make sure
     912              :  * upon a checkpoint that enough WAL has been persisted to disk.
     913              :  *
     914              :  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
     915              :  * unless running in recovery.
     916              :  */
     917              : void
     918          241 : replorigin_advance(ReplOriginId node,
     919              :                    XLogRecPtr remote_commit, XLogRecPtr local_commit,
     920              :                    bool go_backward, bool wal_log)
     921              : {
     922              :     int         i;
     923          241 :     ReplicationState *replication_state = NULL;
     924          241 :     ReplicationState *free_state = NULL;
     925              : 
     926              :     Assert(node != InvalidReplOriginId);
     927              : 
     928              :     /* we don't track DoNotReplicateId */
     929          241 :     if (node == DoNotReplicateId)
     930            0 :         return;
     931              : 
     932              :     /*
     933              :      * XXX: For the case where this is called by WAL replay, it'd be more
     934              :      * efficient to restore into a backend local hashtable and only dump into
     935              :      * shmem after recovery is finished. Let's wait with implementing that
     936              :      * till it's shown to be a measurable expense
     937              :      */
     938              : 
     939              :     /* Lock exclusively, as we may have to create a new table entry. */
     940          241 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     941              : 
     942              :     /*
     943              :      * Search for either an existing slot for the origin, or a free one we can
     944              :      * use.
     945              :      */
     946         2213 :     for (i = 0; i < max_active_replication_origins; i++)
     947              :     {
     948         2017 :         ReplicationState *curstate = &replication_states[i];
     949              : 
     950              :         /* remember where to insert if necessary */
     951         2017 :         if (curstate->roident == InvalidReplOriginId &&
     952              :             free_state == NULL)
     953              :         {
     954          196 :             free_state = curstate;
     955          196 :             continue;
     956              :         }
     957              : 
     958              :         /* not our slot */
     959         1821 :         if (curstate->roident != node)
     960              :         {
     961         1776 :             continue;
     962              :         }
     963              : 
     964              :         /* ok, found slot */
     965           45 :         replication_state = curstate;
     966              : 
     967           45 :         LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
     968              : 
     969              :         /* Make sure it's not used by somebody else */
     970           45 :         if (replication_state->refcount > 0)
     971              :         {
     972            0 :             ereport(ERROR,
     973              :                     (errcode(ERRCODE_OBJECT_IN_USE),
     974              :                      (replication_state->acquired_by != 0)
     975              :                      ? errmsg("replication origin with ID %d is already active for PID %d",
     976              :                               replication_state->roident,
     977              :                               replication_state->acquired_by)
     978              :                      : errmsg("replication origin with ID %d is already active in another process",
     979              :                               replication_state->roident)));
     980              :         }
     981              : 
     982           45 :         break;
     983              :     }
     984              : 
     985          241 :     if (replication_state == NULL && free_state == NULL)
     986            0 :         ereport(ERROR,
     987              :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     988              :                  errmsg("could not find free replication state slot for replication origin with ID %d",
     989              :                         node),
     990              :                  errhint("Increase \"max_active_replication_origins\" and try again.")));
     991              : 
     992          241 :     if (replication_state == NULL)
     993              :     {
     994              :         /* initialize new slot */
     995          196 :         LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
     996          196 :         replication_state = free_state;
     997              :         Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
     998              :         Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
     999          196 :         replication_state->roident = node;
    1000              :     }
    1001              : 
    1002              :     Assert(replication_state->roident != InvalidReplOriginId);
    1003              : 
    1004              :     /*
    1005              :      * If somebody "forcefully" sets this slot, WAL log it, so it's durable
    1006              :      * and the standby gets the message. Primarily this will be called during
    1007              :      * WAL replay (of commit records) where no WAL logging is necessary.
    1008              :      */
    1009          241 :     if (wal_log)
    1010              :     {
    1011              :         xl_replorigin_set xlrec;
    1012              : 
    1013          200 :         xlrec.remote_lsn = remote_commit;
    1014          200 :         xlrec.node_id = node;
    1015          200 :         xlrec.force = go_backward;
    1016              : 
    1017          200 :         XLogBeginInsert();
    1018          200 :         XLogRegisterData(&xlrec, sizeof(xlrec));
    1019              : 
    1020          200 :         XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
    1021              :     }
    1022              : 
    1023              :     /*
    1024              :      * Due to - harmless - race conditions during a checkpoint we could see
    1025              :      * values here that are older than the ones we already have in memory. We
    1026              :      * could also see older values for prepared transactions when the prepare
    1027              :      * is sent at a later point of time along with commit prepared and there
    1028              :      * are other transactions commits between prepare and commit prepared. See
    1029              :      * ReorderBufferFinishPrepared. Don't overwrite those.
    1030              :      */
    1031          241 :     if (go_backward || replication_state->remote_lsn < remote_commit)
    1032          234 :         replication_state->remote_lsn = remote_commit;
    1033          241 :     if (XLogRecPtrIsValid(local_commit) &&
    1034           38 :         (go_backward || replication_state->local_lsn < local_commit))
    1035           40 :         replication_state->local_lsn = local_commit;
    1036          241 :     LWLockRelease(&replication_state->lock);
    1037              : 
    1038              :     /*
    1039              :      * Release *after* changing the LSNs, slot isn't acquired and thus could
    1040              :      * otherwise be dropped anytime.
    1041              :      */
    1042          241 :     LWLockRelease(ReplicationOriginLock);
    1043              : }
    1044              : 
    1045              : 
    1046              : XLogRecPtr
    1047            8 : replorigin_get_progress(ReplOriginId node, bool flush)
    1048              : {
    1049              :     int         i;
    1050            8 :     XLogRecPtr  local_lsn = InvalidXLogRecPtr;
    1051            8 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1052              : 
    1053              :     /* prevent slots from being concurrently dropped */
    1054            8 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1055              : 
    1056           38 :     for (i = 0; i < max_active_replication_origins; i++)
    1057              :     {
    1058              :         ReplicationState *state;
    1059              : 
    1060           35 :         state = &replication_states[i];
    1061              : 
    1062           35 :         if (state->roident == node)
    1063              :         {
    1064            5 :             LWLockAcquire(&state->lock, LW_SHARED);
    1065              : 
    1066            5 :             remote_lsn = state->remote_lsn;
    1067            5 :             local_lsn = state->local_lsn;
    1068              : 
    1069            5 :             LWLockRelease(&state->lock);
    1070              : 
    1071            5 :             break;
    1072              :         }
    1073              :     }
    1074              : 
    1075            8 :     LWLockRelease(ReplicationOriginLock);
    1076              : 
    1077            8 :     if (flush && XLogRecPtrIsValid(local_lsn))
    1078            1 :         XLogFlush(local_lsn);
    1079              : 
    1080            8 :     return remote_lsn;
    1081              : }
    1082              : 
    1083              : /* Helper function to reset the session replication origin */
    1084              : static void
    1085          490 : replorigin_session_reset_internal(void)
    1086              : {
    1087              :     ConditionVariable *cv;
    1088              : 
    1089              :     Assert(session_replication_state != NULL);
    1090              : 
    1091          490 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1092              : 
    1093              :     /* The origin must be held by at least one process at this point. */
    1094              :     Assert(session_replication_state->refcount > 0);
    1095              : 
    1096              :     /*
    1097              :      * Reset the PID only if the current session is the first to set up this
    1098              :      * origin. This avoids clearing the first process's PID when any other
    1099              :      * session releases the origin.
    1100              :      */
    1101          490 :     if (session_replication_state->acquired_by == MyProcPid)
    1102          476 :         session_replication_state->acquired_by = 0;
    1103              : 
    1104          490 :     session_replication_state->refcount--;
    1105              : 
    1106          490 :     cv = &session_replication_state->origin_cv;
    1107          490 :     session_replication_state = NULL;
    1108              : 
    1109          490 :     LWLockRelease(ReplicationOriginLock);
    1110              : 
    1111          490 :     ConditionVariableBroadcast(cv);
    1112          490 : }
    1113              : 
    1114              : /*
    1115              :  * Tear down a (possibly) configured session replication origin during process
    1116              :  * exit.
    1117              :  */
    1118              : static void
    1119          486 : ReplicationOriginExitCleanup(int code, Datum arg)
    1120              : {
    1121          486 :     if (session_replication_state == NULL)
    1122          190 :         return;
    1123              : 
    1124          296 :     replorigin_session_reset_internal();
    1125              : }
    1126              : 
    1127              : /*
    1128              :  * Setup a replication origin in the shared memory struct if it doesn't
    1129              :  * already exist and cache access to the specific ReplicationState so the
    1130              :  * array doesn't have to be searched when calling
    1131              :  * replorigin_session_advance().
    1132              :  *
    1133              :  * Normally only one such cached origin can exist per process so the cached
    1134              :  * value can only be set again after the previous value is torn down with
    1135              :  * replorigin_session_reset(). For this normal case pass acquired_by = 0
    1136              :  * (meaning the slot is not allowed to be already acquired by another process).
    1137              :  *
    1138              :  * However, sometimes multiple processes can safely re-use the same origin slot
    1139              :  * (for example, multiple parallel apply processes can safely use the same
    1140              :  * origin, provided they maintain commit order by allowing only one process to
    1141              :  * commit at a time). For this case the first process must pass acquired_by =
    1142              :  * 0, and then the other processes sharing that same origin can pass
    1143              :  * acquired_by = PID of the first process.
    1144              :  */
    1145              : void
    1146          492 : replorigin_session_setup(ReplOriginId node, int acquired_by)
    1147              : {
    1148              :     static bool registered_cleanup;
    1149              :     int         i;
    1150          492 :     int         free_slot = -1;
    1151              : 
    1152          492 :     if (!registered_cleanup)
    1153              :     {
    1154          486 :         on_shmem_exit(ReplicationOriginExitCleanup, 0);
    1155          486 :         registered_cleanup = true;
    1156              :     }
    1157              : 
    1158              :     Assert(max_active_replication_origins > 0);
    1159              : 
    1160          492 :     if (session_replication_state != NULL)
    1161            1 :         ereport(ERROR,
    1162              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1163              :                  errmsg("cannot setup replication origin when one is already setup")));
    1164              : 
    1165              :     /* Lock exclusively, as we may have to create a new table entry. */
    1166          491 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1167              : 
    1168              :     /*
    1169              :      * Search for either an existing slot for the origin, or a free one we can
    1170              :      * use.
    1171              :      */
    1172         1977 :     for (i = 0; i < max_active_replication_origins; i++)
    1173              :     {
    1174         1857 :         ReplicationState *curstate = &replication_states[i];
    1175              : 
    1176              :         /* remember where to insert if necessary */
    1177         1857 :         if (curstate->roident == InvalidReplOriginId &&
    1178              :             free_slot == -1)
    1179              :         {
    1180          122 :             free_slot = i;
    1181          122 :             continue;
    1182              :         }
    1183              : 
    1184              :         /* not our slot */
    1185         1735 :         if (curstate->roident != node)
    1186         1364 :             continue;
    1187              :         /* already acquired, and the caller did not ask to share it */
    1188          371 :         else if (curstate->acquired_by != 0 && acquired_by == 0)
    1189              :         {
    1190            0 :             ereport(ERROR,
    1191              :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1192              :                      errmsg("replication origin with ID %d is already active for PID %d",
    1193              :                             curstate->roident, curstate->acquired_by)));
    1194              :         }
    1195              :         /* the PID passed by the caller is not the recorded acquired_by */
    1196          371 :         else if (curstate->acquired_by != acquired_by)
    1197              :         {
    1198            0 :             ereport(ERROR,
    1199              :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1200              :                      errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
    1201              :                             node, acquired_by)));
    1202              :         }
    1203              : 
    1204              :         /*
    1205              :          * The origin is in use, but PID is not recorded. This can happen if
    1206              :          * the process that originally acquired the origin exited without
    1207              :          * releasing it. To ensure correctness, other processes cannot acquire
    1208              :          * the origin until all processes currently using it have released it.
    1209              :          */
    1210          371 :         else if (curstate->acquired_by == 0 && curstate->refcount > 0)
    1211            0 :             ereport(ERROR,
    1212              :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1213              :                      errmsg("replication origin with ID %d is already active in another process",
    1214              :                             curstate->roident)));
    1215              : 
    1216              :         /* ok, found slot */
    1217          371 :         session_replication_state = curstate;
    1218          371 :         break;
    1219              :     }
    1220              : 
    1221              :     /* No entry for this origin yet: claim the free slot, if there is one. */
    1222          491 :     if (session_replication_state == NULL && free_slot == -1)
    1223            0 :         ereport(ERROR,
    1224              :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
    1225              :                  errmsg("could not find free replication state slot for replication origin with ID %d",
    1226              :                         node),
    1227              :                  errhint("Increase \"max_active_replication_origins\" and try again.")));
    1228          491 :     else if (session_replication_state == NULL)
    1229              :     {
    1230          120 :         if (acquired_by)
    1231            1 :             ereport(ERROR,
    1232              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1233              :                      errmsg("cannot use PID %d for inactive replication origin with ID %d",
    1234              :                             acquired_by, node)));
    1235              : 
    1236              :         /* initialize new slot */
    1237          119 :         session_replication_state = &replication_states[free_slot];
    1238              :         Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
    1239              :         Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
    1240          119 :         session_replication_state->roident = node;
    1241              :     }
    1242              : 
    1243              : 
    1244              :     Assert(session_replication_state->roident != InvalidReplOriginId);
    1245              :     /* First user records its PID; processes sharing the origin only verify it. */
    1246          490 :     if (acquired_by == 0)
    1247              :     {
    1248          476 :         session_replication_state->acquired_by = MyProcPid;
    1249              :         Assert(session_replication_state->refcount == 0);
    1250              :     }
    1251              :     else
    1252              :     {
    1253              :         /*
    1254              :          * Sanity check: the origin must already be acquired by the process
    1255              :          * passed as input, and at least one process must be using it.
    1256              :          */
    1257              :         Assert(session_replication_state->acquired_by == acquired_by);
    1258              :         Assert(session_replication_state->refcount > 0);
    1259              :     }
    1260              : 
    1261          490 :     session_replication_state->refcount++;
    1262              : 
    1263          490 :     LWLockRelease(ReplicationOriginLock);
    1264              : 
    1265              :     /* probably this one is pointless */
    1266          490 :     ConditionVariableBroadcast(&session_replication_state->origin_cv);
    1267          490 : }
    1268              : 
    1269              : /*
    1270              :  * Reset replay state previously setup in this session.
    1271              :  *
    1272              :  * Raises an ERROR if no origin was setup with replorigin_session_setup(), or
    1273              :  * if this process acquired it first and other processes still share it.
    1274              :  */
    1275              : void
    1276          196 : replorigin_session_reset(void)
    1277              : {
    1278              :     Assert(max_active_replication_origins != 0);
    1279              : 
    1280          196 :     if (session_replication_state == NULL)
    1281            1 :         ereport(ERROR,
    1282              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1283              :                  errmsg("no replication origin is configured")));
    1284              : 
    1285              :     /*
    1286              :      * Restrict explicit resetting of the replication origin if it was first
    1287              :      * acquired by this process and others are still using it. While the
    1288              :      * system handles this safely (as happens if the first session exits
    1289              :      * without calling reset), it is best to avoid doing so.
    1290              :      */
    1291          195 :     if (session_replication_state->acquired_by == MyProcPid &&
    1292          193 :         session_replication_state->refcount > 1)
    1293            1 :         ereport(ERROR,
    1294              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1295              :                  errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
    1296              :                         session_replication_state->roident),
    1297              :                  errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
    1298              :                  errhint("Reset the replication origin in all other processes before retrying.")));
    1299              : 
    1300          194 :     replorigin_session_reset_internal();
    1301          194 : }
    1302              : 
    1303              : /*
    1304              :  * Do the same work replorigin_advance() does, just on the session's
    1305              :  * configured origin.  Each LSN is only ever moved forward here.
    1306              :  *
    1307              :  * This is noticeably cheaper than using replorigin_advance().
    1308              :  */
    1309              : void
    1310         1101 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
    1311              : {
    1312              :     Assert(session_replication_state != NULL);
    1313              :     Assert(session_replication_state->roident != InvalidReplOriginId);
    1314              : 
    1315         1101 :     LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
    1316         1101 :     if (session_replication_state->local_lsn < local_commit)
    1317         1101 :         session_replication_state->local_lsn = local_commit;
    1318         1101 :     if (session_replication_state->remote_lsn < remote_commit)
    1319          517 :         session_replication_state->remote_lsn = remote_commit;
    1320         1101 :     LWLockRelease(&session_replication_state->lock);
    1321         1101 : }
    1322              : 
    1323              : /*
    1324              :  * Return the remote LSN up to which we successfully replayed changes from the
    1325              :  * session's already setup replication origin.
    1326              :  */
    1327              : XLogRecPtr
    1328          274 : replorigin_session_get_progress(bool flush)
    1329              : {
    1330              :     XLogRecPtr  remote_lsn;
    1331              :     XLogRecPtr  local_lsn;
    1332              : 
    1333              :     Assert(session_replication_state != NULL);
    1334              : 
    1335          274 :     LWLockAcquire(&session_replication_state->lock, LW_SHARED);
    1336          274 :     remote_lsn = session_replication_state->remote_lsn;
    1337          274 :     local_lsn = session_replication_state->local_lsn;
    1338          274 :     LWLockRelease(&session_replication_state->lock);
    1339              :     /* if asked to, flush up to the matching local LSN before reporting */
    1340          274 :     if (flush && XLogRecPtrIsValid(local_lsn))
    1341            1 :         XLogFlush(local_lsn);
    1342              : 
    1343          274 :     return remote_lsn;
    1344              : }
    1345              : 
    1346              : /*
    1347              :  * Clear the per-transaction replication origin state.
    1348              :  *
    1349              :  * replorigin_xact_state.origin is also cleared if clear_origin is set.
    1350              :  */
    1351              : void
    1352          770 : replorigin_xact_clear(bool clear_origin)
    1353              : {
    1354          770 :     replorigin_xact_state.origin_lsn = InvalidXLogRecPtr;
    1355          770 :     replorigin_xact_state.origin_timestamp = 0;
    1356          770 :     if (clear_origin)
    1357          770 :         replorigin_xact_state.origin = InvalidReplOriginId;
    1358          770 : }
    1359              : 
    1360              : 
    1361              : /* ---------------------------------------------------------------------------
    1362              :  * SQL functions for working with replication origin.
    1363              :  *
    1364              :  * These mostly should be fairly short wrappers around more generic functions.
    1365              :  * ---------------------------------------------------------------------------
    1366              :  */
    1367              : 
    1368              : /*
    1369              :  * Create replication origin for the passed in name, and return the assigned
    1370              :  * oid.
    1371              :  */
    1372              : Datum
    1373           14 : pg_replication_origin_create(PG_FUNCTION_ARGS)
    1374              : {
    1375              :     char       *name;
    1376              :     ReplOriginId roident;
    1377              : 
    1378           14 :     replorigin_check_prerequisites(false, false);
    1379              : 
    1380           14 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1381              : 
    1382              :     /*
    1383              :      * Replication origins "any" and "none" are reserved for system options.
    1384              :      * The origins "pg_xxx" are reserved for internal use.
    1385              :      */
    1386           14 :     if (IsReservedName(name) || IsReservedOriginName(name))
    1387            3 :         ereport(ERROR,
    1388              :                 (errcode(ERRCODE_RESERVED_NAME),
    1389              :                  errmsg("replication origin name \"%s\" is reserved",
    1390              :                         name),
    1391              :                  errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
    1392              :                            LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
    1393              : 
    1394              :     /*
    1395              :      * If built with appropriate switch, whine when regression-testing
    1396              :      * conventions for replication origin names are violated.
    1397              :      */
    1398              : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1399              :     if (strncmp(name, "regress_", 8) != 0)
    1400              :         elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
    1401              : #endif
    1402              : 
    1403           11 :     roident = replorigin_create(name);
    1404              : 
    1405            7 :     pfree(name);
    1406              : 
    1407            7 :     PG_RETURN_OID(roident);
    1408              : }
    1409              : 
    1410              : /*
    1411              :  * Drop the replication origin with the passed in name.
    1412              :  */
    1413              : Datum
    1414            9 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
    1415              : {
    1416              :     char       *name;
    1417              : 
    1418            9 :     replorigin_check_prerequisites(false, false);
    1419              : 
    1420            9 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1421              : 
    1422            9 :     replorigin_drop_by_name(name, false, true);
    1423              : 
    1424            8 :     pfree(name);
    1425              : 
    1426            8 :     PG_RETURN_VOID();
    1427              : }
    1428              : 
    1429              : /*
    1430              :  * Return oid of the named replication origin, or NULL if none was found.
    1431              :  */
    1432              : Datum
    1433            0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
    1434              : {
    1435              :     char       *name;
    1436              :     ReplOriginId roident;
    1437              : 
    1438            0 :     replorigin_check_prerequisites(false, false);
    1439              : 
    1440            0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1441            0 :     roident = replorigin_by_name(name, true);
    1442              : 
    1443            0 :     pfree(name);
    1444              : 
    1445            0 :     if (OidIsValid(roident))
    1446            0 :         PG_RETURN_OID(roident);
    1447            0 :     PG_RETURN_NULL();
    1448              : }
    1449              : 
    1450              : /*
    1451              :  * Setup the named origin for this session; argument 1 is passed as acquired_by.
    1452              :  */
    1453              : Datum
    1454           11 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
    1455              : {
    1456              :     char       *name;
    1457              :     ReplOriginId origin;
    1458              :     int         pid;
    1459              : 
    1460           11 :     replorigin_check_prerequisites(true, false);
    1461              : 
    1462           11 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1463           11 :     origin = replorigin_by_name(name, false);
    1464           10 :     pid = PG_GETARG_INT32(1);
    1465           10 :     replorigin_session_setup(origin, pid);
    1466              :     /* remember it in the per-transaction origin state as well */
    1467            8 :     replorigin_xact_state.origin = origin;
    1468              : 
    1469            8 :     pfree(name);
    1470              : 
    1471            8 :     PG_RETURN_VOID();
    1472              : }
    1473              : 
    1474              : /*
    1475              :  * Reset previously setup origin in this session.
    1476              :  */
    1477              : Datum
    1478           10 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
    1479              : {
    1480           10 :     replorigin_check_prerequisites(true, false);
    1481              : 
    1482           10 :     replorigin_session_reset();
    1483              :     /* also clear the per-transaction state, including its origin */
    1484            8 :     replorigin_xact_clear(true);
    1485              : 
    1486            8 :     PG_RETURN_VOID();
    1487              : }
    1488              : 
    1489              : /*
    1490              :  * Has a replication origin been setup for this session?
    1491              :  */
    1492              : Datum
    1493            4 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
    1494              : {
    1495            4 :     replorigin_check_prerequisites(false, false);
    1496              :     /* decided from replorigin_xact_state.origin */
    1497            4 :     PG_RETURN_BOOL(replorigin_xact_state.origin != InvalidReplOriginId);
    1498              : }
    1499              : 
    1500              : 
    1501              : /*
    1502              :  * Return the replication progress for origin setup in the current session.
    1503              :  *
    1504              :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1505              :  * to a local transaction that has been flushed. This is useful if asynchronous
    1506              :  * commits are used when replaying replicated transactions.
    1507              :  */
    1508              : Datum
    1509            2 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
    1510              : {
    1511            2 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1512            2 :     bool        flush = PG_GETARG_BOOL(0);
    1513              : 
    1514            2 :     replorigin_check_prerequisites(true, false);
    1515              : 
    1516            2 :     if (session_replication_state == NULL)
    1517            0 :         ereport(ERROR,
    1518              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1519              :                  errmsg("no replication origin is configured")));
    1520              : 
    1521            2 :     remote_lsn = replorigin_session_get_progress(flush);
    1522              :     /* an invalid remote LSN is reported as SQL NULL */
    1523            2 :     if (!XLogRecPtrIsValid(remote_lsn))
    1524            0 :         PG_RETURN_NULL();
    1525              : 
    1526            2 :     PG_RETURN_LSN(remote_lsn);
    1527              : }
    1528              : 
    1529              : Datum
    1530            1 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
    1531              : {
    1532            1 :     XLogRecPtr  location = PG_GETARG_LSN(0);
    1533              :     /* Store the given LSN and timestamp in the per-transaction origin state. */
    1534            1 :     replorigin_check_prerequisites(true, false);
    1535              : 
    1536            1 :     if (session_replication_state == NULL)
    1537            0 :         ereport(ERROR,
    1538              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1539              :                  errmsg("no replication origin is configured")));
    1540              : 
    1541            1 :     replorigin_xact_state.origin_lsn = location;
    1542            1 :     replorigin_xact_state.origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
    1543              : 
    1544            1 :     PG_RETURN_VOID();
    1545              : }
    1546              : 
    1547              : Datum
    1548            0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
    1549              : {
    1550            0 :     replorigin_check_prerequisites(true, false);
    1551              : 
    1552              :     /* Clear the origin LSN and timestamp, but do not clear the session origin */
    1553            0 :     replorigin_xact_clear(false);
    1554              : 
    1555            0 :     PG_RETURN_VOID();
    1556              : }
    1557              : 
    1558              : /* Advance the named origin to the given remote LSN (backward moves allowed). */
    1559              : Datum
    1560            3 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
    1561              : {
    1562            3 :     text       *name = PG_GETARG_TEXT_PP(0);
    1563            3 :     XLogRecPtr  remote_commit = PG_GETARG_LSN(1);
    1564              :     ReplOriginId node;
    1565              : 
    1566            3 :     replorigin_check_prerequisites(true, false);
    1567              : 
    1568              :     /* lock to prevent the replication origin from vanishing */
    1569            3 :     LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1570              : 
    1571            3 :     node = replorigin_by_name(text_to_cstring(name), false);
    1572              : 
    1573              :     /*
    1574              :      * Can't sensibly pass a local commit to be flushed at checkpoint - this
    1575              :      * xact hasn't committed yet. This is why this function should be used to
    1576              :      * set up the initial replication state, but not for replay.
    1577              :      */
    1578            2 :     replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
    1579              :                        true /* go backward */ , true /* WAL log */ );
    1580              : 
    1581            2 :     UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1582              : 
    1583            2 :     PG_RETURN_VOID();
    1584              : }
    1585              : 
    1586              : 
    1587              : /*
    1588              :  * Return the replication progress for an individual replication origin.
    1589              :  *
    1590              :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1591              :  * to a local transaction that has been flushed. This is useful if asynchronous
    1592              :  * commits are used when replaying replicated transactions.
    1593              :  */
    1594              : Datum
    1595            3 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
    1596              : {
    1597              :     char       *name;
    1598              :     bool        flush;
    1599              :     ReplOriginId roident;
    1600            3 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1601              : 
    1602            3 :     replorigin_check_prerequisites(true, true);
    1603              : 
    1604            3 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1605            3 :     flush = PG_GETARG_BOOL(1);
    1606              : 
    1607            3 :     roident = replorigin_by_name(name, false);
    1608              :     Assert(OidIsValid(roident));
    1609              : 
    1610            2 :     remote_lsn = replorigin_get_progress(roident, flush);
    1611              :     /* an invalid remote LSN is reported as SQL NULL */
    1612            2 :     if (!XLogRecPtrIsValid(remote_lsn))
    1613            0 :         PG_RETURN_NULL();
    1614              : 
    1615            2 :     PG_RETURN_LSN(remote_lsn);
    1616              : }
    1617              : 
    1618              : /* Return one row per in-use replication state: id, name, remote and local LSN. */
    1619              : Datum
    1620           11 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
    1621              : {
    1622           11 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1623              :     int         i;
    1624              : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
    1625              : 
    1626              :     /* we want to return 0 rows if slot is set to zero */
    1627           11 :     replorigin_check_prerequisites(false, true);
    1628              : 
    1629           11 :     InitMaterializedSRF(fcinfo, 0);
    1630              : 
    1631              :     /* prevent slots from being concurrently dropped */
    1632           11 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1633              : 
    1634              :     /*
    1635              :      * Iterate through all possible replication_states, display if they are
    1636              :      * filled. Each state's LSNs are read under its own lock, but separately
    1637              :      * per state, so the rows need not form one consistent snapshot.
    1638              :      */
    1639          121 :     for (i = 0; i < max_active_replication_origins; i++)
    1640              :     {
    1641              :         ReplicationState *state;
    1642              :         Datum       values[REPLICATION_ORIGIN_PROGRESS_COLS];
    1643              :         bool        nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
    1644              :         char       *roname;
    1645              : 
    1646          110 :         state = &replication_states[i];
    1647              : 
    1648              :         /* unused slot, nothing to display */
    1649          110 :         if (state->roident == InvalidReplOriginId)
    1650           97 :             continue;
    1651              : 
    1652           13 :         memset(values, 0, sizeof(values));
    1653           13 :         memset(nulls, 1, sizeof(nulls));
    1654              : 
    1655           13 :         values[0] = ObjectIdGetDatum(state->roident);
    1656           13 :         nulls[0] = false;
    1657              : 
    1658              :         /*
    1659              :          * We're not preventing the origin to be dropped concurrently, so
    1660              :          * silently accept that it might be gone and leave the name NULL.
    1661              :          */
    1662           13 :         if (replorigin_by_oid(state->roident, true,
    1663              :                               &roname))
    1664              :         {
    1665           13 :             values[1] = CStringGetTextDatum(roname);
    1666           13 :             nulls[1] = false;
    1667              :         }
    1668              : 
    1669           13 :         LWLockAcquire(&state->lock, LW_SHARED);
    1670              : 
    1671           13 :         values[2] = LSNGetDatum(state->remote_lsn);
    1672           13 :         nulls[2] = false;
    1673              : 
    1674           13 :         values[3] = LSNGetDatum(state->local_lsn);
    1675           13 :         nulls[3] = false;
    1676              : 
    1677           13 :         LWLockRelease(&state->lock);
    1678              : 
    1679           13 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1680              :                              values, nulls);
    1681              :     }
    1682              : 
    1683           11 :     LWLockRelease(ReplicationOriginLock);
    1684              : 
    1685              : #undef REPLICATION_ORIGIN_PROGRESS_COLS
    1686              : 
    1687           11 :     return (Datum) 0;
    1688              : }
        

Generated by: LCOV version 2.0-1