LCOV - code coverage report
Current view: top level - src/backend/replication/logical - origin.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 394 455 86.6 %
Date: 2025-04-19 19:15:24 Functions: 28 31 90.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * origin.c
       4             :  *    Logical replication progress tracking support.
       5             :  *
       6             :  * Copyright (c) 2013-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/logical/origin.c
      10             :  *
      11             :  * NOTES
      12             :  *
      13             :  * This file provides the following:
      14             :  * * An infrastructure to name nodes in a replication setup
      15             :  * * A facility to store and persist replication progress in an efficient
      16             :  *   and durable manner.
      17             :  *
      18             :  * A replication origin consists of a descriptive, user-defined, external
      19             :  * name and a short, thus space-efficient, internal 2-byte id. This split
      20             :  * exists because replication origins have to be stored in WAL and shared
      21             :  * memory, where long descriptors would be inefficient.  For now we only use
      22             :  * 2 bytes for the internal id of a replication origin, as it seems unlikely
      23             :  * that there will soon be more than 65k nodes in one replication setup, and
      24             :  * using only two bytes allows us to be more space efficient.
      25             :  *
      26             :  * Replication progress is tracked in a shared memory table
      27             :  * (ReplicationState) that's dumped to disk every checkpoint. Entries
      28             :  * ('slots') in this table are identified by the internal id. That's the case
      29             :  * because it allows us to increase replication progress during crash
      30             :  * recovery. To allow doing so we store the original LSN (from the originating
      31             :  * system) of a transaction in the commit record. That allows us to recover the
      32             :  * precise replayed state after crash recovery; without requiring synchronous
      33             :  * commits. Allowing logical replication to use asynchronous commit is
      34             :  * generally good for performance, but especially important as it allows a
      35             :  * single threaded replay process to keep up with a source that has multiple
      36             :  * backends generating changes concurrently.  For efficiency and simplicity
      37             :  * reasons a backend can set up one replication origin that is from then on
      38             :  * used as the source of changes produced by the backend, until reset again.
      39             :  *
      40             :  * This infrastructure is intended to be used in cooperation with logical
      41             :  * decoding. When replaying from a remote system the configured origin is
      42             :  * provided to output plugins, allowing prevention of replication loops and
      43             :  * other filtering.
      44             :  *
      45             :  * There are several levels of locking at work:
      46             :  *
      47             :  * * To create and drop replication origins a lock on pg_replication_origin
      48             :  *   is required for the duration; creation takes it exclusively, which allows
      49             :  *   us to safely and conflict free assign new origins using a dirty snapshot.
      50             :  *
      51             :  * * When creating an in-memory replication progress slot the ReplicationOrigin
      52             :  *   LWLock has to be held exclusively; when iterating over the replication
      53             :  *   progress a shared lock has to be held, the same when advancing the
      54             :  *   replication progress of an individual origin that has not been set up as the
      55             :  *   session's replication origin.
      56             :  *
      57             :  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
      58             :  *   replication progress slot that slot's lwlock has to be held. That's
      59             :  *   primarily because we do not assume 8-byte writes (the LSN) are atomic on
      60             :  *   all our platforms, but it also simplifies memory ordering concerns
      61             :  *   between the remote and local lsn. We use a lwlock instead of a spinlock
      62             :  *   so it's less harmful to hold the lock over a WAL write
      63             :  *   (cf. AdvanceReplicationProgress).
      64             :  *
      65             :  * ---------------------------------------------------------------------------
      66             :  */
      67             : 
      68             : #include "postgres.h"
      69             : 
      70             : #include <unistd.h>
      71             : #include <sys/stat.h>
      72             : 
      73             : #include "access/genam.h"
      74             : #include "access/htup_details.h"
      75             : #include "access/table.h"
      76             : #include "access/xact.h"
      77             : #include "access/xloginsert.h"
      78             : #include "catalog/catalog.h"
      79             : #include "catalog/indexing.h"
      80             : #include "catalog/pg_subscription.h"
      81             : #include "funcapi.h"
      82             : #include "miscadmin.h"
      83             : #include "nodes/execnodes.h"
      84             : #include "pgstat.h"
      85             : #include "replication/origin.h"
      86             : #include "replication/slot.h"
      87             : #include "storage/condition_variable.h"
      88             : #include "storage/fd.h"
      89             : #include "storage/ipc.h"
      90             : #include "storage/lmgr.h"
      91             : #include "utils/builtins.h"
      92             : #include "utils/fmgroids.h"
      93             : #include "utils/guc.h"
      94             : #include "utils/pg_lsn.h"
      95             : #include "utils/rel.h"
      96             : #include "utils/snapmgr.h"
      97             : #include "utils/syscache.h"
      98             : 
/*
 * Paths for replication origin checkpoint files: the final checkpoint file,
 * and the temporary file that CheckPointReplicationOrigin() creates and
 * writes first (any stale copy is unlinked before a new one is created).
 */
#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"

/* GUC variables */

/*
 * Number of ReplicationState slots allocated in shared memory.  A value of 0
 * disables replication origins: no shared memory is reserved, checkpoints of
 * origin progress are skipped, and SQL-level access is refused.
 */
int         max_active_replication_origins = 10;
     105             : 
/*
 * Replay progress of a single remote node.
 *
 * One entry of the shared-memory array sized by
 * max_active_replication_origins.  An entry whose roident is
 * InvalidRepOriginId is unused.
 */
typedef struct ReplicationState
{
    /*
     * Local identifier for the remote node.  Reset to InvalidRepOriginId
     * when the origin is dropped.
     */
    RepOriginId roident;

    /*
     * Location of the latest commit from the remote side.
     */
    XLogRecPtr  remote_lsn;

    /*
     * Remember the local lsn of the commit record so we can XLogFlush() to it
     * during a checkpoint so we know the commit record actually is safe on
     * disk.
     */
    XLogRecPtr  local_lsn;

    /*
     * PID of backend that's acquired slot, or 0 if none.  A nonzero value
     * blocks replorigin_state_clear() (it errors out or waits).
     */
    int         acquired_by;

    /*
     * Condition variable that's signaled when acquired_by changes.  Waiters
     * (e.g. a drop waiting for the slot to be released) sleep on it.
     */
    ConditionVariable origin_cv;

    /*
     * Lock protecting remote_lsn and local_lsn.  Initialized with the tranche
     * stored in ReplicationStateCtl.
     */
    LWLock      lock;
} ReplicationState;
     143             : 
/*
 * On disk version of ReplicationState.
 *
 * Only the origin id and the replayed remote LSN are persisted; local_lsn,
 * acquired_by and the synchronization members exist only in shared memory.
 * NOTE(review): this struct is written verbatim into the checkpoint file, so
 * presumably any change to its layout changes the on-disk format.  Confirm
 * before touching it.
 */
typedef struct ReplicationStateOnDisk
{
    RepOriginId roident;
    XLogRecPtr  remote_lsn;
} ReplicationStateOnDisk;
     152             : 
     153             : 
/*
 * Header of the "ReplicationOriginState" shared memory block, followed by
 * the per-origin state array.
 */
