LCOV - code coverage report
Current view: top level - src/backend/replication/logical - origin.c (source / functions) Hit Total Coverage
Test: PostgreSQL 15devel Lines: 388 459 84.5 %
Date: 2021-12-09 03:08:47 Functions: 27 30 90.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * origin.c
       4             :  *    Logical replication progress tracking support.
       5             :  *
       6             :  * Copyright (c) 2013-2021, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/replication/logical/origin.c
      10             :  *
      11             :  * NOTES
      12             :  *
      13             :  * This file provides the following:
      14             :  * * An infrastructure to name nodes in a replication setup
      15             :  * * A facility to store and persist replication progress in an
      16             :  *   efficient and durable manner.
      17             :  *
      18             :  * A replication origin consists of a descriptive, user-defined, external
      19             :  * name and a short, thus space-efficient, internal 2-byte one. This split
      20             :  * exists because replication origins have to be stored in WAL and shared
      21             :  * memory, and long descriptors would be inefficient.  For now we only use
      22             :  * 2 bytes for the internal id of a replication origin as it seems unlikely
      23             :  * that there soon will be more than 65k nodes in one replication setup; and
      24             :  * using only two bytes allows us to be more space efficient.
      25             :  *
      26             :  * Replication progress is tracked in a shared memory table
      27             :  * (ReplicationState) that's dumped to disk every checkpoint. Entries
      28             :  * ('slots') in this table are identified by the internal id. That's the case
      29             :  * because it allows to increase replication progress during crash
      30             :  * recovery. To allow doing so we store the original LSN (from the originating
      31             :  * system) of a transaction in the commit record. That allows to recover the
      32             :  * precise replayed state after crash recovery; without requiring synchronous
      33             :  * commits. Allowing logical replication to use asynchronous commit is
      34             :  * generally good for performance, but especially important as it allows a
      35             :  * single threaded replay process to keep up with a source that has multiple
      36             :  * backends generating changes concurrently.  For efficiency and simplicity
      37             :  * reasons a backend can set up one replication origin that's from then on
      38             :  * used as the source of changes produced by the backend, until reset again.
      39             :  *
      40             :  * This infrastructure is intended to be used in cooperation with logical
      41             :  * decoding. When replaying from a remote system the configured origin is
      42             :  * provided to output plugins, allowing prevention of replication loops and
      43             :  * other filtering.
      44             :  *
      45             :  * There are several levels of locking at work:
      46             :  *
      47             :  * * To create and drop replication origins an exclusive lock on
      48             :  *   pg_replication_origin is required for the duration. That allows us to
      49             :  *   safely and conflict free assign new origins using a dirty snapshot.
      50             :  *
      51             :  * * When creating an in-memory replication progress slot the ReplicationOrigin
      52             :  *   LWLock has to be held exclusively; when iterating over the replication
      53             :  *   progress a shared lock has to be held, the same when advancing the
      54             :  *   replication progress of an individual backend that has not setup as the
      55             :  *   session's replication origin.
      56             :  *
      57             :  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
      58             :  *   replication progress slot that slot's lwlock has to be held. That's
      59             :  *   primarily because we do not assume 8 byte writes (the LSN) are atomic on
      60             :  *   all our platforms, but it also simplifies memory ordering concerns
      61             :  *   between the remote and local lsn. We use a lwlock instead of a spinlock
      62             :  *   so it's less harmful to hold the lock over a WAL write
      63             :  *   (cf. AdvanceReplicationProgress).
      64             :  *
      65             :  * ---------------------------------------------------------------------------
      66             :  */
      67             : 
      68             : #include "postgres.h"
      69             : 
      70             : #include <unistd.h>
      71             : #include <sys/stat.h>
      72             : 
      73             : #include "access/genam.h"
      74             : #include "access/htup_details.h"
      75             : #include "access/table.h"
      76             : #include "access/xact.h"
      77             : #include "catalog/catalog.h"
      78             : #include "catalog/indexing.h"
      79             : #include "funcapi.h"
      80             : #include "miscadmin.h"
      81             : #include "nodes/execnodes.h"
      82             : #include "pgstat.h"
      83             : #include "replication/logical.h"
      84             : #include "replication/origin.h"
      85             : #include "storage/condition_variable.h"
      86             : #include "storage/copydir.h"
      87             : #include "storage/fd.h"
      88             : #include "storage/ipc.h"
      89             : #include "storage/lmgr.h"
      90             : #include "utils/builtins.h"
      91             : #include "utils/fmgroids.h"
      92             : #include "utils/pg_lsn.h"
      93             : #include "utils/rel.h"
      94             : #include "utils/snapmgr.h"
      95             : #include "utils/syscache.h"
      96             : 
      97             : /*
      98             :  * Replay progress of a single remote node.
      99             :  */
     100             : typedef struct ReplicationState
     101             : {
     102             :     /*
     103             :      * Local identifier for the remote node.
     104             :      */
     105             :     RepOriginId roident;
     106             : 
     107             :     /*
     108             :      * Location of the latest commit from the remote side.
     109             :      */
     110             :     XLogRecPtr  remote_lsn;
     111             : 
     112             :     /*
     113             :      * Remember the local lsn of the commit record so we can XLogFlush() to it
     114             :      * during a checkpoint so we know the commit record actually is safe on
     115             :      * disk.
     116             :      */
     117             :     XLogRecPtr  local_lsn;
     118             : 
     119             :     /*
     120             :      * PID of backend that's acquired slot, or 0 if none.
     121             :      */
     122             :     int         acquired_by;
     123             : 
     124             :     /*
     125             :      * Condition variable that's signaled when acquired_by changes.
     126             :      */
     127             :     ConditionVariable origin_cv;
     128             : 
     129             :     /*
     130             :      * Lock protecting remote_lsn and local_lsn.
     131             :      */
     132             :     LWLock      lock;
     133             : } ReplicationState;
     134             : 
     135             : /*
     136             :  * On disk version of ReplicationState.
     137             :  */
     138             : typedef struct ReplicationStateOnDisk
     139             : {
     140             :     RepOriginId roident;
     141             :     XLogRecPtr  remote_lsn;
     142             : } ReplicationStateOnDisk;
     143             : 
     144             : 
     145             : typedef struct ReplicationStateCtl
     146             : {
     147             :     /* Tranche to use for per-origin LWLocks */
     148             :     int         tranche_id;
     149             :     /* Array of length max_replication_slots */
     150             :     ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
     151             : } ReplicationStateCtl;
     152             : 
     153             : /* external variables */
     154             : RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
     155             : XLogRecPtr  replorigin_session_origin_lsn = InvalidXLogRecPtr;
     156             : TimestampTz replorigin_session_origin_timestamp = 0;
     157             : 
     158             : /*
     159             :  * Base address into a shared memory array of replication states of size
     160             :  * max_replication_slots.
     161             :  *
     162             :  * XXX: Should we use a separate variable to size this rather than
     163             :  * max_replication_slots?
     164             :  */
     165             : static ReplicationState *replication_states;
     166             : 
     167             : /*
     168             :  * Actual shared memory block (replication_states[] is now part of this).
     169             :  */
     170             : static ReplicationStateCtl *replication_states_ctl;
     171             : 
     172             : /*
     173             :  * Backend-local, cached element from ReplicationState for use in a backend
     174             :  * replaying remote commits, so we don't have to search ReplicationState for
     175             :  * the backend's current RepOriginId.
     176             :  */
     177             : static ReplicationState *session_replication_state = NULL;
     178             : 
     179             : /* Magic for on disk files. */
     180             : #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
     181             : 
     182             : static void
     183          46 : replorigin_check_prerequisites(bool check_slots, bool recoveryOK)
     184             : {
     185          46 :     if (check_slots && max_replication_slots == 0)
     186           0 :         ereport(ERROR,
     187             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     188             :                  errmsg("cannot query or manipulate replication origin when max_replication_slots = 0")));
     189             : 
     190          46 :     if (!recoveryOK && RecoveryInProgress())
     191           0 :         ereport(ERROR,
     192             :                 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
     193             :                  errmsg("cannot manipulate replication origins during recovery")));
     194             : 
     195          46 : }
     196             : 
     197             : 
     198             : /* ---------------------------------------------------------------------------
     199             :  * Functions for working with replication origins themselves.
     200             :  * ---------------------------------------------------------------------------
     201             :  */
     202             : 
     203             : /*
     204             :  * Check for a persistent replication origin identified by name.
     205             :  *
     206             :  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
     207             :  */
     208             : RepOriginId
     209         558 : replorigin_by_name(const char *roname, bool missing_ok)
     210             : {
     211             :     Form_pg_replication_origin ident;
     212         558 :     Oid         roident = InvalidOid;
     213             :     HeapTuple   tuple;
     214             :     Datum       roname_d;
     215             : 
     216         558 :     roname_d = CStringGetTextDatum(roname);
     217             : 
     218         558 :     tuple = SearchSysCache1(REPLORIGNAME, roname_d);
     219         558 :     if (HeapTupleIsValid(tuple))
     220             :     {
     221         376 :         ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
     222         376 :         roident = ident->roident;
     223         376 :         ReleaseSysCache(tuple);
     224             :     }
     225         182 :     else if (!missing_ok)
     226           8 :         ereport(ERROR,
     227             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     228             :                  errmsg("replication origin \"%s\" does not exist",
     229             :                         roname)));
     230             : 
     231         550 :     return roident;
     232             : }
     233             : 
     234             : /*
     235             :  * Create a replication origin.
     236             :  *
     237             :  * Needs to be called in a transaction.
     238             :  */
     239             : RepOriginId
     240         302 : replorigin_create(const char *roname)
     241             : {
     242             :     Oid         roident;
     243         302 :     HeapTuple   tuple = NULL;
     244             :     Relation    rel;
     245             :     Datum       roname_d;
     246             :     SnapshotData SnapshotDirty;
     247             :     SysScanDesc scan;
     248             :     ScanKeyData key;
     249             : 
     250         302 :     roname_d = CStringGetTextDatum(roname);
     251             : 
     252             :     Assert(IsTransactionState());
     253             : 
     254             :     /*
     255             :      * We need the numeric replication origin to be 16bit wide, so we cannot
     256             :      * rely on the normal oid allocation. Instead we simply scan
     257             :      * pg_replication_origin for the first unused id. That's not particularly
     258             :      * efficient, but this should be a fairly infrequent operation - we can
     259             :      * easily spend a bit more code on this when it turns out it needs to be
     260             :      * faster.
     261             :      *
     262             :      * We handle concurrency by taking an exclusive lock (allowing reads!)
     263             :      * over the table for the duration of the search. Because we use a "dirty
     264             :      * snapshot" we can read rows that other in-progress sessions have
     265             :      * written, even though they would be invisible with normal snapshots. Due
     266             :      * to the exclusive lock there's no danger that new rows can appear while
     267             :      * we're checking.
     268             :      */
     269         302 :     InitDirtySnapshot(SnapshotDirty);
     270             : 
     271         302 :     rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
     272             : 
     273         554 :     for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
     274             :     {
     275             :         bool        nulls[Natts_pg_replication_origin];
     276             :         Datum       values[Natts_pg_replication_origin];
     277             :         bool        collides;
     278             : 
     279         554 :         CHECK_FOR_INTERRUPTS();
     280             : 
     281         554 :         ScanKeyInit(&key,
     282             :                     Anum_pg_replication_origin_roident,
     283             :                     BTEqualStrategyNumber, F_OIDEQ,
     284             :                     ObjectIdGetDatum(roident));
     285             : 
     286         554 :         scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
     287             :                                   true /* indexOK */ ,
     288             :                                   &SnapshotDirty,
     289             :                                   1, &key);
     290             : 
     291         554 :         collides = HeapTupleIsValid(systable_getnext(scan));
     292             : 
     293         554 :         systable_endscan(scan);
     294             : 
     295         554 :         if (!collides)
     296             :         {
     297             :             /*
     298             :              * Ok, found an unused roident, insert the new row and do a CCI,
     299             :              * so our callers can look it up if they want to.
     300             :              */
     301         302 :             memset(&nulls, 0, sizeof(nulls));
     302             : 
     303         302 :             values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
     304         302 :             values[Anum_pg_replication_origin_roname - 1] = roname_d;
     305             : 
     306         302 :             tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     307         302 :             CatalogTupleInsert(rel, tuple);
     308         300 :             CommandCounterIncrement();
     309         300 :             break;
     310             :         }
     311             :     }
     312             : 
     313             :     /* now release the lock again */
     314         300 :     table_close(rel, ExclusiveLock);
     315             : 
     316         300 :     if (tuple == NULL)
     317           0 :         ereport(ERROR,
     318             :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     319             :                  errmsg("could not find free replication origin OID")));
     320             : 
     321         300 :     heap_freetuple(tuple);
     322         300 :     return roident;
     323             : }
     324             : 
     325             : /*
     326             :  * Helper function to drop a replication origin.
     327             :  */
     328             : static void
     329         238 : replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait)
     330             : {
     331             :     HeapTuple   tuple;
     332             :     int         i;
     333             : 
     334             :     /*
     335             :      * First, clean up the slot state info, if there is any matching slot.
     336             :      */
     337         238 : restart:
     338         238 :     tuple = NULL;
     339         238 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     340             : 
     341         810 :     for (i = 0; i < max_replication_slots; i++)
     342             :     {
     343         774 :         ReplicationState *state = &replication_states[i];
     344             : 
     345         774 :         if (state->roident == roident)
     346             :         {
     347             :             /* found our slot, is it busy? */
     348         202 :             if (state->acquired_by != 0)
     349             :             {
     350             :                 ConditionVariable *cv;
     351             : 
     352           0 :                 if (nowait)
     353           0 :                     ereport(ERROR,
     354             :                             (errcode(ERRCODE_OBJECT_IN_USE),
     355             :                              errmsg("could not drop replication origin with OID %d, in use by PID %d",
     356             :                                     state->roident,
     357             :                                     state->acquired_by)));
     358             : 
     359             :                 /*
     360             :                  * We must wait and then retry.  Since we don't know which CV
     361             :                  * to wait on until here, we can't readily use
     362             :                  * ConditionVariablePrepareToSleep (calling it here would be
     363             :                  * wrong, since we could miss the signal if we did so); just
     364             :                  * use ConditionVariableSleep directly.
     365             :                  */
     366           0 :                 cv = &state->origin_cv;
     367             : 
     368           0 :                 LWLockRelease(ReplicationOriginLock);
     369             : 
     370           0 :                 ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
     371           0 :                 goto restart;
     372             :             }
     373             : 
     374             :             /* first make a WAL log entry */
     375             :             {
     376             :                 xl_replorigin_drop xlrec;
     377             : 
     378         202 :                 xlrec.node_id = roident;
     379         202 :                 XLogBeginInsert();
     380         202 :                 XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
     381         202 :                 XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
     382             :             }
     383             : 
     384             :             /* then clear the in-memory slot */
     385         202 :             state->roident = InvalidRepOriginId;
     386         202 :             state->remote_lsn = InvalidXLogRecPtr;
     387         202 :             state->local_lsn = InvalidXLogRecPtr;
     388         202 :             break;
     389             :         }
     390             :     }
     391         238 :     LWLockRelease(ReplicationOriginLock);
     392         238 :     ConditionVariableCancelSleep();
     393             : 
     394             :     /*
     395             :      * Now, we can delete the catalog entry.
     396             :      */
     397         238 :     tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
     398         238 :     if (!HeapTupleIsValid(tuple))
     399           0 :         elog(ERROR, "cache lookup failed for replication origin with oid %u",
     400             :              roident);
     401             : 
     402         238 :     CatalogTupleDelete(rel, &tuple->t_self);
     403         238 :     ReleaseSysCache(tuple);
     404             : 
     405         238 :     CommandCounterIncrement();
     406         238 : }
     407             : 
     408             : /*
     409             :  * Drop replication origin (by name).
     410             :  *
     411             :  * Needs to be called in a transaction.
     412             :  */
     413             : void
     414         242 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
     415             : {
     416             :     RepOriginId roident;
     417             :     Relation    rel;
     418             : 
     419             :     Assert(IsTransactionState());
     420             : 
     421             :     /*
     422             :      * To interlock against concurrent drops, we hold ExclusiveLock on
     423             :      * pg_replication_origin till xact commit.
     424             :      *
     425             :      * XXX We can optimize this by acquiring the lock on a specific origin by
     426             :      * using LockSharedObject if required. However, for that, we first need to
     427             :      * acquire a lock on ReplicationOriginRelationId, get the origin_id, lock
     428             :      * the specific origin and then re-check if the origin still exists.
     429             :      */
     430         242 :     rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
     431             : 
     432         242 :     roident = replorigin_by_name(name, missing_ok);
     433             : 
     434         240 :     if (OidIsValid(roident))
     435         238 :         replorigin_drop_guts(rel, roident, nowait);
     436             : 
     437             :     /* We keep the lock on pg_replication_origin until commit */
     438         240 :     table_close(rel, NoLock);
     439         240 : }
     440             : 
     441             : /*
     442             :  * Look up a replication origin via its oid and return its name.
     443             :  *
     444             :  * The external name is palloc'd in the calling context.
     445             :  *
     446             :  * Returns true if the origin is known, false otherwise.
     447             :  */
     448             : bool
     449          28 : replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
     450             : {
     451             :     HeapTuple   tuple;
     452             :     Form_pg_replication_origin ric;
     453             : 
     454             :     Assert(OidIsValid((Oid) roident));
     455             :     Assert(roident != InvalidRepOriginId);
     456             :     Assert(roident != DoNotReplicateId);
     457             : 
     458          28 :     tuple = SearchSysCache1(REPLORIGIDENT,
     459             :                             ObjectIdGetDatum((Oid) roident));
     460             : 
     461          28 :     if (HeapTupleIsValid(tuple))
     462             :     {
     463          24 :         ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
     464          24 :         *roname = text_to_cstring(&ric->roname);
     465          24 :         ReleaseSysCache(tuple);
     466             : 
     467          24 :         return true;
     468             :     }
     469             :     else
     470             :     {
     471           4 :         *roname = NULL;
     472             : 
     473           4 :         if (!missing_ok)
     474           0 :             ereport(ERROR,
     475             :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     476             :                      errmsg("replication origin with OID %u does not exist",
     477             :                             roident)));
     478             : 
     479           4 :         return false;
     480             :     }
     481             : }
     482             : 
     483             : 
     484             : /* ---------------------------------------------------------------------------
     485             :  * Functions for handling replication progress.
     486             :  * ---------------------------------------------------------------------------
     487             :  */
     488             : 
     489             : Size
     490        9630 : ReplicationOriginShmemSize(void)
     491             : {
     492        9630 :     Size        size = 0;
     493             : 
     494             :     /*
     495             :      * XXX: max_replication_slots is arguably the wrong thing to use, as here
     496             :      * we keep the replay state of *remote* transactions. But for now it seems
     497             :      * sufficient to reuse it, rather than introduce a separate GUC.
     498             :      */
     499        9630 :     if (max_replication_slots == 0)
     500           0 :         return size;
     501             : 
     502        9630 :     size = add_size(size, offsetof(ReplicationStateCtl, states));
     503             : 
     504        9630 :     size = add_size(size,
     505             :                     mul_size(max_replication_slots, sizeof(ReplicationState)));
     506        9630 :     return size;
     507             : }
     508             : 
     509             : void
     510        2894 : ReplicationOriginShmemInit(void)
     511             : {
     512             :     bool        found;
     513             : 
     514        2894 :     if (max_replication_slots == 0)
     515           0 :         return;
     516             : 
     517        2894 :     replication_states_ctl = (ReplicationStateCtl *)
     518        2894 :         ShmemInitStruct("ReplicationOriginState",
     519             :                         ReplicationOriginShmemSize(),
     520             :                         &found);
     521        2894 :     replication_states = replication_states_ctl->states;
     522             : 
     523        2894 :     if (!found)
     524             :     {
     525             :         int         i;
     526             : 
     527      207472 :         MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
     528             : 
     529        2894 :         replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
     530             : 
     531       31706 :         for (i = 0; i < max_replication_slots; i++)
     532             :         {
     533       28812 :             LWLockInitialize(&replication_states[i].lock,
     534       28812 :                              replication_states_ctl->tranche_id);
     535       28812 :             ConditionVariableInit(&replication_states[i].origin_cv);
     536             :         }
     537             :     }
     538             : }
     539             : 
     540             : /* ---------------------------------------------------------------------------
     541             :  * Perform a checkpoint of each replication origin's progress with respect to
     542             :  * the replayed remote_lsn. Make sure that all transactions we refer to in the
     543             :  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
     544             :  * if the transactions were originally committed asynchronously.
     545             :  *
     546             :  * We store checkpoints in the following format:
     547             :  * +-------+------------------------+------------------+-----+--------+
     548             :  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
     549             :  * +-------+------------------------+------------------+-----+--------+
     550             :  *
     551             :  * So it's just the magic, followed by the statically sized
     552             :  * ReplicationStateOnDisk structs. Note that the maximum number of
     553             :  * ReplicationState is determined by max_replication_slots.
     554             :  * ---------------------------------------------------------------------------
     555             :  */
     556             : void
     557        4142 : CheckPointReplicationOrigin(void)
     558             : {
     559        4142 :     const char *tmppath = "pg_logical/replorigin_checkpoint.tmp";
     560        4142 :     const char *path = "pg_logical/replorigin_checkpoint";
     561             :     int         tmpfd;
     562             :     int         i;
     563        4142 :     uint32      magic = REPLICATION_STATE_MAGIC;
     564             :     pg_crc32c   crc;
     565             : 
     566        4142 :     if (max_replication_slots == 0)
     567           0 :         return;
     568             : 
     569        4142 :     INIT_CRC32C(crc);
     570             : 
     571             :     /* make sure no old temp file is remaining */
     572        4142 :     if (unlink(tmppath) < 0 && errno != ENOENT)
     573           0 :         ereport(PANIC,
     574             :                 (errcode_for_file_access(),
     575             :                  errmsg("could not remove file \"%s\": %m",
     576             :                         tmppath)));
     577             : 
     578             :     /*
     579             :      * no other backend can perform this at the same time; only one checkpoint
     580             :      * can happen at a time.
     581             :      */
     582        4142 :     tmpfd = OpenTransientFile(tmppath,
     583             :                               O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
     584        4142 :     if (tmpfd < 0)
     585           0 :         ereport(PANIC,
     586             :                 (errcode_for_file_access(),
     587             :                  errmsg("could not create file \"%s\": %m",
     588             :                         tmppath)));
     589             : 
     590             :     /* write magic */
     591        4142 :     errno = 0;
     592        4142 :     if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
     593             :     {
     594             :         /* if write didn't set errno, assume problem is no disk space */
     595           0 :         if (errno == 0)
     596           0 :             errno = ENOSPC;
     597           0 :         ereport(PANIC,
     598             :                 (errcode_for_file_access(),
     599             :                  errmsg("could not write to file \"%s\": %m",
     600             :                         tmppath)));
     601             :     }
     602        4142 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     603             : 
     604             :     /* prevent concurrent creations/drops */
     605        4142 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
     606             : 
     607             :     /* write actual data */
     608       45378 :     for (i = 0; i < max_replication_slots; i++)
     609             :     {
     610             :         ReplicationStateOnDisk disk_state;
     611       41236 :         ReplicationState *curstate = &replication_states[i];
     612             :         XLogRecPtr  local_lsn;
     613             : 
     614       41236 :         if (curstate->roident == InvalidRepOriginId)
     615       41196 :             continue;
     616             : 
     617             :         /* zero, to avoid uninitialized padding bytes */
     618          40 :         memset(&disk_state, 0, sizeof(disk_state));
     619             : 
     620          40 :         LWLockAcquire(&curstate->lock, LW_SHARED);
     621             : 
     622          40 :         disk_state.roident = curstate->roident;
     623             : 
     624          40 :         disk_state.remote_lsn = curstate->remote_lsn;
     625          40 :         local_lsn = curstate->local_lsn;
     626             : 
     627          40 :         LWLockRelease(&curstate->lock);
     628             : 
     629             :         /* make sure we only write out a commit that's persistent */
     630          40 :         XLogFlush(local_lsn);
     631             : 
     632          40 :         errno = 0;
     633          40 :         if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
     634             :             sizeof(disk_state))
     635             :         {
     636             :             /* if write didn't set errno, assume problem is no disk space */
     637           0 :             if (errno == 0)
     638           0 :                 errno = ENOSPC;
     639           0 :             ereport(PANIC,
     640             :                     (errcode_for_file_access(),
     641             :                      errmsg("could not write to file \"%s\": %m",
     642             :                             tmppath)));
     643             :         }
     644             : 
     645          40 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     646             :     }
     647             : 
     648        4142 :     LWLockRelease(ReplicationOriginLock);
     649             : 
     650             :     /* write out the CRC */
     651        4142 :     FIN_CRC32C(crc);
     652        4142 :     errno = 0;
     653        4142 :     if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
     654             :     {
     655             :         /* if write didn't set errno, assume problem is no disk space */
     656           0 :         if (errno == 0)
     657           0 :             errno = ENOSPC;
     658           0 :         ereport(PANIC,
     659             :                 (errcode_for_file_access(),
     660             :                  errmsg("could not write to file \"%s\": %m",
     661             :                         tmppath)));
     662             :     }
     663             : 
     664        4142 :     if (CloseTransientFile(tmpfd) != 0)
     665           0 :         ereport(PANIC,
     666             :                 (errcode_for_file_access(),
     667             :                  errmsg("could not close file \"%s\": %m",
     668             :                         tmppath)));
     669             : 
     670             :     /* fsync, rename to permanent file, fsync file and directory */
     671        4142 :     durable_rename(tmppath, path, PANIC);
     672             : }
     673             : 
     674             : /*
     675             :  * Recover replication replay status from checkpoint data saved earlier by
     676             :  * CheckPointReplicationOrigin.
     677             :  *
     678             :  * This only needs to be called at startup and *not* during every checkpoint
     679             :  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
     680             :  * state thereafter can be recovered by looking at commit records.
     681             :  */
     682             : void
     683        1860 : StartupReplicationOrigin(void)
     684             : {
     685        1860 :     const char *path = "pg_logical/replorigin_checkpoint";
     686             :     int         fd;
     687             :     int         readBytes;
     688        1860 :     uint32      magic = REPLICATION_STATE_MAGIC;
     689        1860 :     int         last_state = 0;
     690             :     pg_crc32c   file_crc;
     691             :     pg_crc32c   crc;
     692             : 
     693             :     /* don't want to overwrite already existing state */
     694             : #ifdef USE_ASSERT_CHECKING
     695             :     static bool already_started = false;
     696             : 
     697             :     Assert(!already_started);
     698             :     already_started = true;
     699             : #endif
     700             : 
     701        1860 :     if (max_replication_slots == 0)
     702         482 :         return;
     703             : 
     704        1860 :     INIT_CRC32C(crc);
     705             : 
     706        1860 :     elog(DEBUG2, "starting up replication origin progress state");
     707             : 
     708        1860 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
     709             : 
     710             :     /*
     711             :      * might have had max_replication_slots == 0 last run, or we just brought
     712             :      * up a standby.
     713             :      */
     714        1860 :     if (fd < 0 && errno == ENOENT)
     715         482 :         return;
     716        1378 :     else if (fd < 0)
     717           0 :         ereport(PANIC,
     718             :                 (errcode_for_file_access(),
     719             :                  errmsg("could not open file \"%s\": %m",
     720             :                         path)));
     721             : 
     722             :     /* verify magic, which is written even if nothing was active */
     723        1378 :     readBytes = read(fd, &magic, sizeof(magic));
     724        1378 :     if (readBytes != sizeof(magic))
     725             :     {
     726           0 :         if (readBytes < 0)
     727           0 :             ereport(PANIC,
     728             :                     (errcode_for_file_access(),
     729             :                      errmsg("could not read file \"%s\": %m",
     730             :                             path)));
     731             :         else
     732           0 :             ereport(PANIC,
     733             :                     (errcode(ERRCODE_DATA_CORRUPTED),
     734             :                      errmsg("could not read file \"%s\": read %d of %zu",
     735             :                             path, readBytes, sizeof(magic))));
     736             :     }
     737        1378 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     738             : 
     739        1378 :     if (magic != REPLICATION_STATE_MAGIC)
     740           0 :         ereport(PANIC,
     741             :                 (errmsg("replication checkpoint has wrong magic %u instead of %u",
     742             :                         magic, REPLICATION_STATE_MAGIC)));
     743             : 
     744             :     /* we can skip locking here, no other access is possible */
     745             : 
     746             :     /* recover individual states, until there are no more to be found */
     747             :     while (true)
     748           4 :     {
     749             :         ReplicationStateOnDisk disk_state;
     750             : 
     751        1382 :         readBytes = read(fd, &disk_state, sizeof(disk_state));
     752             : 
     753             :         /* no further data */
     754        1382 :         if (readBytes == sizeof(crc))
     755             :         {
     756             :             /* not pretty, but simple ... */
     757        1378 :             file_crc = *(pg_crc32c *) &disk_state;
     758        1378 :             break;
     759             :         }
     760             : 
     761           4 :         if (readBytes < 0)
     762             :         {
     763           0 :             ereport(PANIC,
     764             :                     (errcode_for_file_access(),
     765             :                      errmsg("could not read file \"%s\": %m",
     766             :                             path)));
     767             :         }
     768             : 
     769           4 :         if (readBytes != sizeof(disk_state))
     770             :         {
     771           0 :             ereport(PANIC,
     772             :                     (errcode_for_file_access(),
     773             :                      errmsg("could not read file \"%s\": read %d of %zu",
     774             :                             path, readBytes, sizeof(disk_state))));
     775             :         }
     776             : 
     777           4 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     778             : 
     779           4 :         if (last_state == max_replication_slots)
     780           0 :             ereport(PANIC,
     781             :                     (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     782             :                      errmsg("could not find free replication state, increase max_replication_slots")));
     783             : 
     784             :         /* copy data to shared memory */
     785           4 :         replication_states[last_state].roident = disk_state.roident;
     786           4 :         replication_states[last_state].remote_lsn = disk_state.remote_lsn;
     787           4 :         last_state++;
     788             : 
     789           4 :         ereport(LOG,
     790             :                 (errmsg("recovered replication state of node %u to %X/%X",
     791             :                         disk_state.roident,
     792             :                         LSN_FORMAT_ARGS(disk_state.remote_lsn))));
     793             :     }
     794             : 
     795             :     /* now check checksum */
     796        1378 :     FIN_CRC32C(crc);
     797        1378 :     if (file_crc != crc)
     798           0 :         ereport(PANIC,
     799             :                 (errcode(ERRCODE_DATA_CORRUPTED),
     800             :                  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
     801             :                         crc, file_crc)));
     802             : 
     803        1378 :     if (CloseTransientFile(fd) != 0)
     804           0 :         ereport(PANIC,
     805             :                 (errcode_for_file_access(),
     806             :                  errmsg("could not close file \"%s\": %m",
     807             :                         path)));
     808             : }
     809             : 
     810             : void
     811           8 : replorigin_redo(XLogReaderState *record)
     812             : {
     813           8 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     814             : 
     815           8 :     switch (info)
     816             :     {
     817           4 :         case XLOG_REPLORIGIN_SET:
     818             :             {
     819           4 :                 xl_replorigin_set *xlrec =
     820             :                 (xl_replorigin_set *) XLogRecGetData(record);
     821             : 
     822           4 :                 replorigin_advance(xlrec->node_id,
     823             :                                    xlrec->remote_lsn, record->EndRecPtr,
     824           4 :                                    xlrec->force /* backward */ ,
     825             :                                    false /* WAL log */ );
     826           4 :                 break;
     827             :             }
     828           4 :         case XLOG_REPLORIGIN_DROP:
     829             :             {
     830             :                 xl_replorigin_drop *xlrec;
     831             :                 int         i;
     832             : 
     833           4 :                 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
     834             : 
     835           4 :                 for (i = 0; i < max_replication_slots; i++)
     836             :                 {
     837           4 :                     ReplicationState *state = &replication_states[i];
     838             : 
     839             :                     /* found our slot */
     840           4 :                     if (state->roident == xlrec->node_id)
     841             :                     {
     842             :                         /* reset entry */
     843           4 :                         state->roident = InvalidRepOriginId;
     844           4 :                         state->remote_lsn = InvalidXLogRecPtr;
     845           4 :                         state->local_lsn = InvalidXLogRecPtr;
     846           4 :                         break;
     847             :                     }
     848             :                 }
     849           4 :                 break;
     850             :             }
     851           0 :         default:
     852           0 :             elog(PANIC, "replorigin_redo: unknown op code %u", info);
     853             :     }
     854           8 : }
     855             : 
     856             : 
     857             : /*
     858             :  * Tell the replication origin progress machinery that a commit from 'node'
     859             :  * that originated at the LSN remote_commit on the remote node was replayed
     860             :  * successfully and that we don't need to do so again. In combination with
     861             :  * setting up replorigin_session_origin_lsn and replorigin_session_origin
     862             :  * that ensures we won't lose knowledge about that after a crash if the
     863             :  * transaction had a persistent effect (think of asynchronous commits).
     864             :  *
     865             :  * local_commit needs to be a local LSN of the commit so that we can make sure
     866             :  * upon a checkpoint that enough WAL has been persisted to disk.
     867             :  *
     868             :  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
     869             :  * unless running in recovery.
     870             :  */
     871             : void
     872         220 : replorigin_advance(RepOriginId node,
     873             :                    XLogRecPtr remote_commit, XLogRecPtr local_commit,
     874             :                    bool go_backward, bool wal_log)
     875             : {
     876             :     int         i;
     877         220 :     ReplicationState *replication_state = NULL;
     878         220 :     ReplicationState *free_state = NULL;
     879             : 
     880             :     Assert(node != InvalidRepOriginId);
     881             : 
     882             :     /* we don't track DoNotReplicateId */
     883         220 :     if (node == DoNotReplicateId)
     884           0 :         return;
     885             : 
     886             :     /*
     887             :      * XXX: For the case where this is called by WAL replay, it'd be more
     888             :      * efficient to restore into a backend local hashtable and only dump into
     889             :      * shmem after recovery is finished. Let's hold off on implementing that
     890             :      * until it's shown to be a measurable expense.
     891             :      */
     892             : 
     893             :     /* Lock exclusively, as we may have to create a new table entry. */
     894         220 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     895             : 
     896             :     /*
     897             :      * Search for either an existing slot for the origin, or a free one we can
     898             :      * use.
     899             :      */
     900        1984 :     for (i = 0; i < max_replication_slots; i++)
     901             :     {
     902        1808 :         ReplicationState *curstate = &replication_states[i];
     903             : 
     904             :         /* remember where to insert if necessary */
     905        1808 :         if (curstate->roident == InvalidRepOriginId &&
     906             :             free_state == NULL)
     907             :         {
     908         176 :             free_state = curstate;
     909         176 :             continue;
     910             :         }
     911             : 
     912             :         /* not our slot */
     913        1632 :         if (curstate->roident != node)
     914             :         {
     915        1588 :             continue;
     916             :         }
     917             : 
     918             :         /* ok, found slot */
     919          44 :         replication_state = curstate;
     920             : 
     921          44 :         LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
     922             : 
     923             :         /* Make sure it's not used by somebody else */
     924          44 :         if (replication_state->acquired_by != 0)
     925             :         {
     926           0 :             ereport(ERROR,
     927             :                     (errcode(ERRCODE_OBJECT_IN_USE),
     928             :                      errmsg("replication origin with OID %d is already active for PID %d",
     929             :                             replication_state->roident,
     930             :                             replication_state->acquired_by)));
     931             :         }
     932             : 
     933          44 :         break;
     934             :     }
     935             : 
     936         220 :     if (replication_state == NULL && free_state == NULL)
     937           0 :         ereport(ERROR,
     938             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     939             :                  errmsg("could not find free replication state slot for replication origin with OID %u",
     940             :                         node),
     941             :                  errhint("Increase max_replication_slots and try again.")));
     942             : 
     943         220 :     if (replication_state == NULL)
     944             :     {
     945             :         /* initialize new slot */
     946         176 :         LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
     947         176 :         replication_state = free_state;
     948             :         Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
     949             :         Assert(replication_state->local_lsn == InvalidXLogRecPtr);
     950         176 :         replication_state->roident = node;
     951             :     }
     952             : 
     953             :     Assert(replication_state->roident != InvalidRepOriginId);
     954             : 
     955             :     /*
     956             :      * If somebody "forcefully" sets this slot, WAL log it, so it's durable
     957             :      * and the standby gets the message. Primarily this will be called during
     958             :      * WAL replay (of commit records) where no WAL logging is necessary.
     959             :      */
     960         220 :     if (wal_log)
     961             :     {
     962             :         xl_replorigin_set xlrec;
     963             : 
     964         172 :         xlrec.remote_lsn = remote_commit;
     965         172 :         xlrec.node_id = node;
     966         172 :         xlrec.force = go_backward;
     967             : 
     968         172 :         XLogBeginInsert();
     969         172 :         XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
     970             : 
     971         172 :         XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
     972             :     }
     973             : 
     974             :     /*
     975             :      * Due to - harmless - race conditions during a checkpoint we could see
     976             :      * values here that are older than the ones we already have in memory. We
     977             :      * could also see older values for prepared transactions when the prepare
     978             :      * is sent at a later point in time along with commit prepared and there
     979             :      * are other transactions' commits between prepare and commit prepared. See
     980             :      * ReorderBufferFinishPrepared. Don't overwrite those.
     981             :      */
     982         220 :     if (go_backward || replication_state->remote_lsn < remote_commit)
     983         204 :         replication_state->remote_lsn = remote_commit;
     984         220 :     if (local_commit != InvalidXLogRecPtr &&
     985          44 :         (go_backward || replication_state->local_lsn < local_commit))
     986          48 :         replication_state->local_lsn = local_commit;
     987         220 :     LWLockRelease(&replication_state->lock);
     988             : 
     989             :     /*
     990             :      * Release *after* changing the LSNs; the slot isn't acquired and thus
     991             :      * could otherwise be dropped at any time.
     992             :      */
     993         220 :     LWLockRelease(ReplicationOriginLock);
     994             : }
     995             : 
     996             : 
     997             : XLogRecPtr
     998           4 : replorigin_get_progress(RepOriginId node, bool flush)
     999             : {
    1000             :     int         i;
    1001           4 :     XLogRecPtr  local_lsn = InvalidXLogRecPtr;
    1002           4 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1003             : 
    1004             :     /* prevent slots from being concurrently dropped */
    1005           4 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1006             : 
    1007           4 :     for (i = 0; i < max_replication_slots; i++)
    1008             :     {
    1009             :         ReplicationState *state;
    1010             : 
    1011           4 :         state = &replication_states[i];
    1012             : 
    1013           4 :         if (state->roident == node)
    1014             :         {
    1015           4 :             LWLockAcquire(&state->lock, LW_SHARED);
    1016             : 
    1017           4 :             remote_lsn = state->remote_lsn;
    1018           4 :             local_lsn = state->local_lsn;
    1019             : 
    1020           4 :             LWLockRelease(&state->lock);
    1021             : 
    1022           4 :             break;
    1023             :         }
    1024             :     }
    1025             : 
    1026           4 :     LWLockRelease(ReplicationOriginLock);
    1027             : 
    1028           4 :     if (flush && local_lsn != InvalidXLogRecPtr)
    1029           2 :         XLogFlush(local_lsn);
    1030             : 
    1031           4 :     return remote_lsn;
    1032             : }
    1033             : 
    1034             : /*
    1035             :  * Tear down a (possibly) configured session replication origin during process
    1036             :  * exit.
    1037             :  */
    1038             : static void
    1039         304 : ReplicationOriginExitCleanup(int code, Datum arg)
    1040             : {
    1041         304 :     ConditionVariable *cv = NULL;
    1042             : 
    1043         304 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1044             : 
    1045         304 :     if (session_replication_state != NULL &&
    1046         300 :         session_replication_state->acquired_by == MyProcPid)
    1047             :     {
    1048         300 :         cv = &session_replication_state->origin_cv;
    1049             : 
    1050         300 :         session_replication_state->acquired_by = 0;
    1051         300 :         session_replication_state = NULL;
    1052             :     }
    1053             : 
    1054         304 :     LWLockRelease(ReplicationOriginLock);
    1055             : 
    1056         304 :     if (cv)
    1057         300 :         ConditionVariableBroadcast(cv);
    1058         304 : }
    1059             : 
    1060             : /*
    1061             :  * Set up a replication origin in the shared memory struct if it doesn't
    1062             :  * already exist and cache access to the specific ReplicationState so the
    1063             :  * array doesn't have to be searched when calling
    1064             :  * replorigin_session_advance().
    1065             :  *
    1066             :  * Obviously only one such cached origin can exist per process and the current
    1067             :  * cached value can only be set again after the previous value is torn down
    1068             :  * with replorigin_session_reset().
    1069             :  */
    1070             : void
    1071         306 : replorigin_session_setup(RepOriginId node)
    1072             : {
    1073             :     static bool registered_cleanup;
    1074             :     int         i;
    1075         306 :     int         free_slot = -1;
    1076             : 
    1077         306 :     if (!registered_cleanup)
    1078             :     {
    1079         304 :         on_shmem_exit(ReplicationOriginExitCleanup, 0);
    1080         304 :         registered_cleanup = true;
    1081             :     }
    1082             : 
    1083             :     Assert(max_replication_slots > 0);
    1084             : 
    1085         306 :     if (session_replication_state != NULL)
    1086           2 :         ereport(ERROR,
    1087             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1088             :                  errmsg("cannot setup replication origin when one is already setup")));
    1089             : 
    1090             :     /* Lock exclusively, as we may have to create a new table entry. */
    1091         304 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1092             : 
    1093             :     /*
    1094             :      * Search for either an existing slot for the origin, or a free one we can
    1095             :      * use.
    1096             :      */
    1097        3332 :     for (i = 0; i < max_replication_slots; i++)
    1098             :     {
    1099        3028 :         ReplicationState *curstate = &replication_states[i];
    1100             : 
    1101             :         /* remember where to insert if necessary */
    1102        3028 :         if (curstate->roident == InvalidRepOriginId &&
    1103             :             free_slot == -1)
    1104             :         {
    1105         304 :             free_slot = i;
    1106         304 :             continue;
    1107             :         }
    1108             : 
    1109             :         /* not our slot */
    1110        2724 :         if (curstate->roident != node)
    1111        2508 :             continue;
    1112             : 
    1113         216 :         else if (curstate->acquired_by != 0)
    1114             :         {
    1115           0 :             ereport(ERROR,
    1116             :                     (errcode(ERRCODE_OBJECT_IN_USE),
    1117             :                      errmsg("replication origin with OID %d is already active for PID %d",
    1118             :                             curstate->roident, curstate->acquired_by)));
    1119             :         }
    1120             : 
    1121             :         /* ok, found slot */
    1122         216 :         session_replication_state = curstate;
    1123             :     }
    1124             : 
    1125             : 
    1126         304 :     if (session_replication_state == NULL && free_slot == -1)
    1127           0 :         ereport(ERROR,
    1128             :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
    1129             :                  errmsg("could not find free replication state slot for replication origin with OID %u",
    1130             :                         node),
    1131             :                  errhint("Increase max_replication_slots and try again.")));
    1132         304 :     else if (session_replication_state == NULL)
    1133             :     {
    1134             :         /* initialize new slot */
    1135          88 :         session_replication_state = &replication_states[free_slot];
    1136             :         Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
    1137             :         Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
    1138          88 :         session_replication_state->roident = node;
    1139             :     }
    1140             : 
    1141             : 
    1142             :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1143             : 
    1144         304 :     session_replication_state->acquired_by = MyProcPid;
    1145             : 
    1146         304 :     LWLockRelease(ReplicationOriginLock);
    1147             : 
    1148             :     /* probably this one is pointless */
    1149         304 :     ConditionVariableBroadcast(&session_replication_state->origin_cv);
    1150         304 : }
    1151             : 
    1152             : /*
    1153             :  * Reset replay state previously set up in this session.
    1154             :  *
    1155             :  * This function may only be called if an origin was set up with
    1156             :  * replorigin_session_setup().
    1157             :  */
    1158             : void
    1159           6 : replorigin_session_reset(void)
    1160             : {
    1161             :     ConditionVariable *cv;
    1162             : 
    1163             :     Assert(max_replication_slots != 0);
    1164             : 
    1165           6 :     if (session_replication_state == NULL)
    1166           2 :         ereport(ERROR,
    1167             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1168             :                  errmsg("no replication origin is configured")));
    1169             : 
    1170           4 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1171             : 
    1172           4 :     session_replication_state->acquired_by = 0;
    1173           4 :     cv = &session_replication_state->origin_cv;
    1174           4 :     session_replication_state = NULL;
    1175             : 
    1176           4 :     LWLockRelease(ReplicationOriginLock);
    1177             : 
    1178           4 :     ConditionVariableBroadcast(cv);
    1179           4 : }
    1180             : 
    1181             : /*
    1182             :  * Do the same work replorigin_advance() does, just on the session's
    1183             :  * configured origin.  Each LSN is only ever moved forward here; a value
    1184             :  * that is not greater than the stored one is ignored.
    1185             :  * This is noticeably cheaper than using replorigin_advance().
    1186             :  */
    1187             : void
    1188        1016 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
    1189             : {
    1190             :     Assert(session_replication_state != NULL);
    1191             :     Assert(session_replication_state->roident != InvalidRepOriginId);
    1192             :     /* update both LSNs while holding the slot's own lock exclusively */
    1193        1016 :     LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
    1194        1016 :     if (session_replication_state->local_lsn < local_commit)
    1195        1016 :         session_replication_state->local_lsn = local_commit;
    1196        1016 :     if (session_replication_state->remote_lsn < remote_commit)
    1197         524 :         session_replication_state->remote_lsn = remote_commit;
    1198        1016 :     LWLockRelease(&session_replication_state->lock);
    1199        1016 : }
    1200             : 
    1201             : /*
    1202             :  * Return the remote LSN recorded for the session's replication origin.  If
    1203             :  * 'flush' is set and a local LSN is recorded, flush WAL up to it first.
    1204             :  */
    1205             : XLogRecPtr
    1206         132 : replorigin_session_get_progress(bool flush)
    1207             : {
    1208             :     XLogRecPtr  remote_lsn;
    1209             :     XLogRecPtr  local_lsn;
    1210             : 
    1211             :     Assert(session_replication_state != NULL);
    1212             : 
    1213         132 :     LWLockAcquire(&session_replication_state->lock, LW_SHARED);
    1214         132 :     remote_lsn = session_replication_state->remote_lsn;
    1215         132 :     local_lsn = session_replication_state->local_lsn;
    1216         132 :     LWLockRelease(&session_replication_state->lock);
    1217             :     /* the flush is done after the slot lock has been released */
    1218         132 :     if (flush && local_lsn != InvalidXLogRecPtr)
    1219           2 :         XLogFlush(local_lsn);
    1220             : 
    1221         132 :     return remote_lsn;
    1222             : }
    1223             : 
    1224             : 
    1225             : 
    1226             : /* ---------------------------------------------------------------------------
    1227             :  * SQL functions for working with replication origin.
    1228             :  *
    1229             :  * These mostly should be fairly short wrappers around more generic functions.
    1230             :  * ---------------------------------------------------------------------------
    1231             :  */
    1232             : 
    1233             : /*
    1234             :  * Create a replication origin with the passed-in name and return the id
    1235             :  * assigned to it.  Names for which IsReservedName() holds are rejected.
    1236             :  */
    1237             : Datum
    1238           8 : pg_replication_origin_create(PG_FUNCTION_ARGS)
    1239             : {
    1240             :     char       *name;
    1241             :     RepOriginId roident;
    1242             : 
    1243           8 :     replorigin_check_prerequisites(false, false);
    1244             : 
    1245           8 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1246             : 
    1247             :     /* Replication origins "pg_xxx" are reserved for internal use */
    1248           8 :     if (IsReservedName(name))
    1249           0 :         ereport(ERROR,
    1250             :                 (errcode(ERRCODE_RESERVED_NAME),
    1251             :                  errmsg("replication origin name \"%s\" is reserved",
    1252             :                         name),
    1253             :                  errdetail("Origin names starting with \"pg_\" are reserved.")));
    1254             : 
    1255             :     /*
    1256             :      * If built with appropriate switch, whine when regression-testing
    1257             :      * conventions for replication origin names are violated.
    1258             :      */
    1259             : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1260             :     if (strncmp(name, "regress_", 8) != 0)
    1261             :         elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
    1262             : #endif
    1263             : 
    1264           8 :     roident = replorigin_create(name);
    1265             :     /* the C-string copy of the name is no longer needed */
    1266           6 :     pfree(name);
    1267             : 
    1268           6 :     PG_RETURN_OID(roident);
    1269             : }
    1270             : 
    1271             : /*
    1272             :  * Drop the replication origin with the passed-in name.
    1273             :  */
    1274             : Datum
    1275           8 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
    1276             : {
    1277             :     char       *name;
    1278             : 
    1279           8 :     replorigin_check_prerequisites(false, false);
    1280             : 
    1281           8 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1282             :     /* NOTE(review): the two flags are interpreted by replorigin_drop_by_name() */
    1283           8 :     replorigin_drop_by_name(name, false, true);
    1284             : 
    1285           6 :     pfree(name);
    1286             : 
    1287           6 :     PG_RETURN_VOID();
    1288             : }
    1289             : 
    1290             : /*
    1291             :  * Return the oid of the named replication origin, or NULL if it is not valid.
    1292             :  */
    1293             : Datum
    1294           0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
    1295             : {
    1296             :     char       *name;
    1297             :     RepOriginId roident;
    1298             : 
    1299           0 :     replorigin_check_prerequisites(false, false);
    1300             : 
    1301           0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1302           0 :     roident = replorigin_by_name(name, true);
    1303             : 
    1304           0 :     pfree(name);
    1305             :     /* an invalid oid is reported to the caller as SQL NULL */
    1306           0 :     if (OidIsValid(roident))
    1307           0 :         PG_RETURN_OID(roident);
    1308           0 :     PG_RETURN_NULL();
    1309             : }
    1310             : 
    1311             : /*
    1312             :  * Set up the named replication origin for use by this session.
    1313             :  */
    1314             : Datum
    1315           8 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
    1316             : {
    1317             :     char       *name;
    1318             :     RepOriginId origin;
    1319             : 
    1320           8 :     replorigin_check_prerequisites(true, false);
    1321             : 
    1322           8 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1323           8 :     origin = replorigin_by_name(name, false);
    1324           6 :     replorigin_session_setup(origin);
    1325             :     /* also remember the origin id in the session-level variable */
    1326           4 :     replorigin_session_origin = origin;
    1327             : 
    1328           4 :     pfree(name);
    1329             : 
    1330           4 :     PG_RETURN_VOID();
    1331             : }
    1332             : 
    1333             : /*
    1334             :  * Reset the session's origin, clearing its id, LSN and timestamp variables.
    1335             :  */
    1336             : Datum
    1337           6 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
    1338             : {
    1339           6 :     replorigin_check_prerequisites(true, false);
    1340             : 
    1341           6 :     replorigin_session_reset();
    1342             :     /* also clear the session-level origin variables */
    1343           4 :     replorigin_session_origin = InvalidRepOriginId;
    1344           4 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1345           4 :     replorigin_session_origin_timestamp = 0;
    1346             : 
    1347           4 :     PG_RETURN_VOID();
    1348             : }
    1349             : 
    1350             : /*
    1351             :  * Report whether a replication origin has been set up for this session.
    1352             :  */
    1353             : Datum
    1354           0 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
    1355             : {
    1356           0 :     replorigin_check_prerequisites(false, false);
    1357             : 
    1358           0 :     PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
    1359             : }
    1360             : 
    1361             : 
    1362             : /*
    1363             :  * Return the replication progress for the origin set up in the current
    1364             :  * session, or NULL if the recorded remote LSN is invalid.
    1365             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1366             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1367             :  * commits are used when replaying replicated transactions.
    1368             :  */
    1369             : Datum
    1370           4 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
    1371             : {
    1372           4 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1373           4 :     bool        flush = PG_GETARG_BOOL(0);
    1374             : 
    1375           4 :     replorigin_check_prerequisites(true, false);
    1376             : 
    1377           4 :     if (session_replication_state == NULL)
    1378           0 :         ereport(ERROR,
    1379             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1380             :                  errmsg("no replication origin is configured")));
    1381             : 
    1382           4 :     remote_lsn = replorigin_session_get_progress(flush);
    1383             :     /* an invalid LSN is reported to the caller as SQL NULL */
    1384           4 :     if (remote_lsn == InvalidXLogRecPtr)
    1385           0 :         PG_RETURN_NULL();
    1386             : 
    1387           4 :     PG_RETURN_LSN(remote_lsn);
    1388             : }
    1389             : 
    1390             : Datum
    1391           2 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
    1392             : {
    1393           2 :     XLogRecPtr  location = PG_GETARG_LSN(0);
    1394             :     /* Purpose: store the passed LSN and timestamp in the session's origin variables. */
    1395           2 :     replorigin_check_prerequisites(true, false);
    1396             :     /* an origin must already be set up in this session */
    1397           2 :     if (session_replication_state == NULL)
    1398           0 :         ereport(ERROR,
    1399             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1400             :                  errmsg("no replication origin is configured")));
    1401             : 
    1402           2 :     replorigin_session_origin_lsn = location;
    1403           2 :     replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
    1404             : 
    1405           2 :     PG_RETURN_VOID();
    1406             : }
    1407             : 
    1408             : Datum
    1409           0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
    1410             : {
    1411           0 :     replorigin_check_prerequisites(true, false);
    1412             :     /* Purpose: clear the origin LSN and timestamp held in the session variables. */
    1413           0 :     replorigin_session_origin_lsn = InvalidXLogRecPtr;
    1414           0 :     replorigin_session_origin_timestamp = 0;
    1415             : 
    1416           0 :     PG_RETURN_VOID();
    1417             : }
    1418             : 
    1419             : /* Advance the named origin to the passed remote LSN via replorigin_advance(). */
    1420             : Datum
    1421           2 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
    1422             : {
    1423           2 :     text       *name = PG_GETARG_TEXT_PP(0);
    1424           2 :     XLogRecPtr  remote_commit = PG_GETARG_LSN(1);
    1425             :     RepOriginId node;
    1426             : 
    1427           2 :     replorigin_check_prerequisites(true, false);
    1428             : 
    1429             :     /* lock to prevent the replication origin from vanishing */
    1430           2 :     LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1431             : 
    1432           2 :     node = replorigin_by_name(text_to_cstring(name), false);
    1433             : 
    1434             :     /*
    1435             :      * Can't sensibly pass a local commit to be flushed at checkpoint - this
    1436             :      * xact hasn't committed yet. This is why this function should be used to
    1437             :      * set up the initial replication state, but not for replay.
    1438             :      */
    1439           0 :     replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
    1440             :                        true /* go backward */ , true /* WAL log */ );
    1441             : 
    1442           0 :     UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1443             : 
    1444           0 :     PG_RETURN_VOID();
    1445             : }
    1446             : 
    1447             : 
    1448             : /*
    1449             :  * Return the replication progress for an individual replication origin, or
    1450             :  * NULL if the remote LSN obtained for it is invalid.
    1451             :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1452             :  * to a local transaction that has been flushed. This is useful if asynchronous
    1453             :  * commits are used when replaying replicated transactions.
    1454             :  */
    1455             : Datum
    1456           6 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
    1457             : {
    1458             :     char       *name;
    1459             :     bool        flush;
    1460             :     RepOriginId roident;
    1461           6 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1462             : 
    1463           6 :     replorigin_check_prerequisites(true, true);
    1464             : 
    1465           6 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1466           6 :     flush = PG_GETARG_BOOL(1);
    1467             : 
    1468           6 :     roident = replorigin_by_name(name, false);
    1469             :     Assert(OidIsValid(roident));
    1470             : 
    1471           4 :     remote_lsn = replorigin_get_progress(roident, flush);
    1472             :     /* an invalid LSN is reported to the caller as SQL NULL */
    1473           4 :     if (remote_lsn == InvalidXLogRecPtr)
    1474           0 :         PG_RETURN_NULL();
    1475             : 
    1476           4 :     PG_RETURN_LSN(remote_lsn);
    1477             : }
    1478             : 
    1479             : /* Return one row per in-use replication state: origin id, name, remote LSN, local LSN. */
    1480             : Datum
    1481           2 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
    1482             : {
    1483           2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1484             :     TupleDesc   tupdesc;
    1485             :     Tuplestorestate *tupstore;
    1486             :     MemoryContext per_query_ctx;
    1487             :     MemoryContext oldcontext;
    1488             :     int         i;
    1489             : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
    1490             : 
    1491             :     /* we want to return 0 rows if max_replication_slots is set to zero */
    1492           2 :     replorigin_check_prerequisites(false, true);
    1493             : 
    1494           2 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
    1495           0 :         ereport(ERROR,
    1496             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1497             :                  errmsg("set-valued function called in context that cannot accept a set")));
    1498           2 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
    1499           0 :         ereport(ERROR,
    1500             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1501             :                  errmsg("materialize mode required, but it is not allowed in this context")));
    1502           2 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
    1503           0 :         elog(ERROR, "return type must be a row type");
    1504             : 
    1505           2 :     if (tupdesc->natts != REPLICATION_ORIGIN_PROGRESS_COLS)
    1506           0 :         elog(ERROR, "wrong function definition");
    1507             :     /* create the result tuplestore in the per-query memory context */
    1508           2 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
    1509           2 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
    1510             : 
    1511           2 :     tupstore = tuplestore_begin_heap(true, false, work_mem);
    1512           2 :     rsinfo->returnMode = SFRM_Materialize;
    1513           2 :     rsinfo->setResult = tupstore;
    1514           2 :     rsinfo->setDesc = tupdesc;
    1515             : 
    1516           2 :     MemoryContextSwitchTo(oldcontext);
    1517             : 
    1518             : 
    1519             :     /* prevent slots from being concurrently dropped */
    1520           2 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1521             : 
    1522             :     /*
    1523             :      * Iterate through all possible replication_states, display if they are
    1524             :      * filled. Each slot's LSNs are read under that slot's own lock, but slots
    1525             :      * are visited one at a time, so the rows are not one consistent snapshot.
    1526             :      */
    1527          10 :     for (i = 0; i < max_replication_slots; i++)
    1528             :     {
    1529             :         ReplicationState *state;
    1530             :         Datum       values[REPLICATION_ORIGIN_PROGRESS_COLS];
    1531             :         bool        nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
    1532             :         char       *roname;
    1533             : 
    1534           8 :         state = &replication_states[i];
    1535             : 
    1536             :         /* unused slot, nothing to display */
    1537           8 :         if (state->roident == InvalidRepOriginId)
    1538           6 :             continue;
    1539             :         /* start with every column NULL; each is cleared as it is filled in */
    1540           2 :         memset(values, 0, sizeof(values));
    1541           2 :         memset(nulls, 1, sizeof(nulls));
    1542             : 
    1543           2 :         values[0] = ObjectIdGetDatum(state->roident);
    1544           2 :         nulls[0] = false;
    1545             : 
    1546             :         /*
    1547             :          * We're not preventing the origin from being dropped concurrently,
    1548             :          * so silently accept that it might be gone (name column stays NULL).
    1549             :          */
    1550           2 :         if (replorigin_by_oid(state->roident, true,
    1551             :                               &roname))
    1552             :         {
    1553           2 :             values[1] = CStringGetTextDatum(roname);
    1554           2 :             nulls[1] = false;
    1555             :         }
    1556             : 
    1557           2 :         LWLockAcquire(&state->lock, LW_SHARED);
    1558             : 
    1559           2 :         values[2] = LSNGetDatum(state->remote_lsn);
    1560           2 :         nulls[2] = false;
    1561             : 
    1562           2 :         values[3] = LSNGetDatum(state->local_lsn);
    1563           2 :         nulls[3] = false;
    1564             : 
    1565           2 :         LWLockRelease(&state->lock);
    1566             : 
    1567           2 :         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    1568             :     }
    1569             : 
    1570             :     tuplestore_donestoring(tupstore);
    1571             : 
    1572           2 :     LWLockRelease(ReplicationOriginLock);
    1573             : 
    1574             : #undef REPLICATION_ORIGIN_PROGRESS_COLS
    1575             : 
    1576           2 :     return (Datum) 0;
    1577             : }

Generated by: LCOV version 1.14