LCOV - code coverage report
Current view: top level - src/backend/replication/logical - snapbuild.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19beta1 Lines: 84.8 % 558 473
Test Date: 2026-06-06 16:16:23 Functions: 100.0 % 32 32

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * snapbuild.c
       4              :  *
       5              :  *    Infrastructure for building historic catalog snapshots based on contents
       6              :  *    of the WAL, for the purpose of decoding heapam.c style values in the
       7              :  *    WAL.
       8              :  *
       9              :  * NOTES:
      10              :  *
      11              :  * We build snapshots which can *only* be used to read catalog contents and we
      12              :  * do so by reading and interpreting the WAL stream. The aim is to build a
      13              :  * snapshot that behaves the same as a freshly taken MVCC snapshot would have
      14              :  * at the time the XLogRecord was generated.
      15              :  *
      16              :  * To build the snapshots we reuse the infrastructure built for Hot
      17              :  * Standby. The in-memory snapshots we build look different than HS' because
      18              :  * we have different needs. To successfully decode data from the WAL we only
      19              :  * need to access catalog tables and (sys|rel|cat)cache, not the actual user
      20              :  * tables since the data we decode is wholly contained in the WAL
      21              :  * records. Also, our snapshots need to be different in comparison to normal
      22              :  * MVCC ones because in contrast to those we cannot fully rely on the clog and
      23              :  * pg_subtrans for information about committed transactions because they might
      24              :  * commit in the future from the POV of the WAL entry we're currently
      25              :  * decoding. This definition has the advantage that we only need to prevent
      26              :  * removal of catalog rows, while normal tables' rows can still be
      27              :  * removed. This is achieved by using the replication slot mechanism.
      28              :  *
      29              :  * As the percentage of transactions modifying the catalog normally is fairly
      30              :  * small in comparison to ones only manipulating user data, we keep track of
      31              :  * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
      32              :  * track of all running transactions like it's done in a normal snapshot. Note
      33              :  * that we're generally only looking at transactions that have acquired an
      34              :  * xid. That is, we keep a list of transactions between snapshot->(xmin, xmax)
      35              :  * that we consider committed, everything else is considered aborted/in
      36              :  * progress. That also allows us not to care about subtransactions before they
      37              :  * have committed which means this module, in contrast to HS, doesn't have to
      38              :  * care about suboverflowed subtransactions and similar.
      39              :  *
      40              :  * One complexity of doing this is that to e.g. handle mixed DDL/DML
      41              :  * transactions we need Snapshots that see intermediate versions of the
      42              :  * catalog in a transaction. During normal operation this is achieved by using
      43              :  * CommandIds/cmin/cmax. The problem with that however is that for space
      44              :  * efficiency reasons, the cmin and cmax are not included in WAL records. We
      45              :  * cannot read the cmin/cmax from the tuple itself, either, because it is
      46              :  * reset on crash recovery. Even if we could, we could not decode combocids
      47              :  * which are only tracked in the original backend's memory. To work around
      48              :  * that, heapam writes an extra WAL record (XLOG_HEAP2_NEW_CID) every time a
      49              :  * catalog row is modified, which includes the cmin and cmax of the
      50              :  * tuple. During decoding, we insert the ctid->(cmin,cmax) mappings into the
      51              :  * reorder buffer, and use them at visibility checks instead of the cmin/cmax
      52              :  * on the tuple itself. Check the reorderbuffer.c's comment above
      53              :  * ResolveCminCmaxDuringDecoding() for details.
      54              :  *
      55              :  * To facilitate all this we need our own visibility routine, as the normal
      56              :  * ones are optimized for different use cases.
      57              :  *
      58              :  * To replace the normal catalog snapshots with decoding ones use the
      59              :  * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
      60              :  *
      61              :  *
      62              :  *
      63              :  * The snapbuild machinery starts up in several stages, as illustrated
      64              :  * by the following graph describing the SnapBuild->state transitions:
      65              :  *
      66              :  *         +-------------------------+
      67              :  *    +----|         START           |-------------+
      68              :  *    |    +-------------------------+             |
      69              :  *    |                 |                          |
      70              :  *    |                 |                          |
      71              :  *    |        running_xacts #1                    |
      72              :  *    |                 |                          |
      73              :  *    |                 |                          |
      74              :  *    |                 v                          |
      75              :  *    |    +-------------------------+             v
      76              :  *    |    |   BUILDING_SNAPSHOT     |------------>|
      77              :  *    |    +-------------------------+             |
      78              :  *    |                 |                          |
      79              :  *    |                 |                          |
      80              :  *    | running_xacts #2, xacts from #1 finished   |
      81              :  *    |                 |                          |
      82              :  *    |                 |                          |
      83              :  *    |                 v                          |
      84              :  *    |    +-------------------------+             v
      85              :  *    |    |       FULL_SNAPSHOT     |------------>|
      86              :  *    |    +-------------------------+             |
      87              :  *    |                 |                          |
      88              :  * running_xacts        |                      saved snapshot
      89              :  * with zero xacts      |                 at running_xacts's lsn
      90              :  *    |                 |                          |
      91              :  *    | running_xacts with xacts from #2 finished  |
      92              :  *    |                 |                          |
      93              :  *    |                 v                          |
      94              :  *    |    +-------------------------+             |
      95              :  *    +--->|  SNAPBUILD_CONSISTENT   |<------------+
      96              :  *         +-------------------------+
      97              :  *
      98              :  * Initially the machinery is in the START stage. When an xl_running_xacts
      99              :  * record is read that is sufficiently new (above the safe xmin horizon),
     100              :  * there's a state transition. If there were no running xacts when the
     101              :  * xl_running_xacts record was generated, we'll directly go into CONSISTENT
     102              :  * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full
     103              :  * snapshot means that all transactions that start henceforth can be decoded
     104              :  * in their entirety, but transactions that started previously can't. In
     105              :  * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
     106              :  * running transactions have committed or aborted.
     107              :  *
     108              :  * Only transactions that commit after CONSISTENT state has been reached will
     109              :  * be replayed, even though they might have started while still in
     110              :  * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
     111              :  * changes have been exported, but all the following ones will be. That point
     112              :  * is a convenient point to initialize replication from, which is why we
     113              :  * export a snapshot at that point, which *can* be used to read normal data.
     114              :  *
     115              :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
     116              :  *
     117              :  * IDENTIFICATION
     118              :  *    src/backend/replication/logical/snapbuild.c
     119              :  *
     120              :  *-------------------------------------------------------------------------
     121              :  */
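The start-up graph in the header comment can be read as a small transition function. The following is a minimal sketch under stated assumptions, not the code in this file: the `Sketch*` names and the three boolean inputs are hypothetical summaries of one sufficiently new xl_running_xacts record, and the "above the safe xmin horizon" check is left out.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the SnapBuild->state values. */
typedef enum
{
    SKETCH_START,
    SKETCH_BUILDING_SNAPSHOT,
    SKETCH_FULL_SNAPSHOT,
    SKETCH_CONSISTENT
} SketchState;

/* Assumed summary of one xl_running_xacts record (not a real struct). */
typedef struct
{
    bool no_running_xacts;       /* the record listed zero running xacts */
    bool saved_snapshot_found;   /* a serialized snapshot exists at this LSN */
    bool earlier_xacts_finished; /* xacts from the previous record all ended */
} SketchRecord;

/* Follow the edges of the graph for one incoming record. */
static SketchState
sketch_next_state(SketchState cur, const SketchRecord *rec)
{
    if (cur == SKETCH_CONSISTENT)
        return cur;

    /* right-hand edge of the graph: a saved snapshot jumps to CONSISTENT */
    if (rec->saved_snapshot_found)
        return SKETCH_CONSISTENT;

    switch (cur)
    {
        case SKETCH_START:
            /* left-hand edge: nothing running, so consistent immediately */
            return rec->no_running_xacts ? SKETCH_CONSISTENT
                                         : SKETCH_BUILDING_SNAPSHOT;
        case SKETCH_BUILDING_SNAPSHOT:
            return rec->earlier_xacts_finished ? SKETCH_FULL_SNAPSHOT : cur;
        case SKETCH_FULL_SNAPSHOT:
            return rec->earlier_xacts_finished ? SKETCH_CONSISTENT : cur;
        default:
            return cur;
    }
}
```

Each call consumes one record, so reaching CONSISTENT from START with transactions running takes at least three records in this model, matching the three vertical edges of the graph.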
     122              : 
     123              : #include "postgres.h"
     124              : 
     125              : #include <sys/stat.h>
     126              : #include <unistd.h>
     127              : 
     128              : #include "access/heapam_xlog.h"
     129              : #include "access/transam.h"
     130              : #include "access/xact.h"
     131              : #include "common/file_utils.h"
     132              : #include "miscadmin.h"
     133              : #include "pgstat.h"
     134              : #include "replication/logical.h"
     135              : #include "replication/reorderbuffer.h"
     136              : #include "replication/snapbuild.h"
     137              : #include "replication/snapbuild_internal.h"
     138              : #include "storage/fd.h"
     139              : #include "storage/lmgr.h"
     140              : #include "storage/proc.h"
     141              : #include "storage/procarray.h"
     142              : #include "storage/standby.h"
     143              : #include "utils/builtins.h"
     144              : #include "utils/memutils.h"
     145              : #include "utils/snapmgr.h"
     146              : #include "utils/snapshot.h"
     147              : #include "utils/wait_event.h"
     148              : 
     149              : 
     150              : /*
     151              :  * Starting a transaction -- which we need to do while exporting a snapshot --
     152              :  * removes knowledge about the previously used resowner, so we save it here.
     153              :  */
     154              : static ResourceOwner SavedResourceOwnerDuringExport = NULL;
     155              : static bool ExportInProgress = false;
     156              : 
     157              : /* ->committed and ->catchange manipulation */
     158              : static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
     159              : 
     160              : /* snapshot building/manipulation/distribution functions */
     161              : static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
     162              : 
     163              : static void SnapBuildFreeSnapshot(Snapshot snap);
     164              : 
     165              : static void SnapBuildSnapIncRefcount(Snapshot snap);
     166              : 
     167              : static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
     168              : 
     169              : static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
     170              :                                                  uint32 xinfo);
     171              : 
     172              : /* xlog reading helper functions for SnapBuildProcessRunningXacts */
     173              : static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn,
     174              :                                   xl_running_xacts *running);
     175              : static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
     176              : 
     177              : /* serialization functions */
     178              : static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
     179              : static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
     180              : static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path);
     181              : 
     182              : /*
     183              :  * Allocate a new snapshot builder.
     184              :  *
     185              :  * xmin_horizon is the xid at or after which we can be sure no catalog rows have
     186              :  * been removed; start_lsn is the LSN at or after which we want to replay commits.
     187              :  */
     188              : SnapBuild *
     189         1222 : AllocateSnapshotBuilder(ReorderBuffer *reorder,
     190              :                         TransactionId xmin_horizon,
     191              :                         XLogRecPtr start_lsn,
     192              :                         bool need_full_snapshot,
     193              :                         bool in_slot_creation,
     194              :                         XLogRecPtr two_phase_at)
     195              : {
     196              :     MemoryContext context;
     197              :     MemoryContext oldcontext;
     198              :     SnapBuild  *builder;
     199              : 
     200              :     /* allocate memory in own context, to have better accountability */
     201         1222 :     context = AllocSetContextCreate(CurrentMemoryContext,
     202              :                                     "snapshot builder context",
     203              :                                     ALLOCSET_DEFAULT_SIZES);
     204         1222 :     oldcontext = MemoryContextSwitchTo(context);
     205              : 
     206         1222 :     builder = palloc0_object(SnapBuild);
     207              : 
     208         1222 :     builder->state = SNAPBUILD_START;
     209         1222 :     builder->context = context;
     210         1222 :     builder->reorder = reorder;
     211              :     /* Other struct members initialized by zeroing via palloc0 above */
     212              : 
     213         1222 :     builder->committed.xcnt = 0;
     214         1222 :     builder->committed.xcnt_space = 128; /* arbitrary number */
     215         1222 :     builder->committed.xip =
     216         1222 :         palloc0_array(TransactionId, builder->committed.xcnt_space);
     217         1222 :     builder->committed.includes_all_transactions = true;
     218              : 
     219         1222 :     builder->catchange.xcnt = 0;
     220         1222 :     builder->catchange.xip = NULL;
     221              : 
     222         1222 :     builder->initial_xmin_horizon = xmin_horizon;
     223         1222 :     builder->start_decoding_at = start_lsn;
     224         1222 :     builder->in_slot_creation = in_slot_creation;
     225         1222 :     builder->building_full_snapshot = need_full_snapshot;
     226         1222 :     builder->two_phase_at = two_phase_at;
     227              : 
     228         1222 :     MemoryContextSwitchTo(oldcontext);
     229              : 
     230         1222 :     return builder;
     231              : }
     232              : 
     233              : /*
     234              :  * Free a snapshot builder.
     235              :  */
     236              : void
     237          948 : FreeSnapshotBuilder(SnapBuild *builder)
     238              : {
     239          948 :     MemoryContext context = builder->context;
     240              : 
     241              :     /* free snapshot explicitly, that contains some error checking */
     242          948 :     if (builder->snapshot != NULL)
     243              :     {
     244          232 :         SnapBuildSnapDecRefcount(builder->snapshot);
     245          232 :         builder->snapshot = NULL;
     246              :     }
     247              : 
     248              :     /* other resources are deallocated via memory context deletion */
     249          948 :     MemoryContextDelete(context);
     250          948 : }
     251              : 
     252              : /*
     253              :  * Free an unreferenced snapshot that has previously been built by us.
     254              :  */
     255              : static void
     256         1870 : SnapBuildFreeSnapshot(Snapshot snap)
     257              : {
     258              :     /* make sure we don't get passed an external snapshot */
     259              :     Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
     260              : 
     261              :     /* make sure nobody modified our snapshot */
     262              :     Assert(snap->curcid == FirstCommandId);
     263              :     Assert(!snap->suboverflowed);
     264              :     Assert(!snap->takenDuringRecovery);
     265              :     Assert(snap->regd_count == 0);
     266              : 
     267              :     /* slightly more likely, so it's checked even without casserts */
     268         1870 :     if (snap->copied)
     269            0 :         elog(ERROR, "cannot free a copied snapshot");
     270              : 
     271         1870 :     if (snap->active_count)
     272            0 :         elog(ERROR, "cannot free an active snapshot");
     273              : 
     274         1870 :     pfree(snap);
     275         1870 : }
     276              : 
     277              : /*
     278              :  * In which state of snapshot building are we?
     279              :  */
     280              : SnapBuildState
     281      1900758 : SnapBuildCurrentState(SnapBuild *builder)
     282              : {
     283      1900758 :     return builder->state;
     284              : }
     285              : 
     286              : /*
     287              :  * Return the LSN at which the two-phase decoding was first enabled.
     288              :  */
     289              : XLogRecPtr
     290           34 : SnapBuildGetTwoPhaseAt(SnapBuild *builder)
     291              : {
     292           34 :     return builder->two_phase_at;
     293              : }
     294              : 
     295              : /*
     296              :  * Set the LSN at which two-phase decoding is enabled.
     297              :  */
     298              : void
     299            8 : SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
     300              : {
     301            8 :     builder->two_phase_at = ptr;
     302            8 : }
     303              : 
     304              : /*
     305              :  * Should the transaction ending at 'ptr' be skipped rather than decoded?
     306              :  */
     307              : bool
     308       411735 : SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
     309              : {
     310       411735 :     return ptr < builder->start_decoding_at;
     311              : }
     312              : 
     313              : /*
     314              :  * Increase refcount of a snapshot.
     315              :  *
     316              :  * This is used when handing out a snapshot to some external resource or when
     317              :  * adding a Snapshot as builder->snapshot.
     318              :  */
     319              : static void
     320         8013 : SnapBuildSnapIncRefcount(Snapshot snap)
     321              : {
     322         8013 :     snap->active_count++;
     323         8013 : }
     324              : 
     325              : /*
     326              :  * Decrease refcount of a snapshot and free if the refcount reaches zero.
     327              :  *
     328              :  * Externally visible, so that external resources that have been handed an
     329              :  * IncRef'ed Snapshot can adjust its refcount easily.
     330              :  */
     331              : void
     332         7698 : SnapBuildSnapDecRefcount(Snapshot snap)
     333              : {
     334              :     /* make sure we don't get passed an external snapshot */
     335              :     Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
     336              : 
     337              :     /* make sure nobody modified our snapshot */
     338              :     Assert(snap->curcid == FirstCommandId);
     339              :     Assert(!snap->suboverflowed);
     340              :     Assert(!snap->takenDuringRecovery);
     341              : 
     342              :     Assert(snap->regd_count == 0);
     343              : 
     344              :     Assert(snap->active_count > 0);
     345              : 
     346              :     /* slightly more likely, so it's checked even without casserts */
     347         7698 :     if (snap->copied)
     348            0 :         elog(ERROR, "cannot free a copied snapshot");
     349              : 
     350         7698 :     snap->active_count--;
     351         7698 :     if (snap->active_count == 0)
     352         1870 :         SnapBuildFreeSnapshot(snap);
     353         7698 : }
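The IncRefcount/DecRefcount pair above follows a plain "free when the count reaches zero" pattern. A minimal standalone sketch of that pattern, assuming malloc/free in place of the memory-context allocator and a boolean return in place of elog(ERROR); the `Sketch*` names are hypothetical:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for a snapshot; only the counter matters here. */
typedef struct
{
    unsigned active_count;
} SketchSnap;

/* A new snapshot starts unreferenced, like one fresh from the builder. */
static SketchSnap *
sketch_snap_new(void)
{
    return calloc(1, sizeof(SketchSnap));
}

/* Called whenever the snapshot is handed to another holder. */
static void
sketch_incref(SketchSnap *snap)
{
    snap->active_count++;
}

/*
 * Drop one reference. Returns true if this was the last one and the
 * snapshot was freed; the caller must not touch it afterwards.
 */
static bool
sketch_decref(SketchSnap *snap)
{
    assert(snap->active_count > 0);

    snap->active_count--;
    if (snap->active_count == 0)
    {
        free(snap);
        return true;
    }
    return false;
}
```

The hit counts in the listing are consistent with this reading: the free path runs far less often than the decrement, since most decrements leave other holders behind.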
     354              : 
     355              : /*
     356              :  * Build a new snapshot, based on currently committed catalog-modifying
     357              :  * transactions.
     358              :  *
     359              :  * In-progress transactions with catalog access are *not* allowed to modify
     360              :  * these snapshots; they have to copy them and fill in appropriate ->curcid
     361              :  * and ->subxip/subxcnt values.
     362              :  */
     363              : static Snapshot
     364         2377 : SnapBuildBuildSnapshot(SnapBuild *builder)
     365              : {
     366              :     Snapshot    snapshot;
     367              :     Size        ssize;
     368              : 
     369              :     Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
     370              : 
     371         2377 :     ssize = sizeof(SnapshotData)
     372         2377 :         + sizeof(TransactionId) * builder->committed.xcnt
     373         2377 :         + sizeof(TransactionId) * 1 /* toplevel xid */ ;
     374              : 
     375         2377 :     snapshot = MemoryContextAllocZero(builder->context, ssize);
     376              : 
     377         2377 :     snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
     378              : 
     379              :     /*
     380              :      * We misuse the original meaning of SnapshotData's xip and subxip fields
     381              :      * to make them more fitting for our needs.
     382              :      *
     383              :      * In the 'xip' array we store transactions that have to be treated as
     384              :      * committed. Since we will only ever look at tuples from transactions
     385              :      * that have modified the catalog it's more efficient to store those few
     386              :      * that exist between xmin and xmax (frequently there are none).
     387              :      *
     388              :      * Snapshots that are used in transactions that have modified the catalog
     389              :      * also use the 'subxip' array to store their toplevel xid and all the
     390              :      * subtransaction xids so we can recognize when we need to treat rows as
     391              :      * visible that are not in xip but still need to be visible. Subxip only
     392              :      * gets filled when the snapshot is copied into the context of a
     393              :      * catalog modifying transaction since we otherwise share a snapshot
     394              :      * between transactions. As long as a txn hasn't modified the catalog it
     395              :      * doesn't need to treat any uncommitted rows as visible, so there is no
     396              :      * need for those xids.
     397              :      *
     398              :      * Both arrays are qsort'ed so that we can use bsearch() on them.
     399              :      */
     400              :     Assert(TransactionIdIsNormal(builder->xmin));
     401              :     Assert(TransactionIdIsNormal(builder->xmax));
     402              : 
     403         2377 :     snapshot->xmin = builder->xmin;
     404         2377 :     snapshot->xmax = builder->xmax;
     405              : 
     406              :     /* store all transactions to be treated as committed by this snapshot */
     407         2377 :     snapshot->xip =
     408         2377 :         (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
     409         2377 :     snapshot->xcnt = builder->committed.xcnt;
     410         2377 :     memcpy(snapshot->xip,
     411         2377 :            builder->committed.xip,
     412         2377 :            builder->committed.xcnt * sizeof(TransactionId));
     413              : 
     414              :     /* sort so we can bsearch() */
     415         2377 :     qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
     416              : 
     417              :     /*
     418              :      * Initially, subxip is empty, i.e. it's a snapshot to be used by
     419              :      * transactions that don't modify the catalog. Will be filled by
     420              :      * ReorderBufferCopySnap() if necessary.
     421              :      */
     422         2377 :     snapshot->subxcnt = 0;
     423         2377 :     snapshot->subxip = NULL;
     424              : 
     425         2377 :     snapshot->suboverflowed = false;
     426         2377 :     snapshot->takenDuringRecovery = false;
     427         2377 :     snapshot->copied = false;
     428         2377 :     snapshot->curcid = FirstCommandId;
     429         2377 :     snapshot->active_count = 0;
     430         2377 :     snapshot->regd_count = 0;
     431         2377 :     snapshot->snapXactCompletionCount = 0;
     432              : 
     433         2377 :     return snapshot;
     434              : }
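SnapBuildBuildSnapshot() places the xip array directly behind the snapshot struct in one allocation, then sorts it so later lookups can use bsearch(). The sketch below illustrates that layout in isolation. It is an approximation under stated assumptions: calloc replaces MemoryContextAllocZero, the `Sketch*` names are hypothetical, and xids are compared as plain unsigned integers with no wraparound handling.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uint32_t SketchXid;

/* Hypothetical, reduced stand-in for SnapshotData. */
typedef struct
{
    SketchXid   xmin;
    SketchXid   xmax;
    SketchXid  *xip;    /* points into the same allocation */
    size_t      xcnt;
} SketchSnapshot;

/* Plain numeric ordering; an assumption of this sketch. */
static int
sketch_xid_cmp(const void *a, const void *b)
{
    SketchXid xa = *(const SketchXid *) a;
    SketchXid xb = *(const SketchXid *) b;

    return (xa > xb) - (xa < xb);
}

/* One allocation: struct first, committed-xid array right behind it. */
static SketchSnapshot *
sketch_build(const SketchXid *committed, size_t n,
             SketchXid xmin, SketchXid xmax)
{
    SketchSnapshot *snap;

    snap = calloc(1, sizeof(SketchSnapshot) + n * sizeof(SketchXid));
    if (snap == NULL)
        return NULL;

    snap->xmin = xmin;
    snap->xmax = xmax;
    snap->xip = (SketchXid *) ((char *) snap + sizeof(SketchSnapshot));
    snap->xcnt = n;
    if (n > 0)
    {
        memcpy(snap->xip, committed, n * sizeof(SketchXid));
        /* sort so lookups can use bsearch() */
        qsort(snap->xip, n, sizeof(SketchXid), sketch_xid_cmp);
    }
    return snap;
}

/* In this inverted scheme, being listed in xip means "treat as committed". */
static bool
sketch_is_committed(const SketchSnapshot *snap, SketchXid xid)
{
    if (snap->xcnt == 0)
        return false;
    return bsearch(&xid, snap->xip, snap->xcnt,
                   sizeof(SketchXid), sketch_xid_cmp) != NULL;
}
```

Because the array lives inside the same allocation, a single free() of the struct releases everything, which is why the real code can release a snapshot with one pfree().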
     435              : 
     436              : /*
     437              :  * Build the initial slot snapshot and convert it to a normal snapshot that
     438              :  * is understood by HeapTupleSatisfiesMVCC.
     439              :  *
     440              :  * The snapshot will be usable directly in the current transaction or exported
     441              :  * for loading in a different transaction.
     442              :  */
     443              : Snapshot
     444          221 : SnapBuildInitialSnapshot(SnapBuild *builder)
     445              : {
     446              :     Snapshot    snap;
     447              :     TransactionId xid;
     448              :     TransactionId safeXid;
     449              :     TransactionId *newxip;
     450          221 :     int         newxcnt = 0;
     451              : 
     452              :     Assert(XactIsoLevel == XACT_REPEATABLE_READ);
     453              :     Assert(builder->building_full_snapshot);
     454              : 
     455              :     /* don't allow older snapshots */
     456          221 :     InvalidateCatalogSnapshot();    /* about to overwrite MyProc->xmin */
     457          221 :     if (HaveRegisteredOrActiveSnapshot())
     458            0 :         elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
     459              :     Assert(!HistoricSnapshotActive());
     460              : 
     461          221 :     if (builder->state != SNAPBUILD_CONSISTENT)
     462            0 :         elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
     463              : 
     464          221 :     if (!builder->committed.includes_all_transactions)
     465            0 :         elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
     466              : 
     467              :     /* so we don't overwrite the existing value */
     468          221 :     if (TransactionIdIsValid(MyProc->xmin))
     469            0 :         elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
     470              : 
     471          221 :     snap = SnapBuildBuildSnapshot(builder);
     472              : 
     473              :     /*
     474              :      * We know that snap->xmin is alive, enforced by the logical xmin
     475              :      * mechanism. Due to that we can do this without locks, we're only
     476              :      * changing our own value.
     477              :      *
     478              :      * Building an initial snapshot is expensive and an unenforced xmin
     479              :      * horizon would have bad consequences, therefore always double-check that
     480              :      * the horizon is enforced.
     481              :      */
     482          221 :     LWLockAcquire(ProcArrayLock, LW_SHARED);
     483          221 :     safeXid = GetOldestSafeDecodingTransactionId(false);
     484          221 :     LWLockRelease(ProcArrayLock);
     485              : 
     486          221 :     if (TransactionIdFollows(safeXid, snap->xmin))
     487            0 :         elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
     488              :              safeXid, snap->xmin);
     489              : 
     490          221 :     MyProc->xmin = snap->xmin;
     491              : 
     492              :     /* allocate in transaction context */
     493          221 :     newxip = palloc_array(TransactionId, GetMaxSnapshotXidCount());
     494              : 
     495              :     /*
     496              :      * snapbuild.c builds snapshots in an "inverted" manner, which means it
     497              :      * stores committed transactions in ->xip, not ones in progress. Build a
     498              :      * classical snapshot by marking all non-committed transactions as
     499              :      * in-progress. This can be expensive.
     500              :      */
     501          221 :     for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
     502              :     {
     503              :         void       *test;
     504              : 
     505              :         /*
     506              :          * Check whether transaction committed using the decoding snapshot
     507              :          * meaning of ->xip.
     508              :          */
     509            0 :         test = bsearch(&xid, snap->xip, snap->xcnt,
     510              :                        sizeof(TransactionId), xidComparator);
     511              : 
     512            0 :         if (test == NULL)
     513              :         {
     514            0 :             if (newxcnt >= GetMaxSnapshotXidCount())
     515            0 :                 ereport(ERROR,
     516              :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     517              :                          errmsg("initial slot snapshot too large")));
     518              : 
     519            0 :             newxip[newxcnt++] = xid;
     520              :         }
     521              : 
     522            0 :         TransactionIdAdvance(xid);
     523              :     }
     524              : 
     525              :     /* adjust remaining snapshot fields as needed */
     526          221 :     snap->snapshot_type = SNAPSHOT_MVCC;
     527          221 :     snap->xcnt = newxcnt;
     528          221 :     snap->xip = newxip;
     529              : 
     530          221 :     return snap;
     531              : }
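
The inversion performed by the loop above can be sketched in isolation. This is a minimal sketch, not the PostgreSQL implementation: `xid_t`, `xid_cmp` and `invert_committed` are hypothetical names, and it assumes plain unsigned ordering with no xid wraparound, whereas the real code uses NormalTransactionIdPrecedes() and TransactionIdAdvance().

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t xid_t;         /* hypothetical stand-in for TransactionId */

/* Plain ascending comparison; assumes no xid wraparound. */
static int
xid_cmp(const void *a, const void *b)
{
    xid_t       x = *(const xid_t *) a;
    xid_t       y = *(const xid_t *) b;

    return (x > y) - (x < y);
}

/*
 * Fill out[] with every xid in [xmin, xmax) that is NOT present in the
 * sorted committed[] array, i.e. treat everything not known to be committed
 * as in progress. Returns the number of xids written, or -1 if out[] is too
 * small (the source raises "initial slot snapshot too large" instead).
 */
static int
invert_committed(const xid_t *committed, size_t ncommitted,
                 xid_t xmin, xid_t xmax,
                 xid_t *out, size_t out_cap)
{
    size_t      n = 0;

    for (xid_t xid = xmin; xid < xmax; xid++)
    {
        if (bsearch(&xid, committed, ncommitted,
                    sizeof(xid_t), xid_cmp) == NULL)
        {
            if (n >= out_cap)
                return -1;
            out[n++] = xid;
        }
    }
    return (int) n;
}
```

The cost is one binary search per xid between xmin and xmax, which is why the source comment warns that this can be expensive.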
     532              : 
     533              : /*
     534              :  * Export a snapshot so it can be set in another session with SET TRANSACTION
     535              :  * SNAPSHOT.
     536              :  *
     537              :  * For that we need to start a transaction in the current backend as the
     538              :  * importing side checks whether the source transaction is still open to make
     539              :  * sure the xmin horizon hasn't advanced since then.
     540              :  */
     541              : const char *
     542            1 : SnapBuildExportSnapshot(SnapBuild *builder)
     543              : {
     544              :     Snapshot    snap;
     545              :     char       *snapname;
     546              : 
     547            1 :     if (IsTransactionOrTransactionBlock())
     548            0 :         elog(ERROR, "cannot export a snapshot from within a transaction");
     549              : 
     550            1 :     if (SavedResourceOwnerDuringExport)
     551            0 :         elog(ERROR, "can only export one snapshot at a time");
     552              : 
     553            1 :     SavedResourceOwnerDuringExport = CurrentResourceOwner;
     554            1 :     ExportInProgress = true;
     555              : 
     556            1 :     StartTransactionCommand();
     557              : 
     558              :     /* There doesn't seem to be a nice API to set these */
     559            1 :     XactIsoLevel = XACT_REPEATABLE_READ;
     560            1 :     XactReadOnly = true;
     561              : 
     562            1 :     snap = SnapBuildInitialSnapshot(builder);
     563              : 
     564              :     /*
     565              :      * now that we've built a plain snapshot, make it active and use the
     566              :      * normal mechanisms for exporting it
     567              :      */
     568            1 :     snapname = ExportSnapshot(snap);
     569              : 
     570            1 :     ereport(LOG,
     571              :             (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
     572              :                            "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
     573              :                            snap->xcnt,
     574              :                            snapname, snap->xcnt)));
     575            1 :     return snapname;
     576              : }
     577              : 
     578              : /*
     579              :  * Ensure there is a snapshot and if not build one for current transaction.
     580              :  */
     581              : Snapshot
     582            9 : SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
     583              : {
     584              :     Assert(builder->state == SNAPBUILD_CONSISTENT);
     585              : 
     586              :     /* only build a new snapshot if we don't have a prebuilt one */
     587            9 :     if (builder->snapshot == NULL)
     588              :     {
     589            2 :         builder->snapshot = SnapBuildBuildSnapshot(builder);
     590              :         /* increase refcount for the snapshot builder */
     591            2 :         SnapBuildSnapIncRefcount(builder->snapshot);
     592              :     }
     593              : 
     594            9 :     return builder->snapshot;
     595              : }
     596              : 
     597              : /*
     598              :  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
     599              :  * any. Aborts the previously started transaction and resets the resource
     600              :  * owner back to its original value.
     601              :  */
     602              : void
     603         5915 : SnapBuildClearExportedSnapshot(void)
     604              : {
     605              :     ResourceOwner tmpResOwner;
     606              : 
     607              :     /* nothing exported, that is the usual case */
     608         5915 :     if (!ExportInProgress)
     609         5914 :         return;
     610              : 
     611            1 :     if (!IsTransactionState())
     612            0 :         elog(ERROR, "clearing exported snapshot in wrong transaction state");
     613              : 
     614              :     /*
     615              :      * AbortCurrentTransaction() takes care of resetting the snapshot state,
     616              :      * so remember SavedResourceOwnerDuringExport.
     617              :      */
     618            1 :     tmpResOwner = SavedResourceOwnerDuringExport;
     619              : 
     620              :     /* abort so that nothing done during the export can take effect */
     621            1 :     AbortCurrentTransaction();
     622              : 
     623            1 :     CurrentResourceOwner = tmpResOwner;
     624              : }
     625              : 
     626              : /*
     627              :  * Clear snapshot export state during transaction abort.
     628              :  */
     629              : void
     630        35696 : SnapBuildResetExportedSnapshotState(void)
     631              : {
     632        35696 :     SavedResourceOwnerDuringExport = NULL;
     633        35696 :     ExportInProgress = false;
     634        35696 : }
     635              : 
     636              : /*
     637              :  * Handle the effects of a single heap change, as appropriate to the current
     638              :  * state of the snapshot builder, and return whether changes made at
     639              :  * (xid, lsn) can be decoded.
     640              :  */
     641              : bool
     642      1288211 : SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
     643              : {
     644              :     /*
     645              :      * We can't handle data in transactions if we haven't built a snapshot
     646              :      * yet, so don't store them.
     647              :      */
     648      1288211 :     if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
     649            0 :         return false;
     650              : 
     651              :     /*
     652              :      * No point in keeping track of changes in transactions that we don't have
     653              :      * enough information about to decode. This means that they started before
     654              :      * we got into the SNAPBUILD_FULL_SNAPSHOT state.
     655              :      */
     656      1288214 :     if (builder->state < SNAPBUILD_CONSISTENT &&
     657            3 :         TransactionIdPrecedes(xid, builder->next_phase_at))
     658            0 :         return false;
     659              : 
     660              :     /*
     661              :      * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
     662              :      * be needed to decode the change we're currently processing.
     663              :      */
     664      1288211 :     if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
     665              :     {
     666              :         /* only build a new snapshot if we don't have a prebuilt one */
     667         4161 :         if (builder->snapshot == NULL)
     668              :         {
     669          463 :             builder->snapshot = SnapBuildBuildSnapshot(builder);
     670              :             /* increase refcount for the snapshot builder */
     671          463 :             SnapBuildSnapIncRefcount(builder->snapshot);
     672              :         }
     673              : 
     674              :         /*
     675              :          * Increase refcount for the transaction we're handing the snapshot
     676              :          * out to.
     677              :          */
     678         4161 :         SnapBuildSnapIncRefcount(builder->snapshot);
     679         4161 :         ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
     680              :                                      builder->snapshot);
     681              :     }
     682              : 
     683      1288211 :     return true;
     684              : }
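
The reference counting used when a snapshot is handed out can be sketched as follows. This is a simplified model under stated assumptions: `snap_t`, `builder_t` and `hand_out_snapshot` are hypothetical names, the snapshot carries nothing but its refcount, and plain calloc()/free() stand in for PostgreSQL's memory contexts. The builder holds one reference for itself, and every transaction that receives the snapshot holds another.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct
{
    int         refcount;       /* builder's reference plus one per txn */
} snap_t;

typedef struct
{
    snap_t     *snapshot;       /* current prebuilt snapshot, or NULL */
} builder_t;

static void
snap_incref(snap_t *s)
{
    s->refcount++;
}

/* Drop one reference; returns true if the snapshot was freed. */
static bool
snap_decref(snap_t *s)
{
    assert(s->refcount > 0);
    if (--s->refcount == 0)
    {
        free(s);
        return true;
    }
    return false;
}

/*
 * Hand the builder's snapshot to a transaction, building it first if there
 * is no prebuilt one. Returns NULL only if allocation fails.
 */
static snap_t *
hand_out_snapshot(builder_t *b)
{
    if (b->snapshot == NULL)
    {
        b->snapshot = calloc(1, sizeof(snap_t));
        if (b->snapshot == NULL)
            return NULL;
        snap_incref(b->snapshot);   /* reference held by the builder */
    }
    snap_incref(b->snapshot);       /* reference held by the transaction */
    return b->snapshot;
}
```

In this model a snapshot that the builder later replaces stays alive until the last transaction holding it drops its reference.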
     685              : 
     686              : /*
     687              :  * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
     688              :  * This implies that a transaction has done some form of write to system
     689              :  * catalogs.
     690              :  */
     691              : void
     692        28210 : SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
     693              :                        XLogRecPtr lsn, xl_heap_new_cid *xlrec)
     694              : {
     695              :     CommandId   cid;
     696              : 
     697              :     /*
     698              :      * we only log new_cid's if a catalog tuple was modified, so mark the
     699              :      * transaction as containing catalog modifications
     700              :      */
     701        28210 :     ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
     702              : 
     703        28210 :     ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
     704              :                                  xlrec->target_locator, xlrec->target_tid,
     705              :                                  xlrec->cmin, xlrec->cmax,
     706              :                                  xlrec->combocid);
     707              : 
     708              :     /* figure out new command id */
     709        28210 :     if (xlrec->cmin != InvalidCommandId &&
     710        23791 :         xlrec->cmax != InvalidCommandId)
     711         3319 :         cid = Max(xlrec->cmin, xlrec->cmax);
     712        24891 :     else if (xlrec->cmax != InvalidCommandId)
     713         4419 :         cid = xlrec->cmax;
     714        20472 :     else if (xlrec->cmin != InvalidCommandId)
     715        20472 :         cid = xlrec->cmin;
     716              :     else
     717              :     {
     718            0 :         cid = InvalidCommandId; /* silence compiler */
     719            0 :         elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
     720              :     }
     721              : 
     722        28210 :     ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
     723        28210 : }
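
The command id selection above reduces to a small pure function. A minimal sketch, assuming a 32-bit command id whose invalid value is all bits set; `cid_t`, `INVALID_CID` and `next_command_id` are hypothetical names used only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t cid_t;                     /* hypothetical CommandId stand-in */
#define INVALID_CID ((cid_t) ~(cid_t) 0)    /* assumed "invalid" marker */

/*
 * Pick the larger of the valid cmin/cmax values and store the command id
 * that follows it in *next. Returns false if neither value is valid, the
 * case in which the source raises an error.
 */
static bool
next_command_id(cid_t cmin, cid_t cmax, cid_t *next)
{
    cid_t       cid;

    if (cmin != INVALID_CID && cmax != INVALID_CID)
        cid = (cmin > cmax) ? cmin : cmax;
    else if (cmax != INVALID_CID)
        cid = cmax;
    else if (cmin != INVALID_CID)
        cid = cmin;
    else
        return false;

    *next = cid + 1;
    return true;
}
```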
     724              : 
     725              : /*
     726              :  * Add a new Snapshot and invalidation messages to all transactions we're
     727              :  * decoding that currently are in-progress so they can see new catalog contents
     728              :  * made by the transaction that just committed. This is necessary because those
     729              :  * in-progress transactions will use the new catalog's contents from here on
     730              :  * (at the very least everything they do needs to be compatible with newer
     731              :  * catalog contents).
     732              :  */
     733              : static void
     734         1683 : SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
     735              : {
     736              :     dlist_iter  txn_i;
     737              :     ReorderBufferTXN *txn;
     738              : 
     739              :     /*
     740              :      * Iterate through all toplevel transactions. This can include
     741              :      * subtransactions which we just don't yet know to be that, but that's
     742              :      * fine, they will just get an unnecessary snapshot and invalidations
     743              :      * queued.
     744              :      */
     745         3401 :     dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
     746              :     {
     747         1718 :         txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
     748              : 
     749              :         Assert(TransactionIdIsValid(txn->xid));
     750              : 
     751              :         /*
     752              :          * If we don't have a base snapshot yet, there are no changes in this
     753              :          * transaction which in turn implies we don't yet need a snapshot at
     754              :          * all. We'll add a snapshot when the first change gets queued.
     755              :          *
     756              :          * Similarly, we don't need to add invalidations to a transaction
     757              :          * whose base snapshot is not yet set. Once a base snapshot is built,
     758              :          * it will include the xids of committed transactions that have
     759              :          * modified the catalog, thus reflecting the new catalog contents. The
     760              :          * existing catalog cache will have already been invalidated after
     761              :          * processing the invalidations in the transaction that modified
     762              :          * catalogs, ensuring that a fresh cache is constructed during
     763              :          * decoding.
     764              :          *
     765              :          * NB: This works correctly even for subtransactions because
     766              :          * ReorderBufferAssignChild() takes care to transfer the base snapshot
     767              :          * to the top-level transaction, and while iterating the changequeue
     768              :          * we'll get the change from the subtxn.
     769              :          */
     770         1718 :         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
     771            2 :             continue;
     772              : 
     773              :         /*
     774              :          * We don't need to add snapshot or invalidations to prepared
     775              :          * transactions as they should not see the new catalog contents.
     776              :          */
     777         1716 :         if (rbtxn_is_prepared(txn))
     778           28 :             continue;
     779              : 
     780         1688 :         elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
     781              :              txn->xid, LSN_FORMAT_ARGS(lsn));
     782              : 
     783              :         /*
     784              :          * increase the snapshot's refcount for the transaction we are handing
     785              :          * it out to
     786              :          */
     787         1688 :         SnapBuildSnapIncRefcount(builder->snapshot);
     788         1688 :         ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
     789              :                                  builder->snapshot);
     790              : 
     791              :         /*
     792              :          * Add invalidation messages to the reorder buffer of in-progress
     793              :          * transactions except the current committed transaction, for which we
     794              :          * will execute invalidations at the end.
     795              :          *
     796              :          * This is required because otherwise we would keep using stale
     797              :          * catcache contents built by the current transaction even after it
     798              :          * has been decoded, although they should have been invalidated by a
     799              :          * concurrent catalog-changing transaction.
     800              :          *
     801              :          * Distribute only the invalidation messages generated by the current
     802              :          * committed transaction. Invalidation messages received from other
     803              :          * transactions would have already been propagated to the relevant
     804              :          * in-progress transactions. This transaction would have processed
     805              :          * those invalidations, ensuring that subsequent transactions observe
     806              :          * a consistent cache state.
     807              :          */
     808         1688 :         if (txn->xid != xid)
     809              :         {
     810              :             uint32      ninvalidations;
     811           33 :             SharedInvalidationMessage *msgs = NULL;
     812              : 
     813           33 :             ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
     814              :                                                            xid, &msgs);
     815              : 
     816           33 :             if (ninvalidations > 0)
     817              :             {
     818              :                 Assert(msgs != NULL);
     819              : 
     820           29 :                 ReorderBufferAddDistributedInvalidations(builder->reorder,
     821              :                                                          txn->xid, lsn,
     822              :                                                          ninvalidations, msgs);
     823              :             }
     824              :         }
     825              :     }
     826         1683 : }
     827              : 
     828              : /*
     829              :  * Keep track of a new catalog changing transaction that has committed.
     830              :  */
     831              : static void
     832         1692 : SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
     833              : {
     834              :     Assert(TransactionIdIsValid(xid));
     835              : 
     836         1692 :     if (builder->committed.xcnt == builder->committed.xcnt_space)
     837              :     {
     838            0 :         builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
     839              : 
     840            0 :         elog(DEBUG1, "increasing space for committed transactions to %u",
     841              :              (uint32) builder->committed.xcnt_space);
     842              : 
     843            0 :         builder->committed.xip = repalloc_array(builder->committed.xip,
     844              :                                                 TransactionId,
     845              :                                                 builder->committed.xcnt_space);
     846              :     }
     847              : 
     848              :     /*
     849              :      * TODO: It might make sense to keep the array sorted here instead of
     850              :      * doing it every time we build a new snapshot. On the other hand this
     851              :      * gets called repeatedly when a transaction with subtransactions commits.
     852              :      */
     853         1692 :     builder->committed.xip[builder->committed.xcnt++] = xid;
     854         1692 : }
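
The growth policy above (capacity * 2 + 1, so that an empty array can also grow) can be sketched with the standard allocator. This is an illustration only: `xid_array` and `xid_array_append` are hypothetical names, and realloc() stands in for repalloc_array(), which reports allocation failure through PostgreSQL's error machinery rather than a return value.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t xid_t;         /* hypothetical stand-in for TransactionId */

typedef struct
{
    xid_t      *xip;            /* unsorted array of xids */
    size_t      xcnt;           /* number of used entries */
    size_t      xcnt_space;     /* number of allocated entries */
} xid_array;

/* Append xid, growing the array when it is full. Returns false on OOM. */
static bool
xid_array_append(xid_array *a, xid_t xid)
{
    if (a->xcnt == a->xcnt_space)
    {
        size_t      new_space = a->xcnt_space * 2 + 1;
        xid_t      *p = realloc(a->xip, new_space * sizeof(xid_t));

        if (p == NULL)
            return false;
        a->xip = p;
        a->xcnt_space = new_space;
    }
    a->xip[a->xcnt++] = xid;
    return true;
}
```

Starting from an empty array, the capacity goes 1, 3, 7, 15 and so on, so the number of reallocations grows only logarithmically with the number of appended xids.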
     855              : 
     856              : /*
     857              :  * Remove knowledge about transactions we treat as committed or containing catalog
     858              :  * changes that are smaller than ->xmin. Those won't ever get checked via
     859              :  * the ->committed or ->catchange array, respectively. The committed xids will
     860              :  * get checked via the clog machinery.
     861              :  *
     862              :  * Ideally we would remove a transaction from the catchange array as soon as
     863              :  * it finishes (commits or aborts), but that could be costly because we need
     864              :  * to maintain the order of xids in the array.
     865              :  */
     866              : static void
     867          546 : SnapBuildPurgeOlderTxn(SnapBuild *builder)
     868              : {
     869              :     int         off;
     870              :     TransactionId *workspace;
     871          546 :     int         surviving_xids = 0;
     872              : 
     873              :     /* not ready yet */
     874          546 :     if (!TransactionIdIsNormal(builder->xmin))
     875            0 :         return;
     876              : 
     877              :     /* TODO: Neater algorithm than just copying and iterating? */
     878              :     workspace =
     879          546 :         MemoryContextAlloc(builder->context,
     880          546 :                            builder->committed.xcnt * sizeof(TransactionId));
     881              : 
     882              :     /* copy xids that still are interesting to workspace */
     883          924 :     for (off = 0; off < builder->committed.xcnt; off++)
     884              :     {
     885          378 :         if (NormalTransactionIdPrecedes(builder->committed.xip[off],
     886              :                                         builder->xmin))
     887              :             ;                   /* remove */
     888              :         else
     889            1 :             workspace[surviving_xids++] = builder->committed.xip[off];
     890              :     }
     891              : 
     892              :     /* copy workspace back to persistent state */
     893          546 :     memcpy(builder->committed.xip, workspace,
     894              :            surviving_xids * sizeof(TransactionId));
     895              : 
     896          546 :     elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
     897              :          (uint32) builder->committed.xcnt, (uint32) surviving_xids,
     898              :          builder->xmin, builder->xmax);
     899          546 :     builder->committed.xcnt = surviving_xids;
     900              : 
     901          546 :     pfree(workspace);
     902              : 
     903              :     /*
     904              :      * Purge xids in ->catchange as well. The purged array must also be sorted
     905              :      * in xidComparator order.
     906              :      */
     907          546 :     if (builder->catchange.xcnt > 0)
     908              :     {
     909              :         /*
     910              :          * Since catchange.xip is sorted, we find the lower bound of xids that
     911              :          * are still interesting.
     912              :          */
     913            9 :         for (off = 0; off < builder->catchange.xcnt; off++)
     914              :         {
     915            6 :             if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
     916              :                                              builder->xmin))
     917            1 :                 break;
     918              :         }
     919              : 
     920            4 :         surviving_xids = builder->catchange.xcnt - off;
     921              : 
     922            4 :         if (surviving_xids > 0)
     923              :         {
     924            1 :             memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
     925              :                     surviving_xids * sizeof(TransactionId));
     926              :         }
     927              :         else
     928              :         {
     929            3 :             pfree(builder->catchange.xip);
     930            3 :             builder->catchange.xip = NULL;
     931              :         }
     932              : 
     933            4 :         elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
     934              :              (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
     935              :              builder->xmin, builder->xmax);
     936            4 :         builder->catchange.xcnt = surviving_xids;
     937              :     }
     938              : }
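
The two purge strategies above can be sketched side by side. Note that `purge_unsorted` compacts the array in place, which is a different technique from the workspace copy the function actually uses; it is shown only as one possible answer to the TODO. `purge_sorted` mirrors the lower-bound scan plus memmove() used for the sorted array. All names are hypothetical, and `xid_precedes` assumes both arguments are normal xids compared modulo 2^32.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef uint32_t xid_t;         /* hypothetical stand-in for TransactionId */

/* Wraparound-aware "a is older than b"; valid for normal xids only. */
static bool
xid_precedes(xid_t a, xid_t b)
{
    return (int32_t) (a - b) < 0;
}

/* Unsorted array: keep entries >= xmin, compacting in place. */
static size_t
purge_unsorted(xid_t *xip, size_t xcnt, xid_t xmin)
{
    size_t      kept = 0;

    for (size_t off = 0; off < xcnt; off++)
    {
        if (!xid_precedes(xip[off], xmin))
            xip[kept++] = xip[off];
    }
    return kept;
}

/* Sorted array: skip the leading run of old xids, then shift the rest. */
static size_t
purge_sorted(xid_t *xip, size_t xcnt, xid_t xmin)
{
    size_t      off = 0;
    size_t      kept;

    while (off < xcnt && xid_precedes(xip[off], xmin))
        off++;

    kept = xcnt - off;
    if (kept > 0)
        memmove(xip, &xip[off], kept * sizeof(xid_t));
    return kept;
}
```

Both variants preserve the relative order of the surviving xids, so a sorted input stays sorted.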
     939              : 
     940              : /*
     941              :  * Handle everything that needs to be done when a transaction commits
     942              :  */
     943              : void
     944         3986 : SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
     945              :                    int nsubxacts, TransactionId *subxacts, uint32 xinfo)
     946              : {
     947              :     int         nxact;
     948              : 
     949         3986 :     bool        needs_snapshot = false;
     950         3986 :     bool        needs_timetravel = false;
     951         3986 :     bool        sub_needs_timetravel = false;
     952              : 
     953         3986 :     TransactionId xmax = xid;
     954              : 
     955              :     /*
     956              :      * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
     957              :      * will they be part of a snapshot.  So we don't need to record anything.
     958              :      */
     959         3986 :     if (builder->state == SNAPBUILD_START ||
     960         3986 :         (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
     961            0 :          TransactionIdPrecedes(xid, builder->next_phase_at)))
     962              :     {
     963              :         /* ensure that only commits after this are getting replayed */
     964            0 :         if (builder->start_decoding_at <= lsn)
     965            0 :             builder->start_decoding_at = lsn + 1;
     966            0 :         return;
     967              :     }
     968              : 
     969         3986 :     if (builder->state < SNAPBUILD_CONSISTENT)
     970              :     {
     971              :         /* ensure that only commits after this are getting replayed */
     972            5 :         if (builder->start_decoding_at <= lsn)
     973            2 :             builder->start_decoding_at = lsn + 1;
     974              : 
     975              :         /*
     976              :          * If building an exportable snapshot, force xid to be tracked, even
     977              :          * if the transaction didn't modify the catalog.
     978              :          */
     979            5 :         if (builder->building_full_snapshot)
     980              :         {
     981            0 :             needs_timetravel = true;
     982              :         }
     983              :     }
     984              : 
     985         5203 :     for (nxact = 0; nxact < nsubxacts; nxact++)
     986              :     {
     987         1217 :         TransactionId subxid = subxacts[nxact];
     988              : 
     989              :         /*
     990              :          * Add subtransaction to base snapshot if catalog modifying; we don't
     991              :          * distinguish it from toplevel transactions there.
     992              :          */
     993         1217 :         if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
     994              :         {
     995            9 :             sub_needs_timetravel = true;
     996            9 :             needs_snapshot = true;
     997              : 
     998            9 :             elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
     999              :                  xid, subxid);
    1000              : 
    1001            9 :             SnapBuildAddCommittedTxn(builder, subxid);
    1002              : 
    1003            9 :             if (NormalTransactionIdFollows(subxid, xmax))
    1004            9 :                 xmax = subxid;
    1005              :         }
    1006              : 
    1007              :         /*
    1008              :          * If we're forcing timetravel we also need visibility information
    1009              :          * about subtransaction, so keep track of subtransaction's state, even
    1010              :          * if not catalog modifying.  Don't need to distribute a snapshot in
    1011              :          * that case.
    1012              :          */
    1013         1208 :         else if (needs_timetravel)
    1014              :         {
    1015            0 :             SnapBuildAddCommittedTxn(builder, subxid);
    1016            0 :             if (NormalTransactionIdFollows(subxid, xmax))
    1017            0 :                 xmax = subxid;
    1018              :         }
    1019              :     }
    1020              : 
    1021              :     /* if top-level modified catalog, it'll need a snapshot */
    1022         3986 :     if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
    1023              :     {
    1024         1682 :         elog(DEBUG2, "found top level transaction %u, with catalog changes",
    1025              :              xid);
    1026         1682 :         needs_snapshot = true;
    1027         1682 :         needs_timetravel = true;
    1028         1682 :         SnapBuildAddCommittedTxn(builder, xid);
    1029              :     }
    1030         2304 :     else if (sub_needs_timetravel)
    1031              :     {
    1032              :         /* track toplevel txn as well, subxact alone isn't meaningful */
    1033            1 :         elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
    1034              :              xid);
    1035            1 :         needs_timetravel = true;
    1036            1 :         SnapBuildAddCommittedTxn(builder, xid);
    1037              :     }
    1038         2303 :     else if (needs_timetravel)
    1039              :     {
    1040            0 :         elog(DEBUG2, "forced transaction %u to do timetravel", xid);
    1041              : 
    1042            0 :         SnapBuildAddCommittedTxn(builder, xid);
    1043              :     }
    1044              : 
    1045         3986 :     if (!needs_timetravel)
    1046              :     {
    1047              :         /* record that we cannot export a general snapshot anymore */
    1048         2303 :         builder->committed.includes_all_transactions = false;
    1049              :     }
    1050              : 
    1051              :     Assert(!needs_snapshot || needs_timetravel);
    1052              : 
    1053              :     /*
    1054              :      * Adjust xmax of the snapshot builder, we only do that for committed,
    1055              :      * catalog modifying, transactions, everything else isn't interesting for
    1056              :      * us since we'll never look at the respective rows.
    1057              :      */
    1058         3986 :     if (needs_timetravel &&
    1059         3366 :         (!TransactionIdIsValid(builder->xmax) ||
    1060         1683 :          TransactionIdFollowsOrEquals(xmax, builder->xmax)))
    1061              :     {
    1062         1683 :         builder->xmax = xmax;
    1063         1683 :         TransactionIdAdvance(builder->xmax);
    1064              :     }
    1065              : 
    1066              :     /* if there's any reason to build a historic snapshot, do so now */
    1067         3986 :     if (needs_snapshot)
    1068              :     {
    1069              :         /*
    1070              :          * If we haven't built a complete snapshot yet there's no need to hand
    1071              :          * it out, it wouldn't (and couldn't) be used anyway.
    1072              :          */
    1073         1683 :         if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
    1074            0 :             return;
    1075              : 
    1076              :         /*
    1077              :          * Decrease the snapshot builder's refcount of the old snapshot, note
    1078              :          * that it still will be used if it has been handed out to the
    1079              :          * reorderbuffer earlier.
    1080              :          */
    1081         1683 :         if (builder->snapshot)
    1082         1683 :             SnapBuildSnapDecRefcount(builder->snapshot);
    1083              : 
    1084         1683 :         builder->snapshot = SnapBuildBuildSnapshot(builder);
    1085              : 
    1086              :         /* we might need to execute invalidations, add snapshot */
    1087         1683 :         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
    1088              :         {
    1089            8 :             SnapBuildSnapIncRefcount(builder->snapshot);
    1090            8 :             ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
    1091              :                                          builder->snapshot);
    1092              :         }
    1093              : 
    1094              :         /* refcount of the snapshot builder for the new snapshot */
    1095         1683 :         SnapBuildSnapIncRefcount(builder->snapshot);
    1096              : 
    1097              :         /*
    1098              :          * Add a new catalog snapshot and invalidation messages to all
    1099              :          * currently running transactions.
    1100              :          */
    1101         1683 :         SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
    1102              :     }
    1103              : }
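
The xmax adjustment at the end of the function above depends on a wraparound-aware XID comparison and on skipping reserved XIDs when advancing. The following is a minimal standalone sketch of that arithmetic, not the PostgreSQL implementation. It assumes 32-bit XIDs with values below 3 reserved. The names `Xid`, `xid_follows_or_equals`, `xid_advance` and `adjust_xmax` are hypothetical stand-ins, and the comparison ignores the special-XID handling of the real macros.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t Xid;               /* hypothetical stand-in for TransactionId */
#define INVALID_XID      ((Xid) 0)
#define FIRST_NORMAL_XID ((Xid) 3)  /* assumption: XIDs 0..2 are reserved */

/* Modulo-2^32 comparison: true if a is logically at or after b. */
static bool
xid_follows_or_equals(Xid a, Xid b)
{
    return (int32_t) (a - b) >= 0;
}

/* Step to the next XID, skipping the reserved values after wraparound. */
static Xid
xid_advance(Xid xid)
{
    xid++;
    if (xid < FIRST_NORMAL_XID)
        xid = FIRST_NORMAL_XID;
    return xid;
}

/*
 * Return the builder's new xmax after a catalog-modifying commit whose
 * highest XID is commit_xmax. The xmax only ever moves forward.
 */
static Xid
adjust_xmax(Xid builder_xmax, Xid commit_xmax)
{
    assert(commit_xmax >= FIRST_NORMAL_XID);

    if (builder_xmax == INVALID_XID ||
        xid_follows_or_equals(commit_xmax, builder_xmax))
        return xid_advance(commit_xmax);

    return builder_xmax;
}
```

In this sketch a commit that is logically older than the current xmax leaves it unchanged, which mirrors the guard in the listing.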
    1104              : 
    1105              : /*
    1106              :  * Check the reorder buffer and the snapshot to see if the given transaction has
    1107              :  * modified catalogs.
    1108              :  */
    1109              : static inline bool
    1110         5203 : SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
    1111              :                               uint32 xinfo)
    1112              : {
    1113         5203 :     if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
    1114         1687 :         return true;
    1115              : 
    1116              :     /*
    1117              :      * The transactions that have changed catalogs must have invalidation
    1118              :      * info.
    1119              :      */
    1120         3516 :     if (!(xinfo & XACT_XINFO_HAS_INVALS))
    1121         3508 :         return false;
    1122              : 
    1123              :     /* Check the catchange XID array */
    1124           12 :     return ((builder->catchange.xcnt > 0) &&
    1125            4 :             (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
    1126              :                      sizeof(TransactionId), xidComparator) != NULL));
    1127              : }
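
The final check in the function above is a binary search over a sorted XID array. A minimal standalone sketch of that lookup follows, using only the standard C `bsearch`. The names `Xid`, `xid_cmp` and `xid_in_sorted_array` are hypothetical; the comparator here orders raw integer values and is only an assumption about how such an array could be sorted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t Xid;   /* hypothetical stand-in for TransactionId */

/* Comparator on raw values; the array must be sorted with the same order. */
static int
xid_cmp(const void *a, const void *b)
{
    Xid xa = *(const Xid *) a;
    Xid xb = *(const Xid *) b;

    if (xa < xb)
        return -1;
    if (xa > xb)
        return 1;
    return 0;
}

/* True if xid occurs in the sorted array xip of length xcnt. */
static bool
xid_in_sorted_array(Xid xid, const Xid *xip, size_t xcnt)
{
    /* Skip the search entirely for an empty array. */
    if (xcnt == 0)
        return false;

    assert(xip != NULL);
    return bsearch(&xid, xip, xcnt, sizeof(Xid), xid_cmp) != NULL;
}
```

Checking the count first keeps the empty case cheap and avoids handing a possibly NULL array to `bsearch`.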
    1128              : 
    1129              : /* -----------------------------------
    1130              :  * Snapshot building functions dealing with xlog records
    1131              :  * -----------------------------------
    1132              :  */
    1133              : 
    1134              : /*
    1135              :  * Process a running xacts record, and use its information to first build a
    1136              :  * historic snapshot and later to release resources that aren't needed
    1137              :  * anymore.
    1138              :  */
    1139              : void
    1140         1710 : SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
    1141              : {
    1142              :     ReorderBufferTXN *txn;
    1143              :     TransactionId xmin;
    1144              : 
    1145              :     /*
    1146              :      * If we're not consistent yet, inspect the record to see whether it
    1147              :      * allows us to get closer to being consistent. If we are consistent, dump
    1148              :      * our snapshot so others or we, after a restart, can use it.
    1149              :      */
    1150         1710 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1151              :     {
    1152              :         /* returns false if there's no point in performing cleanup just yet */
    1153         1186 :         if (!SnapBuildFindSnapshot(builder, lsn, running))
    1154         1162 :             return;
    1155              :     }
    1156              :     else
    1157          524 :         SnapBuildSerialize(builder, lsn);
    1158              : 
    1159              :     /*
    1160              :      * Update range of interesting xids based on the running xacts
    1161              :      * information. We don't increase ->xmax using it, because once we are in
    1162              :      * a consistent state we can do that ourselves and much more efficiently
    1163              :      * so, because we only need to do it for catalog transactions since we
    1164              :      * only ever look at those.
    1165              :      *
    1166              :      * NB: We only increase xmax when a catalog modifying transaction commits
    1167              :      * (see SnapBuildCommitTxn).  Because of this, xmax can be lower than
    1168              :      * xmin, which looks odd but is correct and actually more efficient, since
    1169              :      * we hit fast paths in heapam_visibility.c.
    1170              :      */
    1171          546 :     builder->xmin = running->oldestRunningXid;
    1172              : 
    1173              :     /* Remove transactions we don't need to keep track of anymore */
    1174          546 :     SnapBuildPurgeOlderTxn(builder);
    1175              : 
    1176              :     /*
    1177              :      * Advance the xmin limit for the current replication slot, to allow
    1178              :      * vacuum to clean up the tuples this slot has been protecting.
    1179              :      *
    1180              :      * The reorderbuffer might have an xmin among the currently running
    1181              :      * snapshots; use it if so.  If not, we need only consider the snapshots
    1182              :      * we'll produce later, which can't be less than the oldest running xid in
    1183              :      * the record we're reading now.
    1184              :      */
    1185          546 :     xmin = ReorderBufferGetOldestXmin(builder->reorder);
    1186          546 :     if (xmin == InvalidTransactionId)
    1187          499 :         xmin = running->oldestRunningXid;
    1188          546 :     elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
    1189              :          builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
    1190          546 :     LogicalIncreaseXminForSlot(lsn, xmin);
    1191              : 
    1192              :     /*
    1193              :      * Also tell the slot where we can restart decoding from. We don't want to
    1194              :      * do that after every commit because changing that implies an fsync of
    1195              :      * the logical slot's state file, so we only do it every time we see a
    1196              :      * running xacts record.
    1197              :      *
    1198              :      * Do so by looking for the oldest in progress transaction (determined by
    1199              :      * the first LSN of any of its relevant records). Every transaction
    1200              :      * remembers the last location we stored the snapshot to disk before its
    1201              :      * beginning. That point is where we can restart from.
    1202              :      */
    1203              : 
    1204              :     /*
    1205              :      * Can't know about a serialized snapshot's location if we're not
    1206              :      * consistent.
    1207              :      */
    1208          546 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1209           17 :         return;
    1210              : 
    1211          529 :     txn = ReorderBufferGetOldestTXN(builder->reorder);
    1212              : 
    1213              :     /*
    1214              :      * oldest ongoing txn might have started when we didn't yet serialize
    1215              :      * anything because we hadn't reached a consistent state yet.
    1216              :      */
    1217          529 :     if (txn != NULL && XLogRecPtrIsValid(txn->restart_decoding_lsn))
    1218           23 :         LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
    1219              : 
    1220              :     /*
    1221              :      * No in-progress transaction, can reuse the last serialized snapshot if
    1222              :      * we have one.
    1223              :      */
    1224          506 :     else if (txn == NULL &&
    1225          473 :              XLogRecPtrIsValid(builder->reorder->current_restart_decoding_lsn) &&
    1226          471 :              XLogRecPtrIsValid(builder->last_serialized_snapshot))
    1227          471 :         LogicalIncreaseRestartDecodingForSlot(lsn,
    1228              :                                               builder->last_serialized_snapshot);
    1229              : }
    1230              : 
    1231              : 
    1232              : /*
    1233              :  * Build the start of a snapshot that's capable of decoding the catalog.
    1234              :  *
    1235              :  * Helper function for SnapBuildProcessRunningXacts() while we're not yet
    1236              :  * consistent.
    1237              :  *
    1238              :  * Returns true if there is a point in performing internal maintenance/cleanup
    1239              :  * using the xl_running_xacts record.
    1240              :  */
    1241              : static bool
    1242         1186 : SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
    1243              : {
    1244              :     /* ---
    1245              :      * Build catalog decoding snapshot incrementally using information about
    1246              :      * the currently running transactions. There are several ways to do that:
    1247              :      *
    1248              :      * a) There were no running transactions when the xl_running_xacts record
    1249              :      *    was inserted, jump to CONSISTENT immediately. We might find such a
    1250              :      *    state while waiting on c)'s sub-states.
    1251              :      *
    1252              :      * b) This (in a previous run) or another decoding slot serialized a
    1253              :      *    snapshot to disk that we can use. Can't use this method while finding
    1254              :      *    the start point for decoding changes as the restart LSN would be an
    1255              :      *    arbitrary LSN but we need to find the start point to extract changes
    1256              :      *    where we won't see the data for partial transactions. Also, we cannot
    1257              :      *    use this method when a slot needs a full snapshot for export or direct
    1258              :      *    use, as that snapshot will only contain catalog modifying transactions.
    1259              :      *
    1260              :      * c) First incrementally build a snapshot for catalog tuples
    1261              :      *    (BUILDING_SNAPSHOT), that requires all, already in-progress,
    1262              :      *    transactions to finish.  Every transaction starting after that
    1263              :      *    (FULL_SNAPSHOT state), has enough information to be decoded.  But
    1264              :      *    for older running transactions no viable snapshot exists yet, so
    1265              :      *    CONSISTENT will only be reached once all of those have finished.
    1266              :      * ---
    1267              :      */
    1268              : 
    1269              :     /*
    1270              :      * xl_running_xacts record is older than what we can use, we might not
    1271              :      * have all necessary catalog rows anymore.
    1272              :      */
    1273         1186 :     if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
    1274          505 :         NormalTransactionIdPrecedes(running->oldestRunningXid,
    1275              :                                     builder->initial_xmin_horizon))
    1276              :     {
    1277            0 :         ereport(DEBUG1,
    1278              :                 errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low",
    1279              :                                 LSN_FORMAT_ARGS(lsn)),
    1280              :                 errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
    1281              :                                    builder->initial_xmin_horizon, running->oldestRunningXid));
    1282              : 
    1283              : 
    1284            0 :         SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
    1285              : 
    1286            0 :         return true;
    1287              :     }
    1288              : 
    1289              :     /*
    1290              :      * a) No transactions were running, we can jump to consistent.
    1291              :      *
    1292              :      * This is not affected by races around xl_running_xacts, because we can
    1293              :      * miss transaction commits, but currently not transactions starting.
    1294              :      *
    1295              :      * NB: We might have already started to incrementally assemble a snapshot,
    1296              :      * so we need to be careful to deal with that.
    1297              :      */
    1298         1186 :     if (running->oldestRunningXid == running->nextXid)
    1299              :     {
    1300         1154 :         if (!XLogRecPtrIsValid(builder->start_decoding_at) ||
    1301          655 :             builder->start_decoding_at <= lsn)
    1302              :             /* can decode everything after this */
    1303          500 :             builder->start_decoding_at = lsn + 1;
    1304              : 
    1305              :         /* As no transactions were running xmin/xmax can be trivially set. */
    1306         1154 :         builder->xmin = running->nextXid; /* < are finished */
    1307         1154 :         builder->xmax = running->nextXid; /* >= are running */
    1308              : 
    1309              :         /* so we can safely use the faster comparisons */
    1310              :         Assert(TransactionIdIsNormal(builder->xmin));
    1311              :         Assert(TransactionIdIsNormal(builder->xmax));
    1312              : 
    1313         1154 :         builder->state = SNAPBUILD_CONSISTENT;
    1314         1154 :         builder->next_phase_at = InvalidTransactionId;
    1315              : 
    1316         1154 :         ereport(LogicalDecodingLogLevel(),
    1317              :                 errmsg("logical decoding found consistent point at %X/%08X",
    1318              :                        LSN_FORMAT_ARGS(lsn)),
    1319              :                 errdetail("There are no running transactions."));
    1320              : 
    1321         1154 :         return false;
    1322              :     }
    1323              : 
    1324              :     /*
    1325              :      * b) valid on disk state and while neither building full snapshot nor
    1326              :      * creating a slot.
    1327              :      */
    1328           32 :     else if (!builder->building_full_snapshot &&
    1329           50 :              !builder->in_slot_creation &&
    1330           19 :              SnapBuildRestore(builder, lsn))
    1331              :     {
    1332              :         /* there won't be any state to cleanup */
    1333            8 :         return false;
    1334              :     }
    1335              : 
    1336              :     /*
    1337              :      * c) transition from START to BUILDING_SNAPSHOT.
    1338              :      *
    1339              :      * In START state, and a xl_running_xacts record with running xacts is
    1340              :      * encountered.  In that case, switch to BUILDING_SNAPSHOT state, and
    1341              :      * record xl_running_xacts->nextXid.  Once all running xacts have finished
    1342              :      * (i.e. they're all >= nextXid), we have a complete catalog snapshot.  It
    1343              :      * might look like we could use xl_running_xacts's ->xids information to
    1344              :      * get there quicker, but that is problematic because transactions marked
    1345              :      * as running, might already have inserted their commit record - it's
    1346              :      * infeasible to change that with locking.
    1347              :      */
    1348           24 :     else if (builder->state == SNAPBUILD_START)
    1349              :     {
    1350           13 :         builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
    1351           13 :         builder->next_phase_at = running->nextXid;
    1352              : 
    1353              :         /*
    1354              :          * Start with an xmin/xmax that's correct for the future, when all the
    1355              :          * currently running transactions have finished. We'll update both
    1356              :          * while waiting for the pending transactions to finish.
    1357              :          */
    1358           13 :         builder->xmin = running->nextXid; /* < are finished */
    1359           13 :         builder->xmax = running->nextXid; /* >= are running */
    1360              : 
    1361              :         /* so we can safely use the faster comparisons */
    1362              :         Assert(TransactionIdIsNormal(builder->xmin));
    1363              :         Assert(TransactionIdIsNormal(builder->xmax));
    1364              : 
    1365           13 :         ereport(LOG,
    1366              :                 errmsg("logical decoding found initial starting point at %X/%08X",
    1367              :                        LSN_FORMAT_ARGS(lsn)),
    1368              :                 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
    1369              :                           running->xcnt, running->nextXid));
    1370              : 
    1371           13 :         SnapBuildWaitSnapshot(running, running->nextXid);
    1372              :     }
    1373              : 
    1374              :     /*
    1375              :      * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
    1376              :      *
    1377              :      * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
    1378              :      * is >= nextXid from when we switched to BUILDING_SNAPSHOT.  This
    1379              :      * means all transactions starting afterwards have enough information to
    1380              :      * be decoded.  Switch to FULL_SNAPSHOT.
    1381              :      */
    1382           17 :     else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
    1383            6 :              TransactionIdPrecedesOrEquals(builder->next_phase_at,
    1384              :                                            running->oldestRunningXid))
    1385              :     {
    1386            5 :         builder->state = SNAPBUILD_FULL_SNAPSHOT;
    1387            5 :         builder->next_phase_at = running->nextXid;
    1388              : 
    1389            5 :         ereport(LOG,
    1390              :                 errmsg("logical decoding found initial consistent point at %X/%08X",
    1391              :                        LSN_FORMAT_ARGS(lsn)),
    1392              :                 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
    1393              :                           running->xcnt, running->nextXid));
    1394              : 
    1395            5 :         SnapBuildWaitSnapshot(running, running->nextXid);
    1396              :     }
    1397              : 
    1398              :     /*
    1399              :      * c) transition from FULL_SNAPSHOT to CONSISTENT.
    1400              :      *
    1401              :      * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
    1402              :      * >= nextXid from when we switched to FULL_SNAPSHOT.  This means all
    1403              :      * transactions that are currently in progress have a catalog snapshot,
    1404              :      * and all their changes have been collected.  Switch to CONSISTENT.
    1405              :      */
    1406           11 :     else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
    1407            5 :              TransactionIdPrecedesOrEquals(builder->next_phase_at,
    1408              :                                            running->oldestRunningXid))
    1409              :     {
    1410            5 :         builder->state = SNAPBUILD_CONSISTENT;
    1411            5 :         builder->next_phase_at = InvalidTransactionId;
    1412              : 
    1413            5 :         ereport(LogicalDecodingLogLevel(),
    1414              :                 errmsg("logical decoding found consistent point at %X/%08X",
    1415              :                        LSN_FORMAT_ARGS(lsn)),
    1416              :                 errdetail("There are no old transactions anymore."));
    1417              :     }
    1418              : 
    1419              :     /*
    1420              :      * We already started to track running xacts and need to wait for all
    1421              :      * in-progress ones to finish. We fall through to the normal processing of
    1422              :      * records so incremental cleanup can be performed.
    1423              :      */
    1424           22 :     return true;
    1425              : }
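
The state transitions described in the comments of the function above can be sketched as a small standalone state machine. This is a simplified illustration, not the actual code: it omits path b) (restoring a serialized snapshot), the xmin horizon check, logging and waiting. The names `Builder`, `State`, `step` and `xid_precedes_or_equals` are hypothetical, and the XID comparison ignores special XIDs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Xid;   /* hypothetical stand-in for TransactionId */

typedef enum
{
    ST_START,
    ST_BUILDING_SNAPSHOT,
    ST_FULL_SNAPSHOT,
    ST_CONSISTENT
} State;

typedef struct
{
    State state;
    Xid   next_phase_at;    /* 0 means "no next phase" */
} Builder;

/* Modulo-2^32 comparison: true if a is logically at or before b. */
static bool
xid_precedes_or_equals(Xid a, Xid b)
{
    return (int32_t) (a - b) <= 0;
}

/* Feed one running-xacts record: its oldest running XID and next XID. */
static void
step(Builder *b, Xid oldest_running, Xid next_xid)
{
    assert(b != NULL);

    if (b->state == ST_CONSISTENT)
        return;

    if (oldest_running == next_xid)
    {
        /* a) nothing was running: jump straight to CONSISTENT */
        b->state = ST_CONSISTENT;
        b->next_phase_at = 0;
    }
    else if (b->state == ST_START)
    {
        /* c) remember which transactions must finish first */
        b->state = ST_BUILDING_SNAPSHOT;
        b->next_phase_at = next_xid;
    }
    else if (b->state == ST_BUILDING_SNAPSHOT &&
             xid_precedes_or_equals(b->next_phase_at, oldest_running))
    {
        /* c) everything running at START time has finished */
        b->state = ST_FULL_SNAPSHOT;
        b->next_phase_at = next_xid;
    }
    else if (b->state == ST_FULL_SNAPSHOT &&
             xid_precedes_or_equals(b->next_phase_at, oldest_running))
    {
        /* c) everything older than the FULL_SNAPSHOT point has finished */
        b->state = ST_CONSISTENT;
        b->next_phase_at = 0;
    }
}
```

Each record either advances the state by at most one step or, when no transactions were running, short-circuits to the consistent state.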
    1426              : 
    1427              : /* ---
    1428              :  * Iterate through xids in record, wait for all older than the cutoff to
    1429              :  * finish.  Then, if possible, log a new xl_running_xacts record.
    1430              :  *
    1431              :  * This isn't required for the correctness of decoding, but to:
    1432              :  * a) allow isolationtester to notice that we're currently waiting for
    1433              :  *    something.
    1434              :  * b) log a new xl_running_xacts record where it'd be helpful, without having
    1435              :  *    to wait for bgwriter or checkpointer.
    1436              :  * ---
    1437              :  */
    1438              : static void
    1439           18 : SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
    1440              : {
    1441              :     int         off;
    1442              : 
    1443           34 :     for (off = 0; off < running->xcnt; off++)
    1444              :     {
    1445           18 :         TransactionId xid = running->xids[off];
    1446              : 
    1447              :         /*
    1448              :          * Upper layers should prevent us from ever needing to wait on ourselves.
    1449              :          * Check anyway, since failing to do so would either result in an
    1450              :          * endless wait or an Assert() failure.
    1451              :          */
    1452           18 :         if (TransactionIdIsCurrentTransactionId(xid))
    1453            0 :             elog(ERROR, "waiting for ourselves");
    1454              : 
    1455           18 :         if (TransactionIdFollows(xid, cutoff))
    1456            0 :             continue;
    1457              : 
    1458           18 :         XactLockTableWait(xid, NULL, NULL, XLTW_None);
    1459              :     }
    1460              : 
    1461              :     /*
    1462              :      * All transactions we needed to finish have finished - try to ensure there is
    1463              :      * another xl_running_xacts record in a timely manner, without having to
    1464              :      * wait for bgwriter or checkpointer to log one.  During recovery we can't
    1465              :      * enforce that, so we'll have to wait.
    1466              :      */
    1467           16 :     if (!RecoveryInProgress())
    1468              :     {
    1469           16 :         LogStandbySnapshot();
    1470              :     }
    1471           16 : }
    1472              : 
    1473              : #define SnapBuildOnDiskConstantSize \
    1474              :     offsetof(SnapBuildOnDisk, builder)
    1475              : #define SnapBuildOnDiskNotChecksummedSize \
    1476              :     offsetof(SnapBuildOnDisk, version)
    1477              : 
    1478              : #define SNAPBUILD_MAGIC 0x51A1E001
    1479              : #define SNAPBUILD_VERSION 6
    1480              : 
    1481              : /*
    1482              :  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
    1483              :  *
    1484              :  * Supposed to be used by external (i.e. not snapbuild.c) code that just read
    1485              :  * a record that's a potential location for a serialized snapshot.
    1486              :  */
    1487              : void
    1488           93 : SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
    1489              : {
    1490           93 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1491            0 :         SnapBuildRestore(builder, lsn);
    1492              :     else
    1493           93 :         SnapBuildSerialize(builder, lsn);
    1494           93 : }
    1495              : 
    1496              : /*
    1497              :  * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
    1498              :  * been done by another decoding process.
    1499              :  */
    1500              : static void
    1501          617 : SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
    1502              : {
    1503              :     Size        needed_length;
    1504          617 :     SnapBuildOnDisk *ondisk = NULL;
    1505          617 :     TransactionId *catchange_xip = NULL;
    1506              :     MemoryContext old_ctx;
    1507              :     size_t      catchange_xcnt;
    1508              :     char       *ondisk_c;
    1509              :     int         fd;
    1510              :     char        tmppath[MAXPGPATH];
    1511              :     char        path[MAXPGPATH];
    1512              :     int         ret;
    1513              :     struct stat stat_buf;
    1514              :     Size        sz;
    1515              : 
    1516              :     Assert(XLogRecPtrIsValid(lsn));
    1517              :     Assert(!XLogRecPtrIsValid(builder->last_serialized_snapshot) ||
    1518              :            builder->last_serialized_snapshot <= lsn);
    1519              : 
    1520              :     /*
    1521              :      * no point in serializing if we cannot continue to work immediately after
    1522              :      * restoring the snapshot
    1523              :      */
    1524          617 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1525            0 :         return;
    1526              : 
    1527              :     /* consistent snapshots have no next phase */
    1528              :     Assert(builder->next_phase_at == InvalidTransactionId);
    1529              : 
    1530              :     /*
    1531              :      * We identify snapshots by the LSN they are valid for. We don't need to
    1532              :      * include timelines in the name as each LSN maps to exactly one timeline
    1533              :      * unless the user used pg_resetwal or similar. If a user did so, there's
    1534              :      * no hope of continuing to decode anyway.
    1535              :      */
    1536          617 :     sprintf(path, "%s/%X-%X.snap",
    1537              :             PG_LOGICAL_SNAPSHOTS_DIR,
    1538          617 :             LSN_FORMAT_ARGS(lsn));
    1539              : 
    1540              :     /*
    1541              :      * first check whether some other backend already has written the snapshot
    1542              :      * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
    1543              :      * as a valid state. Everything else is an unexpected error.
    1544              :      */
    1545          617 :     ret = stat(path, &stat_buf);
    1546              : 
    1547          617 :     if (ret != 0 && errno != ENOENT)
    1548            0 :         ereport(ERROR,
    1549              :                 (errcode_for_file_access(),
    1550              :                  errmsg("could not stat file \"%s\": %m", path)));
    1551              : 
    1552          617 :     else if (ret == 0)
    1553              :     {
    1554              :         /*
    1555              :          * somebody else has already serialized to this point, don't overwrite
    1556              :          * but remember location, so we don't need to read old data again.
    1557              :          *
    1558              :          * To be sure it has been synced to disk after the rename() from the
    1559              :          * tempfile filename to the real filename, we just repeat the fsync.
    1560              :          * That ought to be cheap because in most scenarios it should already
    1561              :          * be safely on disk.
    1562              :          */
    1563          284 :         fsync_fname(path, false);
    1564          284 :         fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1565              : 
    1566          284 :         builder->last_serialized_snapshot = lsn;
    1567          284 :         goto out;
    1568              :     }
    1569              : 
    1570              :     /*
    1571              :      * there is an obvious race condition here between the time we stat(2) the
    1572              :      * file and us writing the file. But we rename the file into place
    1573              :      * atomically and all files created need to contain the same data anyway,
    1574              :      * so this is perfectly fine, although a bit of a resource waste. Locking
    1575              :      * seems like pointless complication.
    1576              :      */
    1577          333 :     elog(DEBUG1, "serializing snapshot to %s", path);
    1578              : 
    1579              :     /* to make sure only we will write to this tempfile, include pid */
    1580          333 :     sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
    1581              :             PG_LOGICAL_SNAPSHOTS_DIR,
    1582          333 :             LSN_FORMAT_ARGS(lsn), MyProcPid);
    1583              : 
    1584              :     /*
    1585              :      * Unlink the temporary file if it already exists. It must be left over
    1586              :      * from an earlier crash/error, since we won't enter this function twice
    1587              :      * from within a single decoding slot/backend and the temporary file name
    1588              :      * contains the pid of the current process.
    1589              :      */
    1590          333 :     if (unlink(tmppath) != 0 && errno != ENOENT)
    1591            0 :         ereport(ERROR,
    1592              :                 (errcode_for_file_access(),
    1593              :                  errmsg("could not remove file \"%s\": %m", tmppath)));
    1594              : 
    1595          333 :     old_ctx = MemoryContextSwitchTo(builder->context);
    1596              : 
    1597              :     /* Get the catalog modifying transactions that are not yet committed */
    1598          333 :     catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
    1599          333 :     catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);
    1600              : 
    1601          333 :     needed_length = sizeof(SnapBuildOnDisk) +
    1602          333 :         sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
    1603              : 
    1604          333 :     ondisk_c = palloc0(needed_length);
    1605          333 :     ondisk = (SnapBuildOnDisk *) ondisk_c;
    1606          333 :     ondisk->magic = SNAPBUILD_MAGIC;
    1607          333 :     ondisk->version = SNAPBUILD_VERSION;
    1608          333 :     ondisk->length = needed_length;
    1609          333 :     INIT_CRC32C(ondisk->checksum);
    1610          333 :     COMP_CRC32C(ondisk->checksum,
    1611              :                 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
    1612              :                 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
    1613          333 :     ondisk_c += sizeof(SnapBuildOnDisk);
    1614              : 
    1615          333 :     memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
    1616              :     /* NULL-ify memory-only data */
    1617          333 :     ondisk->builder.context = NULL;
    1618          333 :     ondisk->builder.snapshot = NULL;
    1619          333 :     ondisk->builder.reorder = NULL;
    1620          333 :     ondisk->builder.committed.xip = NULL;
    1621          333 :     ondisk->builder.catchange.xip = NULL;
    1622              :     /* update catchange only on disk data */
    1623          333 :     ondisk->builder.catchange.xcnt = catchange_xcnt;
    1624              : 
    1625          333 :     COMP_CRC32C(ondisk->checksum,
    1626              :                 &ondisk->builder,
    1627              :                 sizeof(SnapBuild));
    1628              : 
    1629              :     /* copy committed xacts */
    1630          333 :     if (builder->committed.xcnt > 0)
    1631              :     {
    1632           60 :         sz = sizeof(TransactionId) * builder->committed.xcnt;
    1633           60 :         memcpy(ondisk_c, builder->committed.xip, sz);
    1634           60 :         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
    1635           60 :         ondisk_c += sz;
    1636              :     }
    1637              : 
    1638              :     /* copy catalog modifying xacts */
    1639          333 :     if (catchange_xcnt > 0)
    1640              :     {
    1641            8 :         sz = sizeof(TransactionId) * catchange_xcnt;
    1642            8 :         memcpy(ondisk_c, catchange_xip, sz);
    1643            8 :         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
    1644            8 :         ondisk_c += sz;
    1645              :     }
    1646              : 
    1647          333 :     FIN_CRC32C(ondisk->checksum);
    1648              : 
    1649              :     /* we have valid data now, open tempfile and write it there */
    1650          333 :     fd = OpenTransientFile(tmppath,
    1651              :                            O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    1652          333 :     if (fd < 0)
    1653            0 :         ereport(ERROR,
    1654              :                 (errcode_for_file_access(),
    1655              :                  errmsg("could not open file \"%s\": %m", tmppath)));
    1656              : 
    1657          333 :     errno = 0;
    1658          333 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
    1659          333 :     if ((write(fd, ondisk, needed_length)) != needed_length)
    1660              :     {
    1661            0 :         int         save_errno = errno;
    1662              : 
    1663            0 :         CloseTransientFile(fd);
    1664              : 
    1665              :         /* if write didn't set errno, assume problem is no disk space */
    1666            0 :         errno = save_errno ? save_errno : ENOSPC;
    1667            0 :         ereport(ERROR,
    1668              :                 (errcode_for_file_access(),
    1669              :                  errmsg("could not write to file \"%s\": %m", tmppath)));
    1670              :     }
    1671          333 :     pgstat_report_wait_end();
    1672              : 
    1673              :     /*
    1674              :      * fsync the file before renaming so that even if we crash after this we
    1675              :      * have either a fully valid file or nothing.
    1676              :      *
    1677              :      * It's safe to just ERROR on fsync() here because we'll retry the whole
    1678              :      * operation including the writes.
    1679              :      *
    1680              :      * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
    1681              :      * some noticeable overhead since it's performed synchronously during
    1682              :      * decoding?
    1683              :      */
    1684          333 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
    1685          333 :     if (pg_fsync(fd) != 0)
    1686              :     {
    1687            0 :         int         save_errno = errno;
    1688              : 
    1689            0 :         CloseTransientFile(fd);
    1690            0 :         errno = save_errno;
    1691            0 :         ereport(ERROR,
    1692              :                 (errcode_for_file_access(),
    1693              :                  errmsg("could not fsync file \"%s\": %m", tmppath)));
    1694              :     }
    1695          333 :     pgstat_report_wait_end();
    1696              : 
    1697          333 :     if (CloseTransientFile(fd) != 0)
    1698            0 :         ereport(ERROR,
    1699              :                 (errcode_for_file_access(),
    1700              :                  errmsg("could not close file \"%s\": %m", tmppath)));
    1701              : 
    1702          333 :     fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1703              : 
    1704              :     /*
    1705              :      * We may overwrite the work from some other backend, but that's ok, our
    1706              :      * snapshot is valid as well, we'll just have done some superfluous work.
    1707              :      */
    1708          333 :     if (rename(tmppath, path) != 0)
    1709              :     {
    1710            0 :         ereport(ERROR,
    1711              :                 (errcode_for_file_access(),
    1712              :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1713              :                         tmppath, path)));
    1714              :     }
    1715              : 
    1716              :     /* make sure we persist */
    1717          333 :     fsync_fname(path, false);
    1718          333 :     fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1719              : 
    1720              :     /*
    1721              :      * Now there's no way we can lose the dumped state anymore, remember this
    1722              :      * as a serialization point.
    1723              :      */
    1724          333 :     builder->last_serialized_snapshot = lsn;
    1725              : 
    1726          333 :     MemoryContextSwitchTo(old_ctx);
    1727              : 
    1728          617 : out:
    1729          617 :     ReorderBufferSetRestartPoint(builder->reorder,
    1730              :                                  builder->last_serialized_snapshot);
    1731              :     /* be tidy */
    1732          617 :     if (ondisk)
    1733          333 :         pfree(ondisk);
    1734          617 :     if (catchange_xip)
    1735            8 :         pfree(catchange_xip);
    1736              : }
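
The write path above follows a common durable-write sequence: write to a pid-tagged temporary file, fsync it, rename it into place, then fsync the final file and its directory. The sketch below reproduces that sequence with plain POSIX calls only. The names sync_path() and write_file_atomically() are hypothetical and are not part of PostgreSQL; error reporting is reduced to a return code, where the original uses ereport(ERROR).

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: fsync a file or directory identified by path. */
static int
sync_path(const char *path, int isdir)
{
    int fd = open(path, isdir ? O_RDONLY : O_RDWR);
    int rc;

    if (fd < 0)
        return -1;
    rc = fsync(fd);
    /* Assumption: some platforms cannot fsync directories; tolerate that. */
    if (rc != 0 && isdir && (errno == EBADF || errno == EINVAL))
        rc = 0;
    if (close(fd) != 0)
        rc = -1;
    return rc;
}

/* Hypothetical helper: durably write data to dir/name. Returns 0 or -1. */
static int
write_file_atomically(const char *dir, const char *name,
                      const void *data, size_t len)
{
    char    path[1024];
    char    tmppath[1024];
    int     fd;
    ssize_t written;

    assert(dir != NULL && name != NULL && data != NULL);
    snprintf(path, sizeof(path), "%s/%s", dir, name);
    /* Including the pid keeps concurrent writers off each other's tempfile. */
    snprintf(tmppath, sizeof(tmppath), "%s/%s.%d.tmp", dir, name, (int) getpid());

    /* A leftover tempfile with our pid can only stem from an earlier failure. */
    if (unlink(tmppath) != 0 && errno != ENOENT)
        return -1;

    fd = open(tmppath, O_CREAT | O_EXCL | O_WRONLY, 0600);
    if (fd < 0)
        return -1;

    written = write(fd, data, len);
    if (written < 0 || (size_t) written != len || fsync(fd) != 0)
    {
        close(fd);
        unlink(tmppath);
        return -1;
    }
    if (close(fd) != 0)
        return -1;

    /* rename() replaces the target atomically; readers see old or new data. */
    if (rename(tmppath, path) != 0)
        return -1;

    /* Persist both the file and the directory entry that now points at it. */
    if (sync_path(path, 0) != 0 || sync_path(dir, 1) != 0)
        return -1;
    return 0;
}
```

A caller would invoke write_file_atomically(dir, "0-1A2B3C.snap", buf, len) and treat a nonzero result as "retry the whole operation", which mirrors the comment in the listing that it is safe to ERROR on fsync() because the writes are repeated.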
    1737              : 
    1738              : /*
    1739              :  * Restore the logical snapshot file contents to 'ondisk'.
    1740              :  *
    1741              :  * 'context' is the memory context where the catalog modifying/committed xids
    1742              :  * will live.
    1743              :  * If 'missing_ok' is true, will not throw an error if the file is not found.
    1744              :  */
    1745              : bool
    1746           21 : SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn,
    1747              :                          MemoryContext context, bool missing_ok)
    1748              : {
    1749              :     int         fd;
    1750              :     pg_crc32c   checksum;
    1751              :     Size        sz;
    1752              :     char        path[MAXPGPATH];
    1753              : 
    1754           21 :     sprintf(path, "%s/%X-%X.snap",
    1755              :             PG_LOGICAL_SNAPSHOTS_DIR,
    1756           21 :             LSN_FORMAT_ARGS(lsn));
    1757              : 
    1758           21 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
    1759              : 
    1760           21 :     if (fd < 0)
    1761              :     {
    1762           11 :         if (missing_ok && errno == ENOENT)
    1763           11 :             return false;
    1764              : 
    1765            0 :         ereport(ERROR,
    1766              :                 (errcode_for_file_access(),
    1767              :                  errmsg("could not open file \"%s\": %m", path)));
    1768              :     }
    1769              : 
    1770              :     /* ----
    1771              :      * Make sure the snapshot has been stored safely to disk; that's normally
    1772              :      * cheap.
    1773              :      * Note that we do not need PANIC here, nobody will be able to use the
    1774              :      * slot without fsyncing, and saving it won't succeed without an fsync()
    1775              :      * either...
    1776              :      * ----
    1777              :      */
    1778           10 :     fsync_fname(path, false);
    1779           10 :     fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1780              : 
    1781              :     /* read statically sized portion of snapshot */
    1782           10 :     SnapBuildRestoreContents(fd, ondisk, SnapBuildOnDiskConstantSize, path);
    1783              : 
    1784           10 :     if (ondisk->magic != SNAPBUILD_MAGIC)
    1785            0 :         ereport(ERROR,
    1786              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1787              :                  errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
    1788              :                         path, ondisk->magic, SNAPBUILD_MAGIC)));
    1789              : 
    1790           10 :     if (ondisk->version != SNAPBUILD_VERSION)
    1791            0 :         ereport(ERROR,
    1792              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1793              :                  errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
    1794              :                         path, ondisk->version, SNAPBUILD_VERSION)));
    1795              : 
    1796           10 :     INIT_CRC32C(checksum);
    1797           10 :     COMP_CRC32C(checksum,
    1798              :                 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
    1799              :                 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
    1800              : 
    1801              :     /* read SnapBuild */
    1802           10 :     SnapBuildRestoreContents(fd, &ondisk->builder, sizeof(SnapBuild), path);
    1803           10 :     COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild));
    1804              : 
    1805              :     /* restore committed xacts information */
    1806           10 :     if (ondisk->builder.committed.xcnt > 0)
    1807              :     {
    1808            4 :         sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt;
    1809            4 :         ondisk->builder.committed.xip = MemoryContextAllocZero(context, sz);
    1810            4 :         SnapBuildRestoreContents(fd, ondisk->builder.committed.xip, sz, path);
    1811            4 :         COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz);
    1812              :     }
    1813              : 
    1814              :     /* restore catalog modifying xacts information */
    1815           10 :     if (ondisk->builder.catchange.xcnt > 0)
    1816              :     {
    1817            5 :         sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt;
    1818            5 :         ondisk->builder.catchange.xip = MemoryContextAllocZero(context, sz);
    1819            5 :         SnapBuildRestoreContents(fd, ondisk->builder.catchange.xip, sz, path);
    1820            5 :         COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz);
    1821              :     }
    1822              : 
    1823           10 :     if (CloseTransientFile(fd) != 0)
    1824            0 :         ereport(ERROR,
    1825              :                 (errcode_for_file_access(),
    1826              :                  errmsg("could not close file \"%s\": %m", path)));
    1827              : 
    1828           10 :     FIN_CRC32C(checksum);
    1829              : 
    1830              :     /* verify checksum of what we've read */
    1831           10 :     if (!EQ_CRC32C(checksum, ondisk->checksum))
    1832            0 :         ereport(ERROR,
    1833              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1834              :                  errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
    1835              :                         path, checksum, ondisk->checksum)));
    1836              : 
    1837           10 :     return true;
    1838              : }
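
The restore path above validates a file in a fixed order: magic number, version, then a checksum accumulated over the header (minus its unchecksummed prefix) and each variable-length array. The sketch below shows that order on an in-memory buffer. Everything in it is an assumption made for illustration: DemoHeader is a hypothetical layout, not SnapBuildOnDisk, and FNV-1a stands in for CRC32C so the example needs no PostgreSQL headers.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define DEMO_MAGIC   0x51A1E001u    /* hypothetical value */
#define DEMO_VERSION 1u             /* hypothetical value */
#define FNV_BASIS    2166136261u

/* Hypothetical on-disk header; magic and checksum are not checksummed. */
typedef struct DemoHeader
{
    uint32_t magic;
    uint32_t checksum;  /* covers everything from 'version' onwards */
    uint32_t version;
    uint32_t nxids;     /* number of uint32_t xids following the header */
} DemoHeader;

#define DEMO_NOT_CHECKSUMMED offsetof(DemoHeader, version)

/* FNV-1a, used here only as a stand-in for CRC32C. */
static uint32_t
fnv1a(uint32_t h, const void *data, size_t len)
{
    const unsigned char *p = data;

    for (size_t i = 0; i < len; i++)
    {
        h ^= p[i];
        h *= 16777619u;
    }
    return h;
}

/* Serialize header plus xids into 'out'; returns the number of bytes used. */
static size_t
demo_serialize(unsigned char *out, const uint32_t *xids, uint32_t nxids)
{
    DemoHeader hdr = {DEMO_MAGIC, 0, DEMO_VERSION, nxids};
    size_t     sz = sizeof(uint32_t) * nxids;
    uint32_t   sum = FNV_BASIS;

    sum = fnv1a(sum, (const unsigned char *) &hdr + DEMO_NOT_CHECKSUMMED,
                sizeof(hdr) - DEMO_NOT_CHECKSUMMED);
    sum = fnv1a(sum, xids, sz);
    hdr.checksum = sum;
    memcpy(out, &hdr, sizeof(hdr));
    memcpy(out + sizeof(hdr), xids, sz);
    return sizeof(hdr) + sz;
}

/* Returns 0 if valid, -1 bad length, -2 bad magic, -3 bad version, -4 bad checksum. */
static int
demo_verify(const unsigned char *buf, size_t len)
{
    DemoHeader hdr;
    size_t     sz;
    uint32_t   sum = FNV_BASIS;

    if (len < sizeof(hdr))
        return -1;
    memcpy(&hdr, buf, sizeof(hdr));
    if (hdr.magic != DEMO_MAGIC)
        return -2;
    if (hdr.version != DEMO_VERSION)
        return -3;
    sz = sizeof(uint32_t) * (size_t) hdr.nxids;
    if (len - sizeof(hdr) != sz)
        return -1;
    sum = fnv1a(sum, (const unsigned char *) &hdr + DEMO_NOT_CHECKSUMMED,
                sizeof(hdr) - DEMO_NOT_CHECKSUMMED);
    sum = fnv1a(sum, buf + sizeof(hdr), sz);
    return sum == hdr.checksum ? 0 : -4;
}
```

Checking magic and version before computing the checksum lets a reader reject a foreign or outdated file with a specific message, and only then spend time on the full-content check.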
    1839              : 
    1840              : /*
    1841              :  * Restore a snapshot into 'builder' if one has previously been stored at the
    1842              :  * location indicated by 'lsn'. Returns true if successful, false otherwise.
    1843              :  */
    1844              : static bool
    1845           19 : SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
    1846              : {
    1847              :     SnapBuildOnDisk ondisk;
    1848              : 
    1849              :     /* no point in loading a snapshot if we're already there */
    1850           19 :     if (builder->state == SNAPBUILD_CONSISTENT)
    1851            0 :         return false;
    1852              : 
    1853              :     /* validate and restore the snapshot to 'ondisk' */
    1854           19 :     if (!SnapBuildRestoreSnapshot(&ondisk, lsn, builder->context, true))
    1855           11 :         return false;
    1856              : 
    1857              :     /*
    1858              :      * ok, we now have a sensible snapshot here, figure out if it has more
    1859              :      * information than we have.
    1860              :      */
    1861              : 
    1862              :     /*
    1863              :      * We are only interested in consistent snapshots for now, comparing
    1864              :      * whether one incomplete snapshot is more "advanced" seems to be
    1865              :      * unnecessarily complex.
    1866              :      */
    1867            8 :     if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
    1868            0 :         goto snapshot_not_interesting;
    1869              : 
    1870              :     /*
    1871              :      * Don't use a snapshot that requires an xmin that we cannot guarantee to
    1872              :      * be available.
    1873              :      */
    1874            8 :     if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
    1875            0 :         goto snapshot_not_interesting;
    1876              : 
    1877              :     /*
    1878              :      * Consistent snapshots have no next phase. Reset next_phase_at as it is
    1879              :      * possible that an old value may remain.
    1880              :      */
    1881              :     Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
    1882            8 :     builder->next_phase_at = InvalidTransactionId;
    1883              : 
    1884              :     /* ok, we think the snapshot is sensible, copy over everything important */
    1885            8 :     builder->xmin = ondisk.builder.xmin;
    1886            8 :     builder->xmax = ondisk.builder.xmax;
    1887            8 :     builder->state = ondisk.builder.state;
    1888              : 
    1889            8 :     builder->committed.xcnt = ondisk.builder.committed.xcnt;
    1890              :     /* We only allocated/stored xcnt, not xcnt_space xids ! */
    1891              :     /* don't overwrite preallocated xip, if we don't have anything here */
    1892            8 :     if (builder->committed.xcnt > 0)
    1893              :     {
    1894            2 :         pfree(builder->committed.xip);
    1895            2 :         builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
    1896            2 :         builder->committed.xip = ondisk.builder.committed.xip;
    1897              :     }
    1898            8 :     ondisk.builder.committed.xip = NULL;
    1899              : 
    1900              :     /* set catalog modifying transactions */
    1901            8 :     if (builder->catchange.xip)
    1902            0 :         pfree(builder->catchange.xip);
    1903            8 :     builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
    1904            8 :     builder->catchange.xip = ondisk.builder.catchange.xip;
    1905            8 :     ondisk.builder.catchange.xip = NULL;
    1906              : 
    1907              :     /* our snapshot is not interesting anymore, build a new one */
    1908            8 :     if (builder->snapshot != NULL)
    1909              :     {
    1910            0 :         SnapBuildSnapDecRefcount(builder->snapshot);
    1911              :     }
    1912            8 :     builder->snapshot = SnapBuildBuildSnapshot(builder);
    1913            8 :     SnapBuildSnapIncRefcount(builder->snapshot);
    1914              : 
    1915            8 :     ReorderBufferSetRestartPoint(builder->reorder, lsn);
    1916              : 
    1917              :     Assert(builder->state == SNAPBUILD_CONSISTENT);
    1918              : 
    1919            8 :     ereport(LogicalDecodingLogLevel(),
    1920              :             errmsg("logical decoding found consistent point at %X/%08X",
    1921              :                    LSN_FORMAT_ARGS(lsn)),
    1922              :             errdetail("Logical decoding will begin using saved snapshot."));
    1923            8 :     return true;
    1924              : 
    1925            0 : snapshot_not_interesting:
    1926            0 :     if (ondisk.builder.committed.xip != NULL)
    1927            0 :         pfree(ondisk.builder.committed.xip);
    1928            0 :     if (ondisk.builder.catchange.xip != NULL)
    1929            0 :         pfree(ondisk.builder.catchange.xip);
    1930            0 :     return false;
    1931              : }
    1932              : 
    1933              : /*
    1934              :  * Read the contents of the serialized snapshot to 'dest'.
    1935              :  */
    1936              : static void
    1937           29 : SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path)
    1938              : {
    1939              :     int         readBytes;
    1940              : 
    1941           29 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
    1942           29 :     readBytes = read(fd, dest, size);
    1943           29 :     pgstat_report_wait_end();
    1944           29 :     if (readBytes != size)
    1945              :     {
    1946            0 :         int         save_errno = errno;
    1947              : 
    1948            0 :         CloseTransientFile(fd);
    1949              : 
    1950            0 :         if (readBytes < 0)
    1951              :         {
    1952            0 :             errno = save_errno;
    1953            0 :             ereport(ERROR,
    1954              :                     (errcode_for_file_access(),
    1955              :                      errmsg("could not read file \"%s\": %m", path)));
    1956              :         }
    1957              :         else
    1958            0 :             ereport(ERROR,
    1959              :                     (errcode(ERRCODE_DATA_CORRUPTED),
    1960              :                      errmsg("could not read file \"%s\": read %d of %zu",
    1961              :                             path, readBytes, size)));
    1962              :     }
    1963           29 : }
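
SnapBuildRestoreContents() above separates two failure modes: a negative return from read() is an I/O error reported with errno, while a non-negative but short count is treated as file corruption. A minimal sketch of that distinction follows; read_exact() and the ReadResult enum are hypothetical names, and the function returns a code where the original raises an error.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/types.h>
#include <unistd.h>

typedef enum ReadResult
{
    READ_OK,        /* exactly 'size' bytes were read */
    READ_SHORT,     /* fewer bytes than requested: treat as corrupted file */
    READ_ERROR      /* read() failed; errno describes the problem */
} ReadResult;

/* Hypothetical helper: one read() call, classified like the listing does. */
static ReadResult
read_exact(int fd, void *dest, size_t size)
{
    ssize_t n;

    assert(dest != NULL);
    n = read(fd, dest, size);
    if (n < 0)
        return READ_ERROR;
    if ((size_t) n != size)
        return READ_SHORT;
    return READ_OK;
}
```

Like the original, the sketch does not loop to retry a short read; for a regular file that has been fully written and fsynced, a short count indicates a truncated file rather than a transient condition.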
    1964              : 
    1965              : /*
    1966              :  * Remove all serialized snapshots that are not required anymore because no
    1967              :  * slot can need them. This doesn't actually have to run during a checkpoint,
    1968              :  * but it's a convenient point to schedule this.
    1969              :  *
    1970              :  * NB: We run this during checkpoints even if logical decoding is disabled so
    1971              :  * we clean up old snapshots at some point after it got disabled.
    1972              :  */
    1973              : void
    1974         1952 : CheckPointSnapBuild(void)
    1975              : {
    1976              :     XLogRecPtr  cutoff;
    1977              :     XLogRecPtr  redo;
    1978              :     DIR        *snap_dir;
    1979              :     struct dirent *snap_de;
    1980              :     char        path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)];
    1981              : 
    1982              :     /*
    1983              :      * We start off with a minimum of the last redo pointer. No new
    1984              :      * replication slot will start before that, so that's a safe upper bound
    1985              :      * for removal.
    1986              :      */
    1987         1952 :     redo = GetRedoRecPtr();
    1988              : 
    1989              :     /* now check for the restart ptrs from existing slots */
    1990         1952 :     cutoff = ReplicationSlotsComputeLogicalRestartLSN();
    1991              : 
    1992              :     /* don't start earlier than the restart lsn */
    1993         1952 :     if (redo < cutoff)
    1994            1 :         cutoff = redo;
    1995              : 
    1996         1952 :     snap_dir = AllocateDir(PG_LOGICAL_SNAPSHOTS_DIR);
    1997         6160 :     while ((snap_de = ReadDir(snap_dir, PG_LOGICAL_SNAPSHOTS_DIR)) != NULL)
    1998              :     {
    1999              :         uint32      hi;
    2000              :         uint32      lo;
    2001              :         XLogRecPtr  lsn;
    2002              :         PGFileType  de_type;
    2003              : 
    2004         4208 :         if (strcmp(snap_de->d_name, ".") == 0 ||
    2005         2256 :             strcmp(snap_de->d_name, "..") == 0)
    2006         3904 :             continue;
    2007              : 
    2008          304 :         snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name);
    2009          304 :         de_type = get_dirent_type(path, snap_de, false, DEBUG1);
    2010              : 
    2011          304 :         if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
    2012              :         {
    2013            0 :             elog(DEBUG1, "only regular files expected: %s", path);
    2014            0 :             continue;
    2015              :         }
    2016              : 
    2017              :         /*
    2018              :          * temporary filenames from SnapBuildSerialize() include the LSN and
    2019              :          * everything but are postfixed by .$pid.tmp. We can just remove them
    2020              :          * the same as other files because there can be none that are
    2021              :          * currently being written that are older than cutoff.
    2022              :          *
    2023              :          * We just log a message if a file doesn't fit the pattern, it's
    2024              :          * probably some editor's lock/state file or similar...
    2025              :          */
    2026          304 :         if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
    2027              :         {
    2028            0 :             ereport(LOG,
    2029              :                     (errmsg("could not parse file name \"%s\"", path)));
    2030            0 :             continue;
    2031              :         }
    2032              : 
    2033          304 :         lsn = ((uint64) hi) << 32 | lo;
    2034              : 
    2035              :         /* check whether we still need it */
    2036          304 :         if (lsn < cutoff || !XLogRecPtrIsValid(cutoff))
    2037              :         {
    2038          201 :             elog(DEBUG1, "removing snapbuild snapshot %s", path);
    2039              : 
    2040              :             /*
    2041              :              * It's not particularly harmful, though strange, if we can't
    2042              :              * remove the file here. Don't prevent the checkpoint from
    2043              :              * completing, that'd be a cure worse than the disease.
    2044              :              */
    2045          201 :             if (unlink(path) < 0)
    2046              :             {
    2047            0 :                 ereport(LOG,
    2048              :                         (errcode_for_file_access(),
    2049              :                          errmsg("could not remove file \"%s\": %m",
    2050              :                                 path)));
    2051            0 :                 continue;
    2052              :             }
    2053              :         }
    2054              :     }
    2055         1952 :     FreeDir(snap_dir);
    2056         1952 : }
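
The cleanup loop above recovers an LSN from each file name with sscanf() and removes the file when that LSN lies before the cutoff, or when the cutoff is invalid. The sketch below isolates those two steps. parse_snapshot_name() and snapshot_is_removable() are hypothetical names, uint64_t stands in for XLogRecPtr, and 0 stands in for the invalid LSN.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: parse "<hi>-<lo>.snap" (both parts hexadecimal).
 * Returns 1 and stores the 64-bit LSN on success, 0 otherwise.
 *
 * sscanf() only counts the two %X conversions, so a temporary name such as
 * "<hi>-<lo>.snap.<pid>.tmp" parses as well; that is what lets the cleanup
 * treat temporary files the same as finished ones.
 */
static int
parse_snapshot_name(const char *name, uint64_t *lsn)
{
    unsigned int hi;
    unsigned int lo;

    assert(name != NULL && lsn != NULL);
    if (sscanf(name, "%X-%X.snap", &hi, &lo) != 2)
        return 0;
    *lsn = ((uint64_t) hi << 32) | lo;
    return 1;
}

/* Hypothetical helper: a cutoff of 0 means "no slot needs any snapshot". */
static int
snapshot_is_removable(uint64_t lsn, uint64_t cutoff)
{
    return cutoff == 0 || lsn < cutoff;
}
```

For example, "1-FF.snap" yields the LSN 0x1000000FF, and a file at exactly the cutoff is kept because the comparison is strict.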
    2057              : 
    2058              : /*
    2059              :  * Check if a logical snapshot at the specified point has been serialized.
    2060              :  */
    2061              : bool
    2062           16 : SnapBuildSnapshotExists(XLogRecPtr lsn)
    2063              : {
    2064              :     char        path[MAXPGPATH];
    2065              :     int         ret;
    2066              :     struct stat stat_buf;
    2067              : 
    2068           16 :     sprintf(path, "%s/%X-%X.snap",
    2069              :             PG_LOGICAL_SNAPSHOTS_DIR,
    2070           16 :             LSN_FORMAT_ARGS(lsn));
    2071              : 
    2072           16 :     ret = stat(path, &stat_buf);
    2073              : 
    2074           16 :     if (ret != 0 && errno != ENOENT)
    2075            0 :         ereport(ERROR,
    2076              :                 (errcode_for_file_access(),
    2077              :                  errmsg("could not stat file \"%s\": %m", path)));
    2078              : 
    2079           16 :     return ret == 0;
    2080              : }
        

Generated by: LCOV version 2.0-1