LCOV - code coverage report
Current view: top level - src/backend/replication/logical - origin.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 319 454 70.3 %
Date: 2019-11-15 22:06:47 Functions: 24 29 82.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * origin.c
       4             :  *    Logical replication progress tracking support.
       5             :  *
       6             :  * Copyright (c) 2013-2019, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/logical/origin.c
      10             :  *
      11             :  * NOTES
      12             :  *
      13             :  * This file provides the following:
      14             :  * * An infrastructure to name nodes in a replication setup
      15             :  * * A facility to store and persist replication progress in an
      16             :  *   efficient and durable manner.
      17             :  *
      18             :  * Replication origins consist of a descriptive, user-defined, external
      19             :  * name and a short, thus space-efficient, internal 2-byte id. This split
      20             :  * exists because replication origins have to be stored in WAL and shared
      21             :  * memory, and long descriptors would be inefficient.  For now we only use
      22             :  * 2 bytes for the internal id of a replication origin, as it seems unlikely
      23             :  * that there will soon be more than 65k nodes in one replication setup, and
      24             :  * using only two bytes allows us to be more space efficient.
      25             :  *
      26             :  * Replication progress is tracked in a shared memory table
      27             :  * (ReplicationState) that's dumped to disk every checkpoint. Entries
      28             :  * ('slots') in this table are identified by the internal id. That's the case
      29             :  * because it allows replication progress to be advanced during crash
      30             :  * recovery. To allow doing so we store the original LSN (from the originating
      31             :  * system) of a transaction in the commit record. That allows recovering the
      32             :  * precise replayed state after crash recovery, without requiring synchronous
      33             :  * commits. Allowing logical replication to use asynchronous commit is
      34             :  * generally good for performance, but especially important as it allows a
      35             :  * single threaded replay process to keep up with a source that has multiple
      36             :  * backends generating changes concurrently.  For efficiency and simplicity
      37             :  * reasons a backend can set up one replication origin, which is from then on
      38             :  * used as the source of changes produced by the backend, until reset again.
      39             :  *
      40             :  * This infrastructure is intended to be used in cooperation with logical
      41             :  * decoding. When replaying from a remote system the configured origin is
      42             :  * provided to output plugins, allowing prevention of replication loops and
      43             :  * other filtering.
      44             :  *
      45             :  * There are several levels of locking at work:
      46             :  *
      47             :  * * To create and drop replication origins an exclusive lock on
      48             :  *   pg_replication_origin is required for the duration. That allows us to
      49             :  *   safely and conflict-free assign new origins using a dirty snapshot.
      50             :  *
      51             :  * * When creating an in-memory replication progress slot the ReplicationOrigin
      52             :  *   LWLock has to be held exclusively; when iterating over the replication
      53             :  *   progress a shared lock has to be held, and the same applies when
      54             :  *   advancing the replication progress of an individual origin that has not
      55             :  *   been set up as the session's replication origin.
      56             :  *
      57             :  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
      58             :  *   replication progress slot that slot's lwlock has to be held. That's
      59             :  *   primarily because we do not assume 8 byte writes (the LSN) are atomic on
      60             :  *   all our platforms, but it also simplifies memory ordering concerns
      61             :  *   between the remote and local lsn. We use a lwlock instead of a spinlock
      62             :  *   so it's less harmful to hold the lock over a WAL write
      63             :  *   (cf. AdvanceReplicationProgress).
      64             :  *
      65             :  * ---------------------------------------------------------------------------
      66             :  */
      67             : 
      68             : #include "postgres.h"
      69             : 
      70             : #include <unistd.h>
      71             : #include <sys/stat.h>
      72             : 
      73             : #include "access/genam.h"
      74             : #include "access/htup_details.h"
      75             : #include "access/table.h"
      76             : #include "access/xact.h"
      77             : #include "catalog/catalog.h"
      78             : #include "catalog/indexing.h"
      79             : #include "funcapi.h"
      80             : #include "miscadmin.h"
      81             : #include "nodes/execnodes.h"
      82             : #include "pgstat.h"
      83             : #include "replication/logical.h"
      84             : #include "replication/origin.h"
      85             : #include "storage/condition_variable.h"
      86             : #include "storage/copydir.h"
      87             : #include "storage/fd.h"
      88             : #include "storage/ipc.h"
      89             : #include "storage/lmgr.h"
      90             : #include "utils/builtins.h"
      91             : #include "utils/fmgroids.h"
      92             : #include "utils/pg_lsn.h"
      93             : #include "utils/rel.h"
      94             : #include "utils/snapmgr.h"
      95             : #include "utils/syscache.h"
      96             : 
/*
 * Replay progress of a single remote node.
 *
 * One of these lives in shared memory for every in-use replication origin;
 * an entry whose roident is InvalidRepOriginId is a free slot.
 */
typedef struct ReplicationState
{
	/*
	 * Local identifier for the remote node.  InvalidRepOriginId marks an
	 * unused entry.
	 */
	RepOriginId roident;

	/*
	 * Location of the latest commit from the remote side.  Protected by
	 * "lock" below.
	 */
	XLogRecPtr	remote_lsn;

	/*
	 * Remember the local lsn of the commit record so we can XLogFlush() to it
	 * during a checkpoint so we know the commit record actually is safe on
	 * disk.  Protected by "lock" below.  This field is not persisted to disk
	 * (ReplicationStateOnDisk has no counterpart).
	 */
	XLogRecPtr	local_lsn;

	/*
	 * PID of backend that's acquired slot, or 0 if none.  A nonzero value
	 * makes replorigin_drop() either fail (nowait) or wait on origin_cv.
	 */
	int			acquired_by;

	/*
	 * Condition variable that's signalled when acquired_by changes.
	 */
	ConditionVariable origin_cv;

	/*
	 * Lock protecting remote_lsn and local_lsn.
	 */
	LWLock		lock;
} ReplicationState;

/*
 * On disk version of ReplicationState.
 *
 * The checkpoint file is a sequence of these structs (after the magic
 * number), so this layout is part of that file format; changing it changes
 * the format.  Only the id and remote_lsn are persisted.
 */
typedef struct ReplicationStateOnDisk
{
	RepOriginId roident;
	XLogRecPtr	remote_lsn;
} ReplicationStateOnDisk;


/*
 * Header of the shared-memory area: the LWLock tranche id used for every
 * ReplicationState.lock, followed by max_replication_slots ReplicationState
 * entries.
 */
