LCOV - code coverage report
File: src/backend/replication/logical/snapbuild.c
Test: PostgreSQL 17devel
Date: 2024-04-19 16:11:40
             Hit  Total  Coverage
Lines:       444    548    81.0 %
Functions:    30     31    96.8 %

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * snapbuild.c
       4             :  *
       5             :  *    Infrastructure for building historic catalog snapshots based on contents
       6             :  *    of the WAL, for the purpose of decoding heapam.c style values in the
       7             :  *    WAL.
       8             :  *
       9             :  * NOTES:
      10             :  *
      11             :  * We build snapshots which can *only* be used to read catalog contents and we
      12             :  * do so by reading and interpreting the WAL stream. The aim is to build a
      13             :  * snapshot that behaves the same as a freshly taken MVCC snapshot would have
      14             :  * at the time the XLogRecord was generated.
      15             :  *
      16             :  * To build the snapshots we reuse the infrastructure built for Hot
      17             :  * Standby. The in-memory snapshots we build look different than HS' because
      18             :  * we have different needs. To successfully decode data from the WAL we only
      19             :  * need to access catalog tables and (sys|rel|cat)cache, not the actual user
      20             :  * tables since the data we decode is wholly contained in the WAL
      21             :  * records. Also, our snapshots need to be different in comparison to normal
      22             :  * MVCC ones because, in contrast to those, we cannot fully rely on the clog and
      23             :  * pg_subtrans for information about committed transactions, since they might
      24             :  * commit in the future from the POV of the WAL entry we're currently
      25             :  * decoding. This definition has the advantage that we only need to prevent
      26             :  * removal of catalog rows, while normal tables' rows can still be
      27             :  * removed. This is achieved by using the replication slot mechanism.
      28             :  *
      29             :  * As the percentage of transactions modifying the catalog normally is fairly
      30             :  * small in comparison to ones only manipulating user data, we keep track of
      31             :  * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
      32             :  * track of all running transactions like it's done in a normal snapshot. Note
      33             :  * that we're generally only looking at transactions that have acquired an
      34             :  * xid. That is, we keep a list of transactions between snapshot->(xmin, xmax)
      35             :  * that we consider committed; everything else is considered aborted/in
      36             :  * progress. That also allows us not to care about subtransactions before they
      37             :  * have committed, which means this module, in contrast to HS, doesn't have to
      38             :  * care about suboverflowed subtransactions and similar.
      39             :  *
      40             :  * One complexity of doing this is that to e.g. handle mixed DDL/DML
      41             :  * transactions we need Snapshots that see intermediate versions of the
      42             :  * catalog in a transaction. During normal operation this is achieved by using
      43             :  * CommandIds/cmin/cmax. The problem with that, however, is that for space
      44             :  * efficiency reasons, the cmin and cmax are not included in WAL records. We
      45             :  * cannot read the cmin/cmax from the tuple itself, either, because it is
      46             :  * reset on crash recovery. Even if we could, we could not decode combocids
      47             :  * which are only tracked in the original backend's memory. To work around
      48             :  * that, heapam writes an extra WAL record (XLOG_HEAP2_NEW_CID) every time a
      49             :  * catalog row is modified, which includes the cmin and cmax of the
      50             :  * tuple. During decoding, we insert the ctid->(cmin,cmax) mappings into the
      51             :  * reorder buffer, and use them at visibility checks instead of the cmin/cmax
      52             :  * on the tuple itself. See the comment in reorderbuffer.c above
      53             :  * ResolveCminCmaxDuringDecoding() for details.
      54             :  *
      55             :  * To facilitate all this we need our own visibility routine, as the normal
      56             :  * ones are optimized for different use cases.
      57             :  *
      58             :  * To replace the normal catalog snapshots with decoding ones use the
      59             :  * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
      60             :  *
      61             :  *
      62             :  *
      63             :  * The snapbuild machinery starts up in several stages, as illustrated
      64             :  * by the following graph describing the SnapBuild->state transitions:
      65             :  *
      66             :  *         +-------------------------+
      67             :  *    +----|         START           |-------------+
      68             :  *    |    +-------------------------+             |
      69             :  *    |                 |                          |
      70             :  *    |                 |                          |
      71             :  *    |        running_xacts #1                    |
      72             :  *    |                 |                          |
      73             :  *    |                 |                          |
      74             :  *    |                 v                          |
      75             :  *    |    +-------------------------+             v
      76             :  *    |    |   BUILDING_SNAPSHOT     |------------>|
      77             :  *    |    +-------------------------+             |
      78             :  *    |                 |                          |
      79             :  *    |                 |                          |
      80             :  *    | running_xacts #2, xacts from #1 finished   |
      81             :  *    |                 |                          |
      82             :  *    |                 |                          |
      83             :  *    |                 v                          |
      84             :  *    |    +-------------------------+             v
      85             :  *    |    |       FULL_SNAPSHOT     |------------>|
      86             :  *    |    +-------------------------+             |
      87             :  *    |                 |                          |
      88             :  * running_xacts        |                      saved snapshot
      89             :  * with zero xacts      |                 at running_xacts's lsn
      90             :  *    |                 |                          |
      91             :  *    | running_xacts with xacts from #2 finished  |
      92             :  *    |                 |                          |
      93             :  *    |                 v                          |
      94             :  *    |    +-------------------------+             |
      95             :  *    +--->|  SNAPBUILD_CONSISTENT   |<------------+
      96             :  *         +-------------------------+
      97             :  *
      98             :  * Initially the machinery is in the START stage. When an xl_running_xacts
      99             :  * record is read that is sufficiently new (above the safe xmin horizon),
     100             :  * there's a state transition. If there were no running xacts when the
     101             :  * xl_running_xacts record was generated, we'll directly go into CONSISTENT
     102             :  * state; otherwise we switch to BUILDING_SNAPSHOT, and on to FULL_SNAPSHOT
     103             :  * once the xacts listed in that first record have finished. A full snapshot
     104             :  * means that all transactions that start henceforth can be decoded in their
     105             :  * entirety, but transactions that started previously can't. In FULL_SNAPSHOT
     106             :  * we switch into CONSISTENT once all of those have committed or aborted.
     107             :  *
     108             :  * Only transactions that commit after CONSISTENT state has been reached will
     109             :  * be replayed, even though they might have started while still in
     110             :  * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
     111             :  * changes have been exported, but all the following ones will be. That point
     112             :  * is a convenient point to initialize replication from, which is why we
     113             :  * export a snapshot at that point, which *can* be used to read normal data.
     114             :  *
     115             :  * Copyright (c) 2012-2024, PostgreSQL Global Development Group
     116             :  *
     117             :  * IDENTIFICATION
     118             :  *    src/backend/replication/logical/snapbuild.c
     119             :  *
     120             :  *-------------------------------------------------------------------------
     121             :  */
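The state diagram in the header comment above can be read as a transition function over xl_running_xacts records. The following is a minimal, simplified sketch of the diagram only, not of the real SnapBuildFindSnapshot() logic: the SketchState enum, the SketchRunningXacts flags and sketch_next_state() are hypothetical names, and the order in which the conditions are checked is an assumption made for illustration.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical mirror of the four states in the diagram. */
typedef enum
{
    SKETCH_START,
    SKETCH_BUILDING_SNAPSHOT,
    SKETCH_FULL_SNAPSHOT,
    SKETCH_CONSISTENT
} SketchState;

/* Hypothetical boolean summary of one xl_running_xacts record. */
typedef struct
{
    bool        above_xmin_horizon;  /* new enough to be used at all */
    bool        no_running_xacts;    /* record lists zero running xacts */
    bool        prev_xacts_finished; /* xacts from the previous phase are done */
    bool        saved_snapshot;      /* a serialized snapshot exists at its LSN */
} SketchRunningXacts;

/* Returns the state reached after processing one record. */
static SketchState
sketch_next_state(SketchState cur, const SketchRunningXacts *rec)
{
    if (cur == SKETCH_CONSISTENT)
        return cur;                 /* nothing left to build */
    if (!rec->above_xmin_horizon)
        return cur;                 /* too old: ignore the record */
    if (rec->saved_snapshot)
        return SKETCH_CONSISTENT;   /* right-hand edge of the diagram */

    switch (cur)
    {
        case SKETCH_START:
            return rec->no_running_xacts ? SKETCH_CONSISTENT
                : SKETCH_BUILDING_SNAPSHOT;
        case SKETCH_BUILDING_SNAPSHOT:
            return rec->prev_xacts_finished ? SKETCH_FULL_SNAPSHOT : cur;
        case SKETCH_FULL_SNAPSHOT:
            return rec->prev_xacts_finished ? SKETCH_CONSISTENT : cur;
        default:
            return cur;
    }
}
```

The booleans collapse what the real code derives from xl_running_xacts fields and builder->next_phase_at into plain flags; that keeps the shape of the diagram visible but hides how each condition is actually computed.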
     122             : 
     123             : #include "postgres.h"
     124             : 
     125             : #include <sys/stat.h>
     126             : #include <unistd.h>
     127             : 
     128             : #include "access/heapam_xlog.h"
     129             : #include "access/transam.h"
     130             : #include "access/xact.h"
     131             : #include "common/file_utils.h"
     132             : #include "miscadmin.h"
     133             : #include "pgstat.h"
     134             : #include "replication/logical.h"
     135             : #include "replication/reorderbuffer.h"
     136             : #include "replication/snapbuild.h"
     137             : #include "storage/fd.h"
     138             : #include "storage/lmgr.h"
     139             : #include "storage/proc.h"
     140             : #include "storage/procarray.h"
     141             : #include "storage/standby.h"
     142             : #include "utils/builtins.h"
     143             : #include "utils/memutils.h"
     144             : #include "utils/snapmgr.h"
     145             : #include "utils/snapshot.h"
     146             : 
     147             : /*
     148             :  * This struct contains the current state of the snapshot building
     149             :  * machinery. Besides a forward declaration in the header, it is not exposed
     150             :  * to the public, so we can easily change its contents.
     151             :  */
     152             : struct SnapBuild
     153             : {
     154             :     /* how far along we are in building our first full snapshot */
     155             :     SnapBuildState state;
     156             : 
     157             :     /* private memory context used to allocate memory for this module. */
     158             :     MemoryContext context;
     159             : 
     160             :     /* all transactions < this have committed/aborted */
     161             :     TransactionId xmin;
     162             : 
     163             :     /* all transactions >= this are uncommitted */
     164             :     TransactionId xmax;
     165             : 
     166             :     /*
     167             :      * Don't replay commits from an LSN < this LSN. This can be set externally
     168             :      * but it will also be advanced (never retreat) from within snapbuild.c.
     169             :      */
     170             :     XLogRecPtr  start_decoding_at;
     171             : 
     172             :     /*
     173             :      * LSN at which two-phase decoding was enabled or LSN at which we found a
     174             :      * consistent point at the time of slot creation.
     175             :      *
     176             :      * Prepared transactions that were skipped, either because two-phase was
     177             :      * not yet enabled or because they are not covered by the initial snapshot,
     178             :      * need to be sent later along with COMMIT PREPARED, and they must precede
     179             :      * this point.
     180             :      */
     181             :     XLogRecPtr  two_phase_at;
     182             : 
     183             :     /*
     184             :      * Don't start decoding WAL until the "xl_running_xacts" information
     185             :      * indicates there are no running xids with an xid smaller than this.
     186             :      */
     187             :     TransactionId initial_xmin_horizon;
     188             : 
     189             :     /* Indicates whether we are building a full snapshot or just a catalog one. */
     190             :     bool        building_full_snapshot;
     191             : 
     192             :     /*
     193             :      * Snapshot that's valid for viewing the catalog state as of this moment.
     194             :      */
     195             :     Snapshot    snapshot;
     196             : 
     197             :     /*
     198             :      * LSN of the last location we are sure a snapshot has been serialized to.
     199             :      */
     200             :     XLogRecPtr  last_serialized_snapshot;
     201             : 
     202             :     /*
     203             :      * The reorderbuffer we need to update with usable snapshots et al.
     204             :      */
     205             :     ReorderBuffer *reorder;
     206             : 
     207             :     /*
     208             :      * TransactionId at which the next phase of initial snapshot building will
     209             :      * happen. InvalidTransactionId if not known (i.e. SNAPBUILD_START), or
     210             :      * when no next phase is necessary (SNAPBUILD_CONSISTENT).
     211             :      */
     212             :     TransactionId next_phase_at;
     213             : 
     214             :     /*
     215             :      * Array of transactions which could have catalog changes that committed
     216             :      * between xmin and xmax.
     217             :      */
     218             :     struct
     219             :     {
     220             :         /* number of committed transactions */
     221             :         size_t      xcnt;
     222             : 
     223             :         /* available space for committed transactions */
     224             :         size_t      xcnt_space;
     225             : 
     226             :         /*
     227             :          * Until we reach a CONSISTENT state, we record commits of all
     228             :          * transactions, not just the catalog changing ones. Record when that
     229             :          * changes so we know we cannot export a snapshot safely anymore.
     230             :          */
     231             :         bool        includes_all_transactions;
     232             : 
     233             :         /*
     234             :          * Array of committed transactions that have modified the catalog.
     235             :          *
     236             :          * As this array is frequently modified we do *not* keep it in
     237             :          * xidComparator order. Instead we sort the array when building &
     238             :          * distributing a snapshot.
     239             :          *
     240             :          * TODO: It's unclear whether that reasoning has much merit. Every
     241             :          * time we add something here after becoming consistent will also
     242             :          * require distributing a snapshot. Storing them sorted would
     243             :          * potentially also make it easier to purge (but more complicated wrt
     244             :          * wraparound?). Should be improved if sorting while building the
     245             :          * snapshot shows up in profiles.
     246             :          */
     247             :         TransactionId *xip;
     248             :     }           committed;
     249             : 
     250             :     /*
     251             :      * Array of transactions and subtransactions that had modified catalogs
     252             :      * and were running when the snapshot was serialized.
     253             :      *
     254             :      * We normally rely on some WAL record types such as HEAP2_NEW_CID to know
     255             :      * if the transaction has changed the catalog. But it could happen that
     256             :      * the logical decoding decodes only the commit record of the transaction
     257             :      * after restoring the previously serialized snapshot in which case we
     258             :      * will miss adding the xid to the snapshot and end up looking at the
     259             :      * catalogs with the wrong snapshot.
     260             :      *
     261             :      * To avoid the above problem, we serialize the transactions that had
     262             :      * modified the catalogs and were still running at the time of snapshot
     263             :      * serialization. We fill this array while restoring the snapshot and then
     264             :      * consult it while decoding a commit to determine whether the xact has
     265             :      * modified the catalog. We discard this array once all the xids in the list
     266             :      * are old enough not to matter. See SnapBuildPurgeOlderTxn for details.
     267             :      */
     268             :     struct
     269             :     {
     270             :         /* number of transactions */
     271             :         size_t      xcnt;
     272             : 
     273             :         /* This array must be sorted in xidComparator order */
     274             :         TransactionId *xip;
     275             :     }           catchange;
     276             : };
     277             : 
     278             : /*
     279             :  * Starting a transaction -- which we need to do while exporting a snapshot --
     280             :  * removes knowledge about the previously used resowner, so we save it here.
     281             :  */
     282             : static ResourceOwner SavedResourceOwnerDuringExport = NULL;
     283             : static bool ExportInProgress = false;
     284             : 
     285             : /* ->committed and ->catchange manipulation */
     286             : static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
     287             : 
     288             : /* snapshot building/manipulation/distribution functions */
     289             : static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
     290             : 
     291             : static void SnapBuildFreeSnapshot(Snapshot snap);
     292             : 
     293             : static void SnapBuildSnapIncRefcount(Snapshot snap);
     294             : 
     295             : static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
     296             : 
     297             : static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
     298             :                                                  uint32 xinfo);
     299             : 
     300             : /* xlog reading helper functions for SnapBuildProcessRunningXacts */
     301             : static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
     302             : static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
     303             : 
     304             : /* serialization functions */
     305             : static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
     306             : static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
     307             : static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path);
     308             : 
     309             : /*
     310             :  * Allocate a new snapshot builder.
     311             :  *
     312             :  * xmin_horizon is the xid at or above which we can be sure no catalog rows
     313             :  * have been removed; start_lsn is the LSN at or after which we replay commits.
     314             :  */
     315             : SnapBuild *
     316        1856 : AllocateSnapshotBuilder(ReorderBuffer *reorder,
     317             :                         TransactionId xmin_horizon,
     318             :                         XLogRecPtr start_lsn,
     319             :                         bool need_full_snapshot,
     320             :                         XLogRecPtr two_phase_at)
     321             : {
     322             :     MemoryContext context;
     323             :     MemoryContext oldcontext;
     324             :     SnapBuild  *builder;
     325             : 
     326             :     /* allocate memory in own context, to have better accountability */
     327        1856 :     context = AllocSetContextCreate(CurrentMemoryContext,
     328             :                                     "snapshot builder context",
     329             :                                     ALLOCSET_DEFAULT_SIZES);
     330        1856 :     oldcontext = MemoryContextSwitchTo(context);
     331             : 
     332        1856 :     builder = palloc0(sizeof(SnapBuild));
     333             : 
     334        1856 :     builder->state = SNAPBUILD_START;
     335        1856 :     builder->context = context;
     336        1856 :     builder->reorder = reorder;
     337             :     /* Other struct members initialized by zeroing via palloc0 above */
     338             : 
     339        1856 :     builder->committed.xcnt = 0;
     340        1856 :     builder->committed.xcnt_space = 128; /* arbitrary number */
     341        1856 :     builder->committed.xip =
     342        1856 :         palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
     343        1856 :     builder->committed.includes_all_transactions = true;
     344             : 
     345        1856 :     builder->catchange.xcnt = 0;
     346        1856 :     builder->catchange.xip = NULL;
     347             : 
     348        1856 :     builder->initial_xmin_horizon = xmin_horizon;
     349        1856 :     builder->start_decoding_at = start_lsn;
     350        1856 :     builder->building_full_snapshot = need_full_snapshot;
     351        1856 :     builder->two_phase_at = two_phase_at;
     352             : 
     353        1856 :     MemoryContextSwitchTo(oldcontext);
     354             : 
     355        1856 :     return builder;
     356             : }
     357             : 
     358             : /*
     359             :  * Free a snapshot builder.
     360             :  */
     361             : void
     362        1532 : FreeSnapshotBuilder(SnapBuild *builder)
     363             : {
     364        1532 :     MemoryContext context = builder->context;
     365             : 
     366             :     /* free the snapshot explicitly, as that path contains some error checking */
     367        1532 :     if (builder->snapshot != NULL)
     368             :     {
     369         380 :         SnapBuildSnapDecRefcount(builder->snapshot);
     370         380 :         builder->snapshot = NULL;
     371             :     }
     372             : 
     373             :     /* other resources are deallocated via memory context deletion */
     374        1532 :     MemoryContextDelete(context);
     375        1532 : }
     376             : 
     377             : /*
     378             :  * Free an unreferenced snapshot that has previously been built by us.
     379             :  */
     380             : static void
     381        2238 : SnapBuildFreeSnapshot(Snapshot snap)
     382             : {
     383             :     /* make sure we don't get passed an external snapshot */
     384             :     Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
     385             : 
     386             :     /* make sure nobody modified our snapshot */
     387             :     Assert(snap->curcid == FirstCommandId);
     388             :     Assert(!snap->suboverflowed);
     389             :     Assert(!snap->takenDuringRecovery);
     390             :     Assert(snap->regd_count == 0);
     391             : 
     392             :     /* slightly more likely, so it's checked even without c-asserts */
     393        2238 :     if (snap->copied)
     394           0 :         elog(ERROR, "cannot free a copied snapshot");
     395             : 
     396        2238 :     if (snap->active_count)
     397           0 :         elog(ERROR, "cannot free an active snapshot");
     398             : 
     399        2238 :     pfree(snap);
     400        2238 : }
     401             : 
     402             : /*
     403             :  * In which state of snapshot building are we?
     404             :  */
     405             : SnapBuildState
     406     4643166 : SnapBuildCurrentState(SnapBuild *builder)
     407             : {
     408     4643166 :     return builder->state;
     409             : }
     410             : 
     411             : /*
     412             :  * Return the LSN at which two-phase decoding was first enabled.
     413             :  */
     414             : XLogRecPtr
     415          58 : SnapBuildGetTwoPhaseAt(SnapBuild *builder)
     416             : {
     417          58 :     return builder->two_phase_at;
     418             : }
     419             : 
     420             : /*
     421             :  * Set the LSN at which two-phase decoding is enabled.
     422             :  */
     423             : void
     424          12 : SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
     425             : {
     426          12 :     builder->two_phase_at = ptr;
     427          12 : }
     428             : 
     429             : /*
     430             :  * Should the contents of the transaction ending at 'ptr' be skipped (not decoded)?
     431             :  */
     432             : bool
     433      982996 : SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
     434             : {
     435      982996 :     return ptr < builder->start_decoding_at;
     436             : }
     437             : 
     438             : /*
     439             :  * Increase refcount of a snapshot.
     440             :  *
     441             :  * This is used when handing out a snapshot to some external resource or when
     442             :  * adding a Snapshot as builder->snapshot.
     443             :  */
     444             : static void
     445        9642 : SnapBuildSnapIncRefcount(Snapshot snap)
     446             : {
     447        9642 :     snap->active_count++;
     448        9642 : }
     449             : 
     450             : /*
     451             :  * Decrease refcount of a snapshot and free if the refcount reaches zero.
     452             :  *
     453             :  * Externally visible, so that external resources that have been handed an
     454             :  * IncRef'ed Snapshot can adjust its refcount easily.
     455             :  */
     456             : void
     457        9296 : SnapBuildSnapDecRefcount(Snapshot snap)
     458             : {
     459             :     /* make sure we don't get passed an external snapshot */
     460             :     Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
     461             : 
     462             :     /* make sure nobody modified our snapshot */
     463             :     Assert(snap->curcid == FirstCommandId);
     464             :     Assert(!snap->suboverflowed);
     465             :     Assert(!snap->takenDuringRecovery);
     466             : 
     467             :     Assert(snap->regd_count == 0);
     468             : 
     469             :     Assert(snap->active_count > 0);
     470             : 
     471             :     /* slightly more likely, so it's checked even without casserts */
     472        9296 :     if (snap->copied)
     473           0 :         elog(ERROR, "cannot free a copied snapshot");
     474             : 
     475        9296 :     snap->active_count--;
     476        9296 :     if (snap->active_count == 0)
     477        2238 :         SnapBuildFreeSnapshot(snap);
     478        9296 : }
     479             : 
     480             : /*
     481             :  * Build a new snapshot, based on currently committed catalog-modifying
     482             :  * transactions.
     483             :  *
     484             :  * In-progress transactions with catalog access are *not* allowed to modify
     485             :  * these snapshots; they have to copy them and fill in appropriate ->curcid
     486             :  * and ->subxip/subxcnt values.
     487             :  */
     488             : static Snapshot
     489        2908 : SnapBuildBuildSnapshot(SnapBuild *builder)
     490             : {
     491             :     Snapshot    snapshot;
     492             :     Size        ssize;
     493             : 
     494             :     Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
     495             : 
     496        2908 :     ssize = sizeof(SnapshotData)
     497        2908 :         + sizeof(TransactionId) * builder->committed.xcnt
     498        2908 :         + sizeof(TransactionId) * 1 /* toplevel xid */ ;
     499             : 
     500        2908 :     snapshot = MemoryContextAllocZero(builder->context, ssize);
     501             : 
     502        2908 :     snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
     503             : 
     504             :     /*
     505             :      * We misuse the original meaning of SnapshotData's xip and subxip fields
     506             :      * to make them more fitting for our needs.
     507             :      *
     508             :      * In the 'xip' array we store transactions that have to be treated as
     509             :      * committed. Since we will only ever look at tuples from transactions
     510             :      * that have modified the catalog, it's more efficient to store those few
     511             :      * that exist between xmin and xmax (frequently there are none).
     512             :      *
     513             :      * Snapshots that are used in transactions that have modified the catalog
     514             :      * also use the 'subxip' array to store their toplevel xid and all the
     515             :      * subtransaction xids, so we can recognize rows that must be treated as
     516             :      * visible even though their xids are not in xip. Subxip only
     517             :      * gets filled when the transaction is copied into the context of a
     518             :      * catalog modifying transaction since we otherwise share a snapshot
     519             :      * between transactions. As long as a txn hasn't modified the catalog it
     520             :      * doesn't need to treat any uncommitted rows as visible, so there is no
     521             :      * need for those xids.
     522             :      *
     523             :      * Both arrays are qsort'ed so that we can use bsearch() on them.
     524             :      */
     525             :     Assert(TransactionIdIsNormal(builder->xmin));
     526             :     Assert(TransactionIdIsNormal(builder->xmax));
     527             : 
     528        2908 :     snapshot->xmin = builder->xmin;
     529        2908 :     snapshot->xmax = builder->xmax;
     530             : 
     531             :     /* store all transactions to be treated as committed by this snapshot */
     532        2908 :     snapshot->xip =
     533        2908 :         (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
     534        2908 :     snapshot->xcnt = builder->committed.xcnt;
     535        2908 :     memcpy(snapshot->xip,
     536        2908 :            builder->committed.xip,
     537        2908 :            builder->committed.xcnt * sizeof(TransactionId));
     538             : 
     539             :     /* sort so we can bsearch() */
     540        2908 :     qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
     541             : 
     542             :     /*
     543             :      * Initially, subxip is empty, i.e. it's a snapshot to be used by
     544             :      * transactions that don't modify the catalog. Will be filled by
     545             :      * ReorderBufferCopySnap() if necessary.
     546             :      */
     547        2908 :     snapshot->subxcnt = 0;
     548        2908 :     snapshot->subxip = NULL;
     549             : 
     550        2908 :     snapshot->suboverflowed = false;
     551        2908 :     snapshot->takenDuringRecovery = false;
     552        2908 :     snapshot->copied = false;
     553        2908 :     snapshot->curcid = FirstCommandId;
     554        2908 :     snapshot->active_count = 0;
     555        2908 :     snapshot->regd_count = 0;
     556        2908 :     snapshot->snapXactCompletionCount = 0;
     557             : 
     558        2908 :     return snapshot;
     559             : }
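The inverted use of `xip` described in the comment above can be illustrated with a small standalone sketch. It is not PostgreSQL code: `Xid`, `xid_cmp`, `historic_xid_visible` and the `clog_says_committed` parameter are hypothetical names. Xids are compared as plain integers, so wraparound is ignored. Xids older than `xmin` are resolved by a caller-supplied flag that stands in for the clog lookup.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for TransactionId; wraparound is ignored. */
typedef uint32_t Xid;

/* qsort()/bsearch() comparator, similar in role to xidComparator. */
static int
xid_cmp(const void *a, const void *b)
{
    Xid xa = *(const Xid *) a;
    Xid xb = *(const Xid *) b;

    return (xa > xb) - (xa < xb);
}

/*
 * Visibility of an inserting xid under an "inverted" historic snapshot:
 * 'committed' lists the xids in [xmin, xmax) that are treated as committed
 * and must already be sorted with xid_cmp.
 */
static bool
historic_xid_visible(Xid xid, Xid xmin, Xid xmax,
                     const Xid *committed, size_t ncommitted,
                     bool clog_says_committed)
{
    assert(xmin <= xmax);

    if (xid < xmin)
        return clog_says_committed;  /* older xids: defer to clog */
    if (xid >= xmax)
        return false;                /* at or past xmax: treated as running */

    /* in between: visible only if listed as committed */
    return bsearch(&xid, committed, ncommitted, sizeof(Xid), xid_cmp) != NULL;
}
```

Sort the array once with `qsort(committed, n, sizeof(Xid), xid_cmp)` before doing lookups, mirroring the `qsort()` call that follows the `memcpy()` above.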
     560             : 
     561             : /*
     562             :  * Build the initial slot snapshot and convert it to a normal snapshot that
     563             :  * is understood by HeapTupleSatisfiesMVCC.
     564             :  *
     565             :  * The snapshot will be usable directly in current transaction or exported
     566             :  * for loading in different transaction.
     567             :  */
     568             : Snapshot
     569         342 : SnapBuildInitialSnapshot(SnapBuild *builder)
     570             : {
     571             :     Snapshot    snap;
     572             :     TransactionId xid;
     573             :     TransactionId safeXid;
     574             :     TransactionId *newxip;
     575         342 :     int         newxcnt = 0;
     576             : 
     577             :     Assert(XactIsoLevel == XACT_REPEATABLE_READ);
     578             :     Assert(builder->building_full_snapshot);
     579             : 
     580             :     /* don't allow older snapshots */
     581         342 :     InvalidateCatalogSnapshot();    /* about to overwrite MyProc->xmin */
     582         342 :     if (HaveRegisteredOrActiveSnapshot())
     583           0 :         elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
     584             :     Assert(!HistoricSnapshotActive());
     585             : 
     586         342 :     if (builder->state != SNAPBUILD_CONSISTENT)
     587           0 :         elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
     588             : 
     589         342 :     if (!builder->committed.includes_all_transactions)
     590           0 :         elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
     591             : 
     592             :     /* so we don't overwrite the existing value */
     593         342 :     if (TransactionIdIsValid(MyProc->xmin))
     594           0 :         elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
     595             : 
     596         342 :     snap = SnapBuildBuildSnapshot(builder);
     597             : 
     598             :     /*
     599             :      * We know that snap->xmin is alive, enforced by the logical xmin
     600             :      * mechanism. Due to that we can do this without locks, we're only
     601             :      * changing our own value.
     602             :      *
     603             :      * Building an initial snapshot is expensive and an unenforced xmin
     604             :      * horizon would have bad consequences, therefore always double-check that
     605             :      * the horizon is enforced.
     606             :      */
     607         342 :     LWLockAcquire(ProcArrayLock, LW_SHARED);
     608         342 :     safeXid = GetOldestSafeDecodingTransactionId(false);
     609         342 :     LWLockRelease(ProcArrayLock);
     610             : 
     611         342 :     if (TransactionIdFollows(safeXid, snap->xmin))
     612           0 :         elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
     613             :              safeXid, snap->xmin);
     614             : 
     615         342 :     MyProc->xmin = snap->xmin;
     616             : 
     617             :     /* allocate in transaction context */
     618             :     newxip = (TransactionId *)
     619         342 :         palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
     620             : 
     621             :     /*
     622             :      * snapbuild.c builds transactions in an "inverted" manner, which means it
     623             :      * stores committed transactions in ->xip, not ones in progress. Build a
     624             :      * classical snapshot by marking all non-committed transactions as
     625             :      * in-progress. This can be expensive.
     626             :      */
     627         342 :     for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
     628             :     {
     629             :         void       *test;
     630             : 
     631             :         /*
     632             :          * Check whether transaction committed using the decoding snapshot
     633             :          * meaning of ->xip.
     634             :          */
     635           0 :         test = bsearch(&xid, snap->xip, snap->xcnt,
     636             :                        sizeof(TransactionId), xidComparator);
     637             : 
     638           0 :         if (test == NULL)
     639             :         {
     640           0 :             if (newxcnt >= GetMaxSnapshotXidCount())
     641           0 :                 ereport(ERROR,
     642             :                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
     643             :                          errmsg("initial slot snapshot too large")));
     644             : 
     645           0 :             newxip[newxcnt++] = xid;
     646             :         }
     647             : 
     648           0 :         TransactionIdAdvance(xid);
     649             :     }
     650             : 
     651             :     /* adjust remaining snapshot fields as needed */
     652         342 :     snap->snapshot_type = SNAPSHOT_MVCC;
     653         342 :     snap->xcnt = newxcnt;
     654         342 :     snap->xip = newxip;
     655             : 
     656         342 :     return snap;
     657             : }
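The loop above turns the decoding snapshot's list of committed xids into the classical list of in-progress xids. The following standalone sketch shows the same inversion, using hypothetical names. It compares xids as plain integers, so it neither handles wraparound nor skips special xids the way `TransactionIdAdvance` does. Where the real code calls `ereport(ERROR)` because the output array would overflow, the sketch returns `-1`.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for TransactionId; wraparound is ignored. */
typedef uint32_t Xid;

static int
xid_cmp(const void *a, const void *b)
{
    Xid xa = *(const Xid *) a;
    Xid xb = *(const Xid *) b;

    return (xa > xb) - (xa < xb);
}

/*
 * Writes every xid in [xmin, xmax) that is NOT in the sorted 'committed'
 * array to 'out'. Returns the number written, or -1 if more than 'max_out'
 * slots would be needed.
 */
static long
invert_committed_to_in_progress(Xid xmin, Xid xmax,
                                const Xid *committed, size_t ncommitted,
                                Xid *out, size_t max_out)
{
    size_t      n = 0;

    assert(xmin <= xmax);

    for (Xid xid = xmin; xid < xmax; xid++)
    {
        /* committed per the decoding snapshot: not in progress */
        if (bsearch(&xid, committed, ncommitted, sizeof(Xid), xid_cmp) != NULL)
            continue;

        if (n >= max_out)
            return -1;          /* snapshot too large */
        out[n++] = xid;
    }
    return (long) n;
}
```

As the original comment warns, the cost grows with the width of the `[xmin, xmax)` range, not with the number of committed xids.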
     658             : 
     659             : /*
     660             :  * Export a snapshot so it can be set in another session with SET TRANSACTION
     661             :  * SNAPSHOT.
     662             :  *
     663             :  * For that we need to start a transaction in the current backend as the
     664             :  * importing side checks whether the source transaction is still open to make
     665             :  * sure the xmin horizon hasn't advanced since then.
     666             :  */
     667             : const char *
     668           0 : SnapBuildExportSnapshot(SnapBuild *builder)
     669             : {
     670             :     Snapshot    snap;
     671             :     char       *snapname;
     672             : 
     673           0 :     if (IsTransactionOrTransactionBlock())
     674           0 :         elog(ERROR, "cannot export a snapshot from within a transaction");
     675             : 
     676           0 :     if (SavedResourceOwnerDuringExport)
     677           0 :         elog(ERROR, "can only export one snapshot at a time");
     678             : 
     679           0 :     SavedResourceOwnerDuringExport = CurrentResourceOwner;
     680           0 :     ExportInProgress = true;
     681             : 
     682           0 :     StartTransactionCommand();
     683             : 
     684             :     /* There doesn't seem to be a nice API to set these */
     685           0 :     XactIsoLevel = XACT_REPEATABLE_READ;
     686           0 :     XactReadOnly = true;
     687             : 
     688           0 :     snap = SnapBuildInitialSnapshot(builder);
     689             : 
     690             :     /*
     691             :      * now that we've built a plain snapshot, make it active and use the
     692             :      * normal mechanisms for exporting it
     693             :      */
     694           0 :     snapname = ExportSnapshot(snap);
     695             : 
     696           0 :     ereport(LOG,
     697             :             (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
     698             :                            "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
     699             :                            snap->xcnt,
     700             :                            snapname, snap->xcnt)));
     701           0 :     return snapname;
     702             : }
     703             : 
     704             : /*
     705             :  * Ensure there is a snapshot and, if not, build one for the current transaction.
     706             :  */
     707             : Snapshot
     708          16 : SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
     709             : {
     710             :     Assert(builder->state == SNAPBUILD_CONSISTENT);
     711             : 
     712             :     /* only build a new snapshot if we don't have a prebuilt one */
     713          16 :     if (builder->snapshot == NULL)
     714             :     {
     715           2 :         builder->snapshot = SnapBuildBuildSnapshot(builder);
     716             :         /* increase refcount for the snapshot builder */
     717           2 :         SnapBuildSnapIncRefcount(builder->snapshot);
     718             :     }
     719             : 
     720          16 :     return builder->snapshot;
     721             : }
     722             : 
     723             : /*
     724             :  * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
     725             :  * any. Aborts the previously started transaction and resets the resource
     726             :  * owner back to its original value.
     727             :  */
     728             : void
     729        8930 : SnapBuildClearExportedSnapshot(void)
     730             : {
     731             :     ResourceOwner tmpResOwner;
     732             : 
     733             :     /* nothing exported, that is the usual case */
     734        8930 :     if (!ExportInProgress)
     735        8930 :         return;
     736             : 
     737           0 :     if (!IsTransactionState())
     738           0 :         elog(ERROR, "clearing exported snapshot in wrong transaction state");
     739             : 
     740             :     /*
     741             :      * AbortCurrentTransaction() takes care of resetting the snapshot state,
     742             :      * so remember SavedResourceOwnerDuringExport.
     743             :      */
     744           0 :     tmpResOwner = SavedResourceOwnerDuringExport;
     745             : 
     746             :     /* make sure nothing could have ever happened */
     747           0 :     AbortCurrentTransaction();
     748             : 
     749           0 :     CurrentResourceOwner = tmpResOwner;
     750             : }
     751             : 
     752             : /*
     753             :  * Clear snapshot export state during transaction abort.
     754             :  */
     755             : void
     756       45124 : SnapBuildResetExportedSnapshotState(void)
     757             : {
     758       45124 :     SavedResourceOwnerDuringExport = NULL;
     759       45124 :     ExportInProgress = false;
     760       45124 : }
     761             : 
     762             : /*
     763             :  * Handle the effects of a single heap change, appropriate to the current state
     764             :  * of the snapshot builder and return whether changes made at (xid, lsn) can
     765             :  * be decoded.
     766             :  */
     767             : bool
     768     3216568 : SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
     769             : {
     770             :     /*
     771             :      * We can't handle data in transactions if we haven't built a snapshot
     772             :      * yet, so don't store them.
     773             :      */
     774     3216568 :     if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
     775           0 :         return false;
     776             : 
     777             :     /*
     778             :      * No point in keeping track of changes in transactions that we don't have
     779             :      * enough information about to decode. This means that they started before
     780             :      * we got into the SNAPBUILD_FULL_SNAPSHOT state.
     781             :      */
     782     3216574 :     if (builder->state < SNAPBUILD_CONSISTENT &&
     783           6 :         TransactionIdPrecedes(xid, builder->next_phase_at))
     784           0 :         return false;
     785             : 
     786             :     /*
     787             :      * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
     788             :      * be needed to decode the change we're currently processing.
     789             :      */
     790     3216568 :     if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
     791             :     {
     792             :         /* only build a new snapshot if we don't have a prebuilt one */
     793        5096 :         if (builder->snapshot == NULL)
     794             :         {
     795         618 :             builder->snapshot = SnapBuildBuildSnapshot(builder);
     796             :             /* increase refcount for the snapshot builder */
     797         618 :             SnapBuildSnapIncRefcount(builder->snapshot);
     798             :         }
     799             : 
     800             :         /*
     801             :          * Increase refcount for the transaction we're handing the snapshot
     802             :          * out to.
     803             :          */
     804        5096 :         SnapBuildSnapIncRefcount(builder->snapshot);
     805        5096 :         ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
     806             :                                      builder->snapshot);
     807             :     }
     808             : 
     809     3216568 :     return true;
     810             : }
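`SnapBuildProcessChange()` and `SnapBuildGetOrBuildSnapshot()` build the shared snapshot lazily. They take one reference for the builder itself and one more for each transaction the snapshot is handed to. The sketch below shows only that reference counting. It assumes hypothetical `SnapSketch`/`BuilderSketch` types and uses plain `calloc()` in place of memory contexts.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical, heavily simplified snapshot and builder. */
typedef struct
{
    int         refcount;
} SnapSketch;

typedef struct
{
    SnapSketch *snapshot;       /* NULL until first needed */
} BuilderSketch;

/* Build the shared snapshot on first use; the builder keeps one reference. */
static SnapSketch *
builder_get_or_build(BuilderSketch *b)
{
    if (b->snapshot == NULL)
    {
        b->snapshot = calloc(1, sizeof(SnapSketch));
        if (b->snapshot == NULL)
            return NULL;
        b->snapshot->refcount++;    /* reference held by the builder */
    }
    assert(b->snapshot->refcount >= 1);
    return b->snapshot;
}

/* Hand the shared snapshot to a transaction, which gets its own reference. */
static SnapSketch *
builder_hand_out(BuilderSketch *b)
{
    SnapSketch *s = builder_get_or_build(b);

    if (s != NULL)
        s->refcount++;          /* reference held by the receiving txn */
    return s;
}
```

After two transactions receive the snapshot, its count is three: one for the builder and one for each transaction.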
     811             : 
     812             : /*
     813             :  * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
     814             :  * This implies that a transaction has done some form of write to system
     815             :  * catalogs.
     816             :  */
     817             : void
     818       42644 : SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
     819             :                        XLogRecPtr lsn, xl_heap_new_cid *xlrec)
     820             : {
     821             :     CommandId   cid;
     822             : 
     823             :     /*
     824             :      * we only log new_cid's if a catalog tuple was modified, so mark the
     825             :      * transaction as containing catalog modifications
     826             :      */
     827       42644 :     ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
     828             : 
     829       42644 :     ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
     830             :                                  xlrec->target_locator, xlrec->target_tid,
     831             :                                  xlrec->cmin, xlrec->cmax,
     832             :                                  xlrec->combocid);
     833             : 
     834             :     /* figure out new command id */
     835       42644 :     if (xlrec->cmin != InvalidCommandId &&
     836       36542 :         xlrec->cmax != InvalidCommandId)
     837        6118 :         cid = Max(xlrec->cmin, xlrec->cmax);
     838       36526 :     else if (xlrec->cmax != InvalidCommandId)
     839        6102 :         cid = xlrec->cmax;
     840       30424 :     else if (xlrec->cmin != InvalidCommandId)
     841       30424 :         cid = xlrec->cmin;
     842             :     else
     843             :     {
     844           0 :         cid = InvalidCommandId; /* silence compiler */
     845           0 :         elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
     846             :     }
     847             : 
     848       42644 :     ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
     849       42644 : }
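The cmin/cmax cascade above picks the larger of the valid command ids, and the decoder then continues at the next one. The standalone sketch below assumes that `CommandId` is a 32-bit unsigned integer whose invalid value is all ones, as `InvalidCommandId` is in PostgreSQL. The names are hypothetical. Where the real code raises an error, the sketch returns the invalid value.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for CommandId and InvalidCommandId. */
typedef uint32_t CidSketch;
#define INVALID_CID_SKETCH ((CidSketch) UINT32_MAX)

/* Next command id after a tuple with the given cmin/cmax. */
static CidSketch
next_command_id(CidSketch cmin, CidSketch cmax)
{
    CidSketch   cid;

    if (cmin != INVALID_CID_SKETCH && cmax != INVALID_CID_SKETCH)
        cid = cmin > cmax ? cmin : cmax;    /* both valid: take the larger */
    else if (cmax != INVALID_CID_SKETCH)
        cid = cmax;
    else if (cmin != INVALID_CID_SKETCH)
        cid = cmin;
    else
        return INVALID_CID_SKETCH;          /* real code: elog(ERROR) */

    assert(cid != INVALID_CID_SKETCH);
    return cid + 1;
}
```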
     850             : 
     851             : /*
     852             :  * Add a new Snapshot to all transactions we're decoding that currently are
     853             :  * in-progress so they can see new catalog contents made by the transaction
     854             :  * that just committed. This is necessary because those in-progress
     855             :  * transactions will use the new catalog's contents from here on (at the very
     856             :  * least everything they do needs to be compatible with newer catalog
     857             :  * contents).
     858             :  */
     859             : static void
     860        1932 : SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
     861             : {
     862             :     dlist_iter  txn_i;
     863             :     ReorderBufferTXN *txn;
     864             : 
     865             :     /*
     866             :      * Iterate through all toplevel transactions. This can include
     867             :      * subtransactions which we just don't yet know to be that, but that's
     868             :      * fine, they will just get an unnecessary snapshot queued.
     869             :      */
     870        3920 :     dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
     871             :     {
     872        1988 :         txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
     873             : 
     874             :         Assert(TransactionIdIsValid(txn->xid));
     875             : 
     876             :         /*
     877             :          * If we don't have a base snapshot yet, there are no changes in this
     878             :          * transaction which in turn implies we don't yet need a snapshot at
     879             :          * all. We'll add a snapshot when the first change gets queued.
     880             :          *
     881             :          * NB: This works correctly even for subtransactions because
     882             :          * ReorderBufferAssignChild() takes care to transfer the base snapshot
     883             :          * to the top-level transaction, and while iterating the changequeue
     884             :          * we'll get the change from the subtxn.
     885             :          */
     886        1988 :         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
     887           4 :             continue;
     888             : 
     889             :         /*
     890             :          * We don't need to add a snapshot to prepared transactions as they
     891             :          * should not see the new catalog contents.
     892             :          */
     893        1984 :         if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
     894          44 :             continue;
     895             : 
     896        1940 :         elog(DEBUG2, "adding a new snapshot to %u at %X/%X",
     897             :              txn->xid, LSN_FORMAT_ARGS(lsn));
     898             : 
     899             :         /*
     900             :          * increase the snapshot's refcount for the transaction we are handing
     901             :          * it out to
     902             :          */
     903        1940 :         SnapBuildSnapIncRefcount(builder->snapshot);
     904        1940 :         ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
     905             :                                  builder->snapshot);
     906             :     }
     907        1932 : }
     908             : 
     909             : /*
     910             :  * Keep track of a new catalog changing transaction that has committed.
     911             :  */
     912             : static void
     913        1950 : SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
     914             : {
     915             :     Assert(TransactionIdIsValid(xid));
     916             : 
     917        1950 :     if (builder->committed.xcnt == builder->committed.xcnt_space)
     918             :     {
     919           0 :         builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
     920             : 
     921           0 :         elog(DEBUG1, "increasing space for committed transactions to %u",
     922             :              (uint32) builder->committed.xcnt_space);
     923             : 
     924           0 :         builder->committed.xip = repalloc(builder->committed.xip,
     925           0 :                                           builder->committed.xcnt_space * sizeof(TransactionId));
     926             :     }
     927             : 
     928             :     /*
     929             :      * TODO: It might make sense to keep the array sorted here instead of
     930             :      * doing it every time we build a new snapshot. On the other hand this
     931             :      * gets called repeatedly when a transaction with subtransactions commits.
     932             :      */
     933        1950 :     builder->committed.xip[builder->committed.xcnt++] = xid;
     934        1950 : }
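`SnapBuildAddCommittedTxn()` appends without sorting. When the array is full, it grows it to `xcnt_space * 2 + 1` slots. The sketch below shows that growth policy. It assumes a hypothetical `CommittedXids` struct and uses plain `realloc()` in place of `repalloc()`; `repalloc()` raises an error on failure instead of returning NULL.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-ins for TransactionId and the builder's array. */
typedef uint32_t Xid;

typedef struct
{
    size_t      xcnt;           /* slots in use */
    size_t      xcnt_space;     /* slots allocated */
    Xid        *xip;            /* unsorted committed xids */
} CommittedXids;

/* Append xid, growing to space * 2 + 1 when full. Returns 0 or -1. */
static int
committed_add(CommittedXids *c, Xid xid)
{
    assert(c->xcnt <= c->xcnt_space);

    if (c->xcnt == c->xcnt_space)
    {
        size_t      new_space = c->xcnt_space * 2 + 1;
        Xid        *p = realloc(c->xip, new_space * sizeof(Xid));

        if (p == NULL)
            return -1;          /* old array is still valid */
        c->xip = p;
        c->xcnt_space = new_space;
    }

    /* kept unsorted; sorting happens when a snapshot is built */
    c->xip[c->xcnt++] = xid;
    return 0;
}
```

Starting from zero slots, the capacity goes 1, 3, 7, 15, and so on. The `+ 1` lets the formula work when the initial space is zero.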
     935             : 
     936             : /*
     937             :  * Remove knowledge about transactions we treat as committed or containing catalog
     938             :  * changes that are smaller than ->xmin. Those won't ever get checked via
     939             :  * the ->committed or ->catchange array, respectively. The committed xids will
     940             :  * get checked via the clog machinery.
     941             :  *
     942             :  * Ideally we would remove a transaction from the catchange array once it
     943             :  * has finished (committed/aborted), but that could be costly as we need to
     944             :  * maintain the order of the xids in the array.
     945             :  */
     946             : static void
     947         618 : SnapBuildPurgeOlderTxn(SnapBuild *builder)
     948             : {
     949             :     int         off;
     950             :     TransactionId *workspace;
     951         618 :     int         surviving_xids = 0;
     952             : 
     953             :     /* not ready yet */
     954         618 :     if (!TransactionIdIsNormal(builder->xmin))
     955           0 :         return;
     956             : 
     957             :     /* TODO: Neater algorithm than just copying and iterating? */
     958             :     workspace =
     959         618 :         MemoryContextAlloc(builder->context,
     960         618 :                            builder->committed.xcnt * sizeof(TransactionId));
     961             : 
     962             :     /* copy xids that still are interesting to workspace */
     963        1002 :     for (off = 0; off < builder->committed.xcnt; off++)
     964             :     {
     965         384 :         if (NormalTransactionIdPrecedes(builder->committed.xip[off],
     966             :                                         builder->xmin))
     967             :             ;                   /* remove */
     968             :         else
     969           0 :             workspace[surviving_xids++] = builder->committed.xip[off];
     970             :     }
     971             : 
     972             :     /* copy workspace back to persistent state */
     973         618 :     memcpy(builder->committed.xip, workspace,
     974             :            surviving_xids * sizeof(TransactionId));
     975             : 
     976         618 :     elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
     977             :          (uint32) builder->committed.xcnt, (uint32) surviving_xids,
     978             :          builder->xmin, builder->xmax);
     979         618 :     builder->committed.xcnt = surviving_xids;
     980             : 
     981         618 :     pfree(workspace);
     982             : 
     983             :     /*
     984             :      * Purge xids in ->catchange as well. The purged array must also be sorted
     985             :      * in xidComparator order.
     986             :      */
     987         618 :     if (builder->catchange.xcnt > 0)
     988             :     {
     989             :         /*
     990             :          * Since catchange.xip is sorted, we find the lower bound of xids that
     991             :          * are still interesting.
     992             :          */
     993          14 :         for (off = 0; off < builder->catchange.xcnt; off++)
     994             :         {
     995          10 :             if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
     996             :                                              builder->xmin))
     997           2 :                 break;
     998             :         }
     999             : 
    1000           6 :         surviving_xids = builder->catchange.xcnt - off;
    1001             : 
    1002           6 :         if (surviving_xids > 0)
    1003             :         {
    1004           2 :             memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
    1005             :                     surviving_xids * sizeof(TransactionId));
    1006             :         }
    1007             :         else
    1008             :         {
    1009           4 :             pfree(builder->catchange.xip);
    1010           4 :             builder->catchange.xip = NULL;
    1011             :         }
    1012             : 
    1013           6 :         elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
    1014             :              (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
    1015             :              builder->xmin, builder->xmax);
    1016           6 :         builder->catchange.xcnt = surviving_xids;
    1017             :     }
    1018             : }
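The function uses two purge strategies. The unsorted `committed` array is filtered element by element. The sorted `catchange` array only needs its lower bound found and its tail shifted down. The sketch below differs from the real code in three ways: it compacts in place instead of using a separate workspace, it compares xids as plain integers where the real code uses wraparound-aware comparisons, and all of its names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-in for TransactionId; wraparound is ignored. */
typedef uint32_t Xid;

/* Unsorted array: keep xids >= xmin in their original order. */
static size_t
purge_unsorted(Xid *xip, size_t xcnt, Xid xmin)
{
    size_t      surviving = 0;

    assert(xip != NULL || xcnt == 0);
    for (size_t off = 0; off < xcnt; off++)
    {
        if (xip[off] >= xmin)
            xip[surviving++] = xip[off];    /* surviving <= off: safe */
    }
    return surviving;
}

/* Sorted array: find the first xid >= xmin and shift the tail down. */
static size_t
purge_sorted(Xid *xip, size_t xcnt, Xid xmin)
{
    size_t      off = 0;
    size_t      surviving;

    assert(xip != NULL || xcnt == 0);
    while (off < xcnt && xip[off] < xmin)
        off++;

    surviving = xcnt - off;
    if (surviving > 0)
        memmove(xip, xip + off, surviving * sizeof(Xid));
    return surviving;
}
```

Because the sorted purge keeps a contiguous suffix, the remaining `catchange` entries stay in `xidComparator` order without re-sorting.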
    1019             : 
    1020             : /*
    1021             :  * Handle everything that needs to be done when a transaction commits
    1022             :  */
    1023             : void
    1024        5032 : SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
    1025             :                    int nsubxacts, TransactionId *subxacts, uint32 xinfo)
    1026             : {
    1027             :     int         nxact;
    1028             : 
    1029        5032 :     bool        needs_snapshot = false;
    1030        5032 :     bool        needs_timetravel = false;
    1031        5032 :     bool        sub_needs_timetravel = false;
    1032             : 
    1033        5032 :     TransactionId xmax = xid;
    1034             : 
    1035             :     /*
    1036             :      * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
    1037             :      * will they be part of a snapshot.  So we don't need to record anything.
    1038             :      */
    1039        5032 :     if (builder->state == SNAPBUILD_START ||
    1040        5032 :         (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
    1041           0 :          TransactionIdPrecedes(xid, builder->next_phase_at)))
    1042             :     {
    1043             :         /* ensure that only commits after this are getting replayed */
    1044           0 :         if (builder->start_decoding_at <= lsn)
    1045           0 :             builder->start_decoding_at = lsn + 1;
    1046           0 :         return;
    1047             :     }
    1048             : 
    1049        5032 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1050             :     {
    1051             :         /* ensure that only commits after this are getting replayed */
    1052          10 :         if (builder->start_decoding_at <= lsn)
    1053           4 :             builder->start_decoding_at = lsn + 1;
    1054             : 
    1055             :         /*
    1056             :          * If building an exportable snapshot, force xid to be tracked, even
    1057             :          * if the transaction didn't modify the catalog.
    1058             :          */
    1059          10 :         if (builder->building_full_snapshot)
    1060             :         {
    1061           0 :             needs_timetravel = true;
    1062             :         }
    1063             :     }
    1064             : 
    1065        7496 :     for (nxact = 0; nxact < nsubxacts; nxact++)
    1066             :     {
    1067        2464 :         TransactionId subxid = subxacts[nxact];
    1068             : 
    1069             :         /*
    1070             :          * Add subtransaction to base snapshot if it is catalog modifying; we
    1071             :          * don't distinguish it from toplevel transactions there.
    1072             :          */
    1073        2464 :         if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
    1074             :         {
    1075          18 :             sub_needs_timetravel = true;
    1076          18 :             needs_snapshot = true;
    1077             : 
    1078          18 :             elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
    1079             :                  xid, subxid);
    1080             : 
    1081          18 :             SnapBuildAddCommittedTxn(builder, subxid);
    1082             : 
    1083          18 :             if (NormalTransactionIdFollows(subxid, xmax))
    1084          18 :                 xmax = subxid;
    1085             :         }
    1086             : 
    1087             :         /*
    1088             :          * If we're forcing timetravel we also need visibility information
    1089             :          * about the subtransaction, so keep track of its state even if it is
    1090             :          * not catalog modifying.  We don't need to distribute a snapshot in
    1091             :          * that case.
    1092             :          */
    1093        2446 :         else if (needs_timetravel)
    1094             :         {
    1095           0 :             SnapBuildAddCommittedTxn(builder, subxid);
    1096           0 :             if (NormalTransactionIdFollows(subxid, xmax))
    1097           0 :                 xmax = subxid;
    1098             :         }
    1099             :     }
    1100             : 
    1101             :     /* if top-level modified catalog, it'll need a snapshot */
    1102        5032 :     if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
    1103             :     {
    1104        1930 :         elog(DEBUG2, "found top level transaction %u, with catalog changes",
    1105             :              xid);
    1106        1930 :         needs_snapshot = true;
    1107        1930 :         needs_timetravel = true;
    1108        1930 :         SnapBuildAddCommittedTxn(builder, xid);
    1109             :     }
    1110        3102 :     else if (sub_needs_timetravel)
    1111             :     {
    1112             :         /* track toplevel txn as well, subxact alone isn't meaningful */
    1113           2 :         elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
    1114             :              xid);
    1115           2 :         needs_timetravel = true;
    1116           2 :         SnapBuildAddCommittedTxn(builder, xid);
    1117             :     }
    1118        3100 :     else if (needs_timetravel)
    1119             :     {
    1120           0 :         elog(DEBUG2, "forced transaction %u to do timetravel", xid);
    1121             : 
    1122           0 :         SnapBuildAddCommittedTxn(builder, xid);
    1123             :     }
    1124             : 
    1125        5032 :     if (!needs_timetravel)
    1126             :     {
    1127             :         /* record that we cannot export a general snapshot anymore */
    1128        3100 :         builder->committed.includes_all_transactions = false;
    1129             :     }
    1130             : 
    1131             :     Assert(!needs_snapshot || needs_timetravel);
    1132             : 
    1133             :     /*
    1134             :      * Adjust xmax of the snapshot builder. We only do that for committed,
    1135             :      * catalog-modifying transactions; everything else isn't interesting to
    1136             :      * us since we'll never look at the respective rows.
    1137             :      */
    1138        5032 :     if (needs_timetravel &&
    1139        3864 :         (!TransactionIdIsValid(builder->xmax) ||
    1140        1932 :          TransactionIdFollowsOrEquals(xmax, builder->xmax)))
    1141             :     {
    1142        1932 :         builder->xmax = xmax;
    1143        1932 :         TransactionIdAdvance(builder->xmax);
    1144             :     }
    1145             : 
    1146             :     /* if there's any reason to build a historic snapshot, do so now */
    1147        5032 :     if (needs_snapshot)
    1148             :     {
    1149             :         /*
    1150             :          * If we haven't built a complete snapshot yet there's no need to hand
    1151             :          * it out, it wouldn't (and couldn't) be used anyway.
    1152             :          */
    1153        1932 :         if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
    1154           0 :             return;
    1155             : 
    1156             :         /*
    1157             :          * Decrease the snapshot builder's refcount of the old snapshot, note
    1158             :          * that it still will be used if it has been handed out to the
    1159             :          * reorderbuffer earlier.
    1160             :          */
    1161        1932 :         if (builder->snapshot)
    1162        1920 :             SnapBuildSnapDecRefcount(builder->snapshot);
    1163             : 
    1164        1932 :         builder->snapshot = SnapBuildBuildSnapshot(builder);
    1165             : 
    1166             :         /* we might need to execute invalidations, add snapshot */
    1167        1932 :         if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
    1168             :         {
    1169          40 :             SnapBuildSnapIncRefcount(builder->snapshot);
    1170          40 :             ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
    1171             :                                          builder->snapshot);
    1172             :         }
    1173             : 
    1174             :         /* refcount of the snapshot builder for the new snapshot */
    1175        1932 :         SnapBuildSnapIncRefcount(builder->snapshot);
    1176             : 
    1177             :         /* add a new catalog snapshot to all currently running transactions */
    1178        1932 :         SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
    1179             :     }
    1180             : }
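The xmax bookkeeping in the function above (the block guarded by `needs_timetravel`) can be modeled in isolation. This is a minimal standalone sketch, assuming 32-bit xids with 0 to 2 reserved; `Xid`, `advance_builder_xmax()` and the other helpers are hypothetical names, and their comparison and increment rules are modeled on `TransactionIdFollowsOrEquals()` and `TransactionIdAdvance()` rather than copied from them.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t Xid;                 /* stand-in for TransactionId */

#define INVALID_XID      ((Xid) 0)
#define FIRST_NORMAL_XID ((Xid) 3)    /* xids 0..2 are reserved */

/* Modulo-2^32 "id1 >= id2" for normal xids; special xids compare as
 * plain integers. */
static int xid_follows_or_equals(Xid id1, Xid id2)
{
    if (id1 < FIRST_NORMAL_XID || id2 < FIRST_NORMAL_XID)
        return id1 >= id2;
    return (int32_t) (id1 - id2) >= 0;
}

/* Increment, skipping the reserved values after wraparound. */
static Xid xid_advance(Xid xid)
{
    xid++;
    if (xid < FIRST_NORMAL_XID)
        xid = FIRST_NORMAL_XID;
    return xid;
}

/* Hypothetical helper: 'committed_max' is the largest xid (top-level or
 * sub) tracked for the committing transaction.  Returns the new,
 * exclusive builder xmax. */
static Xid advance_builder_xmax(Xid builder_xmax, Xid committed_max)
{
    assert(committed_max >= FIRST_NORMAL_XID);

    if (builder_xmax == INVALID_XID ||
        xid_follows_or_equals(committed_max, builder_xmax))
        return xid_advance(committed_max);
    return builder_xmax;              /* never move xmax backwards */
}
```

For example, with `builder_xmax = 101`, a commit whose largest xid is 100 leaves the bound at 101, and starting from `builder_xmax = 0xFFFFFFF0`, a commit whose largest xid is `0xFFFFFFFF` wraps the bound to 3, the first normal xid.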
    1181             : 
    1182             : /*
    1183             :  * Check the reorder buffer and the snapshot to see if the given transaction has
    1184             :  * modified catalogs.
    1185             :  */
    1186             : static inline bool
    1187        7496 : SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
    1188             :                               uint32 xinfo)
    1189             : {
    1190        7496 :     if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
    1191        1940 :         return true;
    1192             : 
    1193             :     /*
    1194             :      * The transactions that have changed catalogs must have invalidation
    1195             :      * info.
    1196             :      */
    1197        5556 :     if (!(xinfo & XACT_XINFO_HAS_INVALS))
    1198        5540 :         return false;
    1199             : 
    1200             :     /* Check the catchange XID array */
    1201          24 :     return ((builder->catchange.xcnt > 0) &&
    1202           8 :             (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
    1203             :                      sizeof(TransactionId), xidComparator) != NULL));
    1204             : }
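The check above short-circuits twice before it reaches the binary search over the sorted `catchange` array. A minimal sketch of the same decision order follows, assuming the reorder-buffer lookup can be reduced to a boolean argument; `xid_has_catalog_changes()` is a hypothetical name, and `XINFO_HAS_INVALS` is a placeholder bit, not necessarily the real value of `XACT_XINFO_HAS_INVALS`. The comparator uses plain unsigned ordering, as `xidComparator()` does, because a wraparound-aware comparison is not transitive and cannot be used for sorting.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef uint32_t Xid;

/* ASSUMPTION: placeholder flag standing in for XACT_XINFO_HAS_INVALS. */
#define XINFO_HAS_INVALS (1U << 3)

/* Plain unsigned ordering; the catchange array must be sorted the same way. */
static int xid_cmp(const void *a, const void *b)
{
    Xid x = *(const Xid *) a;
    Xid y = *(const Xid *) b;

    return (x > y) - (x < y);
}

/* Hypothetical stand-in for SnapBuildXidHasCatalogChanges(). */
static bool xid_has_catalog_changes(bool reorder_says_yes, Xid xid,
                                    uint32_t xinfo,
                                    const Xid *catchange, size_t ncatchange)
{
    assert(catchange != NULL || ncatchange == 0);

    if (reorder_says_yes)
        return true;
    if (!(xinfo & XINFO_HAS_INVALS))
        return false;       /* no invalidations: cannot have changed catalogs */
    return ncatchange > 0 &&
        bsearch(&xid, catchange, ncatchange, sizeof(Xid), xid_cmp) != NULL;
}
```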
    1205             : 
    1206             : /* -----------------------------------
    1207             :  * Snapshot building functions dealing with xlog records
    1208             :  * -----------------------------------
    1209             :  */
    1210             : 
    1211             : /*
    1212             :  * Process a running xacts record, and use its information to first build a
    1213             :  * historic snapshot and later to release resources that aren't needed
    1214             :  * anymore.
    1215             :  */
    1216             : void
    1217        2428 : SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
    1218             : {
    1219             :     ReorderBufferTXN *txn;
    1220             :     TransactionId xmin;
    1221             : 
    1222             :     /*
    1223             :      * If we're not consistent yet, inspect the record to see whether it
    1224             :      * allows us to get closer to being consistent. If we are consistent, dump
    1225             :      * our snapshot so others or we, after a restart, can use it.
    1226             :      */
    1227        2428 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1228             :     {
    1229             :         /* returns false if there's no point in performing cleanup just yet */
    1230        1854 :         if (!SnapBuildFindSnapshot(builder, lsn, running))
    1231        1806 :             return;
    1232             :     }
    1233             :     else
    1234         574 :         SnapBuildSerialize(builder, lsn);
    1235             : 
    1236             :     /*
    1237             :      * Update range of interesting xids based on the running xacts
    1238             :      * information. We don't increase ->xmax using it, because once we are in
    1239             :      * a consistent state we can do that ourselves and much more efficiently
    1240             :      * so, because we only need to do it for catalog transactions since we
    1241             :      * only ever look at those.
    1242             :      *
    1243             :      * NB: We only increase xmax when a catalog modifying transaction commits
    1244             :      * (see SnapBuildCommitTxn).  Because of this, xmax can be lower than
    1245             :      * xmin, which looks odd but is correct and actually more efficient, since
    1246             :      * we hit fast paths in heapam_visibility.c.
    1247             :      */
    1248         618 :     builder->xmin = running->oldestRunningXid;
    1249             : 
    1250             :     /* Remove transactions we don't need to keep track of anymore */
    1251         618 :     SnapBuildPurgeOlderTxn(builder);
    1252             : 
    1253             :     /*
    1254             :      * Advance the xmin limit for the current replication slot, to allow
    1255             :      * vacuum to clean up the tuples this slot has been protecting.
    1256             :      *
    1257             :      * The reorderbuffer might have an xmin among the currently running
    1258             :      * snapshots; use it if so.  If not, we need only consider the snapshots
    1259             :      * we'll produce later, which can't be less than the oldest running xid in
    1260             :      * the record we're reading now.
    1261             :      */
    1262         618 :     xmin = ReorderBufferGetOldestXmin(builder->reorder);
    1263         618 :     if (xmin == InvalidTransactionId)
    1264         550 :         xmin = running->oldestRunningXid;
    1265         618 :     elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
    1266             :          builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
    1267         618 :     LogicalIncreaseXminForSlot(lsn, xmin);
    1268             : 
    1269             :     /*
    1270             :      * Also tell the slot where we can restart decoding from. We don't want to
    1271             :      * do that after every commit because changing that implies an fsync of
    1272             :      * the logical slot's state file, so we only do it every time we see a
    1273             :      * running xacts record.
    1274             :      *
    1275             :      * Do so by looking for the oldest in progress transaction (determined by
    1276             :      * the first LSN of any of its relevant records). Every transaction
    1277             :      * remembers the last location we stored the snapshot to disk before its
    1278             :      * beginning. That point is where we can restart from.
    1279             :      */
    1280             : 
    1281             :     /*
    1282             :      * Can't know about a serialized snapshot's location if we're not
    1283             :      * consistent.
    1284             :      */
    1285         618 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1286          34 :         return;
    1287             : 
    1288         584 :     txn = ReorderBufferGetOldestTXN(builder->reorder);
    1289             : 
    1290             :     /*
    1291             :      * oldest ongoing txn might have started when we didn't yet serialize
    1292             :      * anything because we hadn't reached a consistent state yet.
    1293             :      */
    1294         584 :     if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
    1295          46 :         LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
    1296             : 
    1297             :     /*
    1298             :      * No in-progress transaction, can reuse the last serialized snapshot if
    1299             :      * we have one.
    1300             :      */
    1301         538 :     else if (txn == NULL &&
    1302         496 :              builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
    1303         492 :              builder->last_serialized_snapshot != InvalidXLogRecPtr)
    1304         492 :         LogicalIncreaseRestartDecodingForSlot(lsn,
    1305             :                                               builder->last_serialized_snapshot);
    1306             : }
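The tail of `SnapBuildProcessRunningXacts()` picks at most one restart point to report to the slot. The sketch below isolates that choice, assuming the oldest in-progress transaction can be summarized by a flag plus its `restart_decoding_lsn`; `OldestTxn` and `choose_restart_lsn()` are hypothetical names, and `INVALID_LSN` (0) plays the role of `InvalidXLogRecPtr`, meaning "don't call `LogicalIncreaseRestartDecodingForSlot()`".

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t Lsn;                 /* stand-in for XLogRecPtr */
#define INVALID_LSN ((Lsn) 0)

/* Hypothetical summary of ReorderBufferGetOldestTXN()'s result. */
typedef struct
{
    bool have_txn;
    Lsn  restart_decoding_lsn;        /* invalid if it began before consistency */
} OldestTxn;

/* Returns the restart LSN to report, or INVALID_LSN for "no update". */
static Lsn choose_restart_lsn(OldestTxn oldest,
                              Lsn current_restart_decoding_lsn,
                              Lsn last_serialized_snapshot)
{
    /* An in-progress txn pins the restart point at its own restart LSN. */
    if (oldest.have_txn && oldest.restart_decoding_lsn != INVALID_LSN)
        return oldest.restart_decoding_lsn;

    /* Nothing in progress: the last serialized snapshot is safe to reuse. */
    if (!oldest.have_txn &&
        current_restart_decoding_lsn != INVALID_LSN &&
        last_serialized_snapshot != INVALID_LSN)
        return last_serialized_snapshot;

    return INVALID_LSN;
}
```

Note the middle case: a transaction that started before the builder became consistent has no usable restart LSN, so the slot's restart point is simply left alone until a later record.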
    1307             : 
    1308             : 
    1309             : /*
    1310             :  * Build the start of a snapshot that's capable of decoding the catalog.
    1311             :  *
    1312             :  * Helper function for SnapBuildProcessRunningXacts() while we're not yet
    1313             :  * consistent.
    1314             :  *
    1315             :  * Returns true if there is a point in performing internal maintenance/cleanup
    1316             :  * using the xl_running_xacts record.
    1317             :  */
    1318             : static bool
    1319        1854 : SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
    1320             : {
    1321             :     /* ---
    1322             :      * Build catalog decoding snapshot incrementally using information about
    1323             :      * the currently running transactions. There are several ways to do that:
    1324             :      *
    1325             :      * a) There were no running transactions when the xl_running_xacts record
    1326             :      *    was inserted, jump to CONSISTENT immediately. We might find such a
    1327             :      *    state while waiting on c)'s sub-states.
    1328             :      *
    1329             :      * b) This (in a previous run) or another decoding slot serialized a
    1330             :      *    snapshot to disk that we can use.  Can't use this method for the
    1331             :      *    initial snapshot when slot is being created and needs full snapshot
    1332             :      *    for export or direct use, as that snapshot will only contain catalog
    1333             :      *    modifying transactions.
    1334             :      *
    1335             :      * c) First incrementally build a snapshot for catalog tuples
    1336             :      *    (BUILDING_SNAPSHOT), that requires all, already in-progress,
    1337             :      *    transactions to finish.  Every transaction starting after that
    1338             :      *    (FULL_SNAPSHOT state), has enough information to be decoded.  But
    1339             :      *    for older running transactions no viable snapshot exists yet, so
    1340             :      *    CONSISTENT will only be reached once all of those have finished.
    1341             :      * ---
    1342             :      */
    1343             : 
    1344             :     /*
    1345             :      * xl_running_xacts record is older than what we can use; we might not
    1346             :      * have all necessary catalog rows anymore.
    1347             :      */
    1348        1854 :     if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
    1349         798 :         NormalTransactionIdPrecedes(running->oldestRunningXid,
    1350             :                                     builder->initial_xmin_horizon))
    1351             :     {
    1352           0 :         ereport(DEBUG1,
    1353             :                 (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
    1354             :                                  LSN_FORMAT_ARGS(lsn)),
    1355             :                  errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
    1356             :                                     builder->initial_xmin_horizon, running->oldestRunningXid)));
    1357             : 
    1358             : 
    1359           0 :         SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
    1360             : 
    1361           0 :         return true;
    1362             :     }
    1363             : 
    1364             :     /*
    1365             :      * a) No transactions were running, so we can jump to consistent.
    1366             :      *
    1367             :      * This is not affected by races around xl_running_xacts, because we can
    1368             :      * miss transaction commits, but currently not transactions starting.
    1369             :      *
    1370             :      * NB: We might have already started to incrementally assemble a snapshot,
    1371             :      * so we need to be careful to deal with that.
    1372             :      */
    1373        1854 :     if (running->oldestRunningXid == running->nextXid)
    1374             :     {
    1375        1792 :         if (builder->start_decoding_at == InvalidXLogRecPtr ||
    1376        1010 :             builder->start_decoding_at <= lsn)
    1377             :             /* can decode everything after this */
    1378         784 :             builder->start_decoding_at = lsn + 1;
    1379             : 
    1380             :         /* As no transactions were running xmin/xmax can be trivially set. */
    1381        1792 :         builder->xmin = running->nextXid; /* < are finished */
    1382        1792 :         builder->xmax = running->nextXid; /* >= are running */
    1383             : 
    1384             :         /* so we can safely use the faster comparisons */
    1385             :         Assert(TransactionIdIsNormal(builder->xmin));
    1386             :         Assert(TransactionIdIsNormal(builder->xmax));
    1387             : 
    1388        1792 :         builder->state = SNAPBUILD_CONSISTENT;
    1389        1792 :         builder->next_phase_at = InvalidTransactionId;
    1390             : 
    1391        1792 :         ereport(LOG,
    1392             :                 (errmsg("logical decoding found consistent point at %X/%X",
    1393             :                         LSN_FORMAT_ARGS(lsn)),
    1394             :                  errdetail("There are no running transactions.")));
    1395             : 
    1396        1792 :         return false;
    1397             :     }
    1398             :     /* b) valid on disk state and not building full snapshot */
    1399         122 :     else if (!builder->building_full_snapshot &&
    1400          60 :              SnapBuildRestore(builder, lsn))
    1401             :     {
    1402             :         /* there won't be any state to cleanup */
    1403          14 :         return false;
    1404             :     }
    1405             : 
    1406             :     /*
    1407             :      * c) transition from START to BUILDING_SNAPSHOT.
    1408             :      *
    1409             :      * In START state, and a xl_running_xacts record with running xacts is
    1410             :      * encountered.  In that case, switch to BUILDING_SNAPSHOT state, and
    1411             :      * record xl_running_xacts->nextXid.  Once all running xacts have finished
    1412             :      * (i.e. they're all >= nextXid), we have a complete catalog snapshot.  It
    1413             :      * might look like we could use xl_running_xacts's ->xids information to
    1414             :      * get there quicker, but that is problematic because transactions marked
    1415             :      * as running might already have inserted their commit record; it's
    1416             :      * infeasible to change that with locking.
    1417             :      */
    1418          48 :     else if (builder->state == SNAPBUILD_START)
    1419             :     {
    1420          28 :         builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
    1421          28 :         builder->next_phase_at = running->nextXid;
    1422             : 
    1423             :         /*
    1424             :          * Start with an xmin/xmax that's correct for future, when all the
    1425             :          * currently running transactions have finished. We'll update both
    1426             :          * while waiting for the pending transactions to finish.
    1427             :          */
    1428          28 :         builder->xmin = running->nextXid; /* < are finished */
    1429          28 :         builder->xmax = running->nextXid; /* >= are running */
    1430             : 
    1431             :         /* so we can safely use the faster comparisons */
    1432             :         Assert(TransactionIdIsNormal(builder->xmin));
    1433             :         Assert(TransactionIdIsNormal(builder->xmax));
    1434             : 
    1435          28 :         ereport(LOG,
    1436             :                 (errmsg("logical decoding found initial starting point at %X/%X",
    1437             :                         LSN_FORMAT_ARGS(lsn)),
    1438             :                  errdetail("Waiting for transactions (approximately %d) older than %u to end.",
    1439             :                            running->xcnt, running->nextXid)));
    1440             : 
    1441          28 :         SnapBuildWaitSnapshot(running, running->nextXid);
    1442             :     }
    1443             : 
    1444             :     /*
    1445             :      * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
    1446             :      *
    1447             :      * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
    1448             :      * is >= nextXid from when we switched to BUILDING_SNAPSHOT.  This
    1449             :      * means all transactions starting afterwards have enough information to
    1450             :      * be decoded.  Switch to FULL_SNAPSHOT.
    1451             :      */
    1452          30 :     else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
    1453          10 :              TransactionIdPrecedesOrEquals(builder->next_phase_at,
    1454             :                                            running->oldestRunningXid))
    1455             :     {
    1456          10 :         builder->state = SNAPBUILD_FULL_SNAPSHOT;
    1457          10 :         builder->next_phase_at = running->nextXid;
    1458             : 
    1459          10 :         ereport(LOG,
    1460             :                 (errmsg("logical decoding found initial consistent point at %X/%X",
    1461             :                         LSN_FORMAT_ARGS(lsn)),
    1462             :                  errdetail("Waiting for transactions (approximately %d) older than %u to end.",
    1463             :                            running->xcnt, running->nextXid)));
    1464             : 
    1465          10 :         SnapBuildWaitSnapshot(running, running->nextXid);
    1466             :     }
    1467             : 
    1468             :     /*
    1469             :      * c) transition from FULL_SNAPSHOT to CONSISTENT.
    1470             :      *
    1471             :      * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
    1472             :      * >= nextXid from when we switched to FULL_SNAPSHOT.  This means all
    1473             :      * transactions that are currently in progress have a catalog snapshot,
    1474             :      * and all their changes have been collected.  Switch to CONSISTENT.
    1475             :      */
    1476          20 :     else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
    1477          10 :              TransactionIdPrecedesOrEquals(builder->next_phase_at,
    1478             :                                            running->oldestRunningXid))
    1479             :     {
    1480          10 :         builder->state = SNAPBUILD_CONSISTENT;
    1481          10 :         builder->next_phase_at = InvalidTransactionId;
    1482             : 
    1483          10 :         ereport(LOG,
    1484             :                 (errmsg("logical decoding found consistent point at %X/%X",
    1485             :                         LSN_FORMAT_ARGS(lsn)),
    1486             :                  errdetail("There are no old transactions anymore.")));
    1487             :     }
    1488             : 
    1489             :     /*
    1490             :      * We already started to track running xacts and need to wait for all
    1491             :      * in-progress ones to finish. We fall through to the normal processing of
    1492             :      * records so incremental cleanup can be performed.
    1493             :      */
    1494          44 :     return true;
    1495             : }
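The a) and c) paths of `SnapBuildFindSnapshot()` form a small state machine driven by each record's `oldestRunningXid` and `nextXid`; the hit counts above show most runs here took path a). This sketch keeps only those transitions, assuming path b) (restoring a serialized snapshot) and the `initial_xmin_horizon` check are out of scope; `Builder`, `on_running_xacts()` and the enum names are hypothetical, and 0 stands in for `InvalidTransactionId`.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t Xid;

/* Same ordering as the SNAPBUILD_* states; the values are illustrative. */
typedef enum { START, BUILDING_SNAPSHOT, FULL_SNAPSHOT, CONSISTENT } BuildState;

typedef struct
{
    BuildState state;
    Xid        next_phase_at;         /* 0 = invalid */
} Builder;

/* Wraparound-aware "id1 <= id2" for normal xids. */
static int xid_precedes_or_equals(Xid id1, Xid id2)
{
    return (int32_t) (id1 - id2) <= 0;
}

/* Hypothetical reduction of the a) and c) paths. */
static void on_running_xacts(Builder *b, Xid oldest_running, Xid next_xid)
{
    assert(b->state != CONSISTENT);   /* caller only invokes us before that */

    if (oldest_running == next_xid)                     /* a) none running */
    {
        b->state = CONSISTENT;
        b->next_phase_at = 0;
    }
    else if (b->state == START)                         /* c) first step */
    {
        b->state = BUILDING_SNAPSHOT;
        b->next_phase_at = next_xid;
    }
    else if (b->state == BUILDING_SNAPSHOT &&
             xid_precedes_or_equals(b->next_phase_at, oldest_running))
    {
        b->state = FULL_SNAPSHOT;
        b->next_phase_at = next_xid;
    }
    else if (b->state == FULL_SNAPSHOT &&
             xid_precedes_or_equals(b->next_phase_at, oldest_running))
    {
        b->state = CONSISTENT;
        b->next_phase_at = 0;
    }
    /* otherwise keep waiting for older transactions to finish */
}
```

Feeding it records (100, 105), (103, 110), (105, 112), (112, 115) moves a fresh builder through BUILDING_SNAPSHOT, stays there once, then reaches FULL_SNAPSHOT and finally CONSISTENT, with each phase waiting for the `nextXid` remembered at the previous switch.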
    1496             : 
    1497             : /* ---
    1498             :  * Iterate through xids in record, wait for all those older than the cutoff to
    1499             :  * finish.  Then, if possible, log a new xl_running_xacts record.
    1500             :  *
    1501             :  * This isn't required for the correctness of decoding, but to:
    1502             :  * a) allow isolationtester to notice that we're currently waiting for
    1503             :  *    something.
    1504             :  * b) log a new xl_running_xacts record where it'd be helpful, without having
    1505             :  *    to wait for bgwriter or checkpointer.
    1506             :  * ---
    1507             :  */
    1508             : static void
    1509          38 : SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
    1510             : {
    1511             :     int         off;
    1512             : 
    1513          72 :     for (off = 0; off < running->xcnt; off++)
    1514             :     {
    1515          38 :         TransactionId xid = running->xids[off];
    1516             : 
    1517             :         /*
    1518             :          * Upper layers should ensure that we never need to wait on ourselves.
    1519             :          * Check anyway, since failing to do so would either result in an
    1520             :          * endless wait or an Assert() failure.
    1521             :          */
    1522          38 :         if (TransactionIdIsCurrentTransactionId(xid))
    1523           0 :             elog(ERROR, "waiting for ourselves");
    1524             : 
    1525          38 :         if (TransactionIdFollows(xid, cutoff))
    1526           0 :             continue;
    1527             : 
    1528          38 :         XactLockTableWait(xid, NULL, NULL, XLTW_None);
    1529             :     }
    1530             : 
    1531             :     /*
    1532             :      * All transactions we needed to finish finished - try to ensure there is
    1533             :      * another xl_running_xacts record in a timely manner, without having to
    1534             :      * wait for bgwriter or checkpointer to log one.  During recovery we can't
    1535             :      * enforce that, so we'll have to wait.
    1536             :      */
    1537          34 :     if (!RecoveryInProgress())
    1538             :     {
    1539          34 :         LogStandbySnapshot();
    1540             :     }
    1541          34 : }
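The loop in `SnapBuildWaitSnapshot()` waits only for xids that do not follow the cutoff. When the cutoff is the record's own `nextXid`, every listed xid precedes it, which presumably explains the zero hit count on the `continue` branch above. In the sketch below, a callback stands in for `XactLockTableWait()`; `wait_for_older()`, `record_wait()` and the recording arrays are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Xid;

/* Wraparound-aware "id1 > id2" for normal xids. */
static int xid_follows(Xid id1, Xid id2)
{
    return (int32_t) (id1 - id2) > 0;
}

/* Hypothetical: calls wait_for() for each xid not newer than 'cutoff'
 * and returns how many waits were issued. */
static size_t wait_for_older(const Xid *xids, size_t n, Xid cutoff,
                             void (*wait_for)(Xid))
{
    size_t waited = 0;

    assert(wait_for != NULL);
    for (size_t i = 0; i < n; i++)
    {
        if (xid_follows(xids[i], cutoff))
            continue;                 /* started after the cutoff */
        wait_for(xids[i]);
        waited++;
    }
    return waited;
}

/* Example callback that records instead of blocking. */
static Xid    waited_xids[16];
static size_t n_waited;

static void record_wait(Xid xid)
{
    if (n_waited < sizeof(waited_xids) / sizeof(waited_xids[0]))
        waited_xids[n_waited++] = xid;
}
```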
    1542             : 
    1543             : /* -----------------------------------
    1544             :  * Snapshot serialization support
    1545             :  * -----------------------------------
    1546             :  */
    1547             : 
    1548             : /*
    1549             :  * We store the current state of struct SnapBuild on disk in the following manner:
    1550             :  *
    1551             :  * struct SnapBuildOnDisk;
    1552             :  * TransactionId * committed.xcnt; (*not xcnt_space*)
    1553             :  * TransactionId * catchange.xcnt;
    1554             :  *
    1555             :  */
    1556             : typedef struct SnapBuildOnDisk
    1557             : {
    1558             :     /* first part of this struct needs to be version independent */
    1559             : 
    1560             :     /* data not covered by checksum */
    1561             :     uint32      magic;
    1562             :     pg_crc32c   checksum;
    1563             : 
    1564             :     /* data covered by checksum */
    1565             : 
    1566             :     /* version, in case we want to support pg_upgrade */
    1567             :     uint32      version;
    1568             :     /* how large is the on disk data, excluding the constant sized part */
    1569             :     uint32      length;
    1570             : 
    1571             :     /* version dependent part */
    1572             :     SnapBuild   builder;
    1573             : 
    1574             :     /* variable amount of TransactionIds follows */
    1575             : } SnapBuildOnDisk;
    1576             : 
    1577             : #define SnapBuildOnDiskConstantSize \
    1578             :     offsetof(SnapBuildOnDisk, builder)
    1579             : #define SnapBuildOnDiskNotChecksummedSize \
    1580             :     offsetof(SnapBuildOnDisk, version)
    1581             : 
    1582             : #define SNAPBUILD_MAGIC 0x51A1E001
    1583             : #define SNAPBUILD_VERSION 5
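The two size macros above split the fixed header into an unchecksummed prefix (`magic`, `checksum`) and a checksummed part that starts at `version`. This mock-up shows how `offsetof()` yields those boundaries and how the trailing xid arrays add to the total, assuming a placeholder `FakeBuilder` in place of the real `SnapBuild` (whose layout is not reproduced here); `OnDisk` and `ondisk_needed()` are hypothetical, and `checksum` is a `uint32_t` because `pg_crc32c` is a 32-bit type.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t Xid;

/* ASSUMPTION: placeholder for struct SnapBuild. */
typedef struct
{
    uint64_t some_lsn;
    Xid      xmin;
    Xid      xmax;
} FakeBuilder;

typedef struct
{
    uint32_t    magic;        /* not checksummed */
    uint32_t    checksum;     /* not checksummed */
    uint32_t    version;      /* checksummed from here on */
    uint32_t    length;
    FakeBuilder builder;      /* version dependent part */
    /* committed.xcnt, then catchange.xcnt TransactionIds follow */
} OnDisk;

#define ONDISK_CONSTANT_SIZE   offsetof(OnDisk, builder)
#define ONDISK_NOT_CHECKSUMMED offsetof(OnDisk, version)

/* Bytes for the struct plus both trailing xid arrays. */
static size_t ondisk_needed(size_t committed_xcnt, size_t catchange_xcnt)
{
    return sizeof(OnDisk) + sizeof(Xid) * (committed_xcnt + catchange_xcnt);
}
```

With this layout the constant-size part is 16 bytes, of which the last 8 (`version` and `length`) fall under the checksum.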
    1584             : 
    1585             : /*
    1586             :  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
    1587             :  *
    1588             :  * Supposed to be used by external (i.e. not snapbuild.c) code that just read
    1589             :  * a record that's a potential location for a serialized snapshot.
    1590             :  */
    1591             : void
    1592          74 : SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
    1593             : {
    1594          74 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1595           0 :         SnapBuildRestore(builder, lsn);
    1596             :     else
    1597          74 :         SnapBuildSerialize(builder, lsn);
    1598          74 : }
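`SnapBuildSerialize()`, listed next, names each snapshot file after the LSN it is valid for, printing the high and low 32-bit halves the way `LSN_FORMAT_ARGS()` splits them. A minimal sketch of that naming, assuming a caller-supplied buffer; `snapshot_path()` is a hypothetical helper and uses `snprintf()` where the original uses `sprintf()` into a `MAXPGPATH` buffer.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint64_t Lsn;                 /* stand-in for XLogRecPtr */

/* Writes "pg_logical/snapshots/<hi>-<lo>.snap" in uppercase hex. */
static void snapshot_path(char *buf, size_t len, Lsn lsn)
{
    int n = snprintf(buf, len, "pg_logical/snapshots/%X-%X.snap",
                     (unsigned int) (lsn >> 32), (unsigned int) lsn);

    assert(n > 0 && (size_t) n < len);    /* no truncation */
}
```

For instance, LSN `1/2A3B4C` maps to `pg_logical/snapshots/1-2A3B4C.snap`; neither half is zero-padded, matching the `%X/%X` notation used in the log messages above.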
    1599             : 
    1600             : /*
    1601             :  * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
    1602             :  * been done by another decoding process.
    1603             :  */
    1604             : static void
    1605         648 : SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
    1606             : {
    1607             :     Size        needed_length;
    1608         648 :     SnapBuildOnDisk *ondisk = NULL;
    1609         648 :     TransactionId *catchange_xip = NULL;
    1610             :     MemoryContext old_ctx;
    1611             :     size_t      catchange_xcnt;
    1612             :     char       *ondisk_c;
    1613             :     int         fd;
    1614             :     char        tmppath[MAXPGPATH];
    1615             :     char        path[MAXPGPATH];
    1616             :     int         ret;
    1617             :     struct stat stat_buf;
    1618             :     Size        sz;
    1619             : 
    1620             :     Assert(lsn != InvalidXLogRecPtr);
    1621             :     Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
    1622             :            builder->last_serialized_snapshot <= lsn);
    1623             : 
    1624             :     /*
    1625             :      * no point in serializing if we cannot continue to work immediately after
    1626             :      * restoring the snapshot
    1627             :      */
    1628         648 :     if (builder->state < SNAPBUILD_CONSISTENT)
    1629           0 :         return;
    1630             : 
    1631             :     /* consistent snapshots have no next phase */
    1632             :     Assert(builder->next_phase_at == InvalidTransactionId);
    1633             : 
    1634             :     /*
    1635             :      * We identify snapshots by the LSN they are valid for. We don't need to
    1636             :      * include timelines in the name as each LSN maps to exactly one timeline
    1637             :      * unless the user used pg_resetwal or similar. If a user did so, there's
    1638             :      * no hope of continuing to decode anyway.
    1639             :      */
    1640         648 :     sprintf(path, "pg_logical/snapshots/%X-%X.snap",
    1641         648 :             LSN_FORMAT_ARGS(lsn));
    1642             : 
    1643             :     /*
    1644             :      * first check whether some other backend has already written the snapshot
    1645             :      * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
    1646             :      * as a valid state. Everything else is an unexpected error.
    1647             :      */
    1648         648 :     ret = stat(path, &stat_buf);
    1649             : 
    1650         648 :     if (ret != 0 && errno != ENOENT)
    1651           0 :         ereport(ERROR,
    1652             :                 (errcode_for_file_access(),
    1653             :                  errmsg("could not stat file \"%s\": %m", path)));
    1654             : 
    1655         648 :     else if (ret == 0)
    1656             :     {
    1657             :         /*
    1658             :          * somebody else has already serialized to this point, don't overwrite
    1659             :          * but remember location, so we don't need to read old data again.
    1660             :          *
    1661             :          * To be sure it has been synced to disk after the rename() from the
    1662             :          * tempfile filename to the real filename, we just repeat the fsync.
    1663             :          * That ought to be cheap because in most scenarios it should already
    1664             :          * be safely on disk.
    1665             :          */
    1666         146 :         fsync_fname(path, false);
    1667         146 :         fsync_fname("pg_logical/snapshots", true);
    1668             : 
    1669         146 :         builder->last_serialized_snapshot = lsn;
    1670         146 :         goto out;
    1671             :     }
    1672             : 
    1673             :     /*
    1674             :      * there is an obvious race condition here between the time we stat(2) the
    1675             :      * file and us writing the file. But we rename the file into place
    1676             :      * atomically and all files created need to contain the same data anyway,
    1677             :      * so this is perfectly fine, although a bit of a resource waste. Locking
    1678             :      * seems like pointless complication.
    1679             :      */
    1680         502 :     elog(DEBUG1, "serializing snapshot to %s", path);
    1681             : 
    1682             :     /* to make sure only we will write to this tempfile, include pid */
    1683         502 :     sprintf(tmppath, "pg_logical/snapshots/%X-%X.snap.%d.tmp",
    1684         502 :             LSN_FORMAT_ARGS(lsn), MyProcPid);
    1685             : 
    1686             :     /*
    1687             :      * Unlink the temporary file if it already exists. It must be left over
    1688             :      * from a crash/error, since we won't enter this function twice from
    1689             :      * within a single decoding slot/backend and the temporary file contains
    1690             :      * the pid of the current process.
    1691             :      */
    1692         502 :     if (unlink(tmppath) != 0 && errno != ENOENT)
    1693           0 :         ereport(ERROR,
    1694             :                 (errcode_for_file_access(),
    1695             :                  errmsg("could not remove file \"%s\": %m", tmppath)));
    1696             : 
    1697         502 :     old_ctx = MemoryContextSwitchTo(builder->context);
    1698             : 
    1699             :     /* Get the catalog modifying transactions that are not yet committed */
    1700         502 :     catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
    1701         502 :     catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);
    1702             : 
    1703         502 :     needed_length = sizeof(SnapBuildOnDisk) +
    1704         502 :         sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
    1705             : 
    1706         502 :     ondisk_c = palloc0(needed_length);
    1707         502 :     ondisk = (SnapBuildOnDisk *) ondisk_c;
    1708         502 :     ondisk->magic = SNAPBUILD_MAGIC;
    1709         502 :     ondisk->version = SNAPBUILD_VERSION;
    1710         502 :     ondisk->length = needed_length;
    1711         502 :     INIT_CRC32C(ondisk->checksum);
    1712         502 :     COMP_CRC32C(ondisk->checksum,
    1713             :                 ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
    1714             :                 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
    1715         502 :     ondisk_c += sizeof(SnapBuildOnDisk);
    1716             : 
    1717         502 :     memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
    1718             :     /* NULL-ify memory-only data */
    1719         502 :     ondisk->builder.context = NULL;
    1720         502 :     ondisk->builder.snapshot = NULL;
    1721         502 :     ondisk->builder.reorder = NULL;
    1722         502 :     ondisk->builder.committed.xip = NULL;
    1723         502 :     ondisk->builder.catchange.xip = NULL;
    1724             :     /* update catchange only on disk data */
    1725         502 :     ondisk->builder.catchange.xcnt = catchange_xcnt;
    1726             : 
    1727         502 :     COMP_CRC32C(ondisk->checksum,
    1728             :                 &ondisk->builder,
    1729             :                 sizeof(SnapBuild));
    1730             : 
    1731             :     /* copy committed xacts */
    1732         502 :     if (builder->committed.xcnt > 0)
    1733             :     {
    1734          86 :         sz = sizeof(TransactionId) * builder->committed.xcnt;
    1735          86 :         memcpy(ondisk_c, builder->committed.xip, sz);
    1736          86 :         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
    1737          86 :         ondisk_c += sz;
    1738             :     }
    1739             : 
    1740             :     /* copy catalog modifying xacts */
    1741         502 :     if (catchange_xcnt > 0)
    1742             :     {
    1743          14 :         sz = sizeof(TransactionId) * catchange_xcnt;
    1744          14 :         memcpy(ondisk_c, catchange_xip, sz);
    1745          14 :         COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
    1746          14 :         ondisk_c += sz;
    1747             :     }
    1748             : 
    1749         502 :     FIN_CRC32C(ondisk->checksum);
    1750             : 
    1751             :     /* we have valid data now, open tempfile and write it there */
    1752         502 :     fd = OpenTransientFile(tmppath,
    1753             :                            O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
    1754         502 :     if (fd < 0)
    1755           0 :         ereport(ERROR,
    1756             :                 (errcode_for_file_access(),
    1757             :                  errmsg("could not open file \"%s\": %m", tmppath)));
    1758             : 
    1759         502 :     errno = 0;
    1760         502 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
    1761         502 :     if ((write(fd, ondisk, needed_length)) != needed_length)
    1762             :     {
    1763           0 :         int         save_errno = errno;
    1764             : 
    1765           0 :         CloseTransientFile(fd);
    1766             : 
    1767             :         /* if write didn't set errno, assume problem is no disk space */
    1768           0 :         errno = save_errno ? save_errno : ENOSPC;
    1769           0 :         ereport(ERROR,
    1770             :                 (errcode_for_file_access(),
    1771             :                  errmsg("could not write to file \"%s\": %m", tmppath)));
    1772             :     }
    1773         502 :     pgstat_report_wait_end();
    1774             : 
    1775             :     /*
    1776             :      * fsync the file before renaming so that even if we crash after this we
    1777             :      * have either a fully valid file or nothing.
    1778             :      *
    1779             :      * It's safe to just ERROR on fsync() here because we'll retry the whole
    1780             :      * operation including the writes.
    1781             :      *
    1782             :      * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
    1783             :      * some noticeable overhead since it's performed synchronously during
    1784             :      * decoding?
    1785             :      */
    1786         502 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
    1787         502 :     if (pg_fsync(fd) != 0)
    1788             :     {
    1789           0 :         int         save_errno = errno;
    1790             : 
    1791           0 :         CloseTransientFile(fd);
    1792           0 :         errno = save_errno;
    1793           0 :         ereport(ERROR,
    1794             :                 (errcode_for_file_access(),
    1795             :                  errmsg("could not fsync file \"%s\": %m", tmppath)));
    1796             :     }
    1797         502 :     pgstat_report_wait_end();
    1798             : 
    1799         502 :     if (CloseTransientFile(fd) != 0)
    1800           0 :         ereport(ERROR,
    1801             :                 (errcode_for_file_access(),
    1802             :                  errmsg("could not close file \"%s\": %m", tmppath)));
    1803             : 
    1804         502 :     fsync_fname("pg_logical/snapshots", true);
    1805             : 
    1806             :     /*
    1807             :      * We may overwrite the work from some other backend, but that's ok, our
    1808             :      * snapshot is valid as well, we'll just have done some superfluous work.
    1809             :      */
    1810         502 :     if (rename(tmppath, path) != 0)
    1811             :     {
    1812           0 :         ereport(ERROR,
    1813             :                 (errcode_for_file_access(),
    1814             :                  errmsg("could not rename file \"%s\" to \"%s\": %m",
    1815             :                         tmppath, path)));
    1816             :     }
    1817             : 
    1818             :     /* make sure we persist */
    1819         502 :     fsync_fname(path, false);
    1820         502 :     fsync_fname("pg_logical/snapshots", true);
    1821             : 
    1822             :     /*
    1823             :      * Now there's no way we can lose the dumped state anymore, remember this
    1824             :      * as a serialization point.
    1825             :      */
    1826         502 :     builder->last_serialized_snapshot = lsn;
    1827             : 
    1828         502 :     MemoryContextSwitchTo(old_ctx);
    1829             : 
    1830         648 : out:
    1831         648 :     ReorderBufferSetRestartPoint(builder->reorder,
    1832             :                                  builder->last_serialized_snapshot);
    1833             :     /* be tidy */
    1834         648 :     if (ondisk)
    1835         502 :         pfree(ondisk);
    1836         648 :     if (catchange_xip)
    1837          14 :         pfree(catchange_xip);
    1838             : }
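SnapBuildSerialize() makes the snapshot durable with a write-to-temp, fsync, rename, fsync sequence, so that a crash leaves either a complete file or no file at the final path. The following is a minimal sketch of that sequence using plain POSIX calls in place of PostgreSQL's OpenTransientFile(), pg_fsync() and fsync_fname() wrappers. The helpers `fsync_path()` and `durable_replace()` and their return convention (0, or -1 with errno set) are hypothetical, and the sketch assumes a platform where a directory can be opened read-only and fsync'ed (as on Linux).

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

/* Hypothetical helper modelled on fsync_fname(): fsync a file or directory. */
static int
fsync_path(const char *path, int is_dir)
{
    int         fd = open(path, is_dir ? O_RDONLY : O_RDWR);
    int         save_errno;

    if (fd < 0)
        return -1;
    if (fsync(fd) != 0)
    {
        save_errno = errno;
        close(fd);
        errno = save_errno;
        return -1;
    }
    return close(fd);
}

/*
 * Hypothetical: write 'len' bytes to 'tmppath', make them durable, then
 * atomically rename the file to 'path' inside directory 'dir'.
 * Returns 0, or -1 with errno set.
 */
static int
durable_replace(const char *dir, const char *tmppath, const char *path,
                const void *buf, size_t len)
{
    int         fd;
    int         save_errno;
    ssize_t     written;

    /* A stale temp file can only be left over from an earlier crash. */
    if (unlink(tmppath) != 0 && errno != ENOENT)
        return -1;

    fd = open(tmppath, O_CREAT | O_EXCL | O_WRONLY, 0600);
    if (fd < 0)
        return -1;

    errno = 0;
    written = write(fd, buf, len);
    if (written < 0 || (size_t) written != len)
    {
        /* As in the original: a short write without errno means ENOSPC. */
        save_errno = errno ? errno : ENOSPC;
        close(fd);
        errno = save_errno;
        return -1;
    }

    /* fsync before rename: after a crash we have a complete file or none. */
    if (fsync(fd) != 0)
    {
        save_errno = errno;
        close(fd);
        errno = save_errno;
        return -1;
    }
    if (close(fd) != 0 || fsync_path(dir, 1) != 0)
        return -1;

    /* rename() replaces any existing file at 'path' atomically. */
    if (rename(tmppath, path) != 0)
        return -1;

    /* Persist the file and the directory entry that now points at it. */
    if (fsync_path(path, 0) != 0 || fsync_path(dir, 1) != 0)
        return -1;
    return 0;
}
```

As in the original, a concurrent writer producing the same file is harmless as long as each writer uses its own temporary name (the original appends the pid): rename() only ever swaps a complete file into place.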
    1839             : 
    1840             : /*
    1841             :  * Restore a snapshot into 'builder' if one has previously been stored at the
    1842             :  * location indicated by 'lsn'. Returns true if successful, false otherwise.
    1843             :  */
    1844             : static bool
    1845          60 : SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
    1846             : {
    1847             :     SnapBuildOnDisk ondisk;
    1848             :     int         fd;
    1849             :     char        path[MAXPGPATH];
    1850             :     Size        sz;
    1851             :     pg_crc32c   checksum;
    1852             : 
    1853             :     /* no point in loading a snapshot if we're already there */
    1854          60 :     if (builder->state == SNAPBUILD_CONSISTENT)
    1855           0 :         return false;
    1856             : 
    1857          60 :     sprintf(path, "pg_logical/snapshots/%X-%X.snap",
    1858          60 :             LSN_FORMAT_ARGS(lsn));
    1859             : 
    1860          60 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
    1861             : 
    1862          60 :     if (fd < 0 && errno == ENOENT)
    1863          46 :         return false;
    1864          14 :     else if (fd < 0)
    1865           0 :         ereport(ERROR,
    1866             :                 (errcode_for_file_access(),
    1867             :                  errmsg("could not open file \"%s\": %m", path)));
    1868             : 
    1869             :     /* ----
    1870             :      * Make sure the snapshot had been stored safely to disk, that's normally
    1871             :      * cheap.
    1872             :      * Note that we do not need PANIC here, nobody will be able to use the
    1873             :      * slot without fsyncing, and saving it won't succeed without an fsync()
    1874             :      * either...
    1875             :      * ----
    1876             :      */
    1877          14 :     fsync_fname(path, false);
    1878          14 :     fsync_fname("pg_logical/snapshots", true);
    1879             : 
    1880             : 
    1881             :     /* read statically sized portion of snapshot */
    1882          14 :     SnapBuildRestoreContents(fd, (char *) &ondisk, SnapBuildOnDiskConstantSize, path);
    1883             : 
    1884          14 :     if (ondisk.magic != SNAPBUILD_MAGIC)
    1885           0 :         ereport(ERROR,
    1886             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1887             :                  errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
    1888             :                         path, ondisk.magic, SNAPBUILD_MAGIC)));
    1889             : 
    1890          14 :     if (ondisk.version != SNAPBUILD_VERSION)
    1891           0 :         ereport(ERROR,
    1892             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1893             :                  errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
    1894             :                         path, ondisk.version, SNAPBUILD_VERSION)));
    1895             : 
    1896          14 :     INIT_CRC32C(checksum);
    1897          14 :     COMP_CRC32C(checksum,
    1898             :                 ((char *) &ondisk) + SnapBuildOnDiskNotChecksummedSize,
    1899             :                 SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
    1900             : 
    1901             :     /* read SnapBuild */
    1902          14 :     SnapBuildRestoreContents(fd, (char *) &ondisk.builder, sizeof(SnapBuild), path);
    1903          14 :     COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
    1904             : 
    1905             :     /* restore committed xacts information */
    1906          14 :     if (ondisk.builder.committed.xcnt > 0)
    1907             :     {
    1908           4 :         sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
    1909           4 :         ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
    1910           4 :         SnapBuildRestoreContents(fd, (char *) ondisk.builder.committed.xip, sz, path);
    1911           4 :         COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
    1912             :     }
    1913             : 
    1914             :     /* restore catalog modifying xacts information */
    1915          14 :     if (ondisk.builder.catchange.xcnt > 0)
    1916             :     {
    1917           6 :         sz = sizeof(TransactionId) * ondisk.builder.catchange.xcnt;
    1918           6 :         ondisk.builder.catchange.xip = MemoryContextAllocZero(builder->context, sz);
    1919           6 :         SnapBuildRestoreContents(fd, (char *) ondisk.builder.catchange.xip, sz, path);
    1920           6 :         COMP_CRC32C(checksum, ondisk.builder.catchange.xip, sz);
    1921             :     }
    1922             : 
    1923          14 :     if (CloseTransientFile(fd) != 0)
    1924           0 :         ereport(ERROR,
    1925             :                 (errcode_for_file_access(),
    1926             :                  errmsg("could not close file \"%s\": %m", path)));
    1927             : 
    1928          14 :     FIN_CRC32C(checksum);
    1929             : 
    1930             :     /* verify checksum of what we've read */
    1931          14 :     if (!EQ_CRC32C(checksum, ondisk.checksum))
    1932           0 :         ereport(ERROR,
    1933             :                 (errcode(ERRCODE_DATA_CORRUPTED),
    1934             :                  errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
    1935             :                         path, checksum, ondisk.checksum)));
    1936             : 
    1937             :     /*
    1938             :      * ok, we now have a sensible snapshot here, figure out if it has more
    1939             :      * information than we have.
    1940             :      */
    1941             : 
    1942             :     /*
    1943             :      * We are only interested in consistent snapshots for now, comparing
    1944             :      * whether one incomplete snapshot is more "advanced" seems to be
    1945             :      * unnecessarily complex.
    1946             :      */
    1947          14 :     if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
    1948           0 :         goto snapshot_not_interesting;
    1949             : 
    1950             :     /*
    1951             :      * Don't use a snapshot that requires an xmin that we cannot guarantee to
    1952             :      * be available.
    1953             :      */
    1954          14 :     if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
    1955           0 :         goto snapshot_not_interesting;
    1956             : 
    1957             :     /*
    1958             :      * Consistent snapshots have no next phase. Reset next_phase_at as it is
    1959             :      * possible that an old value may remain.
    1960             :      */
    1961             :     Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
    1962          14 :     builder->next_phase_at = InvalidTransactionId;
    1963             : 
    1964             :     /* ok, we think the snapshot is sensible, copy over everything important */
    1965          14 :     builder->xmin = ondisk.builder.xmin;
    1966          14 :     builder->xmax = ondisk.builder.xmax;
    1967          14 :     builder->state = ondisk.builder.state;
    1968             : 
    1969          14 :     builder->committed.xcnt = ondisk.builder.committed.xcnt;
    1970             :     /* We only allocated/stored xcnt, not xcnt_space xids ! */
    1971             :     /* don't overwrite preallocated xip, if we don't have anything here */
    1972          14 :     if (builder->committed.xcnt > 0)
    1973             :     {
    1974           4 :         pfree(builder->committed.xip);
    1975           4 :         builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
    1976           4 :         builder->committed.xip = ondisk.builder.committed.xip;
    1977             :     }
    1978          14 :     ondisk.builder.committed.xip = NULL;
    1979             : 
    1980             :     /* set catalog modifying transactions */
    1981          14 :     if (builder->catchange.xip)
    1982           0 :         pfree(builder->catchange.xip);
    1983          14 :     builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
    1984          14 :     builder->catchange.xip = ondisk.builder.catchange.xip;
    1985          14 :     ondisk.builder.catchange.xip = NULL;
    1986             : 
    1987             :     /* our snapshot is not interesting anymore, build a new one */
    1988          14 :     if (builder->snapshot != NULL)
    1989             :     {
    1990           0 :         SnapBuildSnapDecRefcount(builder->snapshot);
    1991             :     }
    1992          14 :     builder->snapshot = SnapBuildBuildSnapshot(builder);
    1993          14 :     SnapBuildSnapIncRefcount(builder->snapshot);
    1994             : 
    1995          14 :     ReorderBufferSetRestartPoint(builder->reorder, lsn);
    1996             : 
    1997             :     Assert(builder->state == SNAPBUILD_CONSISTENT);
    1998             : 
    1999          14 :     ereport(LOG,
    2000             :             (errmsg("logical decoding found consistent point at %X/%X",
    2001             :                     LSN_FORMAT_ARGS(lsn)),
    2002             :              errdetail("Logical decoding will begin using saved snapshot.")));
    2003          14 :     return true;
    2004             : 
    2005           0 : snapshot_not_interesting:
    2006           0 :     if (ondisk.builder.committed.xip != NULL)
    2007           0 :         pfree(ondisk.builder.committed.xip);
    2008           0 :     if (ondisk.builder.catchange.xip != NULL)
    2009           0 :         pfree(ondisk.builder.catchange.xip);
    2010           0 :     return false;
    2011             : }
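The file written by SnapBuildSerialize() is a fixed-size header, then committed.xcnt transaction IDs, then catchange.xcnt transaction IDs. It is protected by a CRC-32C that skips the leading magic and checksum fields and is accumulated over the same byte ranges, in the same order, when SnapBuildRestore() reads the file back. A minimal sketch of that layout, assuming a simplified hypothetical header `SketchOnDisk` (the real SnapBuildOnDisk embeds a whole SnapBuild) and a bitwise CRC-32C standing in for PostgreSQL's INIT_CRC32C/COMP_CRC32C/FIN_CRC32C macros:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uint32_t TransactionId;

/* Hypothetical, simplified stand-in for SnapBuildOnDisk. */
typedef struct SketchOnDisk
{
    uint32_t    magic;          /* not checksummed */
    uint32_t    checksum;       /* not checksummed */
    uint32_t    version;
    uint32_t    length;         /* header plus both xid arrays, in bytes */
    uint32_t    committed_xcnt;
    uint32_t    catchange_xcnt;
} SketchOnDisk;

#define SKETCH_MAGIC 0x12345678u    /* arbitrary, not SNAPBUILD_MAGIC */
#define SKETCH_NOT_CHECKSUMMED offsetof(SketchOnDisk, version)

/* Bitwise CRC-32C (Castagnoli, reflected polynomial 0x82F63B78). */
static uint32_t
crc32c_update(uint32_t crc, const void *data, size_t len)
{
    const unsigned char *p = data;

    while (len--)
    {
        crc ^= *p++;
        for (int k = 0; k < 8; k++)
            crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
    }
    return crc;
}

/* Checksum the header after magic/checksum, then the xid payload. */
static uint32_t
sketch_checksum(const SketchOnDisk *od)
{
    uint32_t    crc = 0xFFFFFFFFu;  /* like INIT_CRC32C */

    crc = crc32c_update(crc, (const char *) od + SKETCH_NOT_CHECKSUMMED,
                        sizeof(SketchOnDisk) - SKETCH_NOT_CHECKSUMMED);
    crc = crc32c_update(crc, od + 1, sizeof(TransactionId) *
                        ((size_t) od->committed_xcnt + od->catchange_xcnt));
    return crc ^ 0xFFFFFFFFu;       /* like FIN_CRC32C */
}

/* Pack header, committed xids and catalog-changing xids into one buffer. */
static SketchOnDisk *
sketch_pack(const TransactionId *committed, uint32_t ncommitted,
            const TransactionId *catchange, uint32_t ncatchange)
{
    size_t      needed = sizeof(SketchOnDisk) +
        sizeof(TransactionId) * ((size_t) ncommitted + ncatchange);
    SketchOnDisk *od = calloc(1, needed);
    TransactionId *xids;

    if (od == NULL)
        return NULL;
    od->magic = SKETCH_MAGIC;
    od->version = 1;
    od->length = (uint32_t) needed;
    od->committed_xcnt = ncommitted;
    od->catchange_xcnt = ncatchange;

    /* committed xids first, catalog-changing xids right after them */
    xids = (TransactionId *) (od + 1);
    if (ncommitted > 0)
        memcpy(xids, committed, sizeof(TransactionId) * ncommitted);
    if (ncatchange > 0)
        memcpy(xids + ncommitted, catchange, sizeof(TransactionId) * ncatchange);

    od->checksum = sketch_checksum(od);
    return od;
}

/* Returns 1 if the header fields and the checksum are consistent. */
static int
sketch_verify(const SketchOnDisk *od)
{
    size_t      expected = sizeof(SketchOnDisk) + sizeof(TransactionId) *
        ((size_t) od->committed_xcnt + od->catchange_xcnt);

    if (od->magic != SKETCH_MAGIC || od->version != 1 || od->length != expected)
        return 0;
    return sketch_checksum(od) == od->checksum;
}
```

Checksumming the whole xid payload in one call gives the same value as the original's separate COMP_CRC32C() call per array, because the arrays are contiguous and CRC accumulation over consecutive chunks equals a single pass over their concatenation.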
    2012             : 
    2013             : /*
    2014             :  * Read the contents of the serialized snapshot to 'dest'.
    2015             :  */
    2016             : static void
    2017          38 : SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path)
    2018             : {
    2019             :     int         readBytes;
    2020             : 
    2021          38 :     pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
    2022          38 :     readBytes = read(fd, dest, size);
    2023          38 :     pgstat_report_wait_end();
    2024          38 :     if (readBytes != size)
    2025             :     {
    2026           0 :         int         save_errno = errno;
    2027             : 
    2028           0 :         CloseTransientFile(fd);
    2029             : 
    2030           0 :         if (readBytes < 0)
    2031             :         {
    2032           0 :             errno = save_errno;
    2033           0 :             ereport(ERROR,
    2034             :                     (errcode_for_file_access(),
    2035             :                      errmsg("could not read file \"%s\": %m", path)));
    2036             :         }
    2037             :         else
    2038           0 :             ereport(ERROR,
    2039             :                     (errcode(ERRCODE_DATA_CORRUPTED),
    2040             :                      errmsg("could not read file \"%s\": read %d of %zu",
    2041             :                             path, readBytes, size)));
    2042             :     }
    2043          38 : }
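SnapBuildRestoreContents() distinguishes two failures: read() returning a negative value is an I/O error reported through errno, while a short non-negative count means the file is truncated and is reported as ERRCODE_DATA_CORRUPTED. A minimal standalone sketch of that classification, with a hypothetical `ReadResult` enum and `read_exact()` helper in place of ereport(); like the original it issues a single read() and does not loop on short reads.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical result codes standing in for the two ereport(ERROR) cases. */
typedef enum ReadResult
{
    READ_OK,
    READ_IO_ERROR,              /* read() failed; errno says why */
    READ_TRUNCATED              /* fewer bytes than expected: corruption */
} ReadResult;

/*
 * Read exactly 'size' bytes with one read() call, as the original does.
 * '*nread' receives read()'s return value for use in an error message.
 */
static ReadResult
read_exact(int fd, void *dest, size_t size, ssize_t *nread)
{
    ssize_t     readBytes = read(fd, dest, size);

    *nread = readBytes;
    if (readBytes < 0)
        return READ_IO_ERROR;
    if ((size_t) readBytes != size)
        return READ_TRUNCATED;
    return READ_OK;
}
```

The original closes the file before raising the error and saves errno first, because CloseTransientFile() may overwrite it; a caller of this sketch would need to do the same.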
    2044             : 
    2045             : /*
    2046             :  * Remove all serialized snapshots that are not required anymore because no
    2047             :  * slot can need them. This doesn't actually have to run during a checkpoint,
    2048             :  * but it's a convenient point to schedule this.
    2049             :  *
    2050             :  * NB: We run this during checkpoints even if logical decoding is disabled so
    2051             :  * we clean up old snapshots at some point after it got disabled.
    2052             :  */
    2053             : void
    2054        1704 : CheckPointSnapBuild(void)
    2055             : {
    2056             :     XLogRecPtr  cutoff;
    2057             :     XLogRecPtr  redo;
    2058             :     DIR        *snap_dir;
    2059             :     struct dirent *snap_de;
    2060             :     char        path[MAXPGPATH + 21];
    2061             : 
    2062             :     /*
    2063             :      * We start off with a minimum of the last redo pointer. No new
    2064             :      * replication slot will start before that, so that's a safe upper bound
    2065             :      * for removal.
    2066             :      */
    2067        1704 :     redo = GetRedoRecPtr();
    2068             : 
    2069             :     /* now check for the restart ptrs from existing slots */
    2070        1704 :     cutoff = ReplicationSlotsComputeLogicalRestartLSN();
    2071             : 
    2072             :     /* don't start earlier than the restart lsn */
    2073        1704 :     if (redo < cutoff)
    2074           0 :         cutoff = redo;
    2075             : 
    2076        1704 :     snap_dir = AllocateDir("pg_logical/snapshots");
    2077        5548 :     while ((snap_de = ReadDir(snap_dir, "pg_logical/snapshots")) != NULL)
    2078             :     {
    2079             :         uint32      hi;
    2080             :         uint32      lo;
    2081             :         XLogRecPtr  lsn;
    2082             :         PGFileType  de_type;
    2083             : 
    2084        3844 :         if (strcmp(snap_de->d_name, ".") == 0 ||
    2085        2140 :             strcmp(snap_de->d_name, "..") == 0)
    2086        3408 :             continue;
    2087             : 
    2088         436 :         snprintf(path, sizeof(path), "pg_logical/snapshots/%s", snap_de->d_name);
    2089         436 :         de_type = get_dirent_type(path, snap_de, false, DEBUG1);
    2090             : 
    2091         436 :         if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
    2092             :         {
    2093           0 :             elog(DEBUG1, "only regular files expected: %s", path);
    2094           0 :             continue;
    2095             :         }
    2096             : 
    2097             :         /*
    2098             :          * temporary filenames from SnapBuildSerialize() include the LSN and
    2099             :          * everything but are postfixed by .$pid.tmp. We can just remove them
    2100             :          * the same as other files because there can be none that are
    2101             :          * currently being written that are older than cutoff.
    2102             :          *
    2103             :          * We just log a message if a file doesn't fit the pattern, it's
    2104             :          * probably some editor's lock/state file or similar...
    2105             :          */
    2106         436 :         if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
    2107             :         {
    2108           0 :             ereport(LOG,
    2109             :                     (errmsg("could not parse file name \"%s\"", path)));
    2110           0 :             continue;
    2111             :         }
    2112             : 
    2113         436 :         lsn = ((uint64) hi) << 32 | lo;
    2114             : 
    2115             :         /* check whether we still need it */
    2116         436 :         if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
    2117             :         {
    2118         314 :             elog(DEBUG1, "removing snapbuild snapshot %s", path);
    2119             : 
    2120             :             /*
    2121             :              * It's not particularly harmful, though strange, if we can't
    2122             :              * remove the file here. Don't prevent the checkpoint from
    2123             :              * completing, that'd be a cure worse than the disease.
    2124             :              */
    2125         314 :             if (unlink(path) < 0)
    2126             :             {
    2127           0 :                 ereport(LOG,
    2128             :                         (errcode_for_file_access(),
    2129             :                          errmsg("could not remove file \"%s\": %m",
    2130             :                                 path)));
    2131           0 :                 continue;
    2132             :             }
    2133             :         }
    2134             :     }
    2135        1704 :     FreeDir(snap_dir);
    2136        1704 : }
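CheckPointSnapBuild() keeps a file only if its LSN is at or after the cutoff, where the cutoff is the smaller of the redo pointer and the oldest logical slot restart LSN. Assuming ReplicationSlotsComputeLogicalRestartLSN() returns InvalidXLogRecPtr (0) when there is no logical slot, the `redo < cutoff` test never fires in that case and every parsable file is removed. A sketch of that decision as pure functions; the names `snapshot_cutoff()` and `snapshot_removable()` are hypothetical, and they take the two LSNs as parameters instead of calling GetRedoRecPtr() and ReplicationSlotsComputeLogicalRestartLSN().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Oldest LSN still needed: min(redo, slot restart LSN), or Invalid. */
static XLogRecPtr
snapshot_cutoff(XLogRecPtr redo, XLogRecPtr slot_restart_lsn)
{
    XLogRecPtr  cutoff = slot_restart_lsn;

    if (redo < cutoff)
        cutoff = redo;
    return cutoff;
}

/*
 * Decide whether a file in pg_logical/snapshots may be removed. Names that
 * don't start with "<hex>-<hex>" are kept (the original logs and skips them).
 */
static bool
snapshot_removable(const char *name, XLogRecPtr cutoff)
{
    unsigned int hi;
    unsigned int lo;
    XLogRecPtr  lsn;

    if (sscanf(name, "%X-%X.snap", &hi, &lo) != 2)
        return false;
    lsn = ((XLogRecPtr) hi) << 32 | lo;
    return lsn < cutoff || cutoff == InvalidXLogRecPtr;
}
```

sscanf() returns 2 as soon as both hex fields are converted, so any text after them, including the `.<pid>.tmp` suffix of temporary files, does not change the result. That is why temporary files are aged out by the same rule as finished snapshots.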
    2137             : 
    2138             : /*
    2139             :  * Check if a logical snapshot at the specified point has been serialized.
    2140             :  */
    2141             : bool
    2142          20 : SnapBuildSnapshotExists(XLogRecPtr lsn)
    2143             : {
    2144             :     char        path[MAXPGPATH];
    2145             :     int         ret;
    2146             :     struct stat stat_buf;
    2147             : 
    2148          20 :     sprintf(path, "pg_logical/snapshots/%X-%X.snap",
    2149          20 :             LSN_FORMAT_ARGS(lsn));
    2150             : 
    2151          20 :     ret = stat(path, &stat_buf);
    2152             : 
    2153          20 :     if (ret != 0 && errno != ENOENT)
    2154           0 :         ereport(ERROR,
    2155             :                 (errcode_for_file_access(),
    2156             :                  errmsg("could not stat file \"%s\": %m", path)));
    2157             : 
    2158          20 :     return ret == 0;
    2159             : }
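SnapBuildSerialize(), SnapBuildRestore() and SnapBuildSnapshotExists() all name the file after the LSN using LSN_FORMAT_ARGS(), which splits the 64-bit LSN into its high and low 32-bit halves, printed by %X as unpadded uppercase hex. SnapBuildSnapshotExists() then treats only ENOENT from stat() as "absent" and any other failure as an error. A sketch of both steps with hypothetical helpers `snapshot_name()` and `snapshot_exists()`, assuming a 32-bit unsigned int and returning -1 (errno set) instead of raising an error:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>

typedef uint64_t XLogRecPtr;

/* "<hi>-<lo>.snap": high and low 32-bit halves in unpadded uppercase hex. */
static int
snapshot_name(char *buf, size_t bufsize, XLogRecPtr lsn)
{
    return snprintf(buf, bufsize, "%X-%X.snap",
                    (unsigned int) (lsn >> 32), (unsigned int) lsn);
}

/*
 * 1 if <dir>/<name> exists, 0 if stat() reports ENOENT, -1 (errno set) for
 * any other failure, mirroring SnapBuildSnapshotExists()'s error handling.
 */
static int
snapshot_exists(const char *dir, XLogRecPtr lsn)
{
    char        name[32];
    char        path[4096];
    struct stat st;

    snapshot_name(name, sizeof(name), lsn);
    snprintf(path, sizeof(path), "%s/%s", dir, name);
    if (stat(path, &st) == 0)
        return 1;
    return errno == ENOENT ? 0 : -1;
}
```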

Generated by: LCOV version 1.14