LCOV - code coverage report
Current view: top level - src/backend/replication/logical - origin.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 86.9 % 467 406
Test Date: 2026-04-07 14:16:30 Functions: 91.2 % 34 31
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * origin.c
       4              :  *    Logical replication progress tracking support.
       5              :  *
       6              :  * Copyright (c) 2013-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *    src/backend/replication/logical/origin.c
      10              :  *
      11              :  * NOTES
      12              :  *
      13              :  * This file provides the following:
      14              :  * * An infrastructure to name nodes in a replication setup
      15              :  * * A facility to store and persist replication progress in an efficient
      16              :  *   and durable manner.
      17              :  *
      18              :  * A replication origin consists of a descriptive, user defined, external
      19              :  * name and a short, thus space efficient, internal 2 byte one. This split
      20              :  * exists because replication origins have to be stored in WAL and shared
      21              :  * memory and long descriptors would be inefficient.  For now we only use 2
      22              :  * bytes for the internal id of a replication origin, as it seems unlikely that
      23              :  * there will soon be more than 65k nodes in one replication setup; using only
      24              :  * two bytes allows us to be more space efficient.
      25              :  *
      26              :  * Replication progress is tracked in a shared memory table
      27              :  * (ReplicationState) that's dumped to disk every checkpoint. Entries
      28              :  * ('slots') in this table are identified by the internal id. That's the case
      29              :  * because it allows us to advance replication progress during crash
      30              :  * recovery. To allow doing so we store the original LSN (from the originating
      31              :  * system) of a transaction in the commit record. That allows us to recover the
      32              :  * precise replayed state after crash recovery without requiring synchronous
      33              :  * commits. Allowing logical replication to use asynchronous commit is
      34              :  * generally good for performance, but especially important as it allows a
      35              :  * single threaded replay process to keep up with a source that has multiple
      36              :  * backends generating changes concurrently.  For efficiency and simplicity
      37              :  * reasons a backend can set up one replication origin that is then used as
      38              :  * the source of changes produced by the backend until it is reset.
      39              :  *
      40              :  * This infrastructure is intended to be used in cooperation with logical
      41              :  * decoding. When replaying from a remote system the configured origin is
      42              :  * provided to output plugins, allowing prevention of replication loops and
      43              :  * other filtering.
      44              :  *
      45              :  * There are several levels of locking at work:
      46              :  *
      47              :  * * Creating a replication origin holds an exclusive lock on
      48              :  *   pg_replication_origin during the ID search, so IDs can be assigned conflict
      49              :  *   free using a dirty snapshot; dropping one locks the origin object itself.
      50              :  *
      51              :  * * When creating an in-memory replication progress slot the ReplicationOrigin
      52              :  *   LWLock has to be held exclusively; when iterating over the replication
      53              :  *   progress a shared lock has to be held, and the same applies when advancing
      54              :  *   the replication progress of an origin that has not been set up as the
      55              :  *   session's replication origin.
      56              :  *
      57              :  * * When manipulating or looking at the remote_lsn and local_lsn fields of a
      58              :  *   replication progress slot that slot's lwlock has to be held. That's
      59              :  *   primarily because we do not assume 8 byte writes (the LSN) are atomic on
      60              :  *   all our platforms, but it also simplifies memory ordering concerns
      61              :  *   between the remote and local lsn. We use a lwlock instead of a spinlock
      62              :  *   so it's less harmful to hold the lock over a WAL write
      63              :  *   (cf. AdvanceReplicationProgress).
      64              :  *
      65              :  * ---------------------------------------------------------------------------
      66              :  */
      67              : 
      68              : #include "postgres.h"
      69              : 
      70              : #include <unistd.h>
      71              : #include <sys/stat.h>
      72              : 
      73              : #include "access/genam.h"
      74              : #include "access/htup_details.h"
      75              : #include "access/table.h"
      76              : #include "access/xact.h"
      77              : #include "access/xloginsert.h"
      78              : #include "catalog/catalog.h"
      79              : #include "catalog/indexing.h"
      80              : #include "catalog/pg_subscription.h"
      81              : #include "funcapi.h"
      82              : #include "miscadmin.h"
      83              : #include "nodes/execnodes.h"
      84              : #include "pgstat.h"
      85              : #include "replication/origin.h"
      86              : #include "replication/slot.h"
      87              : #include "storage/condition_variable.h"
      88              : #include "storage/fd.h"
      89              : #include "storage/ipc.h"
      90              : #include "storage/lmgr.h"
      91              : #include "storage/subsystems.h"
      92              : #include "utils/builtins.h"
      93              : #include "utils/fmgroids.h"
      94              : #include "utils/guc.h"
      95              : #include "utils/pg_lsn.h"
      96              : #include "utils/rel.h"
      97              : #include "utils/snapmgr.h"
      98              : #include "utils/syscache.h"
      99              : #include "utils/wait_event.h"
     100              : 
/*
 * Paths for replication origin checkpoint files.  The ".tmp" name is
 * presumably written first and then renamed over the final name, so that a
 * crash never leaves a partially written file behind -- TODO confirm in the
 * checkpoint code.
 */
#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"

/*
 * GUC variables
 *
 * max_active_replication_origins: number of entries in the shared
 * replication_states array.  When it is 0, no shared memory is requested for
 * progress tracking and replorigin_check_prerequisites() rejects origin
 * operations that need it.
 */
int			max_active_replication_origins = 10;
     107              : 
/*
 * Replay progress of a single remote node.
 *
 * One entry ("slot") of the shared replication_states array.  A slot whose
 * roident is InvalidReplOriginId is not associated with any origin;
 * replorigin_state_clear() resets dropped origins' slots to that value.
 */
typedef struct ReplicationState
{
	/*
	 * Local identifier for the remote node.
	 */
	ReplOriginId roident;

	/*
	 * Location of the latest commit from the remote side.
	 */
	XLogRecPtr	remote_lsn;

	/*
	 * Remember the local lsn of the commit record so we can XLogFlush() to it
	 * during a checkpoint so we know the commit record actually is safe on
	 * disk.
	 */
	XLogRecPtr	local_lsn;

	/*
	 * PID of backend that's acquired slot, or 0 if none.
	 */
	int			acquired_by;

	/*
	 * Count of processes that are currently using this origin.  While it is
	 * nonzero, replorigin_state_clear() will not release the slot.
	 */
	int			refcount;

	/*
	 * Condition variable that's signaled when acquired_by changes.  Dropping
	 * a busy origin (refcount > 0) sleeps on this CV and then rechecks;
	 * NOTE(review): presumably it is also signaled when refcount decreases --
	 * confirm in the release path.
	 */
	ConditionVariable origin_cv;

	/*
	 * Lock protecting remote_lsn and local_lsn.
	 */
	LWLock		lock;
} ReplicationState;
     148              : 
/*
 * On disk version of ReplicationState.
 *
 * Only the origin ID and remote_lsn are persisted.  local_lsn, acquired_by,
 * refcount and the synchronization objects exist only in shared memory.
 * NOTE(review): this layout presumably defines the checkpoint file format
 * (see REPLICATION_STATE_MAGIC), so changing it is an on-disk format change.
 */
typedef struct ReplicationStateOnDisk
{
	ReplOriginId roident;
	XLogRecPtr	remote_lsn;
} ReplicationStateOnDisk;
     157              : 
     158              : 
/*
 * Header of the shared memory block for replication progress tracking.  The
 * states[] array of progress slots directly follows the header in the same
 * allocation (see ReplicationOriginShmemRequest).
 */
typedef struct ReplicationStateCtl
{
	/* Tranche to use for per-origin LWLocks */
	int			tranche_id;
	/* Array of length max_active_replication_origins */
	ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
} ReplicationStateCtl;
     166              : 
/*
 * Global variable for per-transaction replication origin state.  It starts
 * out with no origin, no origin LSN and a zero origin timestamp.
 */
ReplOriginXactState replorigin_xact_state = {
	.origin = InvalidReplOriginId,	/* assumed identity */
	.origin_lsn = InvalidXLogRecPtr,
	.origin_timestamp = 0
};

/*
 * Base address into a shared memory array of replication states of size
 * max_active_replication_origins.  Set by ReplicationOriginShmemInit() or
 * ReplicationOriginShmemAttach(); both leave it unset when
 * max_active_replication_origins is 0.
 */
static ReplicationState *replication_states;

static void ReplicationOriginShmemRequest(void *arg);
static void ReplicationOriginShmemInit(void *arg);
static void ReplicationOriginShmemAttach(void *arg);