typedef struct ReplicationStateCtl
{
    /* Tranche to use for per-origin LWLocks */
    int         tranche_id;
    /* Array of length max_active_replication_origins */
    ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
     161             : 
/*
 * external variables
 *
 * Per-backend origin information attached to changes made by this session.
 * NOTE(review): these are presumably assigned by callers outside the portion
 * of this file shown here; confirm where they are set and reset.
 */
RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
XLogRecPtr  replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;

/*
 * Base address into a shared memory array of replication states of size
 * max_active_replication_origins.  Points at replication_states_ctl->states;
 * left unset when max_active_replication_origins is 0, because shared memory
 * initialization is skipped in that case.
 */
static ReplicationState *replication_states;

/*
 * Actual shared memory block (replication_states[] is now part of this).
 */
static ReplicationStateCtl *replication_states_ctl;

/*
 * We keep a pointer to this backend's ReplicationState to avoid having to
 * search the replication_states array in replorigin_session_advance for each
 * remote commit.  (Ownership of a backend's own entry can only be changed by
 * that backend.)
 */
static ReplicationState *session_replication_state = NULL;

/*
 * Magic for on disk files.  Written as the first field of the replication
 * origin checkpoint file.
 */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
     188             : 
     189             : static void
     190          84 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
     191             : {
     192          84 :     if (check_origins && max_active_replication_origins == 0)
     193           0 :         ereport(ERROR,
     194             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     195             :                  errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
     196             : 
     197          84 :     if (!recoveryOK && RecoveryInProgress())
     198           0 :         ereport(ERROR,
     199             :                 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
     200             :                  errmsg("cannot manipulate replication origins during recovery")));
     201          84 : }
     202             : 
     203             : 
     204             : /*
     205             :  * IsReservedOriginName
     206             :  *      True iff name is either "none" or "any".
     207             :  */
     208             : static bool
     209          16 : IsReservedOriginName(const char *name)
     210             : {
     211          30 :     return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
     212          14 :             (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
     213             : }
     214             : 
     215             : /* ---------------------------------------------------------------------------
     216             :  * Functions for working with replication origins themselves.
     217             :  * ---------------------------------------------------------------------------
     218             :  */
     219             : 
     220             : /*
     221             :  * Check for a persistent replication origin identified by name.
     222             :  *
     223             :  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
     224             :  */
     225             : RepOriginId
     226        1824 : replorigin_by_name(const char *roname, bool missing_ok)
     227             : {
     228             :     Form_pg_replication_origin ident;
     229        1824 :     Oid         roident = InvalidOid;
     230             :     HeapTuple   tuple;
     231             :     Datum       roname_d;
     232             : 
     233        1824 :     roname_d = CStringGetTextDatum(roname);
     234             : 
     235        1824 :     tuple = SearchSysCache1(REPLORIGNAME, roname_d);
     236        1824 :     if (HeapTupleIsValid(tuple))
     237             :     {
     238        1064 :         ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
     239        1064 :         roident = ident->roident;
     240        1064 :         ReleaseSysCache(tuple);
     241             :     }
     242         760 :     else if (!missing_ok)
     243           8 :         ereport(ERROR,
     244             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     245             :                  errmsg("replication origin \"%s\" does not exist",
     246             :                         roname)));
     247             : 
     248        1816 :     return roident;
     249             : }
     250             : 
/*
 * Create a replication origin.
 *
 * Needs to be called in a transaction.
 *
 * Finds the lowest internal id in [InvalidOid + 1, PG_UINT16_MAX) that no row
 * of pg_replication_origin uses, inserts a row (id, roname), and returns that
 * id.  Raises an ERROR if every id in the range is taken.  Any error raised by
 * CatalogTupleInsert() propagates to the caller.
 */
RepOriginId
replorigin_create(const char *roname)
{
    Oid         roident;
    HeapTuple   tuple = NULL;
    Relation    rel;
    Datum       roname_d;
    SnapshotData SnapshotDirty;
    SysScanDesc scan;
    ScanKeyData key;

    roname_d = CStringGetTextDatum(roname);

    Assert(IsTransactionState());

    /*
     * We need the numeric replication origin to be 16bit wide, so we cannot
     * rely on the normal oid allocation. Instead we simply scan
     * pg_replication_origin for the first unused id. That's not particularly
     * efficient, but this should be a fairly infrequent operation - we can
     * easily spend a bit more code on this when it turns out it needs to be
     * faster.
     *
     * We handle concurrency by taking an exclusive lock (allowing reads!)
     * over the table for the duration of the search. Because we use a "dirty
     * snapshot" we can read rows that other in-progress sessions have
     * written, even though they would be invisible with normal snapshots. Due
     * to the exclusive lock there's no danger that new rows can appear while
     * we're checking.
     */
    InitDirtySnapshot(SnapshotDirty);

    rel = table_open(ReplicationOriginRelationId, ExclusiveLock);

    /*
     * Linear probe of candidate ids.  The upper bound keeps the result within
     * the 16-bit RepOriginId.  NOTE(review): PG_UINT16_MAX itself is excluded;
     * presumably because it is reserved (DoNotReplicateId?) - confirm.
     */
    for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
    {
        bool        nulls[Natts_pg_replication_origin];
        Datum       values[Natts_pg_replication_origin];
        bool        collides;

        CHECK_FOR_INTERRUPTS();

        /* Look for an existing (possibly uncommitted) row using this id. */
        ScanKeyInit(&key,
                    Anum_pg_replication_origin_roident,
                    BTEqualStrategyNumber, F_OIDEQ,
                    ObjectIdGetDatum(roident));

        scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
                                  true /* indexOK */ ,
                                  &SnapshotDirty,
                                  1, &key);

        collides = HeapTupleIsValid(systable_getnext(scan));

        systable_endscan(scan);

        if (!collides)
        {
            /*
             * Ok, found an unused roident, insert the new row and do a CCI,
             * so our callers can look it up if they want to.
             */
            memset(&nulls, 0, sizeof(nulls));

            values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
            values[Anum_pg_replication_origin_roname - 1] = roname_d;

            tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
            CatalogTupleInsert(rel, tuple);
            CommandCounterIncrement();
            break;
        }
    }

    /*
     * Close the table, passing ExclusiveLock rather than NoLock so the lock
     * is released now instead of being kept until commit.
     */
    table_close(rel, ExclusiveLock);

    /* tuple is still NULL only if the loop exhausted every candidate id. */
    if (tuple == NULL)
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("could not find free replication origin ID")));

    heap_freetuple(tuple);
    return roident;
}
     341             : 
/*
 * Helper function to drop a replication origin.
 *
 * Clears the in-memory progress slot for roident, if one exists, and writes
 * an XLOG_REPLORIGIN_DROP WAL record first.  It does not touch the catalog;
 * the caller deletes the pg_replication_origin row.
 *
 * If the slot is currently acquired by some backend (acquired_by != 0):
 * with nowait, raise an ERROR; otherwise sleep on the slot's condition
 * variable and rescan from the start.  If no slot matches roident, nothing
 * is WAL-logged.
 */
static void
replorigin_state_clear(RepOriginId roident, bool nowait)
{
    int         i;

    /*
     * Clean up the slot state info, if there is any matching slot.
     */
restart:
    LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

    for (i = 0; i < max_active_replication_origins; i++)
    {
        ReplicationState *state = &replication_states[i];

        if (state->roident == roident)
        {
            /* found our slot, is it busy? */
            if (state->acquired_by != 0)
            {
                ConditionVariable *cv;

                if (nowait)
                    ereport(ERROR,
                            (errcode(ERRCODE_OBJECT_IN_USE),
                             errmsg("could not drop replication origin with ID %d, in use by PID %d",
                                    state->roident,
                                    state->acquired_by)));

                /*
                 * We must wait and then retry.  Since we don't know which CV
                 * to wait on until here, we can't readily use
                 * ConditionVariablePrepareToSleep (calling it here would be
                 * wrong, since we could miss the signal if we did so); just
                 * use ConditionVariableSleep directly.
                 */
                cv = &state->origin_cv;

                LWLockRelease(ReplicationOriginLock);

                ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
                /* Slot contents may have changed while unlocked; rescan. */
                goto restart;
            }

            /* first make a WAL log entry */
            {
                xl_replorigin_drop xlrec;

                xlrec.node_id = roident;
                XLogBeginInsert();
                XLogRegisterData(&xlrec, sizeof(xlrec));
                XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
            }

            /* then clear the in-memory slot */
            state->roident = InvalidRepOriginId;
            state->remote_lsn = InvalidXLogRecPtr;
            state->local_lsn = InvalidXLogRecPtr;
            break;
        }
    }
    LWLockRelease(ReplicationOriginLock);

    /*
     * End any condition-variable sleep started above.  NOTE(review): this
     * presumably is a no-op when we never slept - confirm.
     */
    ConditionVariableCancelSleep();
}
     409             : 
