LCOV - code coverage report
Current view: top level - src/backend/replication/logical - origin.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18beta1 Lines: 396 457 86.7 %
Date: 2025-05-11 12:15:22 Functions: 28 31 90.3 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * origin.c
       4             :  *    Logical replication progress tracking support.
       5             :  *
       6             :  * Copyright (c) 2013-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/logical/origin.c
      10             :  *
      11             :  * NOTES
      12             :  *
      13             :  * This file provides the following:
      14             :  * * An infrastructure to name nodes in a replication setup
      15             :  * * A facility to store and persist replication progress in an
      16             :  *   efficient and durable manner.
      17             :  *
      18             :  * Replication origins consist of a descriptive, user defined, external
      19             :  * name and a short, thus space efficient, internal 2 byte one. This split
      20             :  * exists because replication origins have to be stored in WAL and shared
      21             :  * memory and long descriptors would be inefficient.  For now only use 2 bytes
      22             :  * for the internal id of a replication origin as it seems unlikely that there
      23             :  * soon will be more than 65k nodes in one replication setup; and using only
      24             :  * two bytes allows us to be more space efficient.
      25             :  *
      26             :  * Replication progress is tracked in a shared memory table
      27             :  * (ReplicationState) that's dumped to disk every checkpoint. Entries
      28             :  * ('slots') in this table are identified by the internal id. That's the case
      29             :  * because it allows to increase replication progress during crash
      30             :  * recovery. To allow doing so we store the original LSN (from the originating
      31             :  * system) of a transaction in the commit record. That allows to recover the
      32             :  * precise replayed state after crash recovery; without requiring synchronous
      33             :  * commits. Allowing logical replication to use asynchronous commit is
      34             :  * generally good for performance, but especially important as it allows a
      35             :  * single threaded replay process to keep up with a source that has multiple
      36             :  * backends generating changes concurrently.  For efficiency and simplicity
      37             :  * reasons a backend can set up one replication origin that's from then on used as
      38             :  * the source of changes produced by the backend, until reset again.
      39             :  *
      40             :  * This infrastructure is intended to be used in cooperation with logical
      41             :  * decoding. When replaying from a remote system the configured origin is
      42             :  * provided to output plugins, allowing prevention of replication loops and
      43             :  * other filtering.
      44             :  *
      45             :  * There are several levels of locking at work:
      46             :  *
      47             :  * * To create and drop replication origins an exclusive lock on
      48             :  *   pg_replication_origin is required for the duration. That allows us to
      49             :  *   safely and conflict free assign new origins using a dirty snapshot.
      50             :  *
      51             :  * * When creating an in-memory replication progress slot the ReplicationOrigin
      52             :  *   LWLock has to be held exclusively; when iterating over the replication
      53             :  *   progress a shared lock has to be held, the same when advancing the
      54             :  *   replication progress of an individual backend that has not been set up as the
      55             :  *   session's replication origin.
      56             :  *
      57             :  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
      58             :  *   replication progress slot that slot's lwlock has to be held. That's
      59             :  *   primarily because we do not assume 8 byte writes (the LSN) are atomic on
      60             :  *   all our platforms, but it also simplifies memory ordering concerns
      61             :  *   between the remote and local lsn. We use a lwlock instead of a spinlock
      62             :  *   so it's less harmful to hold the lock over a WAL write
      63             :  *   (cf. AdvanceReplicationProgress).
      64             :  *
      65             :  * ---------------------------------------------------------------------------
      66             :  */
      67             : 
      68             : #include "postgres.h"
      69             : 
      70             : #include <unistd.h>
      71             : #include <sys/stat.h>
      72             : 
      73             : #include "access/genam.h"
      74             : #include "access/htup_details.h"
      75             : #include "access/table.h"
      76             : #include "access/xact.h"
      77             : #include "access/xloginsert.h"
      78             : #include "catalog/catalog.h"
      79             : #include "catalog/indexing.h"
      80             : #include "catalog/pg_subscription.h"
      81             : #include "funcapi.h"
      82             : #include "miscadmin.h"
      83             : #include "nodes/execnodes.h"
      84             : #include "pgstat.h"
      85             : #include "replication/origin.h"
      86             : #include "replication/slot.h"
      87             : #include "storage/condition_variable.h"
      88             : #include "storage/fd.h"
      89             : #include "storage/ipc.h"
      90             : #include "storage/lmgr.h"
      91             : #include "utils/builtins.h"
      92             : #include "utils/fmgroids.h"
      93             : #include "utils/guc.h"
      94             : #include "utils/pg_lsn.h"
      95             : #include "utils/rel.h"
      96             : #include "utils/snapmgr.h"
      97             : #include "utils/syscache.h"
      98             : 
      99             : /* paths for replication origin checkpoint files */
     100             : #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
     101             : #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
     102             : 
     103             : /* GUC variables */
     104             : int         max_active_replication_origins = 10;
     105             : 
     106             : /*
     107             :  * Replay progress of a single remote node.
     108             :  */
     109             : typedef struct ReplicationState
     110             : {
     111             :     /*
     112             :      * Local identifier for the remote node.
     113             :      */
     114             :     RepOriginId roident;
     115             : 
     116             :     /*
     117             :      * Location of the latest commit from the remote side.
     118             :      */
     119             :     XLogRecPtr  remote_lsn;
     120             : 
     121             :     /*
     122             :      * Remember the local lsn of the commit record so we can XLogFlush() to it
     123             :      * during a checkpoint so we know the commit record actually is safe on
     124             :      * disk.
     125             :      */
     126             :     XLogRecPtr  local_lsn;
     127             : 
     128             :     /*
     129             :      * PID of backend that's acquired slot, or 0 if none.
     130             :      */
     131             :     int         acquired_by;
     132             : 
     133             :     /*
     134             :      * Condition variable that's signaled when acquired_by changes.
     135             :      */
     136             :     ConditionVariable origin_cv;
     137             : 
     138             :     /*
     139             :      * Lock protecting remote_lsn and local_lsn.
     140             :      */
     141             :     LWLock      lock;
     142             : } ReplicationState;
     143             : 
     144             : /*
     145             :  * On disk version of ReplicationState.
     146             :  */
     147             : typedef struct ReplicationStateOnDisk
     148             : {
     149             :     RepOriginId roident;
     150             :     XLogRecPtr  remote_lsn;
     151             : } ReplicationStateOnDisk;
     152             : 
     153             : 
     154             : typedef struct ReplicationStateCtl
     155             : {
     156             :     /* Tranche to use for per-origin LWLocks */
     157             :     int         tranche_id;
     158             :     /* Array of length max_active_replication_origins */
     159             :     ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
     160             : } ReplicationStateCtl;
     161             : 
     162             : /* external variables */
     163             : RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
     164             : XLogRecPtr  replorigin_session_origin_lsn = InvalidXLogRecPtr;
     165             : TimestampTz replorigin_session_origin_timestamp = 0;
     166             : 
     167             : /*
     168             :  * Base address into a shared memory array of replication states of size
     169             :  * max_active_replication_origins.
     170             :  */
     171             : static ReplicationState *replication_states;
     172             : 
     173             : /*
     174             :  * Actual shared memory block (replication_states[] is now part of this).
     175             :  */
     176             : static ReplicationStateCtl *replication_states_ctl;
     177             : 
     178             : /*
     179             :  * We keep a pointer to this backend's ReplicationState to avoid having to
     180             :  * search the replication_states array in replorigin_session_advance for each
     181             :  * remote commit.  (Ownership of a backend's own entry can only be changed by
     182             :  * that backend.)
     183             :  */
     184             : static ReplicationState *session_replication_state = NULL;
     185             : 
     186             : /* Magic for on disk files. */
     187             : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
     188             : 
     189             : static void
     190          94 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
     191             : {
     192          94 :     if (check_origins && max_active_replication_origins == 0)
     193           0 :         ereport(ERROR,
     194             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     195             :                  errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
     196             : 
     197          94 :     if (!recoveryOK && RecoveryInProgress())
     198           0 :         ereport(ERROR,
     199             :                 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
     200             :                  errmsg("cannot manipulate replication origins during recovery")));
     201          94 : }
     202             : 
     203             : 
     204             : /*
     205             :  * IsReservedOriginName
     206             :  *      True iff name is either "none" or "any".
     207             :  */
     208             : static bool
     209          22 : IsReservedOriginName(const char *name)
     210             : {
     211          42 :     return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
     212          20 :             (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
     213             : }
     214             : 
     215             : /* ---------------------------------------------------------------------------
     216             :  * Functions for working with replication origins themselves.
     217             :  * ---------------------------------------------------------------------------
     218             :  */
     219             : 
     220             : /*
     221             :  * Check for a persistent replication origin identified by name.
     222             :  *
     223             :  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
     224             :  */
     225             : RepOriginId
     226        1842 : replorigin_by_name(const char *roname, bool missing_ok)
     227             : {
     228             :     Form_pg_replication_origin ident;
     229        1842 :     Oid         roident = InvalidOid;
     230             :     HeapTuple   tuple;
     231             :     Datum       roname_d;
     232             : 
     233        1842 :     roname_d = CStringGetTextDatum(roname);
     234             : 
     235        1842 :     tuple = SearchSysCache1(REPLORIGNAME, roname_d);
     236        1842 :     if (HeapTupleIsValid(tuple))
     237             :     {
     238        1076 :         ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
     239        1076 :         roident = ident->roident;
     240        1076 :         ReleaseSysCache(tuple);
     241             :     }
     242         766 :     else if (!missing_ok)
     243           8 :         ereport(ERROR,
     244             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     245             :                  errmsg("replication origin \"%s\" does not exist",
     246             :                         roname)));
     247             : 
     248        1834 :     return roident;
     249             : }
     250             : 
     251             : /*
     252             :  * Create a replication origin.
     253             :  *
     254             :  * Needs to be called in a transaction.
     255             :  */
     256             : RepOriginId
     257         720 : replorigin_create(const char *roname)
     258             : {
     259             :     Oid         roident;
     260         720 :     HeapTuple   tuple = NULL;
     261             :     Relation    rel;
     262             :     Datum       roname_d;
     263             :     SnapshotData SnapshotDirty;
     264             :     SysScanDesc scan;
     265             :     ScanKeyData key;
     266             : 
     267             :     /*
     268             :      * To avoid needing a TOAST table for pg_replication_origin, we limit
     269             :      * replication origin names to 512 bytes.  This should be more than enough
     270             :      * for all practical use.
     271             :      */
     272         720 :     if (strlen(roname) > MAX_RONAME_LEN)
     273           6 :         ereport(ERROR,
     274             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     275             :                  errmsg("replication origin name is too long"),
     276             :                  errdetail("Replication origin names must be no longer than %d bytes.",
     277             :                            MAX_RONAME_LEN)));
     278             : 
     279         714 :     roname_d = CStringGetTextDatum(roname);
     280             : 
     281             :     Assert(IsTransactionState());
     282             : 
     283             :     /*
     284             :      * We need the numeric replication origin to be 16bit wide, so we cannot
     285             :      * rely on the normal oid allocation. Instead we simply scan
     286             :      * pg_replication_origin for the first unused id. That's not particularly
     287             :      * efficient, but this should be a fairly infrequent operation - we can
     288             :      * easily spend a bit more code on this when it turns out it needs to be
     289             :      * faster.
     290             :      *
     291             :      * We handle concurrency by taking an exclusive lock (allowing reads!)
     292             :      * over the table for the duration of the search. Because we use a "dirty
     293             :      * snapshot" we can read rows that other in-progress sessions have
     294             :      * written, even though they would be invisible with normal snapshots. Due
     295             :      * to the exclusive lock there's no danger that new rows can appear while
     296             :      * we're checking.
     297             :      */
     298         714 :     InitDirtySnapshot(SnapshotDirty);
     299             : 
     300         714 :     rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
     301             : 
     302             :     /*
     303             :      * We want to be able to access pg_replication_origin without setting up a
     304             :      * snapshot.  To make that safe, it needs to not have a TOAST table, since
     305             :      * TOASTed data cannot be fetched without a snapshot.  As of this writing,
     306             :      * its only varlena column is roname, which we limit to 512 bytes to avoid
     307             :      * needing out-of-line storage.  If you add a TOAST table to this catalog,
     308             :      * be sure to set up a snapshot everywhere it might be needed.  For more
     309             :      * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
     310             :      */
     311             :     Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
     312             : 
     313        1256 :     for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
     314             :     {
     315             :         bool        nulls[Natts_pg_replication_origin];
     316             :         Datum       values[Natts_pg_replication_origin];
     317             :         bool        collides;
     318             : 
     319        1256 :         CHECK_FOR_INTERRUPTS();
     320             : 
     321        1256 :         ScanKeyInit(&key,
     322             :                     Anum_pg_replication_origin_roident,
     323             :                     BTEqualStrategyNumber, F_OIDEQ,
     324             :                     ObjectIdGetDatum(roident));
     325             : 
     326        1256 :         scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
     327             :                                   true /* indexOK */ ,
     328             :                                   &SnapshotDirty,
     329             :                                   1, &key);
     330             : 
     331        1256 :         collides = HeapTupleIsValid(systable_getnext(scan));
     332             : 
     333        1256 :         systable_endscan(scan);
     334             : 
     335        1256 :         if (!collides)
     336             :         {
     337             :             /*
     338             :              * Ok, found an unused roident, insert the new row and do a CCI,
     339             :              * so our callers can look it up if they want to.
     340             :              */
     341         714 :             memset(&nulls, 0, sizeof(nulls));
     342             : 
     343         714 :             values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
     344         714 :             values[Anum_pg_replication_origin_roname - 1] = roname_d;
     345             : 
     346         714 :             tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     347         714 :             CatalogTupleInsert(rel, tuple);
     348         712 :             CommandCounterIncrement();
     349         712 :             break;
     350             :         }
     351             :     }
     352             : 
     353             :     /* now release lock again */
     354         712 :     table_close(rel, ExclusiveLock);
     355             : 
     356         712 :     if (tuple == NULL)
     357           0 :         ereport(ERROR,
     358             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     359             :                  errmsg("could not find free replication origin ID")));
     360             : 
     361         712 :     heap_freetuple(tuple);
     362         712 :     return roident;
     363             : }
     364             : 
     365             : /*
     366             :  * Helper function to drop a replication origin.
     367             :  */
     368             : static void
     369         582 : replorigin_state_clear(RepOriginId roident, bool nowait)
     370             : {
     371             :     int         i;
     372             : 
     373             :     /*
     374             :      * Clean up the slot state info, if there is any matching slot.
     375             :      */
     376         582 : restart:
     377         582 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     378             : 
     379        1768 :     for (i = 0; i < max_active_replication_origins; i++)
     380             :     {
     381        1694 :         ReplicationState *state = &replication_states[i];
     382             : 
     383        1694 :         if (state->roident == roident)
     384             :         {
     385             :             /* found our slot, is it busy? */
     386         508 :             if (state->acquired_by != 0)
     387             :             {
     388             :                 ConditionVariable *cv;
     389             : 
     390           0 :                 if (nowait)
     391           0 :                     ereport(ERROR,
     392             :                             (errcode(ERRCODE_OBJECT_IN_USE),
     393             :                              errmsg("could not drop replication origin with ID %d, in use by PID %d",
     394             :                                     state->roident,
     395             :                                     state->acquired_by)));
     396             : 
     397             :                 /*
     398             :                  * We must wait and then retry.  Since we don't know which CV
     399             :                  * to wait on until here, we can't readily use
     400             :                  * ConditionVariablePrepareToSleep (calling it here would be
     401             :                  * wrong, since we could miss the signal if we did so); just
     402             :                  * use ConditionVariableSleep directly.
     403             :                  */
     404           0 :                 cv = &state->origin_cv;
     405             : 
     406           0 :                 LWLockRelease(ReplicationOriginLock);
     407             : 
     408           0 :                 ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
     409           0 :                 goto restart;
     410             :             }
     411             : 
     412             :             /* first make a WAL log entry */
     413             :             {
     414             :                 xl_replorigin_drop xlrec;
     415             : 
     416         508 :                 xlrec.node_id = roident;
     417         508 :                 XLogBeginInsert();
     418         508 :                 XLogRegisterData(&xlrec, sizeof(xlrec));
     419         508 :                 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
     420             :             }
     421             : 
     422             :             /* then clear the in-memory slot */
     423         508 :             state->roident = InvalidRepOriginId;
     424         508 :             state->remote_lsn = InvalidXLogRecPtr;
     425         508 :             state->local_lsn = InvalidXLogRecPtr;
     426         508 :             break;
     427             :         }
     428             :     }
     429         582 :     LWLockRelease(ReplicationOriginLock);
     430         582 :     ConditionVariableCancelSleep();
     431         582 : }
     432             : 
     433             : /*
     434             :  * Drop replication origin (by name).
     435             :  *
     436             :  * Needs to be called in a transaction.
     437             :  */
     438             : void
     439         956 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
     440             : {
     441             :     RepOriginId roident;
     442             :     Relation    rel;
     443             :     HeapTuple   tuple;
     444             : 
     445             :     Assert(IsTransactionState());
     446             : 
     447         956 :     rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
     448             : 
     449         956 :     roident = replorigin_by_name(name, missing_ok);
     450             : 
     451             :     /* Lock the origin to prevent concurrent drops. */
     452         954 :     LockSharedObject(ReplicationOriginRelationId, roident, 0,
     453             :                      AccessExclusiveLock);
     454             : 
     455         954 :     tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
     456         954 :     if (!HeapTupleIsValid(tuple))
     457             :     {
     458         372 :         if (!missing_ok)
     459           0 :             elog(ERROR, "cache lookup failed for replication origin with ID %d",
     460             :                  roident);
     461             : 
     462             :         /*
     463             :          * We don't need to retain the locks if the origin is already dropped.
     464             :          */
     465         372 :         UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
     466             :                            AccessExclusiveLock);
     467         372 :         table_close(rel, RowExclusiveLock);
     468         372 :         return;
     469             :     }
     470             : 
     471         582 :     replorigin_state_clear(roident, nowait);
     472             : 
     473             :     /*
     474             :      * Now, we can delete the catalog entry.
     475             :      */
     476         582 :     CatalogTupleDelete(rel, &tuple->t_self);
     477         582 :     ReleaseSysCache(tuple);
     478             : 
     479         582 :     CommandCounterIncrement();
     480             : 
     481             :     /* We keep the lock on pg_replication_origin until commit */
     482         582 :     table_close(rel, NoLock);
     483             : }
     484             : 
     485             : /*
     486             :  * Lookup replication origin via its oid and return the name.
     487             :  *
     488             :  * The external name is palloc'd in the calling context.
     489             :  *
     490             :  * Returns true if the origin is known, false otherwise.
     491             :  */
     492             : bool
     493          54 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
     494             : {
     495             :     HeapTuple   tuple;
     496             :     Form_pg_replication_origin ric;
     497             : 
     498             :     Assert(OidIsValid((Oid) roident));
     499             :     Assert(roident != InvalidRepOriginId);
     500             :     Assert(roident != DoNotReplicateId);
     501             : 
     502          54 :     tuple = SearchSysCache1(REPLORIGIDENT,
     503             :                             ObjectIdGetDatum((Oid) roident));
     504             : 
     505          54 :     if (HeapTupleIsValid(tuple))
     506             :     {
     507          48 :         ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
     508          48 :         *roname = text_to_cstring(&ric->roname);
     509          48 :         ReleaseSysCache(tuple);
     510             : 
     511          48 :         return true;
     512             :     }
     513             :     else
     514             :     {
     515           6 :         *roname = NULL;
     516             : 
     517           6 :         if (!missing_ok)
     518           0 :             ereport(ERROR,
     519             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     520             :                      errmsg("replication origin with ID %d does not exist",
     521             :                             roident)));
     522             : 
     523           6 :         return false;
     524             :     }
     525             : }
     526             : 
     527             : 
     528             : /* ---------------------------------------------------------------------------
     529             :  * Functions for handling replication progress.
     530             :  * ---------------------------------------------------------------------------
     531             :  */
     532             : 
     533             : Size
     534        8102 : ReplicationOriginShmemSize(void)
     535             : {
     536        8102 :     Size        size = 0;
     537             : 
     538        8102 :     if (max_active_replication_origins == 0)
     539           4 :         return size;
     540             : 
     541        8098 :     size = add_size(size, offsetof(ReplicationStateCtl, states));
     542             : 
     543        8098 :     size = add_size(size,
     544             :                     mul_size(max_active_replication_origins, sizeof(ReplicationState)));
     545        8098 :     return size;
     546             : }
     547             : 
     548             : void
     549        2100 : ReplicationOriginShmemInit(void)
     550             : {
     551             :     bool        found;
     552             :     /* Point replication_states_ctl and replication_states at the shared struct; skipped entirely when max_active_replication_origins is 0. */
     553        2100 :     if (max_active_replication_origins == 0)
     554           2 :         return;
     555             :     /* the struct is sized by ReplicationOriginShmemSize(); 'found' is filled in by ShmemInitStruct */
     556        2098 :     replication_states_ctl = (ReplicationStateCtl *)
     557        2098 :         ShmemInitStruct("ReplicationOriginState",
     558             :                         ReplicationOriginShmemSize(),
     559             :                         &found);
     560        2098 :     replication_states = replication_states_ctl->states;
     561             :     /* !found: initialize the struct from scratch */
     562        2098 :     if (!found)
     563             :     {
     564             :         int         i;
     565             :         /* zero everything first, then set the tranche id and initialize each state's lock and condition variable */
     566      150930 :         MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
     567             : 
     568        2098 :         replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
     569             : 
     570       23060 :         for (i = 0; i < max_active_replication_origins; i++)
     571             :         {
     572       20962 :             LWLockInitialize(&replication_states[i].lock,
     573       20962 :                              replication_states_ctl->tranche_id);
     574       20962 :             ConditionVariableInit(&replication_states[i].origin_cv);
     575             :         }
     576             :     }
     577             : }
     578             : 
     579             : /* ---------------------------------------------------------------------------
     580             :  * Perform a checkpoint of each replication origin's progress with respect to
     581             :  * the replayed remote_lsn. Make sure that all transactions we refer to in the
     582             :  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
     583             :  * if the transactions were originally committed asynchronously.
     584             :  *
     585             :  * We store checkpoints in the following format:
     586             :  * +-------+------------------------+------------------+-----+--------+
     587             :  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
     588             :  * +-------+------------------------+------------------+-----+--------+
     589             :  *
     590             :  * So it's just the magic, followed by one statically sized
     591             :  * ReplicationStateOnDisk struct per in-use entry, and the CRC. Note that the
     592             :  * maximum number of ReplicationState is determined by max_active_replication_origins.
     593             :  * ---------------------------------------------------------------------------
     594             :  */
     595             : void
     596        3318 : CheckPointReplicationOrigin(void)
     597             : {
     598        3318 :     const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
     599        3318 :     const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     600             :     int         tmpfd;
     601             :     int         i;
     602        3318 :     uint32      magic = REPLICATION_STATE_MAGIC;
     603             :     pg_crc32c   crc;
     604             :     /* nothing is written when replication origins are disabled */
     605        3318 :     if (max_active_replication_origins == 0)
     606           2 :         return;
     607             :     /* the CRC covers the magic and every state record written below */
     608        3316 :     INIT_CRC32C(crc);
     609             : 
     610             :     /* make sure no old temp file is remaining */
     611        3316 :     if (unlink(tmppath) < 0 && errno != ENOENT)
     612           0 :         ereport(PANIC,
     613             :                 (errcode_for_file_access(),
     614             :                  errmsg("could not remove file \"%s\": %m",
     615             :                         tmppath)));
     616             : 
     617             :     /*
     618             :      * no other backend can perform this at the same time; only one checkpoint
     619             :      * can happen at a time.
     620             :      */
     621        3316 :     tmpfd = OpenTransientFile(tmppath,
     622             :                               O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
     623        3316 :     if (tmpfd < 0)
     624           0 :         ereport(PANIC,
     625             :                 (errcode_for_file_access(),
     626             :                  errmsg("could not create file \"%s\": %m",
     627             :                         tmppath)));
     628             : 
     629             :     /* write magic */
     630        3316 :     errno = 0;
     631        3316 :     if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
     632             :     {
     633             :         /* if write didn't set errno, assume problem is no disk space */
     634           0 :         if (errno == 0)
     635           0 :             errno = ENOSPC;
     636           0 :         ereport(PANIC,
     637             :                 (errcode_for_file_access(),
     638             :                  errmsg("could not write to file \"%s\": %m",
     639             :                         tmppath)));
     640             :     }
     641        3316 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     642             : 
     643             :     /* prevent concurrent creations/drops */
     644        3316 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
     645             : 
     646             :     /* write actual data */
     647       36476 :     for (i = 0; i < max_active_replication_origins; i++)
     648             :     {
     649             :         ReplicationStateOnDisk disk_state;
     650       33160 :         ReplicationState *curstate = &replication_states[i];
     651             :         XLogRecPtr  local_lsn;
     652             : 
     653       33160 :         if (curstate->roident == InvalidRepOriginId)
     654       33068 :             continue;
     655             : 
     656             :         /* zero, to avoid uninitialized padding bytes */
     657          92 :         memset(&disk_state, 0, sizeof(disk_state));
     658             : 
     659          92 :         LWLockAcquire(&curstate->lock, LW_SHARED);
     660             : 
     661          92 :         disk_state.roident = curstate->roident;
     662             : 
     663          92 :         disk_state.remote_lsn = curstate->remote_lsn;
     664          92 :         local_lsn = curstate->local_lsn;
     665             : 
     666          92 :         LWLockRelease(&curstate->lock);
     667             : 
     668             :         /* make sure we only write out a commit that's persistent */
     669          92 :         XLogFlush(local_lsn);
     670             :         /* clear errno so a failed write that leaves it unset can be told apart below */
     671          92 :         errno = 0;
     672          92 :         if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
     673             :             sizeof(disk_state))
     674             :         {
     675             :             /* if write didn't set errno, assume problem is no disk space */
     676           0 :             if (errno == 0)
     677           0 :                 errno = ENOSPC;
     678           0 :             ereport(PANIC,
     679             :                     (errcode_for_file_access(),
     680             :                      errmsg("could not write to file \"%s\": %m",
     681             :                             tmppath)));
     682             :         }
     683             : 
     684          92 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     685             :     }
     686             : 
     687        3316 :     LWLockRelease(ReplicationOriginLock);
     688             : 
     689             :     /* write out the CRC */
     690        3316 :     FIN_CRC32C(crc);
     691        3316 :     errno = 0;
     692        3316 :     if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
     693             :     {
     694             :         /* if write didn't set errno, assume problem is no disk space */
     695           0 :         if (errno == 0)
     696           0 :             errno = ENOSPC;
     697           0 :         ereport(PANIC,
     698             :                 (errcode_for_file_access(),
     699             :                  errmsg("could not write to file \"%s\": %m",
     700             :                         tmppath)));
     701             :     }
     702             : 
     703        3316 :     if (CloseTransientFile(tmpfd) != 0)
     704           0 :         ereport(PANIC,
     705             :                 (errcode_for_file_access(),
     706             :                  errmsg("could not close file \"%s\": %m",
     707             :                         tmppath)));
     708             : 
     709             :     /* fsync, rename to permanent file, fsync file and directory */
     710        3316 :     durable_rename(tmppath, path, PANIC);
     711             : }
     712             : 
     713             : /*
     714             :  * Recover replication replay status from checkpoint data saved earlier by
     715             :  * CheckPointReplicationOrigin. A missing file is not an error and leaves the
     716             :  * shared state untouched; any other read or validation failure is a PANIC.
     717             :  * This only needs to be called at startup and *not* during every checkpoint
     718             :  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
     719             :  * state thereafter can be recovered by looking at commit records.
     720             :  */
     721             : void
     722        1812 : StartupReplicationOrigin(void)
     723             : {
     724        1812 :     const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     725             :     int         fd;
     726             :     int         readBytes;
     727        1812 :     uint32      magic = REPLICATION_STATE_MAGIC;
     728        1812 :     int         last_state = 0;
     729             :     pg_crc32c   file_crc;
     730             :     pg_crc32c   crc;
     731             : 
     732             :     /* don't want to overwrite already existing state */
     733             : #ifdef USE_ASSERT_CHECKING
     734             :     static bool already_started = false;
     735             : 
     736             :     Assert(!already_started);
     737             :     already_started = true;
     738             : #endif
     739             : 
     740        1812 :     if (max_active_replication_origins == 0)
     741         100 :         return;
     742             : 
     743        1810 :     INIT_CRC32C(crc);
     744             : 
     745        1810 :     elog(DEBUG2, "starting up replication origin progress state");
     746             : 
     747        1810 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
     748             : 
     749             :     /*
     750             :      * might have had max_active_replication_origins == 0 last run, or we just
     751             :      * brought up a standby.
     752             :      */
     753        1810 :     if (fd < 0 && errno == ENOENT)
     754          98 :         return;
     755        1712 :     else if (fd < 0)
     756           0 :         ereport(PANIC,
     757             :                 (errcode_for_file_access(),
     758             :                  errmsg("could not open file \"%s\": %m",
     759             :                         path)));
     760             : 
     761             :     /* verify magic, that is written even if nothing was active */
     762        1712 :     readBytes = read(fd, &magic, sizeof(magic));
     763        1712 :     if (readBytes != sizeof(magic))
     764             :     {
     765           0 :         if (readBytes < 0)
     766           0 :             ereport(PANIC,
     767             :                     (errcode_for_file_access(),
     768             :                      errmsg("could not read file \"%s\": %m",
     769             :                             path)));
     770             :         else
     771           0 :             ereport(PANIC,
     772             :                     (errcode(ERRCODE_DATA_CORRUPTED),
     773             :                      errmsg("could not read file \"%s\": read %d of %zu",
     774             :                             path, readBytes, sizeof(magic))));
     775             :     }
     776        1712 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     777             :     /* NOTE(review): unlike the short-read and checksum reports in this function, this one sets no errcode */
     778        1712 :     if (magic != REPLICATION_STATE_MAGIC)
     779           0 :         ereport(PANIC,
     780             :                 (errmsg("replication checkpoint has wrong magic %u instead of %u",
     781             :                         magic, REPLICATION_STATE_MAGIC)));
     782             : 
     783             :     /* we can skip locking here, no other access is possible */
     784             : 
     785             :     /* recover individual states, until there are no more to be found */
     786             :     while (true)
     787          42 :     {
     788             :         ReplicationStateOnDisk disk_state;
     789             : 
     790        1754 :         readBytes = read(fd, &disk_state, sizeof(disk_state));
     791             : 
     792             :         /* a read of exactly sizeof(crc) bytes means only the trailing CRC was left */
     793        1754 :         if (readBytes == sizeof(crc))
     794             :         {
     795             :             /* not pretty, but simple: the CRC bytes were read into the start of disk_state */
     796        1712 :             file_crc = *(pg_crc32c *) &disk_state;
     797        1712 :             break;
     798             :         }
     799             : 
     800          42 :         if (readBytes < 0)
     801             :         {
     802           0 :             ereport(PANIC,
     803             :                     (errcode_for_file_access(),
     804             :                      errmsg("could not read file \"%s\": %m",
     805             :                             path)));
     806             :         }
     807             :         /* NOTE(review): short read, read() itself did not fail here; the magic read above reports this case with ERRCODE_DATA_CORRUPTED instead */
     808          42 :         if (readBytes != sizeof(disk_state))
     809             :         {
     810           0 :             ereport(PANIC,
     811             :                     (errcode_for_file_access(),
     812             :                      errmsg("could not read file \"%s\": read %d of %zu",
     813             :                             path, readBytes, sizeof(disk_state))));
     814             :         }
     815             : 
     816          42 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     817             :         /* the file holds more states than max_active_replication_origins allows */
     818          42 :         if (last_state == max_active_replication_origins)
     819           0 :             ereport(PANIC,
     820             :                     (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     821             :                      errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
     822             : 
     823             :         /* copy data to shared memory; only roident and remote_lsn are restored */
     824          42 :         replication_states[last_state].roident = disk_state.roident;
     825          42 :         replication_states[last_state].remote_lsn = disk_state.remote_lsn;
     826          42 :         last_state++;
     827             : 
     828          42 :         ereport(LOG,
     829             :                 (errmsg("recovered replication state of node %d to %X/%X",
     830             :                         disk_state.roident,
     831             :                         LSN_FORMAT_ARGS(disk_state.remote_lsn))));
     832             :     }
     833             : 
     834             :     /* now check checksum; NOTE(review): the message says "replication slot", presumably "replication origin" is meant -- confirm */
     835        1712 :     FIN_CRC32C(crc);
     836        1712 :     if (file_crc != crc)
     837           0 :         ereport(PANIC,
     838             :                 (errcode(ERRCODE_DATA_CORRUPTED),
     839             :                  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
     840             :                         crc, file_crc)));
     841             : 
     842        1712 :     if (CloseTransientFile(fd) != 0)
     843           0 :         ereport(PANIC,
     844             :                 (errcode_for_file_access(),
     845             :                  errmsg("could not close file \"%s\": %m",
     846             :                         path)));
     847             : }
     848             : 
     849             : void
     850           8 : replorigin_redo(XLogReaderState *record)
     851             : {
     852           8 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     853             :     /* Apply one replication origin record: SET advances the origin's progress, DROP clears its state entry; any other op code is a PANIC. */
     854           8 :     switch (info)
     855             :     {
     856           4 :         case XLOG_REPLORIGIN_SET:
     857             :             {
     858           4 :                 xl_replorigin_set *xlrec =
     859           4 :                     (xl_replorigin_set *) XLogRecGetData(record);
     860             :                 /* local_commit is the record's EndRecPtr; wal_log is false, so no new record is written */
     861           4 :                 replorigin_advance(xlrec->node_id,
     862             :                                    xlrec->remote_lsn, record->EndRecPtr,
     863           4 :                                    xlrec->force /* backward */ ,
     864             :                                    false /* WAL log */ );
     865           4 :                 break;
     866             :             }
     867           4 :         case XLOG_REPLORIGIN_DROP:
     868             :             {
     869             :                 xl_replorigin_drop *xlrec;
     870             :                 int         i;
     871             : 
     872           4 :                 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
     873             :                 /* scan for the dropped origin's entry; finding none is silently ignored */
     874           6 :                 for (i = 0; i < max_active_replication_origins; i++)
     875             :                 {
     876           6 :                     ReplicationState *state = &replication_states[i];
     877             : 
     878             :                     /* found our slot */
     879           6 :                     if (state->roident == xlrec->node_id)
     880             :                     {
     881             :                         /* reset entry */
     882           4 :                         state->roident = InvalidRepOriginId;
     883           4 :                         state->remote_lsn = InvalidXLogRecPtr;
     884           4 :                         state->local_lsn = InvalidXLogRecPtr;
     885           4 :                         break;
     886             :                     }
     887             :                 }
     888           4 :                 break;
     889             :             }
     890           0 :         default:
     891           0 :             elog(PANIC, "replorigin_redo: unknown op code %u", info);
     892             :     }
     893           8 : }
     894             : 
     895             : 
     896             : /*
     897             :  * Tell the replication origin progress machinery that a commit from 'node'
     898             :  * that originated at the LSN remote_commit on the remote node was replayed
     899             :  * successfully and that we don't need to do so again. In combination with
     900             :  * setting up replorigin_session_origin_lsn and replorigin_session_origin
     901             :  * that ensures we won't lose knowledge about that after a crash if the
     902             :  * transaction had a persistent effect (think of asynchronous commits).
     903             :  *
     904             :  * local_commit needs to be a local LSN of the commit so that we can make sure
     905             :  * upon a checkpoint that enough WAL has been persisted to disk. Passing
     906             :  * InvalidXLogRecPtr leaves the stored local LSN unchanged. go_backward lets the stored LSNs move backwards; wal_log additionally emits an XLOG_REPLORIGIN_SET record. Raises ERROR if the origin's entry is acquired by a process or no free entry is left.
     907             :  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
     908             :  * unless running in recovery.
     909             :  */
     910             : void
     911         472 : replorigin_advance(RepOriginId node,
     912             :                    XLogRecPtr remote_commit, XLogRecPtr local_commit,
     913             :                    bool go_backward, bool wal_log)
     914             : {
     915             :     int         i;
     916         472 :     ReplicationState *replication_state = NULL;
     917         472 :     ReplicationState *free_state = NULL;
     918             : 
     919             :     Assert(node != InvalidRepOriginId);
     920             : 
     921             :     /* we don't track DoNotReplicateId */
     922         472 :     if (node == DoNotReplicateId)
     923           0 :         return;
     924             : 
     925             :     /*
     926             :      * XXX: For the case where this is called by WAL replay, it'd be more
     927             :      * efficient to restore into a backend local hashtable and only dump into
     928             :      * shmem after recovery is finished. Let's wait with implementing that
     929             :      * till it's shown to be a measurable expense
     930             :      */
     931             : 
     932             :     /* Lock exclusively, as we may have to create a new table entry. */
     933         472 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     934             : 
     935             :     /*
     936             :      * Search for either an existing slot for the origin, or a free one we can
     937             :      * use.
     938             :      */
     939        4314 :     for (i = 0; i < max_active_replication_origins; i++)
     940             :     {
     941        3932 :         ReplicationState *curstate = &replication_states[i];
     942             : 
     943             :         /* remember where to insert if necessary */
     944        3932 :         if (curstate->roident == InvalidRepOriginId &&
     945             :             free_state == NULL)
     946             :         {
     947         382 :             free_state = curstate;
     948         382 :             continue;
     949             :         }
     950             : 
     951             :         /* not our slot */
     952        3550 :         if (curstate->roident != node)
     953             :         {
     954        3460 :             continue;
     955             :         }
     956             : 
     957             :         /* ok, found slot */
     958          90 :         replication_state = curstate;
     959             :         /* the entry's own lock stays held until the LSNs have been updated below */
     960          90 :         LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
     961             : 
     962             :         /* Make sure it's not used by somebody else */
     963          90 :         if (replication_state->acquired_by != 0)
     964             :         {
     965           0 :             ereport(ERROR,
     966             :                     (errcode(ERRCODE_OBJECT_IN_USE),
     967             :                      errmsg("replication origin with ID %d is already active for PID %d",
     968             :                             replication_state->roident,
     969             :                             replication_state->acquired_by)));
     970             :         }
     971             : 
     972          90 :         break;
     973             :     }
     974             :     /* neither an existing entry for this origin nor a free one was found */
     975         472 :     if (replication_state == NULL && free_state == NULL)
     976           0 :         ereport(ERROR,
     977             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     978             :                  errmsg("could not find free replication state slot for replication origin with ID %d",
     979             :                         node),
     980             :                  errhint("Increase \"max_active_replication_origins\" and try again.")));
     981             : 
     982         472 :     if (replication_state == NULL)
     983             :     {
     984             :         /* initialize new slot */
     985         382 :         LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
     986         382 :         replication_state = free_state;
     987             :         Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
     988             :         Assert(replication_state->local_lsn == InvalidXLogRecPtr);
     989         382 :         replication_state->roident = node;
     990             :     }
     991             : 
     992             :     Assert(replication_state->roident != InvalidRepOriginId);
     993             : 
     994             :     /*
     995             :      * If somebody "forcefully" sets this slot, WAL log it, so it's durable
     996             :      * and the standby gets the message. Primarily this will be called during
     997             :      * WAL replay (of commit records) where no WAL logging is necessary.
     998             :      */
     999         472 :     if (wal_log)
    1000             :     {
    1001             :         xl_replorigin_set xlrec;
    1002             : 
    1003         390 :         xlrec.remote_lsn = remote_commit;
    1004         390 :         xlrec.node_id = node;
    1005         390 :         xlrec.force = go_backward;
    1006             : 
    1007         390 :         XLogBeginInsert();
    1008         390 :         XLogRegisterData(&xlrec, sizeof(xlrec));
    1009             : 
    1010         390 :         XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
    1011             :     }
    1012             : 
    1013             :     /*
    1014             :      * Due to - harmless - race conditions during a checkpoint we could see
    1015             :      * values here that are older than the ones we already have in memory. We
    1016             :      * could also see older values for prepared transactions when the prepare
    1017             :      * is sent at a later point of time along with commit prepared and there
    1018             :      * are other transactions commits between prepare and commit prepared. See
    1019             :      * ReorderBufferFinishPrepared. Don't overwrite those.
    1020             :      */
    1021         472 :     if (go_backward || replication_state->remote_lsn < remote_commit)
    1022         458 :         replication_state->remote_lsn = remote_commit;
    1023         472 :     if (local_commit != InvalidXLogRecPtr &&
    1024          76 :         (go_backward || replication_state->local_lsn < local_commit))
    1025          80 :         replication_state->local_lsn = local_commit;
    1026         472 :     LWLockRelease(&replication_state->lock);
    1027             : 
    1028             :     /*
    1029             :      * Release *after* changing the LSNs, slot isn't acquired and thus could
    1030             :      * otherwise be dropped anytime.
    1031             :      */
    1032         472 :     LWLockRelease(ReplicationOriginLock);
    1033             : }
    1034             : 
    1035             : 
    1036             : XLogRecPtr
    1037          16 : replorigin_get_progress(RepOriginId node, bool flush)
    1038             : {
    1039             :     int         i;
    1040          16 :     XLogRecPtr  local_lsn = InvalidXLogRecPtr;
    1041          16 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1042             :     /* Return the remote LSN stored for 'node', or InvalidXLogRecPtr when no state entry has that roident. */
    1043             :     /* prevent slots from being concurrently dropped */
    1044          16 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1045             : 
    1046          76 :     for (i = 0; i < max_active_replication_origins; i++)
    1047             :     {
    1048             :         ReplicationState *state;
    1049             : 
    1050          70 :         state = &replication_states[i];
    1051             : 
    1052          70 :         if (state->roident == node)
    1053             :         {
    1054          10 :             LWLockAcquire(&state->lock, LW_SHARED);
    1055             :             /* copy both LSNs while holding the entry's lock */
    1056          10 :             remote_lsn = state->remote_lsn;
    1057          10 :             local_lsn = state->local_lsn;
    1058             : 
    1059          10 :             LWLockRelease(&state->lock);
    1060             : 
    1061          10 :             break;
    1062             :         }
    1063             :     }
    1064             : 
    1065          16 :     LWLockRelease(ReplicationOriginLock);
    1066             :     /* if requested and a local LSN was recorded, flush WAL up to it before returning */
    1067          16 :     if (flush && local_lsn != InvalidXLogRecPtr)
    1068           2 :         XLogFlush(local_lsn);
    1069             : 
    1070          16 :     return remote_lsn;
    1071             : }
    1072             : 
    1073             : /*
    1074             :  * Tear down a (possibly) configured session replication origin during process
    1075             :  * exit. Registered with on_shmem_exit() in replorigin_session_setup(); 'code' and 'arg' are not used.
    1076             :  */
    1077             : static void
    1078         852 : ReplicationOriginExitCleanup(int code, Datum arg)
    1079             : {
    1080         852 :     ConditionVariable *cv = NULL;
    1081             :     /* no session origin is set up: nothing to tear down */
    1082         852 :     if (session_replication_state == NULL)
    1083         366 :         return;
    1084             : 
    1085         486 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1086             :     /* only release the origin if this process is the one recorded in acquired_by */
    1087         486 :     if (session_replication_state->acquired_by == MyProcPid)
    1088             :     {
    1089         466 :         cv = &session_replication_state->origin_cv;
    1090             : 
    1091         466 :         session_replication_state->acquired_by = 0;
    1092         466 :         session_replication_state = NULL;
    1093             :     }
    1094             : 
    1095         486 :     LWLockRelease(ReplicationOriginLock);
    1096             :     /* cv was saved above because session_replication_state has already been reset to NULL */
    1097         486 :     if (cv)
    1098         466 :         ConditionVariableBroadcast(cv);
    1099             : }
    1100             : 
    1101             : /*
    1102             :  * Setup a replication origin in the shared memory struct if it doesn't
    1103             :  * already exist and cache access to the specific ReplicationState so the
    1104             :  * array doesn't have to be searched when calling
    1105             :  * replorigin_session_advance().
    1106             :  *
    1107             :  * Normally only one such cached origin can exist per process so the cached
    1108             :  * value can only be set again after the previous value is torn down with
    1109             :  * replorigin_session_reset(). For this normal case pass acquired_by = 0
    1110             :  * (meaning the slot is not allowed to be already acquired by another process).
    1111             :  *
    1112             :  * However, sometimes multiple processes can safely re-use the same origin slot
    1113             :  * (for example, multiple parallel apply processes can safely use the same
    1114             :  * origin, provided they maintain commit order by allowing only one process to
    1115             :  * commit at a time). For this case the first process must pass acquired_by =
    1116             :  * 0, and then the other processes sharing that same origin can pass
    1117             :  * acquired_by = PID of the first process.
    1118             :  */
    1119             : void
    1120         858 : replorigin_session_setup(RepOriginId node, int acquired_by)
    1121             : {
    1122             :     static bool registered_cleanup;
    1123             :     int         i;
    1124         858 :     int         free_slot = -1;
    1125             : 
    1126         858 :     if (!registered_cleanup)
    1127             :     {
    1128         852 :         on_shmem_exit(ReplicationOriginExitCleanup, 0);
    1129         852 :         registered_cleanup = true;
    1130             :     }
    1131             : 
    1132             :     Assert(max_active_replication_origins > 0);
    1133             : 
    1134         858 :     if (session_replication_state != NULL)
    1135           2 :         ereport(ERROR,
    1136             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1137             :                  errmsg("cannot setup replication origin when one is already setup")));
    1138             : 
    1139             :     /* Lock exclusively, as we may have to create a new table entry. */
    1140         856 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1141             : 
    1142             :     /*
    1143             :      * Search for either an existing slot for the origin, or a free one we can
    1144             :      * use.
    1145             :      */
    1146        3612 :     for (i = 0; i < max_active_replication_origins; i++)
    1147             :     {
    1148        3386 :         ReplicationState *curstate = &replication_states[i];
    1149             : 
    1150             :         /* remember where to insert if necessary */
    1151        3386 :         if (curstate->roident == InvalidRepOriginId &&
    1152             :             free_slot == -1)
    1153             :         {
    1154         230 :             free_slot = i;
    1155         230 :             continue;
    1156             :         }
    1157             : 
    1158             :         /* not our slot */
    1159        3156 :         if (curstate->roident != node)
    1160        2526 :             continue;
    1161             : 
    1162         630 :         else if (curstate->acquired_by != 0 && acquired_by == 0)
    1163             :         {
    1164           0 :             ereport(ERROR,
    1165             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1166             :                      errmsg("replication origin with ID %d is already active for PID %d",
    1167             :                             curstate->roident, curstate->acquired_by)));
    1168             :         }
    1169             : 
    1170             :         /* ok, found slot */
    1171         630 :         session_replication_state = curstate;
    1172         630 :         break;
    1173             :     }
    1174             : 
    1175             : 
    1176         856 :     if (session_replication_state == NULL && free_slot == -1)
    1177           0 :         ereport(ERROR,
    1178             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
    1179             :                  errmsg("could not find free replication state slot for replication origin with ID %d",
    1180             :                         node),
    1181             :                  errhint("Increase \"max_active_replication_origins\" and try again.")));
    1182         856 :     else if (session_replication_state == NULL)
    1183             :     {
    1184             :         /* initialize new slot */
    1185         226 :         session_replication_state = &replication_states[free_slot];
    1186             :         Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
    1187             :         Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
    1188         226 :         session_replication_state->roident = node;
    1189             :     }
    1190             : 
    1191             : 
    1192             :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1193             : 
    1194         856 :     if (acquired_by == 0)
    1195         836 :         session_replication_state->acquired_by = MyProcPid;
    1196          20 :     else if (session_replication_state->acquired_by != acquired_by)
    1197           0 :         elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
    1198             :              node, acquired_by);
    1199             : 
    1200         856 :     LWLockRelease(ReplicationOriginLock);
    1201             : 
    1202             :     /* probably this one is pointless */
    1203         856 :     ConditionVariableBroadcast(&session_replication_state->origin_cv);
    1204         856 : }
    1205             : 
    1206             : /*
    1207             :  * Reset replay state previously setup in this session.
    1208             :  *
    1209             :  * This function may only be called if an origin was setup with
    1210             :  * replorigin_session_setup().
    1211             :  */
    1212             : void
    1213         372 : replorigin_session_reset(void)
    1214             : {
    1215             :     ConditionVariable *cv;
    1216             : 
    1217             :     Assert(max_active_replication_origins != 0);
    1218             : 
    1219         372 :     if (session_replication_state == NULL)
    1220           2 :         ereport(ERROR,
    1221             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1222             :                  errmsg("no replication origin is configured")));
    1223             : 
    1224         370 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1225             : 
    1226         370 :     session_replication_state->acquired_by = 0;
    1227         370 :     cv = &session_replication_state->origin_cv;
    1228         370 :     session_replication_state = NULL;
    1229             : 
    1230         370 :     LWLockRelease(ReplicationOriginLock);
    1231             : 
    1232         370 :     ConditionVariableBroadcast(cv);
    1233         370 : }
    1234             : 
    1235             : /*
    1236             :  * Do the same work replorigin_advance() does, just on the session's
    1237             :  * configured origin.
    1238             :  *
    1239             :  * This is noticeably cheaper than using replorigin_advance().
    1240             :  */
    1241             : void
    1242        2152 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
    1243             : {
    1244             :     Assert(session_replication_state != NULL);
    1245             :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1246             : 
    1247        2152 :     LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
    1248        2152 :     if (session_replication_state->local_lsn < local_commit)
    1249        2152 :         session_replication_state->local_lsn = local_commit;
    1250        2152 :     if (session_replication_state->remote_lsn < remote_commit)
    1251        1018 :         session_replication_state->remote_lsn = remote_commit;
    1252        2152 :     LWLockRelease(&session_replication_state->lock);
    1253        2152 : }
    1254             : 
    1255             : /*
    1256             :  * Ask the machinery about the point up to which we successfully replayed
    1257             :  * changes from an already setup replication origin.
    1258             :  */
    1259             : XLogRecPtr
    1260         446 : replorigin_session_get_progress(bool flush)
    1261             : {
    1262             :     XLogRecPtr  remote_lsn;
    1263             :     XLogRecPtr  local_lsn;
    1264             : 
    1265             :     Assert(session_replication_state != NULL);
    1266             : 
    1267         446 :     LWLockAcquire(&session_replication_state->lock, LW_SHARED);
    1268         446 :     remote_lsn = session_replication_state->remote_lsn;
    1269         446 :     local_lsn = session_replication_state->local_lsn;
    1270         446 :     LWLockRelease(&session_replication_state->lock);
    1271             : 
    1272         446 :     if (flush && local_lsn != InvalidXLogRecPtr)
    1273           2 :         XLogFlush(local_lsn);
    1274             : 
    1275         446 :     return remote_lsn;
    1276             : }
    1277             : 
    1278             : 
    1279             : 
    1280             : /* ---------------------------------------------------------------------------
    1281             :  * SQL functions for working with replication origin.
    1282             :  *
    1283             :  * These mostly should be fairly short wrappers around more generic functions.
    1284             :  * ---------------------------------------------------------------------------
    1285             :  */
    1286             : 
    1287             : /*
    1288             :  * Create replication origin for the passed in name, and return the assigned
    1289             :  * oid.
    1290             :  */
    1291             : Datum
    1292          24 : pg_replication_origin_create(PG_FUNCTION_ARGS)
    1293             : {
    1294             :     char       *name;
    1295             :     RepOriginId roident;
    1296             : 
    1297          24 :     replorigin_check_prerequisites(false, false);
    1298             : 
    1299          24 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1300             : 
    1301             :     /*
    1302             :      * Replication origins "any and "none" are reserved for system options.
    1303             :      * The origins "pg_xxx" are reserved for internal use.
    1304             :      */
    1305          24 :     if (IsReservedName(name) || IsReservedOriginName(name))
    1306           6 :         ereport(ERROR,
    1307             :                 (errcode(ERRCODE_RESERVED_NAME),
    1308             :                  errmsg("replication origin name \"%s\" is reserved",
    1309             :                         name),
    1310             :                  errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
    1311             :                            LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
    1312             : 
    1313             :     /*
    1314             :      * If built with appropriate switch, whine when regression-testing
    1315             :      * conventions for replication origin names are violated.
    1316             :      */
    1317             : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1318             :     if (strncmp(name, "regress_", 8) != 0)
    1319             :         elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
    1320             : #endif
    1321             : 
    1322          18 :     roident = replorigin_create(name);
    1323             : 
    1324          10 :     pfree(name);
    1325             : 
    1326          10 :     PG_RETURN_OID(roident);
    1327             : }
    1328             : 
    1329             : /*
    1330             :  * Drop replication origin.
    1331             :  */
    1332             : Datum
    1333          14 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
    1334             : {
    1335             :     char       *name;
    1336             : 
    1337          14 :     replorigin_check_prerequisites(false, false);
    1338             : 
    1339          14 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1340             : 
    1341          14 :     replorigin_drop_by_name(name, false, true);
    1342             : 
    1343          12 :     pfree(name);
    1344             : 
    1345          12 :     PG_RETURN_VOID();
    1346             : }
    1347             : 
    1348             : /*
    1349             :  * Return oid of a replication origin.
    1350             :  */
    1351             : Datum
    1352           0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
    1353             : {
    1354             :     char       *name;
    1355             :     RepOriginId roident;
    1356             : 
    1357           0 :     replorigin_check_prerequisites(false, false);
    1358             : 
    1359           0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1360           0 :     roident = replorigin_by_name(name, true);
    1361             : 
    1362           0 :     pfree(name);
    1363             : 
    1364           0 :     if (OidIsValid(roident))
    1365           0 :         PG_RETURN_OID(roident);
    1366           0 :     PG_RETURN_NULL();
    1367             : }
    1368             : 
    1369             : /*
    1370             :  * Setup a replication origin for this session.
    1371             :  */
    1372             : Datum
    1373          12 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
    1374             : {
    1375             :     char       *name;
    1376             :     RepOriginId origin;
    1377             : 
    1378          12 :     replorigin_check_prerequisites(true, false);
    1379             : 
    1380          12 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1381          12 :     origin = replorigin_by_name(name, false);
    1382          10 :     replorigin_session_setup(origin, 0);
    1383             : 
    1384           8 :     replorigin_session_origin = origin;
    1385             : 
    1386           8 :     pfree(name);
    1387             : 
    1388           8 :     PG_RETURN_VOID();
    1389             : }
    1390             : 
    1391             : /*
    1392             :  * Reset previously setup origin in this session
    1393             :  */
    1394             : Datum
    1395          10 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
    1396             : {
    1397          10 :     replorigin_check_prerequisites(true, false);
    1398             : 
    1399          10 :     replorigin_session_reset();
    1400             : 
    1401           8 :     replorigin_session_origin = InvalidRepOriginId;
    1402           8 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1403           8 :     replorigin_session_origin_timestamp = 0;
    1404             : 
    1405           8 :     PG_RETURN_VOID();
    1406             : }
    1407             : 
    1408             : /*
    1409             :  * Has a replication origin been setup for this session.
    1410             :  */
    1411             : Datum
    1412           0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
    1413             : {
    1414           0 :     replorigin_check_prerequisites(false, false);
    1415             : 
    1416           0 :     PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
    1417             : }
    1418             : 
    1419             : 
    1420             : /*
    1421             :  * Return the replication progress for origin setup in the current session.
    1422             :  *
    1423             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1424             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1425             :  * commits are used when replaying replicated transactions.
    1426             :  */
    1427             : Datum
    1428           4 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
    1429             : {
    1430           4 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1431           4 :     bool        flush = PG_GETARG_BOOL(0);
    1432             : 
    1433           4 :     replorigin_check_prerequisites(true, false);
    1434             : 
    1435           4 :     if (session_replication_state == NULL)
    1436           0 :         ereport(ERROR,
    1437             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1438             :                  errmsg("no replication origin is configured")));
    1439             : 
    1440           4 :     remote_lsn = replorigin_session_get_progress(flush);
    1441             : 
    1442           4 :     if (remote_lsn == InvalidXLogRecPtr)
    1443           0 :         PG_RETURN_NULL();
    1444             : 
    1445           4 :     PG_RETURN_LSN(remote_lsn);
    1446             : }
    1447             : 
    1448             : Datum
    1449           2 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
    1450             : {
    1451           2 :     XLogRecPtr  location = PG_GETARG_LSN(0);
    1452             : 
    1453           2 :     replorigin_check_prerequisites(true, false);
    1454             : 
    1455           2 :     if (session_replication_state == NULL)
    1456           0 :         ereport(ERROR,
    1457             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1458             :                  errmsg("no replication origin is configured")));
    1459             : 
    1460           2 :     replorigin_session_origin_lsn = location;
    1461           2 :     replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
    1462             : 
    1463           2 :     PG_RETURN_VOID();
    1464             : }
    1465             : 
    1466             : Datum
    1467           0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
    1468             : {
    1469           0 :     replorigin_check_prerequisites(true, false);
    1470             : 
    1471           0 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1472           0 :     replorigin_session_origin_timestamp = 0;
    1473             : 
    1474           0 :     PG_RETURN_VOID();
    1475             : }
    1476             : 
    1477             : 
    1478             : Datum
    1479           6 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
    1480             : {
    1481           6 :     text       *name = PG_GETARG_TEXT_PP(0);
    1482           6 :     XLogRecPtr  remote_commit = PG_GETARG_LSN(1);
    1483             :     RepOriginId node;
    1484             : 
    1485           6 :     replorigin_check_prerequisites(true, false);
    1486             : 
    1487             :     /* lock to prevent the replication origin from vanishing */
    1488           6 :     LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1489             : 
    1490           6 :     node = replorigin_by_name(text_to_cstring(name), false);
    1491             : 
    1492             :     /*
    1493             :      * Can't sensibly pass a local commit to be flushed at checkpoint - this
    1494             :      * xact hasn't committed yet. This is why this function should be used to
    1495             :      * set up the initial replication state, but not for replay.
    1496             :      */
    1497           4 :     replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
    1498             :                        true /* go backward */ , true /* WAL log */ );
    1499             : 
    1500           4 :     UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1501             : 
    1502           4 :     PG_RETURN_VOID();
    1503             : }
    1504             : 
    1505             : 
    1506             : /*
    1507             :  * Return the replication progress for an individual replication origin.
    1508             :  *
    1509             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1510             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1511             :  * commits are used when replaying replicated transactions.
    1512             :  */
    1513             : Datum
    1514           6 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
    1515             : {
    1516             :     char       *name;
    1517             :     bool        flush;
    1518             :     RepOriginId roident;
    1519           6 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1520             : 
    1521           6 :     replorigin_check_prerequisites(true, true);
    1522             : 
    1523           6 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1524           6 :     flush = PG_GETARG_BOOL(1);
    1525             : 
    1526           6 :     roident = replorigin_by_name(name, false);
    1527             :     Assert(OidIsValid(roident));
    1528             : 
    1529           4 :     remote_lsn = replorigin_get_progress(roident, flush);
    1530             : 
    1531           4 :     if (remote_lsn == InvalidXLogRecPtr)
    1532           0 :         PG_RETURN_NULL();
    1533             : 
    1534           4 :     PG_RETURN_LSN(remote_lsn);
    1535             : }
    1536             : 
    1537             : 
    1538             : Datum
    1539          16 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
    1540             : {
    1541          16 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1542             :     int         i;
    1543             : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
    1544             : 
    1545             :     /* we want to return 0 rows if slot is set to zero */
    1546          16 :     replorigin_check_prerequisites(false, true);
    1547             : 
    1548          16 :     InitMaterializedSRF(fcinfo, 0);
    1549             : 
    1550             :     /* prevent slots from being concurrently dropped */
    1551          16 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1552             : 
    1553             :     /*
    1554             :      * Iterate through all possible replication_states, display if they are
    1555             :      * filled. Note that we do not take any locks, so slightly corrupted/out
    1556             :      * of date values are a possibility.
    1557             :      */
    1558         176 :     for (i = 0; i < max_active_replication_origins; i++)
    1559             :     {
    1560             :         ReplicationState *state;
    1561             :         Datum       values[REPLICATION_ORIGIN_PROGRESS_COLS];
    1562             :         bool        nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
    1563             :         char       *roname;
    1564             : 
    1565         160 :         state = &replication_states[i];
    1566             : 
    1567             :         /* unused slot, nothing to display */
    1568         160 :         if (state->roident == InvalidRepOriginId)
    1569         138 :             continue;
    1570             : 
    1571          22 :         memset(values, 0, sizeof(values));
    1572          22 :         memset(nulls, 1, sizeof(nulls));
    1573             : 
    1574          22 :         values[0] = ObjectIdGetDatum(state->roident);
    1575          22 :         nulls[0] = false;
    1576             : 
    1577             :         /*
    1578             :          * We're not preventing the origin to be dropped concurrently, so
    1579             :          * silently accept that it might be gone.
    1580             :          */
    1581          22 :         if (replorigin_by_oid(state->roident, true,
    1582             :                               &roname))
    1583             :         {
    1584          22 :             values[1] = CStringGetTextDatum(roname);
    1585          22 :             nulls[1] = false;
    1586             :         }
    1587             : 
    1588          22 :         LWLockAcquire(&state->lock, LW_SHARED);
    1589             : 
    1590          22 :         values[2] = LSNGetDatum(state->remote_lsn);
    1591          22 :         nulls[2] = false;
    1592             : 
    1593          22 :         values[3] = LSNGetDatum(state->local_lsn);
    1594          22 :         nulls[3] = false;
    1595             : 
    1596          22 :         LWLockRelease(&state->lock);
    1597             : 
    1598          22 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1599             :                              values, nulls);
    1600             :     }
    1601             : 
    1602          16 :     LWLockRelease(ReplicationOriginLock);
    1603             : 
    1604             : #undef REPLICATION_ORIGIN_PROGRESS_COLS
    1605             : 
    1606          16 :     return (Datum) 0;
    1607             : }

Generated by: LCOV version 1.14