/*
 * Shared memory callbacks for this module: request_fn sizes and requests the
 * block, init_fn initializes it, and attach_fn only re-derives the
 * backend-local replication_states pointer.
 */
const ShmemCallbacks ReplicationOriginShmemCallbacks = {
	.request_fn = ReplicationOriginShmemRequest,
	.init_fn = ReplicationOriginShmemInit,
	.attach_fn = ReplicationOriginShmemAttach,
};

/*
 * Actual shared memory block (replication_states[] is now part of this).
 * Its address is handed to ShmemRequestStruct() via .ptr; presumably the
 * shmem framework stores the mapped address here -- confirm in that API.
 */
static ReplicationStateCtl *replication_states_ctl;

/*
 * We keep a pointer to this backend's ReplicationState to avoid having to
 * search the replication_states array in replorigin_session_advance for each
 * remote commit.  (Ownership of a backend's own entry can only be changed by
 * that backend.)
 */
static ReplicationState *session_replication_state = NULL;

/* Magic for on disk files. */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
     205              : 
     206              : static void
     207           69 : replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
     208              : {
     209           69 :     if (check_origins && max_active_replication_origins == 0)
     210            0 :         ereport(ERROR,
     211              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     212              :                  errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
     213              : 
     214           69 :     if (!recoveryOK && RecoveryInProgress())
     215            0 :         ereport(ERROR,
     216              :                 (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
     217              :                  errmsg("cannot manipulate replication origins during recovery")));
     218           69 : }
     219              : 
     220              : 
     221              : /*
     222              :  * IsReservedOriginName
     223              :  *      True iff name is either "none" or "any".
     224              :  */
     225              : static bool
     226           14 : IsReservedOriginName(const char *name)
     227              : {
     228           27 :     return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
     229           13 :             (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
     230              : }
     231              : 
     232              : /* ---------------------------------------------------------------------------
     233              :  * Functions for working with replication origins themselves.
     234              :  * ---------------------------------------------------------------------------
     235              :  */
     236              : 
     237              : /*
     238              :  * Check for a persistent replication origin identified by name.
     239              :  *
     240              :  * Returns InvalidOid if the node isn't known yet and missing_ok is true.
     241              :  */
     242              : ReplOriginId
     243         1116 : replorigin_by_name(const char *roname, bool missing_ok)
     244              : {
     245              :     Form_pg_replication_origin ident;
     246         1116 :     Oid         roident = InvalidOid;
     247              :     HeapTuple   tuple;
     248              :     Datum       roname_d;
     249              : 
     250         1116 :     roname_d = CStringGetTextDatum(roname);
     251              : 
     252         1116 :     tuple = SearchSysCache1(REPLORIGNAME, roname_d);
     253         1116 :     if (HeapTupleIsValid(tuple))
     254              :     {
     255          710 :         ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
     256          710 :         roident = ident->roident;
     257          710 :         ReleaseSysCache(tuple);
     258              :     }
     259          406 :     else if (!missing_ok)
     260            4 :         ereport(ERROR,
     261              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     262              :                  errmsg("replication origin \"%s\" does not exist",
     263              :                         roname)));
     264              : 
     265         1112 :     return roident;
     266              : }
     267              : 
     268              : /*
     269              :  * Create a replication origin.
     270              :  *
     271              :  * Needs to be called in a transaction.
     272              :  */
     273              : ReplOriginId
     274          411 : replorigin_create(const char *roname)
     275              : {
     276              :     Oid         roident;
     277          411 :     HeapTuple   tuple = NULL;
     278              :     Relation    rel;
     279              :     Datum       roname_d;
     280              :     SnapshotData SnapshotDirty;
     281              :     SysScanDesc scan;
     282              :     ScanKeyData key;
     283              : 
     284              :     /*
     285              :      * To avoid needing a TOAST table for pg_replication_origin, we limit
     286              :      * replication origin names to 512 bytes.  This should be more than enough
     287              :      * for all practical use.
     288              :      */
     289          411 :     if (strlen(roname) > MAX_RONAME_LEN)
     290            4 :         ereport(ERROR,
     291              :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     292              :                  errmsg("replication origin name is too long"),
     293              :                  errdetail("Replication origin names must be no longer than %d bytes.",
     294              :                            MAX_RONAME_LEN)));
     295              : 
     296          407 :     roname_d = CStringGetTextDatum(roname);
     297              : 
     298              :     Assert(IsTransactionState());
     299              : 
     300              :     /*
     301              :      * We need the numeric replication origin to be 16bit wide, so we cannot
     302              :      * rely on the normal oid allocation. Instead we simply scan
     303              :      * pg_replication_origin for the first unused id. That's not particularly
     304              :      * efficient, but this should be a fairly infrequent operation - we can
     305              :      * easily spend a bit more code on this when it turns out it needs to be
     306              :      * faster.
     307              :      *
     308              :      * We handle concurrency by taking an exclusive lock (allowing reads!)
     309              :      * over the table for the duration of the search. Because we use a "dirty
     310              :      * snapshot" we can read rows that other in-progress sessions have
     311              :      * written, even though they would be invisible with normal snapshots. Due
     312              :      * to the exclusive lock there's no danger that new rows can appear while
     313              :      * we're checking.
     314              :      */
     315          407 :     InitDirtySnapshot(SnapshotDirty);
     316              : 
     317          407 :     rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
     318              : 
     319              :     /*
     320              :      * We want to be able to access pg_replication_origin without setting up a
     321              :      * snapshot.  To make that safe, it needs to not have a TOAST table, since
     322              :      * TOASTed data cannot be fetched without a snapshot.  As of this writing,
     323              :      * its only varlena column is roname, which we limit to 512 bytes to avoid
     324              :      * needing out-of-line storage.  If you add a TOAST table to this catalog,
     325              :      * be sure to set up a snapshot everywhere it might be needed.  For more
     326              :      * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
     327              :      */
     328              :     Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
     329              : 
     330          718 :     for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
     331              :     {
     332              :         bool        nulls[Natts_pg_replication_origin];
     333              :         Datum       values[Natts_pg_replication_origin];
     334              :         bool        collides;
     335              : 
     336          718 :         CHECK_FOR_INTERRUPTS();
     337              : 
     338          718 :         ScanKeyInit(&key,
     339              :                     Anum_pg_replication_origin_roident,
     340              :                     BTEqualStrategyNumber, F_OIDEQ,
     341              :                     ObjectIdGetDatum(roident));
     342              : 
     343          718 :         scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
     344              :                                   true /* indexOK */ ,
     345              :                                   &SnapshotDirty,
     346              :                                   1, &key);
     347              : 
     348          718 :         collides = HeapTupleIsValid(systable_getnext(scan));
     349              : 
     350          718 :         systable_endscan(scan);
     351              : 
     352          718 :         if (!collides)
     353              :         {
     354              :             /*
     355              :              * Ok, found an unused roident, insert the new row and do a CCI,
     356              :              * so our callers can look it up if they want to.
     357              :              */
     358          407 :             memset(&nulls, 0, sizeof(nulls));
     359              : 
     360          407 :             values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
     361          407 :             values[Anum_pg_replication_origin_roname - 1] = roname_d;
     362              : 
     363          407 :             tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
     364          407 :             CatalogTupleInsert(rel, tuple);
     365          406 :             CommandCounterIncrement();
     366          406 :             break;
     367              :         }
     368              :     }
     369              : 
     370              :     /* now release lock again,  */
     371          406 :     table_close(rel, ExclusiveLock);
     372              : 
     373          406 :     if (tuple == NULL)
     374            0 :         ereport(ERROR,
     375              :                 (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
     376              :                  errmsg("could not find free replication origin ID")));
     377              : 
     378          406 :     heap_freetuple(tuple);
     379          406 :     return roident;
     380              : }
     381              : 
/*
 * replorigin_state_clear
 *		Helper function to drop a replication origin: release its in-memory
 *		replication progress slot, if it has one.
 *
 * Scans the shared replication_states array for the slot whose roident
 * matches.  If one is found, an XLOG_REPLORIGIN_DROP record is WAL-logged and
 * the slot's roident, remote_lsn and local_lsn are reset.  If no slot
 * matches, nothing is logged or changed.  The pg_replication_origin row is
 * not touched here; the visible caller, replorigin_drop_by_name(), deletes
 * it afterwards.
 *
 * If the matching slot is in use (refcount > 0): with nowait, raise an
 * ERROR; otherwise sleep on the slot's condition variable and rescan from
 * the beginning.
 */