typedef struct ReplicationStateCtl
{
	int			tranche_id;
	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
     150             : 
/*
 * External variables describing the replication origin of the changes this
 * backend is currently producing.  Presumably they are set by the replaying
 * backend and read when commit records are written; the readers are outside
 * this part of the file, so verify before relying on that.
 */
RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
XLogRecPtr	replorigin_session_origin_lsn = InvalidXLogRecPtr;
TimestampTz replorigin_session_origin_timestamp = 0;

/*
 * Base address into a shared memory array of replication states of size
 * max_replication_slots.  It points at replication_states_ctl->states and is
 * set up by ReplicationOriginShmemInit().
 *
 * XXX: Should we use a separate variable to size this rather than
 * max_replication_slots?
 */
static ReplicationState *replication_states;
static ReplicationStateCtl *replication_states_ctl;

/*
 * Backend-local, cached element from ReplicationState for use in a backend
 * replaying remote commits, so we don't have to search ReplicationState for
 * the backend's current RepOriginId.  NULL when no session origin has been
 * set up.
 */
static ReplicationState *session_replication_state = NULL;

/*
 * Magic for on disk files; written as the first word of the checkpoint file
 * and included in its CRC.
 */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
     175             : 
     176             : static void
     177          38 : replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
     178             : {
     179          38 :     if (!superuser())
     180           0 :         ereport(ERROR,
     181             :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     182             :                  errmsg("only superusers can query or manipulate replication origins")));
     183             : 
     184          38 :     if (check_slots && max_replication_slots == 0)
     185           0 :         ereport(ERROR,
     186             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     187             :                  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
     188             : 
     189          38 :     if (!recoveryOK && RecoveryInProgress())
     190           0 :         ereport(ERROR,
     191             :                 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
     192             :                  errmsg("cannot manipulate replication origins during recovery")));
     193             : 
     194          38 : }
     195             : 
     196             : 
     197             : /* ---------------------------------------------------------------------------
     198             :  * Functions for working with replication origins themselves.
     199             :  * ---------------------------------------------------------------------------
     200             :  */
     201             : 
     202             : /*
     203             :  * Check for a persistent replication origin identified by name.
     204             :  *
     205             :  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
     206             :  */
     207             : RepOriginId
     208         102 : replorigin_by_name(char *roname, bool missing_ok)
     209             : {
     210             :     Form_pg_replication_origin ident;
     211         102 :     Oid         roident = InvalidOid;
     212             :     HeapTuple   tuple;
     213             :     Datum       roname_d;
     214             : 
     215         102 :     roname_d = CStringGetTextDatum(roname);
     216             : 
     217         102 :     tuple = SearchSysCache1(REPLORIGNAME, roname_d);
     218         102 :     if (HeapTupleIsValid(tuple))
     219             :     {
     220          94 :         ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
     221          94 :         roident = ident->roident;
     222          94 :         ReleaseSysCache(tuple);
     223             :     }
     224           8 :     else if (!missing_ok)
     225           8 :         ereport(ERROR,
     226             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     227             :                  errmsg("replication origin \"%s\" does not exist",
     228             :                         roname)));
     229             : 
     230          94 :     return roident;
     231             : }
     232             : 
     233             : /*
     234             :  * Create a replication origin.
     235             :  *
     236             :  * Needs to be called in a transaction.
     237             :  */
     238             : RepOriginId
     239          56 : replorigin_create(char *roname)
     240             : {
     241             :     Oid         roident;
     242          56 :     HeapTuple   tuple = NULL;
     243             :     Relation    rel;
     244             :     Datum       roname_d;
     245             :     SnapshotData SnapshotDirty;
     246             :     SysScanDesc scan;
     247             :     ScanKeyData key;
     248             : 
     249          56 :     roname_d = CStringGetTextDatum(roname);
     250             : 
     251             :     Assert(IsTransactionState());
     252             : 
     253             :     /*
     254             :      * We need the numeric replication origin to be 16bit wide, so we cannot
     255             :      * rely on the normal oid allocation. Instead we simply scan
     256             :      * pg_replication_origin for the first unused id. That's not particularly
     257             :      * efficient, but this should be a fairly infrequent operation - we can
     258             :      * easily spend a bit more code on this when it turns out it needs to be
     259             :      * faster.
     260             :      *
     261             :      * We handle concurrency by taking an exclusive lock (allowing reads!)
     262             :      * over the table for the duration of the search. Because we use a "dirty
     263             :      * snapshot" we can read rows that other in-progress sessions have
     264             :      * written, even though they would be invisible with normal snapshots. Due
     265             :      * to the exclusive lock there's no danger that new rows can appear while
     266             :      * we're checking.
     267             :      */
     268          56 :     InitDirtySnapshot(SnapshotDirty);
     269             : 
     270          56 :     rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
     271             : 
     272          72 :     for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
     273             :     {
     274             :         bool        nulls[Natts_pg_replication_origin];
     275             :         Datum       values[Natts_pg_replication_origin];
     276             :         bool        collides;
     277             : 
     278          72 :         CHECK_FOR_INTERRUPTS();
     279             : 
     280          72 :         ScanKeyInit(&key,
     281             :                     Anum_pg_replication_origin_roident,
     282             :                     BTEqualStrategyNumber, F_OIDEQ,
     283             :                     ObjectIdGetDatum(roident));
     284             : 
     285          72 :         scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
     286             :                                   true /* indexOK */ ,
     287             :                                   &SnapshotDirty,
     288             :                                   1, &key);
     289             : 
     290          72 :         collides = HeapTupleIsValid(systable_getnext(scan));
     291             : 
     292          72 :         systable_endscan(scan);
     293             : 
     294          72 :         if (!collides)
     295             :         {
     296             :             /*
     297             :              * Ok, found an unused roident, insert the new row and do a CCI,
     298             :              * so our callers can look it up if they want to.
     299             :              */
     300          56 :             memset(&nulls, 0, sizeof(nulls));
     301             : 
     302          56 :             values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
     303          56 :             values[Anum_pg_replication_origin_roname - 1] = roname_d;
     304             : 
     305          56 :             tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     306          56 :             CatalogTupleInsert(rel, tuple);
     307          54 :             CommandCounterIncrement();
     308          54 :             break;
     309             :         }
     310             :     }
     311             : 
     312             :     /* now release lock again,  */
     313          54 :     table_close(rel, ExclusiveLock);
     314             : 
     315          54 :     if (tuple == NULL)
     316           0 :         ereport(ERROR,
     317             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     318             :                  errmsg("could not find free replication origin OID")));
     319             : 
     320          54 :     heap_freetuple(tuple);
     321          54 :     return roident;
     322             : }
     323             : 
     324             : 
/*
 * Drop replication origin.
 *
 * Needs to be called in a transaction.
 *
 * roident: internal id of the origin to drop.
 * nowait:  if the origin's in-memory slot is currently acquired by a backend,
 *          raise ERROR instead of waiting for it to be released.
 *
 * Order of operations: the matching shared-memory slot (if any) is WAL-logged
 * as dropped and cleared first, then the pg_replication_origin row is
 * deleted.  Raises ERROR if no catalog row exists for roident.
 */
