LCOV - code coverage report
Current view: top level - src/backend/replication/logical - snapbuild.c
Test: PostgreSQL 19devel
Test Date: 2026-04-22 18:16:45

             Coverage   Total    Hit
Lines:         84.9 %     568    482
Functions:    100.0 %      32     32

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * snapbuild.c
       4              :  *
       5              :  *    Infrastructure for building historic catalog snapshots based on contents
       6              :  *    of the WAL, for the purpose of decoding heapam.c style values in the
       7              :  *    WAL.
       8              :  *
       9              :  * NOTES:
      10              :  *
      11              :  * We build snapshots which can *only* be used to read catalog contents and we
      12              :  * do so by reading and interpreting the WAL stream. The aim is to build a
      13              :  * snapshot that behaves the same as a freshly taken MVCC snapshot would have
      14              :  * behaved at the time the XLogRecord was generated.
      15              :  *
      16              :  * To build the snapshots we reuse the infrastructure built for Hot
      17              :  * Standby. The in-memory snapshots we build look different from HS' because
      18              :  * we have different needs. To successfully decode data from the WAL we only
      19              :  * need to access catalog tables and (sys|rel|cat)cache, not the actual user
      20              :  * tables since the data we decode is wholly contained in the WAL
      21              :  * records. Also, our snapshots need to differ from normal MVCC ones: unlike
      22              :  * those, we cannot fully rely on the clog and pg_subtrans for information
      23              :  * about committed transactions, since those transactions might commit in
      24              :  * the future from the POV of the WAL entry we're currently
      25              :  * decoding. This definition has the advantage that we only need to prevent
      26              :  * removal of catalog rows, while normal tables' rows can still be
      27              :  * removed. This is achieved by using the replication slot mechanism.
      28              :  *
      29              :  * As the percentage of transactions modifying the catalog normally is fairly
      30              :  * small in comparison to ones only manipulating user data, we keep track of
      31              :  * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
      32              :  * track of all running transactions like it's done in a normal snapshot. Note
      33              :  * that we're generally only looking at transactions that have acquired an
      34              :  * xid. That is, we keep a list of the transactions in [snapshot->xmin,
      35              :  * snapshot->xmax) that we consider committed; everything else is
      36              :  * considered aborted or in progress. That also allows us not to care about
      37              :  * subtransactions before they have committed, which means this module, in
      38              :  * contrast to HS, doesn't have to handle suboverflowed subxacts and similar.
      39              :  *
      40              :  * One complication of doing this is that, to handle e.g. mixed DDL/DML
      41              :  * transactions, we need snapshots that see intermediate versions of the
      42              :  * catalog in a transaction. During normal operation this is achieved by using
      43              :  * CommandIds/cmin/cmax. The problem with that however is that for space
      44              :  * efficiency reasons, the cmin and cmax are not included in WAL records. We
      45              :  * cannot read the cmin/cmax from the tuple itself, either, because it is
      46              :  * reset on crash recovery. Even if we could, we could not decode combocids
      47              :  * which are only tracked in the original backend's memory. To work around
      48              :  * that, heapam writes an extra WAL record (XLOG_HEAP2_NEW_CID) every time a
      49              :  * catalog row is modified, which includes the cmin and cmax of the
      50              :  * tuple. During decoding, we insert the ctid->(cmin,cmax) mappings into the
      51              :  * reorder buffer, and use them at visibility checks instead of the cmin/cmax
      52              :  * on the tuple itself. See the comment above ResolveCminCmaxDuringDecoding()
      53              :  * in reorderbuffer.c for details.
      54              :  *
      55              :  * To facilitate all this we need our own visibility routine, as the normal
      56              :  * ones are optimized for different use cases.
      57              :  *
      58              :  * To replace the normal catalog snapshots with decoding ones use the
      59              :  * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
      60              :  *
      61              :  *
      62              :  *
      63              :  * The snapbuild machinery starts up in several stages, as illustrated
      64              :  * by the following graph describing the SnapBuild->state transitions:
      65              :  *
      66              :  *         +-------------------------+
      67              :  *    +----|         START           |-------------+
      68              :  *    |    +-------------------------+             |
      69              :  *    |                 |                          |
      70              :  *    |                 |                          |
      71              :  *    |        running_xacts #1                    |
      72              :  *    |                 |                          |
      73              :  *    |                 |                          |
      74              :  *    |                 v                          |
      75              :  *    |    +-------------------------+             v
      76              :  *    |    |   BUILDING_SNAPSHOT     |------------>|
      77              :  *    |    +-------------------------+             |
      78              :  *    |                 |                          |
      79              :  *    |                 |                          |
      80              :  *    | running_xacts #2, xacts from #1 finished   |
      81              :  *    |                 |                          |
      82              :  *    |                 |                          |
      83              :  *    |                 v                          |
      84              :  *    |    +-------------------------+             v
      85              :  *    |    |       FULL_SNAPSHOT     |------------>|
      86              :  *    |    +-------------------------+             |
      87              :  *    |                 |                          |
      88              :  * running_xacts        |                      saved snapshot
      89              :  * with zero xacts      |                 at running_xacts's lsn
      90              :  *    |                 |                          |
      91              :  *    | running_xacts with xacts from #2 finished  |
      92              :  *    |                 |                          |
      93              :  *    |                 v                          |
      94              :  *    |    +-------------------------+             |
      95              :  *    +--->|  SNAPBUILD_CONSISTENT   |<------------+
      96              :  *         +-------------------------+
      97              :  *
      98              :  * Initially the machinery is in the START stage. When an xl_running_xacts
      99              :  * record is read that is sufficiently new (above the safe xmin horizon),
     100              :  * there's a state transition. If there were no running xacts when the
     101              :  * record was generated, we go directly into CONSISTENT state; otherwise we
     102              :  * switch to BUILDING_SNAPSHOT, and from there to FULL_SNAPSHOT once all xacts
     103              :  * from that record have finished. Having a full snapshot means that all
     104              :  * transactions that start henceforth can be decoded in their entirety, but
     105              :  * transactions that started previously can't. In FULL_SNAPSHOT we switch
     106              :  * to CONSISTENT once all earlier transactions have committed or aborted.
     107              :  *
     108              :  * Only transactions that commit after CONSISTENT state has been reached will
     109              :  * be replayed, even though they might have started while still in
     110              :  * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
     111              :  * changes have been exported, but all the following ones will be. That point
     112              :  * is a convenient point to initialize replication from, which is why we
     113              :  * export a snapshot at that point, which *can* be used to read normal data.
     114              :  *
     115              :  * Copyright (c) 2012-2026, PostgreSQL Global Development Group
     116              :  *
     117              :  * IDENTIFICATION
     118              :  *    src/backend/replication/logical/snapbuild.c
     119              :  *
     120              :  *-------------------------------------------------------------------------
     121              :  */
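The NOTES above describe resolving cmin/cmax for catalog tuples through ctid->(cmin,cmax) mappings decoded from XLOG_HEAP2_NEW_CID records, instead of reading them from the tuple header. The following is a minimal sketch of that idea under stated assumptions, not the reorderbuffer.c code: the `Ctid` and `CidMapping` types, `find_mapping()` and `own_tuple_visible()` are hypothetical, and this sketch simply treats a tuple with no decoded mapping as invisible. The visibility rule is the usual intra-transaction one: inserted by an earlier command and not deleted by an earlier command.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t CommandId;
#define INVALID_CID ((CommandId) ~(CommandId) 0)   /* like InvalidCommandId */

/* Hypothetical tuple address: (block number, line pointer offset). */
typedef struct
{
    uint32_t block;
    uint16_t offset;
} Ctid;

/* One decoded mapping: which commands inserted and deleted the tuple. */
typedef struct
{
    Ctid      ctid;
    CommandId cmin;
    CommandId cmax;        /* INVALID_CID if the tuple was not deleted */
} CidMapping;

/* Linear scan keeps the sketch short; a real lookup would be hashed. */
static const CidMapping *
find_mapping(const CidMapping *map, size_t n, Ctid ctid)
{
    for (size_t i = 0; i < n; i++)
        if (map[i].ctid.block == ctid.block && map[i].ctid.offset == ctid.offset)
            return &map[i];
    return NULL;
}

/*
 * Is a tuple written by our own transaction visible to a scan running as
 * command 'curcid'? Uses the decoded mapping, never the tuple header.
 */
static bool
own_tuple_visible(const CidMapping *map, size_t n, Ctid ctid, CommandId curcid)
{
    const CidMapping *m = find_mapping(map, n, ctid);

    if (m == NULL)
        return false;      /* sketch assumption: not decoded yet => invisible */
    if (m->cmin >= curcid)
        return false;      /* inserted at or after the scanning command */
    return m->cmax == INVALID_CID || m->cmax >= curcid;
}
```

For example, a tuple inserted by command 1 and deleted by command 3 is visible to scans running as commands 2 and 3, but not to command 4.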
     122              : 
     123              : #include "postgres.h"
     124              : 
     125              : #include <sys/stat.h>
     126              : #include <unistd.h>
     127              : 
     128              : #include "access/heapam_xlog.h"
     129              : #include "access/transam.h"
     130              : #include "access/xact.h"
     131              : #include "common/file_utils.h"
     132              : #include "miscadmin.h"
     133              : #include "pgstat.h"
     134              : #include "replication/logical.h"
     135              : #include "replication/reorderbuffer.h"
     136              : #include "replication/snapbuild.h"
     137              : #include "replication/snapbuild_internal.h"
     138              : #include "storage/fd.h"
     139              : #include "storage/lmgr.h"
     140              : #include "storage/proc.h"
     141              : #include "storage/procarray.h"
     142              : #include "storage/standby.h"
     143              : #include "utils/builtins.h"
     144              : #include "utils/memutils.h"
     145              : #include "utils/snapmgr.h"
     146              : #include "utils/snapshot.h"
     147              : #include "utils/wait_event.h"
     148              : 
     149              : 
     150              : /*
     151              :  * Starting a transaction -- which we need to do while exporting a snapshot --
     152              :  * removes knowledge about the previously used resowner, so we save it here.
     153              :  */
     154              : static ResourceOwner SavedResourceOwnerDuringExport = NULL;
     155              : static bool ExportInProgress = false;
     156              : 
     157              : /*
     158              :  * If a backend is going to do logical decoding and the output plugin does
     159              :  * not need to access shared catalogs, setting this variable to false can make
     160              :  * the decoding startup faster. In particular, the backend will not need to
     161              :  * wait for completion of already running transactions in other databases.
     162              :  */
     163              : bool        accessSharedCatalogsInDecoding = true;
     164              : 
     165              : /* ->committed and ->catchange manipulation */
     166              : static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
     167              : 
     168              : /* snapshot building/manipulation/distribution functions */
     169              : static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
     170              : 
     171              : static void SnapBuildFreeSnapshot(Snapshot snap);
     172              : 
     173              : static void SnapBuildSnapIncRefcount(Snapshot snap);
     174              : 
     175              : static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
     176              : 
     177              : static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
     178              :                                                  uint32 xinfo);
     179              : 
     180              : /* xlog reading helper functions for SnapBuildProcessRunningXacts */
     181              : static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn,
     182              :                                   xl_running_xacts *running);
     183              : static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
     184              : 
     185              : /* serialization functions */
     186              : static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
     187              : static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
     188              : static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path);
     189              : 
     190              : /*
     191              :  * Allocate a new snapshot builder.
     192              :  *
     193              :  * xmin_horizon is the xid at or after which we can be sure no catalog rows
     194              :  * have been removed; start_lsn is the LSN at or after which commits are replayed.
     195              :  */
     196              : SnapBuild *
     197         1220 : AllocateSnapshotBuilder(ReorderBuffer *reorder,
     198              :                         TransactionId xmin_horizon,
     199              :                         XLogRecPtr start_lsn,
     200              :                         bool need_full_snapshot,
     201              :                         bool in_slot_creation,
     202              :                         XLogRecPtr two_phase_at)
     203              : {
     204              :     MemoryContext context;
     205              :     MemoryContext oldcontext;
     206              :     SnapBuild  *builder;
     207              : 
     208              :     /* allocate memory in own context, to have better accountability */
     209         1220 :     context = AllocSetContextCreate(CurrentMemoryContext,
     210              :                                     "snapshot builder context",
     211              :                                     ALLOCSET_DEFAULT_SIZES);
     212         1220 :     oldcontext = MemoryContextSwitchTo(context);
     213              : 
     214         1220 :     builder = palloc0_object(SnapBuild);
     215              : 
     216         1220 :     builder->state = SNAPBUILD_START;
     217         1220 :     builder->context = context;
     218         1220 :     builder->reorder = reorder;
     219              :     /* Other struct members initialized by zeroing via palloc0 above */
     220              : 
     221         1220 :     builder->committed.xcnt = 0;
     222         1220 :     builder->committed.xcnt_space = 128; /* arbitrary number */
     223         1220 :     builder->committed.xip =
     224         1220 :         palloc0_array(TransactionId, builder->committed.xcnt_space);
     225         1220 :     builder->committed.includes_all_transactions = true;
     226              : 
     227         1220 :     builder->catchange.xcnt = 0;
     228         1220 :     builder->catchange.xip = NULL;
     229              : 
     230         1220 :     builder->initial_xmin_horizon = xmin_horizon;
     231         1220 :     builder->start_decoding_at = start_lsn;
     232         1220 :     builder->in_slot_creation = in_slot_creation;
     233         1220 :     builder->building_full_snapshot = need_full_snapshot;
     234         1220 :     builder->two_phase_at = two_phase_at;
     235              : 
     236         1220 :     MemoryContextSwitchTo(oldcontext);
     237              : 
     238              :     /* The default is that shared catalogs are used. */
     239         1220 :     accessSharedCatalogsInDecoding = true;
     240              : 
     241         1220 :     return builder;
     242              : }
     243              : 
     244              : /*
     245              :  * Free a snapshot builder.
     246              :  */
     247              : void
     248          948 : FreeSnapshotBuilder(SnapBuild *builder)
     249              : {
     250          948 :     MemoryContext context = builder->context;
     251              : 
     252              :     /* free the snapshot explicitly, as that path does some error checking */
     253          948 :     if (builder->snapshot != NULL)
     254              :     {
     255          231 :         SnapBuildSnapDecRefcount(builder->snapshot);
     256          231 :         builder->snapshot = NULL;
     257              :     }
     258              : 
     259              :     /* The default is that shared catalogs are used. */
     260          948 :     accessSharedCatalogsInDecoding = true;
     261              : 
     262              :     /* other resources are deallocated via memory context deletion */
     263          948 :     MemoryContextDelete(context);
     264          948 : }
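AllocateSnapshotBuilder() places the builder and its committed.xip array in a dedicated memory context, so FreeSnapshotBuilder() can release all of it with one MemoryContextDelete(). As a rough analogy outside PostgreSQL, the sketch below swaps in a hypothetical linked-list arena (`Arena`, `arena_alloc0()`, `arena_delete()` and `MiniBuilder` are all illustrative names) for a MemoryContext. It shows only the free-everything-at-once pattern, not how PostgreSQL's allocator works internally.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Each allocation is preceded by a header linking it into the arena. */
typedef union Block
{
    union Block *next;
    max_align_t  align;        /* keeps the payload after the header aligned */
} Block;

/* Hypothetical stand-in for a MemoryContext. */
typedef struct
{
    Block  *head;
    size_t  nallocs;
} Arena;

/* Zeroed allocation owned by the arena (compare palloc0 in a context). */
static void *
arena_alloc0(Arena *a, size_t size)
{
    Block *b = calloc(1, sizeof(Block) + size);

    assert(b != NULL);
    b->next = a->head;
    a->head = b;
    a->nallocs++;
    return b + 1;              /* payload starts right after the header */
}

/* Release every allocation at once (compare MemoryContextDelete). */
static void
arena_delete(Arena *a)
{
    while (a->head != NULL)
    {
        Block *next = a->head->next;

        free(a->head);
        a->head = next;
    }
    a->nallocs = 0;
}

/* Hypothetical builder with one separately allocated array, like committed.xip. */
typedef struct
{
    int        state;
    size_t     xcnt_space;
    uint32_t  *xip;
} MiniBuilder;

static MiniBuilder *
mini_builder_alloc(Arena *a)
{
    MiniBuilder *b = arena_alloc0(a, sizeof(MiniBuilder));

    b->xcnt_space = 128;       /* same arbitrary initial size as the source */
    b->xip = arena_alloc0(a, b->xcnt_space * sizeof(uint32_t));
    return b;
}
```

The caller never frees `xip` individually; deleting the arena reclaims both allocations, which is the property the dedicated context gives the snapshot builder.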
     265              : 
     266              : /*
     267              :  * Free an unreferenced snapshot that has previously been built by us.
     268              :  */
     269              : static void
     270         1834 : SnapBuildFreeSnapshot(Snapshot snap)
     271              : {
     272              :     /* make sure we don't get passed an external snapshot */
     273              :     Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
     274              :     /* slightly more likely, so it's checked even in builds without assertions */
     275              :     /* make sure nobody modified our snapshot */
     276              :     Assert(snap->curcid == FirstCommandId);
     277              :     Assert(!snap->suboverflowed);
     278              :     Assert(!snap->takenDuringRecovery);
     279              :     Assert(snap->regd_count == 0);
     280              : 
     281              :     /* slightly more likely, so it's checked even without c-asserts */
     282         1834 :     if (snap->copied)
     283            0 :         elog(ERROR, "cannot free a copied snapshot");
     284              : 
     285         1834 :     if (snap->active_count)
     286            0 :         elog(ERROR, "cannot free an active snapshot");
     287              : 
     288         1834 :     pfree(snap);
     289         1834 : }
     290              : 
     291              : /*
     292              :  * In which state of snapshot building are we?
     293              :  */
     294              : SnapBuildState
     295      2247182 : SnapBuildCurrentState(SnapBuild *builder)
     296              : {
     297      2247182 :     return builder->state;
     298              : }
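The SnapBuild->state transitions drawn in the header diagram can be modeled as a small state machine. This is a simplified sketch, not SnapBuildFindSnapshot(): the `SbState` enum and both functions are hypothetical, the caller is assumed to already know whether the transactions being waited on have finished, and the safe-xmin-horizon check on each xl_running_xacts record is omitted.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirror of the four SnapBuild states (names shortened). */
typedef enum
{
    SB_START,
    SB_BUILDING_SNAPSHOT,
    SB_FULL_SNAPSHOT,
    SB_CONSISTENT
} SbState;

/*
 * Next state after reading an xl_running_xacts record.
 *   xcnt            - number of xacts the record lists as running
 *   waited_finished - whether every xact listed by the record that moved us
 *                     into the current state has finished by now
 */
static SbState
sb_on_running_xacts(SbState state, int xcnt, bool waited_finished)
{
    switch (state)
    {
        case SB_START:
            /* no running xacts: consistent right away */
            return xcnt == 0 ? SB_CONSISTENT : SB_BUILDING_SNAPSHOT;
        case SB_BUILDING_SNAPSHOT:
            return waited_finished ? SB_FULL_SNAPSHOT : SB_BUILDING_SNAPSHOT;
        case SB_FULL_SNAPSHOT:
            return waited_finished ? SB_CONSISTENT : SB_FULL_SNAPSHOT;
        case SB_CONSISTENT:
            break;
    }
    return SB_CONSISTENT;      /* CONSISTENT is terminal */
}

/* Restoring a snapshot saved at this record's LSN short-cuts to CONSISTENT. */
static SbState
sb_on_saved_snapshot(SbState state)
{
    (void) state;
    assert(state != SB_CONSISTENT);    /* only meaningful while still building */
    return SB_CONSISTENT;
}
```

When transactions are running, the path START -> BUILDING_SNAPSHOT -> FULL_SNAPSHOT -> CONSISTENT therefore needs at least three running_xacts records, matching #1, #2 and the final record in the diagram.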
     299              : 
     300              : /*
     301              :  * Return the LSN at which the two-phase decoding was first enabled.
     302              :  */
     303              : XLogRecPtr
     304           33 : SnapBuildGetTwoPhaseAt(SnapBuild *builder)
     305              : {
     306           33 :     return builder->two_phase_at;
     307              : }
     308              : 
     309              : /*
     310              :  * Set the LSN at which two-phase decoding is enabled.
     311              :  */
     312              : void
     313            8 : SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
     314              : {
     315            8 :     builder->two_phase_at = ptr;
     316            8 : }
     317              : 
     318              : /*
     319              :  * Should the contents of the transaction ending at 'ptr' be skipped (not decoded)?
     320              :  */
     321              : bool
     322       461869 : SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
     323              : {
     324       461869 :     return ptr < builder->start_decoding_at;
     325              : }
     326              : 
     327              : /*
     328              :  * Increase refcount of a snapshot.
     329              :  *
     330              :  * This is used when handing out a snapshot to some external resource or when
     331              :  * adding a Snapshot as builder->snapshot.
     332              :  */
     333              : static void
     334         7954 : SnapBuildSnapIncRefcount(Snapshot snap)
     335              : {
     336         7954 :     snap->active_count++;
     337         7954 : }
     338              : 
     339              : /*
     340              :  * Decrease refcount of a snapshot and free if the refcount reaches zero.
     341              :  *
     342              :  * Externally visible, so that external resources that have been handed an
     343              :  * IncRef'ed Snapshot can adjust its refcount easily.
     344              :  */
     345              : void
     346         7644 : SnapBuildSnapDecRefcount(Snapshot snap)
     347              : {
     348              :     /* make sure we don't get passed an external snapshot */
     349              :     Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
     350              : 
     351              :     /* make sure nobody modified our snapshot */
     352              :     Assert(snap->curcid == FirstCommandId);
     353              :     Assert(!snap->suboverflowed);
     354              :     Assert(!snap->takenDuringRecovery);
     355              : 
     356              :     Assert(snap->regd_count == 0);
     357              : 
     358              :     Assert(snap->active_count > 0);
     359              : 
     360              :     /* slightly more likely, so it's checked even in builds without assertions */
     361         7644 :     if (snap->copied)
     362            0 :         elog(ERROR, "cannot free a copied snapshot");
     363              : 
     364         7644 :     snap->active_count--;
     365         7644 :     if (snap->active_count == 0)
     366         1834 :         SnapBuildFreeSnapshot(snap);
     367         7644 : }
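SnapBuildSnapIncRefcount() and SnapBuildSnapDecRefcount() implement plain reference counting on active_count, freeing the snapshot once the count drops to zero. A minimal standalone sketch of that pattern, assuming a hypothetical `RcSnap` that carries only the fields involved; `freed_count` exists solely so the behavior can be observed.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical snapshot reduced to the fields the refcounting touches. */
typedef struct
{
    unsigned active_count;
    bool     copied;
} RcSnap;

static int freed_count = 0;    /* demo-only: counts frees */

static void
rc_inc(RcSnap *snap)
{
    snap->active_count++;
}

/* Drop one reference; returns true if this call freed the snapshot. */
static bool
rc_dec(RcSnap *snap)
{
    assert(snap->active_count > 0);
    assert(!snap->copied);     /* the source raises an ERROR for copied snapshots */
    if (--snap->active_count == 0)
    {
        free(snap);
        freed_count++;
        return true;
    }
    return false;
}
```

Each holder (for instance, the builder->snapshot slot or a consumer handed the snapshot) takes one reference, and whichever holder releases last frees it.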
     368              : 
     369              : /*
     370              :  * Build a new snapshot, based on currently committed catalog-modifying
     371              :  * transactions.
     372              :  *
     373              :  * In-progress transactions with catalog access are *not* allowed to modify
     374              :  * these snapshots; they have to copy them and fill in appropriate ->curcid
     375              :  * and ->subxip/subxcnt values.
     376              :  */
     377              : static Snapshot
     378         2340 : SnapBuildBuildSnapshot(SnapBuild *builder)
     379              : {
     380              :     Snapshot    snapshot;
     381              :     Size        ssize;
     382              : 
     383              :     Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
     384              : 
     385         2340 :     ssize = sizeof(SnapshotData)
     386         2340 :         + sizeof(TransactionId) * builder->committed.xcnt
     387         2340 :         + sizeof(TransactionId) * 1 /* toplevel xid */ ;
     388              : 
     389         2340 :     snapshot = MemoryContextAllocZero(builder->context, ssize);
     390              : 
     391         2340 :     snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
     392              : 
     393              :     /*
     394              :      * We misuse the original meaning of SnapshotData's xip and subxip fields
     395              :      * to make them more fitting for our needs.
     396              :      *
     397              :      * In the 'xip' array we store transactions that have to be treated as
     398              :      * committed. Since we will only ever look at tuples from transactions
     399              :      * that have modified the catalog, it's more efficient to store those few
     400              :      * that exist between xmin and xmax (frequently there are none).
     401              :      *
     402              :      * Snapshots that are used in transactions that have modified the catalog
     403              :      * also use the 'subxip' array to store their toplevel xid and all the
     404              :      * subtransaction xids so we can recognize when we need to treat rows as
     405              :      * visible that are not in xip but still need to be visible. Subxip only
     406              :      * gets filled when the transaction is copied into the context of a
     407              :      * catalog modifying transaction since we otherwise share a snapshot
     408              :      * between transactions. As long as a txn hasn't modified the catalog it
     409              :      * doesn't need to treat any uncommitted rows as visible, so there is no
     410              :      * need for those xids.
     411              :      *
     412              :      * Both arrays are qsort'ed so that we can use bsearch() on them.
     413              :      */
     414              :     Assert(TransactionIdIsNormal(builder->xmin));
     415              :     Assert(TransactionIdIsNormal(builder->xmax));
     416              : 
     417         2340 :     snapshot->xmin = builder->xmin;
     418         2340 :     snapshot->xmax = builder->xmax;
     419              : 
     420              :     /* store all transactions to be treated as committed by this snapshot */
     421         2340 :     snapshot->xip =
     422         2340 :         (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
     423         2340 :     snapshot->xcnt = builder->committed.xcnt;
     424         2340 :     memcpy(snapshot->xip,
     425         2340 :            builder->committed.xip,
     426         2340 :            builder->committed.xcnt * sizeof(TransactionId));
     427              : 
     428              :     /* sort so we can bsearch() */
     429         2340 :     qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
     430              : 
     431              :     /*
     432              :      * Initially, subxip is empty, i.e. it's a snapshot to be used by
     433              :      * transactions that don't modify the catalog. Will be filled by
     434              :      * ReorderBufferCopySnap() if necessary.
     435              :      */
     436         2340 :     snapshot->subxcnt = 0;
     437         2340 :     snapshot->subxip = NULL;
     438              : 
     439         2340 :     snapshot->suboverflowed = false;
     440         2340 :     snapshot->takenDuringRecovery = false;
     441         2340 :     snapshot->copied = false;
     442         2340 :     snapshot->curcid = FirstCommandId;
     443         2340 :     snapshot->active_count = 0;
     444         2340 :     snapshot->regd_count = 0;
     445         2340 :     snapshot->snapXactCompletionCount = 0;
     446              : 
     447         2340 :     return snapshot;
     448              : }
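SnapBuildBuildSnapshot() allocates the snapshot header and its xip array in one chunk, copies in the committed catalog-modifying xids, and sorts them so visibility checks can bsearch() them. The sketch below mirrors that layout and the "inverted" meaning of xip under simplifying assumptions: `Xid`, `HistSnap`, `hist_snap_build()` and `hist_snap_xid_committed()` are hypothetical, xid wraparound is ignored, the extra slot the source reserves for the toplevel xid is omitted, and xids below xmin are not modeled.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uint32_t Xid;   /* stand-in for TransactionId; wraparound ignored */

/* Simplified historic snapshot: xip lists the xids treated as COMMITTED. */
typedef struct
{
    Xid     xmin;
    Xid     xmax;
    size_t  xcnt;
    Xid    *xip;
} HistSnap;

/* Plain unsigned ordering, in the spirit of xidComparator. */
static int
xid_cmp(const void *a, const void *b)
{
    Xid x = *(const Xid *) a;
    Xid y = *(const Xid *) b;

    return (x > y) - (x < y);
}

/*
 * One allocation holds the struct followed by the xip array, so a single
 * free() releases both. The array is sorted so lookups can use bsearch().
 */
static HistSnap *
hist_snap_build(Xid xmin, Xid xmax, const Xid *committed, size_t n)
{
    HistSnap *s = malloc(sizeof(HistSnap) + n * sizeof(Xid));

    assert(s != NULL);
    s->xmin = xmin;
    s->xmax = xmax;
    s->xcnt = n;
    s->xip = (Xid *) ((char *) s + sizeof(HistSnap));
    if (n > 0)
        memcpy(s->xip, committed, n * sizeof(Xid));
    qsort(s->xip, n, sizeof(Xid), xid_cmp);
    return s;
}

/*
 * For an xid at or above xmin: at or past xmax it has not committed as far
 * as this snapshot is concerned; inside [xmin, xmax) it has committed only
 * if listed in xip.
 */
static bool
hist_snap_xid_committed(const HistSnap *s, Xid xid)
{
    assert(xid >= s->xmin);
    if (xid >= s->xmax)
        return false;
    return bsearch(&xid, s->xip, s->xcnt, sizeof(Xid), xid_cmp) != NULL;
}
```

Storing only the committed catalog-modifying xids keeps xip short (often empty), which is the trade-off the comment in SnapBuildBuildSnapshot() describes.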
     449              : 
     450              : /*
     451              :  * Build the initial slot snapshot and convert it to a normal snapshot that
     452              :  * is understood by HeapTupleSatisfiesMVCC.
     453              :  *
     454              :  * The snapshot will be usable directly in the current transaction or exported
     455              :  * for loading in a different transaction.
     456              :  */
     457              : Snapshot
     458          222 : SnapBuildInitialSnapshot(SnapBuild *builder)
     459              : {
     460              :     Snapshot    snap;
     461              :     TransactionId xid;
     462              :     TransactionId safeXid;
     463              :     TransactionId *newxip;
     464          222 :     int         newxcnt = 0;
     465              : 
     466              :     Assert(XactIsoLevel == XACT_REPEATABLE_READ);
     467              :     Assert(builder->building_full_snapshot);
     468              : 
     469              :     /* don't allow older snapshots */
     470          222 :     InvalidateCatalogSnapshot();    /* about to overwrite MyProc->xmin */
     471          222 :     if (HaveRegisteredOrActiveSnapshot())
     472            0 :         elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
     473              :     Assert(!HistoricSnapshotActive());
     474              : 
     475          222 :     if (builder->state != SNAPBUILD_CONSISTENT)
     476            0 :         elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
     477              : 
     478          222 :     if (!builder->committed.includes_all_transactions)
     479            0 :         elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
     480              : 
     481              :     /* so we don't overwrite the existing value */
     482          222 :     if (TransactionIdIsValid(MyProc->xmin))
     483            0 :         elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
     484              : 
     485          222 :     snap = SnapBuildBuildSnapshot(builder);
     486              : 
     487              :     /*
     488              :      * We know that snap->xmin is alive, enforced by the logical xmin
     489              :      * mechanism. Due to that we can do this without locks; we're only
     490              :      * changing our own value.
     491              :      *
     492              :      * Building an initial snapshot is expensive and an unenforced xmin
     493              :      * horizon would have bad consequences, therefore always double-check that
     494              :      * the horizon is enforced.
     495              :      */
     496          222 :     LWLockAcquire(ProcArrayLock, LW_SHARED);
     497          222 :     safeXid = GetOldestSafeDecodingTransactionId(false);
     498          222 :     LWLockRelease(ProcArrayLock);
     499              : 
     500          222 :     if (TransactionIdFollows(safeXid, snap->xmin))
     501            0 :         elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
     502              :              safeXid, snap->xmin);
     503              : 
     504          222 :     MyProc->xmin = snap->xmin;
     505              : 
     506              :     /* allocate in transaction context */
     507          222 :     newxip = palloc_array(TransactionId, GetMaxSnapshotXidCount());
     508              : 
     509              :     /*
     510              :      * snapbuild.c builds snapshots in an "inverted" manner, which means it
     511              :      * stores committed transactions in ->xip, not ones in progress. Build a
     512              :      * classical snapshot by marking all non-committed transactions as
     513              :      * in-progress. This can be expensive.
     514              :      */
     515          222 :     for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
     516              :     {
     517              :         void       *test;
     518              : 
     519              :         /*
     520              :          * Check whether transaction committed using the decoding snapshot
     521              :          * meaning of ->xip.
     522              :          */
     523            0 :         test = bsearch(&xid, snap->xip, snap->xcnt,
     524              :                        sizeof(TransactionId), xidComparator);
     525              : 
     526            0 :         if (test == NULL)
     527              :         {
     528            0 :             if (newxcnt >= GetMaxSnapshotXidCount())
     529            0 :                 ereport(ERROR,
     530              :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     531              :                          errmsg("initial slot snapshot too large")));
     532              : 
     533            0 :             newxip[newxcnt++] = xid;
     534              :         }
     535              : 
     536            0 :         TransactionIdAdvance(xid);
     537              :     }
     538              : 
     539              :     /* adjust remaining snapshot fields as needed */
     540          222 :     snap->snapshot_type = SNAPSHOT_MVCC;
     541          222 :     snap->xcnt = newxcnt;
     542          222 :     snap->xip = newxip;
     543              : 
     544          222 :     return snap;
     545              : }
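The loop above turns the builder's "inverted" snapshot, whose ->xip lists committed xids, into a classical MVCC snapshot by recording every xid in [xmin, xmax) that bsearch() does not find. Below is a minimal standalone sketch of the same inversion. It assumes the committed array is sorted in plain numeric order, as the bsearch() call requires, and it ignores xid wraparound. invert_xip() and xid_cmp() are hypothetical names, not PostgreSQL functions, and the -1 return stands in for the "initial slot snapshot too large" error.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t TransactionId;

/* Plain numeric ordering, assumed to match how the committed array is sorted. */
static int
xid_cmp(const void *a, const void *b)
{
    TransactionId xa = *(const TransactionId *) a;
    TransactionId xb = *(const TransactionId *) b;

    return (xa > xb) - (xa < xb);
}

/*
 * Hypothetical helper: write every xid in [xmin, xmax) that is NOT in the
 * sorted committed[] array to out[].  Returns the number written, or -1 if
 * out[] is too small.  Wraparound is ignored for brevity.
 */
static int
invert_xip(TransactionId xmin, TransactionId xmax,
           const TransactionId *committed, size_t ncommitted,
           TransactionId *out, size_t out_space)
{
    size_t      n = 0;

    assert(xmin <= xmax);
    for (TransactionId xid = xmin; xid < xmax; xid++)
    {
        /* not among the committed xids: mark it as in progress */
        if (bsearch(&xid, committed, ncommitted, sizeof(TransactionId),
                    xid_cmp) == NULL)
        {
            if (n >= out_space)
                return -1;
            out[n++] = xid;
        }
    }
    return (int) n;
}
```

Each xid in the range costs one binary search, which is why the listing warns that this step "can be expensive" for a wide [xmin, xmax) window.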
     546              : 
     547              : /*
     548              :  * Export a snapshot so it can be set in another session with SET TRANSACTION
     549              :  * SNAPSHOT.
     550              :  *
     551              :  * For that we need to start a transaction in the current backend as the
     552              :  * importing side checks whether the source transaction is still open to make
     553              :  * sure the xmin horizon hasn't advanced since then.
     554              :  */
     555              : const char *
     556            1 : SnapBuildExportSnapshot(SnapBuild *builder)
     557              : {
     558              :     Snapshot    snap;
     559              :     char       *snapname;
     560              : 
     561            1 :     if (IsTransactionOrTransactionBlock())
     562            0 :         elog(ERROR, "cannot export a snapshot from within a transaction");
     563              : 
     564            1 :     if (SavedResourceOwnerDuringExport)
     565            0 :         elog(ERROR, "can only export one snapshot at a time");
     566              : 
     567            1 :     SavedResourceOwnerDuringExport = CurrentResourceOwner;
     568            1 :     ExportInProgress = true;
     569              : 
     570            1 :     StartTransactionCommand();
     571              : 
     572              :     /* There doesn't seem to be a nice API to set these */
     573            1 :     XactIsoLevel = XACT_REPEATABLE_READ;
     574            1 :     XactReadOnly = true;
     575              : 
     576            1 :     snap = SnapBuildInitialSnapshot(builder);
     577              : 
     578              :     /*
     579              :      * now that we've built a plain snapshot, make it active and use the
     580              :      * normal mechanisms for exporting it
     581              :      */
     582            1 :     snapname = ExportSnapshot(snap);
     583              : 
     584            1 :     ereport(LOG,
     585              :             (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
     586              :                            "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
     587              :                            snap->xcnt,
     588              :                            snapname, snap->xcnt)));
     589            1 :     return snapname;
     590              : }
     591              : 
     592              : /*
     593              :  * Ensure there is a snapshot and, if not, build one for the current transaction.
     594              :  */
     595              : Snapshot
     596            8 : SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
     597              : {
     598              :     Assert(builder->state == SNAPBUILD_CONSISTENT);
     599              : 
     600              :     /* only build a new snapshot if we don't have a prebuilt one */
     601            8 :     if (builder->snapshot == NULL)
     602              :     {
     603            1 :         builder->snapshot = SnapBuildBuildSnapshot(builder);
     604              :         /* increase refcount for the snapshot builder */
     605            1 :         SnapBuildSnapIncRefcount(builder->snapshot);
     606              :     }
     607              : 
     608            8 :     return builder->snapshot;
     609              : }
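SnapBuildGetOrBuildSnapshot() builds a snapshot only when the builder has none cached, and the builder itself holds one reference to the snapshot it builds. The sketch below models that lazy-build-plus-refcount pattern with hypothetical DemoSnapshot and DemoBuilder types. These are not PostgreSQL's SnapshotData or SnapBuild, and the builds counter exists only to show that repeated calls reuse the cached snapshot.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-ins; PostgreSQL's real types carry far more state. */
typedef struct DemoSnapshot
{
    int         refcount;
} DemoSnapshot;

typedef struct DemoBuilder
{
    DemoSnapshot *snapshot;     /* cached snapshot, or NULL */
    int         builds;         /* how many snapshots were built */
} DemoBuilder;

static DemoSnapshot *
demo_build_snapshot(DemoBuilder *builder)
{
    DemoSnapshot *snap = calloc(1, sizeof(DemoSnapshot));

    if (snap == NULL)
        abort();                /* keep the sketch simple on OOM */
    builder->builds++;
    return snap;
}

/* Return the cached snapshot, building it and taking the builder's reference on first use. */
static DemoSnapshot *
demo_get_or_build_snapshot(DemoBuilder *builder)
{
    assert(builder != NULL);
    if (builder->snapshot == NULL)
    {
        builder->snapshot = demo_build_snapshot(builder);
        builder->snapshot->refcount++;  /* reference owned by the builder */
    }
    return builder->snapshot;
}
```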
     610              : 
     611              : /*
     612              :  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
     613              :  * any. Aborts the previously started transaction and resets the resource
     614              :  * owner back to its original value.
     615              :  */
     616              : void
     617         5847 : SnapBuildClearExportedSnapshot(void)
     618              : {
     619              :     ResourceOwner tmpResOwner;
     620              : 
     621              :     /* nothing exported, that is the usual case */
     622         5847 :     if (!ExportInProgress)
     623         5846 :         return;
     624              : 
     625            1 :     if (!IsTransactionState())
     626            0 :         elog(ERROR, "clearing exported snapshot in wrong transaction state");
     627              : 
     628              :     /*
     629              :      * AbortCurrentTransaction() takes care of resetting the snapshot state,
     630              :      * so remember SavedResourceOwnerDuringExport.
     631              :      */
     632            1 :     tmpResOwner = SavedResourceOwnerDuringExport;
     633              : 
     634              :     /* make sure nothing could have ever happened */
     635            1 :     AbortCurrentTransaction();
     636              : 
     637            1 :     CurrentResourceOwner = tmpResOwner;
     638              : }
     639              : 
     640              : /*
     641              :  * Clear snapshot export state during transaction abort.
     642              :  */
     643              : void
     644        35446 : SnapBuildResetExportedSnapshotState(void)
     645              : {
     646        35446 :     SavedResourceOwnerDuringExport = NULL;
     647        35446 :     ExportInProgress = false;
     648        35446 : }
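The three export functions above form a small state machine: exporting saves the caller's resource owner and sets a flag, clearing aborts the export transaction and restores the owner, and the abort path resets both statics. Because the abort resets the saved owner, SnapBuildClearExportedSnapshot() must copy it to a local first. The sketch below models only that ordering. All names are hypothetical, the int variables merely stand in for resource owners, and the assumption that an abort leaves no current owner is a simplification for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical resource owners; only their addresses matter here. */
static int  top_level_owner;
static int  export_txn_owner;
static int *current_owner = &top_level_owner;

/* Stand-ins for SavedResourceOwnerDuringExport and ExportInProgress. */
static int *saved_owner = NULL;
static bool export_in_progress = false;

/* Mirrors SnapBuildResetExportedSnapshotState(). */
static void
demo_reset_export_state(void)
{
    saved_owner = NULL;
    export_in_progress = false;
}

/* Assumption: the abort drops the owner and resets the export state. */
static void
demo_abort_transaction(void)
{
    current_owner = NULL;
    demo_reset_export_state();
}

static bool
demo_begin_export(void)
{
    if (saved_owner != NULL)
        return false;           /* only one export at a time */
    saved_owner = current_owner;
    export_in_progress = true;
    current_owner = &export_txn_owner;  /* as if a transaction started */
    return true;
}

static void
demo_clear_export(void)
{
    int        *tmp;

    if (!export_in_progress)
        return;                 /* the usual case: nothing exported */
    tmp = saved_owner;          /* read before the abort resets it */
    assert(tmp != NULL);
    demo_abort_transaction();
    current_owner = tmp;
}
```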
     649              : 
     650              : /*
     651              :  * Handle the effects of a single heap change, appropriate to the current state
     652              :  * of the snapshot builder, and return whether changes made at (xid, lsn) can
     653              :  * be decoded.
     654              :  */
     655              : bool
     656      1568395 : SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
     657              : {
     658              :     /*
     659              :      * We can't handle data in transactions if we haven't built a snapshot
     660              :      * yet, so don't store them.
     661              :      */
     662      1568395 :     if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
     663            0 :         return false;
     664              : 
     665              :     /*
     666              :      * No point in keeping track of changes in transactions that we don't have
     667              :      * enough information about to decode. This means that they started before
     668              :      * we got into the SNAPBUILD_FULL_SNAPSHOT state.
     669              :      */
     670      1568398 :     if (builder->state < SNAPBUILD_CONSISTENT &&
     671            3 :         TransactionIdPrecedes(xid, builder->next_phase_at))
     672            0 :         return false;
     673              : 
     674              :     /*
     675              :      * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
     676              :      * be needed to decode the change we're currently processing.
     677              :      * If the reorderbuffer doesn't yet have a snapshot, add one now; it will
     678      1568395 :     if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
     679              :     {
     680              :         /* only build a new snapshot if we don't have a prebuilt one */
     681         4177 :         if (builder->snapshot == NULL)
     682              :         {
     683          462 :             builder->snapshot = SnapBuildBuildSnapshot(builder);
     684              :             /* increase refcount for the snapshot builder */
     685          462 :             SnapBuildSnapIncRefcount(builder->snapshot);
     686              :         }
     687              : 
     688              :         /*
     689              :          * Increase refcount for the transaction we're handing the snapshot
     690              :          * out to.
     691              :          */
     692         4177 :         SnapBuildSnapIncRefcount(builder->snapshot);
     693         4177 :         ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
     694              :                                      builder->snapshot);
     695              :     }
     696              : 
     697      1568395 :     return true;
     698              : }
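The two early returns in SnapBuildProcessChange() decide whether a change can be decoded at all. The check compares xids modulo 2^32, as PostgreSQL's TransactionIdPrecedes() does. The sketch below isolates that decision. It assumes the enum values follow the ordering of SnapBuildState, which the `<` comparisons in the listing rely on, and it omits the reorder-buffer side effects. All demo_ names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define DEMO_FIRST_NORMAL_XID ((TransactionId) 3)

/* Assumed to follow the ordering of SnapBuildState. */
typedef enum DemoSnapBuildState
{
    DEMO_START = -1,
    DEMO_BUILDING_SNAPSHOT = 0,
    DEMO_FULL_SNAPSHOT = 1,
    DEMO_CONSISTENT = 2
} DemoSnapBuildState;

/* Modulo-2^32 comparison in the spirit of TransactionIdPrecedes(). */
static bool
demo_xid_precedes(TransactionId a, TransactionId b)
{
    if (a < DEMO_FIRST_NORMAL_XID || b < DEMO_FIRST_NORMAL_XID)
        return a < b;           /* special xids compare numerically */
    return (int32_t) (a - b) < 0;
}

static bool
demo_change_is_decodable(DemoSnapBuildState state, TransactionId xid,
                         TransactionId next_phase_at)
{
    if (state < DEMO_FULL_SNAPSHOT)
        return false;           /* no snapshot built yet */
    if (state < DEMO_CONSISTENT && demo_xid_precedes(xid, next_phase_at))
        return false;           /* started before FULL_SNAPSHOT was reached */
    return true;
}
```

The signed-difference comparison is what lets xid 10 count as "after" 0xFFFFFFF0 once the counter has wrapped.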
     699              : 
     700              : /*
     701              :  * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
     702              :  * This implies that a transaction has done some form of write to system
     703              :  * catalogs.
     704              :  */
     705              : void
     706        28203 : SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
     707              :                        XLogRecPtr lsn, xl_heap_new_cid *xlrec)
     708              : {
     709              :     CommandId   cid;
     710              : 
     711              :     /*
     712              :      * we only log new_cid records if a catalog tuple was modified, so mark the
     713              :      * transaction as containing catalog modifications
     714              :      */
     715        28203 :     ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
     716              : 
     717        28203 :     ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
     718              :                                  xlrec->target_locator, xlrec->target_tid,
     719              :                                  xlrec->cmin, xlrec->cmax,
     720              :                                  xlrec->combocid);
     721              : 
     722              :     /* figure out new command id */
     723        28203 :     if (xlrec->cmin != InvalidCommandId &&
     724        23886 :         xlrec->cmax != InvalidCommandId)
     725         3323 :         cid = Max(xlrec->cmin, xlrec->cmax);
     726        24880 :     else if (xlrec->cmax != InvalidCommandId)
     727         4317 :         cid = xlrec->cmax;
     728        20563 :     else if (xlrec->cmin != InvalidCommandId)
     729        20563 :         cid = xlrec->cmin;
     730              :     else
     731              :     {
     732            0 :         cid = InvalidCommandId; /* silence compiler */
     733            0 :         elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
     734              :     }
     735              : 
     736        28203 :     ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
     737        28203 : }
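SnapBuildProcessNewCid() takes the largest valid value of cmin and cmax and tells the reorder buffer that the transaction continues from the next command id. The sketch below reproduces that selection as a pure function. DEMO_INVALID_CID has the same value as PostgreSQL's InvalidCommandId, `~(CommandId) 0`. The function name is hypothetical, and it returns the invalid id where the real code raises an ERROR.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t CommandId;

/* Same value as InvalidCommandId. */
#define DEMO_INVALID_CID (~(CommandId) 0)

/* Largest valid of cmin/cmax, plus one. */
static CommandId
demo_next_command_id(CommandId cmin, CommandId cmax)
{
    CommandId   cid;

    if (cmin != DEMO_INVALID_CID && cmax != DEMO_INVALID_CID)
        cid = cmin > cmax ? cmin : cmax;
    else if (cmax != DEMO_INVALID_CID)
        cid = cmax;
    else if (cmin != DEMO_INVALID_CID)
        cid = cmin;
    else
        return DEMO_INVALID_CID;    /* the real code raises an ERROR */

    return cid + 1;
}
```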
     738              : 
     739              : /*
     740              :  * Add a new Snapshot and invalidation messages to all transactions we're
     741              :  * decoding that currently are in-progress so they can see new catalog contents
     742              :  * made by the transaction that just committed. This is necessary because those
     743              :  * in-progress transactions will use the new catalog's contents from here on
     744              :  * (at the very least everything they do needs to be compatible with newer
     745              :  * catalog contents).
     746              :  */
     747              : static void
     748         1647 : SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
     749              : {
     750              :     dlist_iter  txn_i;
     751              :     ReorderBufferTXN *txn;
     752              : 
     753              :     /*
     754              :      * Iterate through all toplevel transactions. This can include
     755              :      * subtransactions which we just don't yet know to be that, but that's
     756              :      * fine, they will just get an unnecessary snapshot and invalidations
     757              :      * queued.
     758              :      */
     759         3328 :     dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
     760              :     {
     761         1681 :         txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
     762              : 
     763              :         Assert(TransactionIdIsValid(txn->xid));
     764              : 
     765              :         /*
     766              :          * If we don't have a base snapshot yet, there are no changes in this
     767              :          * transaction which in turn implies we don't yet need a snapshot at
     768              :          * all. We'll add a snapshot when the first change gets queued.
     769              :          *
     770              :          * Similarly, we don't need to add invalidations to a transaction
     771              :          * whose base snapshot is not yet set. Once a base snapshot is built,
     772              :          * it will include the xids of committed transactions that have
     773              :          * modified the catalog, thus reflecting the new catalog contents. The
     774              :          * existing catalog cache will have already been invalidated after
     775              :          * processing the invalidations in the transaction that modified
     776              :          * catalogs, ensuring that a fresh cache is constructed during
     777              :          * decoding.
     778              :          *
     779              :          * NB: This works correctly even for subtransactions because
     780              :          * ReorderBufferAssignChild() takes care to transfer the base snapshot
     781              :          * to the top-level transaction, and while iterating the changequeue
     782              :          * we'll get the change from the subtxn.
     783              :          */
     784         1681 :         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
     785            2 :             continue;
     786              : 
     787              :         /*
     788              :          * We don't need to add snapshot or invalidations to prepared
     789              :          * transactions as they should not see the new catalog contents.
     790              :          */
     791         1679 :         if (rbtxn_is_prepared(txn))
     792           28 :             continue;
     793              : 
     794         1651 :         elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
     795              :              txn->xid, LSN_FORMAT_ARGS(lsn));
     796              : 
     797              :         /*
     798              :          * increase the snapshot's refcount for the transaction we are handing
     799              :          * it out to
     800              :          */
     801         1651 :         SnapBuildSnapIncRefcount(builder->snapshot);
     802         1651 :         ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
     803              :                                  builder->snapshot);
     804              : 
     805              :         /*
     806              :          * Add invalidation messages to the reorder buffer of in-progress
     807              :          * transactions except the current committed transaction, for which we
     808              :          * will execute invalidations at the end.
     809              :          *
     810              :          * It is required, otherwise, we will end up using the stale catcache
     811              :          * contents built by the current transaction even after its decoding,
     812              :          * which should have been invalidated due to concurrent catalog
     813              :          * changing transaction.
     814              :          *
     815              :          * Distribute only the invalidation messages generated by the current
     816              :          * committed transaction. Invalidation messages received from other
     817              :          * transactions would have already been propagated to the relevant
     818              :          * in-progress transactions. This transaction would have processed
     819              :          * those invalidations, ensuring that subsequent transactions observe
     820              :          * a consistent cache state.
     821              :          */
     822         1651 :         if (txn->xid != xid)
     823              :         {
     824              :             uint32      ninvalidations;
     825           32 :             SharedInvalidationMessage *msgs = NULL;
     826              : 
     827           32 :             ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
     828              :                                                            xid, &msgs);
     829              : 
     830           32 :             if (ninvalidations > 0)
     831              :             {
     832              :                 Assert(msgs != NULL);
     833              : 
     834           28 :                 ReorderBufferAddDistributedInvalidations(builder->reorder,
     835              :                                                          txn->xid, lsn,
     836              :                                                          ninvalidations, msgs);
     837              :             }
     838              :         }
     839              :     }
     840         1647 : }
     841              : 
     842              : /*
     843              :  * Keep track of a new catalog changing transaction that has committed.
     844              :  */
     845              : static void
     846         1656 : SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
     847              : {
     848              :     Assert(TransactionIdIsValid(xid));
     849              : 
     850         1656 :     if (builder->committed.xcnt == builder->committed.xcnt_space)
     851              :     {
     852            0 :         builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
     853              : 
     854            0 :         elog(DEBUG1, "increasing space for committed transactions to %u",
     855              :              (uint32) builder->committed.xcnt_space);
     856              : 
     857            0 :         builder->committed.xip = repalloc_array(builder->committed.xip,
     858              :                                                 TransactionId,
     859              :                                                 builder->committed.xcnt_space);
     860              :     }
     861              : 
     862              :     /*
     863              :      * TODO: It might make sense to keep the array sorted here instead of
     864              :      * doing it every time we build a new snapshot. On the other hand this
     865              :      * gets called repeatedly when a transaction with subtransactions commits.
     866              :      */
     867         1656 :     builder->committed.xip[builder->committed.xcnt++] = xid;
     868         1656 : }
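SnapBuildAddCommittedTxn() appends to an unsorted array and grows it to `xcnt_space * 2 + 1` when it is full. The `+ 1` keeps the formula growing even from an empty array. The sketch below shows the same amortized append using plain realloc() in place of repalloc_array(). DemoXidArray and demo_add_committed() are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t TransactionId;

/* Hypothetical mirror of the builder's ->committed array fields. */
typedef struct DemoXidArray
{
    TransactionId *xip;
    size_t      xcnt;           /* used slots */
    size_t      xcnt_space;     /* allocated slots */
} DemoXidArray;

/* Append xid, growing to space * 2 + 1 when full; returns 0, or -1 on OOM. */
static int
demo_add_committed(DemoXidArray *arr, TransactionId xid)
{
    if (arr->xcnt == arr->xcnt_space)
    {
        size_t      new_space = arr->xcnt_space * 2 + 1;
        TransactionId *grown = realloc(arr->xip,
                                       new_space * sizeof(TransactionId));

        if (grown == NULL)
            return -1;          /* the old array is still valid */
        arr->xip = grown;
        arr->xcnt_space = new_space;
    }
    arr->xip[arr->xcnt++] = xid;
    return 0;
}
```

Starting from zero slots, four appends grow the capacity 0 → 1 → 3 → 7.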
     869              : 
     870              : /*
     871              :  * Remove knowledge about transactions we treat as committed or containing catalog
     872              :  * changes that are smaller than ->xmin. Those won't ever get checked via
     873              :  * the ->committed or ->catchange array, respectively. The committed xids will
     874              :  * get checked via the clog machinery.
     875              :  *
     876              :  * We can ideally remove the transaction from catchange array once it is
     877              :  * finished (committed/aborted) but that could be costly as we need to maintain
     878              :  * the xids order in the array.
     879              :  */
     880              : static void
     881          549 : SnapBuildPurgeOlderTxn(SnapBuild *builder)
     882              : {
     883              :     int         off;
     884              :     TransactionId *workspace;
     885          549 :     int         surviving_xids = 0;
     886              : 
     887              :     /* not ready yet */
     888          549 :     if (!TransactionIdIsNormal(builder->xmin))
     889            0 :         return;
     890              : 
     891              :     /* TODO: Neater algorithm than just copying and iterating? */
     892              :     workspace =
     893          549 :         MemoryContextAlloc(builder->context,
     894          549 :                            builder->committed.xcnt * sizeof(TransactionId));
     895              : 
     896              :     /* copy xids that still are interesting to workspace */
     897          971 :     for (off = 0; off < builder->committed.xcnt; off++)
     898              :     {
     899          422 :         if (NormalTransactionIdPrecedes(builder->committed.xip[off],
     900              :                                         builder->xmin))
     901              :             ;                   /* remove */
     902              :         else
     903            1 :             workspace[surviving_xids++] = builder->committed.xip[off];
     904              :     }
     905              : 
     906              :     /* copy workspace back to persistent state */
     907          549 :     memcpy(builder->committed.xip, workspace,
     908              :            surviving_xids * sizeof(TransactionId));
     909              : 
     910          549 :     elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
     911              :          (uint32) builder->committed.xcnt, (uint32) surviving_xids,
     912              :          builder->xmin, builder->xmax);
     913          549 :     builder->committed.xcnt = surviving_xids;
     914              : 
     915          549 :     pfree(workspace);
     916              : 
     917              :     /*
     918              :      * Purge xids in ->catchange as well. The purged array must also be sorted
     919              :      * in xidComparator order.
     920              :      */
     921          549 :     if (builder->catchange.xcnt > 0)
     922              :     {
     923              :         /*
     924              :          * Since catchange.xip is sorted, we find the lower bound of xids that
     925              :          * are still interesting.
     926              :          */
     927            9 :         for (off = 0; off < builder->catchange.xcnt; off++)
     928              :         {
     929            6 :             if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
     930              :                                              builder->xmin))
     931            1 :                 break;
     932              :         }
     933              : 
     934            4 :         surviving_xids = builder->catchange.xcnt - off;
     935              : 
     936            4 :         if (surviving_xids > 0)
     937              :         {
     938            1 :             memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
     939              :                     surviving_xids * sizeof(TransactionId));
     940              :         }
     941              :         else
     942              :         {
     943            3 :             pfree(builder->catchange.xip);
     944            3 :             builder->catchange.xip = NULL;
     945              :         }
     946              : 
     947            4 :         elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
     948              :              (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
     949              :              builder->xmin, builder->xmax);
     950            4 :         builder->catchange.xcnt = surviving_xids;
     951              :     }
     952              : }
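SnapBuildPurgeOlderTxn() treats its two arrays differently. The ->committed array is not kept sorted, so every entry is tested against xmin. The ->catchange array is sorted, so the function only needs the first xid that is not older than xmin and then shifts the tail down with memmove(). The sketch below shows both cases, assuming all xids are normal. Unlike the listing, it compacts the unsorted array in place rather than copying through a temporary workspace. That is an alternative in the spirit of the TODO, not what PostgreSQL does. The function names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef uint32_t TransactionId;

/* Normal-xid comparison, like NormalTransactionIdPrecedes(). */
static int
demo_xid_precedes(TransactionId a, TransactionId b)
{
    return (int32_t) (a - b) < 0;
}

/* Unsorted array: keep xids not older than xmin; returns the new count. */
static size_t
demo_purge_unsorted(TransactionId *xip, size_t xcnt, TransactionId xmin)
{
    size_t      kept = 0;

    for (size_t i = 0; i < xcnt; i++)
        if (!demo_xid_precedes(xip[i], xmin))
            xip[kept++] = xip[i];
    return kept;
}

/* Sorted array: find the first xid >= xmin and shift the tail down. */
static size_t
demo_purge_sorted(TransactionId *xip, size_t xcnt, TransactionId xmin)
{
    size_t      off = 0;

    while (off < xcnt && demo_xid_precedes(xip[off], xmin))
        off++;
    if (off > 0 && off < xcnt)
        memmove(xip, xip + off, (xcnt - off) * sizeof(TransactionId));
    assert(off <= xcnt);
    return xcnt - off;
}
```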
     953              : 
     954              : /*
     955              :  * Handle everything that needs to be done when a transaction commits
     956              :  */
     957              : void
     958         3965 : SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
     959              :                    int nsubxacts, TransactionId *subxacts, uint32 xinfo)
     960              : {
     961              :     int         nxact;
     962              : 
     963         3965 :     bool        needs_snapshot = false;
     964         3965 :     bool        needs_timetravel = false;
     965         3965 :     bool        sub_needs_timetravel = false;
     966              : 
     967         3965 :     TransactionId xmax = xid;
     968              : 
     969              :     /*
     970              :      * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
     971              :      * will they be part of a snapshot.  So we don't need to record anything.
     972              :      */
     973         3965 :     if (builder->state == SNAPBUILD_START ||
     974         3965 :         (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
     975            0 :          TransactionIdPrecedes(xid, builder->next_phase_at)))
     976              :     {
     977              :         /* ensure that only commits after this are getting replayed */
     978            0 :         if (builder->start_decoding_at <= lsn)
     979            0 :             builder->start_decoding_at = lsn + 1;
     980            0 :         return;
     981              :     }
     982              : 
     983         3965 :     if (builder->state < SNAPBUILD_CONSISTENT)
     984              :     {
     985              :         /* ensure that only commits after this are getting replayed */
     986            5 :         if (builder->start_decoding_at <= lsn)
     987            2 :             builder->start_decoding_at = lsn + 1;
     988              : 
     989              :         /*
     990              :          * If building an exportable snapshot, force xid to be tracked, even
     991              :          * if the transaction didn't modify the catalog.
     992              :          */
     993            5 :         if (builder->building_full_snapshot)
     994              :         {
     995            0 :             needs_timetravel = true;
     996              :         }
     997              :     }
     998              : 
     999         5201 :     for (nxact = 0; nxact < nsubxacts; nxact++)
    1000              :     {
    1001         1236 :         TransactionId subxid = subxacts[nxact];
    1002              : 
    1003              :         /*
    1004              :          * Add subtransaction to base snapshot if catalog modifying; we don't
    1005              :          * distinguish it from toplevel transactions there.
    1006              :          */
    1007         1236 :         if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
    1008              :         {
    1009            9 :             sub_needs_timetravel = true;
    1010            9 :             needs_snapshot = true;
    1011              : 
    1012            9 :             elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
    1013              :                  xid, subxid);
    1014              : 
    1015            9 :             SnapBuildAddCommittedTxn(builder, subxid);
    1016              : 
    1017            9 :             if (NormalTransactionIdFollows(subxid, xmax))
    1018            9 :                 xmax = subxid;
    1019              :         }
    1020              : 
    1021              :         /*
    1022              :          * If we're forcing timetravel we also need visibility information
    1023              :          * about the subtransaction, so keep track of its state, even
    1024              :          * if not catalog modifying.  Don't need to distribute a snapshot in
    1025              :          * that case.
    1026              :          */
    1027         1227 :         else if (needs_timetravel)
    1028              :         {
    1029            0 :             SnapBuildAddCommittedTxn(builder, subxid);
    1030            0 :             if (NormalTransactionIdFollows(subxid, xmax))
    1031            0 :                 xmax = subxid;
    1032              :         }
    1033              :     }
    1034              : 
    1035              :     /* if top-level modified catalog, it'll need a snapshot */
    1036         3965 :     if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
    1037              :     {
    1038         1646 :         elog(DEBUG2, "found top level transaction %u, with catalog changes",
    1039              :              xid);
    1040         1646 :         needs_snapshot = true;
    1041         1646 :         needs_timetravel = true;
    1042         1646 :         SnapBuildAddCommittedTxn(builder, xid);
    1043              :     }
    1044         2319 :     else if (sub_needs_timetravel)
    1045              :     {
    1046              :         /* track toplevel txn as well, subxact alone isn't meaningful */
    1047            1 :         elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
    1048              :              xid);
    1049            1 :         needs_timetravel = true;
    1050            1 :         SnapBuildAddCommittedTxn(builder, xid);
    1051              :     }
    1052         2318 :     else if (needs_timetravel)
    1053              :     {
    1054            0 :         elog(DEBUG2, "forced transaction %u to do timetravel", xid);
    1055              : 
    1056            0 :         SnapBuildAddCommittedTxn(builder, xid);
    1057              :     }
    1058              : 
    1059         3965 :     if (!needs_timetravel)
    1060              :     {
    1061              :         /* record that we cannot export a general snapshot anymore */
    1062         2318 :         builder->committed.includes_all_transactions = false;
    1063              :     }
    1064              : 
    1065              :     Assert(!needs_snapshot || needs_timetravel);
    1066              : 
    1067              :     /*
    1068              :      * Adjust xmax of the snapshot builder. We only do that for committed,
    1069              :      * catalog-modifying transactions; everything else isn't interesting for
    1070              :      * us since we'll never look at the respective rows.
    1071              :      */
    1072         3965 :     if (needs_timetravel &&
    1073         3294 :         (!TransactionIdIsValid(builder->xmax) ||
    1074         1647 :          TransactionIdFollowsOrEquals(xmax, builder->xmax)))
    1075              :     {
    1076         1647 :         builder->xmax = xmax;
    1077         1647 :         TransactionIdAdvance(builder->xmax);
    1078              :     }
    1079              : 
    1080              :     /* if there's any reason to build a historic snapshot, do so now */
    1081         3965 :     if (needs_snapshot)
    1082              :     {
    1083              :         /*
    1084              :          * If we haven't built a complete snapshot yet there's no need to hand
    1085              :          * it out, it wouldn't (and couldn't) be used anyway.
    1086              :          */
    1087         1647 :         if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
    1088            0 :             return;
    1089              : 
    1090              :         /*
    1091              :          * Decrease the snapshot builder's refcount of the old snapshot, note
    1092              :          * that it still will be used if it has been handed out to the
    1093              :          * reorderbuffer earlier.
    1094              :          */
    1095         1647 :         if (builder->snapshot)
    1096         1647 :             SnapBuildSnapDecRefcount(builder->snapshot);
    1097              : 
    1098         1647 :         builder->snapshot = SnapBuildBuildSnapshot(builder);
    1099              : 
    1100              :         /* we might need to execute invalidations, add snapshot */
    1101         1647 :         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
    1102              :         {
    1103            8 :             SnapBuildSnapIncRefcount(builder->snapshot);
    1104            8 :             ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
    1105              :                                          builder->snapshot);
    1106              :         }
    1107              : 
    1108              :         /* refcount of the snapshot builder for the new snapshot */
    1109         1647 :         SnapBuildSnapIncRefcount(builder->snapshot);
    1110              : 
    1111              :         /*
    1112              :          * Add a new catalog snapshot and invalidation messages to all
    1113              :          * currently running transactions.
    1114              :          */
    1115         1647 :         SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
    1116              :     }
    1117              : }
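The xmax bookkeeping above (only timetravel transactions move xmax, and xmax is exclusive, so it is advanced past the largest committed xid) can be sketched in isolation. This is a minimal sketch that assumes plain 32-bit XIDs with no epoch and only normal XIDs in the comparison. The names `Xid`, `xid_follows_or_equals`, `xid_advance` and `bump_builder_xmax` are hypothetical stand-ins for TransactionId, TransactionIdFollowsOrEquals, TransactionIdAdvance and the inline logic of this function. They are not PostgreSQL APIs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t Xid;                 /* hypothetical stand-in for TransactionId */
#define INVALID_XID      ((Xid) 0)
#define FIRST_NORMAL_XID ((Xid) 3)

/* Wraparound-aware "a >= b"; only meaningful when both are normal XIDs. */
static bool
xid_follows_or_equals(Xid a, Xid b)
{
    return (int32_t) (a - b) >= 0;
}

/* Increment an XID, skipping the reserved values 0..2 on wraparound. */
static Xid
xid_advance(Xid x)
{
    x++;
    if (x < FIRST_NORMAL_XID)
        x = FIRST_NORMAL_XID;
    return x;
}

/*
 * Hypothetical helper: the builder's xmax after a commit whose largest
 * (sub)transaction id is commit_xmax.  Only transactions that need
 * timetravel move xmax; because xmax is exclusive, we advance past it.
 */
static Xid
bump_builder_xmax(Xid builder_xmax, Xid commit_xmax, bool needs_timetravel)
{
    assert(commit_xmax >= FIRST_NORMAL_XID);

    if (needs_timetravel &&
        (builder_xmax == INVALID_XID ||
         xid_follows_or_equals(commit_xmax, builder_xmax)))
        return xid_advance(commit_xmax);
    return builder_xmax;
}
```

For example, with builder xmax 200 a catalog-modifying commit of xid 100 leaves xmax at 200, while a commit of xid 200 moves it to 201.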
    1118              : 
    1119              : /*
    1120              :  * Check the reorder buffer and the snapshot to see if the given transaction has
    1121              :  * modified catalogs.
    1122              :  */
    1123              : static inline bool
    1124         5201 : SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
    1125              :                               uint32 xinfo)
    1126              : {
    1127         5201 :     if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
    1128         1651 :         return true;
    1129              : 
    1130              :     /*
    1131              :      * The transactions that have changed catalogs must have invalidation
    1132              :      * info.
    1133              :      */
    1134         3550 :     if (!(xinfo & XACT_XINFO_HAS_INVALS))
    1135         3542 :         return false;
    1136              : 
    1137              :     /* Check the catchange XID array */
    1138           12 :     return ((builder->catchange.xcnt > 0) &&
    1139            4 :             (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
    1140              :                      sizeof(TransactionId), xidComparator) != NULL));
    1141              : }
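The three-step check above (reorder buffer first, then the cheap xinfo flag test, and only then a binary search of the sorted catchange array) can be modeled with the standard library's `bsearch`. This is a sketch under stated assumptions: `XINFO_HAS_INVALS` is a hypothetical flag value, not the real XACT_XINFO_HAS_INVALS bit; the boolean `tracked` stands in for ReorderBufferXidHasCatalogChanges(); and `xid_cmp` imitates xidComparator by ordering XIDs as plain unsigned integers without wraparound handling.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t Xid;
#define XINFO_HAS_INVALS (1u << 0)   /* hypothetical flag value */

/* Plain unsigned ordering, as xidComparator uses: no wraparound logic. */
static int
xid_cmp(const void *a, const void *b)
{
    Xid x = *(const Xid *) a;
    Xid y = *(const Xid *) b;

    return (x > y) - (x < y);
}

/*
 * Hypothetical mirror of SnapBuildXidHasCatalogChanges().  'catchange'
 * must be sorted ascending for bsearch() to work.
 */
static bool
xid_has_catalog_changes(bool tracked, uint32_t xinfo,
                        const Xid *catchange, size_t n, Xid xid)
{
    assert(n == 0 || catchange != NULL);

    if (tracked)
        return true;
    /* a catalog-modifying commit always carries invalidations */
    if (!(xinfo & XINFO_HAS_INVALS))
        return false;
    return n > 0 &&
        bsearch(&xid, catchange, n, sizeof(Xid), xid_cmp) != NULL;
}
```

The flag test comes first because it is free and rules out most commits (in the coverage counts above, 3542 of 3550 calls that reach it return there) before any search is attempted.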
    1142              : 
    1143              : /* -----------------------------------
    1144              :  * Snapshot building functions dealing with xlog records
    1145              :  * -----------------------------------
    1146              :  */
    1147              : 
    1148              : /*
    1149              :  * Process a running xacts record, and use its information to first build a
    1150              :  * historic snapshot and later to release resources that aren't needed
    1151              :  * anymore.
    1152              :  */
    1153              : void
    1154         1714 : SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running,
    1155              :                              bool db_specific)
    1156              : {
    1157              :     ReorderBufferTXN *txn;
    1158              :     TransactionId xmin;
    1159              : 
    1160              :     /*
    1161              :      * If we're not consistent yet, inspect the record to see whether it
    1162              :      * allows to get closer to being consistent. If we are consistent, dump
    1163              :      * our snapshot so others or we, after a restart, can use it.
    1164              :      */
    1165         1714 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1166              :     {
    1167              :         /*
    1168              :          * To reduce the potential for unnecessarily waiting for completion of
    1169              :          * unrelated transactions, the caller can declare that only
    1170              :          * transactions of the current database are relevant at this stage.
    1171              :          */
    1172         1187 :         if (db_specific)
    1173              :         {
    1174              :             /*
    1175              :              * If we must only keep track of transactions running in the
    1176              :              * current database, we need transaction info from exactly that
    1177              :              * database.
    1178              :              */
    1179           12 :             if (running->dbid != MyDatabaseId)
    1180              :             {
    1181            6 :                 LogStandbySnapshot(MyDatabaseId);
    1182              : 
    1183            6 :                 return;
    1184              :             }
    1185              : 
    1186              :             /*
    1187              :              * We'd better be able to check during scan if the plugin does not
    1188              :              * lie.
    1189              :              */
    1190            6 :             if (accessSharedCatalogsInDecoding)
    1191            6 :                 accessSharedCatalogsInDecoding = false;
    1192              :         }
    1193              : 
    1194              :         /* returns false if there's no point in performing cleanup just yet */
    1195         1181 :         if (!SnapBuildFindSnapshot(builder, lsn, running))
    1196         1157 :             return;
    1197              :     }
    1198              :     else
    1199          527 :         SnapBuildSerialize(builder, lsn);
    1200              : 
    1201              :     /*
    1202              :      * Database specific transaction info may exist to reach CONSISTENT state
    1203              :      * faster, however the code below makes no use of it. Moreover, such
    1204              :      * record might cause problems because the following normal (cluster-wide)
    1205              :      * record can have lower value of oldestRunningXid. In that case, let's
    1206              :      * wait with the cleanup for the next regular cluster-wide record.
    1207              :      */
    1208          549 :     if (OidIsValid(running->dbid))
    1209            0 :         return;
    1210              : 
    1211              :     /*
    1212              :      * Update range of interesting xids based on the running xacts
    1213              :      * information. We don't increase ->xmax using it, because once we are in
    1214              :      * a consistent state we can do that ourselves and much more efficiently
    1215              :      * so, because we only need to do it for catalog transactions since we
    1216              :      * only ever look at those.
    1217              :      *
    1218              :      * NB: We only increase xmax when a catalog modifying transaction commits
    1219              :      * (see SnapBuildCommitTxn).  Because of this, xmax can be lower than
    1220              :      * xmin, which looks odd but is correct and actually more efficient, since
    1221              :      * we hit fast paths in heapam_visibility.c.
    1222              :      */
    1223          549 :     builder->xmin = running->oldestRunningXid;
    1224              : 
    1225              :     /* Remove transactions we don't need to keep track of anymore */
    1226          549 :     SnapBuildPurgeOlderTxn(builder);
    1227              : 
    1228              :     /*
    1229              :      * Advance the xmin limit for the current replication slot, to allow
    1230              :      * vacuum to clean up the tuples this slot has been protecting.
    1231              :      *
    1232              :      * The reorderbuffer might have an xmin among the currently running
    1233              :      * snapshots; use it if so.  If not, we need only consider the snapshots
    1234              :      * we'll produce later, which can't be less than the oldest running xid in
    1235              :      * the record we're reading now.
    1236              :      */
    1237          549 :     xmin = ReorderBufferGetOldestXmin(builder->reorder);
    1238          549 :     if (xmin == InvalidTransactionId)
    1239          499 :         xmin = running->oldestRunningXid;
    1240          549 :     elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
    1241              :          builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
    1242          549 :     LogicalIncreaseXminForSlot(lsn, xmin);
    1243              : 
    1244              :     /*
    1245              :      * Also tell the slot where we can restart decoding from. We don't want to
    1246              :      * do that after every commit because changing that implies an fsync of
    1247              :      * the logical slot's state file, so we only do it every time we see a
    1248              :      * running xacts record.
    1249              :      *
    1250              :      * Do so by looking for the oldest in progress transaction (determined by
    1251              :      * the first LSN of any of its relevant records). Every transaction
    1252              :      * remembers the last location we stored the snapshot to disk before its
    1253              :      * beginning. That point is where we can restart from.
    1254              :      */
    1255              : 
    1256              :     /*
    1257              :      * Can't know about a serialized snapshot's location if we're not
    1258              :      * consistent.
    1259              :      */
    1260          549 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1261           17 :         return;
    1262              : 
    1263          532 :     txn = ReorderBufferGetOldestTXN(builder->reorder);
    1264              : 
    1265              :     /*
    1266              :      * oldest ongoing txn might have started when we didn't yet serialize
    1267              :      * anything because we hadn't reached a consistent state yet.
    1268              :      */
    1269          532 :     if (txn != NULL && XLogRecPtrIsValid(txn->restart_decoding_lsn))
    1270           27 :         LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
    1271              : 
    1272              :     /*
    1273              :      * No in-progress transaction, can reuse the last serialized snapshot if
    1274              :      * we have one.
    1275              :      */
    1276          505 :     else if (txn == NULL &&
    1277          473 :              XLogRecPtrIsValid(builder->reorder->current_restart_decoding_lsn) &&
    1278          471 :              XLogRecPtrIsValid(builder->last_serialized_snapshot))
    1279          471 :         LogicalIncreaseRestartDecodingForSlot(lsn,
    1280              :                                               builder->last_serialized_snapshot);
    1281              : }
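The two slot-advancement decisions at the end of this function can be reduced to pure functions. The sketch below assumes that the inputs summarize what the real code reads from the reorder buffer and the builder; `slot_xmin_candidate` and `slot_restart_candidate` are hypothetical names, and the early return for database-specific records is not modeled. A return value of INVALID_LSN means "do not call LogicalIncreaseRestartDecodingForSlot()".

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t Xid;
typedef uint64_t Lsn;
#define INVALID_XID ((Xid) 0)
#define INVALID_LSN ((Lsn) 0)

/* xmin for the slot: the reorder buffer's oldest xmin, else oldestRunningXid. */
static Xid
slot_xmin_candidate(Xid reorder_oldest_xmin, Xid oldest_running_xid)
{
    return reorder_oldest_xmin != INVALID_XID ? reorder_oldest_xmin
                                              : oldest_running_xid;
}

/*
 * Restart LSN to report, or INVALID_LSN for none.  has_oldest_txn and
 * txn_restart_lsn summarize the result of ReorderBufferGetOldestTXN().
 */
static Lsn
slot_restart_candidate(bool consistent, bool has_oldest_txn, Lsn txn_restart_lsn,
                       Lsn current_restart_decoding_lsn, Lsn last_serialized)
{
    if (!consistent)
        return INVALID_LSN;     /* no serialized snapshot location known yet */
    if (has_oldest_txn && txn_restart_lsn != INVALID_LSN)
        return txn_restart_lsn; /* restart before the oldest in-progress txn */
    if (!has_oldest_txn &&
        current_restart_decoding_lsn != INVALID_LSN &&
        last_serialized != INVALID_LSN)
        return last_serialized; /* nothing in progress: reuse last snapshot */
    return INVALID_LSN;
}
```

Note that an in-progress transaction with an invalid restart LSN blocks the fallback to the last serialized snapshot, exactly as the `else if (txn == NULL && ...)` branch does.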
    1282              : 
    1283              : 
    1284              : /*
    1285              :  * Build the start of a snapshot that's capable of decoding the catalog.
    1286              :  *
    1287              :  * Helper function for SnapBuildProcessRunningXacts() while we're not yet
    1288              :  * consistent.
    1289              :  *
    1290              :  * Returns true if there is a point in performing internal maintenance/cleanup
    1291              :  * using the xl_running_xacts record.
    1292              :  */
    1293              : static bool
    1294         1181 : SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
    1295              : {
    1296              :     /* ---
    1297              :      * Build catalog decoding snapshot incrementally using information about
    1298              :      * the currently running transactions. There are several ways to do that:
    1299              :      *
    1300              :      * a) There were no running transactions when the xl_running_xacts record
    1301              :      *    was inserted, jump to CONSISTENT immediately. We might find such a
    1302              :      *    state while waiting on c)'s sub-states.
    1303              :      *
    1304              :      * b) This (in a previous run) or another decoding slot serialized a
    1305              :      *    snapshot to disk that we can use. Can't use this method while finding
    1306              :      *    the start point for decoding changes as the restart LSN would be an
    1307              :      *    arbitrary LSN but we need to find the start point to extract changes
    1308              :      *    where we won't see the data for partial transactions. Also, we cannot
    1309              :      *    use this method when a slot needs a full snapshot for export or direct
    1310              :      *    use, as that snapshot will only contain catalog modifying transactions.
    1311              :      *
    1312              :      * c) First incrementally build a snapshot for catalog tuples
    1313              :      *    (BUILDING_SNAPSHOT), that requires all, already in-progress,
    1314              :      *    transactions to finish.  Every transaction starting after that
    1315              :      *    (FULL_SNAPSHOT state) has enough information to be decoded.  But
    1316              :      *    for older running transactions no viable snapshot exists yet, so
    1317              :      *    CONSISTENT will only be reached once all of those have finished.
    1318              :      * ---
    1319              :      */
    1320              : 
    1321              :     /*
    1322              :      * xl_running_xacts record is older than what we can use, we might not
    1323              :      * have all necessary catalog rows anymore.
    1324              :      */
    1325         1181 :     if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
    1326          506 :         NormalTransactionIdPrecedes(running->oldestRunningXid,
    1327              :                                     builder->initial_xmin_horizon))
    1328              :     {
    1329            0 :         ereport(DEBUG1,
    1330              :                 errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low",
    1331              :                                 LSN_FORMAT_ARGS(lsn)),
    1332              :                 errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
    1333              :                                    builder->initial_xmin_horizon, running->oldestRunningXid));
    1334              : 
    1335              : 
    1336            0 :         SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
    1337              : 
    1338            0 :         return true;
    1339              :     }
    1340              : 
    1341              :     /*
    1342              :      * a) No transactions were running, we can jump to consistent.
    1343              :      *
    1344              :      * This is not affected by races around xl_running_xacts, because we can
    1345              :      * miss transaction commits, but currently not transactions starting.
    1346              :      *
    1347              :      * NB: We might have already started to incrementally assemble a snapshot,
    1348              :      * so we need to be careful to deal with that.
    1349              :      */
    1350         1181 :     if (running->oldestRunningXid == running->nextXid)
    1351              :     {
    1352         1149 :         if (!XLogRecPtrIsValid(builder->start_decoding_at) ||
    1353          649 :             builder->start_decoding_at <= lsn)
    1354              :             /* can decode everything after this */
    1355          501 :             builder->start_decoding_at = lsn + 1;
    1356              : 
    1357              :         /* As no transactions were running xmin/xmax can be trivially set. */
    1358         1149 :         builder->xmin = running->nextXid; /* < are finished */
    1359         1149 :         builder->xmax = running->nextXid; /* >= are running */
    1360              : 
    1361              :         /* so we can safely use the faster comparisons */
    1362              :         Assert(TransactionIdIsNormal(builder->xmin));
    1363              :         Assert(TransactionIdIsNormal(builder->xmax));
    1364              : 
    1365         1149 :         builder->state = SNAPBUILD_CONSISTENT;
    1366         1149 :         builder->next_phase_at = InvalidTransactionId;
    1367              : 
    1368         1149 :         ereport(LogicalDecodingLogLevel(),
    1369              :                 errmsg("logical decoding found consistent point at %X/%08X",
    1370              :                        LSN_FORMAT_ARGS(lsn)),
    1371              :                 errdetail("There are no running transactions."));
    1372              : 
    1373         1149 :         return false;
    1374              :     }
    1375              : 
    1376              :     /*
    1377              :      * b) valid on disk state and while neither building full snapshot nor
    1378              :      * creating a slot.
    1379              :      */
    1380           32 :     else if (!builder->building_full_snapshot &&
    1381           50 :              !builder->in_slot_creation &&
    1382           19 :              SnapBuildRestore(builder, lsn))
    1383              :     {
    1384              :         /* there won't be any state to cleanup */
    1385            8 :         return false;
    1386              :     }
    1387              : 
    1388              :     /*
    1389              :      * c) transition from START to BUILDING_SNAPSHOT.
    1390              :      *
    1391              :      * In START state, and a xl_running_xacts record with running xacts is
    1392              :      * encountered.  In that case, switch to BUILDING_SNAPSHOT state, and
    1393              :      * record xl_running_xacts->nextXid.  Once all running xacts have finished
    1394              :      * (i.e. they're all >= nextXid), we have a complete catalog snapshot.  It
    1395              :      * might look like we could use xl_running_xacts's ->xids information to
    1396              :      * get there quicker, but that is problematic because transactions marked
    1397              :      * as running, might already have inserted their commit record - it's
    1398              :      * infeasible to change that with locking.
    1399              :      */
    1400           24 :     else if (builder->state == SNAPBUILD_START)
    1401              :     {
    1402           13 :         builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
    1403           13 :         builder->next_phase_at = running->nextXid;
    1404              : 
    1405              :         /*
    1406              :          * Start with an xmin/xmax that's correct for future, when all the
    1407              :          * currently running transactions have finished. We'll update both
    1408              :          * while waiting for the pending transactions to finish.
    1409              :          */
    1410           13 :         builder->xmin = running->nextXid; /* < are finished */
    1411           13 :         builder->xmax = running->nextXid; /* >= are running */
    1412              : 
    1413              :         /* so we can safely use the faster comparisons */
    1414              :         Assert(TransactionIdIsNormal(builder->xmin));
    1415              :         Assert(TransactionIdIsNormal(builder->xmax));
    1416              : 
    1417           13 :         ereport(LOG,
    1418              :                 errmsg("logical decoding found initial starting point at %X/%08X",
    1419              :                        LSN_FORMAT_ARGS(lsn)),
    1420              :                 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
    1421              :                           running->xcnt, running->nextXid));
    1422              : 
    1423           13 :         SnapBuildWaitSnapshot(running, running->nextXid);
    1424              :     }
    1425              : 
    1426              :     /*
    1427              :      * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
    1428              :      *
    1429              :      * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
    1430              :      * is >= nextXid from when we switched to BUILDING_SNAPSHOT.  This
    1431              :      * means all transactions starting afterwards have enough information to
    1432              :      * be decoded.  Switch to FULL_SNAPSHOT.
    1433              :      */
    1434           17 :     else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
    1435            6 :              TransactionIdPrecedesOrEquals(builder->next_phase_at,
    1436              :                                            running->oldestRunningXid))
    1437              :     {
    1438            5 :         builder->state = SNAPBUILD_FULL_SNAPSHOT;
    1439            5 :         builder->next_phase_at = running->nextXid;
    1440              : 
    1441            5 :         ereport(LOG,
    1442              :                 errmsg("logical decoding found initial consistent point at %X/%08X",
    1443              :                        LSN_FORMAT_ARGS(lsn)),
    1444              :                 errdetail("Waiting for transactions (approximately %d) older than %u to end.",
    1445              :                           running->xcnt, running->nextXid));
    1446              : 
    1447            5 :         SnapBuildWaitSnapshot(running, running->nextXid);
    1448              :     }
    1449              : 
    1450              :     /*
    1451              :      * c) transition from FULL_SNAPSHOT to CONSISTENT.
    1452              :      *
    1453              :      * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
    1454              :      * >= nextXid from when we switched to FULL_SNAPSHOT.  This means all
    1455              :      * transactions that are currently in progress have a catalog snapshot,
    1456              :      * and all their changes have been collected.  Switch to CONSISTENT.
    1457              :      */
    1458           11 :     else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
    1459            5 :              TransactionIdPrecedesOrEquals(builder->next_phase_at,
    1460              :                                            running->oldestRunningXid))
    1461              :     {
    1462            5 :         builder->state = SNAPBUILD_CONSISTENT;
    1463            5 :         builder->next_phase_at = InvalidTransactionId;
    1464              : 
    1465            5 :         ereport(LogicalDecodingLogLevel(),
    1466              :                 errmsg("logical decoding found consistent point at %X/%08X",
    1467              :                        LSN_FORMAT_ARGS(lsn)),
    1468              :                 errdetail("There are no old transactions anymore."));
    1469              :     }
    1470              : 
    1471              :     /*
    1472              :      * We already started to track running xacts and need to wait for all
    1473              :      * in-progress ones to finish. We fall through to the normal processing of
    1474              :      * records so incremental cleanup can be performed.
    1475              :      */
    1476           22 :     return true;
    1477              : }
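The START, BUILDING_SNAPSHOT, FULL_SNAPSHOT, CONSISTENT progression driven by successive xl_running_xacts records can be replayed with a small state machine. This is a simplified sketch: it omits method b) (restoring a serialized snapshot) and the initial_xmin_horizon check, and it keeps only the state and next_phase_at fields. `SbState`, `SbBuilder`, `sb_find_snapshot` and `sb_replay` are hypothetical names; each record is given as an (oldestRunningXid, nextXid) pair.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Xid;
#define INVALID_XID ((Xid) 0)

/* Hypothetical mirror of the SNAPBUILD_* states, in the same order. */
typedef enum
{
    SB_START,
    SB_BUILDING_SNAPSHOT,
    SB_FULL_SNAPSHOT,
    SB_CONSISTENT
} SbState;

typedef struct
{
    SbState state;
    Xid     next_phase_at;
} SbBuilder;

/* Wraparound-aware "a <= b" for normal XIDs. */
static bool
xid_precedes_or_equals(Xid a, Xid b)
{
    return (int32_t) (a - b) <= 0;
}

/* Apply one running-xacts record (methods a and c only). */
static void
sb_find_snapshot(SbBuilder *b, Xid oldest_running, Xid next_xid)
{
    assert(b->state < SB_CONSISTENT);   /* only called before consistency */

    if (oldest_running == next_xid)
    {
        /* a) nothing running: jump straight to CONSISTENT */
        b->state = SB_CONSISTENT;
        b->next_phase_at = INVALID_XID;
    }
    else if (b->state == SB_START)
    {
        b->state = SB_BUILDING_SNAPSHOT;
        b->next_phase_at = next_xid;
    }
    else if (b->state == SB_BUILDING_SNAPSHOT &&
             xid_precedes_or_equals(b->next_phase_at, oldest_running))
    {
        b->state = SB_FULL_SNAPSHOT;
        b->next_phase_at = next_xid;
    }
    else if (b->state == SB_FULL_SNAPSHOT &&
             xid_precedes_or_equals(b->next_phase_at, oldest_running))
    {
        b->state = SB_CONSISTENT;
        b->next_phase_at = INVALID_XID;
    }
}

/* Replay n records from START, stopping once CONSISTENT is reached. */
static SbState
sb_replay(const Xid *oldest, const Xid *next, size_t n)
{
    SbBuilder b = {SB_START, INVALID_XID};

    for (size_t i = 0; i < n && b.state < SB_CONSISTENT; i++)
        sb_find_snapshot(&b, oldest[i], next[i]);
    return b.state;
}
```

For instance, the records (100, 105), (105, 112), (112, 120) walk the builder through BUILDING_SNAPSHOT and FULL_SNAPSHOT to CONSISTENT, whereas a single record with no running transactions, such as (50, 50), reaches CONSISTENT immediately.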
    1478              : 
    1479              : /* ---
    1480              :  * Iterate through xids in record, wait for all older than the cutoff to
    1481              :  * finish.  Then, if possible, log a new xl_running_xacts record.
    1482              :  *
    1483              :  * This isn't required for the correctness of decoding, but to:
    1484              :  * a) allow isolationtester to notice that we're currently waiting for
    1485              :  *    something.
    1486              :  * b) log a new xl_running_xacts record where it'd be helpful, without having
    1487              :  *    to wait for bgwriter or checkpointer.
    1488              :  * ---
    1489              :  */
    1490              : static void
    1491           18 : SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
    1492              : {
    1493              :     int         off;
    1494              : 
    1495           34 :     for (off = 0; off < running->xcnt; off++)
    1496              :     {
    1497           18 :         TransactionId xid = running->xids[off];
    1498              : 
    1499              :         /*
    1500              :          * Upper layers should ensure that we never need to wait on ourselves.
    1501              :          * Check anyway, since failing to do so would either result in an
    1502              :          * endless wait or an Assert() failure.
    1503              :          */
    1504           18 :         if (TransactionIdIsCurrentTransactionId(xid))
    1505            0 :             elog(ERROR, "waiting for ourselves");
    1506              : 
    1507           18 :         if (TransactionIdFollows(xid, cutoff))
    1508            0 :             continue;
    1509              : 
    1510           18 :         XactLockTableWait(xid, NULL, NULL, XLTW_None);
    1511              :     }
    1512              : 
    1513              :     /*
    1514              :      * All transactions we needed to finish finished - try to ensure there is
    1515              :      * another xl_running_xacts record in a timely manner, without having to
    1516              :      * wait for bgwriter or checkpointer to log one.  During recovery we can't
    1517              :      * enforce that, so we'll have to wait.
    1518              :      */
    1519           16 :     if (!RecoveryInProgress())
    1520              :     {
    1521              :         /*
    1522              :          * If the last transaction info was about a specific database, so must
    1523              :          * the next one be, at least until we're in the CONSISTENT state.
    1524              :          */
    1525           16 :         LogStandbySnapshot(running->dbid);
    1526              :     }
    1527           16 : }
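The loop's filter waits on every listed xid that does not logically follow the cutoff, so an xid equal to the cutoff is still waited on. A hedged sketch of just that selection, with the blocking XactLockTableWait() call replaced by copying the chosen xids into an output array, follows. `xids_to_wait_for` is a hypothetical name, and the comparison assumes normal 32-bit XIDs.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Xid;

/*
 * Hypothetical mirror of SnapBuildWaitSnapshot()'s loop: copy into 'out'
 * (when non-NULL) every xid that does not follow 'cutoff', i.e. the ones
 * the real code would block on, and return how many there are.
 */
static size_t
xids_to_wait_for(const Xid *xids, size_t n, Xid cutoff, Xid *out)
{
    size_t count = 0;

    assert(n == 0 || xids != NULL);

    for (size_t i = 0; i < n; i++)
    {
        /* wraparound-aware TransactionIdFollows(xid, cutoff) */
        if ((int32_t) (xids[i] - cutoff) > 0)
            continue;
        if (out != NULL)
            out[count] = xids[i];
        count++;
    }
    return count;
}
```

With running xids {100, 105, 110} and cutoff 105, the sketch selects 100 and 105 and skips 110.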
    1528              : 
    1529              : #define SnapBuildOnDiskConstantSize \
    1530              :     offsetof(SnapBuildOnDisk, builder)
    1531              : #define SnapBuildOnDiskNotChecksummedSize \
    1532              :     offsetof(SnapBuildOnDisk, version)
    1533              : 
    1534              : #define SNAPBUILD_MAGIC 0x51A1E001
    1535              : #define SNAPBUILD_VERSION 6
    1536              : 
    1537              : /*
    1538              :  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
    1539              :  *
    1540              :  * Supposed to be used by external (i.e. not snapbuild.c) code that just read
    1541              :  * a record that's a potential location for a serialized snapshot.
    1542              :  */
    1543              : void
    1544           96 : SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
    1545              : {
    1546           96 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1547            0 :         SnapBuildRestore(builder, lsn);
    1548              :     else
    1549           96 :         SnapBuildSerialize(builder, lsn);
    1550           96 : }
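SnapBuildSerialize(), which follows, names each snapshot file after its LSN, printed as two hexadecimal 32-bit halves in the same way LSN_FORMAT_ARGS splits it. The sketch below reproduces only that naming. It assumes that the snapshot directory is "pg_logical/snapshots" (the value of PG_LOGICAL_SNAPSHOTS_DIR is not shown in this excerpt). `snapshot_path` is a hypothetical helper that returns a static buffer, so it is not reentrant.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/* Assumption: stands in for PG_LOGICAL_SNAPSHOTS_DIR. */
#define SNAPSHOTS_DIR "pg_logical/snapshots"

/*
 * Hypothetical helper: format "<dir>/<hi>-<lo>.snap" for a 64-bit LSN,
 * splitting it into high and low 32-bit halves.  Returns a static buffer.
 */
static const char *
snapshot_path(uint64_t lsn)
{
    static char buf[64];

    snprintf(buf, sizeof(buf), "%s/%X-%X.snap", SNAPSHOTS_DIR,
             (unsigned int) (lsn >> 32), (unsigned int) lsn);
    return buf;
}
```

Because the file name is derived solely from the LSN, two decoding processes serializing at the same point produce the same path. This is why the stat() check in SnapBuildSerialize() can detect that another backend has already written the snapshot.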
    1551              : 
    1552              : /*
    1553              :  * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
    1554              :  * been done by another decoding process.
    1555              :  */
    1556              : static void
    1557          623 : SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
    1558              : {
    1559              :     Size        needed_length;
    1560          623 :     SnapBuildOnDisk *ondisk = NULL;
    1561          623 :     TransactionId *catchange_xip = NULL;
    1562              :     MemoryContext old_ctx;
    1563              :     size_t      catchange_xcnt;
    1564              :     char       *ondisk_c;
    1565              :     int         fd;
    1566              :     char        tmppath[MAXPGPATH];
    1567              :     char        path[MAXPGPATH];
    1568              :     int         ret;
    1569              :     struct stat stat_buf;
    1570              :     Size        sz;
    1571              : 
    1572              :     Assert(XLogRecPtrIsValid(lsn));
    1573              :     Assert(!XLogRecPtrIsValid(builder->last_serialized_snapshot) ||
    1574              :            builder->last_serialized_snapshot <= lsn);
    1575              : 
    1576              :     /*
    1577              :      * no point in serializing if we cannot continue to work immediately after
    1578              :      * restoring the snapshot
    1579              :      */
    1580          623 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1581            0 :         return;
    1582              : 
    1583              :     /* consistent snapshots have no next phase */
    1584              :     Assert(builder->next_phase_at == InvalidTransactionId);
    1585              : 
    1586              :     /*
    1587              :      * We identify snapshots by the LSN they are valid for. We don't need to
    1588              :      * include timelines in the name as each LSN maps to exactly one timeline
    1589              :      * unless the user used pg_resetwal or similar. If a user did so, there's
    1590              :      * no hope of continuing to decode anyway.
    1591              :      */
    1592          623 :     sprintf(path, "%s/%X-%X.snap",
    1593              :             PG_LOGICAL_SNAPSHOTS_DIR,
    1594          623 :             LSN_FORMAT_ARGS(lsn));
    1595              : 
    1596              :     /*
    1597              :      * first check whether some other backend has already written the snapshot
    1598              :      * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
    1599              :      * as a valid state. Everything else is an unexpected error.
    1600              :      */
    1601          623 :     ret = stat(path, &stat_buf);
    1602              : 
    1603          623 :     if (ret != 0 && errno != ENOENT)
    1604            0 :         ereport(ERROR,
    1605              :                 (errcode_for_file_access(),
    1606              :                  errmsg("could not stat file \"%s\": %m", path)));
    1607              : 
    1608          623 :     else if (ret == 0)
    1609              :     {
    1610              :         /*
    1611              :          * somebody else has already serialized to this point, don't overwrite
    1612              :          * but remember location, so we don't need to read old data again.
    1613              :          *
    1614              :          * To be sure it has been synced to disk after the rename() from the
    1615              :          * tempfile filename to the real filename, we just repeat the fsync.
    1616              :          * That ought to be cheap because in most scenarios it should already
    1617              :          * be safely on disk.
    1618              :          */
    1619          288 :         fsync_fname(path, false);
    1620          288 :         fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1621              : 
    1622          288 :         builder->last_serialized_snapshot = lsn;
    1623          288 :         goto out;
    1624              :     }
    1625              : 
    1626              :     /*
    1627              :      * there is an obvious race condition here between the time we stat(2) the
    1628              :      * file and us writing the file. But we rename the file into place
    1629              :      * atomically and all files created need to contain the same data anyway,
    1630              :      * so this is perfectly fine, although a bit of a resource waste. Locking
    1631              :      * seems like pointless complication.
    1632              :      */
    1633          335 :     elog(DEBUG1, "serializing snapshot to %s", path);
    1634              : 
    1635              :     /* to make sure only we will write to this tempfile, include pid */
    1636          335 :     sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
    1637              :             PG_LOGICAL_SNAPSHOTS_DIR,
    1638          335 :             LSN_FORMAT_ARGS(lsn), MyProcPid);
    1639              : 
    1640              :     /*
    1641              :      * Unlink the temporary file if it already exists; it must be left over
    1642              :      * from an earlier crash/error, since we won't enter this function twice
    1643              :      * from within a single decoding slot/backend and the temporary file name
    1644              :      * contains the pid of the current process.
    1645              :      */
    1646          335 :     if (unlink(tmppath) != 0 && errno != ENOENT)
    1647            0 :         ereport(ERROR,
    1648              :                 (errcode_for_file_access(),
    1649              :                  errmsg("could not remove file \"%s\": %m", tmppath)));
    1650              : 
    1651          335 :     old_ctx = MemoryContextSwitchTo(builder->context);
    1652              : 
    1653              :     /* Get the catalog modifying transactions that are not yet committed */
    1654          335 :     catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
    1655          335 :     catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);
    1656              : 
    1657          335 :     needed_length = sizeof(SnapBuildOnDisk) +
    1658          335 :         sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
    1659              : 
    1660          335 :     ondisk_c = palloc0(needed_length);
    1661          335 :     ondisk = (SnapBuildOnDisk *) ondisk_c;
    1662          335 :     ondisk->magic = SNAPBUILD_MAGIC;
    1663          335 :     ondisk->version = SNAPBUILD_VERSION;
    1664          335 :     ondisk->length = needed_length;
    1665          335 :     INIT_CRC32C(ondisk->checksum);
    1666          335 :     COMP_CRC32C(ondisk->checksum,
    1667              :                 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
    1668              :                 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
    1669          335 :     ondisk_c += sizeof(SnapBuildOnDisk);
    1670              : 
    1671          335 :     memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
    1672              :     /* NULL-ify memory-only data */
    1673          335 :     ondisk->builder.context = NULL;
    1674          335 :     ondisk->builder.snapshot = NULL;
    1675          335 :     ondisk->builder.reorder = NULL;
    1676          335 :     ondisk->builder.committed.xip = NULL;
    1677          335 :     ondisk->builder.catchange.xip = NULL;
    1678              :     /* update catchange only on disk data */
    1679          335 :     ondisk->builder.catchange.xcnt = catchange_xcnt;
    1680              : 
    1681          335 :     COMP_CRC32C(ondisk->checksum,
    1682              :                 &ondisk->builder,
    1683              :                 sizeof(SnapBuild));
    1684              : 
    1685              :     /* copy committed xacts */
    1686          335 :     if (builder->committed.xcnt > 0)
    1687              :     {
    1688           62 :         sz = sizeof(TransactionId) * builder->committed.xcnt;
    1689           62 :         memcpy(ondisk_c, builder->committed.xip, sz);
    1690           62 :         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
    1691           62 :         ondisk_c += sz;
    1692              :     }
    1693              : 
    1694              :     /* copy catalog modifying xacts */
    1695          335 :     if (catchange_xcnt > 0)
    1696              :     {
    1697            9 :         sz = sizeof(TransactionId) * catchange_xcnt;
    1698            9 :         memcpy(ondisk_c, catchange_xip, sz);
    1699            9 :         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
    1700            9 :         ondisk_c += sz;
    1701              :     }
    1702              : 
    1703          335 :     FIN_CRC32C(ondisk->checksum);
    1704              : 
    1705              :     /* we have valid data now, open tempfile and write it there */
    1706          335 :     fd = OpenTransientFile(tmppath,
    1707              :                            O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    1708          335 :     if (fd < 0)
    1709            0 :         ereport(ERROR,
    1710              :                 (errcode_for_file_access(),
    1711              :                  errmsg("could not open file \"%s\": %m", tmppath)));
    1712              : 
    1713          335 :     errno = 0;
    1714          335 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
    1715          335 :     if ((write(fd, ondisk, needed_length)) != needed_length)
    1716              :     {
    1717            0 :         int         save_errno = errno;
    1718              : 
    1719            0 :         CloseTransientFile(fd);
    1720              : 
    1721              :         /* if write didn't set errno, assume problem is no disk space */
    1722            0 :         errno = save_errno ? save_errno : ENOSPC;
    1723            0 :         ereport(ERROR,
    1724              :                 (errcode_for_file_access(),
    1725              :                  errmsg("could not write to file \"%s\": %m", tmppath)));
    1726              :     }
    1727          335 :     pgstat_report_wait_end();
    1728              : 
    1729              :     /*
    1730              :      * fsync the file before renaming so that even if we crash after this we
    1731              :      * have either a fully valid file or nothing.
    1732              :      *
    1733              :      * It's safe to just ERROR on fsync() here because we'll retry the whole
    1734              :      * operation including the writes.
    1735              :      *
    1736              :      * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
    1737              :      * some noticeable overhead since it's performed synchronously during
    1738              :      * decoding?
    1739              :      */
    1740          335 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
    1741          335 :     if (pg_fsync(fd) != 0)
    1742              :     {
    1743            0 :         int         save_errno = errno;
    1744              : 
    1745            0 :         CloseTransientFile(fd);
    1746            0 :         errno = save_errno;
    1747            0 :         ereport(ERROR,
    1748              :                 (errcode_for_file_access(),
    1749              :                  errmsg("could not fsync file \"%s\": %m", tmppath)));
    1750              :     }
    1751          335 :     pgstat_report_wait_end();
    1752              : 
    1753          335 :     if (CloseTransientFile(fd) != 0)
    1754            0 :         ereport(ERROR,
    1755              :                 (errcode_for_file_access(),
    1756              :                  errmsg("could not close file \"%s\": %m", tmppath)));
    1757              : 
    1758          335 :     fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1759              : 
    1760              :     /*
    1761              :      * We may overwrite the work from some other backend, but that's ok, our
    1762              :      * snapshot is valid as well, we'll just have done some superfluous work.
    1763              :      */
    1764          335 :     if (rename(tmppath, path) != 0)
    1765              :     {
    1766            0 :         ereport(ERROR,
    1767              :                 (errcode_for_file_access(),
    1768              :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1769              :                         tmppath, path)));
    1770              :     }
    1771              : 
    1772              :     /* make sure we persist */
    1773          335 :     fsync_fname(path, false);
    1774          335 :     fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1775              : 
    1776              :     /*
    1777              :      * Now there's no way we can lose the dumped state anymore, remember this
    1778              :      * as a serialization point.
    1779              :      */
    1780          335 :     builder->last_serialized_snapshot = lsn;
    1781              : 
    1782          335 :     MemoryContextSwitchTo(old_ctx);
    1783              : 
    1784          623 : out:
    1785          623 :     ReorderBufferSetRestartPoint(builder->reorder,
    1786              :                                  builder->last_serialized_snapshot);
    1787              :     /* be tidy */
    1788          623 :     if (ondisk)
    1789          335 :         pfree(ondisk);
    1790          623 :     if (catchange_xip)
    1791            9 :         pfree(catchange_xip);
    1792              : }
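SnapBuildSerialize() publishes the file with the classic crash-safe sequence: write a pid-suffixed temporary file, fsync it, rename() it over the final name, and fsync the directory. The following is a minimal standalone POSIX sketch of that sequence, not PostgreSQL code. `write_file_atomically()` and `fsync_path()` are hypothetical helpers, errors are returned as -1 instead of raised with ereport(ERROR), the extra fsyncs the real function performs before and after the rename are folded into one directory fsync, and fsyncing a directory through an O_RDONLY descriptor is assumed to work as it does on Linux.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <fcntl.h>
#include <stdio.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: fsync a file or directory by path. */
static int
fsync_path(const char *path)
{
    int fd = open(path, O_RDONLY);
    if (fd < 0)
        return -1;
    int rc = fsync(fd);
    close(fd);
    return rc;
}

/* Hypothetical helper: durably create dir/name with the given contents. */
int
write_file_atomically(const char *dir, const char *name,
                      const void *buf, size_t len)
{
    char path[1024];
    char tmppath[1100];
    struct stat st;

    assert(dir != NULL && name != NULL);
    snprintf(path, sizeof(path), "%s/%s", dir, name);
    /* the pid in the temp name keeps concurrent writers apart */
    snprintf(tmppath, sizeof(tmppath), "%s.%d.tmp", path, (int) getpid());

    /* already published by someone else: just make sure it is durable */
    if (stat(path, &st) == 0)
        return (fsync_path(path) == 0 && fsync_path(dir) == 0) ? 0 : -1;

    /* a leftover temp file can only come from an earlier crash */
    (void) unlink(tmppath);

    int fd = open(tmppath, O_CREAT | O_EXCL | O_WRONLY, 0600);
    if (fd < 0)
        return -1;
    if (write(fd, buf, len) != (ssize_t) len || fsync(fd) != 0)
    {
        close(fd);
        unlink(tmppath);
        return -1;
    }
    if (close(fd) != 0)
        return -1;

    /* atomic replace: readers see either no file or the complete file */
    if (rename(tmppath, path) != 0)
        return -1;

    /* persist the new directory entry */
    return fsync_path(dir);
}
```

Because rename() replaces the target atomically, a reader sees either no file or a complete one. That is why the stat()/rename() race described in the comments above only wastes work instead of corrupting anything, and why no lock is taken.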
    1793              : 
    1794              : /*
    1795              :  * Restore the logical snapshot file contents to 'ondisk'.
    1796              :  *
    1797              :  * 'context' is the memory context where the catalog modifying/committed xids
    1798              :  * will live.
    1799              :  * If 'missing_ok' is true, will not throw an error if the file is not found.
    1800              :  */
    1801              : bool
    1802           21 : SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn,
    1803              :                          MemoryContext context, bool missing_ok)
    1804              : {
    1805              :     int         fd;
    1806              :     pg_crc32c   checksum;
    1807              :     Size        sz;
    1808              :     char        path[MAXPGPATH];
    1809              : 
    1810           21 :     sprintf(path, "%s/%X-%X.snap",
    1811              :             PG_LOGICAL_SNAPSHOTS_DIR,
    1812           21 :             LSN_FORMAT_ARGS(lsn));
    1813              : 
    1814           21 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
    1815              : 
    1816           21 :     if (fd < 0)
    1817              :     {
    1818           11 :         if (missing_ok && errno == ENOENT)
    1819           11 :             return false;
    1820              : 
    1821            0 :         ereport(ERROR,
    1822              :                 (errcode_for_file_access(),
    1823              :                  errmsg("could not open file \"%s\": %m", path)));
    1824              :     }
    1825              : 
    1826              :     /* ----
    1827              :      * Make sure the snapshot had been stored safely to disk, that's normally
    1828              :      * cheap.
    1829              :      * Note that we do not need PANIC here, nobody will be able to use the
    1830              :      * slot without fsyncing, and saving it won't succeed without an fsync()
    1831              :      * either...
    1832              :      * ----
    1833              :      */
    1834           10 :     fsync_fname(path, false);
    1835           10 :     fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
    1836              : 
    1837              :     /* read statically sized portion of snapshot */
    1838           10 :     SnapBuildRestoreContents(fd, ondisk, SnapBuildOnDiskConstantSize, path);
    1839              : 
    1840           10 :     if (ondisk->magic != SNAPBUILD_MAGIC)
    1841            0 :         ereport(ERROR,
    1842              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1843              :                  errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
    1844              :                         path, ondisk->magic, SNAPBUILD_MAGIC)));
    1845              : 
    1846           10 :     if (ondisk->version != SNAPBUILD_VERSION)
    1847            0 :         ereport(ERROR,
    1848              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1849              :                  errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
    1850              :                         path, ondisk->version, SNAPBUILD_VERSION)));
    1851              : 
    1852           10 :     INIT_CRC32C(checksum);
    1853           10 :     COMP_CRC32C(checksum,
    1854              :                 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
    1855              :                 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
    1856              : 
    1857              :     /* read SnapBuild */
    1858           10 :     SnapBuildRestoreContents(fd, &ondisk->builder, sizeof(SnapBuild), path);
    1859           10 :     COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild));
    1860              : 
    1861              :     /* restore committed xacts information */
    1862           10 :     if (ondisk->builder.committed.xcnt > 0)
    1863              :     {
    1864            4 :         sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt;
    1865            4 :         ondisk->builder.committed.xip = MemoryContextAllocZero(context, sz);
    1866            4 :         SnapBuildRestoreContents(fd, ondisk->builder.committed.xip, sz, path);
    1867            4 :         COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz);
    1868              :     }
    1869              : 
    1870              :     /* restore catalog modifying xacts information */
    1871           10 :     if (ondisk->builder.catchange.xcnt > 0)
    1872              :     {
    1873            5 :         sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt;
    1874            5 :         ondisk->builder.catchange.xip = MemoryContextAllocZero(context, sz);
    1875            5 :         SnapBuildRestoreContents(fd, ondisk->builder.catchange.xip, sz, path);
    1876            5 :         COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz);
    1877              :     }
    1878              : 
    1879           10 :     if (CloseTransientFile(fd) != 0)
    1880            0 :         ereport(ERROR,
    1881              :                 (errcode_for_file_access(),
    1882              :                  errmsg("could not close file \"%s\": %m", path)));
    1883              : 
    1884           10 :     FIN_CRC32C(checksum);
    1885              : 
    1886              :     /* verify checksum of what we've read */
    1887           10 :     if (!EQ_CRC32C(checksum, ondisk->checksum))
    1888            0 :         ereport(ERROR,
    1889              :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1890              :                  errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
    1891              :                         path, checksum, ondisk->checksum)));
    1892              : 
    1893           10 :     return true;
    1894              : }
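The checksum is computed in pieces: the header fields after `magic` and `checksum` (SnapBuildOnDiskNotChecksummedSize marks that boundary), then the SnapBuild struct, then the committed and catalog-modifying xid arrays. SnapBuildRestoreSnapshot() recomputes the CRC in the same order before comparing. The standalone sketch below shows the scheme with a bitwise CRC-32C. `DemoHeader`, `DEMO_MAGIC`, `DEMO_VERSION`, and the helper functions are hypothetical stand-ins for SnapBuildOnDisk and the pg_crc32c macros; the field order of `DemoHeader` is assumed so that `offsetof(DemoHeader, version)` plays the role of the not-checksummed prefix, and the magic/version values are placeholders.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Bitwise CRC-32C (Castagnoli), reflected polynomial 0x82F63B78. */
static uint32_t
crc32c_update(uint32_t crc, const void *data, size_t len)
{
    const unsigned char *p = data;

    while (len--)
    {
        crc ^= *p++;
        for (int k = 0; k < 8; k++)
            crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
    }
    return crc;
}

/* Hypothetical on-disk header; only the fields after 'checksum' are covered. */
typedef struct DemoHeader
{
    uint32_t magic;     /* not checksummed */
    uint32_t checksum;  /* not checksummed */
    uint32_t version;   /* checksummed ... */
    uint32_t length;    /* ... through the end of the header */
} DemoHeader;

#define DEMO_MAGIC        0x51A1E001u  /* placeholder value */
#define DEMO_VERSION      6u           /* placeholder value */
#define NOT_CHECKSUMMED   offsetof(DemoHeader, version)

static uint32_t
demo_checksum(const DemoHeader *hdr, const void *payload, size_t payload_len)
{
    uint32_t crc = 0xFFFFFFFFu;                        /* like INIT_CRC32C */

    assert(payload != NULL || payload_len == 0);
    crc = crc32c_update(crc, (const char *) hdr + NOT_CHECKSUMMED,
                        sizeof(DemoHeader) - NOT_CHECKSUMMED);
    crc = crc32c_update(crc, payload, payload_len);   /* e.g. xid arrays */
    return crc ^ 0xFFFFFFFFu;                          /* like FIN_CRC32C */
}

/* 0 = valid, -1 = wrong magic, -2 = wrong version, -3 = checksum mismatch */
static int
demo_validate(const DemoHeader *hdr, const void *payload, size_t payload_len)
{
    if (hdr->magic != DEMO_MAGIC)
        return -1;
    if (hdr->version != DEMO_VERSION)
        return -2;
    if (demo_checksum(hdr, payload, payload_len) != hdr->checksum)
        return -3;
    return 0;
}
```

Keeping `magic` outside the checksum lets the reader reject a foreign file before trusting any length field, and keeping `checksum` outside it avoids the CRC having to cover itself.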
    1895              : 
    1896              : /*
    1897              :  * Restore a snapshot into 'builder' if one has previously been stored at the
    1898              :  * location indicated by 'lsn'. Returns true if successful, false otherwise.
    1899              :  */
    1900              : static bool
    1901           19 : SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
    1902              : {
    1903              :     SnapBuildOnDisk ondisk;
    1904              : 
    1905              :     /* no point in loading a snapshot if we're already there */
    1906           19 :     if (builder->state == SNAPBUILD_CONSISTENT)
    1907            0 :         return false;
    1908              : 
    1909              :     /* validate and restore the snapshot to 'ondisk' */
    1910           19 :     if (!SnapBuildRestoreSnapshot(&ondisk, lsn, builder->context, true))
    1911           11 :         return false;
    1912              : 
    1913              :     /*
    1914              :      * ok, we now have a sensible snapshot here, figure out if it has more
    1915              :      * information than we have.
    1916              :      */
    1917              : 
    1918              :     /*
    1919              :      * We are only interested in consistent snapshots for now, comparing
    1920              :      * whether one incomplete snapshot is more "advanced" seems to be
    1921              :      * unnecessarily complex.
    1922              :      */
    1923            8 :     if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
    1924            0 :         goto snapshot_not_interesting;
    1925              : 
    1926              :     /*
    1927              :      * Don't use a snapshot that requires an xmin that we cannot guarantee to
    1928              :      * be available.
    1929              :      */
    1930            8 :     if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
    1931            0 :         goto snapshot_not_interesting;
    1932              : 
    1933              :     /*
    1934              :      * Consistent snapshots have no next phase. Reset next_phase_at as it is
    1935              :      * possible that an old value may remain.
    1936              :      */
    1937              :     Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
    1938            8 :     builder->next_phase_at = InvalidTransactionId;
    1939              : 
    1940              :     /* ok, we think the snapshot is sensible, copy over everything important */
    1941            8 :     builder->xmin = ondisk.builder.xmin;
    1942            8 :     builder->xmax = ondisk.builder.xmax;
    1943            8 :     builder->state = ondisk.builder.state;
    1944              : 
    1945            8 :     builder->committed.xcnt = ondisk.builder.committed.xcnt;
    1946              :     /* We only allocated/stored xcnt, not xcnt_space xids ! */
    1947              :     /* don't overwrite preallocated xip, if we don't have anything here */
    1948            8 :     if (builder->committed.xcnt > 0)
    1949              :     {
    1950            2 :         pfree(builder->committed.xip);
    1951            2 :         builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
    1952            2 :         builder->committed.xip = ondisk.builder.committed.xip;
    1953              :     }
    1954            8 :     ondisk.builder.committed.xip = NULL;
    1955              : 
    1956              :     /* set catalog modifying transactions */
    1957            8 :     if (builder->catchange.xip)
    1958            0 :         pfree(builder->catchange.xip);
    1959            8 :     builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
    1960            8 :     builder->catchange.xip = ondisk.builder.catchange.xip;
    1961            8 :     ondisk.builder.catchange.xip = NULL;
    1962              : 
    1963              :     /* our snapshot is not interesting anymore, build a new one */
    1964            8 :     if (builder->snapshot != NULL)
    1965              :     {
    1966            0 :         SnapBuildSnapDecRefcount(builder->snapshot);
    1967              :     }
    1968            8 :     builder->snapshot = SnapBuildBuildSnapshot(builder);
    1969            8 :     SnapBuildSnapIncRefcount(builder->snapshot);
    1970              : 
    1971            8 :     ReorderBufferSetRestartPoint(builder->reorder, lsn);
    1972              : 
    1973              :     Assert(builder->state == SNAPBUILD_CONSISTENT);
    1974              : 
    1975            8 :     ereport(LogicalDecodingLogLevel(),
    1976              :             errmsg("logical decoding found consistent point at %X/%08X",
    1977              :                    LSN_FORMAT_ARGS(lsn)),
    1978              :             errdetail("Logical decoding will begin using saved snapshot."));
    1979            8 :     return true;
    1980              : 
    1981            0 : snapshot_not_interesting:
    1982            0 :     if (ondisk.builder.committed.xip != NULL)
    1983            0 :         pfree(ondisk.builder.committed.xip);
    1984            0 :     if (ondisk.builder.catchange.xip != NULL)
    1985            0 :         pfree(ondisk.builder.catchange.xip);
    1986            0 :     return false;
    1987              : }
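SnapBuildRestore() adopts an on-disk snapshot only if it is already consistent and its xmin does not precede builder->initial_xmin_horizon. That comparison goes through TransactionIdPrecedes(), which has to cope with 32-bit xid wraparound. The sketch below restates both checks outside PostgreSQL. `xid_precedes()`, `restored_snapshot_usable()`, and the `DemoState` enum are hypothetical names, and `xid_precedes()` is assumed to match TransactionIdPrecedes(): plain integer comparison when either xid is a special xid below FirstNormalTransactionId, and the sign of the 32-bit difference otherwise.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define FirstNormalTransactionId ((TransactionId) 3)
#define TransactionIdIsNormal(xid) ((xid) >= FirstNormalTransactionId)

/* Assumed equivalent of TransactionIdPrecedes(): is id1 logically older? */
static bool
xid_precedes(TransactionId id1, TransactionId id2)
{
    /* special xids (invalid, bootstrap, frozen) compare as plain integers */
    if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
        return id1 < id2;
    /* normal xids: signed 32-bit distance, so the order survives wraparound */
    return (int32_t) (id1 - id2) < 0;
}

/* Hypothetical mirror of SnapBuildState; only the ordering matters here. */
typedef enum
{
    DEMO_START = -1,
    DEMO_BUILDING_SNAPSHOT = 0,
    DEMO_FULL_SNAPSHOT = 1,
    DEMO_CONSISTENT = 2
} DemoState;

/* Would SnapBuildRestore() adopt an on-disk snapshot with these fields? */
static bool
restored_snapshot_usable(DemoState ondisk_state, TransactionId ondisk_xmin,
                         TransactionId initial_xmin_horizon)
{
    assert(ondisk_state >= DEMO_START && ondisk_state <= DEMO_CONSISTENT);

    if (ondisk_state < DEMO_CONSISTENT)
        return false;   /* incomplete snapshots are not compared at all */
    if (xid_precedes(ondisk_xmin, initial_xmin_horizon))
        return false;   /* needs an xmin we cannot guarantee is still available */
    return true;
}
```

With this rule, xid 0xFFFFFFF0 counts as older than xid 10, which is what a horizon check needs shortly after the xid counter wraps.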
    1988              : 
    1989              : /*
    1990              :  * Read the contents of the serialized snapshot to 'dest'.
    1991              :  */
    1992              : static void
    1993           29 : SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path)
    1994              : {
    1995              :     int         readBytes;
    1996              : 
    1997           29 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
    1998           29 :     readBytes = read(fd, dest, size);
    1999           29 :     pgstat_report_wait_end();
    2000           29 :     if (readBytes != size)
    2001              :     {
    2002            0 :         int         save_errno = errno;
    2003              : 
    2004            0 :         CloseTransientFile(fd);
    2005              : 
    2006            0 :         if (readBytes < 0)
    2007              :         {
    2008            0 :             errno = save_errno;
    2009            0 :             ereport(ERROR,
    2010              :                     (errcode_for_file_access(),
    2011              :                      errmsg("could not read file \"%s\": %m", path)));
    2012              :         }
    2013              :         else
    2014            0 :             ereport(ERROR,
    2015              :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2016              :                      errmsg("could not read file \"%s\": read %d of %zu",
    2017              :                             path, readBytes, size)));
    2018              :     }
    2019           29 : }
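SnapBuildRestoreContents() issues a single read() and separates two failure modes: a negative return is an I/O error reported through errno, while a short count means the file is truncated and is reported as data corruption. A standalone sketch of the same distinction follows. `read_exactly()` and its return codes are hypothetical, and messages go to stderr instead of ereport().

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>

/*
 * Hypothetical helper: 0 on success, -1 if read() failed (errno preserved),
 * -2 if fewer than 'size' bytes were available (truncated file).
 */
static int
read_exactly(int fd, void *dest, size_t size, const char *path)
{
    assert(dest != NULL && path != NULL);

    ssize_t n = read(fd, dest, size);

    if (n == (ssize_t) size)
        return 0;
    if (n < 0)
    {
        int save_errno = errno;   /* fprintf() may clobber errno */

        fprintf(stderr, "could not read file \"%s\": errno %d\n",
                path, save_errno);
        errno = save_errno;
        return -1;
    }
    fprintf(stderr, "could not read file \"%s\": read %zd of %zu\n",
            path, n, size);
    return -2;
}
```

Treating a short count as corruption is reasonable for a local regular file. On a pipe or socket, read() may legitimately return fewer bytes than requested, so a retry loop would be needed there.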
    2020              : 
    2021              : /*
    2022              :  * Remove all serialized snapshots that are not required anymore because no
    2023              :  * slot can need them. This doesn't actually have to run during a checkpoint,
    2024              :  * but it's a convenient point to schedule this.
    2025              :  *
    2026              :  * NB: We run this during checkpoints even if logical decoding is disabled, so
    2027              :  * we clean up old snapshots at some point after it has been disabled.
    2028              :  */
    2029              : void
    2030         1939 : CheckPointSnapBuild(void)
    2031              : {
    2032              :     XLogRecPtr  cutoff;
    2033              :     XLogRecPtr  redo;
    2034              :     DIR        *snap_dir;
    2035              :     struct dirent *snap_de;
    2036              :     char        path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)];
    2037              : 
    2038              :     /*
    2039              :      * We start off with a minimum of the last redo pointer. No new
    2040              :      * replication slot will start before that, so that's a safe upper bound
    2041              :      * for removal.
    2042              :      */
    2043         1939 :     redo = GetRedoRecPtr();
    2044              : 
    2045              :     /* now check for the restart ptrs from existing slots */
    2046         1939 :     cutoff = ReplicationSlotsComputeLogicalRestartLSN();
    2047              : 
    2048              :     /* don't start earlier than the restart lsn */
    2049         1939 :     if (redo < cutoff)
    2050            1 :         cutoff = redo;
    2051              : 
    2052         1939 :     snap_dir = AllocateDir(PG_LOGICAL_SNAPSHOTS_DIR);
    2053         6124 :     while ((snap_de = ReadDir(snap_dir, PG_LOGICAL_SNAPSHOTS_DIR)) != NULL)
    2054              :     {
    2055              :         uint32      hi;
    2056              :         uint32      lo;
    2057              :         XLogRecPtr  lsn;
    2058              :         PGFileType  de_type;
    2059              : 
    2060         4185 :         if (strcmp(snap_de->d_name, ".") == 0 ||
    2061         2246 :             strcmp(snap_de->d_name, "..") == 0)
    2062         3878 :             continue;
    2063              : 
    2064          307 :         snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name);
    2065          307 :         de_type = get_dirent_type(path, snap_de, false, DEBUG1);
    2066              : 
    2067          307 :         if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
    2068              :         {
    2069            0 :             elog(DEBUG1, "only regular files expected: %s", path);
    2070            0 :             continue;
    2071              :         }
    2072              : 
    2073              :         /*
    2074              :          * temporary filenames from SnapBuildSerialize() include the LSN and
    2075              :          * everything but are postfixed by .$pid.tmp. We can just remove them
    2076              :          * the same as other files because there can be none that are
    2077              :          * currently being written that are older than cutoff.
    2078              :          *
    2079              :          * We just log a message if a file doesn't fit the pattern; it's
    2080              :          * probably some editor's lock/state file or similar...
    2081              :          */
    2082          307 :         if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
    2083              :         {
    2084            0 :             ereport(LOG,
    2085              :                     (errmsg("could not parse file name \"%s\"", path)));
    2086            0 :             continue;
    2087              :         }
    2088              : 
    2089          307 :         lsn = ((uint64) hi) << 32 | lo;
    2090              : 
    2091              :         /* check whether we still need it */
    2092          307 :         if (lsn < cutoff || !XLogRecPtrIsValid(cutoff))
    2093              :         {
    2094          203 :             elog(DEBUG1, "removing snapbuild snapshot %s", path);
    2095              : 
    2096              :             /*
    2097              :              * It's not particularly harmful, though strange, if we can't
    2098              :              * remove the file here. Don't prevent the checkpoint from
    2099              :              * completing, that'd be a cure worse than the disease.
    2100              :              */
    2101          203 :             if (unlink(path) < 0)
    2102              :             {
    2103            0 :                 ereport(LOG,
    2104              :                         (errcode_for_file_access(),
    2105              :                          errmsg("could not remove file \"%s\": %m",
    2106              :                                 path)));
    2107            0 :                 continue;
    2108              :             }
    2109              :         }
    2110              :     }
    2111         1939 :     FreeDir(snap_dir);
    2112         1939 : }
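CheckPointSnapBuild() keeps a snapshot file only if the LSN parsed back from its `%X-%X.snap` name is at or after the cutoff. The cutoff is the older of the redo pointer and the oldest logical slot restart LSN, and an invalid cutoff makes every file removable. The sketch below reproduces the naming and the keep/remove test outside PostgreSQL. `XLogRecPtr` is redefined locally as `uint64_t`, and `snap_file_name()`, `snap_file_removable()`, and `snap_cutoff()` are hypothetical helpers.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Name a snapshot file like "%X-%X.snap" with the high and low 32 bits. */
static void
snap_file_name(char *buf, size_t bufsz, XLogRecPtr lsn)
{
    assert(bufsz >= sizeof("FFFFFFFF-FFFFFFFF.snap"));
    snprintf(buf, bufsz, "%X-%X.snap",
             (unsigned int) (lsn >> 32), (unsigned int) lsn);
}

/* 1 = removable, 0 = still needed, -1 = name does not match the pattern */
static int
snap_file_removable(const char *name, XLogRecPtr cutoff)
{
    unsigned int hi, lo;

    /* sscanf() stops after ".snap", so "*.snap.<pid>.tmp" names parse too */
    if (sscanf(name, "%X-%X.snap", &hi, &lo) != 2)
        return -1;

    XLogRecPtr lsn = ((uint64_t) hi) << 32 | lo;

    return lsn < cutoff || cutoff == InvalidXLogRecPtr;
}

/* cutoff = slot restart LSN, lowered to the redo pointer if that is older */
static XLogRecPtr
snap_cutoff(XLogRecPtr redo, XLogRecPtr slot_restart)
{
    return redo < slot_restart ? redo : slot_restart;
}
```

As the comment in the loop notes, temporary `.snap.<pid>.tmp` files parse to the same LSN as the final file and are removed under the same rule.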
    2113              : 
    2114              : /*
    2115              :  * Check if a logical snapshot at the specified point has been serialized.
    2116              :  */
    2117              : bool
    2118           14 : SnapBuildSnapshotExists(XLogRecPtr lsn)
    2119              : {
    2120              :     char        path[MAXPGPATH];
    2121              :     int         ret;
    2122              :     struct stat stat_buf;
    2123              : 
    2124           14 :     sprintf(path, "%s/%X-%X.snap",
    2125              :             PG_LOGICAL_SNAPSHOTS_DIR,
    2126           14 :             LSN_FORMAT_ARGS(lsn));
    2127              : 
    2128           14 :     ret = stat(path, &stat_buf);
    2129              : 
    2130           14 :     if (ret != 0 && errno != ENOENT)
    2131            0 :         ereport(ERROR,
    2132              :                 (errcode_for_file_access(),
    2133              :                  errmsg("could not stat file \"%s\": %m", path)));
    2134              : 
    2135           14 :     return ret == 0;
    2136              : }
        

Generated by: LCOV version 2.0-1