LCOV - code coverage report
Current view: top level - src/backend/replication/logical - origin.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 87.8 % 466 409
Test Date: 2026-03-07 11:14:52 Functions: 93.9 % 33 31
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * origin.c
       4              :  *    Logical replication progress tracking support.
       5              :  *
       6              :  * Copyright (c) 2013-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *    src/backend/replication/logical/origin.c
      10              :  *
      11              :  * NOTES
      12              :  *
      13              :  * This file provides the following:
      14              :  * * An infrastructure to name nodes in a replication setup
      15              :  * * A facility to store and persist replication progress in an efficient
      16              :  *   and durable manner.
      17              :  *
      18              :  * A replication origin consists of a descriptive, user defined, external
      19              :  * name and a short, thus space efficient, internal 2 byte one. This split
      20              :  * exists because replication origins have to be stored in WAL and shared
      21              :  * memory and long descriptors would be inefficient.  For now only use 2 bytes
      22              :  * for the internal id of a replication origin as it seems unlikely that there
      23              :  * soon will be more than 65k nodes in one replication setup; and using only
      24              :  * two bytes allows us to be more space efficient.
      25              :  *
      26              :  * Replication progress is tracked in a shared memory table
      27              :  * (ReplicationState) that's dumped to disk every checkpoint. Entries
      28              :  * ('slots') in this table are identified by the internal id. That's the case
      29              :  * because it allows to increase replication progress during crash
      30              :  * recovery. To allow doing so we store the original LSN (from the originating
      31              :  * system) of a transaction in the commit record. That allows to recover the
      32              :  * precise replayed state after crash recovery; without requiring synchronous
      33              :  * commits. Allowing logical replication to use asynchronous commit is
      34              :  * generally good for performance, but especially important as it allows a
      35              :  * single threaded replay process to keep up with a source that has multiple
      36              :  * backends generating changes concurrently.  For efficiency and simplicity
      37              :  * reasons a backend can set up one replication origin that's from then on
      38              :  * used as the source of changes produced by the backend, until reset again.
      39              :  *
      40              :  * This infrastructure is intended to be used in cooperation with logical
      41              :  * decoding. When replaying from a remote system the configured origin is
      42              :  * provided to output plugins, allowing prevention of replication loops and
      43              :  * other filtering.
      44              :  *
      45              :  * There are several levels of locking at work:
      46              :  *
      47              :  * * To create and drop replication origins an exclusive lock on
      48              :  *   pg_replication_origin is required for the duration. That allows us to
      49              :  *   safely and conflict free assign new origins using a dirty snapshot.
      50              :  *
      51              :  * * When creating an in-memory replication progress slot the ReplicationOrigin
      52              :  *   LWLock has to be held exclusively; when iterating over the replication
      53              :  *   progress a shared lock has to be held, the same when advancing the
      54              :  *   replication progress of an individual backend that has not setup as the
      55              :  *   session's replication origin.
      56              :  *
      57              :  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
      58              :  *   replication progress slot that slot's lwlock has to be held. That's
      59              :  *   primarily because we do not assume 8 byte writes (the LSN) are atomic on
      60              :  *   all our platforms, but it also simplifies memory ordering concerns
      61              :  *   between the remote and local lsn. We use a lwlock instead of a spinlock
      62              :  *   so it's less harmful to hold the lock over a WAL write
      63              :  *   (cf. AdvanceReplicationProgress).
      64              :  *
      65              :  * ---------------------------------------------------------------------------
      66              :  */
      67              : 
      68              : #include "postgres.h"
      69              : 
      70              : #include <unistd.h>
      71              : #include <sys/stat.h>
      72              : 
      73              : #include "access/genam.h"
      74              : #include "access/htup_details.h"
      75              : #include "access/table.h"
      76              : #include "access/xact.h"
      77              : #include "access/xloginsert.h"
      78              : #include "catalog/catalog.h"
      79              : #include "catalog/indexing.h"
      80              : #include "catalog/pg_subscription.h"
      81              : #include "funcapi.h"
      82              : #include "miscadmin.h"
      83              : #include "nodes/execnodes.h"
      84              : #include "pgstat.h"
      85              : #include "replication/origin.h"
      86              : #include "replication/slot.h"
      87              : #include "storage/condition_variable.h"
      88              : #include "storage/fd.h"
      89              : #include "storage/ipc.h"
      90              : #include "storage/lmgr.h"
      91              : #include "utils/builtins.h"
      92              : #include "utils/fmgroids.h"
      93              : #include "utils/guc.h"
      94              : #include "utils/pg_lsn.h"
      95              : #include "utils/rel.h"
      96              : #include "utils/snapmgr.h"
      97              : #include "utils/syscache.h"
      98              : #include "utils/wait_event.h"
      99              : 
     100              : /* Paths of the replication origin checkpoint file and its .tmp sibling */
     101              : #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
     102              : #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
     103              : 
     104              : /* GUC variable; when 0, no shared-memory origin state is allocated */
     105              : int         max_active_replication_origins = 10;
     106              : 
     107              : /*
     108              :  * Replay progress of a single remote node.
                         *
                         * One element of the shared memory array replication_states[].
                         * replorigin_state_clear resets roident to InvalidReplOriginId and both
                         * LSNs to InvalidXLogRecPtr when the origin is dropped.
     109              :  */
     110              : typedef struct ReplicationState
     111              : {
     112              :     /*
     113              :      * Local identifier for the remote node.
     114              :      */
     115              :     ReplOriginId roident;
     116              : 
     117              :     /*
     118              :      * Location of the latest commit from the remote side.
     119              :      */
     120              :     XLogRecPtr  remote_lsn;
     121              : 
     122              :     /*
     123              :      * Remember the local lsn of the commit record so we can XLogFlush() to it
     124              :      * during a checkpoint so we know the commit record actually is safe on
     125              :      * disk.
     126              :      */
     127              :     XLogRecPtr  local_lsn;
     128              : 
     129              :     /*
     130              :      * PID of backend that's acquired slot, or 0 if none.
     131              :      */
     132              :     int         acquired_by;
     133              : 
     134              :     /*
                             * Count of processes that are currently using this origin.  While this
                             * is above zero, replorigin_state_clear will not clear the entry.
                             */
     135              :     int         refcount;
     136              : 
     137              :     /*
     138              :      * Condition variable that's signaled when acquired_by changes.
     139              :      */
     140              :     ConditionVariable origin_cv;
     141              : 
     142              :     /*
     143              :      * Lock protecting remote_lsn and local_lsn.
     144              :      */
     145              :     LWLock      lock;
     146              : } ReplicationState;
     147              : 
     148              : /*
     149              :  * On disk version of ReplicationState.
                         *
                         * Carries only roident and remote_lsn; the remaining ReplicationState
                         * fields have no counterpart in this struct.
     150              :  */
     151              : typedef struct ReplicationStateOnDisk
     152              : {
     153              :     ReplOriginId roident;
     154              :     XLogRecPtr  remote_lsn;
     155              : } ReplicationStateOnDisk;
     156              : 
     157              : 
                        /*
                         * Shared memory block for replication progress: a small header followed
                         * by the array of per-origin ReplicationState entries.
                         */
     158              : typedef struct ReplicationStateCtl
     159              : {
     160              :     /* Tranche to use for per-origin LWLocks */
     161              :     int         tranche_id;
     162              :     /* Array of length max_active_replication_origins */
     163              :     ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
     164              : } ReplicationStateCtl;
     165              : 
     166              : /*
                         * Global variable for per-transaction replication origin state.  It
                         * starts out as InvalidReplOriginId, InvalidXLogRecPtr and a zero
                         * timestamp.
                         */
     167              : ReplOriginXactState replorigin_xact_state = {
     168              :     .origin = InvalidReplOriginId,  /* assumed identity */
     169              :     .origin_lsn = InvalidXLogRecPtr,
     170              :     .origin_timestamp = 0
     171              : };
     172              : 
     173              : /*
     174              :  * Base address into a shared memory array of replication states of size
     175              :  * max_active_replication_origins.
                         *
                         * Set by ReplicationOriginShmemInit to replication_states_ctl->states; that
                         * function leaves it untouched when max_active_replication_origins is 0.
     176              :  */
     177              : static ReplicationState *replication_states;
     178              : 
     179              : /*
     180              :  * Actual shared memory block (replication_states[] is now part of this).
     181              :  */
     182              : static ReplicationStateCtl *replication_states_ctl;
     183              : 
     184              : /*
     185              :  * We keep a pointer to this backend's ReplicationState to avoid having to
     186              :  * search the replication_states array in replorigin_session_advance for each
     187              :  * remote commit.  (Ownership of a backend's own entry can only be changed by
     188              :  * that backend.)
     189              :  */
     190              : static ReplicationState *session_replication_state = NULL;
     191              : 
     192              : /* Magic for on disk files. */
     193              : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
     194              : 
                        /*
                         * Precondition checks for querying or manipulating replication origins.
                         *
                         * Raises ERROR if check_origins is true while
                         * max_active_replication_origins is 0, or if recoveryOK is false while
                         * RecoveryInProgress() reports true.  Returns normally otherwise.
                         */
     195              : static void
     196           68 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
     197              : {
     198           68 :     if (check_origins && max_active_replication_origins == 0)
     199            0 :         ereport(ERROR,
     200              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     201              :                  errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
     202              : 
     203           68 :     if (!recoveryOK && RecoveryInProgress())
     204            0 :         ereport(ERROR,
     205              :                 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
     206              :                  errmsg("cannot manipulate replication origins during recovery")));
     207           68 : }
     208              : 
     209              : 
     210              : /*
     211              :  * IsReservedOriginName
     212              :  *      True iff name is either "none" or "any".
                         *
                         * The name is compared against LOGICALREP_ORIGIN_NONE and
                         * LOGICALREP_ORIGIN_ANY with pg_strcasecmp, so letter case is ignored.
     213              :  */
     214              : static bool
     215           13 : IsReservedOriginName(const char *name)
     216              : {
     217           25 :     return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
     218           12 :             (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
     219              : }
     220              : 
     221              : /* ---------------------------------------------------------------------------
     222              :  * Functions for working with replication origins themselves.
     223              :  * ---------------------------------------------------------------------------
     224              :  */
     225              : 
     226              : /*
     227              :  * Look up a persistent replication origin by name, using the REPLORIGNAME
                         * syscache, and return its internal id.
     228              :  *
     229              :  * Returns InvalidOid if the origin is not known yet and missing_ok is true.
                         * Raises ERROR (ERRCODE_UNDEFINED_OBJECT) if it is not known and missing_ok
                         * is false.
     230              :  */
     231              : ReplOriginId
     232         1064 : replorigin_by_name(const char *roname, bool missing_ok)
     233              : {
     234              :     Form_pg_replication_origin ident;
     235         1064 :     Oid         roident = InvalidOid;
     236              :     HeapTuple   tuple;
     237              :     Datum       roname_d;
     238              : 
                            /* the syscache lookup below takes the name as a text datum */
     239         1064 :     roname_d = CStringGetTextDatum(roname);
     240              : 
     241         1064 :     tuple = SearchSysCache1(REPLORIGNAME, roname_d);
     242         1064 :     if (HeapTupleIsValid(tuple))
     243              :     {
     244          667 :         ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
     245          667 :         roident = ident->roident;
     246          667 :         ReleaseSysCache(tuple);
     247              :     }
     248          397 :     else if (!missing_ok)
     249            4 :         ereport(ERROR,
     250              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     251              :                  errmsg("replication origin \"%s\" does not exist",
     252              :                         roname)));
     253              : 
     254         1060 :     return roident;
     255              : }
     256              : 
     257              : /*
     258              :  * Create a replication origin.
                         *
                         * Picks the lowest candidate roident for which a dirty-snapshot scan of
                         * pg_replication_origin finds no row, inserts a row mapping it to roname
                         * and returns that roident.  Raises ERROR if roname is longer than
                         * MAX_RONAME_LEN or if no unused roident is found.
     259              :  *
     260              :  * Needs to be called in a transaction.
     261              :  */
     262              : ReplOriginId
     263          389 : replorigin_create(const char *roname)
     264              : {
     265              :     Oid         roident;
     266          389 :     HeapTuple   tuple = NULL;
     267              :     Relation    rel;
     268              :     Datum       roname_d;
     269              :     SnapshotData SnapshotDirty;
     270              :     SysScanDesc scan;
     271              :     ScanKeyData key;
     272              : 
     273              :     /*
     274              :      * To avoid needing a TOAST table for pg_replication_origin, we limit
     275              :      * replication origin names to 512 bytes.  This should be more than enough
     276              :      * for all practical use.
     277              :      */
     278          389 :     if (strlen(roname) > MAX_RONAME_LEN)
     279            3 :         ereport(ERROR,
     280              :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     281              :                  errmsg("replication origin name is too long"),
     282              :                  errdetail("Replication origin names must be no longer than %d bytes.",
     283              :                            MAX_RONAME_LEN)));
     284              : 
     285          386 :     roname_d = CStringGetTextDatum(roname);
     286              : 
     287              :     Assert(IsTransactionState());
     288              : 
     289              :     /*
     290              :      * We need the numeric replication origin to be 16bit wide, so we cannot
     291              :      * rely on the normal oid allocation. Instead we simply scan
     292              :      * pg_replication_origin for the first unused id. That's not particularly
     293              :      * efficient, but this should be a fairly infrequent operation - we can
     294              :      * easily spend a bit more code on this when it turns out it needs to be
     295              :      * faster.
     296              :      *
     297              :      * We handle concurrency by taking an exclusive lock (allowing reads!)
     298              :      * over the table for the duration of the search. Because we use a "dirty
     299              :      * snapshot" we can read rows that other in-progress sessions have
     300              :      * written, even though they would be invisible with normal snapshots. Due
     301              :      * to the exclusive lock there's no danger that new rows can appear while
     302              :      * we're checking.
     303              :      */
     304          386 :     InitDirtySnapshot(SnapshotDirty);
     305              : 
     306          386 :     rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
     307              : 
     308              :     /*
     309              :      * We want to be able to access pg_replication_origin without setting up a
     310              :      * snapshot.  To make that safe, it needs to not have a TOAST table, since
     311              :      * TOASTed data cannot be fetched without a snapshot.  As of this writing,
     312              :      * its only varlena column is roname, which we limit to 512 bytes to avoid
     313              :      * needing out-of-line storage.  If you add a TOAST table to this catalog,
     314              :      * be sure to set up a snapshot everywhere it might be needed.  For more
     315              :      * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
     316              :      */
     317              :     Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
     318              : 
                            /*
                             * Candidate ids run from InvalidOid + 1 up to, but not including,
                             * PG_UINT16_MAX; the first one without a matching row is used.
                             */
     319          696 :     for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
     320              :     {
     321              :         bool        nulls[Natts_pg_replication_origin];
     322              :         Datum       values[Natts_pg_replication_origin];
     323              :         bool        collides;
     324              : 
     325          696 :         CHECK_FOR_INTERRUPTS();
     326              : 
     327          696 :         ScanKeyInit(&key,
     328              :                     Anum_pg_replication_origin_roident,
     329              :                     BTEqualStrategyNumber, F_OIDEQ,
     330              :                     ObjectIdGetDatum(roident));
     331              : 
     332          696 :         scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
     333              :                                   true /* indexOK */ ,
     334              :                                   &SnapshotDirty,
     335              :                                   1, &key);
     336              : 
     337          696 :         collides = HeapTupleIsValid(systable_getnext(scan));
     338              : 
     339          696 :         systable_endscan(scan);
     340              : 
     341          696 :         if (!collides)
     342              :         {
     343              :             /*
     344              :              * Ok, found an unused roident, insert the new row and do a CCI,
     345              :              * so our callers can look it up if they want to.
     346              :              */
     347          386 :             memset(&nulls, 0, sizeof(nulls));
     348              : 
     349          386 :             values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
     350          386 :             values[Anum_pg_replication_origin_roname - 1] = roname_d;
     351              : 
     352          386 :             tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     353          386 :             CatalogTupleInsert(rel, tuple);
     354          385 :             CommandCounterIncrement();
     355          385 :             break;
     356              :         }
     357              :     }
     358              : 
     359              :     /* now release the lock again */
     360          385 :     table_close(rel, ExclusiveLock);
     361              : 
                            /* tuple is still NULL only if the loop ran out of candidate ids */
     362          385 :     if (tuple == NULL)
     363            0 :         ereport(ERROR,
     364              :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     365              :                  errmsg("could not find free replication origin ID")));
     366              : 
     367          385 :     heap_freetuple(tuple);
     368          385 :     return roident;
     369              : }
     370              : 
     371              : /*
     372              :  * Helper function to drop a replication origin.
                         *
                         * Clears the in-memory progress entry for roident, if there is one, after
                         * writing an XLOG_REPLORIGIN_DROP record.  If the entry is still in use
                         * (refcount > 0), this either raises ERROR (nowait is true) or sleeps on
                         * the condition variable of that entry and then searches again from the
                         * start.  ReplicationOriginLock is held exclusively during each search.
     373              :  */
     374              : static void
     375          333 : replorigin_state_clear(ReplOriginId roident, bool nowait)
     376              : {
     377              :     int         i;
     378              : 
     379              :     /*
     380              :      * Clean up the slot state info, if there is any matching slot.
     381              :      */
     382          333 : restart:
     383          333 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     384              : 
     385         1129 :     for (i = 0; i < max_active_replication_origins; i++)
     386              :     {
     387         1076 :         ReplicationState *state = &replication_states[i];
     388              : 
     389         1076 :         if (state->roident == roident)
     390              :         {
     391              :             /* found our slot, is it busy? */
     392          280 :             if (state->refcount > 0)
     393              :             {
     394              :                 ConditionVariable *cv;
     395              : 
     396            0 :                 if (nowait)
     397            0 :                     ereport(ERROR,
     398              :                             (errcode(ERRCODE_OBJECT_IN_USE),
     399              :                              (state->acquired_by != 0)
     400              :                              ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
     401              :                                       state->roident,
     402              :                                       state->acquired_by)
     403              :                              : errmsg("could not drop replication origin with ID %d, in use by another process",
     404              :                                       state->roident)));
     405              : 
     406              :                 /*
     407              :                  * We must wait and then retry.  Since we don't know which CV
     408              :                  * to wait on until here, we can't readily use
     409              :                  * ConditionVariablePrepareToSleep (calling it here would be
     410              :                  * wrong, since we could miss the signal if we did so); just
     411              :                  * use ConditionVariableSleep directly.
     412              :                  */
     413            0 :                 cv = &state->origin_cv;
     414              : 
                                        /* drop the lock before sleeping; it is re-taken at restart */
     415            0 :                 LWLockRelease(ReplicationOriginLock);
     416              : 
     417            0 :                 ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
     418            0 :                 goto restart;
     419              :             }
     420              : 
     421              :             /* first make a WAL log entry */
     422              :             {
     423              :                 xl_replorigin_drop xlrec;
     424              : 
     425          280 :                 xlrec.node_id = roident;
     426          280 :                 XLogBeginInsert();
     427          280 :                 XLogRegisterData(&xlrec, sizeof(xlrec));
     428          280 :                 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
     429              :             }
     430              : 
     431              :             /* then clear the in-memory slot */
     432          280 :             state->roident = InvalidReplOriginId;
     433          280 :             state->remote_lsn = InvalidXLogRecPtr;
     434          280 :             state->local_lsn = InvalidXLogRecPtr;
     435          280 :             break;
     436              :         }
     437              :     }
     438          333 :     LWLockRelease(ReplicationOriginLock);
     439          333 :     ConditionVariableCancelSleep();
     440          333 : }
     441              : 
     442              : /*
     443              :  * Drop replication origin (by name).
                         *
                         * If the origin cannot be found and missing_ok is true, the locks taken
                         * here are released and the function returns without doing anything else.
                         * nowait is passed on to replorigin_state_clear, which uses it to choose
                         * between raising an error and waiting while the origin is still in use.
     444              :  *
     445              :  * Needs to be called in a transaction.
     446              :  */
     447              : void
     448          528 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
     449              : {
     450              :     ReplOriginId roident;
     451              :     Relation    rel;
     452              :     HeapTuple   tuple;
     453              : 
     454              :     Assert(IsTransactionState());
     455              : 
     456          528 :     rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
     457              : 
                            /*
                             * With missing_ok, replorigin_by_name returns InvalidOid for an unknown
                             * name; the by-id lookup further down is then expected to find nothing.
                             */
     458          528 :     roident = replorigin_by_name(name, missing_ok);
     459              : 
     460              :     /* Lock the origin to prevent concurrent drops. */
     461          527 :     LockSharedObject(ReplicationOriginRelationId, roident, 0,
     462              :                      AccessExclusiveLock);
     463              : 
     464          527 :     tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
     465          527 :     if (!HeapTupleIsValid(tuple))
     466              :     {
     467          194 :         if (!missing_ok)
     468            0 :             elog(ERROR, "cache lookup failed for replication origin with ID %d",
     469              :                  roident);
     470              : 
     471              :         /*
     472              :          * We don't need to retain the locks if the origin is already dropped.
     473              :          */
     474          194 :         UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
     475              :                            AccessExclusiveLock);
     476          194 :         table_close(rel, RowExclusiveLock);
     477          194 :         return;
     478              :     }
     479              : 
                            /* clear the in-memory progress state before removing the catalog row */
     480          333 :     replorigin_state_clear(roident, nowait);
     481              : 
     482              :     /*
     483              :      * Now, we can delete the catalog entry.
     484              :      */
     485          333 :     CatalogTupleDelete(rel, &tuple->t_self);
     486          333 :     ReleaseSysCache(tuple);
     487              : 
     488          333 :     CommandCounterIncrement();
     489              : 
     490              :     /* We keep the lock on pg_replication_origin until commit */
     491          333 :     table_close(rel, NoLock);
     492              : }
     493              : 
     494              : /*
     495              :  * Lookup replication origin via its oid and return the name.
     496              :  *
     497              :  * The external name is palloc'd in the calling context.
     498              :  *
     499              :  * Returns true if the origin is known, false otherwise.
                         * In the latter case *roname is set to NULL and, unless missing_ok is
                         * true, an ERROR (ERRCODE_UNDEFINED_OBJECT) is raised instead of returning.
     500              :  */
     501              : bool
     502           28 : replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
     503              : {
     504              :     HeapTuple   tuple;
     505              :     Form_pg_replication_origin ric;
     506              : 
     507              :     Assert(OidIsValid((Oid) roident));
     508              :     Assert(roident != InvalidReplOriginId);
     509              :     Assert(roident != DoNotReplicateId);
     510              : 
     511           28 :     tuple = SearchSysCache1(REPLORIGIDENT,
     512              :                             ObjectIdGetDatum((Oid) roident));
     513              : 
     514           28 :     if (HeapTupleIsValid(tuple))
     515              :     {
     516           25 :         ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
                                /* copy the name out before the syscache entry is released */
     517           25 :         *roname = text_to_cstring(&ric->roname);
     518           25 :         ReleaseSysCache(tuple);
     519              : 
     520           25 :         return true;
     521              :     }
     522              :     else
     523              :     {
     524            3 :         *roname = NULL;
     525              : 
     526            3 :         if (!missing_ok)
     527            0 :             ereport(ERROR,
     528              :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     529              :                      errmsg("replication origin with ID %d does not exist",
     530              :                             roident)));
     531              : 
     532            3 :         return false;
     533              :     }
     534              : }
     535              : 
     536              : 
     537              : /* ---------------------------------------------------------------------------
     538              :  * Functions for handling replication progress.
     539              :  * ---------------------------------------------------------------------------
     540              :  */
     541              : 
                        /*
                         * Report how much shared memory the replication progress state needs:
                         * the ReplicationStateCtl header plus max_active_replication_origins
                         * ReplicationState entries, or 0 if max_active_replication_origins is 0.
                         */
     542              : Size
     543         4469 : ReplicationOriginShmemSize(void)
     544              : {
     545         4469 :     Size        size = 0;
     546              : 
     547         4469 :     if (max_active_replication_origins == 0)
     548            2 :         return size;
     549              : 
     550         4467 :     size = add_size(size, offsetof(ReplicationStateCtl, states));
     551              : 
     552         4467 :     size = add_size(size,
     553              :                     mul_size(max_active_replication_origins, sizeof(ReplicationState)));
     554         4467 :     return size;
     555              : }
     556              : 
                        /*
                         * Set up the file-local pointers to the shared replication progress state,
                         * initializing the state itself if ShmemInitStruct reports it as not found.
                         *
                         * Does nothing when max_active_replication_origins is 0.
                         */
     557              : void
     558         1156 : ReplicationOriginShmemInit(void)
     559              : {
     560              :     bool        found;
     561              : 
     562         1156 :     if (max_active_replication_origins == 0)
     563            1 :         return;
     564              : 
     565         1155 :     replication_states_ctl = (ReplicationStateCtl *)
     566         1155 :         ShmemInitStruct("ReplicationOriginState",
     567              :                         ReplicationOriginShmemSize(),
     568              :                         &found);
     569         1155 :     replication_states = replication_states_ctl->states;
     570              : 
     571         1155 :     if (!found)
     572              :     {
     573              :         int         i;
     574              : 
                                /* block was not found: zero it, then set up per-entry locks and CVs */
     575        94638 :         MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
     576              : 
     577         1155 :         replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
     578              : 
     579        12696 :         for (i = 0; i < max_active_replication_origins; i++)
     580              :         {
     581        11541 :             LWLockInitialize(&replication_states[i].lock,
     582        11541 :                              replication_states_ctl->tranche_id);
     583        11541 :             ConditionVariableInit(&replication_states[i].origin_cv);
     584              :         }
     585              :     }
     586              : }
     587              : 
     588              : /* ---------------------------------------------------------------------------
     589              :  * Perform a checkpoint of each replication origin's progress with respect to
     590              :  * the replayed remote_lsn. Make sure that all transactions we refer to in the
     591              :  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
     592              :  * if the transactions were originally committed asynchronously.
     593              :  *
     594              :  * We store checkpoints in the following format:
     595              :  * +-------+------------------------+------------------+-----+--------+
     596              :  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
     597              :  * +-------+------------------------+------------------+-----+--------+
     598              :  *
     599              :  * So it's just the magic, followed by the statically sized
     600              :  * ReplicationStateOnDisk structs. Note that the maximum number of
     601              :  * ReplicationState is determined by max_active_replication_origins.
     602              :  * ---------------------------------------------------------------------------
     603              :  */
     604              : void
     605         1808 : CheckPointReplicationOrigin(void)
     606              : {
     607         1808 :     const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
     608         1808 :     const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     609              :     int         tmpfd;
     610              :     int         i;
     611         1808 :     uint32      magic = REPLICATION_STATE_MAGIC;
     612              :     pg_crc32c   crc;
     613              : 
     614         1808 :     if (max_active_replication_origins == 0)
     615            1 :         return;
     616              : 
     617         1807 :     INIT_CRC32C(crc);
     618              : 
     619              :     /* make sure no old temp file is remaining */
     620         1807 :     if (unlink(tmppath) < 0 && errno != ENOENT)
     621            0 :         ereport(PANIC,
     622              :                 (errcode_for_file_access(),
     623              :                  errmsg("could not remove file \"%s\": %m",
     624              :                         tmppath)));
     625              : 
     626              :     /*
     627              :      * no other backend can perform this at the same time; only one checkpoint
     628              :      * can happen at a time.
     629              :      */
     630         1807 :     tmpfd = OpenTransientFile(tmppath,
     631              :                               O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
     632         1807 :     if (tmpfd < 0)
     633            0 :         ereport(PANIC,
     634              :                 (errcode_for_file_access(),
     635              :                  errmsg("could not create file \"%s\": %m",
     636              :                         tmppath)));
     637              : 
     638              :     /* write magic */
     639         1807 :     errno = 0;
     640         1807 :     if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
     641              :     {
     642              :         /* if write didn't set errno, assume problem is no disk space */
     643            0 :         if (errno == 0)
     644            0 :             errno = ENOSPC;
     645            0 :         ereport(PANIC,
     646              :                 (errcode_for_file_access(),
     647              :                  errmsg("could not write to file \"%s\": %m",
     648              :                         tmppath)));
     649              :     }
     650         1807 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     651              : 
     652              :     /* prevent concurrent creations/drops */
     653         1807 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
     654              : 
     655              :     /* write actual data */
     656        19877 :     for (i = 0; i < max_active_replication_origins; i++)
     657              :     {
     658              :         ReplicationStateOnDisk disk_state;
     659        18070 :         ReplicationState *curstate = &replication_states[i];
     660              :         XLogRecPtr  local_lsn;
     661              : 
     662        18070 :         if (curstate->roident == InvalidReplOriginId)
     663        18017 :             continue;
     664              : 
     665              :         /* zero, to avoid uninitialized padding bytes */
     666           53 :         memset(&disk_state, 0, sizeof(disk_state));
     667              : 
     668           53 :         LWLockAcquire(&curstate->lock, LW_SHARED);
     669              : 
     670           53 :         disk_state.roident = curstate->roident;
     671              : 
     672           53 :         disk_state.remote_lsn = curstate->remote_lsn;
     673           53 :         local_lsn = curstate->local_lsn;
     674              : 
     675           53 :         LWLockRelease(&curstate->lock);
     676              : 
     677              :         /* make sure we only write out a commit that's persistent */
     678           53 :         XLogFlush(local_lsn);
     679              : 
     680           53 :         errno = 0;
     681           53 :         if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
     682              :             sizeof(disk_state))
     683              :         {
     684              :             /* if write didn't set errno, assume problem is no disk space */
     685            0 :             if (errno == 0)
     686            0 :                 errno = ENOSPC;
     687            0 :             ereport(PANIC,
     688              :                     (errcode_for_file_access(),
     689              :                      errmsg("could not write to file \"%s\": %m",
     690              :                             tmppath)));
     691              :         }
     692              : 
     693           53 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     694              :     }
     695              : 
     696         1807 :     LWLockRelease(ReplicationOriginLock);
     697              : 
     698              :     /* write out the CRC */
     699         1807 :     FIN_CRC32C(crc);
     700         1807 :     errno = 0;
     701         1807 :     if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
     702              :     {
     703              :         /* if write didn't set errno, assume problem is no disk space */
     704            0 :         if (errno == 0)
     705            0 :             errno = ENOSPC;
     706            0 :         ereport(PANIC,
     707              :                 (errcode_for_file_access(),
     708              :                  errmsg("could not write to file \"%s\": %m",
     709              :                         tmppath)));
     710              :     }
     711              : 
     712         1807 :     if (CloseTransientFile(tmpfd) != 0)
     713            0 :         ereport(PANIC,
     714              :                 (errcode_for_file_access(),
     715              :                  errmsg("could not close file \"%s\": %m",
     716              :                         tmppath)));
     717              : 
     718              :     /* fsync, rename to permanent file, fsync file and directory */
     719         1807 :     durable_rename(tmppath, path, PANIC);
     720              : }
     721              : 
     722              : /*
     723              :  * Recover replication replay status from checkpoint data saved earlier by
     724              :  * CheckPointReplicationOrigin.
     725              :  *
     726              :  * This only needs to be called at startup and *not* during every checkpoint
     727              :  * read during recovery (e.g. in hot standby or PITR from a base backup) afterwards. All
     728              :  * state thereafter can be recovered by looking at commit records.
     729              :  */
     730              : void
     731         1008 : StartupReplicationOrigin(void)
     732              : {
     733         1008 :     const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     734              :     int         fd;
     735              :     int         readBytes;
     736         1008 :     uint32      magic = REPLICATION_STATE_MAGIC;
     737         1008 :     int         last_state = 0;
     738              :     pg_crc32c   file_crc;
     739              :     pg_crc32c   crc;
     740              : 
     741              :     /* don't want to overwrite already existing state */
     742              : #ifdef USE_ASSERT_CHECKING
     743              :     static bool already_started = false;
     744              : 
     745              :     Assert(!already_started);
     746              :     already_started = true;
     747              : #endif
     748              : 
     749         1008 :     if (max_active_replication_origins == 0)
     750           52 :         return;
     751              : 
     752         1007 :     INIT_CRC32C(crc);
     753              : 
     754         1007 :     elog(DEBUG2, "starting up replication origin progress state");
     755              : 
     756         1007 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
     757              : 
     758              :     /*
     759              :      * might have had max_active_replication_origins == 0 last run, or we just
     760              :      * brought up a standby.
     761              :      */
     762         1007 :     if (fd < 0 && errno == ENOENT)
     763           51 :         return;
     764          956 :     else if (fd < 0)
     765            0 :         ereport(PANIC,
     766              :                 (errcode_for_file_access(),
     767              :                  errmsg("could not open file \"%s\": %m",
     768              :                         path)));
     769              : 
     770              :     /* verify magic, that is written even if nothing was active */
     771          956 :     readBytes = read(fd, &magic, sizeof(magic));
     772          956 :     if (readBytes != sizeof(magic))
     773              :     {
     774            0 :         if (readBytes < 0)
     775            0 :             ereport(PANIC,
     776              :                     (errcode_for_file_access(),
     777              :                      errmsg("could not read file \"%s\": %m",
     778              :                             path)));
     779              :         else
     780            0 :             ereport(PANIC,
     781              :                     (errcode(ERRCODE_DATA_CORRUPTED),
     782              :                      errmsg("could not read file \"%s\": read %d of %zu",
     783              :                             path, readBytes, sizeof(magic))));
     784              :     }
     785          956 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     786              : 
     787          956 :     if (magic != REPLICATION_STATE_MAGIC)
     788            0 :         ereport(PANIC,
     789              :                 (errmsg("replication checkpoint has wrong magic %u instead of %u",
     790              :                         magic, REPLICATION_STATE_MAGIC)));
     791              : 
     792              :     /* we can skip locking here, no other access is possible */
     793              : 
     794              :     /* recover individual states, until there are no more to be found */
     795              :     while (true)
     796           31 :     {
     797              :         ReplicationStateOnDisk disk_state;
     798              : 
     799          987 :         readBytes = read(fd, &disk_state, sizeof(disk_state));
     800              : 
     801          987 :         if (readBytes < 0)
     802              :         {
     803            0 :             ereport(PANIC,
     804              :                     (errcode_for_file_access(),
     805              :                      errmsg("could not read file \"%s\": %m",
     806              :                             path)));
     807              :         }
     808              : 
     809              :         /* no further data */
     810          987 :         if (readBytes == sizeof(crc))
     811              :         {
     812          956 :             memcpy(&file_crc, &disk_state, sizeof(file_crc));
     813          956 :             break;
     814              :         }
     815              : 
     816           31 :         if (readBytes != sizeof(disk_state))
     817              :         {
     818            0 :             ereport(PANIC,
     819              :                     (errcode_for_file_access(),
     820              :                      errmsg("could not read file \"%s\": read %d of %zu",
     821              :                             path, readBytes, sizeof(disk_state))));
     822              :         }
     823              : 
     824           31 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     825              : 
     826           31 :         if (last_state == max_active_replication_origins)
     827            0 :             ereport(PANIC,
     828              :                     (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     829              :                      errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
     830              : 
     831              :         /* copy data to shared memory */
     832           31 :         replication_states[last_state].roident = disk_state.roident;
     833           31 :         replication_states[last_state].remote_lsn = disk_state.remote_lsn;
     834           31 :         last_state++;
     835              : 
     836           31 :         ereport(LOG,
     837              :                 errmsg("recovered replication state of node %d to %X/%08X",
     838              :                        disk_state.roident,
     839              :                        LSN_FORMAT_ARGS(disk_state.remote_lsn)));
     840              :     }
     841              : 
     842              :     /* now check checksum */
     843          956 :     FIN_CRC32C(crc);
     844          956 :     if (file_crc != crc)
     845            0 :         ereport(PANIC,
     846              :                 (errcode(ERRCODE_DATA_CORRUPTED),
     847              :                  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
     848              :                         crc, file_crc)));
     849              : 
     850          956 :     if (CloseTransientFile(fd) != 0)
     851            0 :         ereport(PANIC,
     852              :                 (errcode_for_file_access(),
     853              :                  errmsg("could not close file \"%s\": %m",
     854              :                         path)));
     855              : }
     856              : 
     857              : void
     858            4 : replorigin_redo(XLogReaderState *record)
     859              : {
     860            4 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     861              : 
     862            4 :     switch (info)
     863              :     {
     864            2 :         case XLOG_REPLORIGIN_SET:
     865              :             {
     866            2 :                 xl_replorigin_set *xlrec =
     867            2 :                     (xl_replorigin_set *) XLogRecGetData(record);
     868              : 
     869            2 :                 replorigin_advance(xlrec->node_id,
     870              :                                    xlrec->remote_lsn, record->EndRecPtr,
     871            2 :                                    xlrec->force /* backward */ ,
     872              :                                    false /* WAL log */ );
     873            2 :                 break;
     874              :             }
     875            2 :         case XLOG_REPLORIGIN_DROP:
     876              :             {
     877              :                 xl_replorigin_drop *xlrec;
     878              :                 int         i;
     879              : 
     880            2 :                 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
     881              : 
     882            3 :                 for (i = 0; i < max_active_replication_origins; i++)
     883              :                 {
     884            3 :                     ReplicationState *state = &replication_states[i];
     885              : 
     886              :                     /* found our slot */
     887            3 :                     if (state->roident == xlrec->node_id)
     888              :                     {
     889              :                         /* reset entry */
     890            2 :                         state->roident = InvalidReplOriginId;
     891            2 :                         state->remote_lsn = InvalidXLogRecPtr;
     892            2 :                         state->local_lsn = InvalidXLogRecPtr;
     893            2 :                         break;
     894              :                     }
     895              :                 }
     896            2 :                 break;
     897              :             }
     898            0 :         default:
     899            0 :             elog(PANIC, "replorigin_redo: unknown op code %u", info);
     900              :     }
     901            4 : }
     902              : 
     903              : 
     904              : /*
     905              :  * Tell the replication origin progress machinery that a commit from 'node'
     906              :  * that originated at the LSN remote_commit on the remote node was replayed
     907              :  * successfully and that we don't need to do so again. In combination with
     908              :  * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp}
     909              :  * that ensures we won't lose knowledge about that after a crash if the
     910              :  * transaction had a persistent effect (think of asynchronous commits).
     911              :  *
     912              :  * local_commit needs to be a local LSN of the commit so that we can make sure
     913              :  * upon a checkpoint that enough WAL has been persisted to disk.
     914              :  *
     915              :  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
     916              :  * unless running in recovery.
     917              :  */
     918              : void
     919          253 : replorigin_advance(ReplOriginId node,
     920              :                    XLogRecPtr remote_commit, XLogRecPtr local_commit,
     921              :                    bool go_backward, bool wal_log)
     922              : {
     923              :     int         i;
     924          253 :     ReplicationState *replication_state = NULL;
     925          253 :     ReplicationState *free_state = NULL;
     926              : 
     927              :     Assert(node != InvalidReplOriginId);
     928              : 
     929              :     /* we don't track DoNotReplicateId */
     930          253 :     if (node == DoNotReplicateId)
     931            0 :         return;
     932              : 
     933              :     /*
     934              :      * XXX: For the case where this is called by WAL replay, it'd be more
     935              :      * efficient to restore into a backend local hashtable and only dump into
     936              :      * shmem after recovery is finished. Let's hold off on implementing that
     937              :      * until it's shown to be a measurable expense.
     938              :      */
     939              : 
     940              :     /* Lock exclusively, as we may have to create a new table entry. */
     941          253 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     942              : 
     943              :     /*
     944              :      * Search for either an existing slot for the origin, or a free one we can
     945              :      * use.
     946              :      */
     947         2326 :     for (i = 0; i < max_active_replication_origins; i++)
     948              :     {
     949         2121 :         ReplicationState *curstate = &replication_states[i];
     950              : 
     951              :         /* remember where to insert if necessary */
     952         2121 :         if (curstate->roident == InvalidReplOriginId &&
     953              :             free_state == NULL)
     954              :         {
     955          213 :             free_state = curstate;
     956          213 :             continue;
     957              :         }
     958              : 
     959              :         /* not our slot */
     960         1908 :         if (curstate->roident != node)
     961              :         {
     962         1860 :             continue;
     963              :         }
     964              : 
     965              :         /* ok, found slot */
     966           48 :         replication_state = curstate;
     967              : 
     968           48 :         LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
     969              : 
     970              :         /* Make sure it's not used by somebody else */
     971           48 :         if (replication_state->refcount > 0)
     972              :         {
     973            0 :             ereport(ERROR,
     974              :                     (errcode(ERRCODE_OBJECT_IN_USE),
     975              :                      (replication_state->acquired_by != 0)
     976              :                      ? errmsg("replication origin with ID %d is already active for PID %d",
     977              :                               replication_state->roident,
     978              :                               replication_state->acquired_by)
     979              :                      : errmsg("replication origin with ID %d is already active in another process",
     980              :                               replication_state->roident)));
     981              :         }
     982              : 
     983           48 :         break;
     984              :     }
     985              : 
     986          253 :     if (replication_state == NULL && free_state == NULL)
     987            0 :         ereport(ERROR,
     988              :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     989              :                  errmsg("could not find free replication state slot for replication origin with ID %d",
     990              :                         node),
     991              :                  errhint("Increase \"max_active_replication_origins\" and try again.")));
     992              : 
     993          253 :     if (replication_state == NULL)
     994              :     {
     995              :         /* initialize new slot */
     996          205 :         LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
     997          205 :         replication_state = free_state;
     998              :         Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
     999              :         Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
    1000          205 :         replication_state->roident = node;
    1001              :     }
    1002              : 
    1003              :     Assert(replication_state->roident != InvalidReplOriginId);
    1004              : 
    1005              :     /*
    1006              :      * If somebody "forcefully" sets this slot, WAL log it, so it's durable
    1007              :      * and the standby gets the message. Primarily this will be called during
    1008              :      * WAL replay (of commit records) where no WAL logging is necessary.
    1009              :      */
    1010          253 :     if (wal_log)
    1011              :     {
    1012              :         xl_replorigin_set xlrec;
    1013              : 
    1014          212 :         xlrec.remote_lsn = remote_commit;
    1015          212 :         xlrec.node_id = node;
    1016          212 :         xlrec.force = go_backward;
    1017              : 
    1018          212 :         XLogBeginInsert();
    1019          212 :         XLogRegisterData(&xlrec, sizeof(xlrec));
    1020              : 
    1021          212 :         XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
    1022              :     }
    1023              : 
    1024              :     /*
    1025              :      * Due to - harmless - race conditions during a checkpoint we could see
    1026              :      * values here that are older than the ones we already have in memory. We
    1027              :      * could also see older values for prepared transactions when the prepare
    1028              :      * is sent at a later point of time along with commit prepared and there
    1029              :      * are other transaction commits between prepare and commit prepared. See
    1030              :      * ReorderBufferFinishPrepared. Don't overwrite those.
    1031              :      */
    1032          253 :     if (go_backward || replication_state->remote_lsn < remote_commit)
    1033          246 :         replication_state->remote_lsn = remote_commit;
    1034          253 :     if (XLogRecPtrIsValid(local_commit) &&
    1035           38 :         (go_backward || replication_state->local_lsn < local_commit))
    1036           40 :         replication_state->local_lsn = local_commit;
    1037          253 :     LWLockRelease(&replication_state->lock);
    1038              : 
    1039              :     /*
    1040              :      * Release *after* changing the LSNs, slot isn't acquired and thus could
    1041              :      * otherwise be dropped anytime.
    1042              :      */
    1043          253 :     LWLockRelease(ReplicationOriginLock);
    1044              : }
    1045              : 
    1046              : 
    1047              : XLogRecPtr
    1048            8 : replorigin_get_progress(ReplOriginId node, bool flush)
    1049              : {
    1050              :     int         i;
    1051            8 :     XLogRecPtr  local_lsn = InvalidXLogRecPtr;
    1052            8 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1053              : 
    1054              :     /* prevent slots from being concurrently dropped */
    1055            8 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1056              : 
    1057           38 :     for (i = 0; i < max_active_replication_origins; i++)
    1058              :     {
    1059              :         ReplicationState *state;
    1060              : 
    1061           35 :         state = &replication_states[i];
    1062              : 
    1063           35 :         if (state->roident == node)
    1064              :         {
    1065            5 :             LWLockAcquire(&state->lock, LW_SHARED);
    1066              : 
    1067            5 :             remote_lsn = state->remote_lsn;
    1068            5 :             local_lsn = state->local_lsn;
    1069              : 
    1070            5 :             LWLockRelease(&state->lock);
    1071              : 
    1072            5 :             break;
    1073              :         }
    1074              :     }
    1075              : 
    1076            8 :     LWLockRelease(ReplicationOriginLock);
    1077              : 
    1078            8 :     if (flush && XLogRecPtrIsValid(local_lsn))
    1079            1 :         XLogFlush(local_lsn);
    1080              : 
    1081            8 :     return remote_lsn;
    1082              : }
    1083              : 
    1084              : /* Helper function to reset the session replication origin */
    1085              : static void
    1086          519 : replorigin_session_reset_internal(void)
    1087              : {
    1088              :     ConditionVariable *cv;
    1089              : 
    1090              :     Assert(session_replication_state != NULL);
    1091              : 
    1092          519 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1093              : 
    1094              :     /* The origin must be held by at least one process at this point. */
    1095              :     Assert(session_replication_state->refcount > 0);
    1096              : 
    1097              :     /*
    1098              :      * Reset the PID only if the current session is the first to set up this
    1099              :      * origin. This avoids clearing the first process's PID when any other
    1100              :      * session releases the origin.
    1101              :      */
    1102          519 :     if (session_replication_state->acquired_by == MyProcPid)
    1103          505 :         session_replication_state->acquired_by = 0;
    1104              : 
    1105          519 :     session_replication_state->refcount--;
    1106              : 
    1107          519 :     cv = &session_replication_state->origin_cv;
    1108          519 :     session_replication_state = NULL;
    1109              : 
    1110          519 :     LWLockRelease(ReplicationOriginLock);
    1111              : 
    1112          519 :     ConditionVariableBroadcast(cv);
    1113          519 : }
    1114              : 
    1115              : /*
    1116              :  * Tear down a (possibly) configured session replication origin during process
    1117              :  * exit.
    1118              :  */
    1119              : static void
    1120          516 : ReplicationOriginExitCleanup(int code, Datum arg)
    1121              : {
    1122          516 :     if (session_replication_state == NULL)
    1123          200 :         return;
    1124              : 
    1125          316 :     replorigin_session_reset_internal();
    1126              : }
    1127              : 
    1128              : /*
    1129              :  * Setup a replication origin in the shared memory struct if it doesn't
    1130              :  * already exist and cache a pointer to its ReplicationState entry (in
    1131              :  * session_replication_state) so the array doesn't have to be searched when
    1132              :  * calling replorigin_session_advance().
    1133              :  *
    1134              :  * Normally only one such cached origin can exist per process so the cached
    1135              :  * value can only be set again after the previous value is torn down with
    1136              :  * replorigin_session_reset(). For this normal case pass acquired_by = 0
    1137              :  * (meaning the slot is not allowed to be already acquired by another process).
    1138              :  *
    1139              :  * However, sometimes multiple processes can safely re-use the same origin slot
    1140              :  * (for example, multiple parallel apply processes can safely use the same
    1141              :  * origin, provided they maintain commit order by allowing only one process to
    1142              :  * commit at a time). For this case the first process must pass acquired_by =
    1143              :  * 0, and then the other processes sharing that same origin can pass
    1144              :  * acquired_by = PID of the first process.
    1145              :  */
    1146              : void
    1147          522 : replorigin_session_setup(ReplOriginId node, int acquired_by)
    1148              : {
    1149              :     static bool registered_cleanup;
    1150              :     int         i;
    1151          522 :     int         free_slot = -1;
    1152              : 
    1153          522 :     if (!registered_cleanup)
    1154              :     {
    1155          516 :         on_shmem_exit(ReplicationOriginExitCleanup, 0);
    1156          516 :         registered_cleanup = true;
    1157              :     }
    1158              : 
    1159              :     Assert(max_active_replication_origins > 0);
    1160              : 
    1161          522 :     if (session_replication_state != NULL)
    1162            1 :         ereport(ERROR,
    1163              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1164              :                  errmsg("cannot setup replication origin when one is already setup")));
    1165              : 
    1166              :     /* Lock exclusively, as we may have to create a new table entry. */
    1167          521 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1168              : 
    1169              :     /*
    1170              :      * Search for either an existing slot for the origin, or a free one we can
    1171              :      * use.
    1172              :      */
    1173         2074 :     for (i = 0; i < max_active_replication_origins; i++)
    1174              :     {
    1175         1949 :         ReplicationState *curstate = &replication_states[i];
    1176              : 
    1177              :         /* remember the first free slot, where to insert if necessary */
    1178         1949 :         if (curstate->roident == InvalidReplOriginId &&
    1179              :             free_slot == -1)
    1180              :         {
    1181          128 :             free_slot = i;
    1182          128 :             continue;
    1183              :         }
    1184              : 
    1185              :         /* not our slot: it holds a different origin id */
    1186         1821 :         if (curstate->roident != node)
    1187         1425 :             continue;
    1188              : 
    1189          396 :         else if (curstate->acquired_by != 0 && acquired_by == 0)
    1190              :         {
    1191            1 :             ereport(ERROR,
    1192              :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1193              :                      errmsg("replication origin with ID %d is already active for PID %d",
    1194              :                             curstate->roident, curstate->acquired_by)));
    1195              :         }
    1196              : 
    1197          395 :         else if (curstate->acquired_by != acquired_by)
    1198              :         {
    1199            0 :             ereport(ERROR,
    1200              :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1201              :                      errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
    1202              :                             node, acquired_by)));
    1203              :         }
    1204              : 
    1205              :         /*
    1206              :          * The origin is in use, but PID is not recorded. This can happen if
    1207              :          * the process that originally acquired the origin exited without
    1208              :          * releasing it. To ensure correctness, other processes cannot acquire
    1209              :          * the origin until all processes currently using it have released it.
    1210              :          */
    1211          395 :         else if (curstate->acquired_by == 0 && curstate->refcount > 0)
    1212            0 :             ereport(ERROR,
    1213              :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1214              :                      errmsg("replication origin with ID %d is already active in another process",
    1215              :                             curstate->roident)));
    1216              : 
    1217              :         /* ok, found slot */
    1218          395 :         session_replication_state = curstate;
    1219          395 :         break;
    1220              :     }
    1221              : 
    1222              : 
    1223          520 :     if (session_replication_state == NULL && free_slot == -1)
    1224            0 :         ereport(ERROR,
    1225              :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
    1226              :                  errmsg("could not find free replication state slot for replication origin with ID %d",
    1227              :                         node),
    1228              :                  errhint("Increase \"max_active_replication_origins\" and try again.")));
    1229          520 :     else if (session_replication_state == NULL)
    1230              :     {
    1231          125 :         if (acquired_by)
    1232            1 :             ereport(ERROR,
    1233              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1234              :                      errmsg("cannot use PID %d for inactive replication origin with ID %d",
    1235              :                             acquired_by, node)));
    1236              : 
    1237              :         /* initialize new slot in the first free entry found above */
    1238          124 :         session_replication_state = &replication_states[free_slot];
    1239              :         Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
    1240              :         Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
    1241          124 :         session_replication_state->roident = node;
    1242              :     }
    1243              : 
    1244              : 
    1245              :     Assert(session_replication_state->roident != InvalidReplOriginId);
    1246              : 
    1247          519 :     if (acquired_by == 0)
    1248              :     {
    1249          505 :         session_replication_state->acquired_by = MyProcPid;
    1250              :         Assert(session_replication_state->refcount == 0);
    1251              :     }
    1252              :     else
    1253              :     {
    1254              :         /*
    1255              :          * Sanity check: the origin must already be acquired by the process
    1256              :          * passed as input, and at least one process must be using it.
    1257              :          */
    1258              :         Assert(session_replication_state->acquired_by == acquired_by);
    1259              :         Assert(session_replication_state->refcount > 0);
    1260              :     }
    1261              : 
    1262          519 :     session_replication_state->refcount++;
    1263              : 
    1264          519 :     LWLockRelease(ReplicationOriginLock);
    1265              : 
    1266              :     /* wake anyone waiting on this origin's CV; probably this one is pointless */
    1267          519 :     ConditionVariableBroadcast(&session_replication_state->origin_cv);
    1268          519 : }
    1269              : 
    1270              : /*
    1271              :  * Reset replay state previously setup in this session.
    1272              :  *
    1273              :  * An ERROR is raised if no origin was setup with replorigin_session_setup(),
    1274              :  * or if this process acquired the origin first and others still share it.
    1275              :  */
    1276              : void
    1277          205 : replorigin_session_reset(void)
    1278              : {
    1279              :     Assert(max_active_replication_origins != 0);
    1280              : 
    1281          205 :     if (session_replication_state == NULL)
    1282            1 :         ereport(ERROR,
    1283              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1284              :                  errmsg("no replication origin is configured")));
    1285              : 
    1286              :     /*
    1287              :      * Restrict explicit resetting of the replication origin if it was first
    1288              :      * acquired by this process and others are still using it. While the
    1289              :      * system handles this safely (as happens if the first session exits
    1290              :      * without calling reset), it is best to avoid doing so.
    1291              :      */
    1292          204 :     if (session_replication_state->acquired_by == MyProcPid &&
    1293          202 :         session_replication_state->refcount > 1)
    1294            1 :         ereport(ERROR,
    1295              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1296              :                  errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
    1297              :                         session_replication_state->roident),
    1298              :                  errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
    1299              :                  errhint("Reset the replication origin in all other processes before retrying.")));
    1300              : 
    1301          203 :     replorigin_session_reset_internal();
    1302          203 : }
    1303              : 
    1304              : /*
    1305              :  * Do the same work replorigin_advance() does, just on the session's
    1306              :  * configured origin: under the entry's lock, raise the stored local and
    1307              :  * remote LSNs to the given values; neither LSN is ever moved backward here.
    1308              :  * This is noticeably cheaper than using replorigin_advance().
    1309              :  */
    1310              : void
    1311         1131 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
    1312              : {
    1313              :     Assert(session_replication_state != NULL);
    1314              :     Assert(session_replication_state->roident != InvalidReplOriginId);
    1315              : 
    1316         1131 :     LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
    1317         1131 :     if (session_replication_state->local_lsn < local_commit)
    1318         1131 :         session_replication_state->local_lsn = local_commit;
    1319         1131 :     if (session_replication_state->remote_lsn < remote_commit)
    1320          524 :         session_replication_state->remote_lsn = remote_commit;
    1321         1131 :     LWLockRelease(&session_replication_state->lock);
    1322         1131 : }
    1323              : 
    1324              : /*
    1325              :  * Ask the machinery about the point up to which we successfully replayed
    1326              :  * changes from an already setup replication origin (returns its remote_lsn).
    1327              :  */
    1328              : XLogRecPtr
    1329          291 : replorigin_session_get_progress(bool flush)
    1330              : {
    1331              :     XLogRecPtr  remote_lsn;
    1332              :     XLogRecPtr  local_lsn;
    1333              : 
    1334              :     Assert(session_replication_state != NULL);
    1335              : 
    1336          291 :     LWLockAcquire(&session_replication_state->lock, LW_SHARED);
    1337          291 :     remote_lsn = session_replication_state->remote_lsn;
    1338          291 :     local_lsn = session_replication_state->local_lsn;
    1339          291 :     LWLockRelease(&session_replication_state->lock);
    1340              :     /* With 'flush', call XLogFlush() on the recorded local LSN, if valid. */
    1341          291 :     if (flush && XLogRecPtrIsValid(local_lsn))
    1342            1 :         XLogFlush(local_lsn);
    1343              : 
    1344          291 :     return remote_lsn;
    1345              : }
    1346              : 
    1347              : /*
    1348              :  * Clear the per-transaction replication origin state (origin LSN/timestamp).
    1349              :  *
    1350              :  * replorigin_xact_state.origin is also cleared if clear_origin is set.
    1351              :  */
    1352              : void
    1353          817 : replorigin_xact_clear(bool clear_origin)
    1354              : {
    1355          817 :     replorigin_xact_state.origin_lsn = InvalidXLogRecPtr;
    1356          817 :     replorigin_xact_state.origin_timestamp = 0;
    1357          817 :     if (clear_origin)
    1358          817 :         replorigin_xact_state.origin = InvalidReplOriginId;
    1359          817 : }
    1360              : 
    1361              : 
    1362              : /* ---------------------------------------------------------------------------
    1363              :  * SQL functions for working with replication origin.
    1364              :  *
    1365              :  * These mostly should be fairly short wrappers around more generic functions.
    1366              :  * ---------------------------------------------------------------------------
    1367              :  */
    1368              : 
    1369              : /*
    1370              :  * Create replication origin for the passed in name, and return the assigned
    1371              :  * oid. Reserved names are rejected with an ERROR.
    1372              :  */
    1373              : Datum
    1374           14 : pg_replication_origin_create(PG_FUNCTION_ARGS)
    1375              : {
    1376              :     char       *name;
    1377              :     ReplOriginId roident;
    1378              : 
    1379           14 :     replorigin_check_prerequisites(false, false);
    1380              : 
    1381           14 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1382              : 
    1383              :     /*
    1384              :      * Replication origins "any" and "none" are reserved for system options.
    1385              :      * The origins "pg_xxx" are reserved for internal use.
    1386              :      */
    1387           14 :     if (IsReservedName(name) || IsReservedOriginName(name))
    1388            3 :         ereport(ERROR,
    1389              :                 (errcode(ERRCODE_RESERVED_NAME),
    1390              :                  errmsg("replication origin name \"%s\" is reserved",
    1391              :                         name),
    1392              :                  errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
    1393              :                            LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
    1394              : 
    1395              :     /*
    1396              :      * If built with appropriate switch, whine when regression-testing
    1397              :      * conventions for replication origin names are violated.
    1398              :      */
    1399              : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1400              :     if (strncmp(name, "regress_", 8) != 0)
    1401              :         elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
    1402              : #endif
    1403              : 
    1404           11 :     roident = replorigin_create(name);
    1405              : 
    1406            7 :     pfree(name);
    1407              : 
    1408            7 :     PG_RETURN_OID(roident);
    1409              : }
    1410              : 
    1411              : /*
    1412              :  * Drop the replication origin whose name is passed as the first argument.
    1413              :  */
    1414              : Datum
    1415            9 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
    1416              : {
    1417              :     char       *name;
    1418              : 
    1419            9 :     replorigin_check_prerequisites(false, false);
    1420              : 
    1421            9 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1422              : 
    1423            9 :     replorigin_drop_by_name(name, false, true);
    1424              : 
    1425            8 :     pfree(name);
    1426              : 
    1427            8 :     PG_RETURN_VOID();
    1428              : }
    1429              : 
    1430              : /*
    1431              :  * Return oid of the named replication origin, or NULL if the lookup yields no valid id.
    1432              :  */
    1433              : Datum
    1434            0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
    1435              : {
    1436              :     char       *name;
    1437              :     ReplOriginId roident;
    1438              : 
    1439            0 :     replorigin_check_prerequisites(false, false);
    1440              : 
    1441            0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1442            0 :     roident = replorigin_by_name(name, true);
    1443              : 
    1444            0 :     pfree(name);
    1445              : 
    1446            0 :     if (OidIsValid(roident))
    1447            0 :         PG_RETURN_OID(roident);
    1448            0 :     PG_RETURN_NULL();
    1449              : }
    1450              : 
    1451              : /*
    1452              :  * Setup the replication origin named by arg 0 for this session; arg 1 is passed as acquired_by.
    1453              :  */
    1454              : Datum
    1455           11 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
    1456              : {
    1457              :     char       *name;
    1458              :     ReplOriginId origin;
    1459              :     int         pid;
    1460              : 
    1461           11 :     replorigin_check_prerequisites(true, false);
    1462              : 
    1463           11 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1464           11 :     origin = replorigin_by_name(name, false);
    1465           10 :     pid = PG_GETARG_INT32(1);
    1466           10 :     replorigin_session_setup(origin, pid);
    1467              :     /* Record the origin in the per-transaction state as well. */
    1468            8 :     replorigin_xact_state.origin = origin;
    1469              : 
    1470            8 :     pfree(name);
    1471              : 
    1472            8 :     PG_RETURN_VOID();
    1473              : }
    1474              : 
    1475              : /*
    1476              :  * Reset previously setup origin in this session, then clear the per-transaction origin state.
    1477              :  */
    1478              : Datum
    1479           10 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
    1480              : {
    1481           10 :     replorigin_check_prerequisites(true, false);
    1482              : 
    1483           10 :     replorigin_session_reset();
    1484              :     /* Passing true also clears the origin id, not just the LSN and timestamp. */
    1485            8 :     replorigin_xact_clear(true);
    1486              : 
    1487            8 :     PG_RETURN_VOID();
    1488              : }
    1489              : 
    1490              : /*
    1491              :  * Has a replication origin been setup for this session (judged by replorigin_xact_state.origin)?
    1492              :  */
    1493              : Datum
    1494            4 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
    1495              : {
    1496            4 :     replorigin_check_prerequisites(false, false);
    1497              : 
    1498            4 :     PG_RETURN_BOOL(replorigin_xact_state.origin != InvalidReplOriginId);
    1499              : }
    1500              : 
    1501              : 
    1502              : /*
    1503              :  * Return the replication progress for origin setup in the current session,
    1504              :  * or NULL if no valid remote LSN is recorded for it.
    1505              :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1506              :  * to a local transaction that has been flushed. This is useful if asynchronous
    1507              :  * commits are used when replaying replicated transactions.
    1508              :  */
    1509              : Datum
    1510            2 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
    1511              : {
    1512            2 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1513            2 :     bool        flush = PG_GETARG_BOOL(0);
    1514              : 
    1515            2 :     replorigin_check_prerequisites(true, false);
    1516              : 
    1517            2 :     if (session_replication_state == NULL)
    1518            0 :         ereport(ERROR,
    1519              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1520              :                  errmsg("no replication origin is configured")));
    1521              : 
    1522            2 :     remote_lsn = replorigin_session_get_progress(flush);
    1523              : 
    1524            2 :     if (!XLogRecPtrIsValid(remote_lsn))
    1525            0 :         PG_RETURN_NULL();
    1526              : 
    1527            2 :     PG_RETURN_LSN(remote_lsn);
    1528              : }
    1529              : /* Set the per-transaction origin LSN (arg 0) and timestamp (arg 1); needs a session origin. */
    1530              : Datum
    1531            1 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
    1532              : {
    1533            1 :     XLogRecPtr  location = PG_GETARG_LSN(0);
    1534              : 
    1535            1 :     replorigin_check_prerequisites(true, false);
    1536              : 
    1537            1 :     if (session_replication_state == NULL)
    1538            0 :         ereport(ERROR,
    1539              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1540              :                  errmsg("no replication origin is configured")));
    1541              : 
    1542            1 :     replorigin_xact_state.origin_lsn = location;
    1543            1 :     replorigin_xact_state.origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
    1544              : 
    1545            1 :     PG_RETURN_VOID();
    1546              : }
    1547              : /* Clear the per-transaction origin LSN and timestamp, keeping the origin id. */
    1548              : Datum
    1549            0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
    1550              : {
    1551            0 :     replorigin_check_prerequisites(true, false);
    1552              : 
    1553              :     /* Do not clear the session origin */
    1554            0 :     replorigin_xact_clear(false);
    1555              : 
    1556            0 :     PG_RETURN_VOID();
    1557              : }
    1558              : 
    1559              : /* Advance the origin named by arg 0 to remote LSN arg 1; may go backward, and is WAL-logged. */
    1560              : Datum
    1561            3 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
    1562              : {
    1563            3 :     text       *name = PG_GETARG_TEXT_PP(0);
    1564            3 :     XLogRecPtr  remote_commit = PG_GETARG_LSN(1);
    1565              :     ReplOriginId node;
    1566              : 
    1567            3 :     replorigin_check_prerequisites(true, false);
    1568              : 
    1569              :     /* lock to prevent the replication origin from vanishing */
    1570            3 :     LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1571              : 
    1572            3 :     node = replorigin_by_name(text_to_cstring(name), false);
    1573              : 
    1574              :     /*
    1575              :      * Can't sensibly pass a local commit to be flushed at checkpoint - this
    1576              :      * xact hasn't committed yet. This is why this function should be used to
    1577              :      * set up the initial replication state, but not for replay.
    1578              :      */
    1579            2 :     replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
    1580              :                        true /* go backward */ , true /* WAL log */ );
    1581              : 
    1582            2 :     UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1583              : 
    1584            2 :     PG_RETURN_VOID();
    1585              : }
    1586              : 
    1587              : 
    1588              : /*
    1589              :  * Return the replication progress for an individual replication origin,
    1590              :  * named by the first argument, or NULL if no valid remote LSN is recorded.
    1591              :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1592              :  * to a local transaction that has been flushed. This is useful if asynchronous
    1593              :  * commits are used when replaying replicated transactions.
    1594              :  */
    1595              : Datum
    1596            3 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
    1597              : {
    1598              :     char       *name;
    1599              :     bool        flush;
    1600              :     ReplOriginId roident;
    1601            3 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1602              : 
    1603            3 :     replorigin_check_prerequisites(true, true);
    1604              : 
    1605            3 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1606            3 :     flush = PG_GETARG_BOOL(1);
    1607              : 
    1608            3 :     roident = replorigin_by_name(name, false);
    1609              :     Assert(OidIsValid(roident));
    1610              : 
    1611            2 :     remote_lsn = replorigin_get_progress(roident, flush);
    1612              : 
    1613            2 :     if (!XLogRecPtrIsValid(remote_lsn))
    1614            0 :         PG_RETURN_NULL();
    1615              : 
    1616            2 :     PG_RETURN_LSN(remote_lsn);
    1617              : }
    1618              : 
    1619              : /* Emit one row (origin id, name, remote LSN, local LSN) per in-use replication state entry. */
    1620              : Datum
    1621           11 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
    1622              : {
    1623           11 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1624              :     int         i;
    1625              : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
    1626              : 
    1627              :     /* we want to return 0 rows if slot is set to zero */
    1628           11 :     replorigin_check_prerequisites(false, true);
    1629              : 
    1630           11 :     InitMaterializedSRF(fcinfo, 0);
    1631              : 
    1632              :     /* prevent slots from being concurrently dropped */
    1633           11 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1634              : 
    1635              :     /*
    1636              :      * Iterate through all possible replication_states, display if they are
    1637              :      * filled. ReplicationOriginLock is held in shared mode throughout, and
    1638              :      * each entry's LSNs are read under that entry's own lock.
    1639              :      */
    1640          121 :     for (i = 0; i < max_active_replication_origins; i++)
    1641              :     {
    1642              :         ReplicationState *state;
    1643              :         Datum       values[REPLICATION_ORIGIN_PROGRESS_COLS];
    1644              :         bool        nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
    1645              :         char       *roname;
    1646              : 
    1647          110 :         state = &replication_states[i];
    1648              : 
    1649              :         /* unused slot, nothing to display */
    1650          110 :         if (state->roident == InvalidReplOriginId)
    1651           97 :             continue;
    1652              :         /* start with every column NULL; filled columns are flagged below */
    1653           13 :         memset(values, 0, sizeof(values));
    1654           13 :         memset(nulls, 1, sizeof(nulls));
    1655              : 
    1656           13 :         values[0] = ObjectIdGetDatum(state->roident);
    1657           13 :         nulls[0] = false;
    1658              : 
    1659              :         /*
    1660              :          * We're not preventing the origin from being dropped concurrently, so
    1661              :          * silently accept that it might be gone; the name is then left NULL.
    1662              :          */
    1663           13 :         if (replorigin_by_oid(state->roident, true,
    1664              :                               &roname))
    1665              :         {
    1666           13 :             values[1] = CStringGetTextDatum(roname);
    1667           13 :             nulls[1] = false;
    1668              :         }
    1669              : 
    1670           13 :         LWLockAcquire(&state->lock, LW_SHARED);
    1671              : 
    1672           13 :         values[2] = LSNGetDatum(state->remote_lsn);
    1673           13 :         nulls[2] = false;
    1674              : 
    1675           13 :         values[3] = LSNGetDatum(state->local_lsn);
    1676           13 :         nulls[3] = false;
    1677              : 
    1678           13 :         LWLockRelease(&state->lock);
    1679              : 
    1680           13 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1681              :                              values, nulls);
    1682              :     }
    1683              : 
    1684           11 :     LWLockRelease(ReplicationOriginLock);
    1685              : 
    1686              : #undef REPLICATION_ORIGIN_PROGRESS_COLS
    1687              : 
    1688           11 :     return (Datum) 0;
    1689              : }
        

Generated by: LCOV version 2.0-1