void
replorigin_drop(RepOriginId roident, bool nowait)
{
	HeapTuple	tuple;
	Relation	rel;
	int			i;

	Assert(IsTransactionState());

	/*
	 * To interlock against concurrent drops, we hold ExclusiveLock on
	 * pg_replication_origin throughout this function.
	 */
	rel = table_open(ReplicationOriginRelationId, ExclusiveLock);

	/*
	 * First, clean up the slot state info, if there is any matching slot.
	 * We come back here after waiting for a busy slot to be released.
	 */
restart:
	/* NOTE(review): dead store; tuple is only read after the lookup below */
	tuple = NULL;
	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationState *state = &replication_states[i];

		if (state->roident == roident)
		{
			/* found our slot, is it busy? */
			if (state->acquired_by != 0)
			{
				ConditionVariable *cv;

				if (nowait)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_IN_USE),
							 errmsg("could not drop replication origin with OID %d, in use by PID %d",
									state->roident,
									state->acquired_by)));

				/*
				 * We must wait and then retry.  Since we don't know which CV
				 * to wait on until here, we can't readily use
				 * ConditionVariablePrepareToSleep (calling it here would be
				 * wrong, since we could miss the signal if we did so); just
				 * use ConditionVariableSleep directly.
				 */
				cv = &state->origin_cv;

				/* must not sleep while holding the LWLock */
				LWLockRelease(ReplicationOriginLock);

				ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
				/* slot state may have changed meanwhile; rescan from scratch */
				goto restart;
			}

			/* first make a WAL log entry */
			{
				xl_replorigin_drop xlrec;

				xlrec.node_id = roident;
				XLogBeginInsert();
				XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
				XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
			}

			/* then clear the in-memory slot, marking it free for reuse */
			state->roident = InvalidRepOriginId;
			state->remote_lsn = InvalidXLogRecPtr;
			state->local_lsn = InvalidXLogRecPtr;
			break;
		}
	}
	LWLockRelease(ReplicationOriginLock);
	/* clean up any CV sleep state left behind by the retry path above */
	ConditionVariableCancelSleep();

	/*
	 * Now, we can delete the catalog entry.
	 */
	tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
	if (!HeapTupleIsValid(tuple))
		elog(ERROR, "cache lookup failed for replication origin with oid %u",
			 roident);

	CatalogTupleDelete(rel, &tuple->t_self);
	ReleaseSysCache(tuple);

	CommandCounterIncrement();

	/* now release lock again */
	table_close(rel, ExclusiveLock);
}
     421             : 
     422             : 
     423             : /*
     424             :  * Lookup replication origin via it's oid and return the name.
     425             :  *
     426             :  * The external name is palloc'd in the calling context.
     427             :  *
     428             :  * Returns true if the origin is known, false otherwise.
     429             :  */
     430             : bool
     431           2 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
     432             : {
     433             :     HeapTuple   tuple;
     434             :     Form_pg_replication_origin ric;
     435             : 
     436             :     Assert(OidIsValid((Oid) roident));
     437             :     Assert(roident != InvalidRepOriginId);
     438             :     Assert(roident != DoNotReplicateId);
     439             : 
     440           2 :     tuple = SearchSysCache1(REPLORIGIDENT,
     441             :                             ObjectIdGetDatum((Oid) roident));
     442             : 
     443           2 :     if (HeapTupleIsValid(tuple))
     444             :     {
     445           2 :         ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
     446           2 :         *roname = text_to_cstring(&ric->roname);
     447           2 :         ReleaseSysCache(tuple);
     448             : 
     449           2 :         return true;
     450             :     }
     451             :     else
     452             :     {
     453           0 :         *roname = NULL;
     454             : 
     455           0 :         if (!missing_ok)
     456           0 :             ereport(ERROR,
     457             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     458             :                      errmsg("replication origin with OID %u does not exist",
     459             :                             roident)));
     460             : 
     461           0 :         return false;
     462             :     }
     463             : }
     464             : 
     465             : 
     466             : /* ---------------------------------------------------------------------------
     467             :  * Functions for handling replication progress.
     468             :  * ---------------------------------------------------------------------------
     469             :  */
     470             : 
     471             : Size
     472        5674 : ReplicationOriginShmemSize(void)
     473             : {
     474        5674 :     Size        size = 0;
     475             : 
     476             :     /*
     477             :      * XXX: max_replication_slots is arguably the wrong thing to use, as here
     478             :      * we keep the replay state of *remote* transactions. But for now it seems
     479             :      * sufficient to reuse it, lest we introduce a separate GUC.
     480             :      */
     481        5674 :     if (max_replication_slots == 0)
     482           0 :         return size;
     483             : 
     484        5674 :     size = add_size(size, offsetof(ReplicationStateCtl, states));
     485             : 
     486        5674 :     size = add_size(size,
     487             :                     mul_size(max_replication_slots, sizeof(ReplicationState)));
     488        5674 :     return size;
     489             : }
     490             : 
     491             : void
     492        1890 : ReplicationOriginShmemInit(void)
     493             : {
     494             :     bool        found;
     495             : 
     496        1890 :     if (max_replication_slots == 0)
     497           0 :         return;
     498             : 
     499        1890 :     replication_states_ctl = (ReplicationStateCtl *)
     500        1890 :         ShmemInitStruct("ReplicationOriginState",
     501             :                         ReplicationOriginShmemSize(),
     502             :                         &found);
     503        1890 :     replication_states = replication_states_ctl->states;
     504             : 
     505        1890 :     if (!found)
     506             :     {
     507             :         int         i;
     508             : 
     509        1890 :         replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN;
     510             : 
     511        1890 :         MemSet(replication_states, 0, ReplicationOriginShmemSize());
     512             : 
     513       19222 :         for (i = 0; i < max_replication_slots; i++)
     514             :         {
     515       17332 :             LWLockInitialize(&replication_states[i].lock,
     516       17332 :                              replication_states_ctl->tranche_id);
     517       17332 :             ConditionVariableInit(&replication_states[i].origin_cv);
     518             :         }
     519             :     }
     520             : 
     521        1890 :     LWLockRegisterTranche(replication_states_ctl->tranche_id,
     522             :                           "replication_origin");
     523             : }
     524             : 
     525             : /* ---------------------------------------------------------------------------
     526             :  * Perform a checkpoint of each replication origin's progress with respect to
     527             :  * the replayed remote_lsn. Make sure that all transactions we refer to in the
     528             :  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
     529             :  * if the transactions were originally committed asynchronously.
     530             :  *
     531             :  * We store checkpoints in the following format:
     532             :  * +-------+------------------------+------------------+-----+--------+
     533             :  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
     534             :  * +-------+------------------------+------------------+-----+--------+
     535             :  *
     536             :  * So its just the magic, followed by the statically sized
     537             :  * ReplicationStateOnDisk structs. Note that the maximum number of
     538             :  * ReplicationState is determined by max_replication_slots.
     539             :  * ---------------------------------------------------------------------------
     540             :  */
     541             : void
     542        2832 : CheckPointReplicationOrigin(void)
     543             : {
     544        2832 :     const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
     545        2832 :     const char *path = "pg_logical/replorigin_checkpoint";
     546             :     int         tmpfd;
     547             :     int         i;
     548        2832 :     uint32      magic = REPLICATION_STATE_MAGIC;
     549             :     pg_crc32c   crc;
     550             : 
     551        2832 :     if (max_replication_slots == 0)
     552           0 :         return;
     553             : 
     554        2832 :     INIT_CRC32C(crc);
     555             : 
     556             :     /* make sure no old temp file is remaining */
     557        2832 :     if (unlink(tmppath) < 0 && errno != ENOENT)
     558           0 :         ereport(PANIC,
     559             :                 (errcode_for_file_access(),
     560             :                  errmsg("could not remove file \"%s\": %m",
     561             :                         tmppath)));
     562             : 
     563             :     /*
     564             :      * no other backend can perform this at the same time, we're protected by
     565             :      * CheckpointLock.
     566             :      */
     567        2832 :     tmpfd = OpenTransientFile(tmppath,
     568             :                               O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
     569        2832 :     if (tmpfd < 0)
     570           0 :         ereport(PANIC,
     571             :                 (errcode_for_file_access(),
     572             :                  errmsg("could not create file \"%s\": %m",
     573             :                         tmppath)));
     574             : 
     575             :     /* write magic */
     576        2832 :     errno = 0;
     577        2832 :     if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
     578             :     {
     579             :         /* if write didn't set errno, assume problem is no disk space */
     580           0 :         if (errno == 0)
     581           0 :             errno = ENOSPC;
     582           0 :         ereport(PANIC,
     583             :                 (errcode_for_file_access(),
     584             :                  errmsg("could not write to file \"%s\": %m",
     585             :                         tmppath)));
     586             :     }
     587        2832 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     588             : 
     589             :     /* prevent concurrent creations/drops */
     590        2832 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
     591             : 
     592             :     /* write actual data */
     593       29884 :     for (i = 0; i < max_replication_slots; i++)
     594             :     {
     595             :         ReplicationStateOnDisk disk_state;
     596       27052 :         ReplicationState *curstate = &replication_states[i];
     597             :         XLogRecPtr  local_lsn;
     598             : 
     599       27052 :         if (curstate->roident == InvalidRepOriginId)
     600       27038 :             continue;
     601             : 
     602             :         /* zero, to avoid uninitialized padding bytes */
     603          14 :         memset(&disk_state, 0, sizeof(disk_state));
     604             : 
     605          14 :         LWLockAcquire(&curstate->lock, LW_SHARED);
     606             : 
     607          14 :         disk_state.roident = curstate->roident;
     608             : 
     609          14 :         disk_state.remote_lsn = curstate->remote_lsn;
     610          14 :         local_lsn = curstate->local_lsn;
     611             : 
     612          14 :         LWLockRelease(&curstate->lock);
     613             : 
     614             :         /* make sure we only write out a commit that's persistent */
     615          14 :         XLogFlush(local_lsn);
     616             : 
     617          14 :         errno = 0;
     618          14 :         if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
     619             :             sizeof(disk_state))
     620             :         {
     621             :             /* if write didn't set errno, assume problem is no disk space */
     622           0 :             if (errno == 0)
     623           0 :                 errno = ENOSPC;
     624           0 :             ereport(PANIC,
     625             :                     (errcode_for_file_access(),
     626             :                      errmsg("could not write to file \"%s\": %m",
     627             :                             tmppath)));
     628             :         }
     629             : 
     630          14 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     631             :     }
     632             : 
     633        2832 :     LWLockRelease(ReplicationOriginLock);
     634             : 
     635             :     /* write out the CRC */
     636        2832 :     FIN_CRC32C(crc);
     637        2832 :     errno = 0;
     638        2832 :     if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
     639             :     {
     640             :         /* if write didn't set errno, assume problem is no disk space */
     641           0 :         if (errno == 0)
     642           0 :             errno = ENOSPC;
     643           0 :         ereport(PANIC,
     644             :                 (errcode_for_file_access(),
     645             :                  errmsg("could not write to file \"%s\": %m",
     646             :                         tmppath)));
     647             :     }
     648             : 
     649        2832 :     if (CloseTransientFile(tmpfd) != 0)
     650           0 :         ereport(PANIC,
     651             :                 (errcode_for_file_access(),
     652             :                  errmsg("could not close file \"%s\": %m",
     653             :                         tmppath)));
     654             : 
     655             :     /* fsync, rename to permanent file, fsync file and directory */
     656        2832 :     durable_rename(tmppath, path, PANIC);
     657             : }
     658             : 
     659             : /*
     660             :  * Recover replication replay status from checkpoint data saved earlier by
     661             :  * CheckPointReplicationOrigin.
     662             :  *
     663             :  * This only needs to be called at startup and *not* during every checkpoint
     664             :  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
     665             :  * state thereafter can be recovered by looking at commit records.
     666             :  */
     667             : void
     668        1192 : StartupReplicationOrigin(void)
     669             : {
     670        1192 :     const char *path = "pg_logical/replorigin_checkpoint";
     671             :     int         fd;
     672             :     int         readBytes;
     673        1192 :     uint32      magic = REPLICATION_STATE_MAGIC;
     674        1192 :     int         last_state = 0;
     675             :     pg_crc32c   file_crc;
     676             :     pg_crc32c   crc;
     677             : 
     678             :     /* don't want to overwrite already existing state */
     679             : #ifdef USE_ASSERT_CHECKING
     680             :     static bool already_started = false;
     681             : 
     682             :     Assert(!already_started);
     683             :     already_started = true;
     684             : #endif
     685             : 
     686        1192 :     if (max_replication_slots == 0)
     687         322 :         return;
     688             : 
     689        1192 :     INIT_CRC32C(crc);
     690             : 
     691        1192 :     elog(DEBUG2, "starting up replication origin progress state");
     692             : 
     693        1192 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
     694             : 
     695             :     /*
     696             :      * might have had max_replication_slots == 0 last run, or we just brought
     697             :      * up a standby.
     698             :      */
     699        1192 :     if (fd < 0 && errno == ENOENT)
     700         322 :         return;
     701         870 :     else if (fd < 0)
     702           0 :         ereport(PANIC,
     703             :                 (errcode_for_file_access(),
     704             :                  errmsg("could not open file \"%s\": %m",
     705             :                         path)));
     706             : 
     707             :     /* verify magic, that is written even if nothing was active */
     708         870 :     readBytes = read(fd, &magic, sizeof(magic));
     709         870 :     if (readBytes != sizeof(magic))
     710             :     {
     711           0 :         if (readBytes < 0)
     712           0 :             ereport(PANIC,
     713             :                     (errcode_for_file_access(),
     714             :                      errmsg("could not read file \"%s\": %m",
     715             :                             path)));
     716             :         else
     717           0 :             ereport(PANIC,
     718             :                     (errcode(ERRCODE_DATA_CORRUPTED),
     719             :                      errmsg("could not read file \"%s\": read %d of %zu",
     720             :                             path, readBytes, sizeof(magic))));
     721             :     }
     722         870 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     723             : 
     724         870 :     if (magic != REPLICATION_STATE_MAGIC)
     725           0 :         ereport(PANIC,
     726             :                 (errmsg("replication checkpoint has wrong magic %u instead of %u",
     727             :                         magic, REPLICATION_STATE_MAGIC)));
     728             : 
     729             :     /* we can skip locking here, no other access is possible */
     730             : 
     731             :     /* recover individual states, until there are no more to be found */
     732             :     while (true)
     733           0 :     {
     734             :         ReplicationStateOnDisk disk_state;
     735             : 
     736         870 :         readBytes = read(fd, &disk_state, sizeof(disk_state));
     737             : 
     738             :         /* no further data */
     739         870 :         if (readBytes == sizeof(crc))
     740             :         {
     741             :             /* not pretty, but simple ... */
     742         870 :             file_crc = *(pg_crc32c *) &disk_state;
     743         870 :             break;
     744             :         }
     745             : 
     746           0 :         if (readBytes < 0)
     747             :         {
     748           0 :             ereport(PANIC,
     749             :                     (errcode_for_file_access(),
     750             :                      errmsg("could not read file \"%s\": %m",
     751             :                             path)));
     752             :         }
     753             : 
     754           0 :         if (readBytes != sizeof(disk_state))
     755             :         {
     756           0 :             ereport(PANIC,
     757             :                     (errcode_for_file_access(),
     758             :                      errmsg("could not read file \"%s\": read %d of %zu",
     759             :                             path, readBytes, sizeof(disk_state))));
     760             :         }
     761             : 
     762           0 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     763             : 
     764           0 :         if (last_state == max_replication_slots)
     765           0 :             ereport(PANIC,
     766             :                     (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     767             :                      errmsg("could not find free replication state, increase max_replication_slots")));
     768             : 
     769             :         /* copy data to shared memory */
     770           0 :         replication_states[last_state].roident = disk_state.roident;
     771           0 :         replication_states[last_state].remote_lsn = disk_state.remote_lsn;
     772           0 :         last_state++;
     773             : 
     774           0 :         elog(LOG, "recovered replication state of node %u to %X/%X",
     775             :              disk_state.roident,
     776             :              (uint32) (disk_state.remote_lsn >> 32),
     777             :              (uint32) disk_state.remote_lsn);
     778             :     }
     779             : 
     780             :     /* now check checksum */
     781         870 :     FIN_CRC32C(crc);
     782         870 :     if (file_crc != crc)
     783           0 :         ereport(PANIC,
     784             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     785             :                  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
     786             :                         crc, file_crc)));
     787             : 
     788         870 :     if (CloseTransientFile(fd) != 0)
     789           0 :         ereport(PANIC,
     790             :                 (errcode_for_file_access(),
     791             :                  errmsg("could not close file \"%s\": %m",
     792             :                         path)));
     793             : }
     794             : 
     795             : void
     796           0 : replorigin_redo(XLogReaderState *record)
     797             : {
     798           0 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     799             : 
     800           0 :     switch (info)
     801             :     {
     802             :         case XLOG_REPLORIGIN_SET:
     803             :             {
     804           0 :                 xl_replorigin_set *xlrec =
     805             :                 (xl_replorigin_set *) XLogRecGetData(record);
     806             : 
     807           0 :                 replorigin_advance(xlrec->node_id,
     808             :                                    xlrec->remote_lsn, record->EndRecPtr,
     809           0 :                                    xlrec->force /* backward */ ,
     810             :                                    false /* WAL log */ );
     811           0 :                 break;
     812             :             }
     813             :         case XLOG_REPLORIGIN_DROP:
     814             :             {
     815             :                 xl_replorigin_drop *xlrec;
     816             :                 int         i;
     817             : 
     818           0 :                 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
     819             : 
     820           0 :                 for (i = 0; i < max_replication_slots; i++)
     821             :                 {
     822           0 :                     ReplicationState *state = &replication_states[i];
     823             : 
     824             :                     /* found our slot */
     825           0 :                     if (state->roident == xlrec->node_id)
     826             :                     {
     827             :                         /* reset entry */
     828           0 :                         state->roident = InvalidRepOriginId;
     829           0 :                         state->remote_lsn = InvalidXLogRecPtr;
     830           0 :                         state->local_lsn = InvalidXLogRecPtr;
     831           0 :                         break;
     832             :                     }
     833             :                 }
     834           0 :                 break;
     835             :             }
     836             :         default:
     837           0 :             elog(PANIC, "replorigin_redo: unknown op code %u", info);
     838             :     }
     839           0 : }
     840             : 
     841             : 
     842             : /*
     843             :  * Tell the replication origin progress machinery that a commit from 'node'
     844             :  * that originated at the LSN remote_commit on the remote node was replayed
     845             :  * successfully and that we don't need to do so again. In combination with
     846             :  * setting up replorigin_session_origin_lsn and replorigin_session_origin
     847             :  * that ensures we won't loose knowledge about that after a crash if the
     848             :  * transaction had a persistent effect (think of asynchronous commits).
     849             :  *
     850             :  * local_commit needs to be a local LSN of the commit so that we can make sure
     851             :  * upon a checkpoint that enough WAL has been persisted to disk.
     852             :  *
     853             :  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
     854             :  * unless running in recovery.
     855             :  */
     856             : void
     857           0 : replorigin_advance(RepOriginId node,
     858             :                    XLogRecPtr remote_commit, XLogRecPtr local_commit,
     859             :                    bool go_backward, bool wal_log)
     860             : {
     861             :     int         i;
     862           0 :     ReplicationState *replication_state = NULL;
     863           0 :     ReplicationState *free_state = NULL;
     864             : 
     865             :     Assert(node != InvalidRepOriginId);
     866             : 
     867             :     /* we don't track DoNotReplicateId */
     868           0 :     if (node == DoNotReplicateId)
     869           0 :         return;
     870             : 
     871             :     /*
     872             :      * XXX: For the case where this is called by WAL replay, it'd be more
     873             :      * efficient to restore into a backend local hashtable and only dump into
     874             :      * shmem after recovery is finished. Let's wait with implementing that
     875             :      * till it's shown to be a measurable expense
     876             :      */
     877             : 
     878             :     /* Lock exclusively, as we may have to create a new table entry. */
     879           0 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     880             : 
     881             :     /*
     882             :      * Search for either an existing slot for the origin, or a free one we can
     883             :      * use.
     884             :      */
     885           0 :     for (i = 0; i < max_replication_slots; i++)
     886             :     {
     887           0 :         ReplicationState *curstate = &replication_states[i];
     888             : 
     889             :         /* remember where to insert if necessary */
     890           0 :         if (curstate->roident == InvalidRepOriginId &&
     891             :             free_state == NULL)
     892             :         {
     893           0 :             free_state = curstate;
     894           0 :             continue;
     895             :         }
     896             : 
     897             :         /* not our slot */
     898           0 :         if (curstate->roident != node)
     899             :         {
     900           0 :             continue;
     901             :         }
     902             : 
     903             :         /* ok, found slot */
     904           0 :         replication_state = curstate;
     905             : 
     906           0 :         LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
     907             : 
     908             :         /* Make sure it's not used by somebody else */
     909           0 :         if (replication_state->acquired_by != 0)
     910             :         {
     911           0 :             ereport(ERROR,
     912             :                     (errcode(ERRCODE_OBJECT_IN_USE),
     913             :                      errmsg("replication origin with OID %d is already active for PID %d",
     914             :                             replication_state->roident,
     915             :                             replication_state->acquired_by)));
     916             :         }
     917             : 
     918           0 :         break;
     919             :     }
     920             : 
     921           0 :     if (replication_state == NULL && free_state == NULL)
     922           0 :         ereport(ERROR,
     923             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     924             :                  errmsg("could not find free replication state slot for replication origin with OID %u",
     925             :                         node),
     926             :                  errhint("Increase max_replication_slots and try again.")));
     927             : 
     928           0 :     if (replication_state == NULL)
     929             :     {
     930             :         /* initialize new slot */
     931           0 :         LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
     932           0 :         replication_state = free_state;
     933             :         Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
     934             :         Assert(replication_state->local_lsn == InvalidXLogRecPtr);
     935           0 :         replication_state->roident = node;
     936             :     }
     937             : 
     938             :     Assert(replication_state->roident != InvalidRepOriginId);
     939             : 
     940             :     /*
     941             :      * If somebody "forcefully" sets this slot, WAL log it, so it's durable
     942             :      * and the standby gets the message. Primarily this will be called during
     943             :      * WAL replay (of commit records) where no WAL logging is necessary.
     944             :      */
     945           0 :     if (wal_log)
     946             :     {
     947             :         xl_replorigin_set xlrec;
     948             : 
     949           0 :         xlrec.remote_lsn = remote_commit;
     950           0 :         xlrec.node_id = node;
     951           0 :         xlrec.force = go_backward;
     952             : 
     953           0 :         XLogBeginInsert();
     954           0 :         XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
     955             : 
     956           0 :         XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
     957             :     }
     958             : 
     959             :     /*
     960             :      * Due to - harmless - race conditions during a checkpoint we could see
     961             :      * values here that are older than the ones we already have in memory.
     962             :      * Don't overwrite those.
     963             :      */
     964           0 :     if (go_backward || replication_state->remote_lsn < remote_commit)
     965           0 :         replication_state->remote_lsn = remote_commit;
     966           0 :     if (local_commit != InvalidXLogRecPtr &&
     967           0 :         (go_backward || replication_state->local_lsn < local_commit))
     968           0 :         replication_state->local_lsn = local_commit;
     969           0 :     LWLockRelease(&replication_state->lock);
     970             : 
     971             :     /*
     972             :      * Release *after* changing the LSNs, slot isn't acquired and thus could
     973             :      * otherwise be dropped anytime.
     974             :      */
     975           0 :     LWLockRelease(ReplicationOriginLock);
     976             : }
     977             : 
     978             : 
     979             : XLogRecPtr
     980           4 : replorigin_get_progress(RepOriginId node, bool flush)
     981             : {
     982             :     int         i;
     983           4 :     XLogRecPtr  local_lsn = InvalidXLogRecPtr;
     984           4 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
     985             : 
     986             :     /* prevent slots from being concurrently dropped */
     987           4 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
     988             : 
     989           4 :     for (i = 0; i < max_replication_slots; i++)
     990             :     {
     991             :         ReplicationState *state;
     992             : 
     993           4 :         state = &replication_states[i];
     994             : 
     995           4 :         if (state->roident == node)
     996             :         {
     997           4 :             LWLockAcquire(&state->lock, LW_SHARED);
     998             : 
     999           4 :             remote_lsn = state->remote_lsn;
    1000           4 :             local_lsn = state->local_lsn;
    1001             : 
    1002           4 :             LWLockRelease(&state->lock);
    1003             : 
    1004           4 :             break;
    1005             :         }
    1006             :     }
    1007             : 
    1008           4 :     LWLockRelease(ReplicationOriginLock);
    1009             : 
    1010           4 :     if (flush && local_lsn != InvalidXLogRecPtr)
    1011           2 :         XLogFlush(local_lsn);
    1012             : 
    1013           4 :     return remote_lsn;
    1014             : }
    1015             : 
    1016             : /*
    1017             :  * Tear down a (possibly) configured session replication origin during process
    1018             :  * exit.
    1019             :  */
    1020             : static void
    1021          56 : ReplicationOriginExitCleanup(int code, Datum arg)
    1022             : {
    1023          56 :     ConditionVariable *cv = NULL;
    1024             : 
    1025          56 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1026             : 
    1027         110 :     if (session_replication_state != NULL &&
    1028          54 :         session_replication_state->acquired_by == MyProcPid)
    1029             :     {
    1030          54 :         cv = &session_replication_state->origin_cv;
    1031             : 
    1032          54 :         session_replication_state->acquired_by = 0;
    1033          54 :         session_replication_state = NULL;
    1034             :     }
    1035             : 
    1036          56 :     LWLockRelease(ReplicationOriginLock);
    1037             : 
    1038          56 :     if (cv)
    1039          54 :         ConditionVariableBroadcast(cv);
    1040          56 : }
    1041             : 
    1042             : /*
    1043             :  * Setup a replication origin in the shared memory struct if it doesn't
    1044             :  * already exists and cache access to the specific ReplicationSlot so the
    1045             :  * array doesn't have to be searched when calling
    1046             :  * replorigin_session_advance().
    1047             :  *
    1048             :  * Obviously only one such cached origin can exist per process and the current
    1049             :  * cached value can only be set again after the previous value is torn down
    1050             :  * with replorigin_session_reset().
    1051             :  */
    1052             : void
    1053          58 : replorigin_session_setup(RepOriginId node)
    1054             : {
    1055             :     static bool registered_cleanup;
    1056             :     int         i;
    1057          58 :     int         free_slot = -1;
    1058             : 
    1059          58 :     if (!registered_cleanup)
    1060             :     {
    1061          56 :         on_shmem_exit(ReplicationOriginExitCleanup, 0);
    1062          56 :         registered_cleanup = true;
    1063             :     }
    1064             : 
    1065             :     Assert(max_replication_slots > 0);
    1066             : 
    1067          58 :     if (session_replication_state != NULL)
    1068           2 :         ereport(ERROR,
    1069             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1070             :                  errmsg("cannot setup replication origin when one is already setup")));
    1071             : 
    1072             :     /* Lock exclusively, as we may have to create a new table entry. */
    1073          56 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1074             : 
    1075             :     /*
    1076             :      * Search for either an existing slot for the origin, or a free one we can
    1077             :      * use.
    1078             :      */
    1079         334 :     for (i = 0; i < max_replication_slots; i++)
    1080             :     {
    1081         278 :         ReplicationState *curstate = &replication_states[i];
    1082             : 
    1083             :         /* remember where to insert if necessary */
    1084         278 :         if (curstate->roident == InvalidRepOriginId &&
    1085             :             free_slot == -1)
    1086             :         {
    1087          56 :             free_slot = i;
    1088          56 :             continue;
    1089             :         }
    1090             : 
    1091             :         /* not our slot */
    1092         222 :         if (curstate->roident != node)
    1093         202 :             continue;
    1094             : 
    1095          20 :         else if (curstate->acquired_by != 0)
    1096             :         {
    1097           0 :             ereport(ERROR,
    1098             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1099             :                      errmsg("replication origin with OID %d is already active for PID %d",
    1100             :                             curstate->roident, curstate->acquired_by)));
    1101             :         }
    1102             : 
    1103             :         /* ok, found slot */
    1104          20 :         session_replication_state = curstate;
    1105             :     }
    1106             : 
    1107             : 
    1108          56 :     if (session_replication_state == NULL && free_slot == -1)
    1109           0 :         ereport(ERROR,
    1110             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
    1111             :                  errmsg("could not find free replication state slot for replication origin with OID %u",
    1112             :                         node),
    1113             :                  errhint("Increase max_replication_slots and try again.")));
    1114          56 :     else if (session_replication_state == NULL)
    1115             :     {
    1116             :         /* initialize new slot */
    1117          36 :         session_replication_state = &replication_states[free_slot];
    1118             :         Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
    1119             :         Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
    1120          36 :         session_replication_state->roident = node;
    1121             :     }
    1122             : 
    1123             : 
    1124             :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1125             : 
    1126          56 :     session_replication_state->acquired_by = MyProcPid;
    1127             : 
    1128          56 :     LWLockRelease(ReplicationOriginLock);
    1129             : 
    1130             :     /* probably this one is pointless */
    1131          56 :     ConditionVariableBroadcast(&session_replication_state->origin_cv);
    1132          56 : }
    1133             : 
    1134             : /*
    1135             :  * Reset replay state previously setup in this session.
    1136             :  *
    1137             :  * This function may only be called if an origin was setup with
    1138             :  * replorigin_session_setup().
    1139             :  */
    1140             : void
    1141           4 : replorigin_session_reset(void)
    1142             : {
    1143             :     ConditionVariable *cv;
    1144             : 
    1145             :     Assert(max_replication_slots != 0);
    1146             : 
    1147           4 :     if (session_replication_state == NULL)
    1148           2 :         ereport(ERROR,
    1149             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1150             :                  errmsg("no replication origin is configured")));
    1151             : 
    1152           2 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1153             : 
    1154           2 :     session_replication_state->acquired_by = 0;
    1155           2 :     cv = &session_replication_state->origin_cv;
    1156           2 :     session_replication_state = NULL;
    1157             : 
    1158           2 :     LWLockRelease(ReplicationOriginLock);
    1159             : 
    1160           2 :     ConditionVariableBroadcast(cv);
    1161           2 : }
    1162             : 
    1163             : /*
    1164             :  * Do the same work replorigin_advance() does, just on the session's
    1165             :  * configured origin.
    1166             :  *
    1167             :  * This is noticeably cheaper than using replorigin_advance().
    1168             :  */
    1169             : void
    1170         276 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
    1171             : {
    1172             :     Assert(session_replication_state != NULL);
    1173             :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1174             : 
    1175         276 :     LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
    1176         276 :     if (session_replication_state->local_lsn < local_commit)
    1177         276 :         session_replication_state->local_lsn = local_commit;
    1178         276 :     if (session_replication_state->remote_lsn < remote_commit)
    1179         208 :         session_replication_state->remote_lsn = remote_commit;
    1180         276 :     LWLockRelease(&session_replication_state->lock);
    1181         276 : }
    1182             : 
    1183             : /*
    1184             :  * Ask the machinery about the point up to which we successfully replayed
    1185             :  * changes from an already setup replication origin.
    1186             :  */
    1187             : XLogRecPtr
    1188          58 : replorigin_session_get_progress(bool flush)
    1189             : {
    1190             :     XLogRecPtr  remote_lsn;
    1191             :     XLogRecPtr  local_lsn;
    1192             : 
    1193             :     Assert(session_replication_state != NULL);
    1194             : 
    1195          58 :     LWLockAcquire(&session_replication_state->lock, LW_SHARED);
    1196          58 :     remote_lsn = session_replication_state->remote_lsn;
    1197          58 :     local_lsn = session_replication_state->local_lsn;
    1198          58 :     LWLockRelease(&session_replication_state->lock);
    1199             : 
    1200          58 :     if (flush && local_lsn != InvalidXLogRecPtr)
    1201           2 :         XLogFlush(local_lsn);
    1202             : 
    1203          58 :     return remote_lsn;
    1204             : }
    1205             : 
    1206             : 
    1207             : 
    1208             : /* ---------------------------------------------------------------------------
    1209             :  * SQL functions for working with replication origins.
    1210             :  *
    1211             :  * These mostly should be fairly short wrappers around more generic functions.
    1212             :  * ---------------------------------------------------------------------------
    1213             :  */
    1214             : 
    1215             : /*
    1216             :  * Create replication origin for the passed in name, and return the assigned
    1217             :  * oid.
    1218             :  */
    1219             : Datum
    1220           6 : pg_replication_origin_create(PG_FUNCTION_ARGS)
    1221             : {
    1222             :     char       *name;
    1223             :     RepOriginId roident;
    1224             : 
    1225           6 :     replorigin_check_prerequisites(false, false);
    1226             : 
    1227           6 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1228             : 
    1229             :     /* Replication origins "pg_xxx" are reserved for internal use */
    1230           6 :     if (IsReservedName(name))
    1231           0 :         ereport(ERROR,
    1232             :                 (errcode(ERRCODE_RESERVED_NAME),
    1233             :                  errmsg("replication origin name \"%s\" is reserved",
    1234             :                         name),
    1235             :                  errdetail("Origin names starting with \"pg_\" are reserved.")));
    1236             : 
    1237             :     /*
    1238             :      * If built with appropriate switch, whine when regression-testing
    1239             :      * conventions for replication origin names are violated.
    1240             :      */
    1241             : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1242             :     if (strncmp(name, "regress_", 8) != 0)
    1243             :         elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
    1244             : #endif
    1245             : 
    1246           6 :     roident = replorigin_create(name);
    1247             : 
    1248           4 :     pfree(name);
    1249             : 
    1250           4 :     PG_RETURN_OID(roident);
    1251             : }
    1252             : 
    1253             : /*
    1254             :  * Drop replication origin.
    1255             :  */
    1256             : Datum
    1257           6 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
    1258             : {
    1259             :     char       *name;
    1260             :     RepOriginId roident;
    1261             : 
    1262           6 :     replorigin_check_prerequisites(false, false);
    1263             : 
    1264           6 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1265             : 
    1266           6 :     roident = replorigin_by_name(name, false);
    1267             :     Assert(OidIsValid(roident));
    1268             : 
    1269           4 :     replorigin_drop(roident, true);
    1270             : 
    1271           4 :     pfree(name);
    1272             : 
    1273           4 :     PG_RETURN_VOID();
    1274             : }
    1275             : 
    1276             : /*
    1277             :  * Return oid of a replication origin.
    1278             :  */
    1279             : Datum
    1280           0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
    1281             : {
    1282             :     char       *name;
    1283             :     RepOriginId roident;
    1284             : 
    1285           0 :     replorigin_check_prerequisites(false, false);
    1286             : 
    1287           0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1288           0 :     roident = replorigin_by_name(name, true);
    1289             : 
    1290           0 :     pfree(name);
    1291             : 
    1292           0 :     if (OidIsValid(roident))
    1293           0 :         PG_RETURN_OID(roident);
    1294           0 :     PG_RETURN_NULL();
    1295             : }
    1296             : 
    1297             : /*
    1298             :  * Setup a replication origin for this session.
    1299             :  */
    1300             : Datum
    1301           6 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
    1302             : {
    1303             :     char       *name;
    1304             :     RepOriginId origin;
    1305             : 
    1306           6 :     replorigin_check_prerequisites(true, false);
    1307             : 
    1308           6 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1309           6 :     origin = replorigin_by_name(name, false);
    1310           4 :     replorigin_session_setup(origin);
    1311             : 
    1312           2 :     replorigin_session_origin = origin;
    1313             : 
    1314           2 :     pfree(name);
    1315             : 
    1316           2 :     PG_RETURN_VOID();
    1317             : }
    1318             : 
    1319             : /*
    1320             :  * Reset previously setup origin in this session
    1321             :  */
    1322             : Datum
    1323           4 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
    1324             : {
    1325           4 :     replorigin_check_prerequisites(true, false);
    1326             : 
    1327           4 :     replorigin_session_reset();
    1328             : 
    1329           2 :     replorigin_session_origin = InvalidRepOriginId;
    1330           2 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1331           2 :     replorigin_session_origin_timestamp = 0;
    1332             : 
    1333           2 :     PG_RETURN_VOID();
    1334             : }
    1335             : 
    1336             : /*
    1337             :  * Has a replication origin been setup for this session.
    1338             :  */
    1339             : Datum
    1340           0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
    1341             : {
    1342           0 :     replorigin_check_prerequisites(false, false);
    1343             : 
    1344           0 :     PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
    1345             : }
    1346             : 
    1347             : 
    1348             : /*
    1349             :  * Return the replication progress for origin setup in the current session.
    1350             :  *
    1351             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1352             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1353             :  * commits are used when replaying replicated transactions.
    1354             :  */
    1355             : Datum
    1356           4 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
    1357             : {
    1358           4 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1359           4 :     bool        flush = PG_GETARG_BOOL(0);
    1360             : 
    1361           4 :     replorigin_check_prerequisites(true, false);
    1362             : 
    1363           4 :     if (session_replication_state == NULL)
    1364           0 :         ereport(ERROR,
    1365             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1366             :                  errmsg("no replication origin is configured")));
    1367             : 
    1368           4 :     remote_lsn = replorigin_session_get_progress(flush);
    1369             : 
    1370           4 :     if (remote_lsn == InvalidXLogRecPtr)
    1371           0 :         PG_RETURN_NULL();
    1372             : 
    1373           4 :     PG_RETURN_LSN(remote_lsn);
    1374             : }
    1375             : 
    1376             : Datum
    1377           2 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
    1378             : {
    1379           2 :     XLogRecPtr  location = PG_GETARG_LSN(0);
    1380             : 
    1381           2 :     replorigin_check_prerequisites(true, false);
    1382             : 
    1383           2 :     if (session_replication_state == NULL)
    1384           0 :         ereport(ERROR,
    1385             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1386             :                  errmsg("no replication origin is configured")));
    1387             : 
    1388           2 :     replorigin_session_origin_lsn = location;
    1389           2 :     replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
    1390             : 
    1391           2 :     PG_RETURN_VOID();
    1392             : }
    1393             : 
    1394             : Datum
    1395           0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
    1396             : {
    1397           0 :     replorigin_check_prerequisites(true, false);
    1398             : 
    1399           0 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1400           0 :     replorigin_session_origin_timestamp = 0;
    1401             : 
    1402           0 :     PG_RETURN_VOID();
    1403             : }
    1404             : 
    1405             : 
    1406             : Datum
    1407           2 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
    1408             : {
    1409           2 :     text       *name = PG_GETARG_TEXT_PP(0);
    1410           2 :     XLogRecPtr  remote_commit = PG_GETARG_LSN(1);
    1411             :     RepOriginId node;
    1412             : 
    1413           2 :     replorigin_check_prerequisites(true, false);
    1414             : 
    1415             :     /* lock to prevent the replication origin from vanishing */
    1416           2 :     LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1417             : 
    1418           2 :     node = replorigin_by_name(text_to_cstring(name), false);
    1419             : 
    1420             :     /*
    1421             :      * Can't sensibly pass a local commit to be flushed at checkpoint - this
    1422             :      * xact hasn't committed yet. This is why this function should be used to
    1423             :      * set up the initial replication state, but not for replay.
    1424             :      */
    1425           0 :     replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
    1426             :                        true /* go backward */ , true /* WAL log */ );
    1427             : 
    1428           0 :     UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1429             : 
    1430           0 :     PG_RETURN_VOID();
    1431             : }
    1432             : 
    1433             : 
    1434             : /*
    1435             :  * Return the replication progress for an individual replication origin.
    1436             :  *
    1437             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1438             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1439             :  * commits are used when replaying replicated transactions.
    1440             :  */
    1441             : Datum
    1442           6 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
    1443             : {
    1444             :     char       *name;
    1445             :     bool        flush;
    1446             :     RepOriginId roident;
    1447           6 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1448             : 
    1449           6 :     replorigin_check_prerequisites(true, true);
    1450             : 
    1451           6 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1452           6 :     flush = PG_GETARG_BOOL(1);
    1453             : 
    1454           6 :     roident = replorigin_by_name(name, false);
    1455             :     Assert(OidIsValid(roident));
    1456             : 
    1457           4 :     remote_lsn = replorigin_get_progress(roident, flush);
    1458             : 
    1459           4 :     if (remote_lsn == InvalidXLogRecPtr)
    1460           0 :         PG_RETURN_NULL();
    1461             : 
    1462           4 :     PG_RETURN_LSN(remote_lsn);
    1463             : }
    1464             : 
    1465             : 
    1466             : Datum
    1467           2 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
    1468             : {
    1469           2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1470             :     TupleDesc   tupdesc;
    1471             :     Tuplestorestate *tupstore;
    1472             :     MemoryContext per_query_ctx;
    1473             :     MemoryContext oldcontext;
    1474             :     int         i;
    1475             : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
    1476             : 
    1477             :     /* we want to return 0 rows if slot is set to zero */
    1478           2 :     replorigin_check_prerequisites(false, true);
    1479             : 
    1480           2 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
    1481           0 :         ereport(ERROR,
    1482             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1483             :                  errmsg("set-valued function called in context that cannot accept a set")));
    1484           2 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
    1485           0 :         ereport(ERROR,
    1486             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1487             :                  errmsg("materialize mode required, but it is not allowed in this context")));
    1488           2 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
    1489           0 :         elog(ERROR, "return type must be a row type");
    1490             : 
    1491           2 :     if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
    1492           0 :         elog(ERROR, "wrong function definition");
    1493             : 
    1494           2 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
    1495           2 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
    1496             : 
    1497           2 :     tupstore = tuplestore_begin_heap(true, false, work_mem);
    1498           2 :     rsinfo->returnMode = SFRM_Materialize;
    1499           2 :     rsinfo->setResult = tupstore;
    1500           2 :     rsinfo->setDesc = tupdesc;
    1501             : 
    1502           2 :     MemoryContextSwitchTo(oldcontext);
    1503             : 
    1504             : 
    1505             :     /* prevent slots from being concurrently dropped */
    1506           2 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1507             : 
    1508             :     /*
    1509             :      * Iterate through all possible replication_states, display if they are
    1510             :      * filled. Note that we do not take any locks, so slightly corrupted/out
    1511             :      * of date values are a possibility.
    1512             :      */
    1513          10 :     for (i = 0; i < max_replication_slots; i++)
    1514             :     {
    1515             :         ReplicationState *state;
    1516             :         Datum       values[REPLICATION_ORIGIN_PROGRESS_COLS];
    1517             :         bool        nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
    1518             :         char       *roname;
    1519             : 
    1520           8 :         state = &replication_states[i];
    1521             : 
    1522             :         /* unused slot, nothing to display */
    1523           8 :         if (state->roident == InvalidRepOriginId)
    1524           6 :             continue;
    1525             : 
    1526           2 :         memset(values, 0, sizeof(values));
    1527           2 :         memset(nulls, 1, sizeof(nulls));
    1528             : 
    1529           2 :         values[0] = ObjectIdGetDatum(state->roident);
    1530           2 :         nulls[0] = false;
    1531             : 
    1532             :         /*
    1533             :          * We're not preventing the origin to be dropped concurrently, so
    1534             :          * silently accept that it might be gone.
    1535             :          */
    1536           2 :         if (replorigin_by_oid(state->roident, true,
    1537             :                               &roname))
    1538             :         {
    1539           2 :             values[1] = CStringGetTextDatum(roname);
    1540           2 :             nulls[1] = false;
    1541             :         }
    1542             : 
    1543           2 :         LWLockAcquire(&state->lock, LW_SHARED);
    1544             : 
    1545           2 :         values[2] = LSNGetDatum(state->remote_lsn);
    1546           2 :         nulls[2] = false;
    1547             : 
    1548           2 :         values[3] = LSNGetDatum(state->local_lsn);
    1549           2 :         nulls[3] = false;
    1550             : 
    1551           2 :         LWLockRelease(&state->lock);
    1552             : 
    1553           2 :         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    1554             :     }
    1555             : 
    1556             :     tuplestore_donestoring(tupstore);
    1557             : 
    1558           2 :     LWLockRelease(ReplicationOriginLock);
    1559             : 
    1560             : #undef REPLICATION_ORIGIN_PROGRESS_COLS
    1561             : 
    1562           2 :     return (Datum) 0;
    1563             : }

Generated by: LCOV version 1.13