Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * snapbuild.c
4 : *
5 : * Infrastructure for building historic catalog snapshots based on contents
6 : * of the WAL, for the purpose of decoding heapam.c style values in the
7 : * WAL.
8 : *
9 : * NOTES:
10 : *
11 : * We build snapshots which can *only* be used to read catalog contents and we
12 : * do so by reading and interpreting the WAL stream. The aim is to build a
13 : * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14 : * at the time the XLogRecord was generated.
15 : *
16 : * To build the snapshots we reuse the infrastructure built for Hot
17 : * Standby. The in-memory snapshots we build look different than HS' because
18 : * we have different needs. To successfully decode data from the WAL we only
19 : * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20 : * tables since the data we decode is wholly contained in the WAL
21 : * records. Also, our snapshots need to be different in comparison to normal
22 : * MVCC ones because in contrast to those we cannot fully rely on the clog and
23 : * pg_subtrans for information about committed transactions because they might
24 : * commit in the future from the POV of the WAL entry we're currently
25 : * decoding. This definition has the advantage that we only need to prevent
26 : * removal of catalog rows, while normal table's rows can still be
27 : * removed. This is achieved by using the replication slot mechanism.
28 : *
29 : * As the percentage of transactions modifying the catalog normally is fairly
30 : * small in comparison to ones only manipulating user data, we keep track of
31 : * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32 : * track of all running transactions like it's done in a normal snapshot. Note
33 : * that we're generally only looking at transactions that have acquired an
34 : * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
35 : * that we consider committed, everything else is considered aborted/in
36 : * progress. That also allows us not to care about subtransactions before they
37 : * have committed which means this module, in contrast to HS, doesn't have to
38 : * care about suboverflowed subtransactions and similar.
39 : *
40 : * One complexity of doing this is that to e.g. handle mixed DDL/DML
41 : * transactions we need Snapshots that see intermediate versions of the
42 : * catalog in a transaction. During normal operation this is achieved by using
43 : * CommandIds/cmin/cmax. The problem with that however is that for space
44 : * efficiency reasons, the cmin and cmax are not included in WAL records. We
45 : * cannot read the cmin/cmax from the tuple itself, either, because it is
46 : * reset on crash recovery. Even if we could, we could not decode combocids
47 : * which are only tracked in the original backend's memory. To work around
48 : * that, heapam writes an extra WAL record (XLOG_HEAP2_NEW_CID) every time a
49 : * catalog row is modified, which includes the cmin and cmax of the
50 : * tuple. During decoding, we insert the ctid->(cmin,cmax) mappings into the
51 : * reorder buffer, and use them at visibility checks instead of the cmin/cmax
52 : * on the tuple itself. Check the reorderbuffer.c's comment above
53 : * ResolveCminCmaxDuringDecoding() for details.
54 : *
55 : * To facilitate all this we need our own visibility routine, as the normal
56 : * ones are optimized for different usecases.
57 : *
58 : * To replace the normal catalog snapshots with decoding ones use the
59 : * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
60 : *
61 : *
62 : *
63 : * The snapbuild machinery is starting up in several stages, as illustrated
64 : * by the following graph describing the SnapBuild->state transitions:
65 : *
66 : * +-------------------------+
67 : * +----| START |-------------+
68 : * | +-------------------------+ |
69 : * | | |
70 : * | | |
71 : * | running_xacts #1 |
72 : * | | |
73 : * | | |
74 : * | v |
75 : * | +-------------------------+ v
76 : * | | BUILDING_SNAPSHOT |------------>|
77 : * | +-------------------------+ |
78 : * | | |
79 : * | | |
80 : * | running_xacts #2, xacts from #1 finished |
81 : * | | |
82 : * | | |
83 : * | v |
84 : * | +-------------------------+ v
85 : * | | FULL_SNAPSHOT |------------>|
86 : * | +-------------------------+ |
87 : * | | |
88 : * running_xacts | saved snapshot
89 : * with zero xacts | at running_xacts's lsn
90 : * | | |
91 : * | running_xacts with xacts from #2 finished |
92 : * | | |
93 : * | v |
94 : * | +-------------------------+ |
95 : * +--->|SNAPBUILD_CONSISTENT |<------------+
96 : * +-------------------------+
97 : *
98 : * Initially the machinery is in the START stage. When an xl_running_xacts
99 : * record is read that is sufficiently new (above the safe xmin horizon),
100 : * there's a state transition. If there were no running xacts when the
101 : * xl_running_xacts record was generated, we'll directly go into CONSISTENT
102 : * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full
103 : * snapshot means that all transactions that start henceforth can be decoded
104 : * in their entirety, but transactions that started previously can't. In
105 : * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
106 : * running transactions have committed or aborted.
107 : *
108 : * Only transactions that commit after CONSISTENT state has been reached will
109 : * be replayed, even though they might have started while still in
110 : * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
111 : * changes have been exported, but all the following ones will be. That point
112 : * is a convenient point to initialize replication from, which is why we
113 : * export a snapshot at that point, which *can* be used to read normal data.
114 : *
115 : * Copyright (c) 2012-2025, PostgreSQL Global Development Group
116 : *
117 : * IDENTIFICATION
118 : * src/backend/replication/logical/snapbuild.c
119 : *
120 : *-------------------------------------------------------------------------
121 : */
122 :
123 : #include "postgres.h"
124 :
125 : #include <sys/stat.h>
126 : #include <unistd.h>
127 :
128 : #include "access/heapam_xlog.h"
129 : #include "access/transam.h"
130 : #include "access/xact.h"
131 : #include "common/file_utils.h"
132 : #include "miscadmin.h"
133 : #include "pgstat.h"
134 : #include "replication/logical.h"
135 : #include "replication/reorderbuffer.h"
136 : #include "replication/snapbuild.h"
137 : #include "replication/snapbuild_internal.h"
138 : #include "storage/fd.h"
139 : #include "storage/lmgr.h"
140 : #include "storage/proc.h"
141 : #include "storage/procarray.h"
142 : #include "storage/standby.h"
143 : #include "utils/builtins.h"
144 : #include "utils/memutils.h"
145 : #include "utils/snapmgr.h"
146 : #include "utils/snapshot.h"
147 : /*
148 : * Starting a transaction -- which we need to do while exporting a snapshot --
149 : * removes knowledge about the previously used resowner, so we save it here.
150 : */
151 : static ResourceOwner SavedResourceOwnerDuringExport = NULL;
152 : static bool ExportInProgress = false;
153 :
154 : /* ->committed and ->catchange manipulation */
155 : static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
156 :
157 : /* snapshot building/manipulation/distribution functions */
158 : static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
159 :
160 : static void SnapBuildFreeSnapshot(Snapshot snap);
161 :
162 : static void SnapBuildSnapIncRefcount(Snapshot snap);
163 :
164 : static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
165 :
166 : static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
167 : uint32 xinfo);
168 :
169 : /* xlog reading helper functions for SnapBuildProcessRunningXacts */
170 : static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
171 : static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
172 :
173 : /* serialization functions */
174 : static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
175 : static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
176 : static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path);
177 :
178 : /*
179 : * Allocate a new snapshot builder.
180 : *
181 : * xmin_horizon is the xid >= which we can be sure no catalog rows have been
182 : * removed, start_lsn is the LSN >= we want to replay commits.
183 : */
184 : SnapBuild *
185 2080 : AllocateSnapshotBuilder(ReorderBuffer *reorder,
186 : TransactionId xmin_horizon,
187 : XLogRecPtr start_lsn,
188 : bool need_full_snapshot,
189 : bool in_slot_creation,
190 : XLogRecPtr two_phase_at)
191 : {
192 : MemoryContext context;
193 : MemoryContext oldcontext;
194 : SnapBuild *builder;
195 :
196 : /* allocate memory in own context, to have better accountability */
197 2080 : context = AllocSetContextCreate(CurrentMemoryContext,
198 : "snapshot builder context",
199 : ALLOCSET_DEFAULT_SIZES);
200 2080 : oldcontext = MemoryContextSwitchTo(context);
201 :
202 2080 : builder = palloc0(sizeof(SnapBuild));
203 :
204 2080 : builder->state = SNAPBUILD_START;
205 2080 : builder->context = context;
206 2080 : builder->reorder = reorder;
207 : /* Other struct members initialized by zeroing via palloc0 above */
208 :
209 2080 : builder->committed.xcnt = 0;
210 2080 : builder->committed.xcnt_space = 128; /* arbitrary number */
211 2080 : builder->committed.xip =
212 2080 : palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
213 2080 : builder->committed.includes_all_transactions = true;
214 :
215 2080 : builder->catchange.xcnt = 0;
216 2080 : builder->catchange.xip = NULL;
217 :
218 2080 : builder->initial_xmin_horizon = xmin_horizon;
219 2080 : builder->start_decoding_at = start_lsn;
220 2080 : builder->in_slot_creation = in_slot_creation;
221 2080 : builder->building_full_snapshot = need_full_snapshot;
222 2080 : builder->two_phase_at = two_phase_at;
223 :
224 2080 : MemoryContextSwitchTo(oldcontext);
225 :
226 2080 : return builder;
227 : }
228 :
229 : /*
230 : * Free a snapshot builder.
231 : */
232 : void
233 1700 : FreeSnapshotBuilder(SnapBuild *builder)
234 : {
235 1700 : MemoryContext context = builder->context;
236 :
237 : /* free snapshot explicitly, that contains some error checking */
238 1700 : if (builder->snapshot != NULL)
239 : {
240 410 : SnapBuildSnapDecRefcount(builder->snapshot);
241 410 : builder->snapshot = NULL;
242 : }
243 :
244 : /* other resources are deallocated via memory context reset */
245 1700 : MemoryContextDelete(context);
246 1700 : }
247 :
248 : /*
249 : * Free an unreferenced snapshot that has previously been built by us.
250 : */
251 : static void
252 2476 : SnapBuildFreeSnapshot(Snapshot snap)
253 : {
254 : /* make sure we don't get passed an external snapshot */
255 : Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
256 :
257 : /* make sure nobody modified our snapshot */
258 : Assert(snap->curcid == FirstCommandId);
259 : Assert(!snap->suboverflowed);
260 : Assert(!snap->takenDuringRecovery);
261 : Assert(snap->regd_count == 0);
262 :
263 : /* slightly more likely, so it's checked even without c-asserts */
264 2476 : if (snap->copied)
265 0 : elog(ERROR, "cannot free a copied snapshot");
266 :
267 2476 : if (snap->active_count)
268 0 : elog(ERROR, "cannot free an active snapshot");
269 :
270 2476 : pfree(snap);
271 2476 : }
272 :
273 : /*
274 : * In which state of snapshot building are we?
275 : */
276 : SnapBuildState
277 4547816 : SnapBuildCurrentState(SnapBuild *builder)
278 : {
279 4547816 : return builder->state;
280 : }
281 :
282 : /*
283 : * Return the LSN at which the two-phase decoding was first enabled.
284 : */
285 : XLogRecPtr
286 66 : SnapBuildGetTwoPhaseAt(SnapBuild *builder)
287 : {
288 66 : return builder->two_phase_at;
289 : }
290 :
291 : /*
292 : * Set the LSN at which two-phase decoding is enabled.
293 : */
294 : void
295 16 : SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
296 : {
297 16 : builder->two_phase_at = ptr;
298 16 : }
299 :
300 : /*
301 : * Should the contents of transaction ending at 'ptr' be decoded?
302 : */
303 : bool
304 1014850 : SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
305 : {
306 1014850 : return ptr < builder->start_decoding_at;
307 : }
308 :
309 : /*
310 : * Increase refcount of a snapshot.
311 : *
312 : * This is used when handing out a snapshot to some external resource or when
313 : * adding a Snapshot as builder->snapshot.
314 : */
315 : static void
316 10778 : SnapBuildSnapIncRefcount(Snapshot snap)
317 : {
318 10778 : snap->active_count++;
319 10778 : }
320 :
321 : /*
322 : * Decrease refcount of a snapshot and free if the refcount reaches zero.
323 : *
324 : * Externally visible, so that external resources that have been handed an
325 : * IncRef'ed Snapshot can adjust its refcount easily.
326 : */
327 : void
328 10360 : SnapBuildSnapDecRefcount(Snapshot snap)
329 : {
330 : /* make sure we don't get passed an external snapshot */
331 : Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
332 :
333 : /* make sure nobody modified our snapshot */
334 : Assert(snap->curcid == FirstCommandId);
335 : Assert(!snap->suboverflowed);
336 : Assert(!snap->takenDuringRecovery);
337 :
338 : Assert(snap->regd_count == 0);
339 :
340 : Assert(snap->active_count > 0);
341 :
342 : /* slightly more likely, so it's checked even without casserts */
343 10360 : if (snap->copied)
344 0 : elog(ERROR, "cannot free a copied snapshot");
345 :
346 10360 : snap->active_count--;
347 10360 : if (snap->active_count == 0)
348 2476 : SnapBuildFreeSnapshot(snap);
349 10360 : }
350 :
351 : /*
352 : * Build a new snapshot, based on currently committed catalog-modifying
353 : * transactions.
354 : *
355 : * In-progress transactions with catalog access are *not* allowed to modify
356 : * these snapshots; they have to copy them and fill in appropriate ->curcid
357 : * and ->subxip/subxcnt values.
358 : */
359 : static Snapshot
360 3248 : SnapBuildBuildSnapshot(SnapBuild *builder)
361 : {
362 : Snapshot snapshot;
363 : Size ssize;
364 :
365 : Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
366 :
367 3248 : ssize = sizeof(SnapshotData)
368 3248 : + sizeof(TransactionId) * builder->committed.xcnt
369 3248 : + sizeof(TransactionId) * 1 /* toplevel xid */ ;
370 :
371 3248 : snapshot = MemoryContextAllocZero(builder->context, ssize);
372 :
373 3248 : snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
374 :
375 : /*
376 : * We misuse the original meaning of SnapshotData's xip and subxip fields
377 : * to make the more fitting for our needs.
378 : *
379 : * In the 'xip' array we store transactions that have to be treated as
380 : * committed. Since we will only ever look at tuples from transactions
381 : * that have modified the catalog it's more efficient to store those few
382 : * that exist between xmin and xmax (frequently there are none).
383 : *
384 : * Snapshots that are used in transactions that have modified the catalog
385 : * also use the 'subxip' array to store their toplevel xid and all the
386 : * subtransaction xids so we can recognize when we need to treat rows as
387 : * visible that are not in xip but still need to be visible. Subxip only
388 : * gets filled when the transaction is copied into the context of a
389 : * catalog modifying transaction since we otherwise share a snapshot
390 : * between transactions. As long as a txn hasn't modified the catalog it
391 : * doesn't need to treat any uncommitted rows as visible, so there is no
392 : * need for those xids.
393 : *
394 : * Both arrays are qsort'ed so that we can use bsearch() on them.
395 : */
396 : Assert(TransactionIdIsNormal(builder->xmin));
397 : Assert(TransactionIdIsNormal(builder->xmax));
398 :
399 3248 : snapshot->xmin = builder->xmin;
400 3248 : snapshot->xmax = builder->xmax;
401 :
402 : /* store all transactions to be treated as committed by this snapshot */
403 3248 : snapshot->xip =
404 3248 : (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
405 3248 : snapshot->xcnt = builder->committed.xcnt;
406 3248 : memcpy(snapshot->xip,
407 3248 : builder->committed.xip,
408 3248 : builder->committed.xcnt * sizeof(TransactionId));
409 :
410 : /* sort so we can bsearch() */
411 3248 : qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
412 :
413 : /*
414 : * Initially, subxip is empty, i.e. it's a snapshot to be used by
415 : * transactions that don't modify the catalog. Will be filled by
416 : * ReorderBufferCopySnap() if necessary.
417 : */
418 3248 : snapshot->subxcnt = 0;
419 3248 : snapshot->subxip = NULL;
420 :
421 3248 : snapshot->suboverflowed = false;
422 3248 : snapshot->takenDuringRecovery = false;
423 3248 : snapshot->copied = false;
424 3248 : snapshot->curcid = FirstCommandId;
425 3248 : snapshot->active_count = 0;
426 3248 : snapshot->regd_count = 0;
427 3248 : snapshot->snapXactCompletionCount = 0;
428 :
429 3248 : return snapshot;
430 : }
431 :
432 : /*
433 : * Build the initial slot snapshot and convert it to a normal snapshot that
434 : * is understood by HeapTupleSatisfiesMVCC.
435 : *
436 : * The snapshot will be usable directly in current transaction or exported
437 : * for loading in different transaction.
438 : */
439 : Snapshot
440 384 : SnapBuildInitialSnapshot(SnapBuild *builder)
441 : {
442 : Snapshot snap;
443 : TransactionId xid;
444 : TransactionId safeXid;
445 : TransactionId *newxip;
446 384 : int newxcnt = 0;
447 :
448 : Assert(XactIsoLevel == XACT_REPEATABLE_READ);
449 : Assert(builder->building_full_snapshot);
450 :
451 : /* don't allow older snapshots */
452 384 : InvalidateCatalogSnapshot(); /* about to overwrite MyProc->xmin */
453 384 : if (HaveRegisteredOrActiveSnapshot())
454 0 : elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
455 : Assert(!HistoricSnapshotActive());
456 :
457 384 : if (builder->state != SNAPBUILD_CONSISTENT)
458 0 : elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
459 :
460 384 : if (!builder->committed.includes_all_transactions)
461 0 : elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
462 :
463 : /* so we don't overwrite the existing value */
464 384 : if (TransactionIdIsValid(MyProc->xmin))
465 0 : elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
466 :
467 384 : snap = SnapBuildBuildSnapshot(builder);
468 :
469 : /*
470 : * We know that snap->xmin is alive, enforced by the logical xmin
471 : * mechanism. Due to that we can do this without locks, we're only
472 : * changing our own value.
473 : *
474 : * Building an initial snapshot is expensive and an unenforced xmin
475 : * horizon would have bad consequences, therefore always double-check that
476 : * the horizon is enforced.
477 : */
478 384 : LWLockAcquire(ProcArrayLock, LW_SHARED);
479 384 : safeXid = GetOldestSafeDecodingTransactionId(false);
480 384 : LWLockRelease(ProcArrayLock);
481 :
482 384 : if (TransactionIdFollows(safeXid, snap->xmin))
483 0 : elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
484 : safeXid, snap->xmin);
485 :
486 384 : MyProc->xmin = snap->xmin;
487 :
488 : /* allocate in transaction context */
489 : newxip = (TransactionId *)
490 384 : palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
491 :
492 : /*
493 : * snapbuild.c builds transactions in an "inverted" manner, which means it
494 : * stores committed transactions in ->xip, not ones in progress. Build a
495 : * classical snapshot by marking all non-committed transactions as
496 : * in-progress. This can be expensive.
497 : */
498 384 : for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
499 : {
500 : void *test;
501 :
502 : /*
503 : * Check whether transaction committed using the decoding snapshot
504 : * meaning of ->xip.
505 : */
506 0 : test = bsearch(&xid, snap->xip, snap->xcnt,
507 : sizeof(TransactionId), xidComparator);
508 :
509 0 : if (test == NULL)
510 : {
511 0 : if (newxcnt >= GetMaxSnapshotXidCount())
512 0 : ereport(ERROR,
513 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
514 : errmsg("initial slot snapshot too large")));
515 :
516 0 : newxip[newxcnt++] = xid;
517 : }
518 :
519 0 : TransactionIdAdvance(xid);
520 : }
521 :
522 : /* adjust remaining snapshot fields as needed */
523 384 : snap->snapshot_type = SNAPSHOT_MVCC;
524 384 : snap->xcnt = newxcnt;
525 384 : snap->xip = newxip;
526 :
527 384 : return snap;
528 : }
529 :
530 : /*
531 : * Export a snapshot so it can be set in another session with SET TRANSACTION
532 : * SNAPSHOT.
533 : *
534 : * For that we need to start a transaction in the current backend as the
535 : * importing side checks whether the source transaction is still open to make
536 : * sure the xmin horizon hasn't advanced since then.
537 : */
538 : const char *
539 0 : SnapBuildExportSnapshot(SnapBuild *builder)
540 : {
541 : Snapshot snap;
542 : char *snapname;
543 :
544 0 : if (IsTransactionOrTransactionBlock())
545 0 : elog(ERROR, "cannot export a snapshot from within a transaction");
546 :
547 0 : if (SavedResourceOwnerDuringExport)
548 0 : elog(ERROR, "can only export one snapshot at a time");
549 :
550 0 : SavedResourceOwnerDuringExport = CurrentResourceOwner;
551 0 : ExportInProgress = true;
552 :
553 0 : StartTransactionCommand();
554 :
555 : /* There doesn't seem to a nice API to set these */
556 0 : XactIsoLevel = XACT_REPEATABLE_READ;
557 0 : XactReadOnly = true;
558 :
559 0 : snap = SnapBuildInitialSnapshot(builder);
560 :
561 : /*
562 : * now that we've built a plain snapshot, make it active and use the
563 : * normal mechanisms for exporting it
564 : */
565 0 : snapname = ExportSnapshot(snap);
566 :
567 0 : ereport(LOG,
568 : (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
569 : "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
570 : snap->xcnt,
571 : snapname, snap->xcnt)));
572 0 : return snapname;
573 : }
574 :
575 : /*
576 : * Ensure there is a snapshot and if not build one for current transaction.
577 : */
578 : Snapshot
579 16 : SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
580 : {
581 : Assert(builder->state == SNAPBUILD_CONSISTENT);
582 :
583 : /* only build a new snapshot if we don't have a prebuilt one */
584 16 : if (builder->snapshot == NULL)
585 : {
586 2 : builder->snapshot = SnapBuildBuildSnapshot(builder);
587 : /* increase refcount for the snapshot builder */
588 2 : SnapBuildSnapIncRefcount(builder->snapshot);
589 : }
590 :
591 16 : return builder->snapshot;
592 : }
593 :
594 : /*
595 : * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
596 : * any. Aborts the previously started transaction and resets the resource
597 : * owner back to its original value.
598 : */
599 : void
600 10096 : SnapBuildClearExportedSnapshot(void)
601 : {
602 : ResourceOwner tmpResOwner;
603 :
604 : /* nothing exported, that is the usual case */
605 10096 : if (!ExportInProgress)
606 10096 : return;
607 :
608 0 : if (!IsTransactionState())
609 0 : elog(ERROR, "clearing exported snapshot in wrong transaction state");
610 :
611 : /*
612 : * AbortCurrentTransaction() takes care of resetting the snapshot state,
613 : * so remember SavedResourceOwnerDuringExport.
614 : */
615 0 : tmpResOwner = SavedResourceOwnerDuringExport;
616 :
617 : /* make sure nothing could have ever happened */
618 0 : AbortCurrentTransaction();
619 :
620 0 : CurrentResourceOwner = tmpResOwner;
621 : }
622 :
623 : /*
624 : * Clear snapshot export state during transaction abort.
625 : */
626 : void
627 48902 : SnapBuildResetExportedSnapshotState(void)
628 : {
629 48902 : SavedResourceOwnerDuringExport = NULL;
630 48902 : ExportInProgress = false;
631 48902 : }
632 :
633 : /*
634 : * Handle the effects of a single heap change, appropriate to the current state
635 : * of the snapshot builder and returns whether changes made at (xid, lsn) can
636 : * be decoded.
637 : */
638 : bool
639 3080230 : SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
640 : {
641 : /*
642 : * We can't handle data in transactions if we haven't built a snapshot
643 : * yet, so don't store them.
644 : */
645 3080230 : if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
646 0 : return false;
647 :
648 : /*
649 : * No point in keeping track of changes in transactions that we don't have
650 : * enough information about to decode. This means that they started before
651 : * we got into the SNAPBUILD_FULL_SNAPSHOT state.
652 : */
653 3080236 : if (builder->state < SNAPBUILD_CONSISTENT &&
654 6 : TransactionIdPrecedes(xid, builder->next_phase_at))
655 0 : return false;
656 :
657 : /*
658 : * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
659 : * be needed to decode the change we're currently processing.
660 : */
661 3080230 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
662 : {
663 : /* only build a new snapshot if we don't have a prebuilt one */
664 5726 : if (builder->snapshot == NULL)
665 : {
666 698 : builder->snapshot = SnapBuildBuildSnapshot(builder);
667 : /* increase refcount for the snapshot builder */
668 698 : SnapBuildSnapIncRefcount(builder->snapshot);
669 : }
670 :
671 : /*
672 : * Increase refcount for the transaction we're handing the snapshot
673 : * out to.
674 : */
675 5726 : SnapBuildSnapIncRefcount(builder->snapshot);
676 5726 : ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
677 : builder->snapshot);
678 : }
679 :
680 3080230 : return true;
681 : }
682 :
683 : /*
684 : * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
685 : * This implies that a transaction has done some form of write to system
686 : * catalogs.
687 : */
688 : void
689 48318 : SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
690 : XLogRecPtr lsn, xl_heap_new_cid *xlrec)
691 : {
692 : CommandId cid;
693 :
694 : /*
695 : * we only log new_cid's if a catalog tuple was modified, so mark the
696 : * transaction as containing catalog modifications
697 : */
698 48318 : ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
699 :
700 48318 : ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
701 : xlrec->target_locator, xlrec->target_tid,
702 : xlrec->cmin, xlrec->cmax,
703 : xlrec->combocid);
704 :
705 : /* figure out new command id */
706 48318 : if (xlrec->cmin != InvalidCommandId &&
707 40836 : xlrec->cmax != InvalidCommandId)
708 6438 : cid = Max(xlrec->cmin, xlrec->cmax);
709 41880 : else if (xlrec->cmax != InvalidCommandId)
710 7482 : cid = xlrec->cmax;
711 34398 : else if (xlrec->cmin != InvalidCommandId)
712 34398 : cid = xlrec->cmin;
713 : else
714 : {
715 0 : cid = InvalidCommandId; /* silence compiler */
716 0 : elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
717 : }
718 :
719 48318 : ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
720 48318 : }
721 :
722 : /*
723 : * Add a new Snapshot and invalidation messages to all transactions we're
724 : * decoding that currently are in-progress so they can see new catalog contents
725 : * made by the transaction that just committed. This is necessary because those
726 : * in-progress transactions will use the new catalog's contents from here on
727 : * (at the very least everything they do needs to be compatible with newer
728 : * catalog contents).
729 : */
730 : static void
731 2148 : SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
732 : {
733 : dlist_iter txn_i;
734 : ReorderBufferTXN *txn;
735 :
736 : /*
737 : * Iterate through all toplevel transactions. This can include
738 : * subtransactions which we just don't yet know to be that, but that's
739 : * fine, they will just get an unnecessary snapshot and invalidations
740 : * queued.
741 : */
742 4354 : dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
743 : {
744 2206 : txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
745 :
746 : Assert(TransactionIdIsValid(txn->xid));
747 :
748 : /*
749 : * If we don't have a base snapshot yet, there are no changes in this
750 : * transaction which in turn implies we don't yet need a snapshot at
751 : * all. We'll add a snapshot when the first change gets queued.
752 : *
753 : * Similarly, we don't need to add invalidations to a transaction
754 : * whose base snapshot is not yet set. Once a base snapshot is built,
755 : * it will include the xids of committed transactions that have
756 : * modified the catalog, thus reflecting the new catalog contents. The
757 : * existing catalog cache will have already been invalidated after
758 : * processing the invalidations in the transaction that modified
759 : * catalogs, ensuring that a fresh cache is constructed during
760 : * decoding.
761 : *
762 : * NB: This works correctly even for subtransactions because
763 : * ReorderBufferAssignChild() takes care to transfer the base snapshot
764 : * to the top-level transaction, and while iterating the changequeue
765 : * we'll get the change from the subtxn.
766 : */
767 2206 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
768 4 : continue;
769 :
770 : /*
771 : * We don't need to add snapshot or invalidations to prepared
772 : * transactions as they should not see the new catalog contents.
773 : */
774 2202 : if (rbtxn_is_prepared(txn))
775 56 : continue;
776 :
777 2146 : elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
778 : txn->xid, LSN_FORMAT_ARGS(lsn));
779 :
780 : /*
781 : * increase the snapshot's refcount for the transaction we are handing
782 : * it out to
783 : */
784 2146 : SnapBuildSnapIncRefcount(builder->snapshot);
785 2146 : ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
786 : builder->snapshot);
787 :
788 : /*
789 : * Add invalidation messages to the reorder buffer of in-progress
790 : * transactions except the current committed transaction, for which we
791 : * will execute invalidations at the end.
792 : *
793 : * It is required, otherwise, we will end up using the stale catcache
794 : * contents built by the current transaction even after its decoding,
795 : * which should have been invalidated due to concurrent catalog
796 : * changing transaction.
797 : */
798 2146 : if (txn->xid != xid)
799 : {
800 : uint32 ninvalidations;
801 54 : SharedInvalidationMessage *msgs = NULL;
802 :
803 54 : ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
804 : xid, &msgs);
805 :
806 54 : if (ninvalidations > 0)
807 : {
808 : Assert(msgs != NULL);
809 :
810 50 : ReorderBufferAddInvalidations(builder->reorder, txn->xid, lsn,
811 : ninvalidations, msgs);
812 : }
813 : }
814 : }
815 2148 : }
816 :
817 : /*
818 : * Keep track of a new catalog changing transaction that has committed.
819 : */
820 : static void
821 2166 : SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
822 : {
823 : Assert(TransactionIdIsValid(xid));
824 :
825 2166 : if (builder->committed.xcnt == builder->committed.xcnt_space)
826 : {
827 0 : builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
828 :
829 0 : elog(DEBUG1, "increasing space for committed transactions to %u",
830 : (uint32) builder->committed.xcnt_space);
831 :
832 0 : builder->committed.xip = repalloc(builder->committed.xip,
833 0 : builder->committed.xcnt_space * sizeof(TransactionId));
834 : }
835 :
836 : /*
837 : * TODO: It might make sense to keep the array sorted here instead of
838 : * doing it every time we build a new snapshot. On the other hand this
839 : * gets called repeatedly when a transaction with subtransactions commits.
840 : */
841 2166 : builder->committed.xip[builder->committed.xcnt++] = xid;
842 2166 : }
843 :
844 : /*
845 : * Remove knowledge about transactions we treat as committed or containing catalog
846 : * changes that are smaller than ->xmin. Those won't ever get checked via
847 : * the ->committed or ->catchange array, respectively. The committed xids will
848 : * get checked via the clog machinery.
849 : *
850 : * We can ideally remove the transaction from catchange array once it is
851 : * finished (committed/aborted) but that could be costly as we need to maintain
852 : * the xids order in the array.
853 : */
854 : static void
855 734 : SnapBuildPurgeOlderTxn(SnapBuild *builder)
856 : {
857 : int off;
858 : TransactionId *workspace;
859 734 : int surviving_xids = 0;
860 :
861 : /* not ready yet */
862 734 : if (!TransactionIdIsNormal(builder->xmin))
863 0 : return;
864 :
865 : /* TODO: Neater algorithm than just copying and iterating? */
866 : workspace =
867 734 : MemoryContextAlloc(builder->context,
868 734 : builder->committed.xcnt * sizeof(TransactionId));
869 :
870 : /* copy xids that still are interesting to workspace */
871 1158 : for (off = 0; off < builder->committed.xcnt; off++)
872 : {
873 424 : if (NormalTransactionIdPrecedes(builder->committed.xip[off],
874 : builder->xmin))
875 : ; /* remove */
876 : else
877 2 : workspace[surviving_xids++] = builder->committed.xip[off];
878 : }
879 :
880 : /* copy workspace back to persistent state */
881 734 : memcpy(builder->committed.xip, workspace,
882 : surviving_xids * sizeof(TransactionId));
883 :
884 734 : elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
885 : (uint32) builder->committed.xcnt, (uint32) surviving_xids,
886 : builder->xmin, builder->xmax);
887 734 : builder->committed.xcnt = surviving_xids;
888 :
889 734 : pfree(workspace);
890 :
891 : /*
892 : * Purge xids in ->catchange as well. The purged array must also be sorted
893 : * in xidComparator order.
894 : */
895 734 : if (builder->catchange.xcnt > 0)
896 : {
897 : /*
898 : * Since catchange.xip is sorted, we find the lower bound of xids that
899 : * are still interesting.
900 : */
901 14 : for (off = 0; off < builder->catchange.xcnt; off++)
902 : {
903 10 : if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
904 : builder->xmin))
905 2 : break;
906 : }
907 :
908 6 : surviving_xids = builder->catchange.xcnt - off;
909 :
910 6 : if (surviving_xids > 0)
911 : {
912 2 : memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
913 : surviving_xids * sizeof(TransactionId));
914 : }
915 : else
916 : {
917 4 : pfree(builder->catchange.xip);
918 4 : builder->catchange.xip = NULL;
919 : }
920 :
921 6 : elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
922 : (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
923 : builder->xmin, builder->xmax);
924 6 : builder->catchange.xcnt = surviving_xids;
925 : }
926 : }
927 :
928 : /*
929 : * Handle everything that needs to be done when a transaction commits
930 : */
931 : void
932 5642 : SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
933 : int nsubxacts, TransactionId *subxacts, uint32 xinfo)
934 : {
935 : int nxact;
936 :
937 5642 : bool needs_snapshot = false;
938 5642 : bool needs_timetravel = false;
939 5642 : bool sub_needs_timetravel = false;
940 :
941 5642 : TransactionId xmax = xid;
942 :
943 : /*
944 : * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
945 : * will they be part of a snapshot. So we don't need to record anything.
946 : */
947 5642 : if (builder->state == SNAPBUILD_START ||
948 5642 : (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
949 0 : TransactionIdPrecedes(xid, builder->next_phase_at)))
950 : {
951 : /* ensure that only commits after this are getting replayed */
952 0 : if (builder->start_decoding_at <= lsn)
953 0 : builder->start_decoding_at = lsn + 1;
954 0 : return;
955 : }
956 :
957 5642 : if (builder->state < SNAPBUILD_CONSISTENT)
958 : {
959 : /* ensure that only commits after this are getting replayed */
960 10 : if (builder->start_decoding_at <= lsn)
961 4 : builder->start_decoding_at = lsn + 1;
962 :
963 : /*
964 : * If building an exportable snapshot, force xid to be tracked, even
965 : * if the transaction didn't modify the catalog.
966 : */
967 10 : if (builder->building_full_snapshot)
968 : {
969 0 : needs_timetravel = true;
970 : }
971 : }
972 :
973 8084 : for (nxact = 0; nxact < nsubxacts; nxact++)
974 : {
975 2442 : TransactionId subxid = subxacts[nxact];
976 :
977 : /*
978 : * Add subtransaction to base snapshot if catalog modifying, we don't
979 : * distinguish to toplevel transactions there.
980 : */
981 2442 : if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
982 : {
983 18 : sub_needs_timetravel = true;
984 18 : needs_snapshot = true;
985 :
986 18 : elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
987 : xid, subxid);
988 :
989 18 : SnapBuildAddCommittedTxn(builder, subxid);
990 :
991 18 : if (NormalTransactionIdFollows(subxid, xmax))
992 18 : xmax = subxid;
993 : }
994 :
995 : /*
996 : * If we're forcing timetravel we also need visibility information
997 : * about subtransaction, so keep track of subtransaction's state, even
998 : * if not catalog modifying. Don't need to distribute a snapshot in
999 : * that case.
1000 : */
1001 2424 : else if (needs_timetravel)
1002 : {
1003 0 : SnapBuildAddCommittedTxn(builder, subxid);
1004 0 : if (NormalTransactionIdFollows(subxid, xmax))
1005 0 : xmax = subxid;
1006 : }
1007 : }
1008 :
1009 : /* if top-level modified catalog, it'll need a snapshot */
1010 5642 : if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
1011 : {
1012 2146 : elog(DEBUG2, "found top level transaction %u, with catalog changes",
1013 : xid);
1014 2146 : needs_snapshot = true;
1015 2146 : needs_timetravel = true;
1016 2146 : SnapBuildAddCommittedTxn(builder, xid);
1017 : }
1018 3496 : else if (sub_needs_timetravel)
1019 : {
1020 : /* track toplevel txn as well, subxact alone isn't meaningful */
1021 2 : elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
1022 : xid);
1023 2 : needs_timetravel = true;
1024 2 : SnapBuildAddCommittedTxn(builder, xid);
1025 : }
1026 3494 : else if (needs_timetravel)
1027 : {
1028 0 : elog(DEBUG2, "forced transaction %u to do timetravel", xid);
1029 :
1030 0 : SnapBuildAddCommittedTxn(builder, xid);
1031 : }
1032 :
1033 5642 : if (!needs_timetravel)
1034 : {
1035 : /* record that we cannot export a general snapshot anymore */
1036 3494 : builder->committed.includes_all_transactions = false;
1037 : }
1038 :
1039 : Assert(!needs_snapshot || needs_timetravel);
1040 :
1041 : /*
1042 : * Adjust xmax of the snapshot builder, we only do that for committed,
1043 : * catalog modifying, transactions, everything else isn't interesting for
1044 : * us since we'll never look at the respective rows.
1045 : */
1046 5642 : if (needs_timetravel &&
1047 4296 : (!TransactionIdIsValid(builder->xmax) ||
1048 2148 : TransactionIdFollowsOrEquals(xmax, builder->xmax)))
1049 : {
1050 2148 : builder->xmax = xmax;
1051 2148 : TransactionIdAdvance(builder->xmax);
1052 : }
1053 :
1054 : /* if there's any reason to build a historic snapshot, do so now */
1055 5642 : if (needs_snapshot)
1056 : {
1057 : /*
1058 : * If we haven't built a complete snapshot yet there's no need to hand
1059 : * it out, it wouldn't (and couldn't) be used anyway.
1060 : */
1061 2148 : if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1062 0 : return;
1063 :
1064 : /*
1065 : * Decrease the snapshot builder's refcount of the old snapshot, note
1066 : * that it still will be used if it has been handed out to the
1067 : * reorderbuffer earlier.
1068 : */
1069 2148 : if (builder->snapshot)
1070 2136 : SnapBuildSnapDecRefcount(builder->snapshot);
1071 :
1072 2148 : builder->snapshot = SnapBuildBuildSnapshot(builder);
1073 :
1074 : /* we might need to execute invalidations, add snapshot */
1075 2148 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1076 : {
1077 42 : SnapBuildSnapIncRefcount(builder->snapshot);
1078 42 : ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1079 : builder->snapshot);
1080 : }
1081 :
1082 : /* refcount of the snapshot builder for the new snapshot */
1083 2148 : SnapBuildSnapIncRefcount(builder->snapshot);
1084 :
1085 : /*
1086 : * Add a new catalog snapshot and invalidations messages to all
1087 : * currently running transactions.
1088 : */
1089 2148 : SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
1090 : }
1091 : }
1092 :
1093 : /*
1094 : * Check the reorder buffer and the snapshot to see if the given transaction has
1095 : * modified catalogs.
1096 : */
1097 : static inline bool
1098 8084 : SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
1099 : uint32 xinfo)
1100 : {
1101 8084 : if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1102 2156 : return true;
1103 :
1104 : /*
1105 : * The transactions that have changed catalogs must have invalidation
1106 : * info.
1107 : */
1108 5928 : if (!(xinfo & XACT_XINFO_HAS_INVALS))
1109 5912 : return false;
1110 :
1111 : /* Check the catchange XID array */
1112 24 : return ((builder->catchange.xcnt > 0) &&
1113 8 : (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
1114 : sizeof(TransactionId), xidComparator) != NULL));
1115 : }
1116 :
1117 : /* -----------------------------------
1118 : * Snapshot building functions dealing with xlog records
1119 : * -----------------------------------
1120 : */
1121 :
1122 : /*
1123 : * Process a running xacts record, and use its information to first build a
1124 : * historic snapshot and later to release resources that aren't needed
1125 : * anymore.
1126 : */
1127 : void
1128 2666 : SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1129 : {
1130 : ReorderBufferTXN *txn;
1131 : TransactionId xmin;
1132 :
1133 : /*
1134 : * If we're not consistent yet, inspect the record to see whether it
1135 : * allows to get closer to being consistent. If we are consistent, dump
1136 : * our snapshot so others or we, after a restart, can use it.
1137 : */
1138 2666 : if (builder->state < SNAPBUILD_CONSISTENT)
1139 : {
1140 : /* returns false if there's no point in performing cleanup just yet */
1141 1976 : if (!SnapBuildFindSnapshot(builder, lsn, running))
1142 1928 : return;
1143 : }
1144 : else
1145 690 : SnapBuildSerialize(builder, lsn);
1146 :
1147 : /*
1148 : * Update range of interesting xids based on the running xacts
1149 : * information. We don't increase ->xmax using it, because once we are in
1150 : * a consistent state we can do that ourselves and much more efficiently
1151 : * so, because we only need to do it for catalog transactions since we
1152 : * only ever look at those.
1153 : *
1154 : * NB: We only increase xmax when a catalog modifying transaction commits
1155 : * (see SnapBuildCommitTxn). Because of this, xmax can be lower than
1156 : * xmin, which looks odd but is correct and actually more efficient, since
1157 : * we hit fast paths in heapam_visibility.c.
1158 : */
1159 734 : builder->xmin = running->oldestRunningXid;
1160 :
1161 : /* Remove transactions we don't need to keep track off anymore */
1162 734 : SnapBuildPurgeOlderTxn(builder);
1163 :
1164 : /*
1165 : * Advance the xmin limit for the current replication slot, to allow
1166 : * vacuum to clean up the tuples this slot has been protecting.
1167 : *
1168 : * The reorderbuffer might have an xmin among the currently running
1169 : * snapshots; use it if so. If not, we need only consider the snapshots
1170 : * we'll produce later, which can't be less than the oldest running xid in
1171 : * the record we're reading now.
1172 : */
1173 734 : xmin = ReorderBufferGetOldestXmin(builder->reorder);
1174 734 : if (xmin == InvalidTransactionId)
1175 652 : xmin = running->oldestRunningXid;
1176 734 : elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
1177 : builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
1178 734 : LogicalIncreaseXminForSlot(lsn, xmin);
1179 :
1180 : /*
1181 : * Also tell the slot where we can restart decoding from. We don't want to
1182 : * do that after every commit because changing that implies an fsync of
1183 : * the logical slot's state file, so we only do it every time we see a
1184 : * running xacts record.
1185 : *
1186 : * Do so by looking for the oldest in progress transaction (determined by
1187 : * the first LSN of any of its relevant records). Every transaction
1188 : * remembers the last location we stored the snapshot to disk before its
1189 : * beginning. That point is where we can restart from.
1190 : */
1191 :
1192 : /*
1193 : * Can't know about a serialized snapshot's location if we're not
1194 : * consistent.
1195 : */
1196 734 : if (builder->state < SNAPBUILD_CONSISTENT)
1197 34 : return;
1198 :
1199 700 : txn = ReorderBufferGetOldestTXN(builder->reorder);
1200 :
1201 : /*
1202 : * oldest ongoing txn might have started when we didn't yet serialize
1203 : * anything because we hadn't reached a consistent state yet.
1204 : */
1205 700 : if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
1206 46 : LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1207 :
1208 : /*
1209 : * No in-progress transaction, can reuse the last serialized snapshot if
1210 : * we have one.
1211 : */
1212 654 : else if (txn == NULL &&
1213 598 : builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
1214 594 : builder->last_serialized_snapshot != InvalidXLogRecPtr)
1215 594 : LogicalIncreaseRestartDecodingForSlot(lsn,
1216 : builder->last_serialized_snapshot);
1217 : }
1218 :
1219 :
1220 : /*
1221 : * Build the start of a snapshot that's capable of decoding the catalog.
1222 : *
1223 : * Helper function for SnapBuildProcessRunningXacts() while we're not yet
1224 : * consistent.
1225 : *
1226 : * Returns true if there is a point in performing internal maintenance/cleanup
1227 : * using the xl_running_xacts record.
1228 : */
1229 : static bool
1230 1976 : SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1231 : {
1232 : /* ---
1233 : * Build catalog decoding snapshot incrementally using information about
1234 : * the currently running transactions. There are several ways to do that:
1235 : *
1236 : * a) There were no running transactions when the xl_running_xacts record
1237 : * was inserted, jump to CONSISTENT immediately. We might find such a
1238 : * state while waiting on c)'s sub-states.
1239 : *
1240 : * b) This (in a previous run) or another decoding slot serialized a
1241 : * snapshot to disk that we can use. Can't use this method while finding
1242 : * the start point for decoding changes as the restart LSN would be an
1243 : * arbitrary LSN but we need to find the start point to extract changes
1244 : * where we won't see the data for partial transactions. Also, we cannot
1245 : * use this method when a slot needs a full snapshot for export or direct
1246 : * use, as that snapshot will only contain catalog modifying transactions.
1247 : *
1248 : * c) First incrementally build a snapshot for catalog tuples
1249 : * (BUILDING_SNAPSHOT), that requires all, already in-progress,
1250 : * transactions to finish. Every transaction starting after that
1251 : * (FULL_SNAPSHOT state), has enough information to be decoded. But
1252 : * for older running transactions no viable snapshot exists yet, so
1253 : * CONSISTENT will only be reached once all of those have finished.
1254 : * ---
1255 : */
1256 :
1257 : /*
1258 : * xl_running_xacts record is older than what we can use, we might not
1259 : * have all necessary catalog rows anymore.
1260 : */
1261 1976 : if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
1262 892 : NormalTransactionIdPrecedes(running->oldestRunningXid,
1263 : builder->initial_xmin_horizon))
1264 : {
1265 0 : ereport(DEBUG1,
1266 : (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
1267 : LSN_FORMAT_ARGS(lsn)),
1268 : errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1269 : builder->initial_xmin_horizon, running->oldestRunningXid)));
1270 :
1271 :
1272 0 : SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
1273 :
1274 0 : return true;
1275 : }
1276 :
1277 : /*
1278 : * a) No transaction were running, we can jump to consistent.
1279 : *
1280 : * This is not affected by races around xl_running_xacts, because we can
1281 : * miss transaction commits, but currently not transactions starting.
1282 : *
1283 : * NB: We might have already started to incrementally assemble a snapshot,
1284 : * so we need to be careful to deal with that.
1285 : */
1286 1976 : if (running->oldestRunningXid == running->nextXid)
1287 : {
1288 1912 : if (builder->start_decoding_at == InvalidXLogRecPtr ||
1289 1036 : builder->start_decoding_at <= lsn)
1290 : /* can decode everything after this */
1291 878 : builder->start_decoding_at = lsn + 1;
1292 :
1293 : /* As no transactions were running xmin/xmax can be trivially set. */
1294 1912 : builder->xmin = running->nextXid; /* < are finished */
1295 1912 : builder->xmax = running->nextXid; /* >= are running */
1296 :
1297 : /* so we can safely use the faster comparisons */
1298 : Assert(TransactionIdIsNormal(builder->xmin));
1299 : Assert(TransactionIdIsNormal(builder->xmax));
1300 :
1301 1912 : builder->state = SNAPBUILD_CONSISTENT;
1302 1912 : builder->next_phase_at = InvalidTransactionId;
1303 :
1304 1912 : ereport(LOG,
1305 : (errmsg("logical decoding found consistent point at %X/%X",
1306 : LSN_FORMAT_ARGS(lsn)),
1307 : errdetail("There are no running transactions.")));
1308 :
1309 1912 : return false;
1310 : }
1311 :
1312 : /*
1313 : * b) valid on disk state and while neither building full snapshot nor
1314 : * creating a slot.
1315 : */
1316 64 : else if (!builder->building_full_snapshot &&
1317 100 : !builder->in_slot_creation &&
1318 38 : SnapBuildRestore(builder, lsn))
1319 : {
1320 : /* there won't be any state to cleanup */
1321 16 : return false;
1322 : }
1323 :
1324 : /*
1325 : * c) transition from START to BUILDING_SNAPSHOT.
1326 : *
1327 : * In START state, and a xl_running_xacts record with running xacts is
1328 : * encountered. In that case, switch to BUILDING_SNAPSHOT state, and
1329 : * record xl_running_xacts->nextXid. Once all running xacts have finished
1330 : * (i.e. they're all >= nextXid), we have a complete catalog snapshot. It
1331 : * might look that we could use xl_running_xacts's ->xids information to
1332 : * get there quicker, but that is problematic because transactions marked
1333 : * as running, might already have inserted their commit record - it's
1334 : * infeasible to change that with locking.
1335 : */
1336 48 : else if (builder->state == SNAPBUILD_START)
1337 : {
1338 26 : builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
1339 26 : builder->next_phase_at = running->nextXid;
1340 :
1341 : /*
1342 : * Start with an xmin/xmax that's correct for future, when all the
1343 : * currently running transactions have finished. We'll update both
1344 : * while waiting for the pending transactions to finish.
1345 : */
1346 26 : builder->xmin = running->nextXid; /* < are finished */
1347 26 : builder->xmax = running->nextXid; /* >= are running */
1348 :
1349 : /* so we can safely use the faster comparisons */
1350 : Assert(TransactionIdIsNormal(builder->xmin));
1351 : Assert(TransactionIdIsNormal(builder->xmax));
1352 :
1353 26 : ereport(LOG,
1354 : (errmsg("logical decoding found initial starting point at %X/%X",
1355 : LSN_FORMAT_ARGS(lsn)),
1356 : errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1357 : running->xcnt, running->nextXid)));
1358 :
1359 26 : SnapBuildWaitSnapshot(running, running->nextXid);
1360 : }
1361 :
1362 : /*
1363 : * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
1364 : *
1365 : * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
1366 : * is >= than nextXid from when we switched to BUILDING_SNAPSHOT. This
1367 : * means all transactions starting afterwards have enough information to
1368 : * be decoded. Switch to FULL_SNAPSHOT.
1369 : */
1370 34 : else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1371 12 : TransactionIdPrecedesOrEquals(builder->next_phase_at,
1372 : running->oldestRunningXid))
1373 : {
1374 10 : builder->state = SNAPBUILD_FULL_SNAPSHOT;
1375 10 : builder->next_phase_at = running->nextXid;
1376 :
1377 10 : ereport(LOG,
1378 : (errmsg("logical decoding found initial consistent point at %X/%X",
1379 : LSN_FORMAT_ARGS(lsn)),
1380 : errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1381 : running->xcnt, running->nextXid)));
1382 :
1383 10 : SnapBuildWaitSnapshot(running, running->nextXid);
1384 : }
1385 :
1386 : /*
1387 : * c) transition from FULL_SNAPSHOT to CONSISTENT.
1388 : *
1389 : * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
1390 : * >= than nextXid from when we switched to FULL_SNAPSHOT. This means all
1391 : * transactions that are currently in progress have a catalog snapshot,
1392 : * and all their changes have been collected. Switch to CONSISTENT.
1393 : */
1394 22 : else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
1395 10 : TransactionIdPrecedesOrEquals(builder->next_phase_at,
1396 : running->oldestRunningXid))
1397 : {
1398 10 : builder->state = SNAPBUILD_CONSISTENT;
1399 10 : builder->next_phase_at = InvalidTransactionId;
1400 :
1401 10 : ereport(LOG,
1402 : (errmsg("logical decoding found consistent point at %X/%X",
1403 : LSN_FORMAT_ARGS(lsn)),
1404 : errdetail("There are no old transactions anymore.")));
1405 : }
1406 :
1407 : /*
1408 : * We already started to track running xacts and need to wait for all
1409 : * in-progress ones to finish. We fall through to the normal processing of
1410 : * records so incremental cleanup can be performed.
1411 : */
1412 44 : return true;
1413 : }
1414 :
1415 : /* ---
1416 : * Iterate through xids in record, wait for all older than the cutoff to
1417 : * finish. Then, if possible, log a new xl_running_xacts record.
1418 : *
1419 : * This isn't required for the correctness of decoding, but to:
1420 : * a) allow isolationtester to notice that we're currently waiting for
1421 : * something.
1422 : * b) log a new xl_running_xacts record where it'd be helpful, without having
1423 : * to wait for bgwriter or checkpointer.
1424 : * ---
1425 : */
1426 : static void
1427 36 : SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
1428 : {
1429 : int off;
1430 :
1431 68 : for (off = 0; off < running->xcnt; off++)
1432 : {
1433 36 : TransactionId xid = running->xids[off];
1434 :
1435 : /*
1436 : * Upper layers should prevent that we ever need to wait on ourselves.
1437 : * Check anyway, since failing to do so would either result in an
1438 : * endless wait or an Assert() failure.
1439 : */
1440 36 : if (TransactionIdIsCurrentTransactionId(xid))
1441 0 : elog(ERROR, "waiting for ourselves");
1442 :
1443 36 : if (TransactionIdFollows(xid, cutoff))
1444 0 : continue;
1445 :
1446 36 : XactLockTableWait(xid, NULL, NULL, XLTW_None);
1447 : }
1448 :
1449 : /*
1450 : * All transactions we needed to finish finished - try to ensure there is
1451 : * another xl_running_xacts record in a timely manner, without having to
1452 : * wait for bgwriter or checkpointer to log one. During recovery we can't
1453 : * enforce that, so we'll have to wait.
1454 : */
1455 32 : if (!RecoveryInProgress())
1456 : {
1457 32 : LogStandbySnapshot();
1458 : }
1459 32 : }
1460 :
/* Byte offset of the 'builder' member, i.e. size of the header before it. */
#define SnapBuildOnDiskConstantSize (offsetof(SnapBuildOnDisk, builder))