static void
replorigin_state_clear(ReplOriginId roident, bool nowait)
{
	int			i;

	/*
	 * Clean up the slot state info, if there is any matching slot.  The
	 * ReplicationOriginLock is taken exclusively because we may reset the
	 * slot's roident below.
	 */
restart:
	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

	for (i = 0; i < max_active_replication_origins; i++)
	{
		ReplicationState *state = &replication_states[i];

		if (state->roident == roident)
		{
			/* found our slot, is it busy? */
			if (state->refcount > 0)
			{
				ConditionVariable *cv;

				/*
				 * Report the owning PID when a specific backend has acquired
				 * the slot; otherwise it is only known to be in use.
				 */
				if (nowait)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_IN_USE),
							 (state->acquired_by != 0)
							 ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
									  state->roident,
									  state->acquired_by)
							 : errmsg("could not drop replication origin with ID %d, in use by another process",
									  state->roident)));

				/*
				 * We must wait and then retry.  Since we don't know which CV
				 * to wait on until here, we can't readily use
				 * ConditionVariablePrepareToSleep (calling it here would be
				 * wrong, since we could miss the signal if we did so); just
				 * use ConditionVariableSleep directly.  The LWLock must be
				 * released before sleeping, and the whole scan is redone
				 * afterwards because the array may have changed meanwhile.
				 */
				cv = &state->origin_cv;

				LWLockRelease(ReplicationOriginLock);

				ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
				goto restart;
			}

			/*
			 * first make a WAL log entry (presumably so that crash recovery
			 * and standbys clear this slot too when replaying the drop)
			 */
			{
				xl_replorigin_drop xlrec;

				xlrec.node_id = roident;
				XLogBeginInsert();
				XLogRegisterData(&xlrec, sizeof(xlrec));
				XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
			}

			/* then clear the in-memory slot */
			state->roident = InvalidReplOriginId;
			state->remote_lsn = InvalidXLogRecPtr;
			state->local_lsn = InvalidXLogRecPtr;
			break;
		}
	}
	LWLockRelease(ReplicationOriginLock);
	/* Cancel any CV sleep left pending by the wait loop above. */
	ConditionVariableCancelSleep();
}
     452              : 
     453              : /*
     454              :  * Drop replication origin (by name).
     455              :  *
     456              :  * Needs to be called in a transaction.
     457              :  */
     458              : void
     459          552 : replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
     460              : {
     461              :     ReplOriginId roident;
     462              :     Relation    rel;
     463              :     HeapTuple   tuple;
     464              : 
     465              :     Assert(IsTransactionState());
     466              : 
     467          552 :     rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
     468              : 
     469          552 :     roident = replorigin_by_name(name, missing_ok);
     470              : 
     471              :     /* Lock the origin to prevent concurrent drops. */
     472          551 :     LockSharedObject(ReplicationOriginRelationId, roident, 0,
     473              :                      AccessExclusiveLock);
     474              : 
     475          551 :     tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
     476          551 :     if (!HeapTupleIsValid(tuple))
     477              :     {
     478          198 :         if (!missing_ok)
     479            0 :             elog(ERROR, "cache lookup failed for replication origin with ID %d",
     480              :                  roident);
     481              : 
     482              :         /*
     483              :          * We don't need to retain the locks if the origin is already dropped.
     484              :          */
     485          198 :         UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
     486              :                            AccessExclusiveLock);
     487          198 :         table_close(rel, RowExclusiveLock);
     488          198 :         return;
     489              :     }
     490              : 
     491          353 :     replorigin_state_clear(roident, nowait);
     492              : 
     493              :     /*
     494              :      * Now, we can delete the catalog entry.
     495              :      */
     496          353 :     CatalogTupleDelete(rel, &tuple->t_self);
     497          353 :     ReleaseSysCache(tuple);
     498              : 
     499          353 :     CommandCounterIncrement();
     500              : 
     501              :     /* We keep the lock on pg_replication_origin until commit */
     502          353 :     table_close(rel, NoLock);
     503              : }
     504              : 
     505              : /*
     506              :  * Lookup replication origin via its oid and return the name.
     507              :  *
     508              :  * The external name is palloc'd in the calling context.
     509              :  *
     510              :  * Returns true if the origin is known, false otherwise.
     511              :  */
     512              : bool
     513           27 : replorigin_by_oid(ReplOriginId roident, bool missing_ok, char **roname)
     514              : {
     515              :     HeapTuple   tuple;
     516              :     Form_pg_replication_origin ric;
     517              : 
     518              :     Assert(OidIsValid((Oid) roident));
     519              :     Assert(roident != InvalidReplOriginId);
     520              :     Assert(roident != DoNotReplicateId);
     521              : 
     522           27 :     tuple = SearchSysCache1(REPLORIGIDENT,
     523              :                             ObjectIdGetDatum((Oid) roident));
     524              : 
     525           27 :     if (HeapTupleIsValid(tuple))
     526              :     {
     527           24 :         ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
     528           24 :         *roname = text_to_cstring(&ric->roname);
     529           24 :         ReleaseSysCache(tuple);
     530              : 
     531           24 :         return true;
     532              :     }
     533              :     else
     534              :     {
     535            3 :         *roname = NULL;
     536              : 
     537            3 :         if (!missing_ok)
     538            0 :             ereport(ERROR,
     539              :                     (errcode(ERRCODE_UNDEFINED_OBJECT),
     540              :                      errmsg("replication origin with ID %d does not exist",
     541              :                             roident)));
     542              : 
     543            3 :         return false;
     544              :     }
     545              : }
     546              : 
     547              : 
     548              : /* ---------------------------------------------------------------------------
     549              :  * Functions for handling replication progress.
     550              :  * ---------------------------------------------------------------------------
     551              :  */
     552              : 
     553              : static void
     554         1234 : ReplicationOriginShmemRequest(void *arg)
     555              : {
     556         1234 :     Size        size = 0;
     557              : 
     558         1234 :     if (max_active_replication_origins == 0)
     559            1 :         return;
     560              : 
     561         1233 :     size = add_size(size, offsetof(ReplicationStateCtl, states));
     562         1233 :     size = add_size(size,
     563              :                     mul_size(max_active_replication_origins, sizeof(ReplicationState)));
     564         1233 :     ShmemRequestStruct(.name = "ReplicationOriginState",
     565              :                        .size = size,
     566              :                        .ptr = (void **) &replication_states_ctl,
     567              :         );
     568              : }
     569              : 
     570              : static void
     571         1231 : ReplicationOriginShmemInit(void *arg)
     572              : {
     573         1231 :     if (max_active_replication_origins == 0)
     574            1 :         return;
     575              : 
     576         1230 :     replication_states = replication_states_ctl->states;
     577              : 
     578         1230 :     replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
     579              : 
     580        13521 :     for (int i = 0; i < max_active_replication_origins; i++)
     581              :     {
     582        12291 :         LWLockInitialize(&replication_states[i].lock,
     583        12291 :                          replication_states_ctl->tranche_id);
     584        12291 :         ConditionVariableInit(&replication_states[i].origin_cv);
     585              :     }
     586              : }
     587              : 
     588              : static void
     589            0 : ReplicationOriginShmemAttach(void *arg)
     590              : {
     591            0 :     if (max_active_replication_origins == 0)
     592            0 :         return;
     593              : 
     594            0 :     replication_states = replication_states_ctl->states;
     595              : }
     596              : 
     597              : /* ---------------------------------------------------------------------------
     598              :  * Perform a checkpoint of each replication origin's progress with respect to
     599              :  * the replayed remote_lsn. Make sure that all transactions we refer to in the
     600              :  * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
     601              :  * if the transactions were originally committed asynchronously.
     602              :  *
     603              :  * We store checkpoints in the following format:
     604              :  * +-------+------------------------+------------------+-----+--------+
     605              :  * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
     606              :  * +-------+------------------------+------------------+-----+--------+
     607              :  *
     608              :  * So its just the magic, followed by the statically sized
     609              :  * ReplicationStateOnDisk structs. Note that the maximum number of
     610              :  * ReplicationState is determined by max_active_replication_origins.
     611              :  * ---------------------------------------------------------------------------
     612              :  */
     613              : void
     614         1931 : CheckPointReplicationOrigin(void)
     615              : {
     616         1931 :     const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
     617         1931 :     const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     618              :     int         tmpfd;
     619              :     int         i;
     620         1931 :     uint32      magic = REPLICATION_STATE_MAGIC;
     621              :     pg_crc32c   crc;
     622              : 
     623         1931 :     if (max_active_replication_origins == 0)
     624            1 :         return;
     625              : 
     626         1930 :     INIT_CRC32C(crc);
     627              : 
     628              :     /* make sure no old temp file is remaining */
     629         1930 :     if (unlink(tmppath) < 0 && errno != ENOENT)
     630            0 :         ereport(PANIC,
     631              :                 (errcode_for_file_access(),
     632              :                  errmsg("could not remove file \"%s\": %m",
     633              :                         tmppath)));
     634              : 
     635              :     /*
     636              :      * no other backend can perform this at the same time; only one checkpoint
     637              :      * can happen at a time.
     638              :      */
     639         1930 :     tmpfd = OpenTransientFile(tmppath,
     640              :                               O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
     641         1930 :     if (tmpfd < 0)
     642            0 :         ereport(PANIC,
     643              :                 (errcode_for_file_access(),
     644              :                  errmsg("could not create file \"%s\": %m",
     645              :                         tmppath)));
     646              : 
     647              :     /* write magic */
     648         1930 :     errno = 0;
     649         1930 :     if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
     650              :     {
     651              :         /* if write didn't set errno, assume problem is no disk space */
     652            0 :         if (errno == 0)
     653            0 :             errno = ENOSPC;
     654            0 :         ereport(PANIC,
     655              :                 (errcode_for_file_access(),
     656              :                  errmsg("could not write to file \"%s\": %m",
     657              :                         tmppath)));
     658              :     }
     659         1930 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     660              : 
     661              :     /* prevent concurrent creations/drops */
     662         1930 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
     663              : 
     664              :     /* write actual data */
     665        21230 :     for (i = 0; i < max_active_replication_origins; i++)
     666              :     {
     667              :         ReplicationStateOnDisk disk_state;
     668        19300 :         ReplicationState *curstate = &replication_states[i];
     669              :         XLogRecPtr  local_lsn;
     670              : 
     671        19300 :         if (curstate->roident == InvalidReplOriginId)
     672        19246 :             continue;
     673              : 
     674              :         /* zero, to avoid uninitialized padding bytes */
     675           54 :         memset(&disk_state, 0, sizeof(disk_state));
     676              : 
     677           54 :         LWLockAcquire(&curstate->lock, LW_SHARED);
     678              : 
     679           54 :         disk_state.roident = curstate->roident;
     680              : 
     681           54 :         disk_state.remote_lsn = curstate->remote_lsn;
     682           54 :         local_lsn = curstate->local_lsn;
     683              : 
     684           54 :         LWLockRelease(&curstate->lock);
     685              : 
     686              :         /* make sure we only write out a commit that's persistent */
     687           54 :         XLogFlush(local_lsn);
     688              : 
     689           54 :         errno = 0;
     690           54 :         if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
     691              :             sizeof(disk_state))
     692              :         {
     693              :             /* if write didn't set errno, assume problem is no disk space */
     694            0 :             if (errno == 0)
     695            0 :                 errno = ENOSPC;
     696            0 :             ereport(PANIC,
     697              :                     (errcode_for_file_access(),
     698              :                      errmsg("could not write to file \"%s\": %m",
     699              :                             tmppath)));
     700              :         }
     701              : 
     702           54 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     703              :     }
     704              : 
     705         1930 :     LWLockRelease(ReplicationOriginLock);
     706              : 
     707              :     /* write out the CRC */
     708         1930 :     FIN_CRC32C(crc);
     709         1930 :     errno = 0;
     710         1930 :     if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
     711              :     {
     712              :         /* if write didn't set errno, assume problem is no disk space */
     713            0 :         if (errno == 0)
     714            0 :             errno = ENOSPC;
     715            0 :         ereport(PANIC,
     716              :                 (errcode_for_file_access(),
     717              :                  errmsg("could not write to file \"%s\": %m",
     718              :                         tmppath)));
     719              :     }
     720              : 
     721         1930 :     if (CloseTransientFile(tmpfd) != 0)
     722            0 :         ereport(PANIC,
     723              :                 (errcode_for_file_access(),
     724              :                  errmsg("could not close file \"%s\": %m",
     725              :                         tmppath)));
     726              : 
     727              :     /* fsync, rename to permanent file, fsync file and directory */
     728         1930 :     durable_rename(tmppath, path, PANIC);
     729              : }
     730              : 
     731              : /*
     732              :  * Recover replication replay status from checkpoint data saved earlier by
     733              :  * CheckPointReplicationOrigin.
     734              :  *
     735              :  * This only needs to be called at startup and *not* during every checkpoint
     736              :  * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
     737              :  * state thereafter can be recovered by looking at commit records.
     738              :  */
     739              : void
     740         1070 : StartupReplicationOrigin(void)
     741              : {
     742         1070 :     const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
     743              :     int         fd;
     744              :     int         readBytes;
     745         1070 :     uint32      magic = REPLICATION_STATE_MAGIC;
     746         1070 :     int         last_state = 0;
     747              :     pg_crc32c   file_crc;
     748              :     pg_crc32c   crc;
     749              : 
     750              :     /* don't want to overwrite already existing state */
     751              : #ifdef USE_ASSERT_CHECKING
     752              :     static bool already_started = false;
     753              : 
     754              :     Assert(!already_started);
     755              :     already_started = true;
     756              : #endif
     757              : 
     758         1070 :     if (max_active_replication_origins == 0)
     759           58 :         return;
     760              : 
     761         1069 :     INIT_CRC32C(crc);
     762              : 
     763         1069 :     elog(DEBUG2, "starting up replication origin progress state");
     764              : 
     765         1069 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
     766              : 
     767              :     /*
     768              :      * might have had max_active_replication_origins == 0 last run, or we just
     769              :      * brought up a standby.
     770              :      */
     771         1069 :     if (fd < 0 && errno == ENOENT)
     772           57 :         return;
     773         1012 :     else if (fd < 0)
     774            0 :         ereport(PANIC,
     775              :                 (errcode_for_file_access(),
     776              :                  errmsg("could not open file \"%s\": %m",
     777              :                         path)));
     778              : 
     779              :     /* verify magic, that is written even if nothing was active */
     780         1012 :     readBytes = read(fd, &magic, sizeof(magic));
     781         1012 :     if (readBytes != sizeof(magic))
     782              :     {
     783            0 :         if (readBytes < 0)
     784            0 :             ereport(PANIC,
     785              :                     (errcode_for_file_access(),
     786              :                      errmsg("could not read file \"%s\": %m",
     787              :                             path)));
     788              :         else
     789            0 :             ereport(PANIC,
     790              :                     (errcode(ERRCODE_DATA_CORRUPTED),
     791              :                      errmsg("could not read file \"%s\": read %d of %zu",
     792              :                             path, readBytes, sizeof(magic))));
     793              :     }
     794         1012 :     COMP_CRC32C(crc, &magic, sizeof(magic));
     795              : 
     796         1012 :     if (magic != REPLICATION_STATE_MAGIC)
     797            0 :         ereport(PANIC,
     798              :                 (errmsg("replication checkpoint has wrong magic %u instead of %u",
     799              :                         magic, REPLICATION_STATE_MAGIC)));
     800              : 
     801              :     /* we can skip locking here, no other access is possible */
     802              : 
     803              :     /* recover individual states, until there are no more to be found */
     804              :     while (true)
     805           31 :     {
     806              :         ReplicationStateOnDisk disk_state;
     807              : 
     808         1043 :         readBytes = read(fd, &disk_state, sizeof(disk_state));
     809              : 
     810         1043 :         if (readBytes < 0)
     811              :         {
     812            0 :             ereport(PANIC,
     813              :                     (errcode_for_file_access(),
     814              :                      errmsg("could not read file \"%s\": %m",
     815              :                             path)));
     816              :         }
     817              : 
     818              :         /* no further data */
     819         1043 :         if (readBytes == sizeof(crc))
     820              :         {
     821         1012 :             memcpy(&file_crc, &disk_state, sizeof(file_crc));
     822         1012 :             break;
     823              :         }
     824              : 
     825           31 :         if (readBytes != sizeof(disk_state))
     826              :         {
     827            0 :             ereport(PANIC,
     828              :                     (errcode_for_file_access(),
     829              :                      errmsg("could not read file \"%s\": read %d of %zu",
     830              :                             path, readBytes, sizeof(disk_state))));
     831              :         }
     832              : 
     833           31 :         COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
     834              : 
     835           31 :         if (last_state == max_active_replication_origins)
     836            0 :             ereport(PANIC,
     837              :                     (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     838              :                      errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
     839              : 
     840              :         /* copy data to shared memory */
     841           31 :         replication_states[last_state].roident = disk_state.roident;
     842           31 :         replication_states[last_state].remote_lsn = disk_state.remote_lsn;
     843           31 :         last_state++;
     844              : 
     845           31 :         ereport(LOG,
     846              :                 errmsg("recovered replication state of node %d to %X/%08X",
     847              :                        disk_state.roident,
     848              :                        LSN_FORMAT_ARGS(disk_state.remote_lsn)));
     849              :     }
     850              : 
     851              :     /* now check checksum */
     852         1012 :     FIN_CRC32C(crc);
     853         1012 :     if (file_crc != crc)
     854            0 :         ereport(PANIC,
     855              :                 (errcode(ERRCODE_DATA_CORRUPTED),
     856              :                  errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
     857              :                         crc, file_crc)));
     858              : 
     859         1012 :     if (CloseTransientFile(fd) != 0)
     860            0 :         ereport(PANIC,
     861              :                 (errcode_for_file_access(),
     862              :                  errmsg("could not close file \"%s\": %m",
     863              :                         path)));
     864              : }
     865              : 
     866              : void
     867            4 : replorigin_redo(XLogReaderState *record)
     868              : {
     869            4 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     870              : 
     871            4 :     switch (info)
     872              :     {
     873            2 :         case XLOG_REPLORIGIN_SET:
     874              :             {
     875            2 :                 xl_replorigin_set *xlrec =
     876            2 :                     (xl_replorigin_set *) XLogRecGetData(record);
     877              : 
     878            2 :                 replorigin_advance(xlrec->node_id,
     879              :                                    xlrec->remote_lsn, record->EndRecPtr,
     880            2 :                                    xlrec->force /* backward */ ,
     881              :                                    false /* WAL log */ );
     882            2 :                 break;
     883              :             }
     884            2 :         case XLOG_REPLORIGIN_DROP:
     885              :             {
     886              :                 xl_replorigin_drop *xlrec;
     887              :                 int         i;
     888              : 
     889            2 :                 xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
     890              : 
     891            3 :                 for (i = 0; i < max_active_replication_origins; i++)
     892              :                 {
     893            3 :                     ReplicationState *state = &replication_states[i];
     894              : 
     895              :                     /* found our slot */
     896            3 :                     if (state->roident == xlrec->node_id)
     897              :                     {
     898              :                         /* reset entry */
     899            2 :                         state->roident = InvalidReplOriginId;
     900            2 :                         state->remote_lsn = InvalidXLogRecPtr;
     901            2 :                         state->local_lsn = InvalidXLogRecPtr;
     902            2 :                         break;
     903              :                     }
     904              :                 }
     905            2 :                 break;
     906              :             }
     907            0 :         default:
     908            0 :             elog(PANIC, "replorigin_redo: unknown op code %u", info);
     909              :     }
     910            4 : }
     911              : 
     912              : 
     913              : /*
     914              :  * Tell the replication origin progress machinery that a commit from 'node'
     915              :  * that originated at the LSN remote_commit on the remote node was replayed
     916              :  * successfully and that we don't need to do so again. In combination with
     917              :  * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp}
     918              :  * that ensures we won't lose knowledge about that after a crash if the
     919              :  * transaction had a persistent effect (think of asynchronous commits).
     920              :  *
     921              :  * local_commit needs to be a local LSN of the commit so that we can make sure
     922              :  * upon a checkpoint that enough WAL has been persisted to disk.
     923              :  *
     924              :  * Needs to be called with a RowExclusiveLock on pg_replication_origin,
     925              :  * unless running in recovery.
     926              :  */
     927              : void
     928          257 : replorigin_advance(ReplOriginId node,
     929              :                    XLogRecPtr remote_commit, XLogRecPtr local_commit,
     930              :                    bool go_backward, bool wal_log)
     931              : {
     932              :     int         i;
     933          257 :     ReplicationState *replication_state = NULL;
     934          257 :     ReplicationState *free_state = NULL;
     935              : 
     936              :     Assert(node != InvalidReplOriginId);
     937              : 
     938              :     /* we don't track DoNotReplicateId */
     939          257 :     if (node == DoNotReplicateId)
     940            0 :         return;
     941              : 
     942              :     /*
     943              :      * XXX: For the case where this is called by WAL replay, it'd be more
     944              :      * efficient to restore into a backend local hashtable and only dump into
     945              :      * shmem after recovery is finished. Let's wait with implementing that
     946              :      * till it's shown to be a measurable expense
     947              :      */
     948              : 
     949              :     /* Lock exclusively, as we may have to create a new table entry. */
     950          257 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
     951              : 
     952              :     /*
     953              :      * Search for either an existing slot for the origin, or a free one we can
     954              :      * use.
     955              :      */
     956         2371 :     for (i = 0; i < max_active_replication_origins; i++)
     957              :     {
     958         2161 :         ReplicationState *curstate = &replication_states[i];
     959              : 
     960              :         /* remember where to insert if necessary */
     961         2161 :         if (curstate->roident == InvalidReplOriginId &&
     962              :             free_state == NULL)
     963              :         {
     964          211 :             free_state = curstate;
     965          211 :             continue;
     966              :         }
     967              : 
     968              :         /* not our slot */
     969         1950 :         if (curstate->roident != node)
     970              :         {
     971         1903 :             continue;
     972              :         }
     973              : 
     974              :         /* ok, found slot */
     975           47 :         replication_state = curstate;
     976              : 
     977           47 :         LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
     978              : 
     979              :         /* Make sure it's not used by somebody else */
     980           47 :         if (replication_state->refcount > 0)
     981              :         {
     982            0 :             ereport(ERROR,
     983              :                     (errcode(ERRCODE_OBJECT_IN_USE),
     984              :                      (replication_state->acquired_by != 0)
     985              :                      ? errmsg("replication origin with ID %d is already active for PID %d",
     986              :                               replication_state->roident,
     987              :                               replication_state->acquired_by)
     988              :                      : errmsg("replication origin with ID %d is already active in another process",
     989              :                               replication_state->roident)));
     990              :         }
     991              : 
     992           47 :         break;
     993              :     }
     994              : 
     995          257 :     if (replication_state == NULL && free_state == NULL)
     996            0 :         ereport(ERROR,
     997              :                 (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
     998              :                  errmsg("could not find free replication state slot for replication origin with ID %d",
     999              :                         node),
    1000              :                  errhint("Increase \"max_active_replication_origins\" and try again.")));
    1001              : 
    1002          257 :     if (replication_state == NULL)
    1003              :     {
    1004              :         /* initialize new slot */
    1005          210 :         LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
    1006          210 :         replication_state = free_state;
    1007              :         Assert(!XLogRecPtrIsValid(replication_state->remote_lsn));
    1008              :         Assert(!XLogRecPtrIsValid(replication_state->local_lsn));
    1009          210 :         replication_state->roident = node;
    1010              :     }
    1011              : 
    1012              :     Assert(replication_state->roident != InvalidReplOriginId);
    1013              : 
    1014              :     /*
    1015              :      * If somebody "forcefully" sets this slot, WAL log it, so it's durable
    1016              :      * and the standby gets the message. Primarily this will be called during
    1017              :      * WAL replay (of commit records) where no WAL logging is necessary.
    1018              :      */
    1019          257 :     if (wal_log)
    1020              :     {
    1021              :         xl_replorigin_set xlrec;
    1022              : 
    1023          216 :         xlrec.remote_lsn = remote_commit;
    1024          216 :         xlrec.node_id = node;
    1025          216 :         xlrec.force = go_backward;
    1026              : 
    1027          216 :         XLogBeginInsert();
    1028          216 :         XLogRegisterData(&xlrec, sizeof(xlrec));
    1029              : 
    1030          216 :         XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
    1031              :     }
    1032              : 
    1033              :     /*
    1034              :      * Due to - harmless - race conditions during a checkpoint we could see
    1035              :      * values here that are older than the ones we already have in memory. We
    1036              :      * could also see older values for prepared transactions when the prepare
    1037              :      * is sent at a later point of time along with commit prepared and there
    1038              :      * are other transactions commits between prepare and commit prepared. See
    1039              :      * ReorderBufferFinishPrepared. Don't overwrite those.
    1040              :      */
    1041          257 :     if (go_backward || replication_state->remote_lsn < remote_commit)
    1042          250 :         replication_state->remote_lsn = remote_commit;
    1043          257 :     if (XLogRecPtrIsValid(local_commit) &&
    1044           38 :         (go_backward || replication_state->local_lsn < local_commit))
    1045           40 :         replication_state->local_lsn = local_commit;
    1046          257 :     LWLockRelease(&replication_state->lock);
    1047              : 
    1048              :     /*
    1049              :      * Release *after* changing the LSNs, slot isn't acquired and thus could
    1050              :      * otherwise be dropped anytime.
    1051              :      */
    1052          257 :     LWLockRelease(ReplicationOriginLock);
    1053              : }
    1054              : 
    1055              : 
    1056              : XLogRecPtr
    1057            9 : replorigin_get_progress(ReplOriginId node, bool flush)
    1058              : {
    1059              :     int         i;
    1060            9 :     XLogRecPtr  local_lsn = InvalidXLogRecPtr;
    1061            9 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1062              : 
    1063              :     /* prevent slots from being concurrently dropped */
    1064            9 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1065              : 
    1066           49 :     for (i = 0; i < max_active_replication_origins; i++)
    1067              :     {
    1068              :         ReplicationState *state;
    1069              : 
    1070           45 :         state = &replication_states[i];
    1071              : 
    1072           45 :         if (state->roident == node)
    1073              :         {
    1074            5 :             LWLockAcquire(&state->lock, LW_SHARED);
    1075              : 
    1076            5 :             remote_lsn = state->remote_lsn;
    1077            5 :             local_lsn = state->local_lsn;
    1078              : 
    1079            5 :             LWLockRelease(&state->lock);
    1080              : 
    1081            5 :             break;
    1082              :         }
    1083              :     }
    1084              : 
    1085            9 :     LWLockRelease(ReplicationOriginLock);
    1086              : 
    1087            9 :     if (flush && XLogRecPtrIsValid(local_lsn))
    1088            1 :         XLogFlush(local_lsn);
    1089              : 
    1090            9 :     return remote_lsn;
    1091              : }
    1092              : 
    1093              : /* Helper function to reset the session replication origin */
    1094              : static void
    1095          546 : replorigin_session_reset_internal(void)
    1096              : {
    1097              :     ConditionVariable *cv;
    1098              : 
    1099              :     Assert(session_replication_state != NULL);
    1100              : 
    1101          546 :     LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
    1102              : 
    1103              :     /* The origin must be held by at least one process at this point. */
    1104              :     Assert(session_replication_state->refcount > 0);
    1105              : 
    1106              :     /*
    1107              :      * Reset the PID only if the current session is the first to set up this
    1108              :      * origin. This avoids clearing the first process's PID when any other
    1109              :      * session releases the origin.
    1110              :      */
    1111          546 :     if (session_replication_state->acquired_by == MyProcPid)
    1112          531 :         session_replication_state->acquired_by = 0;
    1113              : 
    1114          546 :     session_replication_state->refcount--;
    1115              : 
    1116          546 :     cv = &session_replication_state->origin_cv;
    1117          546 :     session_replication_state = NULL;
    1118              : 
    1119          546 :     LWLockRelease(ReplicationOriginLock);
    1120              : 
    1121          546 :     ConditionVariableBroadcast(cv);
    1122          546 : }
    1123              : 
    1124              : /*
    1125              :  * Tear down a (possibly) configured session replication origin during process
    1126              :  * exit.
    1127              :  */
    1128              : static void
    1129          543 : ReplicationOriginExitCleanup(int code, Datum arg)
    1130              : {
    1131          543 :     if (session_replication_state == NULL)
    1132          204 :         return;
    1133              : 
    1134          339 :     replorigin_session_reset_internal();
    1135              : }
    1136              : 
/*
 * Setup a replication origin in the shared memory struct if it doesn't
 * already exist and cache access to the specific ReplicationState entry so
 * the array doesn't have to be searched when calling
 * replorigin_session_advance().
 *
 * Normally only one such cached origin can exist per process so the cached
 * value can only be set again after the previous value is torn down with
 * replorigin_session_reset(). For this normal case pass acquired_by = 0
 * (meaning the slot is not allowed to be already acquired by another process).
 *
 * However, sometimes multiple processes can safely re-use the same origin slot
 * (for example, multiple parallel apply processes can safely use the same
 * origin, provided they maintain commit order by allowing only one process to
 * commit at a time). For this case the first process must pass acquired_by =
 * 0, and then the other processes sharing that same origin can pass
 * acquired_by = PID of the first process.
 *
 * node: internal id of the replication origin to bind this session to.
 * acquired_by: 0, or the PID of the process that first acquired the origin
 *   (see above).
 *
 * Raises ERROR if:
 * - this session already has an origin set up;
 * - acquired_by == 0 but the origin's slot is owned by, or still referenced
 *   by, some process;
 * - acquired_by != 0 but the slot is not owned by that PID, or the origin
 *   has no slot at all yet;
 * - no free slot is left (max_active_replication_origins exhausted).
 *
 * On success the slot's refcount is incremented and, when acquired_by == 0,
 * MyProcPid is recorded as the slot's owner.  The first call in a process
 * also registers ReplicationOriginExitCleanup() so the origin is released
 * at process exit.
 */
void
replorigin_session_setup(ReplOriginId node, int acquired_by)
{
    /* Makes sure the exit callback is registered at most once per process. */
    static bool registered_cleanup;
    int         i;
    int         free_slot = -1;     /* first unused slot seen, or -1 */

    if (!registered_cleanup)
    {
        on_shmem_exit(ReplicationOriginExitCleanup, 0);
        registered_cleanup = true;
    }

    Assert(max_active_replication_origins > 0);

    if (session_replication_state != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("cannot setup replication origin when one is already setup")));

    /* Lock exclusively, as we may have to create a new table entry. */
    LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);

    /*
     * Search for either an existing slot for the origin, or a free one we can
     * use.
     */
    for (i = 0; i < max_active_replication_origins; i++)
    {
        ReplicationState *curstate = &replication_states[i];

        /* remember where to insert if necessary */
        if (curstate->roident == InvalidReplOriginId &&
            free_slot == -1)
        {
            free_slot = i;
            continue;
        }

        /* not our slot */
        if (curstate->roident != node)
            continue;

        if (acquired_by == 0)
        {
            /* With acquired_by == 0, we need the origin to be free */
            if (curstate->acquired_by != 0)
            {
                ereport(ERROR,
                        (errcode(ERRCODE_OBJECT_IN_USE),
                         errmsg("replication origin with ID %d is already active for PID %d",
                                curstate->roident, curstate->acquired_by)));
            }
            else if (curstate->refcount > 0)
            {
                /*
                 * The origin is in use, but PID is not recorded. This can
                 * happen if the process that originally acquired the origin
                 * exited without releasing it. To ensure correctness, other
                 * processes cannot acquire the origin until all processes
                 * currently using it have released it.
                 */
                ereport(ERROR,
                        (errcode(ERRCODE_OBJECT_IN_USE),
                         errmsg("replication origin with ID %d is already active in another process",
                                curstate->roident)));
            }
        }
        else
        {
            /*
             * With acquired_by != 0, we need the origin to be active by the
             * given PID
             */
            if (curstate->acquired_by != acquired_by)
                ereport(ERROR,
                        (errcode(ERRCODE_OBJECT_IN_USE),
                         errmsg("replication origin with ID %d is not active for PID %d",
                                curstate->roident, acquired_by)));

            /*
             * Here, it is okay to have refcount > 0 as more than one process
             * can safely re-use the origin.
             */
        }

        /* ok, found slot */
        session_replication_state = curstate;
        break;
    }

    /* No existing slot for this origin: claim the free one found above. */
    if (session_replication_state == NULL)
    {
        /* Sharing requires an already-active slot, which doesn't exist. */
        if (acquired_by != 0)
            ereport(ERROR,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                     errmsg("cannot use PID %d for inactive replication origin with ID %d",
                            acquired_by, node)));

        /* initialize new slot */
        if (free_slot == -1)
            ereport(ERROR,
                    (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
                     errmsg("could not find free replication state slot for replication origin with ID %d",
                            node),
                     errhint("Increase \"max_active_replication_origins\" and try again.")));

        session_replication_state = &replication_states[free_slot];
        /* A freshly claimed slot must not carry stale progress. */
        Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
        Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
        session_replication_state->roident = node;
    }


    Assert(session_replication_state->roident != InvalidReplOriginId);

    if (acquired_by == 0)
    {
        /* Exclusive acquisition: we become the slot's owner. */
        session_replication_state->acquired_by = MyProcPid;
        Assert(session_replication_state->refcount == 0);
    }
    else
    {
        /*
         * Sanity check: the origin must already be acquired by the process
         * passed as input, and at least one process must be using it.
         */
        Assert(session_replication_state->acquired_by == acquired_by);
        Assert(session_replication_state->refcount > 0);
    }

    /* Count this session as a user of the slot (dropped on reset/exit). */
    session_replication_state->refcount++;

    LWLockRelease(ReplicationOriginLock);

    /*
     * Wake any waiters on this slot's condition variable; the original
     * author notes this broadcast is probably pointless.
     */
    ConditionVariableBroadcast(&session_replication_state->origin_cv);
}
    1293              : 
    1294              : /*
    1295              :  * Reset replay state previously setup in this session.
    1296              :  *
    1297              :  * This function may only be called if an origin was setup with
    1298              :  * replorigin_session_setup().
    1299              :  */
    1300              : void
    1301          209 : replorigin_session_reset(void)
    1302              : {
    1303              :     Assert(max_active_replication_origins != 0);
    1304              : 
    1305          209 :     if (session_replication_state == NULL)
    1306            1 :         ereport(ERROR,
    1307              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1308              :                  errmsg("no replication origin is configured")));
    1309              : 
    1310              :     /*
    1311              :      * Restrict explicit resetting of the replication origin if it was first
    1312              :      * acquired by this process and others are still using it. While the
    1313              :      * system handles this safely (as happens if the first session exits
    1314              :      * without calling reset), it is best to avoid doing so.
    1315              :      */
    1316          208 :     if (session_replication_state->acquired_by == MyProcPid &&
    1317          206 :         session_replication_state->refcount > 1)
    1318            1 :         ereport(ERROR,
    1319              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1320              :                  errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
    1321              :                         session_replication_state->roident),
    1322              :                  errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
    1323              :                  errhint("Reset the replication origin in all other processes before retrying.")));
    1324              : 
    1325          207 :     replorigin_session_reset_internal();
    1326          207 : }
    1327              : 
    1328              : /*
    1329              :  * Do the same work replorigin_advance() does, just on the session's
    1330              :  * configured origin.
    1331              :  *
    1332              :  * This is noticeably cheaper than using replorigin_advance().
    1333              :  */
    1334              : void
    1335         1168 : replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
    1336              : {
    1337              :     Assert(session_replication_state != NULL);
    1338              :     Assert(session_replication_state->roident != InvalidReplOriginId);
    1339              : 
    1340         1168 :     LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
    1341         1168 :     if (session_replication_state->local_lsn < local_commit)
    1342         1168 :         session_replication_state->local_lsn = local_commit;
    1343         1168 :     if (session_replication_state->remote_lsn < remote_commit)
    1344          544 :         session_replication_state->remote_lsn = remote_commit;
    1345         1168 :     LWLockRelease(&session_replication_state->lock);
    1346         1168 : }
    1347              : 
    1348              : /*
    1349              :  * Ask the machinery about the point up to which we successfully replayed
    1350              :  * changes from an already setup replication origin.
    1351              :  */
    1352              : XLogRecPtr
    1353          313 : replorigin_session_get_progress(bool flush)
    1354              : {
    1355              :     XLogRecPtr  remote_lsn;
    1356              :     XLogRecPtr  local_lsn;
    1357              : 
    1358              :     Assert(session_replication_state != NULL);
    1359              : 
    1360          313 :     LWLockAcquire(&session_replication_state->lock, LW_SHARED);
    1361          313 :     remote_lsn = session_replication_state->remote_lsn;
    1362          313 :     local_lsn = session_replication_state->local_lsn;
    1363          313 :     LWLockRelease(&session_replication_state->lock);
    1364              : 
    1365          313 :     if (flush && XLogRecPtrIsValid(local_lsn))
    1366            1 :         XLogFlush(local_lsn);
    1367              : 
    1368          313 :     return remote_lsn;
    1369              : }
    1370              : 
    1371              : /*
    1372              :  * Clear the per-transaction replication origin state.
    1373              :  *
    1374              :  * replorigin_session_origin is also cleared if clear_origin is set.
    1375              :  */
    1376              : void
    1377          864 : replorigin_xact_clear(bool clear_origin)
    1378              : {
    1379          864 :     replorigin_xact_state.origin_lsn = InvalidXLogRecPtr;
    1380          864 :     replorigin_xact_state.origin_timestamp = 0;
    1381          864 :     if (clear_origin)
    1382          864 :         replorigin_xact_state.origin = InvalidReplOriginId;
    1383          864 : }
    1384              : 
    1385              : 
    1386              : /* ---------------------------------------------------------------------------
    1387              :  * SQL functions for working with replication origin.
    1388              :  *
    1389              :  * These mostly should be fairly short wrappers around more generic functions.
    1390              :  * ---------------------------------------------------------------------------
    1391              :  */
    1392              : 
    1393              : /*
    1394              :  * Create replication origin for the passed in name, and return the assigned
    1395              :  * oid.
    1396              :  */
    1397              : Datum
    1398           15 : pg_replication_origin_create(PG_FUNCTION_ARGS)
    1399              : {
    1400              :     char       *name;
    1401              :     ReplOriginId roident;
    1402              : 
    1403           15 :     replorigin_check_prerequisites(false, false);
    1404              : 
    1405           15 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1406              : 
    1407              :     /*
    1408              :      * Replication origins "any and "none" are reserved for system options.
    1409              :      * The origins "pg_xxx" are reserved for internal use.
    1410              :      */
    1411           15 :     if (IsReservedName(name) || IsReservedOriginName(name))
    1412            3 :         ereport(ERROR,
    1413              :                 (errcode(ERRCODE_RESERVED_NAME),
    1414              :                  errmsg("replication origin name \"%s\" is reserved",
    1415              :                         name),
    1416              :                  errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
    1417              :                            LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
    1418              : 
    1419              :     /*
    1420              :      * If built with appropriate switch, whine when regression-testing
    1421              :      * conventions for replication origin names are violated.
    1422              :      */
    1423              : #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
    1424              :     if (strncmp(name, "regress_", 8) != 0)
    1425              :         elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
    1426              : #endif
    1427              : 
    1428           12 :     roident = replorigin_create(name);
    1429              : 
    1430            7 :     pfree(name);
    1431              : 
    1432            7 :     PG_RETURN_OID(roident);
    1433              : }
    1434              : 
    1435              : /*
    1436              :  * Drop replication origin.
    1437              :  */
    1438              : Datum
    1439            9 : pg_replication_origin_drop(PG_FUNCTION_ARGS)
    1440              : {
    1441              :     char       *name;
    1442              : 
    1443            9 :     replorigin_check_prerequisites(false, false);
    1444              : 
    1445            9 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1446              : 
    1447            9 :     replorigin_drop_by_name(name, false, true);
    1448              : 
    1449            8 :     pfree(name);
    1450              : 
    1451            8 :     PG_RETURN_VOID();
    1452              : }
    1453              : 
    1454              : /*
    1455              :  * Return oid of a replication origin.
    1456              :  */
    1457              : Datum
    1458            0 : pg_replication_origin_oid(PG_FUNCTION_ARGS)
    1459              : {
    1460              :     char       *name;
    1461              :     ReplOriginId roident;
    1462              : 
    1463            0 :     replorigin_check_prerequisites(false, false);
    1464              : 
    1465            0 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1466            0 :     roident = replorigin_by_name(name, true);
    1467              : 
    1468            0 :     pfree(name);
    1469              : 
    1470            0 :     if (OidIsValid(roident))
    1471            0 :         PG_RETURN_OID(roident);
    1472            0 :     PG_RETURN_NULL();
    1473              : }
    1474              : 
    1475              : /*
    1476              :  * Setup a replication origin for this session.
    1477              :  */
    1478              : Datum
    1479           11 : pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
    1480              : {
    1481              :     char       *name;
    1482              :     ReplOriginId origin;
    1483              :     int         pid;
    1484              : 
    1485           11 :     replorigin_check_prerequisites(true, false);
    1486              : 
    1487           11 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1488           11 :     origin = replorigin_by_name(name, false);
    1489           10 :     pid = PG_GETARG_INT32(1);
    1490           10 :     replorigin_session_setup(origin, pid);
    1491              : 
    1492            8 :     replorigin_xact_state.origin = origin;
    1493              : 
    1494            8 :     pfree(name);
    1495              : 
    1496            8 :     PG_RETURN_VOID();
    1497              : }
    1498              : 
    1499              : /*
    1500              :  * Reset previously setup origin in this session
    1501              :  */
    1502              : Datum
    1503           10 : pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
    1504              : {
    1505           10 :     replorigin_check_prerequisites(true, false);
    1506              : 
    1507           10 :     replorigin_session_reset();
    1508              : 
    1509            8 :     replorigin_xact_clear(true);
    1510              : 
    1511            8 :     PG_RETURN_VOID();
    1512              : }
    1513              : 
    1514              : /*
    1515              :  * Has a replication origin been setup for this session.
    1516              :  */
    1517              : Datum
    1518            4 : pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
    1519              : {
    1520            4 :     replorigin_check_prerequisites(false, false);
    1521              : 
    1522            4 :     PG_RETURN_BOOL(replorigin_xact_state.origin != InvalidReplOriginId);
    1523              : }
    1524              : 
    1525              : 
    1526              : /*
    1527              :  * Return the replication progress for origin setup in the current session.
    1528              :  *
    1529              :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1530              :  * to a local transaction that has been flushed. This is useful if asynchronous
    1531              :  * commits are used when replaying replicated transactions.
    1532              :  */
    1533              : Datum
    1534            2 : pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
    1535              : {
    1536            2 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1537            2 :     bool        flush = PG_GETARG_BOOL(0);
    1538              : 
    1539            2 :     replorigin_check_prerequisites(true, false);
    1540              : 
    1541            2 :     if (session_replication_state == NULL)
    1542            0 :         ereport(ERROR,
    1543              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1544              :                  errmsg("no replication origin is configured")));
    1545              : 
    1546            2 :     remote_lsn = replorigin_session_get_progress(flush);
    1547              : 
    1548            2 :     if (!XLogRecPtrIsValid(remote_lsn))
    1549            0 :         PG_RETURN_NULL();
    1550              : 
    1551            2 :     PG_RETURN_LSN(remote_lsn);
    1552              : }
    1553              : 
    1554              : Datum
    1555            1 : pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
    1556              : {
    1557            1 :     XLogRecPtr  location = PG_GETARG_LSN(0);
    1558              : 
    1559            1 :     replorigin_check_prerequisites(true, false);
    1560              : 
    1561            1 :     if (session_replication_state == NULL)
    1562            0 :         ereport(ERROR,
    1563              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1564              :                  errmsg("no replication origin is configured")));
    1565              : 
    1566            1 :     replorigin_xact_state.origin_lsn = location;
    1567            1 :     replorigin_xact_state.origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
    1568              : 
    1569            1 :     PG_RETURN_VOID();
    1570              : }
    1571              : 
    1572              : Datum
    1573            0 : pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
    1574              : {
    1575            0 :     replorigin_check_prerequisites(true, false);
    1576              : 
    1577              :     /* Do not clear the session origin */
    1578            0 :     replorigin_xact_clear(false);
    1579              : 
    1580            0 :     PG_RETURN_VOID();
    1581              : }
    1582              : 
    1583              : 
    1584              : Datum
    1585            3 : pg_replication_origin_advance(PG_FUNCTION_ARGS)
    1586              : {
    1587            3 :     text       *name = PG_GETARG_TEXT_PP(0);
    1588            3 :     XLogRecPtr  remote_commit = PG_GETARG_LSN(1);
    1589              :     ReplOriginId node;
    1590              : 
    1591            3 :     replorigin_check_prerequisites(true, false);
    1592              : 
    1593              :     /* lock to prevent the replication origin from vanishing */
    1594            3 :     LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1595              : 
    1596            3 :     node = replorigin_by_name(text_to_cstring(name), false);
    1597              : 
    1598              :     /*
    1599              :      * Can't sensibly pass a local commit to be flushed at checkpoint - this
    1600              :      * xact hasn't committed yet. This is why this function should be used to
    1601              :      * set up the initial replication state, but not for replay.
    1602              :      */
    1603            2 :     replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
    1604              :                        true /* go backward */ , true /* WAL log */ );
    1605              : 
    1606            2 :     UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
    1607              : 
    1608            2 :     PG_RETURN_VOID();
    1609              : }
    1610              : 
    1611              : 
    1612              : /*
    1613              :  * Return the replication progress for an individual replication origin.
    1614              :  *
    1615              :  * If 'flush' is set to true it is ensured that the returned value corresponds
    1616              :  * to a local transaction that has been flushed. This is useful if asynchronous
    1617              :  * commits are used when replaying replicated transactions.
    1618              :  */
    1619              : Datum
    1620            3 : pg_replication_origin_progress(PG_FUNCTION_ARGS)
    1621              : {
    1622              :     char       *name;
    1623              :     bool        flush;
    1624              :     ReplOriginId roident;
    1625            3 :     XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
    1626              : 
    1627            3 :     replorigin_check_prerequisites(true, true);
    1628              : 
    1629            3 :     name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
    1630            3 :     flush = PG_GETARG_BOOL(1);
    1631              : 
    1632            3 :     roident = replorigin_by_name(name, false);
    1633              :     Assert(OidIsValid(roident));
    1634              : 
    1635            2 :     remote_lsn = replorigin_get_progress(roident, flush);
    1636              : 
    1637            2 :     if (!XLogRecPtrIsValid(remote_lsn))
    1638            0 :         PG_RETURN_NULL();
    1639              : 
    1640            2 :     PG_RETURN_LSN(remote_lsn);
    1641              : }
    1642              : 
    1643              : 
    1644              : Datum
    1645           11 : pg_show_replication_origin_status(PG_FUNCTION_ARGS)
    1646              : {
    1647           11 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1648              :     int         i;
    1649              : #define REPLICATION_ORIGIN_PROGRESS_COLS 4
    1650              : 
    1651              :     /* we want to return 0 rows if slot is set to zero */
    1652           11 :     replorigin_check_prerequisites(false, true);
    1653              : 
    1654           11 :     InitMaterializedSRF(fcinfo, 0);
    1655              : 
    1656              :     /* prevent slots from being concurrently dropped */
    1657           11 :     LWLockAcquire(ReplicationOriginLock, LW_SHARED);
    1658              : 
    1659              :     /*
    1660              :      * Iterate through all possible replication_states, display if they are
    1661              :      * filled. Note that we do not take any locks, so slightly corrupted/out
    1662              :      * of date values are a possibility.
    1663              :      */
    1664          121 :     for (i = 0; i < max_active_replication_origins; i++)
    1665              :     {
    1666              :         ReplicationState *state;
    1667              :         Datum       values[REPLICATION_ORIGIN_PROGRESS_COLS];
    1668              :         bool        nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
    1669              :         char       *roname;
    1670              : 
    1671          110 :         state = &replication_states[i];
    1672              : 
    1673              :         /* unused slot, nothing to display */
    1674          110 :         if (state->roident == InvalidReplOriginId)
    1675           97 :             continue;
    1676              : 
    1677           13 :         memset(values, 0, sizeof(values));
    1678           13 :         memset(nulls, 1, sizeof(nulls));
    1679              : 
    1680           13 :         values[0] = ObjectIdGetDatum(state->roident);
    1681           13 :         nulls[0] = false;
    1682              : 
    1683              :         /*
    1684              :          * We're not preventing the origin to be dropped concurrently, so
    1685              :          * silently accept that it might be gone.
    1686              :          */
    1687           13 :         if (replorigin_by_oid(state->roident, true,
    1688              :                               &roname))
    1689              :         {
    1690           13 :             values[1] = CStringGetTextDatum(roname);
    1691           13 :             nulls[1] = false;
    1692              :         }
    1693              : 
    1694           13 :         LWLockAcquire(&state->lock, LW_SHARED);
    1695              : 
    1696           13 :         values[2] = LSNGetDatum(state->remote_lsn);
    1697           13 :         nulls[2] = false;
    1698              : 
    1699           13 :         values[3] = LSNGetDatum(state->local_lsn);
    1700           13 :         nulls[3] = false;
    1701              : 
    1702           13 :         LWLockRelease(&state->lock);
    1703              : 
    1704           13 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
    1705              :                              values, nulls);
    1706              :     }
    1707              : 
    1708           11 :     LWLockRelease(ReplicationOriginLock);
    1709              : 
    1710              : #undef REPLICATION_ORIGIN_PROGRESS_COLS
    1711              : 
    1712           11 :     return (Datum) 0;
    1713              : }
        

Generated by: LCOV version 2.0-1