LCOV - code coverage report
Current view: top level - src/backend/replication/logical - snapbuild.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 84.8 % 558 473
Test Date: 2026-03-12 06:14:44 Functions: 100.0 % 32 32

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * snapbuild.c
       4              :  *
       5              :  *    Infrastructure for building historic catalog snapshots based on contents
       6              :  *    of the WAL, for the purpose of decoding heapam.c style values in the
       7              :  *    WAL.
       8              :  *
       9              :  * NOTES:
      10              :  *
      11              :  * We build snapshots which can *only* be used to read catalog contents and we
      12              :  * do so by reading and interpreting the WAL stream. The aim is to build a
      13              :  * snapshot that behaves the same as a freshly taken MVCC snapshot would have
      14              :  * at the time the XLogRecord was generated.
      15              :  *
      16              :  * To build the snapshots we reuse the infrastructure built for Hot
      17              :  * Standby. The in-memory snapshots we build look different than HS' because
      18              :  * we have different needs. To successfully decode data from the WAL we only
      19              :  * need to access catalog tables and (sys|rel|cat)cache, not the actual user
      20              :  * tables since the data we decode is wholly contained in the WAL
      21              :  * records. Also, our snapshots need to be different in comparison to normal
      22              :  * MVCC ones because in contrast to those we cannot fully rely on the clog and
      23              :  * pg_subtrans for information about committed transactions because they might
      24              :  * commit in the future from the POV of the WAL entry we're currently
      25              :  * decoding. This definition has the advantage that we only need to prevent
      26              :  * removal of catalog rows, while normal tables' rows can still be
      27              :  * removed. This is achieved by using the replication slot mechanism.
      28              :  *
      29              :  * As the percentage of transactions modifying the catalog normally is fairly
      30              :  * small in comparison to ones only manipulating user data, we keep track of
      31              :  * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
      32              :  * track of all running transactions like it's done in a normal snapshot. Note
      33              :  * that we're generally only looking at transactions that have acquired an
      34              :  * xid. That is, we keep a list of transactions between snapshot->(xmin, xmax)
      35              :  * that we consider committed, everything else is considered aborted/in
      36              :  * progress. That also allows us not to care about subtransactions before they
      37              :  * have committed which means this module, in contrast to HS, doesn't have to
      38              :  * care about suboverflowed subtransactions and similar.
      39              :  *
      40              :  * One complexity of doing this is that to e.g. handle mixed DDL/DML
      41              :  * transactions we need Snapshots that see intermediate versions of the
      42              :  * catalog in a transaction. During normal operation this is achieved by using
      43              :  * CommandIds/cmin/cmax. The problem with that however is that for space
      44              :  * efficiency reasons, the cmin and cmax are not included in WAL records. We
      45              :  * cannot read the cmin/cmax from the tuple itself, either, because it is
      46              :  * reset on crash recovery. Even if we could, we could not decode combocids
      47              :  * which are only tracked in the original backend's memory. To work around
      48              :  * that, heapam writes an extra WAL record (XLOG_HEAP2_NEW_CID) every time a
      49              :  * catalog row is modified, which includes the cmin and cmax of the
      50              :  * tuple. During decoding, we insert the ctid->(cmin,cmax) mappings into the
      51              :  * reorder buffer, and use them at visibility checks instead of the cmin/cmax
      52              :  * on the tuple itself. Check the reorderbuffer.c's comment above
      53              :  * ResolveCminCmaxDuringDecoding() for details.
      54              :  *
      55              :  * To facilitate all this we need our own visibility routine, as the normal
      56              :  * ones are optimized for different use cases.
      57              :  *
      58              :  * To replace the normal catalog snapshots with decoding ones use the
      59              :  * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
      60              :  *
      61              :  *
      62              :  *
      63              :  * The snapbuild machinery starts up in several stages, as illustrated
      64              :  * by the following graph describing the SnapBuild->state transitions:
      65              :  *
      66              :  *         +-------------------------+
      67              :  *    +----|         START           |-------------+
      68              :  *    |    +-------------------------+             |
      69              :  *    |                 |                          |
      70              :  *    |                 |                          |
      71              :  *    |        running_xacts #1                    |
      72              :  *    |                 |                          |
      73              :  *    |                 |                          |
      74              :  *    |                 v                          |
      75              :  *    |    +-------------------------+             v
      76              :  *    |    |   BUILDING_SNAPSHOT     |------------>|
      77              :  *    |    +-------------------------+             |
      78              :  *    |                 |                          |
      79              :  *    |                 |                          |
      80              :  *    | running_xacts #2, xacts from #1 finished   |
      81              :  *    |                 |                          |
      82              :  *    |                 |                          |
      83              :  *    |                 v                          |
      84              :  *    |    +-------------------------+             v
      85              :  *    |    |       FULL_SNAPSHOT     |------------>|
      86              :  *    |    +-------------------------+             |
      87              :  *    |                 |                          |
      88              :  * running_xacts        |                      saved snapshot
      89              :  * with zero xacts      |                 at running_xacts's lsn
      90              :  *    |                 |                          |
      91              :  *    | running_xacts with xacts from #2 finished  |
      92              :  *    |                 |                          |
      93              :  *    |                 v                          |
      94              :  *    |    +-------------------------+             |
      95              :  *    +--->|  SNAPBUILD_CONSISTENT   |<------------+
      96              :  *         +-------------------------+
      97              :  *
      98              :  * Initially the machinery is in the START stage. When an xl_running_xacts
      99              :  * record is read that is sufficiently new (above the safe xmin horizon),
     100              :  * there's a state transition. If no xacts were running when that record was
     101              :  * generated, we go directly to CONSISTENT; otherwise we switch to
     102              :  * BUILDING_SNAPSHOT, and then to FULL_SNAPSHOT once those xacts have finished.
     103              :  * Having a full snapshot means that all transactions that start henceforth
     104              :  * can be decoded in their entirety, but transactions that started previously
     105              :  * can't. In FULL_SNAPSHOT we'll switch into CONSISTENT once all those
     106              :  * previously running transactions have committed or aborted.
     107              :  *
     108              :  * Only transactions that commit after CONSISTENT state has been reached will
     109              :  * be replayed, even though they might have started while still in
     110              :  * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
     111              :  * changes have been exported, but all the following ones will be. That point
     112              :  * is a convenient point to initialize replication from, which is why we
     113              :  * export a snapshot at that point, which *can* be used to read normal data.
     114              :  *
     115              :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
     116              :  *
     117              :  * IDENTIFICATION
     118              :  *    src/backend/replication/logical/snapbuild.c
     119              :  *
     120              :  *-------------------------------------------------------------------------
     121              :  */
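
The start-up stages described in the header comment can be sketched as a small transition function. This is only an illustrative model, not the logic in snapbuild.c: the names `SketchState` and `sketch_next_state` are hypothetical, the check against the safe xmin horizon is omitted, and the "saved snapshot" edge that jumps straight to CONSISTENT is not modelled.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirror of the four states shown in the diagram. */
typedef enum
{
    SKETCH_START,
    SKETCH_BUILDING_SNAPSHOT,
    SKETCH_FULL_SNAPSHOT,
    SKETCH_CONSISTENT
} SketchState;

/*
 * Advance the state on one xl_running_xacts record.
 *
 * nrunning:         number of xacts running when the record was logged.
 * earlier_finished: true if every xact that was running at the record which
 *                   caused the previous transition has since finished.
 */
SketchState
sketch_next_state(SketchState cur, int nrunning, bool earlier_finished)
{
    assert(nrunning >= 0);

    switch (cur)
    {
        case SKETCH_START:
            /* Nothing running: consistent right away. */
            return nrunning == 0 ? SKETCH_CONSISTENT : SKETCH_BUILDING_SNAPSHOT;
        case SKETCH_BUILDING_SNAPSHOT:
            /* xacts from record #1 finished: new xacts are fully decodable. */
            return earlier_finished ? SKETCH_FULL_SNAPSHOT : cur;
        case SKETCH_FULL_SNAPSHOT:
            /* xacts from record #2 finished: every later commit is decodable. */
            return earlier_finished ? SKETCH_CONSISTENT : cur;
        case SKETCH_CONSISTENT:
            return cur;
    }
    return cur;
}
```

In this model a builder that starts while transactions are running needs at least three xl_running_xacts records to become consistent, which matches the three downward edges in the diagram.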
     122              : 
     123              : #include "postgres.h"
     124              : 
     125              : #include <sys/stat.h>
     126              : #include <unistd.h>
     127              : 
     128              : #include "access/heapam_xlog.h"
     129              : #include "access/transam.h"
     130              : #include "access/xact.h"
     131              : #include "common/file_utils.h"
     132              : #include "miscadmin.h"
     133              : #include "pgstat.h"
     134              : #include "replication/logical.h"
     135              : #include "replication/reorderbuffer.h"
     136              : #include "replication/snapbuild.h"
     137              : #include "replication/snapbuild_internal.h"
     138              : #include "storage/fd.h"
     139              : #include "storage/lmgr.h"
     140              : #include "storage/proc.h"
     141              : #include "storage/procarray.h"
     142              : #include "storage/standby.h"
     143              : #include "utils/builtins.h"
     144              : #include "utils/memutils.h"
     145              : #include "utils/snapmgr.h"
     146              : #include "utils/snapshot.h"
     147              : #include "utils/wait_event.h"
     148              : 
     149              : 
     150              : /*
     151              :  * Starting a transaction -- which we need to do while exporting a snapshot --
     152              :  * removes knowledge about the previously used resowner, so we save it here.
     153              :  */
     154              : static ResourceOwner SavedResourceOwnerDuringExport = NULL;
     155              : static bool ExportInProgress = false;
     156              : 
     157              : /* ->committed and ->catchange manipulation */
     158              : static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
     159              : 
     160              : /* snapshot building/manipulation/distribution functions */
     161              : static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
     162              : 
     163              : static void SnapBuildFreeSnapshot(Snapshot snap);
     164              : 
     165              : static void SnapBuildSnapIncRefcount(Snapshot snap);
     166              : 
     167              : static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
     168              : 
     169              : static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
     170              :                                                  uint32 xinfo);
     171              : 
     172              : /* xlog reading helper functions for SnapBuildProcessRunningXacts */
     173              : static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
     174              : static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
     175              : 
     176              : /* serialization functions */
     177              : static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
     178              : static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
     179              : static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path);
     180              : 
     181              : /*
     182              :  * Allocate a new snapshot builder.
     183              :  *
     184              :  * xmin_horizon is the xid at or after which we can be sure no catalog rows
     185              :  * have been removed; start_lsn is the LSN at or after which we replay commits.
     186              :  */
     187              : SnapBuild *
     188         1170 : AllocateSnapshotBuilder(ReorderBuffer *reorder,
     189              :                         TransactionId xmin_horizon,
     190              :                         XLogRecPtr start_lsn,
     191              :                         bool need_full_snapshot,
     192              :                         bool in_slot_creation,
     193              :                         XLogRecPtr two_phase_at)
     194              : {
     195              :     MemoryContext context;
     196              :     MemoryContext oldcontext;
     197              :     SnapBuild  *builder;
     198              : 
     199              :     /* allocate memory in own context, to have better accountability */
     200         1170 :     context = AllocSetContextCreate(CurrentMemoryContext,
     201              :                                     "snapshot builder context",
     202              :                                     ALLOCSET_DEFAULT_SIZES);
     203         1170 :     oldcontext = MemoryContextSwitchTo(context);
     204              : 
     205         1170 :     builder = palloc0_object(SnapBuild);
     206              : 
     207         1170 :     builder->state = SNAPBUILD_START;
     208         1170 :     builder->context = context;
     209         1170 :     builder->reorder = reorder;
     210              :     /* Other struct members initialized by zeroing via palloc0 above */
     211              : 
     212         1170 :     builder->committed.xcnt = 0;
     213         1170 :     builder->committed.xcnt_space = 128; /* arbitrary number */
     214         1170 :     builder->committed.xip =
     215         1170 :         palloc0_array(TransactionId, builder->committed.xcnt_space);
     216         1170 :     builder->committed.includes_all_transactions = true;
     217              : 
     218         1170 :     builder->catchange.xcnt = 0;
     219         1170 :     builder->catchange.xip = NULL;
     220              : 
     221         1170 :     builder->initial_xmin_horizon = xmin_horizon;
     222         1170 :     builder->start_decoding_at = start_lsn;
     223         1170 :     builder->in_slot_creation = in_slot_creation;
     224         1170 :     builder->building_full_snapshot = need_full_snapshot;
     225         1170 :     builder->two_phase_at = two_phase_at;
     226              : 
     227         1170 :     MemoryContextSwitchTo(oldcontext);
     228              : 
     229         1170 :     return builder;
     230              : }
     231              : 
     232              : /*
     233              :  * Free a snapshot builder.
     234              :  */
     235              : void
     236          924 : FreeSnapshotBuilder(SnapBuild *builder)
     237              : {
     238          924 :     MemoryContext context = builder->context;
     239              : 
     240              :     /* free snapshot explicitly, that contains some error checking */
     241          924 :     if (builder->snapshot != NULL)
     242              :     {
     243          223 :         SnapBuildSnapDecRefcount(builder->snapshot);
     244          223 :         builder->snapshot = NULL;
     245              :     }
     246              : 
     247              :     /* other resources are deallocated via memory context deletion */
     248          924 :     MemoryContextDelete(context);
     249          924 : }
     250              : 
     251              : /*
     252              :  * Free an unreferenced snapshot that has previously been built by us.
     253              :  */
     254              : static void
     255         1490 : SnapBuildFreeSnapshot(Snapshot snap)
     256              : {
     257              :     /* make sure we don't get passed an external snapshot */
     258              :     Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
     259              : 
     260              :     /* make sure nobody modified our snapshot */
     261              :     Assert(snap->curcid == FirstCommandId);
     262              :     Assert(!snap->suboverflowed);
     263              :     Assert(!snap->takenDuringRecovery);
     264              :     Assert(snap->regd_count == 0);
     265              : 
     266              :     /* slightly more likely, so it's checked even without casserts */
     267         1490 :     if (snap->copied)
     268            0 :         elog(ERROR, "cannot free a copied snapshot");
     269              : 
     270         1490 :     if (snap->active_count)
     271            0 :         elog(ERROR, "cannot free an active snapshot");
     272              : 
     273         1490 :     pfree(snap);
     274         1490 : }
     275              : 
     276              : /*
     277              :  * In which state of snapshot building are we?
     278              :  */
     279              : SnapBuildState
     280      2135448 : SnapBuildCurrentState(SnapBuild *builder)
     281              : {
     282      2135448 :     return builder->state;
     283              : }
     284              : 
     285              : /*
     286              :  * Return the LSN at which the two-phase decoding was first enabled.
     287              :  */
     288              : XLogRecPtr
     289           34 : SnapBuildGetTwoPhaseAt(SnapBuild *builder)
     290              : {
     291           34 :     return builder->two_phase_at;
     292              : }
     293              : 
     294              : /*
     295              :  * Set the LSN at which two-phase decoding is enabled.
     296              :  */
     297              : void
     298            8 : SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
     299              : {
     300            8 :     builder->two_phase_at = ptr;
     301            8 : }
     302              : 
     303              : /*
     304              :  * Should the contents of the transaction ending at 'ptr' be skipped (not decoded)?
     305              :  */
     306              : bool
     307       508539 : SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
     308              : {
     309       508539 :     return ptr < builder->start_decoding_at;
     310              : }
     311              : 
     312              : /*
     313              :  * Increase refcount of a snapshot.
     314              :  *
     315              :  * This is used when handing out a snapshot to some external resource or when
     316              :  * adding a Snapshot as builder->snapshot.
     317              :  */
     318              : static void
     319         6549 : SnapBuildSnapIncRefcount(Snapshot snap)
     320              : {
     321         6549 :     snap->active_count++;
     322         6549 : }
     323              : 
     324              : /*
     325              :  * Decrease refcount of a snapshot and free if the refcount reaches zero.
     326              :  *
     327              :  * Externally visible, so that external resources that have been handed an
     328              :  * IncRef'ed Snapshot can adjust its refcount easily.
     329              :  */
     330              : void
     331         6264 : SnapBuildSnapDecRefcount(Snapshot snap)
     332              : {
     333              :     /* make sure we don't get passed an external snapshot */
     334              :     Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
     335              : 
     336              :     /* make sure nobody modified our snapshot */
     337              :     Assert(snap->curcid == FirstCommandId);
     338              :     Assert(!snap->suboverflowed);
     339              :     Assert(!snap->takenDuringRecovery);
     340              : 
     341              :     Assert(snap->regd_count == 0);
     342              : 
     343              :     Assert(snap->active_count > 0);
     344              : 
     345              :     /* slightly more likely, so it's checked even without casserts */
     346         6264 :     if (snap->copied)
     347            0 :         elog(ERROR, "cannot free a copied snapshot");
     348              : 
     349         6264 :     snap->active_count--;
     350         6264 :     if (snap->active_count == 0)
     351         1490 :         SnapBuildFreeSnapshot(snap);
     352         6264 : }
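
The increment/decrement pair above follows a plain reference-counting pattern: the snapshot is freed by whoever drops the last reference. A minimal standalone sketch of that pattern, using hypothetical names (`SketchSnap`, `sketch_snap_incref`, `sketch_snap_decref`) and `malloc`/`free` in place of PostgreSQL memory contexts, might look like this:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for a snapshot; only the refcount is modelled. */
typedef struct SketchSnap
{
    unsigned    active_count;
} SketchSnap;

/* Allocate a snapshot with a zero refcount, as a freshly built one has. */
SketchSnap *
sketch_snap_new(void)
{
    return calloc(1, sizeof(SketchSnap));
}

/* Take a reference, e.g. when handing the snapshot to another component. */
void
sketch_snap_incref(SketchSnap *snap)
{
    snap->active_count++;
}

/*
 * Drop a reference. Returns true if this was the last one and the snapshot
 * was freed; the caller must not touch the pointer afterwards.
 */
bool
sketch_snap_decref(SketchSnap *snap)
{
    assert(snap->active_count > 0);

    if (--snap->active_count == 0)
    {
        free(snap);
        return true;
    }
    return false;
}
```

Returning a flag from the decrement is a choice made for this sketch so that callers and tests can observe the free; the function in the listing returns void.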
     353              : 
     354              : /*
     355              :  * Build a new snapshot, based on currently committed catalog-modifying
     356              :  * transactions.
     357              :  *
     358              :  * In-progress transactions with catalog access are *not* allowed to modify
     359              :  * these snapshots; they have to copy them and fill in appropriate ->curcid
     360              :  * and ->subxip/subxcnt values.
     361              :  */
     362              : static Snapshot
     363         1955 : SnapBuildBuildSnapshot(SnapBuild *builder)
     364              : {
     365              :     Snapshot    snapshot;
     366              :     Size        ssize;
     367              : 
     368              :     Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
     369              : 
     370         1955 :     ssize = sizeof(SnapshotData)
     371         1955 :         + sizeof(TransactionId) * builder->committed.xcnt
     372         1955 :         + sizeof(TransactionId) * 1 /* toplevel xid */ ;
     373              : 
     374         1955 :     snapshot = MemoryContextAllocZero(builder->context, ssize);
     375              : 
     376         1955 :     snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
     377              : 
     378              :     /*
     379              :      * We misuse the original meaning of SnapshotData's xip and subxip fields
     380              :      * to make them more fitting for our needs.
     381              :      *
     382              :      * In the 'xip' array we store transactions that have to be treated as
     383              :      * committed. Since we will only ever look at tuples from transactions
     384              :      * that have modified the catalog it's more efficient to store those few
     385              :      * that exist between xmin and xmax (frequently there are none).
     386              :      *
     387              :      * Snapshots that are used in transactions that have modified the catalog
     388              :      * also use the 'subxip' array to store their toplevel xid and all the
     389              :      * subtransaction xids so we can recognize when we need to treat rows as
     390              :      * visible that are not in xip but still need to be visible. Subxip only
     391              :      * gets filled when the snapshot is copied into the context of a
     392              :      * catalog modifying transaction since we otherwise share a snapshot
     393              :      * between transactions. As long as a txn hasn't modified the catalog it
     394              :      * doesn't need to treat any uncommitted rows as visible, so there is no
     395              :      * need for those xids.
     396              :      *
     397              :      * Both arrays are qsort'ed so that we can use bsearch() on them.
     398              :      */
     399              :     Assert(TransactionIdIsNormal(builder->xmin));
     400              :     Assert(TransactionIdIsNormal(builder->xmax));
     401              : 
     402         1955 :     snapshot->xmin = builder->xmin;
     403         1955 :     snapshot->xmax = builder->xmax;
     404              : 
     405              :     /* store all transactions to be treated as committed by this snapshot */
     406         1955 :     snapshot->xip =
     407         1955 :         (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
     408         1955 :     snapshot->xcnt = builder->committed.xcnt;
     409         1955 :     memcpy(snapshot->xip,
     410         1955 :            builder->committed.xip,
     411         1955 :            builder->committed.xcnt * sizeof(TransactionId));
     412              : 
     413              :     /* sort so we can bsearch() */
     414         1955 :     qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
     415              : 
     416              :     /*
     417              :      * Initially, subxip is empty, i.e. it's a snapshot to be used by
     418              :      * transactions that don't modify the catalog. Will be filled by
     419              :      * ReorderBufferCopySnap() if necessary.
     420              :      */
     421         1955 :     snapshot->subxcnt = 0;
     422         1955 :     snapshot->subxip = NULL;
     423              : 
     424         1955 :     snapshot->suboverflowed = false;
     425         1955 :     snapshot->takenDuringRecovery = false;
     426         1955 :     snapshot->copied = false;
     427         1955 :     snapshot->curcid = FirstCommandId;
     428         1955 :     snapshot->active_count = 0;
     429         1955 :     snapshot->regd_count = 0;
     430         1955 :     snapshot->snapXactCompletionCount = 0;
     431              : 
     432         1955 :     return snapshot;
     433              : }
     434              : 
     435              : /*
     436              :  * Build the initial slot snapshot and convert it to a normal snapshot that
     437              :  * is understood by HeapTupleSatisfiesMVCC.
     438              :  *
     439              :  * The snapshot will be usable directly in the current transaction or can be
     440              :  * exported for loading into a different transaction.
     441              :  */
     442              : Snapshot
     443          210 : SnapBuildInitialSnapshot(SnapBuild *builder)
     444              : {
     445              :     Snapshot    snap;
     446              :     TransactionId xid;
     447              :     TransactionId safeXid;
     448              :     TransactionId *newxip;
     449          210 :     int         newxcnt = 0;
     450              : 
     451              :     Assert(XactIsoLevel == XACT_REPEATABLE_READ);
     452              :     Assert(builder->building_full_snapshot);
     453              : 
     454              :     /* don't allow older snapshots */
     455          210 :     InvalidateCatalogSnapshot();    /* about to overwrite MyProc->xmin */
     456          210 :     if (HaveRegisteredOrActiveSnapshot())
     457            0 :         elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
     458              :     Assert(!HistoricSnapshotActive());
     459              : 
     460          210 :     if (builder->state != SNAPBUILD_CONSISTENT)
     461            0 :         elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
     462              : 
     463          210 :     if (!builder->committed.includes_all_transactions)
     464            0 :         elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
     465              : 
     466              :     /* so we don't overwrite the existing value */
     467          210 :     if (TransactionIdIsValid(MyProc->xmin))
     468            0 :         elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
     469              : 
     470          210 :     snap = SnapBuildBuildSnapshot(builder);
     471              : 
     472              :     /*
     473              :      * We know that snap->xmin is alive, enforced by the logical xmin
     474              :      * mechanism. Due to that we can do this without locks, we're only
     475              :      * changing our own value.
     476              :      *
     477              :      * Building an initial snapshot is expensive and an unenforced xmin
     478              :      * horizon would have bad consequences, therefore always double-check that
     479              :      * the horizon is enforced.
     480              :      */
     481          210 :     LWLockAcquire(ProcArrayLock, LW_SHARED);
     482          210 :     safeXid = GetOldestSafeDecodingTransactionId(false);
     483          210 :     LWLockRelease(ProcArrayLock);
     484              : 
     485          210 :     if (TransactionIdFollows(safeXid, snap->xmin))
     486            0 :         elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
     487              :              safeXid, snap->xmin);
     488              : 
     489          210 :     MyProc->xmin = snap->xmin;
     490              : 
     491              :     /* allocate in transaction context */
     492          210 :     newxip = palloc_array(TransactionId, GetMaxSnapshotXidCount());
     493              : 
     494              :     /*
     495              :      * snapbuild.c builds snapshots in an "inverted" manner, which means it
     496              :      * stores committed transactions in ->xip, not ones in progress. Build a
     497              :      * classical snapshot by marking all non-committed transactions as
     498              :      * in-progress. This can be expensive.
     499              :      */
     500          210 :     for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
     501              :     {
     502              :         void       *test;
     503              : 
     504              :         /*
     505              :          * Check whether transaction committed using the decoding snapshot
     506              :          * meaning of ->xip.
     507              :          */
     508            0 :         test = bsearch(&xid, snap->xip, snap->xcnt,
     509              :                        sizeof(TransactionId), xidComparator);
     510              : 
     511            0 :         if (test == NULL)
     512              :         {
     513            0 :             if (newxcnt >= GetMaxSnapshotXidCount())
     514            0 :                 ereport(ERROR,
     515              :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     516              :                          errmsg("initial slot snapshot too large")));
     517              : 
     518            0 :             newxip[newxcnt++] = xid;
     519              :         }
     520              : 
     521            0 :         TransactionIdAdvance(xid);
     522              :     }
     523              : 
     524              :     /* adjust remaining snapshot fields as needed */
     525          210 :     snap->snapshot_type = SNAPSHOT_MVCC;
     526          210 :     snap->xcnt = newxcnt;
     527          210 :     snap->xip = newxip;
     528              : 
     529          210 :     return snap;
     530              : }
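The loop above turns the builder's "inverted" xid list (committed transactions) into a classical in-progress list. A minimal sketch of that inversion follows, under loud assumptions: `Xid`, `xid_cmp` and `invert_committed` are hypothetical names, not PostgreSQL APIs, and xids are compared as plain ascending integers, so wraparound and special xids are ignored.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t Xid;           /* hypothetical stand-in for TransactionId */

/* Plain ascending comparator (assumption: no xid wraparound in the range). */
static int
xid_cmp(const void *a, const void *b)
{
    Xid         x = *(const Xid *) a;
    Xid         y = *(const Xid *) b;

    return (x > y) - (x < y);
}

/*
 * Write every xid in [xmin, xmax) that is NOT present in the sorted
 * committed[] array into out[]. Returns the number of xids written, or -1
 * if out[] is too small (the analogue of "initial slot snapshot too large").
 */
static int
invert_committed(const Xid *committed, size_t ncommitted,
                 Xid xmin, Xid xmax, Xid *out, size_t out_cap)
{
    size_t      n = 0;

    for (Xid xid = xmin; xid < xmax; xid++)
    {
        if (bsearch(&xid, committed, ncommitted, sizeof(Xid), xid_cmp) != NULL)
            continue;           /* known committed, so not in progress */

        if (n >= out_cap)
            return -1;

        out[n++] = xid;
    }

    return (int) n;
}
```

With committed xids {101, 103, 104} and the range [100, 106), the sketch reports 100, 102 and 105 as in progress. The cost is one binary search per xid in the range, which is why the source comment warns that this step can be expensive.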
     531              : 
     532              : /*
     533              :  * Export a snapshot so it can be set in another session with SET TRANSACTION
     534              :  * SNAPSHOT.
     535              :  *
     536              :  * For that we need to start a transaction in the current backend as the
     537              :  * importing side checks whether the source transaction is still open to make
     538              :  * sure the xmin horizon hasn't advanced since then.
     539              :  */
     540              : const char *
     541            1 : SnapBuildExportSnapshot(SnapBuild *builder)
     542              : {
     543              :     Snapshot    snap;
     544              :     char       *snapname;
     545              : 
     546            1 :     if (IsTransactionOrTransactionBlock())
     547            0 :         elog(ERROR, "cannot export a snapshot from within a transaction");
     548              : 
     549            1 :     if (SavedResourceOwnerDuringExport)
     550            0 :         elog(ERROR, "can only export one snapshot at a time");
     551              : 
     552            1 :     SavedResourceOwnerDuringExport = CurrentResourceOwner;
     553            1 :     ExportInProgress = true;
     554              : 
     555            1 :     StartTransactionCommand();
     556              : 
     557              :     /* There doesn't seem to be a nice API to set these */
     558            1 :     XactIsoLevel = XACT_REPEATABLE_READ;
     559            1 :     XactReadOnly = true;
     560              : 
     561            1 :     snap = SnapBuildInitialSnapshot(builder);
     562              : 
     563              :     /*
     564              :      * now that we've built a plain snapshot, make it active and use the
     565              :      * normal mechanisms for exporting it
     566              :      */
     567            1 :     snapname = ExportSnapshot(snap);
     568              : 
     569            1 :     ereport(LOG,
     570              :             (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
     571              :                            "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
     572              :                            snap->xcnt,
     573              :                            snapname, snap->xcnt)));
     574            1 :     return snapname;
     575              : }
     576              : 
     577              : /*
     578              :  * Ensure there is a snapshot and if not build one for current transaction.
     579              :  */
     580              : Snapshot
     581            9 : SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
     582              : {
     583              :     Assert(builder->state == SNAPBUILD_CONSISTENT);
     584              : 
     585              :     /* only build a new snapshot if we don't have a prebuilt one */
     586            9 :     if (builder->snapshot == NULL)
     587              :     {
     588            2 :         builder->snapshot = SnapBuildBuildSnapshot(builder);
     589              :         /* increase refcount for the snapshot builder */
     590            2 :         SnapBuildSnapIncRefcount(builder->snapshot);
     591              :     }
     592              : 
     593            9 :     return builder->snapshot;
     594              : }
     595              : 
     596              : /*
     597              :  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
     598              :  * any. Aborts the previously started transaction and resets the resource
     599              :  * owner back to its original value.
     600              :  */
     601              : void
     602         5671 : SnapBuildClearExportedSnapshot(void)
     603              : {
     604              :     ResourceOwner tmpResOwner;
     605              : 
     606              :     /* nothing exported, that is the usual case */
     607         5671 :     if (!ExportInProgress)
     608         5670 :         return;
     609              : 
     610            1 :     if (!IsTransactionState())
     611            0 :         elog(ERROR, "clearing exported snapshot in wrong transaction state");
     612              : 
     613              :     /*
     614              :      * AbortCurrentTransaction() takes care of resetting the snapshot state,
     615              :      * so remember SavedResourceOwnerDuringExport.
     616              :      */
     617            1 :     tmpResOwner = SavedResourceOwnerDuringExport;
     618              : 
     619              :     /* make sure nothing could have ever happened */
     620            1 :     AbortCurrentTransaction();
     621              : 
     622            1 :     CurrentResourceOwner = tmpResOwner;
     623              : }
     624              : 
     625              : /*
     626              :  * Clear snapshot export state during transaction abort.
     627              :  */
     628              : void
     629        26632 : SnapBuildResetExportedSnapshotState(void)
     630              : {
     631        26632 :     SavedResourceOwnerDuringExport = NULL;
     632        26632 :     ExportInProgress = false;
     633        26632 : }
     634              : 
     635              : /*
     636              :  * Handle the effects of a single heap change, appropriate to the current state
     637              :  * of the snapshot builder, and return whether changes made at (xid, lsn) can
     638              :  * be decoded.
     639              :  */
     640              : bool
     641      1401008 : SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
     642              : {
     643              :     /*
     644              :      * We can't handle data in transactions if we haven't built a snapshot
     645              :      * yet, so don't store them.
     646              :      */
     647      1401008 :     if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
     648            0 :         return false;
     649              : 
     650              :     /*
     651              :      * No point in keeping track of changes in transactions that we don't have
     652              :      * enough information about to decode. This means that they started before
     653              :      * we got into the SNAPBUILD_FULL_SNAPSHOT state.
     654              :      */
     655      1401011 :     if (builder->state < SNAPBUILD_CONSISTENT &&
     656            3 :         TransactionIdPrecedes(xid, builder->next_phase_at))
     657            0 :         return false;
     658              : 
     659              :     /*
     660              :      * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
     661              :      * be needed to decode the change we're currently processing.
     662              :      */
     663      1401008 :     if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
     664              :     {
     665              :         /* only build a new snapshot if we don't have a prebuilt one */
     666         3481 :         if (builder->snapshot == NULL)
     667              :         {
     668          426 :             builder->snapshot = SnapBuildBuildSnapshot(builder);
     669              :             /* increase refcount for the snapshot builder */
     670          426 :             SnapBuildSnapIncRefcount(builder->snapshot);
     671              :         }
     672              : 
     673              :         /*
     674              :          * Increase refcount for the transaction we're handing the snapshot
     675              :          * out to.
     676              :          */
     677         3481 :         SnapBuildSnapIncRefcount(builder->snapshot);
     678         3481 :         ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
     679              :                                      builder->snapshot);
     680              :     }
     681              : 
     682      1401008 :     return true;
     683              : }
     684              : 
     685              : /*
     686              :  * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
     687              :  * This implies that a transaction has done some form of write to system
     688              :  * catalogs.
     689              :  */
     690              : void
     691        23624 : SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
     692              :                        XLogRecPtr lsn, xl_heap_new_cid *xlrec)
     693              : {
     694              :     CommandId   cid;
     695              : 
     696              :     /*
     697              :      * we only log new_cid's if a catalog tuple was modified, so mark the
     698              :      * transaction as containing catalog modifications
     699              :      */
     700        23624 :     ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
     701              : 
     702        23624 :     ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
     703              :                                  xlrec->target_locator, xlrec->target_tid,
     704              :                                  xlrec->cmin, xlrec->cmax,
     705              :                                  xlrec->combocid);
     706              : 
     707              :     /* figure out new command id */
     708        23624 :     if (xlrec->cmin != InvalidCommandId &&
     709        19621 :         xlrec->cmax != InvalidCommandId)
     710         3182 :         cid = Max(xlrec->cmin, xlrec->cmax);
     711        20442 :     else if (xlrec->cmax != InvalidCommandId)
     712         4003 :         cid = xlrec->cmax;
     713        16439 :     else if (xlrec->cmin != InvalidCommandId)
     714        16439 :         cid = xlrec->cmin;
     715              :     else
     716              :     {
     717            0 :         cid = InvalidCommandId; /* silence compiler */
     718            0 :         elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
     719              :     }
     720              : 
     721        23624 :     ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
     722        23624 : }
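The command-id selection in SnapBuildProcessNewCid() can be isolated as a small pure function. This is only a sketch: `Cid`, `INVALID_CID` and `next_command_id` are hypothetical names, and the all-ones sentinel is an assumption standing in for InvalidCommandId.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t Cid;                   /* hypothetical stand-in for CommandId */
#define INVALID_CID ((Cid) ~(Cid) 0)    /* assumed "not set" sentinel */

/*
 * Pick the larger valid command id out of cmin/cmax and store the following
 * command id in *next. Returns false if neither value is valid, which the
 * listing above reports as an error.
 */
static bool
next_command_id(Cid cmin, Cid cmax, Cid *next)
{
    Cid         cid;

    if (cmin != INVALID_CID && cmax != INVALID_CID)
        cid = (cmin > cmax) ? cmin : cmax;
    else if (cmax != INVALID_CID)
        cid = cmax;
    else if (cmin != INVALID_CID)
        cid = cmin;
    else
        return false;

    *next = cid + 1;
    return true;
}
```

Returning a flag instead of raising an error keeps the sketch free of any error-reporting machinery; the real function calls elog(ERROR, ...) in the last branch.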
     723              : 
     724              : /*
     725              :  * Add a new Snapshot and invalidation messages to all transactions we're
     726              :  * decoding that currently are in-progress so they can see new catalog contents
     727              :  * made by the transaction that just committed. This is necessary because those
     728              :  * in-progress transactions will use the new catalog's contents from here on
     729              :  * (at the very least everything they do needs to be compatible with newer
     730              :  * catalog contents).
     731              :  */
     732              : static void
     733         1310 : SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
     734              : {
     735              :     dlist_iter  txn_i;
     736              :     ReorderBufferTXN *txn;
     737              : 
     738              :     /*
     739              :      * Iterate through all toplevel transactions. This can include
     740              :      * subtransactions which we just don't yet know to be that, but that's
     741              :      * fine, they will just get an unnecessary snapshot and invalidations
     742              :      * queued.
     743              :      */
     744         2655 :     dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
     745              :     {
     746         1345 :         txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
     747              : 
     748              :         Assert(TransactionIdIsValid(txn->xid));
     749              : 
     750              :         /*
     751              :          * If we don't have a base snapshot yet, there are no changes in this
     752              :          * transaction which in turn implies we don't yet need a snapshot at
     753              :          * all. We'll add a snapshot when the first change gets queued.
     754              :          *
     755              :          * Similarly, we don't need to add invalidations to a transaction
     756              :          * whose base snapshot is not yet set. Once a base snapshot is built,
     757              :          * it will include the xids of committed transactions that have
     758              :          * modified the catalog, thus reflecting the new catalog contents. The
     759              :          * existing catalog cache will have already been invalidated after
     760              :          * processing the invalidations in the transaction that modified
     761              :          * catalogs, ensuring that a fresh cache is constructed during
     762              :          * decoding.
     763              :          *
     764              :          * NB: This works correctly even for subtransactions because
     765              :          * ReorderBufferAssignChild() takes care to transfer the base snapshot
     766              :          * to the top-level transaction, and while iterating the changequeue
     767              :          * we'll get the change from the subtxn.
     768              :          */
     769         1345 :         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
     770            2 :             continue;
     771              : 
     772              :         /*
     773              :          * We don't need to add snapshot or invalidations to prepared
     774              :          * transactions as they should not see the new catalog contents.
     775              :          */
     776         1343 :         if (rbtxn_is_prepared(txn))
     777           28 :             continue;
     778              : 
     779         1315 :         elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
     780              :              txn->xid, LSN_FORMAT_ARGS(lsn));
     781              : 
     782              :         /*
     783              :          * increase the snapshot's refcount for the transaction we are handing
     784              :          * it out to
     785              :          */
     786         1315 :         SnapBuildSnapIncRefcount(builder->snapshot);
     787         1315 :         ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
     788              :                                  builder->snapshot);
     789              : 
     790              :         /*
     791              :          * Add invalidation messages to the reorder buffer of in-progress
     792              :          * transactions except the current committed transaction, for which we
     793              :          * will execute invalidations at the end.
     794              :          *
     795              :          * This is required; otherwise, we would end up using stale catcache
     796              :          * contents built by the current transaction even after its decoding,
     797              :          * which should have been invalidated due to a concurrent
     798              :          * catalog-changing transaction.
     799              :          *
     800              :          * Distribute only the invalidation messages generated by the current
     801              :          * committed transaction. Invalidation messages received from other
     802              :          * transactions would have already been propagated to the relevant
     803              :          * in-progress transactions. This transaction would have processed
     804              :          * those invalidations, ensuring that subsequent transactions observe
     805              :          * a consistent cache state.
     806              :          */
     807         1315 :         if (txn->xid != xid)
     808              :         {
     809              :             uint32      ninvalidations;
     810           33 :             SharedInvalidationMessage *msgs = NULL;
     811              : 
     812           33 :             ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
     813              :                                                            xid, &msgs);
     814              : 
     815           33 :             if (ninvalidations > 0)
     816              :             {
     817              :                 Assert(msgs != NULL);
     818              : 
     819           29 :                 ReorderBufferAddDistributedInvalidations(builder->reorder,
     820              :                                                          txn->xid, lsn,
     821              :                                                          ninvalidations, msgs);
     822              :             }
     823              :         }
     824              :     }
     825         1310 : }
     826              : 
     827              : /*
     828              :  * Keep track of a new catalog changing transaction that has committed.
     829              :  */
     830              : static void
     831         1319 : SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
     832              : {
     833              :     Assert(TransactionIdIsValid(xid));
     834              : 
     835         1319 :     if (builder->committed.xcnt == builder->committed.xcnt_space)
     836              :     {
     837            0 :         builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
     838              : 
     839            0 :         elog(DEBUG1, "increasing space for committed transactions to %u",
     840              :              (uint32) builder->committed.xcnt_space);
     841              : 
     842            0 :         builder->committed.xip = repalloc_array(builder->committed.xip,
     843              :                                                 TransactionId,
     844              :                                                 builder->committed.xcnt_space);
     845              :     }
     846              : 
     847              :     /*
     848              :      * TODO: It might make sense to keep the array sorted here instead of
     849              :      * doing it every time we build a new snapshot. On the other hand this
     850              :      * gets called repeatedly when a transaction with subtransactions commits.
     851              :      */
     852         1319 :     builder->committed.xip[builder->committed.xcnt++] = xid;
     853         1319 : }
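The growth policy used by SnapBuildAddCommittedTxn() (new capacity = old capacity * 2 + 1, so an empty array can still grow) can be tried outside the backend. The sketch below assumes plain realloc() in place of repalloc_array(); `XidArray` and `xid_array_append` are hypothetical names that only mirror the `xip`, `xcnt` and `xcnt_space` fields seen above.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t Xid;           /* hypothetical stand-in for TransactionId */

typedef struct XidArray
{
    Xid        *xip;            /* stored xids, unsorted */
    size_t      xcnt;           /* number of xids in use */
    size_t      xcnt_space;     /* allocated capacity, in xids */
} XidArray;

/* Append xid, growing to 2n+1 slots when full. Returns 0, or -1 on OOM. */
static int
xid_array_append(XidArray *arr, Xid xid)
{
    if (arr->xcnt == arr->xcnt_space)
    {
        size_t      new_space = arr->xcnt_space * 2 + 1;
        Xid        *grown = realloc(arr->xip, new_space * sizeof(Xid));

        if (grown == NULL)
            return -1;          /* original array is left untouched */

        arr->xip = grown;
        arr->xcnt_space = new_space;
    }

    arr->xip[arr->xcnt++] = xid;
    return 0;
}
```

Starting from a zero-initialized XidArray, four appends take the capacity through 1, 3 and 7. The "+ 1" is what lets the doubling start from a capacity of zero.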
     854              : 
     855              : /*
     856              :  * Remove knowledge about transactions we treat as committed or containing catalog
     857              :  * changes that are smaller than ->xmin. Those won't ever get checked via
     858              :  * the ->committed or ->catchange array, respectively. The committed xids will
     859              :  * get checked via the clog machinery.
     860              :  *
     861              :  * We can ideally remove the transaction from catchange array once it is
     862              :  * finished (committed/aborted) but that could be costly as we need to maintain
     863              :  * the xids order in the array.
     864              :  */
     865              : static void
     866          494 : SnapBuildPurgeOlderTxn(SnapBuild *builder)
     867              : {
     868              :     int         off;
     869              :     TransactionId *workspace;
     870          494 :     int         surviving_xids = 0;
     871              : 
     872              :     /* not ready yet */
     873          494 :     if (!TransactionIdIsNormal(builder->xmin))
     874            0 :         return;
     875              : 
     876              :     /* TODO: Neater algorithm than just copying and iterating? */
     877              :     workspace =
     878          494 :         MemoryContextAlloc(builder->context,
     879          494 :                            builder->committed.xcnt * sizeof(TransactionId));
     880              : 
     881              :     /* copy xids that still are interesting to workspace */
     882          752 :     for (off = 0; off < builder->committed.xcnt; off++)
     883              :     {
     884          258 :         if (NormalTransactionIdPrecedes(builder->committed.xip[off],
     885              :                                         builder->xmin))
     886              :             ;                   /* remove */
     887              :         else
     888            1 :             workspace[surviving_xids++] = builder->committed.xip[off];
     889              :     }
     890              : 
     891              :     /* copy workspace back to persistent state */
     892          494 :     memcpy(builder->committed.xip, workspace,
     893              :            surviving_xids * sizeof(TransactionId));
     894              : 
     895          494 :     elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
     896              :          (uint32) builder->committed.xcnt, (uint32) surviving_xids,
     897              :          builder->xmin, builder->xmax);
     898          494 :     builder->committed.xcnt = surviving_xids;
     899              : 
     900          494 :     pfree(workspace);
     901              : 
     902              :     /*
     903              :      * Purge xids in ->catchange as well. The purged array must also be sorted
     904              :      * in xidComparator order.
     905              :      */
     906          494 :     if (builder->catchange.xcnt > 0)
     907              :     {
     908              :         /*
     909              :          * Since catchange.xip is sorted, we find the lower bound of xids that
     910              :          * are still interesting.
     911              :          */
     912            7 :         for (off = 0; off < builder->catchange.xcnt; off++)
     913              :         {
     914            5 :             if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
     915              :                                              builder->xmin))
     916            1 :                 break;
     917              :         }
     918              : 
     919            3 :         surviving_xids = builder->catchange.xcnt - off;
     920              : 
     921            3 :         if (surviving_xids > 0)
     922              :         {
     923            1 :             memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
     924              :                     surviving_xids * sizeof(TransactionId));
     925              :         }
     926              :         else
     927              :         {
     928            2 :             pfree(builder->catchange.xip);
     929            2 :             builder->catchange.xip = NULL;
     930              :         }
     931              : 
     932            3 :         elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
     933              :              (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
     934              :              builder->xmin, builder->xmax);
     935            3 :         builder->catchange.xcnt = surviving_xids;
     936              :     }
     937              : }
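The purge of ->committed relies on a wraparound-aware "precedes" test. The sketch below shows the modulo-2^32 comparison for normal xids together with an in-place compaction. Note that the compaction is a swapped-in alternative to the workspace copy used in the listing, not the source's method; `Xid`, `xid_precedes` and `purge_older` are hypothetical names, and the signed cast assumes the usual two's-complement conversion.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Xid;           /* hypothetical stand-in for TransactionId */

/* Circular ordering: a precedes b if the signed 32-bit difference is < 0. */
static bool
xid_precedes(Xid a, Xid b)
{
    return (int32_t) (a - b) < 0;
}

/*
 * Drop every xid that precedes xmin, keeping survivors in their original
 * relative order. Returns the number of surviving xids.
 */
static size_t
purge_older(Xid *xip, size_t xcnt, Xid xmin)
{
    size_t      keep = 0;

    for (size_t off = 0; off < xcnt; off++)
    {
        if (!xid_precedes(xip[off], xmin))
            xip[keep++] = xip[off];
    }

    return keep;
}
```

With xmin = 100, an xid such as 4000000000 counts as older than xmin because the comparison is circular, so it is purged along with 50 and 99.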
     938              : 
     939              : /*
     940              :  * Handle everything that needs to be done when a transaction commits
     941              :  */
     942              : void
     943         3332 : SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
     944              :                    int nsubxacts, TransactionId *subxacts, uint32 xinfo)
     945              : {
     946              :     int         nxact;
     947              : 
     948         3332 :     bool        needs_snapshot = false;
     949         3332 :     bool        needs_timetravel = false;
     950         3332 :     bool        sub_needs_timetravel = false;
     951              : 
     952         3332 :     TransactionId xmax = xid;
     953              : 
     954              :     /*
     955              :      * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
     956              :      * will they be part of a snapshot.  So we don't need to record anything.
     957              :      */
     958         3332 :     if (builder->state == SNAPBUILD_START ||
     959         3332 :         (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
     960            0 :          TransactionIdPrecedes(xid, builder->next_phase_at)))
     961              :     {
     962              :         /* ensure that only commits after this are getting replayed */
     963            0 :         if (builder->start_decoding_at <= lsn)
     964            0 :             builder->start_decoding_at = lsn + 1;
     965            0 :         return;
     966              :     }
     967              : 
     968         3332 :     if (builder->state < SNAPBUILD_CONSISTENT)
     969              :     {
     970              :         /* ensure that only commits after this are getting replayed */
     971            5 :         if (builder->start_decoding_at <= lsn)
     972            2 :             builder->start_decoding_at = lsn + 1;
     973              : 
     974              :         /*
     975              :          * If building an exportable snapshot, force xid to be tracked, even
     976              :          * if the transaction didn't modify the catalog.
     977              :          */
     978            5 :         if (builder->building_full_snapshot)
     979              :         {
     980            0 :             needs_timetravel = true;
     981              :         }
     982              :     }
     983              : 
     984         4541 :     for (nxact = 0; nxact < nsubxacts; nxact++)
     985              :     {
     986         1209 :         TransactionId subxid = subxacts[nxact];
     987              : 
     988              :         /*
     989              :          * Add the subtransaction to the base snapshot if it modified the
     990              :          * catalog; we don't distinguish it from toplevel transactions there.
     991              :          */
     992         1209 :         if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
     993              :         {
     994            9 :             sub_needs_timetravel = true;
     995            9 :             needs_snapshot = true;
     996              : 
     997            9 :             elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
     998              :                  xid, subxid);
     999              : 
    1000            9 :             SnapBuildAddCommittedTxn(builder, subxid);
    1001              : 
    1002            9 :             if (NormalTransactionIdFollows(subxid, xmax))
    1003            9 :                 xmax = subxid;
    1004              :         }
    1005              : 
    1006              :         /*
    1007              :          * If we're forcing timetravel we also need visibility information
    1008              :          * about subtransaction, so keep track of subtransaction's state, even
    1009              :          * if not catalog modifying.  Don't need to distribute a snapshot in
    1010              :          * that case.
    1011              :          */
    1012         1200 :         else if (needs_timetravel)
    1013              :         {
    1014            0 :             SnapBuildAddCommittedTxn(builder, subxid);
    1015            0 :             if (NormalTransactionIdFollows(subxid, xmax))
    1016            0 :                 xmax = subxid;
    1017              :         }
    1018              :     }
    1019              : 
    1020              :     /* if top-level modified catalog, it'll need a snapshot */
    1021         3332 :     if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
    1022              :     {
    1023         1309 :         elog(DEBUG2, "found top level transaction %u, with catalog changes",
    1024              :              xid);
    1025         1309 :         needs_snapshot = true;
    1026         1309 :         needs_timetravel = true;
    1027         1309 :         SnapBuildAddCommittedTxn(builder, xid);
    1028              :     }
    1029         2023 :     else if (sub_needs_timetravel)
    1030              :     {
    1031              :         /* track toplevel txn as well, subxact alone isn't meaningful */
    1032            1 :         elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
    1033              :              xid);
    1034            1 :         needs_timetravel = true;
    1035            1 :         SnapBuildAddCommittedTxn(builder, xid);
    1036              :     }
    1037         2022 :     else if (needs_timetravel)
    1038              :     {
    1039            0 :         elog(DEBUG2, "forced transaction %u to do timetravel", xid);
    1040              : 
    1041            0 :         SnapBuildAddCommittedTxn(builder, xid);
    1042              :     }
    1043              : 
    1044         3332 :     if (!needs_timetravel)
    1045              :     {
    1046              :         /* record that we cannot export a general snapshot anymore */
    1047         2022 :         builder->committed.includes_all_transactions = false;
    1048              :     }
    1049              : 
    1050              :     Assert(!needs_snapshot || needs_timetravel);
    1051              : 
    1052              :     /*
    1053              :      * Adjust xmax of the snapshot builder, we only do that for committed,
    1054              :      * catalog modifying, transactions, everything else isn't interesting for
    1055              :      * us since we'll never look at the respective rows.
    1056              :      */
    1057         3332 :     if (needs_timetravel &&
    1058         2620 :         (!TransactionIdIsValid(builder->xmax) ||
    1059         1310 :          TransactionIdFollowsOrEquals(xmax, builder->xmax)))
    1060              :     {
    1061         1310 :         builder->xmax = xmax;
    1062         1310 :         TransactionIdAdvance(builder->xmax);
    1063              :     }
    1064              : 
    1065              :     /* if there's any reason to build a historic snapshot, do so now */
    1066         3332 :     if (needs_snapshot)
    1067              :     {
    1068              :         /*
    1069              :          * If we haven't built a complete snapshot yet there's no need to hand
    1070              :          * it out, it wouldn't (and couldn't) be used anyway.
    1071              :          */
    1072         1310 :         if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
    1073            0 :             return;
    1074              : 
    1075              :         /*
    1076              :          * Decrease the snapshot builder's refcount of the old snapshot, note
    1077              :          * that it still will be used if it has been handed out to the
    1078              :          * reorderbuffer earlier.
    1079              :          */
    1080         1310 :         if (builder->snapshot)
    1081         1310 :             SnapBuildSnapDecRefcount(builder->snapshot);
    1082              : 
    1083         1310 :         builder->snapshot = SnapBuildBuildSnapshot(builder);
    1084              : 
    1085              :         /* we might need to execute invalidations, add snapshot */
    1086         1310 :         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
    1087              :         {
    1088            8 :             SnapBuildSnapIncRefcount(builder->snapshot);
    1089            8 :             ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
    1090              :                                          builder->snapshot);
    1091              :         }
    1092              : 
    1093              :         /* refcount of the snapshot builder for the new snapshot */
    1094         1310 :         SnapBuildSnapIncRefcount(builder->snapshot);
    1095              : 
    1096              :         /*
    1097              :          * Add a new catalog snapshot and invalidation messages to all
    1098              :          * currently running transactions.
    1099              :          */
    1100         1310 :         SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
    1101              :     }
    1102              : }
    1103              : 
    1104              : /*
    1105              :  * Check the reorder buffer and the snapshot to see if the given transaction has
    1106              :  * modified catalogs.
    1107              :  */
    1108              : static inline bool
    1109         4541 : SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
    1110              :                               uint32 xinfo)
    1111              : {
    1112         4541 :     if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
    1113         1314 :         return true;
    1114              : 
    1115              :     /*
    1116              :      * The transactions that have changed catalogs must have invalidation
    1117              :      * info.
    1118              :      */
    1119         3227 :     if (!(xinfo & XACT_XINFO_HAS_INVALS))
    1120         3219 :         return false;
    1121              : 
    1122              :     /* Check the catchange XID array */
    1123           12 :     return ((builder->catchange.xcnt > 0) &&
    1124            4 :             (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
    1125              :                      sizeof(TransactionId), xidComparator) != NULL));
    1126              : }
    1127              : 
    1128              : /* -----------------------------------
    1129              :  * Snapshot building functions dealing with xlog records
    1130              :  * -----------------------------------
    1131              :  */
    1132              : 
    1133              : /*
    1134              :  * Process a running xacts record, and use its information to first build a
    1135              :  * historic snapshot and later to release resources that aren't needed
    1136              :  * anymore.
    1137              :  */
    1138              : void
    1139         1605 : SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
    1140              : {
    1141              :     ReorderBufferTXN *txn;
    1142              :     TransactionId xmin;
    1143              : 
    1144              :     /*
    1145              :      * If we're not consistent yet, inspect the record to see whether it
    1146              :      * allows us to get closer to being consistent. If we are consistent, dump
    1147              :      * our snapshot so others or we, after a restart, can use it.
    1148              :      */
    1149         1605 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1150              :     {
    1151              :         /* returns false if there's no point in performing cleanup just yet */
    1152         1132 :         if (!SnapBuildFindSnapshot(builder, lsn, running))
    1153         1109 :             return;
    1154              :     }
    1155              :     else
    1156          473 :         SnapBuildSerialize(builder, lsn);
    1157              : 
    1158              :     /*
    1159              :      * Update range of interesting xids based on the running xacts
    1160              :      * information. We don't increase ->xmax using it, because once we are in
    1161              :      * a consistent state we can do that ourselves and much more efficiently
    1162              :      * so, because we only need to do it for catalog transactions since we
    1163              :      * only ever look at those.
    1164              :      *
    1165              :      * NB: We only increase xmax when a catalog modifying transaction commits
    1166              :      * (see SnapBuildCommitTxn).  Because of this, xmax can be lower than
    1167              :      * xmin, which looks odd but is correct and actually more efficient, since
    1168              :      * we hit fast paths in heapam_visibility.c.
    1169              :      */
    1170          494 :     builder->xmin = running->oldestRunningXid;
    1171              : 
    1172              :     /* Remove transactions we don't need to keep track of anymore */
    1173          494 :     SnapBuildPurgeOlderTxn(builder);
    1174              : 
    1175              :     /*
    1176              :      * Advance the xmin limit for the current replication slot, to allow
    1177              :      * vacuum to clean up the tuples this slot has been protecting.
    1178              :      *
    1179              :      * The reorderbuffer might have an xmin among the currently running
    1180              :      * snapshots; use it if so.  If not, we need only consider the snapshots
    1181              :      * we'll produce later, which can't be less than the oldest running xid in
    1182              :      * the record we're reading now.
    1183              :      */
    1184          494 :     xmin = ReorderBufferGetOldestXmin(builder->reorder);
    1185          494 :     if (xmin == InvalidTransactionId)
    1186          448 :         xmin = running->oldestRunningXid;
    1187          494 :     elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
    1188              :          builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
    1189          494 :     LogicalIncreaseXminForSlot(lsn, xmin);
    1190              : 
    1191              :     /*
    1192              :      * Also tell the slot where we can restart decoding from. We don't want to
    1193              :      * do that after every commit because changing that implies an fsync of
    1194              :      * the logical slot's state file, so we only do it every time we see a
    1195              :      * running xacts record.
    1196              :      *
    1197              :      * Do so by looking for the oldest in progress transaction (determined by
    1198              :      * the first LSN of any of its relevant records). Every transaction
    1199              :      * remembers the last location we stored the snapshot to disk before its
    1200              :      * beginning. That point is where we can restart from.
    1201              :      */
    1202              : 
    1203              :     /*
    1204              :      * Can't know about a serialized snapshot's location if we're not
    1205              :      * consistent.
    1206              :      */
    1207          494 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1208           16 :         return;
    1209              : 
    1210          478 :     txn = ReorderBufferGetOldestTXN(builder->reorder);
    1211              : 
    1212              :     /*
    1213              :      * oldest ongoing txn might have started when we didn't yet serialize
    1214              :      * anything because we hadn't reached a consistent state yet.
    1215              :      */
    1216          478 :     if (txn != NULL && XLogRecPtrIsValid(txn->restart_decoding_lsn))
    1217           24 :         LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
    1218              : 
    1219              :     /*
    1220              :      * No in-progress transaction, can reuse the last serialized snapshot if
    1221              :      * we have one.
    1222              :      */
    1223          454 :     else if (txn == NULL &&
    1224          423 :              XLogRecPtrIsValid(builder->reorder->current_restart_decoding_lsn) &&
    1225          421 :              XLogRecPtrIsValid(builder->last_serialized_snapshot))
    1226          421 :         LogicalIncreaseRestartDecodingForSlot(lsn,
    1227              :                                               builder->last_serialized_snapshot);
    1228              : }
    1229              : 
    1230              : 
    1231              : /*
    1232              :  * Build the start of a snapshot that's capable of decoding the catalog.
    1233              :  *
    1234              :  * Helper function for SnapBuildProcessRunningXacts() while we're not yet
    1235              :  * consistent.
    1236              :  *
    1237              :  * Returns true if there is a point in performing internal maintenance/cleanup
    1238              :  * using the xl_running_xacts record.
    1239              :  */
    1240              : static bool
    1241         1132 : SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
    1242              : {
    1243              :     /* ---
    1244              :      * Build catalog decoding snapshot incrementally using information about
    1245              :      * the currently running transactions. There are several ways to do that:
    1246              :      *
    1247              :      * a) There were no running transactions when the xl_running_xacts record
    1248              :      *    was inserted, jump to CONSISTENT immediately. We might find such a
    1249              :      *    state while waiting on c)'s sub-states.
    1250              :      *
    1251              :      * b) This (in a previous run) or another decoding slot serialized a
    1252              :      *    snapshot to disk that we can use. Can't use this method while finding
    1253              :      *    the start point for decoding changes as the restart LSN would be an
    1254              :      *    arbitrary LSN but we need to find the start point to extract changes
    1255              :      *    where we won't see the data for partial transactions. Also, we cannot
    1256              :      *    use this method when a slot needs a full snapshot for export or direct
    1257              :      *    use, as that snapshot will only contain catalog modifying transactions.
    1258              :      *
    1259              :      * c) First incrementally build a snapshot for catalog tuples
    1260              :      *    (BUILDING_SNAPSHOT), that requires all, already in-progress,
    1261              :      *    transactions to finish.  Every transaction starting after that
    1262              :      *    (FULL_SNAPSHOT state), has enough information to be decoded.  But
    1263              :      *    for older running transactions no viable snapshot exists yet, so
    1264              :      *    CONSISTENT will only be reached once all of those have finished.
    1265              :      * ---
    1266              :      */
    1267              : 
    1268              :     /*
    1269              :      * xl_running_xacts record is older than what we can use, we might not
    1270              :      * have all necessary catalog rows anymore.
    1271              :      */
    1272         1132 :     if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
    1273          492 :         NormalTransactionIdPrecedes(running->oldestRunningXid,
    1274              :                                     builder->initial_xmin_horizon))
    1275              :     {
    1276            0 :         ereport(DEBUG1,
    1277              :                 errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low",
    1278              :                                 LSN_FORMAT_ARGS(lsn)),
    1279              :                 errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
    1280              :                                    builder->initial_xmin_horizon, running->oldestRunningXid));
    1281              : 
    1282              : 
    1283            0 :         SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
    1284              : 
    1285            0 :         return true;
    1286              :     }
    1287              : 
    1288              :     /*
    1289              :      * a) No transactions were running, we can jump to consistent.
    1290              :      *
    1291              :      * This is not affected by races around xl_running_xacts, because we can
    1292              :      * miss transaction commits, but currently not transactions starting.
    1293              :      *
    1294              :      * NB: We might have already started to incrementally assemble a snapshot,
    1295              :      * so we need to be careful to deal with that.
    1296              :      */
    1297         1132 :     if (running->oldestRunningXid == running->nextXid)
    1298              :     {
    1299         1102 :         if (!XLogRecPtrIsValid(builder->start_decoding_at) ||
    1300          616 :             builder->start_decoding_at <= lsn)
    1301              :             /* can decode everything after this */
    1302          487 :             builder->start_decoding_at = lsn + 1;
    1303              : 
    1304              :         /* As no transactions were running xmin/xmax can be trivially set. */
    1305         1102 :         builder->xmin = running->nextXid; /* < are finished */
    1306         1102 :         builder->xmax = running->nextXid; /* >= are running */
    1307              : 
    1308              :         /* so we can safely use the faster comparisons */
    1309              :         Assert(TransactionIdIsNormal(builder->xmin));
    1310              :         Assert(TransactionIdIsNormal(builder->xmax));
    1311              : 
    1312         1102 :         builder->state = SNAPBUILD_CONSISTENT;
    1313         1102 :         builder->next_phase_at = InvalidTransactionId;
    1314              : 
    1315         1102 :         ereport(LOG,
    1316              :                 errmsg("logical decoding found consistent point at %X/%08X",
    1317              :                        LSN_FORMAT_ARGS(lsn)),
    1318              :                 errdetail("There are no running transactions."));
    1319              : 
    1320         1102 :         return false;
    1321              :     }
    1322              : 
    1323              :     /*
    1324              :      * b) valid on disk state and while neither building full snapshot nor
    1325              :      * creating a slot.
    1326              :      */
    1327           30 :     else if (!builder->building_full_snapshot &&
    1328           48 :              !builder->in_slot_creation &&
    1329           18 :              SnapBuildRestore(builder, lsn))
    1330              :     {
    1331              :         /* there won't be any state to cleanup */
    1332            7 :         return false;
    1333              :     }
    1334              : 
    1335              :     /*
    1336              :      * c) transition from START to BUILDING_SNAPSHOT.
    1337              :      *
    1338              :      * In START state, and a xl_running_xacts record with running xacts is
    1339              :      * encountered.  In that case, switch to BUILDING_SNAPSHOT state, and
    1340              :      * record xl_running_xacts->nextXid.  Once all running xacts have finished
    1341              :      * (i.e. they're all >= nextXid), we have a complete catalog snapshot.  It
    1342              :      * might look as if we could use xl_running_xacts's ->xids information to
    1343              :      * get there quicker, but that is problematic because transactions marked
    1344              :      * as running, might already have inserted their commit record - it's
    1345              :      * infeasible to change that with locking.
    1346              :      */
    1347           23 :     else if (builder->state == SNAPBUILD_START)
    1348              :     {
    1349           12 :         builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
    1350           12 :         builder->next_phase_at = running->nextXid;
    1351              : 
    1352              :         /*
    1353              :          * Start with an xmin/xmax that's correct for the future, when all the
    1354              :          * currently running transactions have finished. We'll update both
    1355              :          * while waiting for the pending transactions to finish.
    1356              :          */
    1357           12 :         builder->xmin = running->nextXid; /* < are finished */
    1358           12 :         builder->xmax = running->nextXid; /* >= are running */
    1359              : 
    1360              :         /* so we can safely use the faster comparisons */
    1361              :         Assert(TransactionIdIsNormal(builder->xmin));
    1362              :         Assert(TransactionIdIsNormal(builder->xmax));
    1363              : 
    1364           12 :         ereport(LOG,
    1365              :                 errmsg("logical decoding found initial starting point at %X/%08X",
    1366              :                        LSN_FORMAT_ARGS(lsn)),
    1367              :                 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
    1368              :                           running->xcnt, running->nextXid));
    1369              : 
    1370           12 :         SnapBuildWaitSnapshot(running, running->nextXid);
    1371              :     }
    1372              : 
    1373              :     /*
    1374              :      * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
    1375              :      *
    1376              :      * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
    1377              :      * is >= nextXid from when we switched to BUILDING_SNAPSHOT.  This
    1378              :      * means all transactions starting afterwards have enough information to
    1379              :      * be decoded.  Switch to FULL_SNAPSHOT.
    1380              :      */
    1381           17 :     else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
    1382            6 :              TransactionIdPrecedesOrEquals(builder->next_phase_at,
    1383              :                                            running->oldestRunningXid))
    1384              :     {
    1385            5 :         builder->state = SNAPBUILD_FULL_SNAPSHOT;
    1386            5 :         builder->next_phase_at = running->nextXid;
    1387              : 
    1388            5 :         ereport(LOG,
    1389              :                 errmsg("logical decoding found initial consistent point at %X/%08X",
    1390              :                        LSN_FORMAT_ARGS(lsn)),
    1391              :                 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
    1392              :                           running->xcnt, running->nextXid));
    1393              : 
    1394            5 :         SnapBuildWaitSnapshot(running, running->nextXid);
    1395              :     }
    1396              : 
    1397              :     /*
    1398              :      * c) transition from FULL_SNAPSHOT to CONSISTENT.
    1399              :      *
    1400              :      * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
    1401              :      * >= nextXid from when we switched to FULL_SNAPSHOT.  This means all
    1402              :      * transactions that are currently in progress have a catalog snapshot,
    1403              :      * and all their changes have been collected.  Switch to CONSISTENT.
    1404              :      */
    1405           11 :     else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
    1406            5 :              TransactionIdPrecedesOrEquals(builder->next_phase_at,
    1407              :                                            running->oldestRunningXid))
    1408              :     {
    1409            5 :         builder->state = SNAPBUILD_CONSISTENT;
    1410            5 :         builder->next_phase_at = InvalidTransactionId;
    1411              : 
    1412            5 :         ereport(LOG,
    1413              :                 errmsg("logical decoding found consistent point at %X/%08X",
    1414              :                        LSN_FORMAT_ARGS(lsn)),
    1415              :                 errdetail("There are no old transactions anymore."));
    1416              :     }
    1417              : 
    1418              :     /*
    1419              :      * We already started to track running xacts and need to wait for all
    1420              :      * in-progress ones to finish. We fall through to the normal processing of
    1421              :      * records so incremental cleanup can be performed.
    1422              :      */
    1423           21 :     return true;
    1424              : }
    1425              : 
    1426              : /* ---
    1427              :  * Iterate through xids in record, wait for all older than the cutoff to
    1428              :  * finish.  Then, if possible, log a new xl_running_xacts record.
    1429              :  *
    1430              :  * This isn't required for the correctness of decoding, but to:
    1431              :  * a) allow isolationtester to notice that we're currently waiting for
    1432              :  *    something.
    1433              :  * b) log a new xl_running_xacts record where it'd be helpful, without having
    1434              :  *    to wait for bgwriter or checkpointer.
    1435              :  * ---
    1436              :  */
    1437              : static void
    1438           17 : SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
    1439              : {
    1440              :     int         off;
    1441              : 
    1442           32 :     for (off = 0; off < running->xcnt; off++)
    1443              :     {
    1444           17 :         TransactionId xid = running->xids[off];
    1445              : 
    1446              :         /*
    1447              :          * Upper layers should prevent us from ever needing to wait on ourselves.
    1448              :          * Check anyway, since failing to do so would either result in an
    1449              :          * endless wait or an Assert() failure.
    1450              :          */
    1451           17 :         if (TransactionIdIsCurrentTransactionId(xid))
    1452            0 :             elog(ERROR, "waiting for ourselves");
    1453              : 
    1454           17 :         if (TransactionIdFollows(xid, cutoff))
    1455            0 :             continue;
    1456              : 
    1457           17 :         XactLockTableWait(xid, NULL, NULL, XLTW_None);
    1458              :     }
    1459              : 
    1460              :     /*
    1461              :      * All transactions we needed to finish finished - try to ensure there is
    1462              :      * another xl_running_xacts record in a timely manner, without having to
    1463              :      * wait for bgwriter or checkpointer to log one.  During recovery we can't
    1464              :      * enforce that, so we'll have to wait.
    1465              :      */
    1466           15 :     if (!RecoveryInProgress())
    1467              :     {
    1468           15 :         LogStandbySnapshot();
    1469              :     }
    1470           15 : }
    1471              : 
    1472              : #define SnapBuildOnDiskConstantSize \
    1473              :     offsetof(SnapBuildOnDisk, builder)
    1474              : #define SnapBuildOnDiskNotChecksummedSize \
    1475              :     offsetof(SnapBuildOnDisk, version)
    1476              : 
    1477              : #define SNAPBUILD_MAGIC 0x51A1E001
    1478              : #define SNAPBUILD_VERSION 6
    1479              : 
    1480              : /*
    1481              :  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
    1482              :  *
    1483              :  * Supposed to be used by external (i.e. not snapbuild.c) code that just read
    1484              :  * a record that's a potential location for a serialized snapshot.
    1485              :  */
    1486              : void
    1487           74 : SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
    1488              : {
    1489           74 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1490            0 :         SnapBuildRestore(builder, lsn);
    1491              :     else
    1492           74 :         SnapBuildSerialize(builder, lsn);
    1493           74 : }
    1494              : 
    1495              : /*
    1496              :  * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
    1497              :  * been done by another decoding process.
    1498              :  */
    1499              : static void
    1500          547 : SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
    1501              : {
    1502              :     Size        needed_length;
    1503          547 :     SnapBuildOnDisk *ondisk = NULL;
    1504          547 :     TransactionId *catchange_xip = NULL;
    1505              :     MemoryContext old_ctx;
    1506              :     size_t      catchange_xcnt;
    1507              :     char       *ondisk_c;
    1508              :     int         fd;
    1509              :     char        tmppath[MAXPGPATH];
    1510              :     char        path[MAXPGPATH];
    1511              :     int         ret;
    1512              :     struct stat stat_buf;
    1513              :     Size        sz;
    1514              : 
    1515              :     Assert(XLogRecPtrIsValid(lsn));
    1516              :     Assert(!XLogRecPtrIsValid(builder->last_serialized_snapshot) ||
    1517              :            builder->last_serialized_snapshot <= lsn);
    1518              : 
    1519              :     /*
    1520              :      * no point in serializing if we cannot continue to work immediately after
    1521              :      * restoring the snapshot
    1522              :      */
    1523          547 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1524            0 :         return;
    1525              : 
    1526              :     /* consistent snapshots have no next phase */
    1527              :     Assert(builder->next_phase_at == InvalidTransactionId);
    1528              : 
    1529              :     /*
    1530              :      * We identify snapshots by the LSN they are valid for. We don't need to
    1531              :      * include timelines in the name as each LSN maps to exactly one timeline
    1532              :      * unless the user used pg_resetwal or similar. If a user did so, there's
    1533              :      * no hope continuing to decode anyway.
    1534              :      */
    1535          547 :     sprintf(path, "%s/%X-%X.snap",
    1536              :             PG_LOGICAL_SNAPSHOTS_DIR,
    1537          547 :             LSN_FORMAT_ARGS(lsn));
    1538              : 
    1539              :     /*
    1540              :      * first check whether some other backend already has written the snapshot
    1541              :      * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
    1542              :      * as a valid state. Everything else is an unexpected error.
    1543              :      */
    1544          547 :     ret = stat(path, &stat_buf);
    1545              : 
    1546          547 :     if (ret != 0 && errno != ENOENT)
    1547            0 :         ereport(ERROR,
    1548              :                 (errcode_for_file_access(),
    1549              :                  errmsg("could not stat file \"%s\": %m", path)));
    1550              : 
    1551          547 :     else if (ret == 0)
    1552              :     {
    1553              :         /*
    1554              :          * somebody else has already serialized to this point, don't overwrite
    1555              :          * but remember location, so we don't need to read old data again.
    1556              :          *
    1557              :          * To be sure it has been synced to disk after the rename() from the
    1558              :          * tempfile filename to the real filename, we just repeat the fsync.
    1559              :          * That ought to be cheap because in most scenarios it should already
    1560              :          * be safely on disk.
    1561              :          */
    1562          221 :         fsync_fname(path, false);
    1563          221 :         fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1564              : 
    1565          221 :         builder->last_serialized_snapshot = lsn;
    1566          221 :         goto out;
    1567              :     }
    1568              : 
    1569              :     /*
    1570              :      * there is an obvious race condition here between the time we stat(2) the
    1571              :      * file and us writing the file. But we rename the file into place
    1572              :      * atomically and all files created need to contain the same data anyway,
    1573              :      * so this is perfectly fine, although a bit of a resource waste. Locking
    1574              :      * seems like pointless complication.
    1575              :      */
    1576          326 :     elog(DEBUG1, "serializing snapshot to %s", path);
    1577              : 
    1578              :     /* to make sure only we will write to this tempfile, include pid */
    1579          326 :     sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
    1580              :             PG_LOGICAL_SNAPSHOTS_DIR,
    1581          326 :             LSN_FORMAT_ARGS(lsn), MyProcPid);
    1582              : 
    1583              :     /*
    1584              :      * Unlink the temporary file if it already exists.  It must be left over
    1585              :      * from an earlier crash/error, since we won't enter this function twice
    1586              :      * from within a single decoding slot/backend and the temporary file
    1587              :      * name contains the pid of the current process.
    1588              :      */
    1589          326 :     if (unlink(tmppath) != 0 && errno != ENOENT)
    1590            0 :         ereport(ERROR,
    1591              :                 (errcode_for_file_access(),
    1592              :                  errmsg("could not remove file \"%s\": %m", tmppath)));
    1593              : 
    1594          326 :     old_ctx = MemoryContextSwitchTo(builder->context);
    1595              : 
    1596              :     /* Get the catalog modifying transactions that are not yet committed */
    1597          326 :     catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
    1598          326 :     catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);
    1599              : 
    1600          326 :     needed_length = sizeof(SnapBuildOnDisk) +
    1601          326 :         sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
    1602              : 
    1603          326 :     ondisk_c = palloc0(needed_length);
    1604          326 :     ondisk = (SnapBuildOnDisk *) ondisk_c;
    1605          326 :     ondisk->magic = SNAPBUILD_MAGIC;
    1606          326 :     ondisk->version = SNAPBUILD_VERSION;
    1607          326 :     ondisk->length = needed_length;
    1608          326 :     INIT_CRC32C(ondisk->checksum);
    1609          326 :     COMP_CRC32C(ondisk->checksum,
    1610              :                 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
    1611              :                 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
    1612          326 :     ondisk_c += sizeof(SnapBuildOnDisk);
    1613              : 
    1614          326 :     memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
    1615              :     /* NULL-ify memory-only data */
    1616          326 :     ondisk->builder.context = NULL;
    1617          326 :     ondisk->builder.snapshot = NULL;
    1618          326 :     ondisk->builder.reorder = NULL;
    1619          326 :     ondisk->builder.committed.xip = NULL;
    1620          326 :     ondisk->builder.catchange.xip = NULL;
    1621              :     /* update catchange only on disk data */
    1622          326 :     ondisk->builder.catchange.xcnt = catchange_xcnt;
    1623              : 
    1624          326 :     COMP_CRC32C(ondisk->checksum,
    1625              :                 &ondisk->builder,
    1626              :                 sizeof(SnapBuild));
    1627              : 
    1628              :     /* copy committed xacts */
    1629          326 :     if (builder->committed.xcnt > 0)
    1630              :     {
    1631           60 :         sz = sizeof(TransactionId) * builder->committed.xcnt;
    1632           60 :         memcpy(ondisk_c, builder->committed.xip, sz);
    1633           60 :         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
    1634           60 :         ondisk_c += sz;
    1635              :     }
    1636              : 
    1637              :     /* copy catalog modifying xacts */
    1638          326 :     if (catchange_xcnt > 0)
    1639              :     {
    1640            8 :         sz = sizeof(TransactionId) * catchange_xcnt;
    1641            8 :         memcpy(ondisk_c, catchange_xip, sz);
    1642            8 :         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
    1643            8 :         ondisk_c += sz;
    1644              :     }
    1645              : 
    1646          326 :     FIN_CRC32C(ondisk->checksum);
    1647              : 
    1648              :     /* we have valid data now, open tempfile and write it there */
    1649          326 :     fd = OpenTransientFile(tmppath,
    1650              :                            O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    1651          326 :     if (fd < 0)
    1652            0 :         ereport(ERROR,
    1653              :                 (errcode_for_file_access(),
    1654              :                  errmsg("could not open file \"%s\": %m", tmppath)));
    1655              : 
    1656          326 :     errno = 0;
    1657          326 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
    1658          326 :     if ((write(fd, ondisk, needed_length)) != needed_length)
    1659              :     {
    1660            0 :         int         save_errno = errno;
    1661              : 
    1662            0 :         CloseTransientFile(fd);
    1663              : 
    1664              :         /* if write didn't set errno, assume problem is no disk space */
    1665            0 :         errno = save_errno ? save_errno : ENOSPC;
    1666            0 :         ereport(ERROR,
    1667              :                 (errcode_for_file_access(),
    1668              :                  errmsg("could not write to file \"%s\": %m", tmppath)));
    1669              :     }
    1670          326 :     pgstat_report_wait_end();
    1671              : 
    1672              :     /*
    1673              :      * fsync the file before renaming so that even if we crash after this we
    1674              :      * have either a fully valid file or nothing.
    1675              :      *
    1676              :      * It's safe to just ERROR on fsync() here because we'll retry the whole
    1677              :      * operation including the writes.
    1678              :      *
    1679              :      * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
    1680              :      * some noticeable overhead since it's performed synchronously during
    1681              :      * decoding?
    1682              :      */
    1683          326 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
    1684          326 :     if (pg_fsync(fd) != 0)
    1685              :     {
    1686            0 :         int         save_errno = errno;
    1687              : 
    1688            0 :         CloseTransientFile(fd);
    1689            0 :         errno = save_errno;
    1690            0 :         ereport(ERROR,
    1691              :                 (errcode_for_file_access(),
    1692              :                  errmsg("could not fsync file \"%s\": %m", tmppath)));
    1693              :     }
    1694          326 :     pgstat_report_wait_end();
    1695              : 
    1696          326 :     if (CloseTransientFile(fd) != 0)
    1697            0 :         ereport(ERROR,
    1698              :                 (errcode_for_file_access(),
    1699              :                  errmsg("could not close file \"%s\": %m", tmppath)));
    1700              : 
    1701          326 :     fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1702              : 
    1703              :     /*
    1704              :      * We may overwrite the work from some other backend, but that's ok, our
    1705              :      * snapshot is valid as well, we'll just have done some superfluous work.
    1706              :      */
    1707          326 :     if (rename(tmppath, path) != 0)
    1708              :     {
    1709            0 :         ereport(ERROR,
    1710              :                 (errcode_for_file_access(),
    1711              :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1712              :                         tmppath, path)));
    1713              :     }
    1714              : 
    1715              :     /* make sure we persist */
    1716          326 :     fsync_fname(path, false);
    1717          326 :     fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1718              : 
    1719              :     /*
    1720              :      * Now there's no way we can lose the dumped state anymore, remember this
    1721              :      * as a serialization point.
    1722              :      */
    1723          326 :     builder->last_serialized_snapshot = lsn;
    1724              : 
    1725          326 :     MemoryContextSwitchTo(old_ctx);
    1726              : 
    1727          547 : out:
    1728          547 :     ReorderBufferSetRestartPoint(builder->reorder,
    1729              :                                  builder->last_serialized_snapshot);
    1730              :     /* be tidy */
    1731          547 :     if (ondisk)
    1732          326 :         pfree(ondisk);
    1733          547 :     if (catchange_xip)
    1734            8 :         pfree(catchange_xip);
    1735              : }
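
The write path of SnapBuildSerialize() above (unlink a stale tempfile, create it exclusively, write, fsync, close, rename into place, fsync the directory) can be sketched outside PostgreSQL with plain POSIX calls. This is a minimal sketch, not the PostgreSQL implementation: durable_write() and sync_dir() are hypothetical names, the 1024-byte path buffers are an arbitrary choice, and the sketch syncs only the directory after the rename, whereas the original also repeats the fsync on the final file.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical helper: fsync a directory so a rename inside it is durable. */
static int
sync_dir(const char *dir)
{
    int         fd = open(dir, O_RDONLY);
    int         rc;
    int         save_errno;

    if (fd < 0)
        return -1;
    rc = fsync(fd);
    save_errno = errno;
    close(fd);
    errno = save_errno;
    return rc;
}

/*
 * Hypothetical helper: write buf to dir/name so that after a crash either the
 * complete file or no file is visible under the final name.
 * Returns 0 on success, -1 with errno set on failure.
 */
int
durable_write(const char *dir, const char *name, const void *buf, size_t len)
{
    char        path[1024];
    char        tmppath[1024];
    int         fd;
    int         save_errno;

    assert(dir != NULL && name != NULL && buf != NULL);

    snprintf(path, sizeof(path), "%s/%s", dir, name);
    /* include the pid so that only this process writes to the tempfile */
    snprintf(tmppath, sizeof(tmppath), "%s/%s.%d.tmp", dir, name, (int) getpid());

    /* a leftover tempfile can only stem from an earlier crash or error */
    if (unlink(tmppath) != 0 && errno != ENOENT)
        return -1;

    fd = open(tmppath, O_CREAT | O_EXCL | O_WRONLY, 0600);
    if (fd < 0)
        return -1;

    errno = 0;
    if (write(fd, buf, len) != (ssize_t) len)
    {
        /* if write didn't set errno, assume the problem is no disk space */
        save_errno = errno ? errno : ENOSPC;
        goto fail;
    }

    /* fsync before renaming: the final name must never show partial data */
    if (fsync(fd) != 0)
    {
        save_errno = errno;
        goto fail;
    }

    if (close(fd) != 0)
        return -1;
    if (rename(tmppath, path) != 0)
        return -1;

    /* persist the new directory entry */
    return sync_dir(dir);

fail:
    close(fd);
    unlink(tmppath);
    errno = save_errno;
    return -1;
}
```

A call such as durable_write("/tmp", "demo.snap", data, len) returns 0 once the file is durably in place. Because the rename is atomic, two processes racing to write the same final name simply overwrite each other with equivalent content, which is the property the source comments rely on.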
    1736              : 
    1737              : /*
    1738              :  * Restore the logical snapshot file contents to 'ondisk'.
    1739              :  *
    1740              :  * 'context' is the memory context where the catalog modifying/committed xids
    1741              :  * will live.
    1742              :  * If 'missing_ok' is true, will not throw an error if the file is not found.
    1743              :  */
    1744              : bool
    1745           20 : SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn,
    1746              :                          MemoryContext context, bool missing_ok)
    1747              : {
    1748              :     int         fd;
    1749              :     pg_crc32c   checksum;
    1750              :     Size        sz;
    1751              :     char        path[MAXPGPATH];
    1752              : 
    1753           20 :     sprintf(path, "%s/%X-%X.snap",
    1754              :             PG_LOGICAL_SNAPSHOTS_DIR,
    1755           20 :             LSN_FORMAT_ARGS(lsn));
    1756              : 
    1757           20 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
    1758              : 
    1759           20 :     if (fd < 0)
    1760              :     {
    1761           11 :         if (missing_ok && errno == ENOENT)
    1762           11 :             return false;
    1763              : 
    1764            0 :         ereport(ERROR,
    1765              :                 (errcode_for_file_access(),
    1766              :                  errmsg("could not open file \"%s\": %m", path)));
    1767              :     }
    1768              : 
    1769              :     /* ----
    1770              :      * Make sure the snapshot has been stored safely to disk, that's normally
    1771              :      * cheap.
    1772              :      * Note that we do not need PANIC here, nobody will be able to use the
    1773              :      * slot without fsyncing, and saving it won't succeed without an fsync()
    1774              :      * either...
    1775              :      * ----
    1776              :      */
    1777            9 :     fsync_fname(path, false);
    1778            9 :     fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1779              : 
    1780              :     /* read statically sized portion of snapshot */
    1781            9 :     SnapBuildRestoreContents(fd, ondisk, SnapBuildOnDiskConstantSize, path);
    1782              : 
    1783            9 :     if (ondisk->magic != SNAPBUILD_MAGIC)
    1784            0 :         ereport(ERROR,
    1785              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1786              :                  errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
    1787              :                         path, ondisk->magic, SNAPBUILD_MAGIC)));
    1788              : 
    1789            9 :     if (ondisk->version != SNAPBUILD_VERSION)
    1790            0 :         ereport(ERROR,
    1791              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1792              :                  errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
    1793              :                         path, ondisk->version, SNAPBUILD_VERSION)));
    1794              : 
    1795            9 :     INIT_CRC32C(checksum);
    1796            9 :     COMP_CRC32C(checksum,
    1797              :                 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
    1798              :                 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
    1799              : 
    1800              :     /* read SnapBuild */
    1801            9 :     SnapBuildRestoreContents(fd, &ondisk->builder, sizeof(SnapBuild), path);
    1802            9 :     COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild));
    1803              : 
    1804              :     /* restore committed xacts information */
    1805            9 :     if (ondisk->builder.committed.xcnt > 0)
    1806              :     {
    1807            4 :         sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt;
    1808            4 :         ondisk->builder.committed.xip = MemoryContextAllocZero(context, sz);
    1809            4 :         SnapBuildRestoreContents(fd, ondisk->builder.committed.xip, sz, path);
    1810            4 :         COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz);
    1811              :     }
    1812              : 
    1813              :     /* restore catalog modifying xacts information */
    1814            9 :     if (ondisk->builder.catchange.xcnt > 0)
    1815              :     {
    1816            4 :         sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt;
    1817            4 :         ondisk->builder.catchange.xip = MemoryContextAllocZero(context, sz);
    1818            4 :         SnapBuildRestoreContents(fd, ondisk->builder.catchange.xip, sz, path);
    1819            4 :         COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz);
    1820              :     }
    1821              : 
    1822            9 :     if (CloseTransientFile(fd) != 0)
    1823            0 :         ereport(ERROR,
    1824              :                 (errcode_for_file_access(),
    1825              :                  errmsg("could not close file \"%s\": %m", path)));
    1826              : 
    1827            9 :     FIN_CRC32C(checksum);
    1828              : 
    1829              :     /* verify checksum of what we've read */
    1830            9 :     if (!EQ_CRC32C(checksum, ondisk->checksum))
    1831            0 :         ereport(ERROR,
    1832              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1833              :                  errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
    1834              :                         path, checksum, ondisk->checksum)));
    1835              : 
    1836            9 :     return true;
    1837              : }
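
The validation in SnapBuildRestoreSnapshot() above recomputes a CRC-32C over everything except a leading not-checksummed prefix and compares it with the stored value. The following is a minimal sketch of that layout idea only. DemoOnDisk, DEMO_NOT_CHECKSUMMED, demo_checksum() and demo_verify() are hypothetical, the placement of the prefix boundary at 'version' is an assumption made for illustration, and the bitwise CRC-32C stands in for PostgreSQL's INIT_CRC32C/COMP_CRC32C/FIN_CRC32C macros.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Bitwise CRC-32C (Castagnoli), reflected polynomial 0x82F63B78. */
static uint32_t
crc32c_update(uint32_t crc, const void *data, size_t len)
{
    const unsigned char *p = data;

    while (len--)
    {
        crc ^= *p++;
        for (int k = 0; k < 8; k++)
            crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
    }
    return crc;
}

/* Hypothetical header: fields before 'version' are not checksummed. */
typedef struct DemoOnDisk
{
    uint32_t    magic;
    uint32_t    checksum;
    uint32_t    version;
    uint32_t    xcnt;           /* number of xids following the header */
} DemoOnDisk;

#define DEMO_NOT_CHECKSUMMED offsetof(DemoOnDisk, version)

/* Checksum the header past the excluded prefix, then the xid array. */
uint32_t
demo_checksum(const DemoOnDisk *hdr, const uint32_t *xids)
{
    uint32_t    crc = 0xFFFFFFFFu;  /* initial value */

    assert(hdr != NULL);
    crc = crc32c_update(crc,
                        (const char *) hdr + DEMO_NOT_CHECKSUMMED,
                        sizeof(DemoOnDisk) - DEMO_NOT_CHECKSUMMED);
    if (hdr->xcnt > 0)
        crc = crc32c_update(crc, xids, sizeof(uint32_t) * hdr->xcnt);
    return crc ^ 0xFFFFFFFFu;       /* final inversion */
}

/* True if the stored checksum matches what was read. */
bool
demo_verify(const DemoOnDisk *hdr, const uint32_t *xids)
{
    return demo_checksum(hdr, xids) == hdr->checksum;
}
```

Excluding the checksum field itself from the checksummed range is what lets the writer fill it in last and the reader verify it without zeroing anything first.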
    1838              : 
    1839              : /*
    1840              :  * Restore a snapshot into 'builder' if one has previously been stored at the
    1841              :  * location indicated by 'lsn'. Returns true if successful, false otherwise.
    1842              :  */
    1843              : static bool
    1844           18 : SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
    1845              : {
    1846              :     SnapBuildOnDisk ondisk;
    1847              : 
    1848              :     /* no point in loading a snapshot if we're already there */
    1849           18 :     if (builder->state == SNAPBUILD_CONSISTENT)
    1850            0 :         return false;
    1851              : 
    1852              :     /* validate and restore the snapshot to 'ondisk' */
    1853           18 :     if (!SnapBuildRestoreSnapshot(&ondisk, lsn, builder->context, true))
    1854           11 :         return false;
    1855              : 
    1856              :     /*
    1857              :      * ok, we now have a sensible snapshot here, figure out if it has more
    1858              :      * information than we have.
    1859              :      */
    1860              : 
    1861              :     /*
    1862              :      * We are only interested in consistent snapshots for now, comparing
    1863              :      * whether one incomplete snapshot is more "advanced" seems to be
    1864              :      * unnecessarily complex.
    1865              :      */
    1866            7 :     if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
    1867            0 :         goto snapshot_not_interesting;
    1868              : 
    1869              :     /*
    1870              :      * Don't use a snapshot that requires an xmin that we cannot guarantee to
    1871              :      * be available.
    1872              :      */
    1873            7 :     if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
    1874            0 :         goto snapshot_not_interesting;
    1875              : 
    1876              :     /*
    1877              :      * Consistent snapshots have no next phase. Reset next_phase_at as it is
    1878              :      * possible that an old value may remain.
    1879              :      */
    1880              :     Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
    1881            7 :     builder->next_phase_at = InvalidTransactionId;
    1882              : 
    1883              :     /* ok, we think the snapshot is sensible, copy over everything important */
    1884            7 :     builder->xmin = ondisk.builder.xmin;
    1885            7 :     builder->xmax = ondisk.builder.xmax;
    1886            7 :     builder->state = ondisk.builder.state;
    1887              : 
    1888            7 :     builder->committed.xcnt = ondisk.builder.committed.xcnt;
    1889              :     /* We only allocated/stored xcnt, not xcnt_space xids ! */
    1890              :     /* don't overwrite preallocated xip, if we don't have anything here */
    1891            7 :     if (builder->committed.xcnt > 0)
    1892              :     {
    1893            2 :         pfree(builder->committed.xip);
    1894            2 :         builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
    1895            2 :         builder->committed.xip = ondisk.builder.committed.xip;
    1896              :     }
    1897            7 :     ondisk.builder.committed.xip = NULL;
    1898              : 
    1899              :     /* set catalog modifying transactions */
    1900            7 :     if (builder->catchange.xip)
    1901            0 :         pfree(builder->catchange.xip);
    1902            7 :     builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
    1903            7 :     builder->catchange.xip = ondisk.builder.catchange.xip;
    1904            7 :     ondisk.builder.catchange.xip = NULL;
    1905              : 
    1906              :     /* our snapshot is not interesting anymore, build a new one */
    1907            7 :     if (builder->snapshot != NULL)
    1908              :     {
    1909            0 :         SnapBuildSnapDecRefcount(builder->snapshot);
    1910              :     }
    1911            7 :     builder->snapshot = SnapBuildBuildSnapshot(builder);
    1912            7 :     SnapBuildSnapIncRefcount(builder->snapshot);
    1913              : 
    1914            7 :     ReorderBufferSetRestartPoint(builder->reorder, lsn);
    1915              : 
    1916              :     Assert(builder->state == SNAPBUILD_CONSISTENT);
    1917              : 
    1918            7 :     ereport(LOG,
    1919              :             errmsg("logical decoding found consistent point at %X/%08X",
    1920              :                    LSN_FORMAT_ARGS(lsn)),
    1921              :             errdetail("Logical decoding will begin using saved snapshot."));
    1922            7 :     return true;
    1923              : 
    1924            0 : snapshot_not_interesting:
    1925            0 :     if (ondisk.builder.committed.xip != NULL)
    1926            0 :         pfree(ondisk.builder.committed.xip);
    1927            0 :     if (ondisk.builder.catchange.xip != NULL)
    1928            0 :         pfree(ondisk.builder.catchange.xip);
    1929            0 :     return false;
    1930              : }
    1931              : 
    1932              : /*
    1933              :  * Read the contents of the serialized snapshot to 'dest'.
    1934              :  */
    1935              : static void
    1936           26 : SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path)
    1937              : {
    1938              :     int         readBytes;
    1939              : 
    1940           26 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
    1941           26 :     readBytes = read(fd, dest, size);
    1942           26 :     pgstat_report_wait_end();
    1943           26 :     if (readBytes != size)
    1944              :     {
    1945            0 :         int         save_errno = errno;
    1946              : 
    1947            0 :         CloseTransientFile(fd);
    1948              : 
    1949            0 :         if (readBytes < 0)
    1950              :         {
    1951            0 :             errno = save_errno;
    1952            0 :             ereport(ERROR,
    1953              :                     (errcode_for_file_access(),
    1954              :                      errmsg("could not read file \"%s\": %m", path)));
    1955              :         }
    1956              :         else
    1957            0 :             ereport(ERROR,
    1958              :                     (errcode(ERRCODE_DATA_CORRUPTED),
    1959              :                      errmsg("could not read file \"%s\": read %d of %zu",
    1960              :                             path, readBytes, size)));
    1961              :     }
    1962           26 : }
    1963              : 
    1964              : /*
    1965              :  * Remove all serialized snapshots that are not required anymore because no
    1966              :  * slot can need them. This doesn't actually have to run during a checkpoint,
    1967              :  * but it's a convenient point to schedule this.
    1968              :  *
    1969              :  * NB: We run this during checkpoints even if logical decoding is disabled so
    1970              :  * that we clean up old slots at some point after it got disabled.
    1971              :  */
    1972              : void
    1973         1806 : CheckPointSnapBuild(void)
    1974              : {
    1975              :     XLogRecPtr  cutoff;
    1976              :     XLogRecPtr  redo;
    1977              :     DIR        *snap_dir;
    1978              :     struct dirent *snap_de;
    1979              :     char        path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)];
    1980              : 
    1981              :     /*
    1982              :      * We start off with a minimum of the last redo pointer. No new
    1983              :      * replication slot will start before that, so that's a safe upper bound
    1984              :      * for removal.
    1985              :      */
    1986         1806 :     redo = GetRedoRecPtr();
    1987              : 
    1988              :     /* now check for the restart ptrs from existing slots */
    1989         1806 :     cutoff = ReplicationSlotsComputeLogicalRestartLSN();
    1990              : 
    1991              :     /* don't start earlier than the restart lsn */
    1992         1806 :     if (redo < cutoff)
    1993            1 :         cutoff = redo;
    1994              : 
    1995         1806 :     snap_dir = AllocateDir(PG_LOGICAL_SNAPSHOTS_DIR);
    1996         5707 :     while ((snap_de = ReadDir(snap_dir, PG_LOGICAL_SNAPSHOTS_DIR)) != NULL)
    1997              :     {
    1998              :         uint32      hi;
    1999              :         uint32      lo;
    2000              :         XLogRecPtr  lsn;
    2001              :         PGFileType  de_type;
    2002              : 
    2003         3901 :         if (strcmp(snap_de->d_name, ".") == 0 ||
    2004         2095 :             strcmp(snap_de->d_name, "..") == 0)
    2005         3612 :             continue;
    2006              : 
    2007          289 :         snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name);
    2008          289 :         de_type = get_dirent_type(path, snap_de, false, DEBUG1);
    2009              : 
    2010          289 :         if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
    2011              :         {
    2012            0 :             elog(DEBUG1, "only regular files expected: %s", path);
    2013            0 :             continue;
    2014              :         }
    2015              : 
    2016              :         /*
    2017              :          * temporary filenames from SnapBuildSerialize() include the LSN and
    2018              :          * everything but are postfixed by .$pid.tmp. We can just remove them
    2019              :          * the same as other files because there can be none that are
    2020              :          * currently being written that are older than cutoff.
    2021              :          *
    2022              :          * We just log a message if a file doesn't fit the pattern, it's
    2023              :          * probably some editor's lock/state file or similar...
    2024              :          */
    2025          289 :         if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
    2026              :         {
    2027            0 :             ereport(LOG,
    2028              :                     (errmsg("could not parse file name \"%s\"", path)));
    2029            0 :             continue;
    2030              :         }
    2031              : 
    2032          289 :         lsn = ((uint64) hi) << 32 | lo;
    2033              : 
    2034              :         /* check whether we still need it */
    2035          289 :         if (lsn < cutoff || !XLogRecPtrIsValid(cutoff))
    2036              :         {
    2037          192 :             elog(DEBUG1, "removing snapbuild snapshot %s", path);
    2038              : 
    2039              :             /*
    2040              :              * It's not particularly harmful, though strange, if we can't
    2041              :              * remove the file here. Don't prevent the checkpoint from
    2042              :              * completing, that'd be a cure worse than the disease.
    2043              :              */
    2044          192 :             if (unlink(path) < 0)
    2045              :             {
    2046            0 :                 ereport(LOG,
    2047              :                         (errcode_for_file_access(),
    2048              :                          errmsg("could not remove file \"%s\": %m",
    2049              :                                 path)));
    2050            0 :                 continue;
    2051              :             }
    2052              :         }
    2053              :     }
    2054         1806 :     FreeDir(snap_dir);
    2055         1806 : }
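
The file name handling in CheckPointSnapBuild() above can be tried in isolation. This is a minimal sketch under stated assumptions: parse_snap_lsn() and snap_is_removable() are hypothetical names, a plain uint64_t stands in for XLogRecPtr, and a cutoff of 0 is treated as the invalid LSN.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: extract the LSN from a "HI-LO.snap" file name.
 * Temporary names ("HI-LO.snap.PID.tmp") parse the same way, because sscanf
 * stops counting conversions after the second %X.
 */
bool
parse_snap_lsn(const char *name, uint64_t *lsn)
{
    unsigned int hi;
    unsigned int lo;

    assert(name != NULL && lsn != NULL);
    if (sscanf(name, "%X-%X.snap", &hi, &lo) != 2)
        return false;

    *lsn = ((uint64_t) hi << 32) | lo;
    return true;
}

/*
 * Hypothetical helper: a file is removable if its LSN is older than the
 * cutoff, or if the cutoff is invalid (assumed to be 0 here). Names that do
 * not parse are left alone.
 */
bool
snap_is_removable(const char *name, uint64_t cutoff)
{
    uint64_t    lsn;

    if (!parse_snap_lsn(name, &lsn))
        return false;
    return cutoff == 0 || lsn < cutoff;
}
```

For example, snap_is_removable("0-10.snap", 0x20) is true while snap_is_removable("0-30.snap", 0x20) is false, and an unrelated name such as "README" is never selected for removal.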
    2056              : 
    2057              : /*
    2058              :  * Check if a logical snapshot at the specified point has been serialized.
    2059              :  */
    2060              : bool
    2061           13 : SnapBuildSnapshotExists(XLogRecPtr lsn)
    2062              : {
    2063              :     char        path[MAXPGPATH];
    2064              :     int         ret;
    2065              :     struct stat stat_buf;
    2066              : 
    2067           13 :     sprintf(path, "%s/%X-%X.snap",
    2068              :             PG_LOGICAL_SNAPSHOTS_DIR,
    2069           13 :             LSN_FORMAT_ARGS(lsn));
    2070              : 
    2071           13 :     ret = stat(path, &stat_buf);
    2072              : 
    2073           13 :     if (ret != 0 && errno != ENOENT)
    2074            0 :         ereport(ERROR,
    2075              :                 (errcode_for_file_access(),
    2076              :                  errmsg("could not stat file \"%s\": %m", path)));
    2077              : 
    2078           13 :     return ret == 0;
    2079              : }
        

Generated by: LCOV version 2.0-1