/*
 * Drop replication origin (by name).
 *
 * Needs to be called in a transaction.
 *
 * name:       external name of the origin to drop.
 * missing_ok: if true, silently return when no such origin exists (including
 *             when it was dropped concurrently); otherwise raise an ERROR.
 * nowait:     passed to replorigin_state_clear(); if true, error out instead
 *             of waiting when the origin's in-memory slot is in use.
 */
void
replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
{
    RepOriginId roident;
    Relation    rel;
    HeapTuple   tuple;

    Assert(IsTransactionState());

    rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);

    /* InvalidOid if the name is unknown and missing_ok is true. */
    roident = replorigin_by_name(name, missing_ok);

    /* Lock the origin to prevent concurrent drops. */
    LockSharedObject(ReplicationOriginRelationId, roident, 0,
                     AccessExclusiveLock);

    /*
     * Re-check by id now that we hold the lock: the origin may have been
     * dropped by another session between the name lookup and the lock.
     */
    tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
    if (!HeapTupleIsValid(tuple))
    {
        if (!missing_ok)
            elog(ERROR, "cache lookup failed for replication origin with ID %d",
                 roident);

        /*
         * We don't need to retain the locks if the origin is already dropped.
         */
        UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
                           AccessExclusiveLock);
        table_close(rel, RowExclusiveLock);
        return;
    }

    replorigin_state_clear(roident, nowait);

    /*
     * Now, we can delete the catalog entry.
     */
    CatalogTupleDelete(rel, &tuple->t_self);
    ReleaseSysCache(tuple);

    CommandCounterIncrement();

    /* We keep the lock on pg_replication_origin until commit */
    table_close(rel, NoLock);
}
     461             : 
     462             : /*
     463             :  * Lookup replication origin via its oid and return the name.
     464             :  *
     465             :  * The external name is palloc'd in the calling context.
     466             :  *
     467             :  * Returns true if the origin is known, false otherwise.
     468             :  */
     469             : bool
     470          40 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
     471             : {
     472             :     HeapTuple   tuple;
     473             :     Form_pg_replication_origin ric;
     474             : 
     475             :     Assert(OidIsValid((Oid) roident));
     476             :     Assert(roident != InvalidRepOriginId);
     477             :     Assert(roident != DoNotReplicateId);
     478             : 
     479          40 :     tuple = SearchSysCache1(REPLORIGIDENT,
     480             :                             ObjectIdGetDatum((Oid) roident));
     481             : 
     482          40 :     if (HeapTupleIsValid(tuple))
     483             :     {
     484          34 :         ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
     485          34 :         *roname = text_to_cstring(&ric->roname);
     486          34 :         ReleaseSysCache(tuple);
     487             : 
     488          34 :         return true;
     489             :     }
     490             :     else
     491             :     {
     492           6 :         *roname = NULL;
     493             : 
     494           6 :         if (!missing_ok)
     495           0 :             ereport(ERROR,
     496             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     497             :                      errmsg("replication origin with ID %d does not exist",
     498             :                             roident)));
     499             : 
     500           6 :         return false;
     501             :     }
     502             : }
     503             : 
     504             : 
     505             : /* ---------------------------------------------------------------------------
     506             :  * Functions for handling replication progress.
     507             :  * ---------------------------------------------------------------------------
     508             :  */
     509             : 
     510             : Size
     511        8044 : ReplicationOriginShmemSize(void)
     512             : {
     513        8044 :     Size        size = 0;
     514             : 
     515        8044 :     if (max_active_replication_origins == 0)
     516           4 :         return size;
     517             : 
     518        8040 :     size = add_size(size, offsetof(ReplicationStateCtl, states));
     519             : 
     520        8040 :     size = add_size(size,
     521             :                     mul_size(max_active_replication_origins, sizeof(ReplicationState)));
     522        8040 :     return size;
     523             : }
     524             : 
     525             : void
     526        2084 : ReplicationOriginShmemInit(void)
     527             : {
     528             :     bool        found;
     529             : 
     530        2084 :     if (max_active_replication_origins == 0)
     531           2 :         return;
     532             : 
     533        2082 :     replication_states_ctl = (ReplicationStateCtl *)
     534        2082 :         ShmemInitStruct("ReplicationOriginState",
     535             :                         ReplicationOriginShmemSize(),
     536             :                         &found);
     537        2082 :     replication_states = replication_states_ctl->states;
     538             : 
     539        2082 :     if (!found)
     540             :     {
     541             :         int         i;
     542             : 
     543      149778 :         MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
     544             : 
     545        2082 :         replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
     546             : 
     547       22884 :         for (i = 0; i < max_active_replication_origins; i++)
     548             :         {
     549       20802 :             LWLockInitialize(&replication_states[i].lock,
     550       20802 :                              replication_states_ctl->tranche_id);
     551       20802 :             ConditionVariableInit(&replication_states[i].origin_cv);
     552             :         }
     553             :     }
     554             : }
     555             : 
     556             : /* ---------------------------------------------------------------------------
     557             :  * Perform a checkpoint of each replication origin's progress with respect to
     558             :  * the replayed remote_lsn. Make sure that all transactions we refer to in the
     559             :  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
     560             :  * if the transactions were originally committed asynchronously.
     561             :  *
     562             :  * We store checkpoints in the following format:
     563             :  * +-------+------------------------+------------------+-----+--------+
     564             :  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
     565             :  * +-------+------------------------+------------------+-----+--------+
     566             :  *
     567             :  * So it's just the magic, followed by the statically sized
     568             :  * ReplicationStateOnDisk structs, followed by a CRC32C. Note that the maximum
     569             :  * number of ReplicationState entries is determined by max_active_replication_origins.
     570             :  * ---------------------------------------------------------------------------
     571             :  */
     572             : void
     573        2658 : CheckPointReplicationOrigin(void)
     574             : {
     575        2658 :     const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
     576        2658 :     const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     577             :     int         tmpfd;
     578             :     int         i;
     579        2658 :     uint32      magic = REPLICATION_STATE_MAGIC;
     580             :     pg_crc32c   crc;
     581             : 
     582        2658 :     if (max_active_replication_origins == 0)
     583           2 :         return;
     584             : 
     585        2656 :     INIT_CRC32C(crc);
     586             : 
     587             :     /* make sure no old temp file is remaining */
     588        2656 :     if (unlink(tmppath) < 0 && errno != ENOENT)
     589           0 :         ereport(PANIC,
     590             :                 (errcode_for_file_access(),
     591             :                  errmsg("could not remove file \"%s\": %m",
     592             :                         tmppath)));
     593             : 
     594             :     /*
     595             :      * no other backend can perform this at the same time; only one checkpoint
     596             :      * can happen at a time.
     597             :      */
     598        2656 :     tmpfd = OpenTransientFile(tmppath,
     599             :                               O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
     600        2656 :     if (tmpfd < 0)
     601           0 :         ereport(PANIC,
     602             :                 (errcode_for_file_access(),
     603             :                  errmsg("could not create file \"%s\": %m",
     604             :                         tmppath)));
     605             : 
     606             :     /* write magic */
     607        2656 :     errno = 0;
     608        2656 :     if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
     609             :     {
     610             :         /* if write didn't set errno, assume problem is no disk space */
     611           0 :         if (errno == 0)
     612           0 :             errno = ENOSPC;
     613           0 :         ereport(PANIC,
     614             :                 (errcode_for_file_access(),
     615             :                  errmsg("could not write to file \"%s\": %m",
     616             :                         tmppath)));
     617             :     }
     618        2656 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     619             : 
     620             :     /* prevent concurrent creations/drops */
     621        2656 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
     622             : 
     623             :     /* write actual data */
     624       29216 :     for (i = 0; i < max_active_replication_origins; i++)
     625             :     {
     626             :         ReplicationStateOnDisk disk_state;
     627       26560 :         ReplicationState *curstate = &replication_states[i];
     628             :         XLogRecPtr  local_lsn;
     629             : 
     630       26560 :         if (curstate->roident == InvalidRepOriginId)
     631       26474 :             continue;
     632             : 
     633             :         /* zero, to avoid uninitialized padding bytes */
     634          86 :         memset(&disk_state, 0, sizeof(disk_state));
     635             : 
     636          86 :         LWLockAcquire(&curstate->lock, LW_SHARED);
     637             : 
     638          86 :         disk_state.roident = curstate->roident;
     639             : 
     640          86 :         disk_state.remote_lsn = curstate->remote_lsn;
     641          86 :         local_lsn = curstate->local_lsn;
     642             : 
     643          86 :         LWLockRelease(&curstate->lock);
     644             : 
     645             :         /* make sure we only write out a commit that's persistent */
     646          86 :         XLogFlush(local_lsn);
     647             : 
     648          86 :         errno = 0;
     649          86 :         if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
     650             :             sizeof(disk_state))
     651             :         {
     652             :             /* if write didn't set errno, assume problem is no disk space */
     653           0 :             if (errno == 0)
     654           0 :                 errno = ENOSPC;
     655           0 :             ereport(PANIC,
     656             :                     (errcode_for_file_access(),
     657             :                      errmsg("could not write to file \"%s\": %m",
     658             :                             tmppath)));
     659             :         }
     660             : 
     661          86 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     662             :     }
     663             : 
     664        2656 :     LWLockRelease(ReplicationOriginLock);
     665             : 
     666             :     /* write out the CRC */
     667        2656 :     FIN_CRC32C(crc);
     668        2656 :     errno = 0;
     669        2656 :     if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
     670             :     {
     671             :         /* if write didn't set errno, assume problem is no disk space */
     672           0 :         if (errno == 0)
     673           0 :             errno = ENOSPC;
     674           0 :         ereport(PANIC,
     675             :                 (errcode_for_file_access(),
     676             :                  errmsg("could not write to file \"%s\": %m",
     677             :                         tmppath)));
     678             :     }
     679             : 
     680        2656 :     if (CloseTransientFile(tmpfd) != 0)
     681           0 :         ereport(PANIC,
     682             :                 (errcode_for_file_access(),
     683             :                  errmsg("could not close file \"%s\": %m",
     684             :                         tmppath)));
     685             : 
     686             :     /* fsync, rename to permanent file, fsync file and directory */
     687        2656 :     durable_rename(tmppath, path, PANIC);
     688             : }
     689             : 
     690             : /*
     691             :  * Recover replication replay status from checkpoint data saved earlier by
     692             :  * CheckPointReplicationOrigin.
     693             :  *
     694             :  * This only needs to be called at startup and *not* during every checkpoint
     695             :  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
     696             :  * state thereafter can be recovered by looking at commit records.
     697             :  */
     698             : void
     699        1800 : StartupReplicationOrigin(void)
     700             : {
     701        1800 :     const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     702             :     int         fd;
     703             :     int         readBytes;
     704        1800 :     uint32      magic = REPLICATION_STATE_MAGIC;
     705        1800 :     int         last_state = 0;
     706             :     pg_crc32c   file_crc;
     707             :     pg_crc32c   crc;
     708             : 
     709             :     /* don't want to overwrite already existing state */
     710             : #ifdef USE_ASSERT_CHECKING
     711             :     static bool already_started = false;
     712             : 
     713             :     Assert(!already_started);
     714             :     already_started = true;
     715             : #endif
     716             : 
     717        1800 :     if (max_active_replication_origins == 0)
     718          98 :         return;
     719             : 
     720        1798 :     INIT_CRC32C(crc);
     721             : 
     722        1798 :     elog(DEBUG2, "starting up replication origin progress state");
     723             : 
     724        1798 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
     725             : 
     726             :     /*
     727             :      * might have had max_active_replication_origins == 0 last run, or we just
     728             :      * brought up a standby.
     729             :      */
     730        1798 :     if (fd < 0 && errno == ENOENT)
     731          96 :         return;
     732        1702 :     else if (fd < 0)
     733           0 :         ereport(PANIC,
     734             :                 (errcode_for_file_access(),
     735             :                  errmsg("could not open file \"%s\": %m",
     736             :                         path)));
     737             : 
     738             :     /* verify magic, that is written even if nothing was active */
     739        1702 :     readBytes = read(fd, &magic, sizeof(magic));
     740        1702 :     if (readBytes != sizeof(magic))
     741             :     {
     742           0 :         if (readBytes < 0)
     743           0 :             ereport(PANIC,
     744             :                     (errcode_for_file_access(),
     745             :                      errmsg("could not read file \"%s\": %m",
     746             :                             path)));
     747             :         else
     748           0 :             ereport(PANIC,
     749             :                     (errcode(ERRCODE_DATA_CORRUPTED),
     750             :                      errmsg("could not read file \"%s\": read %d of %zu",
     751             :                             path, readBytes, sizeof(magic))));
     752             :     }
     753        1702 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     754             : 
     755        1702 :     if (magic != REPLICATION_STATE_MAGIC)
     756           0 :         ereport(PANIC,
     757             :                 (errmsg("replication checkpoint has wrong magic %u instead of %u",
     758             :                         magic, REPLICATION_STATE_MAGIC)));
     759             : 
     760             :     /* we can skip locking here, no other access is possible */
     761             : 
     762             :     /* recover individual states, until there are no more to be found */
     763             :     while (true)
     764          38 :     {
     765             :         ReplicationStateOnDisk disk_state;
     766             : 
     767        1740 :         readBytes = read(fd, &disk_state, sizeof(disk_state));
     768             : 
     769             :         /* no further data */
     770        1740 :         if (readBytes == sizeof(crc))
     771             :         {
     772             :             /* not pretty, but simple ... */
     773        1702 :             file_crc = *(pg_crc32c *) &disk_state;
     774        1702 :             break;
     775             :         }
     776             : 
     777          38 :         if (readBytes < 0)
     778             :         {
     779           0 :             ereport(PANIC,
     780             :                     (errcode_for_file_access(),
     781             :                      errmsg("could not read file \"%s\": %m",
     782             :                             path)));
     783             :         }
     784             : 
     785          38 :         if (readBytes != sizeof(disk_state))
     786             :         {
     787           0 :             ereport(PANIC,
     788             :                     (errcode_for_file_access(),
     789             :                      errmsg("could not read file \"%s\": read %d of %zu",
     790             :                             path, readBytes, sizeof(disk_state))));
     791             :         }
     792             : 
     793          38 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     794             : 
     795          38 :         if (last_state == max_active_replication_origins)
     796           0 :             ereport(PANIC,
     797             :                     (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     798             :                      errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
     799             : 
     800             :         /* copy data to shared memory */
     801          38 :         replication_states[last_state].roident = disk_state.roident;
     802          38 :         replication_states[last_state].remote_lsn = disk_state.remote_lsn;
     803          38 :         last_state++;
     804             : 
     805          38 :         ereport(LOG,
     806             :                 (errmsg("recovered replication state of node %d to %X/%X",
     807             :                         disk_state.roident,
     808             :                         LSN_FORMAT_ARGS(disk_state.remote_lsn))));
     809             :     }
     810             : 
     811             :     /* now check checksum */
     812        1702 :     FIN_CRC32C(crc);
     813        1702 :     if (file_crc != crc)
     814           0 :         ereport(PANIC,
     815             :                 (errcode(ERRCODE_DATA_CORRUPTED),
     816             :                  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
     817             :                         crc, file_crc)));
     818             : 
     819        1702 :     if (CloseTransientFile(fd) != 0)
     820           0 :         ereport(PANIC,
     821             :                 (errcode_for_file_access(),
     822             :                  errmsg("could not close file \"%s\": %m",
     823             :                         path)));
     824             : }
     825             : 
     826             : void
     827           8 : replorigin_redo(XLogReaderState *record)
     828             : {
     829           8 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     830             : 
     831           8 :     switch (info)
     832             :     {
     833           4 :         case XLOG_REPLORIGIN_SET:
     834             :             {
     835           4 :                 xl_replorigin_set *xlrec =
     836           4 :                     (xl_replorigin_set *) XLogRecGetData(record);
     837             : 
     838           4 :                 replorigin_advance(xlrec->node_id,
     839             :                                    xlrec->remote_lsn, record->EndRecPtr,
     840           4 :                                    xlrec->force /* backward */ ,
     841             :                                    false /* WAL log */ );
     842           4 :                 break;
     843             :             }
     844           4 :         case XLOG_REPLORIGIN_DROP:
     845             :             {
     846             :                 xl_replorigin_drop *xlrec;
     847             :                 int         i;
     848             : 
     849           4 :                 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
     850             : 
     851           6 :                 for (i = 0; i < max_active_replication_origins; i++)
     852             :                 {
     853           6 :                     ReplicationState *state = &replication_states[i];
     854             : 
     855             :                     /* found our slot */
     856           6 :                     if (state->roident == xlrec->node_id)
     857             :                     {
     858             :                         /* reset entry */
     859           4 :                         state->roident = InvalidRepOriginId;
     860           4 :                         state->remote_lsn = InvalidXLogRecPtr;
     861           4 :                         state->local_lsn = InvalidXLogRecPtr;
     862           4 :                         break;
     863             :                     }
     864             :                 }
     865           4 :                 break;
     866             :             }
     867           0 :         default:
     868           0 :             elog(PANIC, "replorigin_redo: unknown op code %u", info);
     869             :     }
     870           8 : }
     871             : 
     872             : 
     873             : /*
     874             :  * Tell the replication origin progress machinery that a commit from 'node'
     875             :  * that originated at the LSN remote_commit on the remote node was replayed
     876             :  * successfully and that we don't need to do so again. In combination with
     877             :  * setting up replorigin_session_origin_lsn and replorigin_session_origin
     878             :  * that ensures we won't lose knowledge about that after a crash if the
     879             :  * transaction had a persistent effect (think of asynchronous commits).
     880             :  *
     881             :  * local_commit needs to be a local LSN of the commit so that we can make sure
     882             :  * upon a checkpoint that enough WAL has been persisted to disk.
     883             :  *
     884             :  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
     885             :  * unless running in recovery.
     886             :  */
     887             : void
     888         468 : replorigin_advance(RepOriginId node,
     889             :                    XLogRecPtr remote_commit, XLogRecPtr local_commit,
     890             :                    bool go_backward, bool wal_log)
     891             : {
     892             :     int         i;
     893         468 :     ReplicationState *replication_state = NULL;
     894         468 :     ReplicationState *free_state = NULL;
     895             : 
     896             :     Assert(node != InvalidRepOriginId);
     897             : 
     898             :     /* we don't track DoNotReplicateId */
     899         468 :     if (node == DoNotReplicateId)
     900           0 :         return;
     901             : 
     902             :     /*
     903             :      * XXX: For the case where this is called by WAL replay, it'd be more
     904             :      * efficient to restore into a backend local hashtable and only dump into
     905             :      * shmem after recovery is finished. Let's wait with implementing that
     906             :      * till it's shown to be a measurable expense
     907             :      */
     908             : 
     909             :     /* Lock exclusively, as we may have to create a new table entry. */
     910         468 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     911             : 
     912             :     /*
     913             :      * Search for either an existing slot for the origin, or a free one we can
     914             :      * use.
     915             :      */
     916        4306 :     for (i = 0; i < max_active_replication_origins; i++)
     917             :     {
     918        3924 :         ReplicationState *curstate = &replication_states[i];
     919             : 
     920             :         /* remember where to insert if necessary */
     921        3924 :         if (curstate->roident == InvalidRepOriginId &&
     922             :             free_state == NULL)
     923             :         {
     924         382 :             free_state = curstate;
     925         382 :             continue;
     926             :         }
     927             : 
     928             :         /* not our slot */
     929        3542 :         if (curstate->roident != node)
     930             :         {
     931        3456 :             continue;
     932             :         }
     933             : 
     934             :         /* ok, found slot */
     935          86 :         replication_state = curstate;
     936             : 
     937          86 :         LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
     938             : 
     939             :         /* Make sure it's not used by somebody else */
     940          86 :         if (replication_state->acquired_by != 0)
     941             :         {
     942           0 :             ereport(ERROR,
     943             :                     (errcode(ERRCODE_OBJECT_IN_USE),
     944             :                      errmsg("replication origin with ID %d is already active for PID %d",
     945             :                             replication_state->roident,
     946             :                             replication_state->acquired_by)));
     947             :         }
     948             : 
     949          86 :         break;
     950             :     }
     951             : 
     952         468 :     if (replication_state == NULL && free_state == NULL)
     953           0 :         ereport(ERROR,
     954             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     955             :                  errmsg("could not find free replication state slot for replication origin with ID %d",
     956             :                         node),
     957             :                  errhint("Increase \"max_active_replication_origins\" and try again.")));
     958             : 
     959         468 :     if (replication_state == NULL)
     960             :     {
     961             :         /* initialize new slot */
     962         382 :         LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
     963         382 :         replication_state = free_state;
     964             :         Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
     965             :         Assert(replication_state->local_lsn == InvalidXLogRecPtr);
     966         382 :         replication_state->roident = node;
     967             :     }
     968             : 
     969             :     Assert(replication_state->roident != InvalidRepOriginId);
     970             : 
     971             :     /*
     972             :      * If somebody "forcefully" sets this slot, WAL log it, so it's durable
     973             :      * and the standby gets the message. Primarily this will be called during
     974             :      * WAL replay (of commit records) where no WAL logging is necessary.
     975             :      */
     976         468 :     if (wal_log)
     977             :     {
     978             :         xl_replorigin_set xlrec;
     979             : 
     980         386 :         xlrec.remote_lsn = remote_commit;
     981         386 :         xlrec.node_id = node;
     982         386 :         xlrec.force = go_backward;
     983             : 
     984         386 :         XLogBeginInsert();
     985         386 :         XLogRegisterData(&xlrec, sizeof(xlrec));
     986             : 
     987         386 :         XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
     988             :     }
     989             : 
     990             :     /*
     991             :      * Due to - harmless - race conditions during a checkpoint we could see
     992             :      * values here that are older than the ones we already have in memory. We
     993             :      * could also see older values for prepared transactions when the prepare
     994             :      * is sent at a later point of time along with commit prepared and there
     995             :      * are other transactions commits between prepare and commit prepared. See
     996             :      * ReorderBufferFinishPrepared. Don't overwrite those.
     997             :      */
     998         468 :     if (go_backward || replication_state->remote_lsn < remote_commit)
     999         454 :         replication_state->remote_lsn = remote_commit;
    1000         468 :     if (local_commit != InvalidXLogRecPtr &&
    1001          76 :         (go_backward || replication_state->local_lsn < local_commit))
    1002          80 :         replication_state->local_lsn = local_commit;
    1003         468 :     LWLockRelease(&replication_state->lock);
    1004             : 
    1005             :     /*
    1006             :      * Release *after* changing the LSNs, slot isn't acquired and thus could
    1007             :      * otherwise be dropped anytime.
    1008             :      */
    1009         468 :     LWLockRelease(ReplicationOriginLock);
    1010             : }
    1011             : 
    1012             : 
    1013             : XLogRecPtr
    1014          16 : replorigin_get_progress(RepOriginId node, bool flush)
    1015             : {
    1016             :     int         i;
    1017          16 :     XLogRecPtr  local_lsn = InvalidXLogRecPtr;
    1018          16 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1019             : 
    1020             :     /* prevent slots from being concurrently dropped */
    1021          16 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1022             : 
    1023          76 :     for (i = 0; i < max_active_replication_origins; i++)
    1024             :     {
    1025             :         ReplicationState *state;
    1026             : 
    1027          70 :         state = &replication_states[i];
    1028             : 
    1029          70 :         if (state->roident == node)
    1030             :         {
    1031          10 :             LWLockAcquire(&state->lock, LW_SHARED);
    1032             : 
    1033          10 :             remote_lsn = state->remote_lsn;
    1034          10 :             local_lsn = state->local_lsn;
    1035             : 
    1036          10 :             LWLockRelease(&state->lock);
    1037             : 
    1038          10 :             break;
    1039             :         }
    1040             :     }
    1041             : 
    1042          16 :     LWLockRelease(ReplicationOriginLock);
    1043             : 
    1044          16 :     if (flush && local_lsn != InvalidXLogRecPtr)
    1045           2 :         XLogFlush(local_lsn);
    1046             : 
    1047          16 :     return remote_lsn;
    1048             : }
    1049             : 
    1050             : /*
    1051             :  * Tear down a (possibly) configured session replication origin during process
    1052             :  * exit.
    1053             :  */
    1054             : static void
    1055         836 : ReplicationOriginExitCleanup(int code, Datum arg)
    1056             : {
    1057         836 :     ConditionVariable *cv = NULL;
    1058             : 
    1059         836 :     if (session_replication_state == NULL)
    1060         366 :         return;
    1061             : 
    1062         470 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1063             : 
    1064         470 :     if (session_replication_state->acquired_by == MyProcPid)
    1065             :     {
    1066         450 :         cv = &session_replication_state->origin_cv;
    1067             : 
    1068         450 :         session_replication_state->acquired_by = 0;
    1069         450 :         session_replication_state = NULL;
    1070             :     }
    1071             : 
    1072         470 :     LWLockRelease(ReplicationOriginLock);
    1073             : 
    1074         470 :     if (cv)
    1075         450 :         ConditionVariableBroadcast(cv);
    1076             : }
    1077             : 
    1078             : /*
    1079             :  * Setup a replication origin in the shared memory struct if it doesn't
    1080             :  * already exist and cache access to the specific ReplicationSlot so the
    1081             :  * array doesn't have to be searched when calling
    1082             :  * replorigin_session_advance().
    1083             :  *
    1084             :  * Normally only one such cached origin can exist per process so the cached
    1085             :  * value can only be set again after the previous value is torn down with
    1086             :  * replorigin_session_reset(). For this normal case pass acquired_by = 0
    1087             :  * (meaning the slot is not allowed to be already acquired by another process).
    1088             :  *
    1089             :  * However, sometimes multiple processes can safely re-use the same origin slot
    1090             :  * (for example, multiple parallel apply processes can safely use the same
    1091             :  * origin, provided they maintain commit order by allowing only one process to
    1092             :  * commit at a time). For this case the first process must pass acquired_by =
    1093             :  * 0, and then the other processes sharing that same origin can pass
    1094             :  * acquired_by = PID of the first process.
    1095             :  */
    1096             : void
    1097         842 : replorigin_session_setup(RepOriginId node, int acquired_by)
    1098             : {
    1099             :     static bool registered_cleanup;
    1100             :     int         i;
    1101         842 :     int         free_slot = -1;
    1102             : 
    1103         842 :     if (!registered_cleanup)
    1104             :     {
    1105         836 :         on_shmem_exit(ReplicationOriginExitCleanup, 0);
    1106         836 :         registered_cleanup = true;
    1107             :     }
    1108             : 
    1109             :     Assert(max_active_replication_origins > 0);
    1110             : 
    1111         842 :     if (session_replication_state != NULL)
    1112           2 :         ereport(ERROR,
    1113             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1114             :                  errmsg("cannot setup replication origin when one is already setup")));
    1115             : 
    1116             :     /* Lock exclusively, as we may have to create a new table entry. */
    1117         840 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1118             : 
    1119             :     /*
    1120             :      * Search for either an existing slot for the origin, or a free one we can
    1121             :      * use.
    1122             :      */
    1123        3556 :     for (i = 0; i < max_active_replication_origins; i++)
    1124             :     {
    1125        3332 :         ReplicationState *curstate = &replication_states[i];
    1126             : 
    1127             :         /* remember where to insert if necessary */
    1128        3332 :         if (curstate->roident == InvalidRepOriginId &&
    1129             :             free_slot == -1)
    1130             :         {
    1131         228 :             free_slot = i;
    1132         228 :             continue;
    1133             :         }
    1134             : 
    1135             :         /* not our slot */
    1136        3104 :         if (curstate->roident != node)
    1137        2488 :             continue;
    1138             : 
    1139         616 :         else if (curstate->acquired_by != 0 && acquired_by == 0)
    1140             :         {
    1141           0 :             ereport(ERROR,
    1142             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1143             :                      errmsg("replication origin with ID %d is already active for PID %d",
    1144             :                             curstate->roident, curstate->acquired_by)));
    1145             :         }
    1146             : 
    1147             :         /* ok, found slot */
    1148         616 :         session_replication_state = curstate;
    1149         616 :         break;
    1150             :     }
    1151             : 
    1152             : 
    1153         840 :     if (session_replication_state == NULL && free_slot == -1)
    1154           0 :         ereport(ERROR,
    1155             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
    1156             :                  errmsg("could not find free replication state slot for replication origin with ID %d",
    1157             :                         node),
    1158             :                  errhint("Increase \"max_active_replication_origins\" and try again.")));
    1159         840 :     else if (session_replication_state == NULL)
    1160             :     {
    1161             :         /* initialize new slot */
    1162         224 :         session_replication_state = &replication_states[free_slot];
    1163             :         Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
    1164             :         Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
    1165         224 :         session_replication_state->roident = node;
    1166             :     }
    1167             : 
    1168             : 
    1169             :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1170             : 
    1171         840 :     if (acquired_by == 0)
    1172         820 :         session_replication_state->acquired_by = MyProcPid;
    1173          20 :     else if (session_replication_state->acquired_by != acquired_by)
    1174           0 :         elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
    1175             :              node, acquired_by);
    1176             : 
    1177         840 :     LWLockRelease(ReplicationOriginLock);
    1178             : 
    1179             :     /* probably this one is pointless */
    1180         840 :     ConditionVariableBroadcast(&session_replication_state->origin_cv);
    1181         840 : }
    1182             : 
    1183             : /*
    1184             :  * Reset replay state previously setup in this session.
    1185             :  *
    1186             :  * This function may only be called if an origin was setup with
    1187             :  * replorigin_session_setup().
    1188             :  */
    1189             : void
    1190         372 : replorigin_session_reset(void)
    1191             : {
    1192             :     ConditionVariable *cv;
    1193             : 
    1194             :     Assert(max_active_replication_origins != 0);
    1195             : 
    1196         372 :     if (session_replication_state == NULL)
    1197           2 :         ereport(ERROR,
    1198             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1199             :                  errmsg("no replication origin is configured")));
    1200             : 
    1201         370 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1202             : 
    1203         370 :     session_replication_state->acquired_by = 0;
    1204         370 :     cv = &session_replication_state->origin_cv;
    1205         370 :     session_replication_state = NULL;
    1206             : 
    1207         370 :     LWLockRelease(ReplicationOriginLock);
    1208             : 
    1209         370 :     ConditionVariableBroadcast(cv);
    1210         370 : }
    1211             : 
    1212             : /*
    1213             :  * Do the same work replorigin_advance() does, just on the session's
    1214             :  * configured origin.
    1215             :  *
    1216             :  * This is noticeably cheaper than using replorigin_advance().
    1217             :  */
    1218             : void
    1219        2148 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
    1220             : {
    1221             :     Assert(session_replication_state != NULL);
    1222             :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1223             : 
    1224        2148 :     LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
    1225        2148 :     if (session_replication_state->local_lsn < local_commit)
    1226        2148 :         session_replication_state->local_lsn = local_commit;
    1227        2148 :     if (session_replication_state->remote_lsn < remote_commit)
    1228        1014 :         session_replication_state->remote_lsn = remote_commit;
    1229        2148 :     LWLockRelease(&session_replication_state->lock);
    1230        2148 : }
    1231             : 
    1232             : /*
    1233             :  * Ask the machinery about the point up to which we successfully replayed
    1234             :  * changes from an already setup replication origin.
    1235             :  */
    1236             : XLogRecPtr
    1237         434 : replorigin_session_get_progress(bool flush)
    1238             : {
    1239             :     XLogRecPtr  remote_lsn;
    1240             :     XLogRecPtr  local_lsn;
    1241             : 
    1242             :     Assert(session_replication_state != NULL);
    1243             : 
    1244         434 :     LWLockAcquire(&session_replication_state->lock, LW_SHARED);
    1245         434 :     remote_lsn = session_replication_state->remote_lsn;
    1246         434 :     local_lsn = session_replication_state->local_lsn;
    1247         434 :     LWLockRelease(&session_replication_state->lock);
    1248             : 
    1249         434 :     if (flush && local_lsn != InvalidXLogRecPtr)
    1250           2 :         XLogFlush(local_lsn);
    1251             : 
    1252         434 :     return remote_lsn;
    1253             : }
    1254             : 
    1255             : 
    1256             : 
    1257             : /* ---------------------------------------------------------------------------
    1258             :  * SQL functions for working with replication origin.
    1259             :  *
    1260             :  * These mostly should be fairly short wrappers around more generic functions.
    1261             :  * ---------------------------------------------------------------------------
    1262             :  */
    1263             : 
    1264             : /*
    1265             :  * Create replication origin for the passed in name, and return the assigned
    1266             :  * oid.
    1267             :  */
    1268             : Datum
    1269          18 : pg_replication_origin_create(PG_FUNCTION_ARGS)
    1270             : {
    1271             :     char       *name;
    1272             :     RepOriginId roident;
    1273             : 
    1274          18 :     replorigin_check_prerequisites(false, false);
    1275             : 
    1276          18 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1277             : 
    1278             :     /*
    1279             :      * Replication origins "any and "none" are reserved for system options.
    1280             :      * The origins "pg_xxx" are reserved for internal use.
    1281             :      */
    1282          18 :     if (IsReservedName(name) || IsReservedOriginName(name))
    1283           6 :         ereport(ERROR,
    1284             :                 (errcode(ERRCODE_RESERVED_NAME),
    1285             :                  errmsg("replication origin name \"%s\" is reserved",
    1286             :                         name),
    1287             :                  errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
    1288             :                            LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
    1289             : 
    1290             :     /*
    1291             :      * If built with appropriate switch, whine when regression-testing
    1292             :      * conventions for replication origin names are violated.
    1293             :      */
    1294             : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1295             :     if (strncmp(name, "regress_", 8) != 0)
    1296             :         elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
    1297             : #endif
    1298             : 
    1299          12 :     roident = replorigin_create(name);
    1300             : 
    1301          10 :     pfree(name);
    1302             : 
    1303          10 :     PG_RETURN_OID(roident);
    1304             : }
    1305             : 
    1306             : /*
    1307             :  * Drop replication origin.
    1308             :  */
    1309             : Datum
    1310          14 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
    1311             : {
    1312             :     char       *name;
    1313             : 
    1314          14 :     replorigin_check_prerequisites(false, false);
    1315             : 
    1316          14 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1317             : 
    1318          14 :     replorigin_drop_by_name(name, false, true);
    1319             : 
    1320          12 :     pfree(name);
    1321             : 
    1322          12 :     PG_RETURN_VOID();
    1323             : }
    1324             : 
    1325             : /*
    1326             :  * Return oid of a replication origin.
    1327             :  */
    1328             : Datum
    1329           0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
    1330             : {
    1331             :     char       *name;
    1332             :     RepOriginId roident;
    1333             : 
    1334           0 :     replorigin_check_prerequisites(false, false);
    1335             : 
    1336           0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1337           0 :     roident = replorigin_by_name(name, true);
    1338             : 
    1339           0 :     pfree(name);
    1340             : 
    1341           0 :     if (OidIsValid(roident))
    1342           0 :         PG_RETURN_OID(roident);
    1343           0 :     PG_RETURN_NULL();
    1344             : }
    1345             : 
    1346             : /*
    1347             :  * Setup a replication origin for this session.
    1348             :  */
    1349             : Datum
    1350          12 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
    1351             : {
    1352             :     char       *name;
    1353             :     RepOriginId origin;
    1354             : 
    1355          12 :     replorigin_check_prerequisites(true, false);
    1356             : 
    1357          12 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1358          12 :     origin = replorigin_by_name(name, false);
    1359          10 :     replorigin_session_setup(origin, 0);
    1360             : 
    1361           8 :     replorigin_session_origin = origin;
    1362             : 
    1363           8 :     pfree(name);
    1364             : 
    1365           8 :     PG_RETURN_VOID();
    1366             : }
    1367             : 
    1368             : /*
    1369             :  * Reset previously setup origin in this session
    1370             :  */
    1371             : Datum
    1372          10 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
    1373             : {
    1374          10 :     replorigin_check_prerequisites(true, false);
    1375             : 
    1376          10 :     replorigin_session_reset();
    1377             : 
    1378           8 :     replorigin_session_origin = InvalidRepOriginId;
    1379           8 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1380           8 :     replorigin_session_origin_timestamp = 0;
    1381             : 
    1382           8 :     PG_RETURN_VOID();
    1383             : }
    1384             : 
    1385             : /*
    1386             :  * Has a replication origin been setup for this session.
    1387             :  */
    1388             : Datum
    1389           0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
    1390             : {
    1391           0 :     replorigin_check_prerequisites(false, false);
    1392             : 
    1393           0 :     PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
    1394             : }
    1395             : 
    1396             : 
    1397             : /*
    1398             :  * Return the replication progress for origin setup in the current session.
    1399             :  *
    1400             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1401             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1402             :  * commits are used when replaying replicated transactions.
    1403             :  */
    1404             : Datum
    1405           4 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
    1406             : {
    1407           4 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1408           4 :     bool        flush = PG_GETARG_BOOL(0);
    1409             : 
    1410           4 :     replorigin_check_prerequisites(true, false);
    1411             : 
    1412           4 :     if (session_replication_state == NULL)
    1413           0 :         ereport(ERROR,
    1414             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1415             :                  errmsg("no replication origin is configured")));
    1416             : 
    1417           4 :     remote_lsn = replorigin_session_get_progress(flush);
    1418             : 
    1419           4 :     if (remote_lsn == InvalidXLogRecPtr)
    1420           0 :         PG_RETURN_NULL();
    1421             : 
    1422           4 :     PG_RETURN_LSN(remote_lsn);
    1423             : }
    1424             : 
    1425             : Datum
    1426           2 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
    1427             : {
    1428           2 :     XLogRecPtr  location = PG_GETARG_LSN(0);
    1429             : 
    1430           2 :     replorigin_check_prerequisites(true, false);
    1431             : 
    1432           2 :     if (session_replication_state == NULL)
    1433           0 :         ereport(ERROR,
    1434             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1435             :                  errmsg("no replication origin is configured")));
    1436             : 
    1437           2 :     replorigin_session_origin_lsn = location;
    1438           2 :     replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
    1439             : 
    1440           2 :     PG_RETURN_VOID();
    1441             : }
    1442             : 
    1443             : Datum
    1444           0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
    1445             : {
    1446           0 :     replorigin_check_prerequisites(true, false);
    1447             : 
    1448           0 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1449           0 :     replorigin_session_origin_timestamp = 0;
    1450             : 
    1451           0 :     PG_RETURN_VOID();
    1452             : }
    1453             : 
    1454             : 
    1455             : Datum
    1456           6 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
    1457             : {
    1458           6 :     text       *name = PG_GETARG_TEXT_PP(0);
    1459           6 :     XLogRecPtr  remote_commit = PG_GETARG_LSN(1);
    1460             :     RepOriginId node;
    1461             : 
    1462           6 :     replorigin_check_prerequisites(true, false);
    1463             : 
    1464             :     /* lock to prevent the replication origin from vanishing */
    1465           6 :     LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1466             : 
    1467           6 :     node = replorigin_by_name(text_to_cstring(name), false);
    1468             : 
    1469             :     /*
    1470             :      * Can't sensibly pass a local commit to be flushed at checkpoint - this
    1471             :      * xact hasn't committed yet. This is why this function should be used to
    1472             :      * set up the initial replication state, but not for replay.
    1473             :      */
    1474           4 :     replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
    1475             :                        true /* go backward */ , true /* WAL log */ );
    1476             : 
    1477           4 :     UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1478             : 
    1479           4 :     PG_RETURN_VOID();
    1480             : }
    1481             : 
    1482             : 
    1483             : /*
    1484             :  * Return the replication progress for an individual replication origin.
    1485             :  *
    1486             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1487             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1488             :  * commits are used when replaying replicated transactions.
    1489             :  */
    1490             : Datum
    1491           6 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
    1492             : {
    1493             :     char       *name;
    1494             :     bool        flush;
    1495             :     RepOriginId roident;
    1496           6 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1497             : 
    1498           6 :     replorigin_check_prerequisites(true, true);
    1499             : 
    1500           6 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1501           6 :     flush = PG_GETARG_BOOL(1);
    1502             : 
    1503           6 :     roident = replorigin_by_name(name, false);
    1504             :     Assert(OidIsValid(roident));
    1505             : 
    1506           4 :     remote_lsn = replorigin_get_progress(roident, flush);
    1507             : 
    1508           4 :     if (remote_lsn == InvalidXLogRecPtr)
    1509           0 :         PG_RETURN_NULL();
    1510             : 
    1511           4 :     PG_RETURN_LSN(remote_lsn);
    1512             : }
    1513             : 
    1514             : 
    1515             : Datum
    1516          12 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
    1517             : {
    1518          12 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1519             :     int         i;
    1520             : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
    1521             : 
    1522             :     /* we want to return 0 rows if slot is set to zero */
    1523          12 :     replorigin_check_prerequisites(false, true);
    1524             : 
    1525          12 :     InitMaterializedSRF(fcinfo, 0);
    1526             : 
    1527             :     /* prevent slots from being concurrently dropped */
    1528          12 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1529             : 
    1530             :     /*
    1531             :      * Iterate through all possible replication_states, display if they are
    1532             :      * filled. Note that we do not take any locks, so slightly corrupted/out
    1533             :      * of date values are a possibility.
    1534             :      */
    1535         132 :     for (i = 0; i < max_active_replication_origins; i++)
    1536             :     {
    1537             :         ReplicationState *state;
    1538             :         Datum       values[REPLICATION_ORIGIN_PROGRESS_COLS];
    1539             :         bool        nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
    1540             :         char       *roname;
    1541             : 
    1542         120 :         state = &replication_states[i];
    1543             : 
    1544             :         /* unused slot, nothing to display */
    1545         120 :         if (state->roident == InvalidRepOriginId)
    1546         110 :             continue;
    1547             : 
    1548          10 :         memset(values, 0, sizeof(values));
    1549          10 :         memset(nulls, 1, sizeof(nulls));
    1550             : 
    1551          10 :         values[0] = ObjectIdGetDatum(state->roident);
    1552          10 :         nulls[0] = false;
    1553             : 
    1554             :         /*
    1555             :          * We're not preventing the origin to be dropped concurrently, so
    1556             :          * silently accept that it might be gone.
    1557             :          */
    1558          10 :         if (replorigin_by_oid(state->roident, true,
    1559             :                               &roname))
    1560             :         {
    1561          10 :             values[1] = CStringGetTextDatum(roname);
    1562          10 :             nulls[1] = false;
    1563             :         }
    1564             : 
    1565          10 :         LWLockAcquire(&state->lock, LW_SHARED);
    1566             : 
    1567          10 :         values[2] = LSNGetDatum(state->remote_lsn);
    1568          10 :         nulls[2] = false;
    1569             : 
    1570          10 :         values[3] = LSNGetDatum(state->local_lsn);
    1571          10 :         nulls[3] = false;
    1572             : 
    1573          10 :         LWLockRelease(&state->lock);
    1574             : 
    1575          10 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1576             :                              values, nulls);
    1577             :     }
    1578             : 
    1579          12 :     LWLockRelease(ReplicationOriginLock);
    1580             : 
    1581             : #undef REPLICATION_ORIGIN_PROGRESS_COLS
    1582             : 
    1583          12 :     return (Datum) 0;
    1584             : }

Generated by: LCOV version 1.14