/* Byte offset of the 'version' member; the bytes before it are not CRC'ed. */
#define SnapBuildOnDiskNotChecksummedSize (offsetof(SnapBuildOnDisk, version))

/* Values stored in the 'magic' and 'version' fields of a serialized snapshot. */
#define SNAPBUILD_MAGIC 0x51A1E001
#define SNAPBUILD_VERSION 6
1468 :
1469 : /*
1470 : * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1471 : *
1472 : * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1473 : * a record that's a potential location for a serialized snapshot.
1474 : */
1475 : void
1476 76 : SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1477 : {
1478 76 : if (builder->state < SNAPBUILD_CONSISTENT)
1479 0 : SnapBuildRestore(builder, lsn);
1480 : else
1481 76 : SnapBuildSerialize(builder, lsn);
1482 76 : }
1483 :
1484 : /*
1485 : * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
1486 : * been done by another decoding process.
1487 : */
1488 : static void
1489 766 : SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
1490 : {
1491 : Size needed_length;
1492 766 : SnapBuildOnDisk *ondisk = NULL;
1493 766 : TransactionId *catchange_xip = NULL;
1494 : MemoryContext old_ctx;
1495 : size_t catchange_xcnt;
1496 : char *ondisk_c;
1497 : int fd;
1498 : char tmppath[MAXPGPATH];
1499 : char path[MAXPGPATH];
1500 : int ret;
1501 : struct stat stat_buf;
1502 : Size sz;
1503 :
1504 : Assert(lsn != InvalidXLogRecPtr);
1505 : Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
1506 : builder->last_serialized_snapshot <= lsn);
1507 :
1508 : /*
1509 : * no point in serializing if we cannot continue to work immediately after
1510 : * restoring the snapshot
1511 : */
1512 766 : if (builder->state < SNAPBUILD_CONSISTENT)
1513 0 : return;
1514 :
1515 : /* consistent snapshots have no next phase */
1516 : Assert(builder->next_phase_at == InvalidTransactionId);
1517 :
1518 : /*
1519 : * We identify snapshots by the LSN they are valid for. We don't need to
1520 : * include timelines in the name as each LSN maps to exactly one timeline
1521 : * unless the user used pg_resetwal or similar. If a user did so, there's
1522 : * no hope continuing to decode anyway.
1523 : */
1524 766 : sprintf(path, "%s/%X-%X.snap",
1525 : PG_LOGICAL_SNAPSHOTS_DIR,
1526 766 : LSN_FORMAT_ARGS(lsn));
1527 :
1528 : /*
1529 : * first check whether some other backend already has written the snapshot
1530 : * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1531 : * as a valid state. Everything else is an unexpected error.
1532 : */
1533 766 : ret = stat(path, &stat_buf);
1534 :
1535 766 : if (ret != 0 && errno != ENOENT)
1536 0 : ereport(ERROR,
1537 : (errcode_for_file_access(),
1538 : errmsg("could not stat file \"%s\": %m", path)));
1539 :
1540 766 : else if (ret == 0)
1541 : {
1542 : /*
1543 : * somebody else has already serialized to this point, don't overwrite
1544 : * but remember location, so we don't need to read old data again.
1545 : *
1546 : * To be sure it has been synced to disk after the rename() from the
1547 : * tempfile filename to the real filename, we just repeat the fsync.
1548 : * That ought to be cheap because in most scenarios it should already
1549 : * be safely on disk.
1550 : */
1551 206 : fsync_fname(path, false);
1552 206 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1553 :
1554 206 : builder->last_serialized_snapshot = lsn;
1555 206 : goto out;
1556 : }
1557 :
1558 : /*
1559 : * there is an obvious race condition here between the time we stat(2) the
1560 : * file and us writing the file. But we rename the file into place
1561 : * atomically and all files created need to contain the same data anyway,
1562 : * so this is perfectly fine, although a bit of a resource waste. Locking
1563 : * seems like pointless complication.
1564 : */
1565 560 : elog(DEBUG1, "serializing snapshot to %s", path);
1566 :
1567 : /* to make sure only we will write to this tempfile, include pid */
1568 560 : sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
1569 : PG_LOGICAL_SNAPSHOTS_DIR,
1570 560 : LSN_FORMAT_ARGS(lsn), MyProcPid);
1571 :
1572 : /*
1573 : * Unlink temporary file if it already exists, needs to have been before a
1574 : * crash/error since we won't enter this function twice from within a
1575 : * single decoding slot/backend and the temporary file contains the pid of
1576 : * the current process.
1577 : */
1578 560 : if (unlink(tmppath) != 0 && errno != ENOENT)
1579 0 : ereport(ERROR,
1580 : (errcode_for_file_access(),
1581 : errmsg("could not remove file \"%s\": %m", tmppath)));
1582 :
1583 560 : old_ctx = MemoryContextSwitchTo(builder->context);
1584 :
1585 : /* Get the catalog modifying transactions that are yet not committed */
1586 560 : catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
1587 560 : catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);
1588 :
1589 560 : needed_length = sizeof(SnapBuildOnDisk) +
1590 560 : sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
1591 :
1592 560 : ondisk_c = palloc0(needed_length);
1593 560 : ondisk = (SnapBuildOnDisk *) ondisk_c;
1594 560 : ondisk->magic = SNAPBUILD_MAGIC;
1595 560 : ondisk->version = SNAPBUILD_VERSION;
1596 560 : ondisk->length = needed_length;
1597 560 : INIT_CRC32C(ondisk->checksum);
1598 560 : COMP_CRC32C(ondisk->checksum,
1599 : ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1600 : SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1601 560 : ondisk_c += sizeof(SnapBuildOnDisk);
1602 :
1603 560 : memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1604 : /* NULL-ify memory-only data */
1605 560 : ondisk->builder.context = NULL;
1606 560 : ondisk->builder.snapshot = NULL;
1607 560 : ondisk->builder.reorder = NULL;
1608 560 : ondisk->builder.committed.xip = NULL;
1609 560 : ondisk->builder.catchange.xip = NULL;
1610 : /* update catchange only on disk data */
1611 560 : ondisk->builder.catchange.xcnt = catchange_xcnt;
1612 :
1613 560 : COMP_CRC32C(ondisk->checksum,
1614 : &ondisk->builder,
1615 : sizeof(SnapBuild));
1616 :
1617 : /* copy committed xacts */
1618 560 : if (builder->committed.xcnt > 0)
1619 : {
1620 102 : sz = sizeof(TransactionId) * builder->committed.xcnt;
1621 102 : memcpy(ondisk_c, builder->committed.xip, sz);
1622 102 : COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1623 102 : ondisk_c += sz;
1624 : }
1625 :
1626 : /* copy catalog modifying xacts */
1627 560 : if (catchange_xcnt > 0)
1628 : {
1629 18 : sz = sizeof(TransactionId) * catchange_xcnt;
1630 18 : memcpy(ondisk_c, catchange_xip, sz);
1631 18 : COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1632 18 : ondisk_c += sz;
1633 : }
1634 :
1635 560 : FIN_CRC32C(ondisk->checksum);
1636 :
1637 : /* we have valid data now, open tempfile and write it there */
1638 560 : fd = OpenTransientFile(tmppath,
1639 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1640 560 : if (fd < 0)
1641 0 : ereport(ERROR,
1642 : (errcode_for_file_access(),
1643 : errmsg("could not open file \"%s\": %m", tmppath)));
1644 :
1645 560 : errno = 0;
1646 560 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
1647 560 : if ((write(fd, ondisk, needed_length)) != needed_length)
1648 : {
1649 0 : int save_errno = errno;
1650 :
1651 0 : CloseTransientFile(fd);
1652 :
1653 : /* if write didn't set errno, assume problem is no disk space */
1654 0 : errno = save_errno ? save_errno : ENOSPC;
1655 0 : ereport(ERROR,
1656 : (errcode_for_file_access(),
1657 : errmsg("could not write to file \"%s\": %m", tmppath)));
1658 : }
1659 560 : pgstat_report_wait_end();
1660 :
1661 : /*
1662 : * fsync the file before renaming so that even if we crash after this we
1663 : * have either a fully valid file or nothing.
1664 : *
1665 : * It's safe to just ERROR on fsync() here because we'll retry the whole
1666 : * operation including the writes.
1667 : *
1668 : * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1669 : * some noticeable overhead since it's performed synchronously during
1670 : * decoding?
1671 : */
1672 560 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
1673 560 : if (pg_fsync(fd) != 0)
1674 : {
1675 0 : int save_errno = errno;
1676 :
1677 0 : CloseTransientFile(fd);
1678 0 : errno = save_errno;
1679 0 : ereport(ERROR,
1680 : (errcode_for_file_access(),
1681 : errmsg("could not fsync file \"%s\": %m", tmppath)));
1682 : }
1683 560 : pgstat_report_wait_end();
1684 :
1685 560 : if (CloseTransientFile(fd) != 0)
1686 0 : ereport(ERROR,
1687 : (errcode_for_file_access(),
1688 : errmsg("could not close file \"%s\": %m", tmppath)));
1689 :
1690 560 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1691 :
1692 : /*
1693 : * We may overwrite the work from some other backend, but that's ok, our
1694 : * snapshot is valid as well, we'll just have done some superfluous work.
1695 : */
1696 560 : if (rename(tmppath, path) != 0)
1697 : {
1698 0 : ereport(ERROR,
1699 : (errcode_for_file_access(),
1700 : errmsg("could not rename file \"%s\" to \"%s\": %m",
1701 : tmppath, path)));
1702 : }
1703 :
1704 : /* make sure we persist */
1705 560 : fsync_fname(path, false);
1706 560 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1707 :
1708 : /*
1709 : * Now there's no way we can lose the dumped state anymore, remember this
1710 : * as a serialization point.
1711 : */
1712 560 : builder->last_serialized_snapshot = lsn;
1713 :
1714 560 : MemoryContextSwitchTo(old_ctx);
1715 :
1716 766 : out:
1717 766 : ReorderBufferSetRestartPoint(builder->reorder,
1718 : builder->last_serialized_snapshot);
1719 : /* be tidy */
1720 766 : if (ondisk)
1721 560 : pfree(ondisk);
1722 766 : if (catchange_xip)
1723 18 : pfree(catchange_xip);
1724 : }
1725 :
1726 : /*
1727 : * Restore the logical snapshot file contents to 'ondisk'.
1728 : *
1729 : * 'context' is the memory context where the catalog modifying/committed xid
1730 : * will live.
1731 : * If 'missing_ok' is true, will not throw an error if the file is not found.
1732 : */
1733 : bool
1734 42 : SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn,
1735 : MemoryContext context, bool missing_ok)
1736 : {
1737 : int fd;
1738 : pg_crc32c checksum;
1739 : Size sz;
1740 : char path[MAXPGPATH];
1741 :
1742 42 : sprintf(path, "%s/%X-%X.snap",
1743 : PG_LOGICAL_SNAPSHOTS_DIR,
1744 42 : LSN_FORMAT_ARGS(lsn));
1745 :
1746 42 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1747 :
1748 42 : if (fd < 0)
1749 : {
1750 22 : if (missing_ok && errno == ENOENT)
1751 22 : return false;
1752 :
1753 0 : ereport(ERROR,
1754 : (errcode_for_file_access(),
1755 : errmsg("could not open file \"%s\": %m", path)));
1756 : }
1757 :
1758 : /* ----
1759 : * Make sure the snapshot had been stored safely to disk, that's normally
1760 : * cheap.
1761 : * Note that we do not need PANIC here, nobody will be able to use the
1762 : * slot without fsyncing, and saving it won't succeed without an fsync()
1763 : * either...
1764 : * ----
1765 : */
1766 20 : fsync_fname(path, false);
1767 20 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1768 :
1769 : /* read statically sized portion of snapshot */
1770 20 : SnapBuildRestoreContents(fd, ondisk, SnapBuildOnDiskConstantSize, path);
1771 :
1772 20 : if (ondisk->magic != SNAPBUILD_MAGIC)
1773 0 : ereport(ERROR,
1774 : (errcode(ERRCODE_DATA_CORRUPTED),
1775 : errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1776 : path, ondisk->magic, SNAPBUILD_MAGIC)));
1777 :
1778 20 : if (ondisk->version != SNAPBUILD_VERSION)
1779 0 : ereport(ERROR,
1780 : (errcode(ERRCODE_DATA_CORRUPTED),
1781 : errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1782 : path, ondisk->version, SNAPBUILD_VERSION)));
1783 :
1784 20 : INIT_CRC32C(checksum);
1785 20 : COMP_CRC32C(checksum,
1786 : ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1787 : SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1788 :
1789 : /* read SnapBuild */
1790 20 : SnapBuildRestoreContents(fd, &ondisk->builder, sizeof(SnapBuild), path);
1791 20 : COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild));
1792 :
1793 : /* restore committed xacts information */
1794 20 : if (ondisk->builder.committed.xcnt > 0)
1795 : {
1796 8 : sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt;
1797 8 : ondisk->builder.committed.xip = MemoryContextAllocZero(context, sz);
1798 8 : SnapBuildRestoreContents(fd, ondisk->builder.committed.xip, sz, path);
1799 8 : COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz);
1800 : }
1801 :
1802 : /* restore catalog modifying xacts information */
1803 20 : if (ondisk->builder.catchange.xcnt > 0)
1804 : {
1805 10 : sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt;
1806 10 : ondisk->builder.catchange.xip = MemoryContextAllocZero(context, sz);
1807 10 : SnapBuildRestoreContents(fd, ondisk->builder.catchange.xip, sz, path);
1808 10 : COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz);
1809 : }
1810 :
1811 20 : if (CloseTransientFile(fd) != 0)
1812 0 : ereport(ERROR,
1813 : (errcode_for_file_access(),
1814 : errmsg("could not close file \"%s\": %m", path)));
1815 :
1816 20 : FIN_CRC32C(checksum);
1817 :
1818 : /* verify checksum of what we've read */
1819 20 : if (!EQ_CRC32C(checksum, ondisk->checksum))
1820 0 : ereport(ERROR,
1821 : (errcode(ERRCODE_DATA_CORRUPTED),
1822 : errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1823 : path, checksum, ondisk->checksum)));
1824 :
1825 20 : return true;
1826 : }
1827 :
1828 : /*
1829 : * Restore a snapshot into 'builder' if previously one has been stored at the
1830 : * location indicated by 'lsn'. Returns true if successful, false otherwise.
1831 : */
1832 : static bool
1833 38 : SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
1834 : {
1835 : SnapBuildOnDisk ondisk;
1836 :
1837 : /* no point in loading a snapshot if we're already there */
1838 38 : if (builder->state == SNAPBUILD_CONSISTENT)
1839 0 : return false;
1840 :
1841 : /* validate and restore the snapshot to 'ondisk' */
1842 38 : if (!SnapBuildRestoreSnapshot(&ondisk, lsn, builder->context, true))
1843 22 : return false;
1844 :
1845 : /*
1846 : * ok, we now have a sensible snapshot here, figure out if it has more
1847 : * information than we have.
1848 : */
1849 :
1850 : /*
1851 : * We are only interested in consistent snapshots for now, comparing
1852 : * whether one incomplete snapshot is more "advanced" seems to be
1853 : * unnecessarily complex.
1854 : */
1855 16 : if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1856 0 : goto snapshot_not_interesting;
1857 :
1858 : /*
1859 : * Don't use a snapshot that requires an xmin that we cannot guarantee to
1860 : * be available.
1861 : */
1862 16 : if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
1863 0 : goto snapshot_not_interesting;
1864 :
1865 : /*
1866 : * Consistent snapshots have no next phase. Reset next_phase_at as it is
1867 : * possible that an old value may remain.
1868 : */
1869 : Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
1870 16 : builder->next_phase_at = InvalidTransactionId;
1871 :
1872 : /* ok, we think the snapshot is sensible, copy over everything important */
1873 16 : builder->xmin = ondisk.builder.xmin;
1874 16 : builder->xmax = ondisk.builder.xmax;
1875 16 : builder->state = ondisk.builder.state;
1876 :
1877 16 : builder->committed.xcnt = ondisk.builder.committed.xcnt;
1878 : /* We only allocated/stored xcnt, not xcnt_space xids ! */
1879 : /* don't overwrite preallocated xip, if we don't have anything here */
1880 16 : if (builder->committed.xcnt > 0)
1881 : {
1882 4 : pfree(builder->committed.xip);
1883 4 : builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1884 4 : builder->committed.xip = ondisk.builder.committed.xip;
1885 : }
1886 16 : ondisk.builder.committed.xip = NULL;
1887 :
1888 : /* set catalog modifying transactions */
1889 16 : if (builder->catchange.xip)
1890 0 : pfree(builder->catchange.xip);
1891 16 : builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
1892 16 : builder->catchange.xip = ondisk.builder.catchange.xip;
1893 16 : ondisk.builder.catchange.xip = NULL;
1894 :
1895 : /* our snapshot is not interesting anymore, build a new one */
1896 16 : if (builder->snapshot != NULL)
1897 : {
1898 0 : SnapBuildSnapDecRefcount(builder->snapshot);
1899 : }
1900 16 : builder->snapshot = SnapBuildBuildSnapshot(builder);
1901 16 : SnapBuildSnapIncRefcount(builder->snapshot);
1902 :
1903 16 : ReorderBufferSetRestartPoint(builder->reorder, lsn);
1904 :
1905 : Assert(builder->state == SNAPBUILD_CONSISTENT);
1906 :
1907 16 : ereport(LOG,
1908 : (errmsg("logical decoding found consistent point at %X/%X",
1909 : LSN_FORMAT_ARGS(lsn)),
1910 : errdetail("Logical decoding will begin using saved snapshot.")));
1911 16 : return true;
1912 :
1913 0 : snapshot_not_interesting:
1914 0 : if (ondisk.builder.committed.xip != NULL)
1915 0 : pfree(ondisk.builder.committed.xip);
1916 0 : if (ondisk.builder.catchange.xip != NULL)
1917 0 : pfree(ondisk.builder.catchange.xip);
1918 0 : return false;
1919 : }
1920 :
1921 : /*
1922 : * Read the contents of the serialized snapshot to 'dest'.
1923 : */
1924 : static void
1925 58 : SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path)
1926 : {
1927 : int readBytes;
1928 :
1929 58 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1930 58 : readBytes = read(fd, dest, size);
1931 58 : pgstat_report_wait_end();
1932 58 : if (readBytes != size)
1933 : {
1934 0 : int save_errno = errno;
1935 :
1936 0 : CloseTransientFile(fd);
1937 :
1938 0 : if (readBytes < 0)
1939 : {
1940 0 : errno = save_errno;
1941 0 : ereport(ERROR,
1942 : (errcode_for_file_access(),
1943 : errmsg("could not read file \"%s\": %m", path)));
1944 : }
1945 : else
1946 0 : ereport(ERROR,
1947 : (errcode(ERRCODE_DATA_CORRUPTED),
1948 : errmsg("could not read file \"%s\": read %d of %zu",
1949 : path, readBytes, size)));
1950 : }
1951 58 : }
1952 :
1953 : /*
1954 : * Remove all serialized snapshots that are not required anymore because no
1955 : * slot can need them. This doesn't actually have to run during a checkpoint,
1956 : * but it's a convenient point to schedule this.
1957 : *
1958 : * NB: We run this during checkpoints even if logical decoding is disabled so
1959 : * we cleanup old slots at some point after it got disabled.
1960 : */
1961 : void
1962 2658 : CheckPointSnapBuild(void)
1963 : {
1964 : XLogRecPtr cutoff;
1965 : XLogRecPtr redo;
1966 : DIR *snap_dir;
1967 : struct dirent *snap_de;
1968 : char path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)];
1969 :
1970 : /*
1971 : * We start off with a minimum of the last redo pointer. No new
1972 : * replication slot will start before that, so that's a safe upper bound
1973 : * for removal.
1974 : */
1975 2658 : redo = GetRedoRecPtr();
1976 :
1977 : /* now check for the restart ptrs from existing slots */
1978 2658 : cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1979 :
1980 : /* don't start earlier than the restart lsn */
1981 2658 : if (redo < cutoff)
1982 2 : cutoff = redo;
1983 :
1984 2658 : snap_dir = AllocateDir(PG_LOGICAL_SNAPSHOTS_DIR);
1985 8430 : while ((snap_de = ReadDir(snap_dir, PG_LOGICAL_SNAPSHOTS_DIR)) != NULL)
1986 : {
1987 : uint32 hi;
1988 : uint32 lo;
1989 : XLogRecPtr lsn;
1990 : PGFileType de_type;
1991 :
1992 5772 : if (strcmp(snap_de->d_name, ".") == 0 ||
1993 3114 : strcmp(snap_de->d_name, "..") == 0)
1994 5316 : continue;
1995 :
1996 456 : snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name);
1997 456 : de_type = get_dirent_type(path, snap_de, false, DEBUG1);
1998 :
1999 456 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
2000 : {
2001 0 : elog(DEBUG1, "only regular files expected: %s", path);
2002 0 : continue;
2003 : }
2004 :
2005 : /*
2006 : * temporary filenames from SnapBuildSerialize() include the LSN and
2007 : * everything but are postfixed by .$pid.tmp. We can just remove them
2008 : * the same as other files because there can be none that are
2009 : * currently being written that are older than cutoff.
2010 : *
2011 : * We just log a message if a file doesn't fit the pattern, it's
2012 : * probably some editors lock/state file or similar...
2013 : */
2014 456 : if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
2015 : {
2016 0 : ereport(LOG,
2017 : (errmsg("could not parse file name \"%s\"", path)));
2018 0 : continue;
2019 : }
2020 :
2021 456 : lsn = ((uint64) hi) << 32 | lo;
2022 :
2023 : /* check whether we still need it */
2024 456 : if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
2025 : {
2026 340 : elog(DEBUG1, "removing snapbuild snapshot %s", path);
2027 :
2028 : /*
2029 : * It's not particularly harmful, though strange, if we can't
2030 : * remove the file here. Don't prevent the checkpoint from
2031 : * completing, that'd be a cure worse than the disease.
2032 : */
2033 340 : if (unlink(path) < 0)
2034 : {
2035 0 : ereport(LOG,
2036 : (errcode_for_file_access(),
2037 : errmsg("could not remove file \"%s\": %m",
2038 : path)));
2039 0 : continue;
2040 : }
2041 : }
2042 : }
2043 2658 : FreeDir(snap_dir);
2044 2658 : }
2045 :
2046 : /*
2047 : * Check if a logical snapshot at the specified point has been serialized.
2048 : */
2049 : bool
2050 22 : SnapBuildSnapshotExists(XLogRecPtr lsn)
2051 : {
2052 : char path[MAXPGPATH];
2053 : int ret;
2054 : struct stat stat_buf;
2055 :
2056 22 : sprintf(path, "%s/%X-%X.snap",
2057 : PG_LOGICAL_SNAPSHOTS_DIR,
2058 22 : LSN_FORMAT_ARGS(lsn));
2059 :
2060 22 : ret = stat(path, &stat_buf);
2061 :
2062 22 : if (ret != 0 && errno != ENOENT)
2063 0 : ereport(ERROR,
2064 : (errcode_for_file_access(),
2065 : errmsg("could not stat file \"%s\": %m", path)));
2066 :
2067 22 : return ret == 0;
2068 : }
|