LCOV - code coverage report
Current view: top level - src/backend/replication/logical - snapbuild.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 473 559 84.6 %
Date: 2025-12-13 12:18:31 Functions: 32 32 100.0 %

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * snapbuild.c
       4             :  *
       5             :  *    Infrastructure for building historic catalog snapshots based on contents
       6             :  *    of the WAL, for the purpose of decoding heapam.c style values in the
       7             :  *    WAL.
       8             :  *
       9             :  * NOTES:
      10             :  *
      11             :  * We build snapshots which can *only* be used to read catalog contents and we
      12             :  * do so by reading and interpreting the WAL stream. The aim is to build a
      13             :  * snapshot that behaves the same as a freshly taken MVCC snapshot would have
      14             :  * at the time the XLogRecord was generated.
      15             :  *
      16             :  * To build the snapshots we reuse the infrastructure built for Hot
      17             :  * Standby. The in-memory snapshots we build look different than HS' because
      18             :  * we have different needs. To successfully decode data from the WAL we only
      19             :  * need to access catalog tables and (sys|rel|cat)cache, not the actual user
      20             :  * tables since the data we decode is wholly contained in the WAL
      21             :  * records. Also, our snapshots need to be different in comparison to normal
      22             :  * MVCC ones because in contrast to those we cannot fully rely on the clog and
      23             :  * pg_subtrans for information about committed transactions because they might
      24             :  * commit in the future from the POV of the WAL entry we're currently
      25             :  * decoding. This definition has the advantage that we only need to prevent
      26             :  * removal of catalog rows, while normal table's rows can still be
      27             :  * removed. This is achieved by using the replication slot mechanism.
      28             :  *
      29             :  * As the percentage of transactions modifying the catalog normally is fairly
      30             :  * small in comparison to ones only manipulating user data, we keep track of
      31             :  * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
      32             :  * track of all running transactions like it's done in a normal snapshot. Note
      33             :  * that we're generally only looking at transactions that have acquired an
      34             :  * xid. That is, we keep a list of transactions between snapshot->(xmin, xmax)
      35             :  * that we consider committed, everything else is considered aborted/in
      36             :  * progress. That also allows us not to care about subtransactions before they
      37             :  * have committed which means this module, in contrast to HS, doesn't have to
      38             :  * care about suboverflowed subtransactions and similar.
      39             :  *
      40             :  * One complexity of doing this is that to e.g. handle mixed DDL/DML
      41             :  * transactions we need Snapshots that see intermediate versions of the
      42             :  * catalog in a transaction. During normal operation this is achieved by using
      43             :  * CommandIds/cmin/cmax. The problem with that however is that for space
      44             :  * efficiency reasons, the cmin and cmax are not included in WAL records. We
      45             :  * cannot read the cmin/cmax from the tuple itself, either, because it is
      46             :  * reset on crash recovery. Even if we could, we could not decode combocids
      47             :  * which are only tracked in the original backend's memory. To work around
      48             :  * that, heapam writes an extra WAL record (XLOG_HEAP2_NEW_CID) every time a
      49             :  * catalog row is modified, which includes the cmin and cmax of the
      50             :  * tuple. During decoding, we insert the ctid->(cmin,cmax) mappings into the
      51             :  * reorder buffer, and use them at visibility checks instead of the cmin/cmax
      52             :  * on the tuple itself. See the comment in reorderbuffer.c above
      53             :  * ResolveCminCmaxDuringDecoding() for details.
      54             :  *
      55             :  * To facilitate all this we need our own visibility routine, as the normal
      56             :  * ones are optimized for different use cases.
      57             :  *
      58             :  * To replace the normal catalog snapshots with decoding ones use the
      59             :  * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
      60             :  *
      61             :  *
      62             :  *
      63             :  * The snapbuild machinery starts up in several stages, as illustrated
      64             :  * by the following graph describing the SnapBuild->state transitions:
      65             :  *
      66             :  *         +-------------------------+
      67             :  *    +----|         START           |-------------+
      68             :  *    |    +-------------------------+             |
      69             :  *    |                 |                          |
      70             :  *    |                 |                          |
      71             :  *    |        running_xacts #1                    |
      72             :  *    |                 |                          |
      73             :  *    |                 |                          |
      74             :  *    |                 v                          |
      75             :  *    |    +-------------------------+             v
      76             :  *    |    |   BUILDING_SNAPSHOT     |------------>|
      77             :  *    |    +-------------------------+             |
      78             :  *    |                 |                          |
      79             :  *    |                 |                          |
      80             :  *    | running_xacts #2, xacts from #1 finished   |
      81             :  *    |                 |                          |
      82             :  *    |                 |                          |
      83             :  *    |                 v                          |
      84             :  *    |    +-------------------------+             v
      85             :  *    |    |       FULL_SNAPSHOT     |------------>|
      86             :  *    |    +-------------------------+             |
      87             :  *    |                 |                          |
      88             :  * running_xacts        |                      saved snapshot
      89             :  * with zero xacts      |                 at running_xacts's lsn
      90             :  *    |                 |                          |
      91             :  *    | running_xacts with xacts from #2 finished  |
      92             :  *    |                 |                          |
      93             :  *    |                 v                          |
      94             :  *    |    +-------------------------+             |
      95             :  *    +--->|SNAPBUILD_CONSISTENT     |<------------+
      96             :  *         +-------------------------+
      97             :  *
      98             :  * Initially the machinery is in the START stage. When an xl_running_xacts
      99             :  * record is read that is sufficiently new (above the safe xmin horizon),
     100             :  * there's a state transition. If there were no running xacts when the
     101             :  * xl_running_xacts record was generated, we'll directly go into CONSISTENT
     102             :  * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full
     103             :  * snapshot means that all transactions that start henceforth can be decoded
     104             :  * in their entirety, but transactions that started previously can't. In
     105             :  * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
     106             :  * running transactions have committed or aborted.
     107             :  *
     108             :  * Only transactions that commit after CONSISTENT state has been reached will
     109             :  * be replayed, even though they might have started while still in
     110             :  * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
     111             :  * changes have been exported, but all the following ones will be. That point
     112             :  * is a convenient point to initialize replication from, which is why we
     113             :  * export a snapshot at that point, which *can* be used to read normal data.
     114             :  *
     115             :  * Copyright (c) 2012-2025, PostgreSQL Global Development Group
     116             :  *
     117             :  * IDENTIFICATION
     118             :  *    src/backend/replication/logical/snapbuild.c
     119             :  *
     120             :  *-------------------------------------------------------------------------
     121             :  */
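The start-up stages described in the header comment can be sketched as a small transition function. This is a simplified illustration rather than the code in this file: the Sketch* names and the two input parameters are hypothetical, and the saved-snapshot shortcut and the xmin horizon check from the comment are deliberately left out.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirror of the four stages named in the diagram. */
typedef enum
{
    SKETCH_START,
    SKETCH_BUILDING_SNAPSHOT,
    SKETCH_FULL_SNAPSHOT,
    SKETCH_CONSISTENT
} SketchState;

/*
 * Compute the next stage after reading one running_xacts record.
 *
 * running_xcnt:     transactions running when the record was logged
 * earlier_finished: true once every transaction that was running at the
 *                   previous stage change has committed or aborted
 */
static SketchState
sketch_next_state(SketchState cur, int running_xcnt, bool earlier_finished)
{
    switch (cur)
    {
        case SKETCH_START:
            /* No running xacts: jump straight to the consistent stage. */
            if (running_xcnt == 0)
                return SKETCH_CONSISTENT;
            return SKETCH_BUILDING_SNAPSHOT;

        case SKETCH_BUILDING_SNAPSHOT:
            /* Wait for the xacts seen at record #1 to finish. */
            return earlier_finished ? SKETCH_FULL_SNAPSHOT : cur;

        case SKETCH_FULL_SNAPSHOT:
            /* Wait for the xacts seen at record #2 to finish. */
            return earlier_finished ? SKETCH_CONSISTENT : cur;

        case SKETCH_CONSISTENT:
            break;
    }
    return cur;                 /* consistent is a terminal stage */
}
```

Each stage other than START advances only when the transactions observed at the previous stage change have ended, which is why at least three running_xacts records are needed when transactions are in flight.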
     122             : 
     123             : #include "postgres.h"
     124             : 
     125             : #include <sys/stat.h>
     126             : #include <unistd.h>
     127             : 
     128             : #include "access/heapam_xlog.h"
     129             : #include "access/transam.h"
     130             : #include "access/xact.h"
     131             : #include "common/file_utils.h"
     132             : #include "miscadmin.h"
     133             : #include "pgstat.h"
     134             : #include "replication/logical.h"
     135             : #include "replication/reorderbuffer.h"
     136             : #include "replication/snapbuild.h"
     137             : #include "replication/snapbuild_internal.h"
     138             : #include "storage/fd.h"
     139             : #include "storage/lmgr.h"
     140             : #include "storage/proc.h"
     141             : #include "storage/procarray.h"
     142             : #include "storage/standby.h"
     143             : #include "utils/builtins.h"
     144             : #include "utils/memutils.h"
     145             : #include "utils/snapmgr.h"
     146             : #include "utils/snapshot.h"
     147             : /*
     148             :  * Starting a transaction -- which we need to do while exporting a snapshot --
     149             :  * removes knowledge about the previously used resowner, so we save it here.
     150             :  */
     151             : static ResourceOwner SavedResourceOwnerDuringExport = NULL;
     152             : static bool ExportInProgress = false;
     153             : 
     154             : /* ->committed and ->catchange manipulation */
     155             : static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
     156             : 
     157             : /* snapshot building/manipulation/distribution functions */
     158             : static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
     159             : 
     160             : static void SnapBuildFreeSnapshot(Snapshot snap);
     161             : 
     162             : static void SnapBuildSnapIncRefcount(Snapshot snap);
     163             : 
     164             : static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
     165             : 
     166             : static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
     167             :                                                  uint32 xinfo);
     168             : 
     169             : /* xlog reading helper functions for SnapBuildProcessRunningXacts */
     170             : static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
     171             : static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
     172             : 
     173             : /* serialization functions */
     174             : static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
     175             : static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
     176             : static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path);
     177             : 
     178             : /*
     179             :  * Allocate a new snapshot builder.
     180             :  *
     181             :  * xmin_horizon is the xid >= which we can be sure no catalog rows have been
     182             :  * removed; start_lsn is the LSN >= which we want to replay commits.
     183             :  */
     184             : SnapBuild *
     185        2210 : AllocateSnapshotBuilder(ReorderBuffer *reorder,
     186             :                         TransactionId xmin_horizon,
     187             :                         XLogRecPtr start_lsn,
     188             :                         bool need_full_snapshot,
     189             :                         bool in_slot_creation,
     190             :                         XLogRecPtr two_phase_at)
     191             : {
     192             :     MemoryContext context;
     193             :     MemoryContext oldcontext;
     194             :     SnapBuild  *builder;
     195             : 
     196             :     /* allocate memory in own context, to have better accountability */
     197        2210 :     context = AllocSetContextCreate(CurrentMemoryContext,
     198             :                                     "snapshot builder context",
     199             :                                     ALLOCSET_DEFAULT_SIZES);
     200        2210 :     oldcontext = MemoryContextSwitchTo(context);
     201             : 
     202        2210 :     builder = palloc0_object(SnapBuild);
     203             : 
     204        2210 :     builder->state = SNAPBUILD_START;
     205        2210 :     builder->context = context;
     206        2210 :     builder->reorder = reorder;
     207             :     /* Other struct members initialized by zeroing via palloc0 above */
     208             : 
     209        2210 :     builder->committed.xcnt = 0;
     210        2210 :     builder->committed.xcnt_space = 128; /* arbitrary number */
     211        2210 :     builder->committed.xip =
     212        2210 :         palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
     213        2210 :     builder->committed.includes_all_transactions = true;
     214             : 
     215        2210 :     builder->catchange.xcnt = 0;
     216        2210 :     builder->catchange.xip = NULL;
     217             : 
     218        2210 :     builder->initial_xmin_horizon = xmin_horizon;
     219        2210 :     builder->start_decoding_at = start_lsn;
     220        2210 :     builder->in_slot_creation = in_slot_creation;
     221        2210 :     builder->building_full_snapshot = need_full_snapshot;
     222        2210 :     builder->two_phase_at = two_phase_at;
     223             : 
     224        2210 :     MemoryContextSwitchTo(oldcontext);
     225             : 
     226        2210 :     return builder;
     227             : }
     228             : 
     229             : /*
     230             :  * Free a snapshot builder.
     231             :  */
     232             : void
     233        1754 : FreeSnapshotBuilder(SnapBuild *builder)
     234             : {
     235        1754 :     MemoryContext context = builder->context;
     236             : 
     237             :     /* free snapshot explicitly, since that path contains some error checking */
     238        1754 :     if (builder->snapshot != NULL)
     239             :     {
     240         426 :         SnapBuildSnapDecRefcount(builder->snapshot);
     241         426 :         builder->snapshot = NULL;
     242             :     }
     243             : 
     244             :     /* other resources are deallocated via memory context deletion */
     245        1754 :     MemoryContextDelete(context);
     246        1754 : }
     247             : 
     248             : /*
     249             :  * Free an unreferenced snapshot that has previously been built by us.
     250             :  */
     251             : static void
     252        2738 : SnapBuildFreeSnapshot(Snapshot snap)
     253             : {
     254             :     /* make sure we don't get passed an external snapshot */
     255             :     Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
     256             : 
     257             :     /* make sure nobody modified our snapshot */
     258             :     Assert(snap->curcid == FirstCommandId);
     259             :     Assert(!snap->suboverflowed);
     260             :     Assert(!snap->takenDuringRecovery);
     261             :     Assert(snap->regd_count == 0);
     262             : 
     263             :     /* slightly more likely, so it's checked even without casserts */
     264        2738 :     if (snap->copied)
     265           0 :         elog(ERROR, "cannot free a copied snapshot");
     266             : 
     267        2738 :     if (snap->active_count)
     268           0 :         elog(ERROR, "cannot free an active snapshot");
     269             : 
     270        2738 :     pfree(snap);
     271        2738 : }
     272             : 
     273             : /*
     274             :  * In which state of snapshot building are we?
     275             :  */
     276             : SnapBuildState
     277     4241770 : SnapBuildCurrentState(SnapBuild *builder)
     278             : {
     279     4241770 :     return builder->state;
     280             : }
     281             : 
     282             : /*
     283             :  * Return the LSN at which the two-phase decoding was first enabled.
     284             :  */
     285             : XLogRecPtr
     286          62 : SnapBuildGetTwoPhaseAt(SnapBuild *builder)
     287             : {
     288          62 :     return builder->two_phase_at;
     289             : }
     290             : 
     291             : /*
     292             :  * Set the LSN at which two-phase decoding is enabled.
     293             :  */
     294             : void
     295          16 : SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
     296             : {
     297          16 :     builder->two_phase_at = ptr;
     298          16 : }
     299             : 
     300             : /*
     301             :  * Should the transaction ending at 'ptr' be skipped rather than decoded?
     302             :  */
     303             : bool
     304      728384 : SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
     305             : {
     306      728384 :     return ptr < builder->start_decoding_at;
     307             : }
     308             : 
     309             : /*
     310             :  * Increase refcount of a snapshot.
     311             :  *
     312             :  * This is used when handing out a snapshot to some external resource or when
     313             :  * adding a Snapshot as builder->snapshot.
     314             :  */
     315             : static void
     316       12118 : SnapBuildSnapIncRefcount(Snapshot snap)
     317             : {
     318       12118 :     snap->active_count++;
     319       12118 : }
     320             : 
     321             : /*
     322             :  * Decrease refcount of a snapshot and free if the refcount reaches zero.
     323             :  *
     324             :  * Externally visible, so that external resources that have been handed an
     325             :  * IncRef'ed Snapshot can adjust its refcount easily.
     326             :  */
     327             : void
     328       11560 : SnapBuildSnapDecRefcount(Snapshot snap)
     329             : {
     330             :     /* make sure we don't get passed an external snapshot */
     331             :     Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
     332             : 
     333             :     /* make sure nobody modified our snapshot */
     334             :     Assert(snap->curcid == FirstCommandId);
     335             :     Assert(!snap->suboverflowed);
     336             :     Assert(!snap->takenDuringRecovery);
     337             : 
     338             :     Assert(snap->regd_count == 0);
     339             : 
     340             :     Assert(snap->active_count > 0);
     341             : 
     342             :     /* slightly more likely, so it's checked even without casserts */
     343       11560 :     if (snap->copied)
     344           0 :         elog(ERROR, "cannot free a copied snapshot");
     345             : 
     346       11560 :     snap->active_count--;
     347       11560 :     if (snap->active_count == 0)
     348        2738 :         SnapBuildFreeSnapshot(snap);
     349       11560 : }
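The increment and decrement pair above follows a plain reference-counting pattern. A minimal standalone sketch, assuming a hypothetical SketchSnap struct and using malloc/free in place of PostgreSQL's memory contexts, might look like this:

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for a snapshot that only carries a refcount. */
typedef struct SketchSnap
{
    unsigned    active_count;
} SketchSnap;

/* Counts how many snapshots were freed, so callers can observe it. */
static int  sketch_freed = 0;

static SketchSnap *
sketch_snap_new(void)
{
    /* calloc zeroes the struct, so the refcount starts at 0 */
    return calloc(1, sizeof(SketchSnap));
}

static void
sketch_incref(SketchSnap *snap)
{
    snap->active_count++;
}

static void
sketch_decref(SketchSnap *snap)
{
    /* dropping a reference that was never taken is a caller bug */
    assert(snap->active_count > 0);

    snap->active_count--;
    if (snap->active_count == 0)
    {
        free(snap);
        sketch_freed++;
    }
}
```

As in the listing, a freshly built snapshot has a count of zero, so it only becomes owned once some holder increments it.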
     350             : 
     351             : /*
     352             :  * Build a new snapshot, based on currently committed catalog-modifying
     353             :  * transactions.
     354             :  *
     355             :  * In-progress transactions with catalog access are *not* allowed to modify
     356             :  * these snapshots; they have to copy them and fill in appropriate ->curcid
     357             :  * and ->subxip/subxcnt values.
     358             :  */
     359             : static Snapshot
     360        3608 : SnapBuildBuildSnapshot(SnapBuild *builder)
     361             : {
     362             :     Snapshot    snapshot;
     363             :     Size        ssize;
     364             : 
     365             :     Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
     366             : 
     367        3608 :     ssize = sizeof(SnapshotData)
     368        3608 :         + sizeof(TransactionId) * builder->committed.xcnt
     369        3608 :         + sizeof(TransactionId) * 1 /* toplevel xid */ ;
     370             : 
     371        3608 :     snapshot = MemoryContextAllocZero(builder->context, ssize);
     372             : 
     373        3608 :     snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
     374             : 
     375             :     /*
     376             :      * We misuse the original meaning of SnapshotData's xip and subxip fields
     377             :      * to make them more fitting for our needs.
     378             :      *
     379             :      * In the 'xip' array we store transactions that have to be treated as
     380             :      * committed. Since we will only ever look at tuples from transactions
     381             :      * that have modified the catalog it's more efficient to store those few
     382             :      * that exist between xmin and xmax (frequently there are none).
     383             :      *
     384             :      * Snapshots that are used in transactions that have modified the catalog
     385             :      * also use the 'subxip' array to store their toplevel xid and all the
     386             :      * subtransaction xids so we can recognize when we need to treat rows as
     387             :      * visible that are not in xip but still need to be visible. Subxip only
     388             :      * gets filled when the snapshot is copied into the context of a
     389             :      * catalog modifying transaction since we otherwise share a snapshot
     390             :      * between transactions. As long as a txn hasn't modified the catalog it
     391             :      * doesn't need to treat any uncommitted rows as visible, so there is no
     392             :      * need for those xids.
     393             :      *
     394             :      * Both arrays are qsort'ed so that we can use bsearch() on them.
     395             :      */
     396             :     Assert(TransactionIdIsNormal(builder->xmin));
     397             :     Assert(TransactionIdIsNormal(builder->xmax));
     398             : 
     399        3608 :     snapshot->xmin = builder->xmin;
     400        3608 :     snapshot->xmax = builder->xmax;
     401             : 
     402             :     /* store all transactions to be treated as committed by this snapshot */
     403        3608 :     snapshot->xip =
     404        3608 :         (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
     405        3608 :     snapshot->xcnt = builder->committed.xcnt;
     406        3608 :     memcpy(snapshot->xip,
     407        3608 :            builder->committed.xip,
     408        3608 :            builder->committed.xcnt * sizeof(TransactionId));
     409             : 
     410             :     /* sort so we can bsearch() */
     411        3608 :     qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
     412             : 
     413             :     /*
     414             :      * Initially, subxip is empty, i.e. it's a snapshot to be used by
     415             :      * transactions that don't modify the catalog. Will be filled by
     416             :      * ReorderBufferCopySnap() if necessary.
     417             :      */
     418        3608 :     snapshot->subxcnt = 0;
     419        3608 :     snapshot->subxip = NULL;
     420             : 
     421        3608 :     snapshot->suboverflowed = false;
     422        3608 :     snapshot->takenDuringRecovery = false;
     423        3608 :     snapshot->copied = false;
     424        3608 :     snapshot->curcid = FirstCommandId;
     425        3608 :     snapshot->active_count = 0;
     426        3608 :     snapshot->regd_count = 0;
     427        3608 :     snapshot->snapXactCompletionCount = 0;
     428             : 
     429        3608 :     return snapshot;
     430             : }
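The layout used above (one allocation holding the struct followed by a sorted xid array that is probed with bsearch()) can be illustrated in isolation. The sketch below is an assumption-laden simplification: SketchSnapshot, SketchXid and the helper names are hypothetical, a C99 flexible array member replaces the pointer arithmetic, and xids are compared as plain integers, ignoring wraparound.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uint32_t SketchXid;     /* hypothetical stand-in for TransactionId */

typedef struct SketchSnapshot
{
    SketchXid   xmin;
    SketchXid   xmax;
    size_t      xcnt;
    SketchXid   xip[];          /* xids to be treated as committed */
} SketchSnapshot;

/* Plain integer ordering; wraparound is ignored in this sketch. */
static int
sketch_xid_cmp(const void *a, const void *b)
{
    SketchXid   xa = *(const SketchXid *) a;
    SketchXid   xb = *(const SketchXid *) b;

    return (xa > xb) - (xa < xb);
}

/* Copy the committed xids behind the struct and sort them. */
static SketchSnapshot *
sketch_build(SketchXid xmin, SketchXid xmax,
             const SketchXid *committed, size_t n)
{
    SketchSnapshot *snap;

    snap = calloc(1, sizeof(SketchSnapshot) + n * sizeof(SketchXid));
    if (snap == NULL)
        return NULL;

    snap->xmin = xmin;
    snap->xmax = xmax;
    snap->xcnt = n;
    if (n > 0)
        memcpy(snap->xip, committed, n * sizeof(SketchXid));
    qsort(snap->xip, snap->xcnt, sizeof(SketchXid), sketch_xid_cmp);
    return snap;
}

/* Is xid listed in the sorted committed array? */
static bool
sketch_listed_as_committed(const SketchSnapshot *snap, SketchXid xid)
{
    return bsearch(&xid, snap->xip, snap->xcnt,
                   sizeof(SketchXid), sketch_xid_cmp) != NULL;
}
```

Sorting once at build time keeps each later lookup logarithmic in the number of listed xids, which the comment above notes is often zero.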
     431             : 
     432             : /*
     433             :  * Build the initial slot snapshot and convert it to a normal snapshot that
     434             :  * is understood by HeapTupleSatisfiesMVCC.
     435             :  *
     436             :  * The snapshot will be usable directly in the current transaction or exported
     437             :  * for loading in a different transaction.
     438             :  */
     439             : Snapshot
     440         396 : SnapBuildInitialSnapshot(SnapBuild *builder)
     441             : {
     442             :     Snapshot    snap;
     443             :     TransactionId xid;
     444             :     TransactionId safeXid;
     445             :     TransactionId *newxip;
     446         396 :     int         newxcnt = 0;
     447             : 
     448             :     Assert(XactIsoLevel == XACT_REPEATABLE_READ);
     449             :     Assert(builder->building_full_snapshot);
     450             : 
     451             :     /* don't allow older snapshots */
     452         396 :     InvalidateCatalogSnapshot();    /* about to overwrite MyProc->xmin */
     453         396 :     if (HaveRegisteredOrActiveSnapshot())
     454           0 :         elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
     455             :     Assert(!HistoricSnapshotActive());
     456             : 
     457         396 :     if (builder->state != SNAPBUILD_CONSISTENT)
     458           0 :         elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
     459             : 
     460         396 :     if (!builder->committed.includes_all_transactions)
     461           0 :         elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
     462             : 
     463             :     /* so we don't overwrite the existing value */
     464         396 :     if (TransactionIdIsValid(MyProc->xmin))
     465           0 :         elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
     466             : 
     467         396 :     snap = SnapBuildBuildSnapshot(builder);
     468             : 
     469             :     /*
     470             :      * We know that snap->xmin is alive, enforced by the logical xmin
     471             :      * mechanism. Due to that we can do this without locks, we're only
     472             :      * changing our own value.
     473             :      *
     474             :      * Building an initial snapshot is expensive and an unenforced xmin
     475             :      * horizon would have bad consequences, therefore always double-check that
     476             :      * the horizon is enforced.
     477             :      */
     478         396 :     LWLockAcquire(ProcArrayLock, LW_SHARED);
     479         396 :     safeXid = GetOldestSafeDecodingTransactionId(false);
     480         396 :     LWLockRelease(ProcArrayLock);
     481             : 
     482         396 :     if (TransactionIdFollows(safeXid, snap->xmin))
     483           0 :         elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
     484             :              safeXid, snap->xmin);
     485             : 
     486         396 :     MyProc->xmin = snap->xmin;
     487             : 
     488             :     /* allocate in transaction context */
     489         396 :     newxip = palloc_array(TransactionId, GetMaxSnapshotXidCount());
     490             : 
     491             :     /*
     492             :      * snapbuild.c builds snapshots in an "inverted" manner, which means it
     493             :      * stores committed transactions in ->xip, not ones in progress. Build a
     494             :      * classical snapshot by marking all non-committed transactions as
     495             :      * in-progress. This can be expensive.
     496             :      */
     497         396 :     for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
     498             :     {
     499             :         void       *test;
     500             : 
     501             :         /*
     502             :          * Check whether transaction committed using the decoding snapshot
     503             :          * meaning of ->xip.
     504             :          */
     505           0 :         test = bsearch(&xid, snap->xip, snap->xcnt,
     506             :                        sizeof(TransactionId), xidComparator);
     507             : 
     508           0 :         if (test == NULL)
     509             :         {
     510           0 :             if (newxcnt >= GetMaxSnapshotXidCount())
     511           0 :                 ereport(ERROR,
     512             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     513             :                          errmsg("initial slot snapshot too large")));
     514             : 
     515           0 :             newxip[newxcnt++] = xid;
     516             :         }
     517             : 
     518           0 :         TransactionIdAdvance(xid);
     519             :     }
     520             : 
     521             :     /* adjust remaining snapshot fields as needed */
     522         396 :     snap->snapshot_type = SNAPSHOT_MVCC;
     523         396 :     snap->xcnt = newxcnt;
     524         396 :     snap->xip = newxip;
     525             : 
     526         396 :     return snap;
     527             : }
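
The inversion step performed by the loop above can be sketched in isolation. This is a minimal illustration rather than the PostgreSQL implementation: `xid_t`, `xid_cmp` and `invert_committed` are hypothetical names, XIDs are treated as plain integers with no wraparound handling, and an error code stands in for `ereport(ERROR, ...)`.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for TransactionId; wraparound is ignored here. */
typedef uint32_t xid_t;

/* Plain numeric ordering, suitable for bsearch() over a sorted array. */
static int
xid_cmp(const void *a, const void *b)
{
    xid_t x = *(const xid_t *) a;
    xid_t y = *(const xid_t *) b;

    return (x > y) - (x < y);
}

/*
 * Write every xid in [xmin, xmax) that is NOT present in the sorted
 * committed[] array into out[].  Returns the number of entries written,
 * or -1 if out[] cannot hold them all.
 */
static int
invert_committed(const xid_t *committed, size_t ncommitted,
                 xid_t xmin, xid_t xmax,
                 xid_t *out, size_t out_cap)
{
    size_t n = 0;

    assert(xmin <= xmax);

    for (xid_t xid = xmin; xid < xmax; xid++)
    {
        /* Not found among the committed xids: treat as in progress. */
        if (bsearch(&xid, committed, ncommitted,
                    sizeof(xid_t), xid_cmp) == NULL)
        {
            if (n >= out_cap)
                return -1;
            out[n++] = xid;
        }
    }

    return (int) n;
}
```

The cost is one binary search per XID in the range, which is why the source comment warns that this can be expensive when xmin and xmax are far apart.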
     528             : 
     529             : /*
     530             :  * Export a snapshot so it can be set in another session with SET TRANSACTION
     531             :  * SNAPSHOT.
     532             :  *
     533             :  * For that we need to start a transaction in the current backend as the
     534             :  * importing side checks whether the source transaction is still open to make
     535             :  * sure the xmin horizon hasn't advanced since then.
     536             :  */
     537             : const char *
     538           2 : SnapBuildExportSnapshot(SnapBuild *builder)
     539             : {
     540             :     Snapshot    snap;
     541             :     char       *snapname;
     542             : 
     543           2 :     if (IsTransactionOrTransactionBlock())
     544           0 :         elog(ERROR, "cannot export a snapshot from within a transaction");
     545             : 
     546           2 :     if (SavedResourceOwnerDuringExport)
     547           0 :         elog(ERROR, "can only export one snapshot at a time");
     548             : 
     549           2 :     SavedResourceOwnerDuringExport = CurrentResourceOwner;
     550           2 :     ExportInProgress = true;
     551             : 
     552           2 :     StartTransactionCommand();
     553             : 
     554             :     /* There doesn't seem to be a nice API to set these */
     555           2 :     XactIsoLevel = XACT_REPEATABLE_READ;
     556           2 :     XactReadOnly = true;
     557             : 
     558           2 :     snap = SnapBuildInitialSnapshot(builder);
     559             : 
     560             :     /*
     561             :      * now that we've built a plain snapshot, make it active and use the
     562             :      * normal mechanisms for exporting it
     563             :      */
     564           2 :     snapname = ExportSnapshot(snap);
     565             : 
     566           2 :     ereport(LOG,
     567             :             (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
     568             :                            "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
     569             :                            snap->xcnt,
     570             :                            snapname, snap->xcnt)));
     571           2 :     return snapname;
     572             : }
     573             : 
     574             : /*
     575             :  * Ensure there is a snapshot and if not build one for current transaction.
     576             :  */
     577             : Snapshot
     578          16 : SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
     579             : {
     580             :     Assert(builder->state == SNAPBUILD_CONSISTENT);
     581             : 
     582             :     /* only build a new snapshot if we don't have a prebuilt one */
     583          16 :     if (builder->snapshot == NULL)
     584             :     {
     585           2 :         builder->snapshot = SnapBuildBuildSnapshot(builder);
     586             :         /* increase refcount for the snapshot builder */
     587           2 :         SnapBuildSnapIncRefcount(builder->snapshot);
     588             :     }
     589             : 
     590          16 :     return builder->snapshot;
     591             : }
     592             : 
     593             : /*
     594             :  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
     595             :  * any. Aborts the previously started transaction and resets the resource
     596             :  * owner back to its original value.
     597             :  */
     598             : void
     599       10670 : SnapBuildClearExportedSnapshot(void)
     600             : {
     601             :     ResourceOwner tmpResOwner;
     602             : 
     603             :     /* nothing exported, that is the usual case */
     604       10670 :     if (!ExportInProgress)
     605       10668 :         return;
     606             : 
     607           2 :     if (!IsTransactionState())
     608           0 :         elog(ERROR, "clearing exported snapshot in wrong transaction state");
     609             : 
     610             :     /*
     611             :      * AbortCurrentTransaction() takes care of resetting the snapshot state,
     612             :      * so remember SavedResourceOwnerDuringExport.
     613             :      */
     614           2 :     tmpResOwner = SavedResourceOwnerDuringExport;
     615             : 
     616             :     /* make sure nothing could have ever happened */
     617           2 :     AbortCurrentTransaction();
     618             : 
     619           2 :     CurrentResourceOwner = tmpResOwner;
     620             : }
     621             : 
     622             : /*
     623             :  * Clear snapshot export state during transaction abort.
     624             :  */
     625             : void
     626       51586 : SnapBuildResetExportedSnapshotState(void)
     627             : {
     628       51586 :     SavedResourceOwnerDuringExport = NULL;
     629       51586 :     ExportInProgress = false;
     630       51586 : }
     631             : 
     632             : /*
     633             :  * Handle the effects of a single heap change, appropriate to the current state
     634             :  * of the snapshot builder, and return whether changes made at (xid, lsn) can
     635             :  * be decoded.
     636             :  */
     637             : bool
     638     3113954 : SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
     639             : {
     640             :     /*
     641             :      * We can't handle data in transactions if we haven't built a snapshot
     642             :      * yet, so don't store them.
     643             :      */
     644     3113954 :     if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
     645           0 :         return false;
     646             : 
     647             :     /*
     648             :      * No point in keeping track of changes in transactions that we don't have
     649             :      * enough information about to decode. This means that they started before
     650             :      * we got into the SNAPBUILD_FULL_SNAPSHOT state.
     651             :      */
     652     3113960 :     if (builder->state < SNAPBUILD_CONSISTENT &&
     653           6 :         TransactionIdPrecedes(xid, builder->next_phase_at))
     654           0 :         return false;
     655             : 
     656             :     /*
     657             :      * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
     658             :      * be needed to decode the change we're currently processing.
     659             :      */
     660     3113954 :     if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
     661             :     {
     662             :         /* only build a new snapshot if we don't have a prebuilt one */
     663        6484 :         if (builder->snapshot == NULL)
     664             :         {
     665         800 :             builder->snapshot = SnapBuildBuildSnapshot(builder);
     666             :             /* increase refcount for the snapshot builder */
     667         800 :             SnapBuildSnapIncRefcount(builder->snapshot);
     668             :         }
     669             : 
     670             :         /*
     671             :          * Increase refcount for the transaction we're handing the snapshot
     672             :          * out to.
     673             :          */
     674        6484 :         SnapBuildSnapIncRefcount(builder->snapshot);
     675        6484 :         ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
     676             :                                      builder->snapshot);
     677             :     }
     678             : 
     679     3113954 :     return true;
     680             : }
     681             : 
     682             : /*
     683             :  * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
     684             :  * This implies that a transaction has done some form of write to system
     685             :  * catalogs.
     686             :  */
     687             : void
     688       48756 : SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
     689             :                        XLogRecPtr lsn, xl_heap_new_cid *xlrec)
     690             : {
     691             :     CommandId   cid;
     692             : 
     693             :     /*
     694             :      * we only log new_cid's if a catalog tuple was modified, so mark the
     695             :      * transaction as containing catalog modifications
     696             :      */
     697       48756 :     ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
     698             : 
     699       48756 :     ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
     700             :                                  xlrec->target_locator, xlrec->target_tid,
     701             :                                  xlrec->cmin, xlrec->cmax,
     702             :                                  xlrec->combocid);
     703             : 
     704             :     /* figure out new command id */
     705       48756 :     if (xlrec->cmin != InvalidCommandId &&
     706       40996 :         xlrec->cmax != InvalidCommandId)
     707        6428 :         cid = Max(xlrec->cmin, xlrec->cmax);
     708       42328 :     else if (xlrec->cmax != InvalidCommandId)
     709        7760 :         cid = xlrec->cmax;
     710       34568 :     else if (xlrec->cmin != InvalidCommandId)
     711       34568 :         cid = xlrec->cmin;
     712             :     else
     713             :     {
     714           0 :         cid = InvalidCommandId; /* silence compiler */
     715           0 :         elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
     716             :     }
     717             : 
     718       48756 :     ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
     719       48756 : }
     720             : 
     721             : /*
     722             :  * Add a new Snapshot and invalidation messages to all transactions we're
     723             :  * decoding that currently are in-progress so they can see new catalog contents
     724             :  * made by the transaction that just committed. This is necessary because those
     725             :  * in-progress transactions will use the new catalog's contents from here on
     726             :  * (at the very least everything they do needs to be compatible with newer
     727             :  * catalog contents).
     728             :  */
     729             : static void
     730        2394 : SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
     731             : {
     732             :     dlist_iter  txn_i;
     733             :     ReorderBufferTXN *txn;
     734             : 
     735             :     /*
     736             :      * Iterate through all toplevel transactions. This can include
     737             :      * subtransactions which we just don't yet know to be that, but that's
     738             :      * fine, they will just get an unnecessary snapshot and invalidations
     739             :      * queued.
     740             :      */
     741        4860 :     dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
     742             :     {
     743        2466 :         txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
     744             : 
     745             :         Assert(TransactionIdIsValid(txn->xid));
     746             : 
     747             :         /*
     748             :          * If we don't have a base snapshot yet, there are no changes in this
     749             :          * transaction which in turn implies we don't yet need a snapshot at
     750             :          * all. We'll add a snapshot when the first change gets queued.
     751             :          *
     752             :          * Similarly, we don't need to add invalidations to a transaction
     753             :          * whose base snapshot is not yet set. Once a base snapshot is built,
     754             :          * it will include the xids of committed transactions that have
     755             :          * modified the catalog, thus reflecting the new catalog contents. The
     756             :          * existing catalog cache will have already been invalidated after
     757             :          * processing the invalidations in the transaction that modified
     758             :          * catalogs, ensuring that a fresh cache is constructed during
     759             :          * decoding.
     760             :          *
     761             :          * NB: This works correctly even for subtransactions because
     762             :          * ReorderBufferAssignChild() takes care to transfer the base snapshot
     763             :          * to the top-level transaction, and while iterating the changequeue
     764             :          * we'll get the change from the subtxn.
     765             :          */
     766        2466 :         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
     767           4 :             continue;
     768             : 
     769             :         /*
     770             :          * We don't need to add snapshot or invalidations to prepared
     771             :          * transactions as they should not see the new catalog contents.
     772             :          */
     773        2462 :         if (rbtxn_is_prepared(txn))
     774          56 :             continue;
     775             : 
     776        2406 :         elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
     777             :              txn->xid, LSN_FORMAT_ARGS(lsn));
     778             : 
     779             :         /*
     780             :          * increase the snapshot's refcount for the transaction we are handing
     781             :          * it out to
     782             :          */
     783        2406 :         SnapBuildSnapIncRefcount(builder->snapshot);
     784        2406 :         ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
     785             :                                  builder->snapshot);
     786             : 
     787             :         /*
     788             :          * Add invalidation messages to the reorder buffer of in-progress
     789             :          * transactions except the current committed transaction, for which we
     790             :          * will execute invalidations at the end.
     791             :          *
     792             :          * This is required; otherwise we would keep using the stale catcache
     793             :          * contents built by the current transaction even after its decoding,
     794             :          * although they should have been invalidated by the concurrent
     795             :          * catalog-changing transaction.
     796             :          *
     797             :          * Distribute only the invalidation messages generated by the current
     798             :          * committed transaction. Invalidation messages received from other
     799             :          * transactions would have already been propagated to the relevant
     800             :          * in-progress transactions. This transaction would have processed
     801             :          * those invalidations, ensuring that subsequent transactions observe
     802             :          * a consistent cache state.
     803             :          */
     804        2406 :         if (txn->xid != xid)
     805             :         {
     806             :             uint32      ninvalidations;
     807          68 :             SharedInvalidationMessage *msgs = NULL;
     808             : 
     809          68 :             ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
     810             :                                                            xid, &msgs);
     811             : 
     812          68 :             if (ninvalidations > 0)
     813             :             {
     814             :                 Assert(msgs != NULL);
     815             : 
     816          60 :                 ReorderBufferAddDistributedInvalidations(builder->reorder,
     817             :                                                          txn->xid, lsn,
     818             :                                                          ninvalidations, msgs);
     819             :             }
     820             :         }
     821             :     }
     822        2394 : }
     823             : 
     824             : /*
     825             :  * Keep track of a new catalog changing transaction that has committed.
     826             :  */
     827             : static void
     828        2412 : SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
     829             : {
     830             :     Assert(TransactionIdIsValid(xid));
     831             : 
     832        2412 :     if (builder->committed.xcnt == builder->committed.xcnt_space)
     833             :     {
     834           0 :         builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
     835             : 
     836           0 :         elog(DEBUG1, "increasing space for committed transactions to %u",
     837             :              (uint32) builder->committed.xcnt_space);
     838             : 
     839           0 :         builder->committed.xip = repalloc(builder->committed.xip,
     840           0 :                                           builder->committed.xcnt_space * sizeof(TransactionId));
     841             :     }
     842             : 
     843             :     /*
     844             :      * TODO: It might make sense to keep the array sorted here instead of
     845             :      * doing it every time we build a new snapshot. On the other hand this
     846             :      * gets called repeatedly when a transaction with subtransactions commits.
     847             :      */
     848        2412 :     builder->committed.xip[builder->committed.xcnt++] = xid;
     849        2412 : }
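
The growth policy used for the committed array (double the capacity and add one, so that an empty array can grow too) can be sketched with the C standard allocator. The names `xid_array` and `xid_array_append` are hypothetical, and `realloc` is swapped in for `repalloc`, so allocation failure is returned to the caller instead of raising an error.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for TransactionId. */
typedef uint32_t xid_t;

/* Simplified version of the builder's committed-xid bookkeeping. */
typedef struct xid_array
{
    xid_t  *xip;            /* unsorted list of xids */
    size_t  xcnt;           /* entries in use */
    size_t  xcnt_space;     /* entries allocated */
} xid_array;

/* Append xid, growing the array to 2n+1 slots when it is full. */
static int
xid_array_append(xid_array *arr, xid_t xid)
{
    assert(arr != NULL);

    if (arr->xcnt == arr->xcnt_space)
    {
        size_t  new_space = arr->xcnt_space * 2 + 1;
        xid_t  *grown = realloc(arr->xip, new_space * sizeof(xid_t));

        if (grown == NULL)
            return -1;      /* the old allocation is still valid */

        arr->xip = grown;
        arr->xcnt_space = new_space;
    }

    arr->xip[arr->xcnt++] = xid;
    return 0;
}
```

Starting from an empty array, the capacity follows the sequence 1, 3, 7, 15, and so on, which keeps appends amortized constant time.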
     850             : 
     851             : /*
     852             :  * Remove knowledge about transactions we treat as committed or containing catalog
     853             :  * changes that are smaller than ->xmin. Those won't ever get checked via
     854             :  * the ->committed or ->catchange array, respectively. The committed xids will
     855             :  * get checked via the clog machinery.
     856             :  *
     857             :  * Ideally we would remove a transaction from the catchange array as soon as
     858             :  * it finishes (committed/aborted), but that could be costly as we need to
     859             :  * maintain the order of the xids in the array.
     860             :  */
     861             : static void
     862         828 : SnapBuildPurgeOlderTxn(SnapBuild *builder)
     863             : {
     864             :     int         off;
     865             :     TransactionId *workspace;
     866         828 :     int         surviving_xids = 0;
     867             : 
     868             :     /* not ready yet */
     869         828 :     if (!TransactionIdIsNormal(builder->xmin))
     870           0 :         return;
     871             : 
     872             :     /* TODO: Neater algorithm than just copying and iterating? */
     873             :     workspace =
     874         828 :         MemoryContextAlloc(builder->context,
     875         828 :                            builder->committed.xcnt * sizeof(TransactionId));
     876             : 
     877             :     /* copy xids that still are interesting to workspace */
     878        1272 :     for (off = 0; off < builder->committed.xcnt; off++)
     879             :     {
     880         444 :         if (NormalTransactionIdPrecedes(builder->committed.xip[off],
     881             :                                         builder->xmin))
     882             :             ;                   /* remove */
     883             :         else
     884           2 :             workspace[surviving_xids++] = builder->committed.xip[off];
     885             :     }
     886             : 
     887             :     /* copy workspace back to persistent state */
     888         828 :     memcpy(builder->committed.xip, workspace,
     889             :            surviving_xids * sizeof(TransactionId));
     890             : 
     891         828 :     elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
     892             :          (uint32) builder->committed.xcnt, (uint32) surviving_xids,
     893             :          builder->xmin, builder->xmax);
     894         828 :     builder->committed.xcnt = surviving_xids;
     895             : 
     896         828 :     pfree(workspace);
     897             : 
     898             :     /*
     899             :      * Purge xids in ->catchange as well. The purged array must also be sorted
     900             :      * in xidComparator order.
     901             :      */
     902         828 :     if (builder->catchange.xcnt > 0)
     903             :     {
     904             :         /*
     905             :          * Since catchange.xip is sorted, we find the lower bound of xids that
     906             :          * are still interesting.
     907             :          */
     908          18 :         for (off = 0; off < builder->catchange.xcnt; off++)
     909             :         {
     910          12 :             if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
     911             :                                              builder->xmin))
     912           2 :                 break;
     913             :         }
     914             : 
     915           8 :         surviving_xids = builder->catchange.xcnt - off;
     916             : 
     917           8 :         if (surviving_xids > 0)
     918             :         {
     919           2 :             memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
     920             :                     surviving_xids * sizeof(TransactionId));
     921             :         }
     922             :         else
     923             :         {
     924           6 :             pfree(builder->catchange.xip);
     925           6 :             builder->catchange.xip = NULL;
     926             :         }
     927             : 
     928           8 :         elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
     929             :              (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
     930             :              builder->xmin, builder->xmax);
     931           8 :         builder->catchange.xcnt = surviving_xids;
     932             :     }
     933             : }
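
The two purge strategies in SnapBuildPurgeOlderTxn() differ because ->committed is unsorted while ->catchange is sorted. The sketch below illustrates both under simplifying assumptions: `purge_unsorted` and `purge_sorted` are hypothetical names, XIDs are compared as plain integers with no wraparound handling, and the unsorted case compacts in place rather than copying through a workspace as the original does.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for TransactionId; wraparound is ignored here. */
typedef uint32_t xid_t;

/*
 * Unsorted array: keep every xid >= xmin, preserving relative order.
 * Returns the number of surviving entries.
 */
static size_t
purge_unsorted(xid_t *xip, size_t n, xid_t xmin)
{
    size_t keep = 0;

    assert(xip != NULL);

    for (size_t i = 0; i < n; i++)
    {
        if (xip[i] >= xmin)
            xip[keep++] = xip[i];
    }
    return keep;
}

/*
 * Sorted array: find the first xid >= xmin and shift the tail to the
 * front.  Returns the number of surviving entries.
 */
static size_t
purge_sorted(xid_t *xip, size_t n, xid_t xmin)
{
    size_t off = 0;

    assert(xip != NULL);

    while (off < n && xip[off] < xmin)
        off++;

    memmove(xip, xip + off, (n - off) * sizeof(xid_t));
    return n - off;
}
```

In the sorted case the surviving entries form a contiguous tail, so a single memmove() is enough and the result stays sorted.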
     934             : 
     935             : /*
     936             :  * Handle everything that needs to be done when a transaction commits
     937             :  */
     938             : void
     939        6216 : SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
     940             :                    int nsubxacts, TransactionId *subxacts, uint32 xinfo)
     941             : {
     942             :     int         nxact;
     943             : 
     944        6216 :     bool        needs_snapshot = false;
     945        6216 :     bool        needs_timetravel = false;
     946        6216 :     bool        sub_needs_timetravel = false;
     947             : 
     948        6216 :     TransactionId xmax = xid;
     949             : 
     950             :     /*
     951             :      * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
     952             :      * will they be part of a snapshot.  So we don't need to record anything.
     953             :      */
     954        6216 :     if (builder->state == SNAPBUILD_START ||
     955        6216 :         (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
     956           0 :          TransactionIdPrecedes(xid, builder->next_phase_at)))
     957             :     {
     958             :         /* ensure that only commits after this are getting replayed */
     959           0 :         if (builder->start_decoding_at <= lsn)
     960           0 :             builder->start_decoding_at = lsn + 1;
     961           0 :         return;
     962             :     }
     963             : 
     964        6216 :     if (builder->state < SNAPBUILD_CONSISTENT)
     965             :     {
     966             :         /* ensure that only commits after this are getting replayed */
     967          10 :         if (builder->start_decoding_at <= lsn)
     968           4 :             builder->start_decoding_at = lsn + 1;
     969             : 
     970             :         /*
     971             :          * If building an exportable snapshot, force xid to be tracked, even
     972             :          * if the transaction didn't modify the catalog.
     973             :          */
     974          10 :         if (builder->building_full_snapshot)
     975             :         {
     976           0 :             needs_timetravel = true;
     977             :         }
     978             :     }
     979             : 
     980        8714 :     for (nxact = 0; nxact < nsubxacts; nxact++)
     981             :     {
     982        2498 :         TransactionId subxid = subxacts[nxact];
     983             : 
     984             :         /*
     985             :          * Add subtransaction to base snapshot if catalog modifying; we don't
     986             :          * distinguish it from toplevel transactions there.
     987             :          */
     988        2498 :         if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
     989             :         {
     990          18 :             sub_needs_timetravel = true;
     991          18 :             needs_snapshot = true;
     992             : 
     993          18 :             elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
     994             :                  xid, subxid);
     995             : 
     996          18 :             SnapBuildAddCommittedTxn(builder, subxid);
     997             : 
     998          18 :             if (NormalTransactionIdFollows(subxid, xmax))
     999          18 :                 xmax = subxid;
    1000             :         }
    1001             : 
    1002             :         /*
    1003             :          * If we're forcing timetravel we also need visibility information
    1004             :          * about subtransaction, so keep track of subtransaction's state, even
    1005             :          * if not catalog modifying.  Don't need to distribute a snapshot in
    1006             :          * that case.
    1007             :          */
    1008        2480 :         else if (needs_timetravel)
    1009             :         {
    1010           0 :             SnapBuildAddCommittedTxn(builder, subxid);
    1011           0 :             if (NormalTransactionIdFollows(subxid, xmax))
    1012           0 :                 xmax = subxid;
    1013             :         }
    1014             :     }
    1015             : 
    1016             :     /* if top-level modified catalog, it'll need a snapshot */
    1017        6216 :     if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
    1018             :     {
    1019        2392 :         elog(DEBUG2, "found top level transaction %u, with catalog changes",
    1020             :              xid);
    1021        2392 :         needs_snapshot = true;
    1022        2392 :         needs_timetravel = true;
    1023        2392 :         SnapBuildAddCommittedTxn(builder, xid);
    1024             :     }
    1025        3824 :     else if (sub_needs_timetravel)
    1026             :     {
    1027             :         /* track toplevel txn as well, subxact alone isn't meaningful */
    1028           2 :         elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
    1029             :              xid);
    1030           2 :         needs_timetravel = true;
    1031           2 :         SnapBuildAddCommittedTxn(builder, xid);
    1032             :     }
    1033        3822 :     else if (needs_timetravel)
    1034             :     {
    1035           0 :         elog(DEBUG2, "forced transaction %u to do timetravel", xid);
    1036             : 
    1037           0 :         SnapBuildAddCommittedTxn(builder, xid);
    1038             :     }
    1039             : 
    1040        6216 :     if (!needs_timetravel)
    1041             :     {
    1042             :         /* record that we cannot export a general snapshot anymore */
    1043        3822 :         builder->committed.includes_all_transactions = false;
    1044             :     }
    1045             : 
    1046             :     Assert(!needs_snapshot || needs_timetravel);
    1047             : 
    1048             :     /*
    1049             :      * Adjust xmax of the snapshot builder, we only do that for committed,
    1050             :      * catalog modifying, transactions, everything else isn't interesting for
    1051             :      * us since we'll never look at the respective rows.
    1052             :      */
    1053        6216 :     if (needs_timetravel &&
    1054        4788 :         (!TransactionIdIsValid(builder->xmax) ||
    1055        2394 :          TransactionIdFollowsOrEquals(xmax, builder->xmax)))
    1056             :     {
    1057        2392 :         builder->xmax = xmax;
    1058        2392 :         TransactionIdAdvance(builder->xmax);
    1059             :     }
    1060             : 
    1061             :     /* if there's any reason to build a historic snapshot, do so now */
    1062        6216 :     if (needs_snapshot)
    1063             :     {
    1064             :         /*
    1065             :          * If we haven't built a complete snapshot yet there's no need to hand
    1066             :          * it out, it wouldn't (and couldn't) be used anyway.
    1067             :          */
    1068        2394 :         if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
    1069           0 :             return;
    1070             : 
    1071             :         /*
    1072             :          * Decrease the snapshot builder's refcount of the old snapshot, note
    1073             :          * that it still will be used if it has been handed out to the
    1074             :          * reorderbuffer earlier.
    1075             :          */
    1076        2394 :         if (builder->snapshot)
    1077        2394 :             SnapBuildSnapDecRefcount(builder->snapshot);
    1078             : 
    1079        2394 :         builder->snapshot = SnapBuildBuildSnapshot(builder);
    1080             : 
    1081             :         /* we might need to execute invalidations, add snapshot */
    1082        2394 :         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
    1083             :         {
    1084          16 :             SnapBuildSnapIncRefcount(builder->snapshot);
    1085          16 :             ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
    1086             :                                          builder->snapshot);
    1087             :         }
    1088             : 
    1089             :         /* refcount of the snapshot builder for the new snapshot */
    1090        2394 :         SnapBuildSnapIncRefcount(builder->snapshot);
    1091             : 
    1092             :         /*
    1093             :          * Add a new catalog snapshot and invalidation messages to all
    1094             :          * currently running transactions.
    1095             :          */
    1096        2394 :         SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
    1097             :     }
    1098             : }
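
The xmax bump at the top of this block depends on wraparound-aware XID comparison. Below is a minimal standalone sketch of that rule, not the PostgreSQL implementation: `xid_t`, `xid_follows_or_equals`, `xid_advance`, and `bump_xmax` are hypothetical names, and the comparison assumes both XIDs are normal (>= 3).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t xid_t;                 /* hypothetical stand-in for TransactionId */
#define INVALID_XID      ((xid_t) 0)
#define FIRST_NORMAL_XID ((xid_t) 3)

/* Modulo-2^32 comparison; only meaningful when both XIDs are normal. */
static bool
xid_follows_or_equals(xid_t a, xid_t b)
{
    return (int32_t) (a - b) >= 0;
}

/* Step to the next XID, skipping the reserved values below 3 on wraparound. */
static xid_t
xid_advance(xid_t x)
{
    x++;
    if (x < FIRST_NORMAL_XID)
        x = FIRST_NORMAL_XID;
    return x;
}

/*
 * Return the new xmax after a catalog-relevant commit: one past the
 * committed XID if that XID is at or beyond the current xmax (or if no
 * xmax has been set yet), otherwise the unchanged xmax.
 */
static xid_t
bump_xmax(xid_t cur_xmax, xid_t committed_xid)
{
    assert(committed_xid >= FIRST_NORMAL_XID);

    if (cur_xmax == INVALID_XID ||
        xid_follows_or_equals(committed_xid, cur_xmax))
        return xid_advance(committed_xid);
    return cur_xmax;
}
```

The signed cast is what lets an XID just past wraparound count as "later" than one just before it.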
    1099             : 
    1100             : /*
    1101             :  * Check the reorder buffer and the snapshot to see if the given transaction has
    1102             :  * modified catalogs.
    1103             :  */
    1104             : static inline bool
    1105        8714 : SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
    1106             :                               uint32 xinfo)
    1107             : {
    1108        8714 :     if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
    1109        2402 :         return true;
    1110             : 
    1111             :     /*
    1112             :      * The transactions that have changed catalogs must have invalidation
    1113             :      * info.
    1114             :      */
    1115        6312 :     if (!(xinfo & XACT_XINFO_HAS_INVALS))
    1116        6296 :         return false;
    1117             : 
    1118             :     /* Check the catchange XID array */
    1119          24 :     return ((builder->catchange.xcnt > 0) &&
    1120           8 :             (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
    1121             :                      sizeof(TransactionId), xidComparator) != NULL));
    1122             : }
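
The final lookup above is a binary search over a sorted XID array. Here is a minimal standalone sketch of the same pattern using the C library's `bsearch`. The names `xid_t`, `xid_cmp`, and `xid_in_sorted_array` are hypothetical, and the comparator is assumed to order XIDs as plain unsigned integers, which is what a sort/search comparator needs in order to be a consistent total order.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t xid_t;     /* hypothetical stand-in for TransactionId */

/* Plain numeric ordering; the array must be sorted with the same rule. */
static int
xid_cmp(const void *a, const void *b)
{
    xid_t x = *(const xid_t *) a;
    xid_t y = *(const xid_t *) b;

    return (x > y) - (x < y);
}

/* True if xid occurs in the sorted array xip[0 .. xcnt-1]. */
static bool
xid_in_sorted_array(xid_t xid, const xid_t *xip, size_t xcnt)
{
    assert(xcnt == 0 || xip != NULL);

    /* Mirror the xcnt > 0 guard so an empty array is never searched. */
    if (xcnt == 0)
        return false;
    return bsearch(&xid, xip, xcnt, sizeof(xid_t), xid_cmp) != NULL;
}
```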
    1123             : 
    1124             : /* -----------------------------------
    1125             :  * Snapshot building functions dealing with xlog records
    1126             :  * -----------------------------------
    1127             :  */
    1128             : 
    1129             : /*
    1130             :  * Process a running xacts record, and use its information to first build a
    1131             :  * historic snapshot and later to release resources that aren't needed
    1132             :  * anymore.
    1133             :  */
    1134             : void
    1135        2764 : SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
    1136             : {
    1137             :     ReorderBufferTXN *txn;
    1138             :     TransactionId xmin;
    1139             : 
    1140             :     /*
    1141             :      * If we're not consistent yet, inspect the record to see whether it
    1142             :      * allows us to get closer to being consistent. If we are consistent, dump
    1143             :      * our snapshot so others or we, after a restart, can use it.
    1144             :      */
    1145        2764 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1146             :     {
    1147             :         /* returns false if there's no point in performing cleanup just yet */
    1148        1982 :         if (!SnapBuildFindSnapshot(builder, lsn, running))
    1149        1932 :             return;
    1150             :     }
    1151             :     else
    1152         782 :         SnapBuildSerialize(builder, lsn);
    1153             : 
    1154             :     /*
    1155             :      * Update range of interesting xids based on the running xacts
    1156             :      * information. We don't increase ->xmax using it, because once we are in
    1157             :      * a consistent state we can do that ourselves and much more efficiently
    1158             :      * so, because we only need to do it for catalog transactions since we
    1159             :      * only ever look at those.
    1160             :      *
    1161             :      * NB: We only increase xmax when a catalog modifying transaction commits
    1162             :      * (see SnapBuildCommitTxn).  Because of this, xmax can be lower than
    1163             :      * xmin, which looks odd but is correct and actually more efficient, since
    1164             :      * we hit fast paths in heapam_visibility.c.
    1165             :      */
    1166         828 :     builder->xmin = running->oldestRunningXid;
    1167             : 
    1168             :     /* Remove transactions we don't need to keep track of anymore */
    1169         828 :     SnapBuildPurgeOlderTxn(builder);
    1170             : 
    1171             :     /*
    1172             :      * Advance the xmin limit for the current replication slot, to allow
    1173             :      * vacuum to clean up the tuples this slot has been protecting.
    1174             :      *
    1175             :      * The reorderbuffer might have an xmin among the currently running
    1176             :      * snapshots; use it if so.  If not, we need only consider the snapshots
    1177             :      * we'll produce later, which can't be less than the oldest running xid in
    1178             :      * the record we're reading now.
    1179             :      */
    1180         828 :     xmin = ReorderBufferGetOldestXmin(builder->reorder);
    1181         828 :     if (xmin == InvalidTransactionId)
    1182         734 :         xmin = running->oldestRunningXid;
    1183         828 :     elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
    1184             :          builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
    1185         828 :     LogicalIncreaseXminForSlot(lsn, xmin);
    1186             : 
    1187             :     /*
    1188             :      * Also tell the slot where we can restart decoding from. We don't want to
    1189             :      * do that after every commit because changing that implies an fsync of
    1190             :      * the logical slot's state file, so we only do it every time we see a
    1191             :      * running xacts record.
    1192             :      *
    1193             :      * Do so by looking for the oldest in progress transaction (determined by
    1194             :      * the first LSN of any of its relevant records). Every transaction
    1195             :      * remembers the last location we stored the snapshot to disk before its
    1196             :      * beginning. That point is where we can restart from.
    1197             :      */
    1198             : 
    1199             :     /*
    1200             :      * Can't know about a serialized snapshot's location if we're not
    1201             :      * consistent.
    1202             :      */
    1203         828 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1204          36 :         return;
    1205             : 
    1206         792 :     txn = ReorderBufferGetOldestTXN(builder->reorder);
    1207             : 
    1208             :     /*
    1209             :      * oldest ongoing txn might have started when we didn't yet serialize
    1210             :      * anything because we hadn't reached a consistent state yet.
    1211             :      */
    1212         792 :     if (txn != NULL && XLogRecPtrIsValid(txn->restart_decoding_lsn))
    1213          46 :         LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
    1214             : 
    1215             :     /*
    1216             :      * No in-progress transaction, can reuse the last serialized snapshot if
    1217             :      * we have one.
    1218             :      */
    1219         746 :     else if (txn == NULL &&
    1220         680 :              XLogRecPtrIsValid(builder->reorder->current_restart_decoding_lsn) &&
    1221         676 :              XLogRecPtrIsValid(builder->last_serialized_snapshot))
    1222         676 :         LogicalIncreaseRestartDecodingForSlot(lsn,
    1223             :                                               builder->last_serialized_snapshot);
    1224             : }
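
The restart-point choice at the end of the function above can be condensed into a small decision function. This is an illustrative sketch only: `lsn_t`, `restart_inputs`, and `pick_restart_lsn` are hypothetical names, and an LSN of 0 is assumed to mean "invalid". A return value of 0 means the slot's restart point is left alone.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t lsn_t;                 /* hypothetical stand-in for XLogRecPtr */
#define INVALID_LSN ((lsn_t) 0)

typedef struct
{
    bool  has_oldest_txn;               /* is any transaction still in progress? */
    lsn_t oldest_txn_restart_lsn;       /* snapshot location it remembered */
    lsn_t current_restart_lsn;          /* reorder buffer's current restart LSN */
    lsn_t last_serialized_snapshot;     /* most recent on-disk snapshot */
} restart_inputs;

/* Return the LSN decoding may restart from, or INVALID_LSN for "no change". */
static lsn_t
pick_restart_lsn(const restart_inputs *in)
{
    assert(in != NULL);

    /*
     * With a transaction in progress, only its remembered location is
     * usable; that may be invalid if it began before anything was
     * serialized.
     */
    if (in->has_oldest_txn)
        return in->oldest_txn_restart_lsn;

    /* Nothing in progress: reuse the last serialized snapshot, if any. */
    if (in->current_restart_lsn != INVALID_LSN &&
        in->last_serialized_snapshot != INVALID_LSN)
        return in->last_serialized_snapshot;

    return INVALID_LSN;
}
```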
    1225             : 
    1226             : 
    1227             : /*
    1228             :  * Build the start of a snapshot that's capable of decoding the catalog.
    1229             :  *
    1230             :  * Helper function for SnapBuildProcessRunningXacts() while we're not yet
    1231             :  * consistent.
    1232             :  *
    1233             :  * Returns true if there is a point in performing internal maintenance/cleanup
    1234             :  * using the xl_running_xacts record.
    1235             :  */
    1236             : static bool
    1237        1982 : SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
    1238             : {
    1239             :     /* ---
    1240             :      * Build catalog decoding snapshot incrementally using information about
    1241             :      * the currently running transactions. There are several ways to do that:
    1242             :      *
    1243             :      * a) There were no running transactions when the xl_running_xacts record
    1244             :      *    was inserted, jump to CONSISTENT immediately. We might find such a
    1245             :      *    state while waiting on c)'s sub-states.
    1246             :      *
    1247             :      * b) This (in a previous run) or another decoding slot serialized a
    1248             :      *    snapshot to disk that we can use. Can't use this method while finding
    1249             :      *    the start point for decoding changes as the restart LSN would be an
    1250             :      *    arbitrary LSN but we need to find the start point to extract changes
    1251             :      *    where we won't see the data for partial transactions. Also, we cannot
    1252             :      *    use this method when a slot needs a full snapshot for export or direct
    1253             :      *    use, as that snapshot will only contain catalog modifying transactions.
    1254             :      *
    1255             :      * c) First incrementally build a snapshot for catalog tuples
    1256             :      *    (BUILDING_SNAPSHOT), that requires all, already in-progress,
    1257             :      *    transactions to finish.  Every transaction starting after that
    1258             :      *    (FULL_SNAPSHOT state), has enough information to be decoded.  But
    1259             :      *    for older running transactions no viable snapshot exists yet, so
    1260             :      *    CONSISTENT will only be reached once all of those have finished.
    1261             :      * ---
    1262             :      */
    1263             : 
    1264             :     /*
    1265             :      * xl_running_xacts record is older than what we can use, we might not
    1266             :      * have all necessary catalog rows anymore.
    1267             :      */
    1268        1982 :     if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
    1269         926 :         NormalTransactionIdPrecedes(running->oldestRunningXid,
    1270             :                                     builder->initial_xmin_horizon))
    1271             :     {
    1272           0 :         ereport(DEBUG1,
    1273             :                 errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low",
    1274             :                                 LSN_FORMAT_ARGS(lsn)),
    1275             :                 errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
    1276             :                                    builder->initial_xmin_horizon, running->oldestRunningXid));
    1277             : 
    1278             : 
    1279           0 :         SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
    1280             : 
    1281           0 :         return true;
    1282             :     }
    1283             : 
    1284             :     /*
    1285             :      * a) No transactions were running, we can jump to consistent.
    1286             :      *
    1287             :      * This is not affected by races around xl_running_xacts, because we can
    1288             :      * miss transaction commits, but currently not transactions starting.
    1289             :      *
    1290             :      * NB: We might have already started to incrementally assemble a snapshot,
    1291             :      * so we need to be careful to deal with that.
    1292             :      */
    1293        1982 :     if (running->oldestRunningXid == running->nextXid)
    1294             :     {
    1295        1916 :         if (!XLogRecPtrIsValid(builder->start_decoding_at) ||
    1296        1008 :             builder->start_decoding_at <= lsn)
    1297             :             /* can decode everything after this */
    1298         910 :             builder->start_decoding_at = lsn + 1;
    1299             : 
    1300             :         /* As no transactions were running xmin/xmax can be trivially set. */
    1301        1916 :         builder->xmin = running->nextXid; /* < are finished */
    1302        1916 :         builder->xmax = running->nextXid; /* >= are running */
    1303             : 
    1304             :         /* so we can safely use the faster comparisons */
    1305             :         Assert(TransactionIdIsNormal(builder->xmin));
    1306             :         Assert(TransactionIdIsNormal(builder->xmax));
    1307             : 
    1308        1916 :         builder->state = SNAPBUILD_CONSISTENT;
    1309        1916 :         builder->next_phase_at = InvalidTransactionId;
    1310             : 
    1311        1916 :         ereport(LOG,
    1312             :                 errmsg("logical decoding found consistent point at %X/%08X",
    1313             :                        LSN_FORMAT_ARGS(lsn)),
    1314             :                 errdetail("There are no running transactions."));
    1315             : 
    1316        1916 :         return false;
    1317             :     }
    1318             : 
    1319             :     /*
    1320             :      * b) valid on disk state and while neither building full snapshot nor
    1321             :      * creating a slot.
    1322             :      */
    1323          66 :     else if (!builder->building_full_snapshot &&
    1324         100 :              !builder->in_slot_creation &&
    1325          38 :              SnapBuildRestore(builder, lsn))
    1326             :     {
    1327             :         /* there won't be any state to cleanup */
    1328          16 :         return false;
    1329             :     }
    1330             : 
    1331             :     /*
    1332             :      * c) transition from START to BUILDING_SNAPSHOT.
    1333             :      *
    1334             :      * In START state, and a xl_running_xacts record with running xacts is
    1335             :      * encountered.  In that case, switch to BUILDING_SNAPSHOT state, and
    1336             :      * record xl_running_xacts->nextXid.  Once all running xacts have finished
    1337             :      * (i.e. they're all >= nextXid), we have a complete catalog snapshot.  It
    1338             :      * might look as if we could use xl_running_xacts's ->xids information to
    1339             :      * get there quicker, but that is problematic because transactions marked
    1340             :      * as running, might already have inserted their commit record - it's
    1341             :      * infeasible to change that with locking.
    1342             :      */
    1343          50 :     else if (builder->state == SNAPBUILD_START)
    1344             :     {
    1345          28 :         builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
    1346          28 :         builder->next_phase_at = running->nextXid;
    1347             : 
    1348             :         /*
    1349             :          * Start with an xmin/xmax that's correct for the future, when all the
    1350             :          * currently running transactions have finished. We'll update both
    1351             :          * while waiting for the pending transactions to finish.
    1352             :          */
    1353          28 :         builder->xmin = running->nextXid; /* < are finished */
    1354          28 :         builder->xmax = running->nextXid; /* >= are running */
    1355             : 
    1356             :         /* so we can safely use the faster comparisons */
    1357             :         Assert(TransactionIdIsNormal(builder->xmin));
    1358             :         Assert(TransactionIdIsNormal(builder->xmax));
    1359             : 
    1360          28 :         ereport(LOG,
    1361             :                 errmsg("logical decoding found initial starting point at %X/%08X",
    1362             :                        LSN_FORMAT_ARGS(lsn)),
    1363             :                 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
    1364             :                           running->xcnt, running->nextXid));
    1365             : 
    1366          28 :         SnapBuildWaitSnapshot(running, running->nextXid);
    1367             :     }
    1368             : 
    1369             :     /*
    1370             :      * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
    1371             :      *
    1372             :      * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
    1373             :      * is >= the nextXid from when we switched to BUILDING_SNAPSHOT.  This
    1374             :      * means all transactions starting afterwards have enough information to
    1375             :      * be decoded.  Switch to FULL_SNAPSHOT.
    1376             :      */
    1377          34 :     else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
    1378          12 :              TransactionIdPrecedesOrEquals(builder->next_phase_at,
    1379             :                                            running->oldestRunningXid))
    1380             :     {
    1381          10 :         builder->state = SNAPBUILD_FULL_SNAPSHOT;
    1382          10 :         builder->next_phase_at = running->nextXid;
    1383             : 
    1384          10 :         ereport(LOG,
    1385             :                 errmsg("logical decoding found initial consistent point at %X/%08X",
    1386             :                        LSN_FORMAT_ARGS(lsn)),
    1387             :                 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
    1388             :                           running->xcnt, running->nextXid));
    1389             : 
    1390          10 :         SnapBuildWaitSnapshot(running, running->nextXid);
    1391             :     }
    1392             : 
    1393             :     /*
    1394             :      * c) transition from FULL_SNAPSHOT to CONSISTENT.
    1395             :      *
    1396             :      * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
    1397             :      * >= the nextXid from when we switched to FULL_SNAPSHOT.  This means all
    1398             :      * transactions that are currently in progress have a catalog snapshot,
    1399             :      * and all their changes have been collected.  Switch to CONSISTENT.
    1400             :      */
    1401          22 :     else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
    1402          10 :              TransactionIdPrecedesOrEquals(builder->next_phase_at,
    1403             :                                            running->oldestRunningXid))
    1404             :     {
    1405          10 :         builder->state = SNAPBUILD_CONSISTENT;
    1406          10 :         builder->next_phase_at = InvalidTransactionId;
    1407             : 
    1408          10 :         ereport(LOG,
    1409             :                 errmsg("logical decoding found consistent point at %X/%08X",
    1410             :                        LSN_FORMAT_ARGS(lsn)),
    1411             :                 errdetail("There are no old transactions anymore."));
    1412             :     }
    1413             : 
    1414             :     /*
    1415             :      * We already started to track running xacts and need to wait for all
    1416             :      * in-progress ones to finish. We fall through to the normal processing of
    1417             :      * records so incremental cleanup can be performed.
    1418             :      */
    1419          46 :     return true;
    1420             : }
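
The state progression described in the comments above (cases a and c) can be modelled as a small state machine driven only by `oldestRunningXid` and `nextXid`. The sketch below is a simplified model under stated assumptions, not the function itself: it omits case b (restoring a serialized snapshot), the xmin-horizon check, logging, and waiting, and every name in it (`builder_sketch`, `observe_running_xacts`, and so on) is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t xid_t;                 /* hypothetical stand-in for TransactionId */
#define INVALID_XID ((xid_t) 0)

typedef enum
{
    ST_START,
    ST_BUILDING_SNAPSHOT,
    ST_FULL_SNAPSHOT,
    ST_CONSISTENT
} build_state;

typedef struct
{
    build_state state;
    xid_t       next_phase_at;          /* XID that must become the oldest running */
} builder_sketch;

/* Wraparound-aware "a <= b", valid for normal XIDs only. */
static bool
xid_precedes_or_equals(xid_t a, xid_t b)
{
    return (int32_t) (a - b) <= 0;
}

/*
 * Feed one running-xacts observation into the builder.  Returns false when
 * the consistent state was reached immediately (no cleanup needed yet),
 * true otherwise.
 */
static bool
observe_running_xacts(builder_sketch *b, xid_t oldest_running, xid_t next_xid)
{
    assert(b != NULL);

    if (oldest_running == next_xid)
    {
        /* a) nothing was running: jump straight to consistent */
        b->state = ST_CONSISTENT;
        b->next_phase_at = INVALID_XID;
        return false;
    }
    else if (b->state == ST_START)
    {
        /* c) wait for everything older than next_xid to finish */
        b->state = ST_BUILDING_SNAPSHOT;
        b->next_phase_at = next_xid;
    }
    else if (b->state == ST_BUILDING_SNAPSHOT &&
             xid_precedes_or_equals(b->next_phase_at, oldest_running))
    {
        b->state = ST_FULL_SNAPSHOT;
        b->next_phase_at = next_xid;
    }
    else if (b->state == ST_FULL_SNAPSHOT &&
             xid_precedes_or_equals(b->next_phase_at, oldest_running))
    {
        b->state = ST_CONSISTENT;
        b->next_phase_at = INVALID_XID;
    }
    return true;
}
```

Each phase records `next_xid` as its target, so reaching CONSISTENT by path c takes at least three observations with transactions running.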
    1421             : 
    1422             : /* ---
    1423             :  * Iterate through xids in record, wait for all older than the cutoff to
    1424             :  * finish.  Then, if possible, log a new xl_running_xacts record.
    1425             :  *
    1426             :  * This isn't required for the correctness of decoding, but to:
    1427             :  * a) allow isolationtester to notice that we're currently waiting for
    1428             :  *    something.
    1429             :  * b) log a new xl_running_xacts record where it'd be helpful, without having
    1430             :  *    to wait for bgwriter or checkpointer.
    1431             :  * ---
    1432             :  */
    1433             : static void
    1434          38 : SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
    1435             : {
    1436             :     int         off;
    1437             : 
    1438          72 :     for (off = 0; off < running->xcnt; off++)
    1439             :     {
    1440          38 :         TransactionId xid = running->xids[off];
    1441             : 
    1442             :         /*
    1443             :          * Upper layers should prevent that we ever need to wait on ourselves.
    1444             :          * Check anyway, since failing to do so would either result in an
    1445             :          * endless wait or an Assert() failure.
    1446             :          */
    1447          38 :         if (TransactionIdIsCurrentTransactionId(xid))
    1448           0 :             elog(ERROR, "waiting for ourselves");
    1449             : 
    1450          38 :         if (TransactionIdFollows(xid, cutoff))
    1451           0 :             continue;
    1452             : 
    1453          38 :         XactLockTableWait(xid, NULL, NULL, XLTW_None);
    1454             :     }
    1455             : 
    1456             :     /*
    1457             :      * All transactions we needed to finish finished - try to ensure there is
    1458             :      * another xl_running_xacts record in a timely manner, without having to
    1459             :      * wait for bgwriter or checkpointer to log one.  During recovery we can't
    1460             :      * enforce that, so we'll have to wait.
    1461             :      */
    1462          34 :     if (!RecoveryInProgress())
    1463             :     {
    1464          34 :         LogStandbySnapshot();
    1465             :     }
    1466          34 : }
    1467             : 
    1468             : #define SnapBuildOnDiskConstantSize \
    1469             :     offsetof(SnapBuildOnDisk, builder)
    1470             : #define SnapBuildOnDiskNotChecksummedSize \
    1471             :     offsetof(SnapBuildOnDisk, version)
    1472             : 
    1473             : #define SNAPBUILD_MAGIC 0x51A1E001
    1474             : #define SNAPBUILD_VERSION 6
    1475             : 
    1476             : /*
    1477             :  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
    1478             :  *
    1479             :  * Supposed to be used by external (i.e. not snapbuild.c) code that just read
    1480             :  * a record that's a potential location for a serialized snapshot.
    1481             :  */
    1482             : void
    1483         104 : SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
    1484             : {
    1485         104 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1486           0 :         SnapBuildRestore(builder, lsn);
    1487             :     else
    1488         104 :         SnapBuildSerialize(builder, lsn);
    1489         104 : }
    1490             : 
    1491             : /*
    1492             :  * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
    1493             :  * been done by another decoding process.
    1494             :  */
    1495             : static void
    1496         886 : SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
    1497             : {
    1498             :     Size        needed_length;
    1499         886 :     SnapBuildOnDisk *ondisk = NULL;
    1500         886 :     TransactionId *catchange_xip = NULL;
    1501             :     MemoryContext old_ctx;
    1502             :     size_t      catchange_xcnt;
    1503             :     char       *ondisk_c;
    1504             :     int         fd;
    1505             :     char        tmppath[MAXPGPATH];
    1506             :     char        path[MAXPGPATH];
    1507             :     int         ret;
    1508             :     struct stat stat_buf;
    1509             :     Size        sz;
    1510             : 
    1511             :     Assert(XLogRecPtrIsValid(lsn));
    1512             :     Assert(!XLogRecPtrIsValid(builder->last_serialized_snapshot) ||
    1513             :            builder->last_serialized_snapshot <= lsn);
    1514             : 
    1515             :     /*
    1516             :      * no point in serializing if we cannot continue to work immediately after
    1517             :      * restoring the snapshot
    1518             :      */
    1519         886 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1520           0 :         return;
    1521             : 
    1522             :     /* consistent snapshots have no next phase */
    1523             :     Assert(builder->next_phase_at == InvalidTransactionId);
    1524             : 
    1525             :     /*
    1526             :      * We identify snapshots by the LSN they are valid for. We don't need to
    1527             :      * include timelines in the name as each LSN maps to exactly one timeline
    1528             :      * unless the user used pg_resetwal or similar. If a user did so, there's
    1529             :      * no hope of continuing to decode anyway.
    1530             :      */
    1531         886 :     sprintf(path, "%s/%X-%X.snap",
    1532             :             PG_LOGICAL_SNAPSHOTS_DIR,
    1533         886 :             LSN_FORMAT_ARGS(lsn));
    1534             : 
    1535             :     /*
    1536             :      * first check whether some other backend already has written the snapshot
    1537             :      * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
    1538             :      * as a valid state. Everything else is an unexpected error.
    1539             :      */
    1540         886 :     ret = stat(path, &stat_buf);
    1541             : 
    1542         886 :     if (ret != 0 && errno != ENOENT)
    1543           0 :         ereport(ERROR,
    1544             :                 (errcode_for_file_access(),
    1545             :                  errmsg("could not stat file \"%s\": %m", path)));
    1546             : 
    1547         886 :     else if (ret == 0)
    1548             :     {
    1549             :         /*
    1550             :          * somebody else has already serialized to this point, don't overwrite
    1551             :          * but remember location, so we don't need to read old data again.
    1552             :          *
    1553             :          * To be sure it has been synced to disk after the rename() from the
    1554             :          * tempfile filename to the real filename, we just repeat the fsync.
    1555             :          * That ought to be cheap because in most scenarios it should already
    1556             :          * be safely on disk.
    1557             :          */
    1558         266 :         fsync_fname(path, false);
    1559         266 :         fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1560             : 
    1561         266 :         builder->last_serialized_snapshot = lsn;
    1562         266 :         goto out;
    1563             :     }
    1564             : 
    1565             :     /*
    1566             :      * there is an obvious race condition here between the time we stat(2) the
    1567             :      * file and us writing the file. But we rename the file into place
    1568             :      * atomically and all files created need to contain the same data anyway,
    1569             :      * so this is perfectly fine, although a bit of a resource waste. Locking
    1570             :      * seems like pointless complication.
    1571             :      */
    1572         620 :     elog(DEBUG1, "serializing snapshot to %s", path);
    1573             : 
    1574             :     /* to make sure only we will write to this tempfile, include pid */
    1575         620 :     sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
    1576             :             PG_LOGICAL_SNAPSHOTS_DIR,
    1577         620 :             LSN_FORMAT_ARGS(lsn), MyProcPid);
    1578             : 
    1579             :     /*
    1580             :      * Unlink the temporary file if it already exists. It must be left over
    1581             :      * from before a crash/error, since we won't enter this function twice
    1582             :      * from within a single decoding slot/backend and the temporary file
    1583             :      * name contains the pid of the current process.
    1584             :      */
    1585         620 :     if (unlink(tmppath) != 0 && errno != ENOENT)
    1586           0 :         ereport(ERROR,
    1587             :                 (errcode_for_file_access(),
    1588             :                  errmsg("could not remove file \"%s\": %m", tmppath)));
    1589             : 
    1590         620 :     old_ctx = MemoryContextSwitchTo(builder->context);
    1591             : 
    1592             :     /* Get the catalog modifying transactions that are not yet committed */
    1593         620 :     catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
    1594         620 :     catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);
    1595             : 
    1596         620 :     needed_length = sizeof(SnapBuildOnDisk) +
    1597         620 :         sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
    1598             : 
    1599         620 :     ondisk_c = palloc0(needed_length);
    1600         620 :     ondisk = (SnapBuildOnDisk *) ondisk_c;
    1601         620 :     ondisk->magic = SNAPBUILD_MAGIC;
    1602         620 :     ondisk->version = SNAPBUILD_VERSION;
    1603         620 :     ondisk->length = needed_length;
    1604         620 :     INIT_CRC32C(ondisk->checksum);
    1605         620 :     COMP_CRC32C(ondisk->checksum,
    1606             :                 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
    1607             :                 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
    1608         620 :     ondisk_c += sizeof(SnapBuildOnDisk);
    1609             : 
    1610         620 :     memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
    1611             :     /* NULL-ify memory-only data */
    1612         620 :     ondisk->builder.context = NULL;
    1613         620 :     ondisk->builder.snapshot = NULL;
    1614         620 :     ondisk->builder.reorder = NULL;
    1615         620 :     ondisk->builder.committed.xip = NULL;
    1616         620 :     ondisk->builder.catchange.xip = NULL;
    1617             :     /* update catchange only in the on-disk data */
    1618         620 :     ondisk->builder.catchange.xcnt = catchange_xcnt;
    1619             : 
    1620         620 :     COMP_CRC32C(ondisk->checksum,
    1621             :                 &ondisk->builder,
    1622             :                 sizeof(SnapBuild));
    1623             : 
    1624             :     /* copy committed xacts */
    1625         620 :     if (builder->committed.xcnt > 0)
    1626             :     {
    1627         112 :         sz = sizeof(TransactionId) * builder->committed.xcnt;
    1628         112 :         memcpy(ondisk_c, builder->committed.xip, sz);
    1629         112 :         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
    1630         112 :         ondisk_c += sz;
    1631             :     }
    1632             : 
    1633             :     /* copy catalog modifying xacts */
    1634         620 :     if (catchange_xcnt > 0)
    1635             :     {
    1636          20 :         sz = sizeof(TransactionId) * catchange_xcnt;
    1637          20 :         memcpy(ondisk_c, catchange_xip, sz);
    1638          20 :         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
    1639          20 :         ondisk_c += sz;
    1640             :     }
    1641             : 
    1642         620 :     FIN_CRC32C(ondisk->checksum);
    1643             : 
    1644             :     /* we have valid data now, open tempfile and write it there */
    1645         620 :     fd = OpenTransientFile(tmppath,
    1646             :                            O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    1647         620 :     if (fd < 0)
    1648           0 :         ereport(ERROR,
    1649             :                 (errcode_for_file_access(),
    1650             :                  errmsg("could not open file \"%s\": %m", tmppath)));
    1651             : 
    1652         620 :     errno = 0;
    1653         620 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
    1654         620 :     if ((write(fd, ondisk, needed_length)) != needed_length)
    1655             :     {
    1656           0 :         int         save_errno = errno;
    1657             : 
    1658           0 :         CloseTransientFile(fd);
    1659             : 
    1660             :         /* if write didn't set errno, assume problem is no disk space */
    1661           0 :         errno = save_errno ? save_errno : ENOSPC;
    1662           0 :         ereport(ERROR,
    1663             :                 (errcode_for_file_access(),
    1664             :                  errmsg("could not write to file \"%s\": %m", tmppath)));
    1665             :     }
    1666         620 :     pgstat_report_wait_end();
    1667             : 
    1668             :     /*
    1669             :      * fsync the file before renaming so that even if we crash after this we
    1670             :      * have either a fully valid file or nothing.
    1671             :      *
    1672             :      * It's safe to just ERROR on fsync() here because we'll retry the whole
    1673             :      * operation including the writes.
    1674             :      *
    1675             :      * TODO: Do the fsync() via checkpoints/restartpoints? Doing it here has
    1676             :      * some noticeable overhead since it's performed synchronously during
    1677             :      * decoding.
    1678             :      */
    1679         620 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
    1680         620 :     if (pg_fsync(fd) != 0)
    1681             :     {
    1682           0 :         int         save_errno = errno;
    1683             : 
    1684           0 :         CloseTransientFile(fd);
    1685           0 :         errno = save_errno;
    1686           0 :         ereport(ERROR,
    1687             :                 (errcode_for_file_access(),
    1688             :                  errmsg("could not fsync file \"%s\": %m", tmppath)));
    1689             :     }
    1690         620 :     pgstat_report_wait_end();
    1691             : 
    1692         620 :     if (CloseTransientFile(fd) != 0)
    1693           0 :         ereport(ERROR,
    1694             :                 (errcode_for_file_access(),
    1695             :                  errmsg("could not close file \"%s\": %m", tmppath)));
    1696             : 
    1697         620 :     fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1698             : 
    1699             :     /*
    1700             :      * We may overwrite the work from some other backend, but that's ok, our
    1701             :      * snapshot is valid as well, we'll just have done some superfluous work.
    1702             :      */
    1703         620 :     if (rename(tmppath, path) != 0)
    1704             :     {
    1705           0 :         ereport(ERROR,
    1706             :                 (errcode_for_file_access(),
    1707             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1708             :                         tmppath, path)));
    1709             :     }
    1710             : 
    1711             :     /* make sure we persist */
    1712         620 :     fsync_fname(path, false);
    1713         620 :     fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1714             : 
    1715             :     /*
    1716             :      * Now there's no way we can lose the dumped state anymore, remember this
    1717             :      * as a serialization point.
    1718             :      */
    1719         620 :     builder->last_serialized_snapshot = lsn;
    1720             : 
    1721         620 :     MemoryContextSwitchTo(old_ctx);
    1722             : 
    1723         886 : out:
    1724         886 :     ReorderBufferSetRestartPoint(builder->reorder,
    1725             :                                  builder->last_serialized_snapshot);
    1726             :     /* be tidy */
    1727         886 :     if (ondisk)
    1728         620 :         pfree(ondisk);
    1729         886 :     if (catchange_xip)
    1730          20 :         pfree(catchange_xip);
    1731             : }
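
The write path above (unlink stale temp file, exclusive create, write, fsync, rename, fsync directory) can be sketched outside PostgreSQL with plain POSIX calls. This is a minimal illustration, not the PostgreSQL implementation: `fsync_dir` and `durable_write` are hypothetical names, errors are reported through return values instead of `ereport`, and the directory is synced only once, after the rename.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical helper: fsync a directory so a rename inside it is durable. */
static int
fsync_dir(const char *dirpath)
{
    int         fd = open(dirpath, O_RDONLY);
    int         rc;

    if (fd < 0)
        return -1;
    rc = fsync(fd);
    close(fd);
    return rc;
}

/*
 * Hypothetical helper: publish 'len' bytes at 'path' so that after a crash
 * either the complete file or no file is visible. Returns 0 on success and
 * -1 on failure with errno set.
 */
static int
durable_write(const char *dir, const char *tmppath, const char *path,
              const void *data, size_t len)
{
    int         fd;

    assert(data != NULL || len == 0);

    /* A leftover temp file can only come from an earlier failed attempt. */
    if (unlink(tmppath) != 0 && errno != ENOENT)
        return -1;

    /* O_EXCL guarantees nobody else is writing the same temp file. */
    fd = open(tmppath, O_CREAT | O_EXCL | O_WRONLY, 0600);
    if (fd < 0)
        return -1;

    errno = 0;
    if (write(fd, data, len) != (ssize_t) len || fsync(fd) != 0)
    {
        /* A short write may leave errno unset; assume no disk space. */
        int         save_errno = errno ? errno : ENOSPC;

        close(fd);
        errno = save_errno;
        return -1;
    }
    if (close(fd) != 0)
        return -1;

    /* rename() atomically replaces any existing file at 'path'. */
    if (rename(tmppath, path) != 0)
        return -1;

    /* Persist the directory entry created by the rename. */
    return fsync_dir(dir);
}
```

A caller would pass the directory together with both file names, for example `durable_write(".", "demo.snap.tmp", "demo.snap", buf, buflen)`.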
    1732             : 
    1733             : /*
    1734             :  * Restore the logical snapshot file contents to 'ondisk'.
    1735             :  *
    1736             :  * 'context' is the memory context where the catalog modifying/committed xids
    1737             :  * will live.
    1738             :  * If 'missing_ok' is true, will not throw an error if the file is not found.
    1739             :  */
    1740             : bool
    1741          42 : SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn,
    1742             :                          MemoryContext context, bool missing_ok)
    1743             : {
    1744             :     int         fd;
    1745             :     pg_crc32c   checksum;
    1746             :     Size        sz;
    1747             :     char        path[MAXPGPATH];
    1748             : 
    1749          42 :     sprintf(path, "%s/%X-%X.snap",
    1750             :             PG_LOGICAL_SNAPSHOTS_DIR,
    1751          42 :             LSN_FORMAT_ARGS(lsn));
    1752             : 
    1753          42 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
    1754             : 
    1755          42 :     if (fd < 0)
    1756             :     {
    1757          22 :         if (missing_ok && errno == ENOENT)
    1758          22 :             return false;
    1759             : 
    1760           0 :         ereport(ERROR,
    1761             :                 (errcode_for_file_access(),
    1762             :                  errmsg("could not open file \"%s\": %m", path)));
    1763             :     }
    1764             : 
    1765             :     /* ----
    1766             :      * Make sure the snapshot has been stored safely to disk; that's normally
    1767             :      * cheap.
    1768             :      * Note that we do not need PANIC here, nobody will be able to use the
    1769             :      * slot without fsyncing, and saving it won't succeed without an fsync()
    1770             :      * either...
    1771             :      * ----
    1772             :      */
    1773          20 :     fsync_fname(path, false);
    1774          20 :     fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1775             : 
    1776             :     /* read statically sized portion of snapshot */
    1777          20 :     SnapBuildRestoreContents(fd, ondisk, SnapBuildOnDiskConstantSize, path);
    1778             : 
    1779          20 :     if (ondisk->magic != SNAPBUILD_MAGIC)
    1780           0 :         ereport(ERROR,
    1781             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1782             :                  errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
    1783             :                         path, ondisk->magic, SNAPBUILD_MAGIC)));
    1784             : 
    1785          20 :     if (ondisk->version != SNAPBUILD_VERSION)
    1786           0 :         ereport(ERROR,
    1787             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1788             :                  errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
    1789             :                         path, ondisk->version, SNAPBUILD_VERSION)));
    1790             : 
    1791          20 :     INIT_CRC32C(checksum);
    1792          20 :     COMP_CRC32C(checksum,
    1793             :                 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
    1794             :                 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
    1795             : 
    1796             :     /* read SnapBuild */
    1797          20 :     SnapBuildRestoreContents(fd, &ondisk->builder, sizeof(SnapBuild), path);
    1798          20 :     COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild));
    1799             : 
    1800             :     /* restore committed xacts information */
    1801          20 :     if (ondisk->builder.committed.xcnt > 0)
    1802             :     {
    1803           8 :         sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt;
    1804           8 :         ondisk->builder.committed.xip = MemoryContextAllocZero(context, sz);
    1805           8 :         SnapBuildRestoreContents(fd, ondisk->builder.committed.xip, sz, path);
    1806           8 :         COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz);
    1807             :     }
    1808             : 
    1809             :     /* restore catalog modifying xacts information */
    1810          20 :     if (ondisk->builder.catchange.xcnt > 0)
    1811             :     {
    1812          10 :         sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt;
    1813          10 :         ondisk->builder.catchange.xip = MemoryContextAllocZero(context, sz);
    1814          10 :         SnapBuildRestoreContents(fd, ondisk->builder.catchange.xip, sz, path);
    1815          10 :         COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz);
    1816             :     }
    1817             : 
    1818          20 :     if (CloseTransientFile(fd) != 0)
    1819           0 :         ereport(ERROR,
    1820             :                 (errcode_for_file_access(),
    1821             :                  errmsg("could not close file \"%s\": %m", path)));
    1822             : 
    1823          20 :     FIN_CRC32C(checksum);
    1824             : 
    1825             :     /* verify checksum of what we've read */
    1826          20 :     if (!EQ_CRC32C(checksum, ondisk->checksum))
    1827           0 :         ereport(ERROR,
    1828             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1829             :                  errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
    1830             :                         path, checksum, ondisk->checksum)));
    1831             : 
    1832          20 :     return true;
    1833             : }
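
The restore function recomputes the checksum incrementally, feeding each piece into the running CRC as it is read, and compares the result with the stored value only at the end. The sketch below shows why that works: a CRC accumulated over consecutive chunks equals the CRC of the concatenated data. It uses a portable bit-at-a-time CRC-32C as a stand-in for PostgreSQL's `INIT_CRC32C`/`COMP_CRC32C`/`FIN_CRC32C` macros; the function names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Fold 'len' bytes into a running CRC-32C (reflected Castagnoli polynomial). */
static uint32_t
crc32c_update(uint32_t crc, const void *buf, size_t len)
{
    const unsigned char *p = buf;

    assert(buf != NULL || len == 0);

    while (len-- > 0)
    {
        crc ^= *p++;
        for (int bit = 0; bit < 8; bit++)
            crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
    }
    return crc;
}

/*
 * Hypothetical helper: checksum two separately stored parts as if they were
 * one contiguous buffer, mirroring the init / accumulate / finalize sequence.
 */
static uint32_t
crc32c_of_parts(const void *a, size_t alen, const void *b, size_t blen)
{
    uint32_t    crc = 0xFFFFFFFFu;      /* init */

    crc = crc32c_update(crc, a, alen);  /* first part, e.g. a fixed header */
    crc = crc32c_update(crc, b, blen);  /* second part, e.g. an xid array */
    return crc ^ 0xFFFFFFFFu;           /* finalize */
}
```

Because the result does not depend on where the data is split, the reader can allocate and read the variable-length arrays one at a time and still end up with the value the writer computed.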
    1834             : 
    1835             : /*
    1836             :  * Restore a snapshot into 'builder' if one has previously been stored at the
    1837             :  * location indicated by 'lsn'. Returns true if successful, false otherwise.
    1838             :  */
    1839             : static bool
    1840          38 : SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
    1841             : {
    1842             :     SnapBuildOnDisk ondisk;
    1843             : 
    1844             :     /* no point in loading a snapshot if we're already there */
    1845          38 :     if (builder->state == SNAPBUILD_CONSISTENT)
    1846           0 :         return false;
    1847             : 
    1848             :     /* validate and restore the snapshot to 'ondisk' */
    1849          38 :     if (!SnapBuildRestoreSnapshot(&ondisk, lsn, builder->context, true))
    1850          22 :         return false;
    1851             : 
    1852             :     /*
    1853             :      * ok, we now have a sensible snapshot here, figure out if it has more
    1854             :      * information than we have.
    1855             :      */
    1856             : 
    1857             :     /*
    1858             :      * We are only interested in consistent snapshots for now, comparing
    1859             :      * whether one incomplete snapshot is more "advanced" seems to be
    1860             :      * unnecessarily complex.
    1861             :      */
    1862          16 :     if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
    1863           0 :         goto snapshot_not_interesting;
    1864             : 
    1865             :     /*
    1866             :      * Don't use a snapshot that requires an xmin that we cannot guarantee to
    1867             :      * be available.
    1868             :      */
    1869          16 :     if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
    1870           0 :         goto snapshot_not_interesting;
    1871             : 
    1872             :     /*
    1873             :      * Consistent snapshots have no next phase. Reset next_phase_at as it is
    1874             :      * possible that an old value may remain.
    1875             :      */
    1876             :     Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
    1877          16 :     builder->next_phase_at = InvalidTransactionId;
    1878             : 
    1879             :     /* ok, we think the snapshot is sensible, copy over everything important */
    1880          16 :     builder->xmin = ondisk.builder.xmin;
    1881          16 :     builder->xmax = ondisk.builder.xmax;
    1882          16 :     builder->state = ondisk.builder.state;
    1883             : 
    1884          16 :     builder->committed.xcnt = ondisk.builder.committed.xcnt;
    1885             :     /* We only allocated/stored xcnt, not xcnt_space xids! */
    1886             :     /* don't overwrite preallocated xip, if we don't have anything here */
    1887          16 :     if (builder->committed.xcnt > 0)
    1888             :     {
    1889           4 :         pfree(builder->committed.xip);
    1890           4 :         builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
    1891           4 :         builder->committed.xip = ondisk.builder.committed.xip;
    1892             :     }
    1893          16 :     ondisk.builder.committed.xip = NULL;
    1894             : 
    1895             :     /* set catalog modifying transactions */
    1896          16 :     if (builder->catchange.xip)
    1897           0 :         pfree(builder->catchange.xip);
    1898          16 :     builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
    1899          16 :     builder->catchange.xip = ondisk.builder.catchange.xip;
    1900          16 :     ondisk.builder.catchange.xip = NULL;
    1901             : 
    1902             :     /* our snapshot is not interesting anymore, build a new one */
    1903          16 :     if (builder->snapshot != NULL)
    1904             :     {
    1905           0 :         SnapBuildSnapDecRefcount(builder->snapshot);
    1906             :     }
    1907          16 :     builder->snapshot = SnapBuildBuildSnapshot(builder);
    1908          16 :     SnapBuildSnapIncRefcount(builder->snapshot);
    1909             : 
    1910          16 :     ReorderBufferSetRestartPoint(builder->reorder, lsn);
    1911             : 
    1912             :     Assert(builder->state == SNAPBUILD_CONSISTENT);
    1913             : 
    1914          16 :     ereport(LOG,
    1915             :             errmsg("logical decoding found consistent point at %X/%08X",
    1916             :                    LSN_FORMAT_ARGS(lsn)),
    1917             :             errdetail("Logical decoding will begin using saved snapshot."));
    1918          16 :     return true;
    1919             : 
    1920           0 : snapshot_not_interesting:
    1921           0 :     if (ondisk.builder.committed.xip != NULL)
    1922           0 :         pfree(ondisk.builder.committed.xip);
    1923           0 :     if (ondisk.builder.catchange.xip != NULL)
    1924           0 :         pfree(ondisk.builder.catchange.xip);
    1925           0 :     return false;
    1926             : }
    1927             : 
    1928             : /*
    1929             :  * Read the contents of the serialized snapshot to 'dest'.
    1930             :  */
    1931             : static void
    1932          58 : SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path)
    1933             : {
    1934             :     int         readBytes;
    1935             : 
    1936          58 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
    1937          58 :     readBytes = read(fd, dest, size);
    1938          58 :     pgstat_report_wait_end();
    1939          58 :     if (readBytes != size)
    1940             :     {
    1941           0 :         int         save_errno = errno;
    1942             : 
    1943           0 :         CloseTransientFile(fd);
    1944             : 
    1945           0 :         if (readBytes < 0)
    1946             :         {
    1947           0 :             errno = save_errno;
    1948           0 :             ereport(ERROR,
    1949             :                     (errcode_for_file_access(),
    1950             :                      errmsg("could not read file \"%s\": %m", path)));
    1951             :         }
    1952             :         else
    1953           0 :             ereport(ERROR,
    1954             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    1955             :                      errmsg("could not read file \"%s\": read %d of %zu",
    1956             :                             path, readBytes, size)));
    1957             :     }
    1958          58 : }
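
The read helper above separates two failure modes: a negative return from `read()` is an I/O error reported with `errno`, while a non-negative but short count means the file is truncated and is reported as data corruption. A minimal standalone sketch of that classification follows; `ReadOutcome`, `classify_read`, and `read_exact` are hypothetical names, and the outcome is returned instead of raised.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

typedef enum
{
    READ_OK,                    /* got exactly the requested bytes */
    READ_IO_ERROR,              /* read() failed; errno describes why */
    READ_TRUNCATED              /* file ended early: treat as corruption */
} ReadOutcome;

/* Classify the result of a single read() call against the expected size. */
static ReadOutcome
classify_read(ssize_t got, size_t want)
{
    if (got < 0)
        return READ_IO_ERROR;
    if ((size_t) got != want)
        return READ_TRUNCATED;
    return READ_OK;
}

/* Read 'size' bytes from 'fd' into 'dest' and classify the result. */
static ReadOutcome
read_exact(int fd, void *dest, size_t size)
{
    assert(fd >= 0 && dest != NULL);
    return classify_read(read(fd, dest, size), size);
}
```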
    1959             : 
    1960             : /*
    1961             :  * Remove all serialized snapshots that are not required anymore because no
    1962             :  * slot can need them. This doesn't actually have to run during a checkpoint,
    1963             :  * but it's a convenient point to schedule this.
    1964             :  *
    1965             :  * NB: We run this during checkpoints even if logical decoding is disabled so
    1966             :  * we clean up old slots at some point after it got disabled.
    1967             :  */
    1968             : void
    1969        3492 : CheckPointSnapBuild(void)
    1970             : {
    1971             :     XLogRecPtr  cutoff;
    1972             :     XLogRecPtr  redo;
    1973             :     DIR        *snap_dir;
    1974             :     struct dirent *snap_de;
    1975             :     char        path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)];
    1976             : 
    1977             :     /*
    1978             :      * We start off with a minimum of the last redo pointer. No new
    1979             :      * replication slot will start before that, so that's a safe upper bound
    1980             :      * for removal.
    1981             :      */
    1982        3492 :     redo = GetRedoRecPtr();
    1983             : 
    1984             :     /* now check for the restart ptrs from existing slots */
    1985        3492 :     cutoff = ReplicationSlotsComputeLogicalRestartLSN();
    1986             : 
    1987             :     /* don't start earlier than the restart lsn */
    1988        3492 :     if (redo < cutoff)
    1989           2 :         cutoff = redo;
    1990             : 
    1991        3492 :     snap_dir = AllocateDir(PG_LOGICAL_SNAPSHOTS_DIR);
    1992       11020 :     while ((snap_de = ReadDir(snap_dir, PG_LOGICAL_SNAPSHOTS_DIR)) != NULL)
    1993             :     {
    1994             :         uint32      hi;
    1995             :         uint32      lo;
    1996             :         XLogRecPtr  lsn;
    1997             :         PGFileType  de_type;
    1998             : 
    1999        7528 :         if (strcmp(snap_de->d_name, ".") == 0 ||
    2000        4036 :             strcmp(snap_de->d_name, "..") == 0)
    2001        6984 :             continue;
    2002             : 
    2003         544 :         snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name);
    2004         544 :         de_type = get_dirent_type(path, snap_de, false, DEBUG1);
    2005             : 
    2006         544 :         if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
    2007             :         {
    2008           0 :             elog(DEBUG1, "only regular files expected: %s", path);
    2009           0 :             continue;
    2010             :         }
    2011             : 
    2012             :         /*
    2013             :          * temporary filenames from SnapBuildSerialize() include the LSN and
    2014             :          * everything but are postfixed by .$pid.tmp. We can just remove them
    2015             :          * the same as other files because there can be none that are
    2016             :          * currently being written that are older than cutoff.
    2017             :          *
    2018             :          * We just log a message if a file doesn't fit the pattern, it's
    2019             :          * probably some editor's lock/state file or similar...
    2020             :          */
    2021         544 :         if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
    2022             :         {
    2023           0 :             ereport(LOG,
    2024             :                     (errmsg("could not parse file name \"%s\"", path)));
    2025           0 :             continue;
    2026             :         }
    2027             : 
    2028         544 :         lsn = ((uint64) hi) << 32 | lo;
    2029             : 
    2030             :         /* check whether we still need it */
    2031         544 :         if (lsn < cutoff || !XLogRecPtrIsValid(cutoff))
    2032             :         {
    2033         366 :             elog(DEBUG1, "removing snapbuild snapshot %s", path);
    2034             : 
    2035             :             /*
    2036             :              * It's not particularly harmful, though strange, if we can't
    2037             :              * remove the file here. Don't prevent the checkpoint from
    2038             :              * completing, that'd be a cure worse than the disease.
    2039             :              */
    2040         366 :             if (unlink(path) < 0)
    2041             :             {
    2042           0 :                 ereport(LOG,
    2043             :                         (errcode_for_file_access(),
    2044             :                          errmsg("could not remove file \"%s\": %m",
    2045             :                                 path)));
    2046           0 :                 continue;
    2047             :             }
    2048             :         }
    2049             :     }
    2050        3492 :     FreeDir(snap_dir);
    2051        3492 : }
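
The cleanup loop above decides per file name whether a serialized snapshot can go: the name is parsed into a 64-bit LSN and compared with the cutoff. The sketch below isolates that decision. `parse_snap_lsn` and `snap_file_removable` are hypothetical names, and an invalid cutoff is assumed to be represented as 0. Because `sscanf` stops counting after the second conversion, a temporary name with extra text after `.snap` parses the same way as a finished file, which matches the comment in the listing.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Parse "<hi>-<lo>.snap" (hex halves) into a 64-bit LSN. */
static bool
parse_snap_lsn(const char *name, uint64_t *lsn)
{
    unsigned int hi;
    unsigned int lo;

    assert(name != NULL && lsn != NULL);

    if (sscanf(name, "%X-%X.snap", &hi, &lo) != 2)
        return false;           /* not a snapshot file, e.g. an editor file */

    *lsn = ((uint64_t) hi << 32) | lo;
    return true;
}

/*
 * A file is removable if its LSN is older than the cutoff, or if there is
 * no valid cutoff at all (assumed here to be encoded as 0).
 */
static bool
snap_file_removable(const char *name, uint64_t cutoff)
{
    uint64_t    lsn;

    if (!parse_snap_lsn(name, &lsn))
        return false;
    return cutoff == 0 || lsn < cutoff;
}
```

For example, with a cutoff of `0x2000000` the name `0-1A2B3C.snap` is removable, while `1-0.snap` is kept because its LSN lies beyond the cutoff.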
    2052             : 
    2053             : /*
    2054             :  * Check if a logical snapshot at the specified point has been serialized.
    2055             :  */
    2056             : bool
    2057          24 : SnapBuildSnapshotExists(XLogRecPtr lsn)
    2058             : {
    2059             :     char        path[MAXPGPATH];
    2060             :     int         ret;
    2061             :     struct stat stat_buf;
    2062             : 
    2063          24 :     sprintf(path, "%s/%X-%X.snap",
    2064             :             PG_LOGICAL_SNAPSHOTS_DIR,
    2065          24 :             LSN_FORMAT_ARGS(lsn));
    2066             : 
    2067          24 :     ret = stat(path, &stat_buf);
    2068             : 
    2069          24 :     if (ret != 0 && errno != ENOENT)
    2070           0 :         ereport(ERROR,
    2071             :                 (errcode_for_file_access(),
    2072             :                  errmsg("could not stat file \"%s\": %m", path)));
    2073             : 
    2074          24 :     return ret == 0;
    2075             : }

Generated by: LCOV version 1.16