Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * snapbuild.c
4 : *
5 : * Infrastructure for building historic catalog snapshots based on contents
6 : * of the WAL, for the purpose of decoding heapam.c style values in the
7 : * WAL.
8 : *
9 : * NOTES:
10 : *
11 : * We build snapshots which can *only* be used to read catalog contents and we
12 : * do so by reading and interpreting the WAL stream. The aim is to build a
13 : * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14 : * at the time the XLogRecord was generated.
15 : *
16 : * To build the snapshots we reuse the infrastructure built for Hot
17 : * Standby. The in-memory snapshots we build look different than HS' because
18 : * we have different needs. To successfully decode data from the WAL we only
19 : * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20 : * tables since the data we decode is wholly contained in the WAL
21 : * records. Also, our snapshots need to be different in comparison to normal
22 : * MVCC ones because in contrast to those we cannot fully rely on the clog and
23 : * pg_subtrans for information about committed transactions because they might
24 : * commit in the future from the POV of the WAL entry we're currently
25 : * decoding. This definition has the advantage that we only need to prevent
 * removal of catalog rows, while normal tables' rows can still be
27 : * removed. This is achieved by using the replication slot mechanism.
28 : *
 * As the percentage of transactions modifying the catalog is normally fairly
 * small in comparison to those only manipulating user data, we keep track of
31 : * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32 : * track of all running transactions like it's done in a normal snapshot. Note
33 : * that we're generally only looking at transactions that have acquired an
34 : * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
35 : * that we consider committed, everything else is considered aborted/in
36 : * progress. That also allows us not to care about subtransactions before they
37 : * have committed which means this module, in contrast to HS, doesn't have to
38 : * care about suboverflowed subtransactions and similar.
39 : *
40 : * One complexity of doing this is that to e.g. handle mixed DDL/DML
41 : * transactions we need Snapshots that see intermediate versions of the
42 : * catalog in a transaction. During normal operation this is achieved by using
43 : * CommandIds/cmin/cmax. The problem with that however is that for space
44 : * efficiency reasons, the cmin and cmax are not included in WAL records. We
45 : * cannot read the cmin/cmax from the tuple itself, either, because it is
46 : * reset on crash recovery. Even if we could, we could not decode combocids
47 : * which are only tracked in the original backend's memory. To work around
48 : * that, heapam writes an extra WAL record (XLOG_HEAP2_NEW_CID) every time a
49 : * catalog row is modified, which includes the cmin and cmax of the
50 : * tuple. During decoding, we insert the ctid->(cmin,cmax) mappings into the
51 : * reorder buffer, and use them at visibility checks instead of the cmin/cmax
 * on the tuple itself. See the comment above ResolveCminCmaxDuringDecoding()
 * in reorderbuffer.c for details.
54 : *
55 : * To facilitate all this we need our own visibility routine, as the normal
 * ones are optimized for different use cases.
57 : *
58 : * To replace the normal catalog snapshots with decoding ones use the
59 : * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
60 : *
61 : *
62 : *
 * The snapbuild machinery starts up in several stages, as illustrated
64 : * by the following graph describing the SnapBuild->state transitions:
65 : *
66 : * +-------------------------+
67 : * +----| START |-------------+
68 : * | +-------------------------+ |
69 : * | | |
70 : * | | |
71 : * | running_xacts #1 |
72 : * | | |
73 : * | | |
74 : * | v |
75 : * | +-------------------------+ v
76 : * | | BUILDING_SNAPSHOT |------------>|
77 : * | +-------------------------+ |
78 : * | | |
79 : * | | |
80 : * | running_xacts #2, xacts from #1 finished |
81 : * | | |
82 : * | | |
83 : * | v |
84 : * | +-------------------------+ v
85 : * | | FULL_SNAPSHOT |------------>|
86 : * | +-------------------------+ |
87 : * | | |
88 : * running_xacts | saved snapshot
89 : * with zero xacts | at running_xacts's lsn
90 : * | | |
91 : * | running_xacts with xacts from #2 finished |
92 : * | | |
93 : * | v |
94 : * | +-------------------------+ |
95 : * +--->|SNAPBUILD_CONSISTENT |<------------+
96 : * +-------------------------+
97 : *
 * Initially the machinery is in the START stage. When an xl_running_xacts
 * record is read that is sufficiently new (above the safe xmin horizon),
 * there's a state transition. If there were no running xacts when the
 * xl_running_xacts record was generated, we'll directly go into CONSISTENT
 * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Once a later
 * xl_running_xacts record shows that all transactions that were running at
 * the first one have finished, we switch to the FULL_SNAPSHOT state. Having a
 * full snapshot means that all transactions that start henceforth can be
 * decoded in their entirety, but transactions that started previously can't.
 * In FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
 * running transactions have committed or aborted. From any of the earlier
 * states, finding a previously saved snapshot at an xl_running_xacts record's
 * LSN lets us jump directly to CONSISTENT.
107 : *
108 : * Only transactions that commit after CONSISTENT state has been reached will
109 : * be replayed, even though they might have started while still in
 * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
 * changes have been exported, but all the following ones will be. That point
112 : * is a convenient point to initialize replication from, which is why we
113 : * export a snapshot at that point, which *can* be used to read normal data.
114 : *
115 : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
116 : *
117 : * IDENTIFICATION
118 : * src/backend/replication/logical/snapbuild.c
119 : *
120 : *-------------------------------------------------------------------------
121 : */
122 :
123 : #include "postgres.h"
124 :
125 : #include <sys/stat.h>
126 : #include <unistd.h>
127 :
128 : #include "access/heapam_xlog.h"
129 : #include "access/transam.h"
130 : #include "access/xact.h"
131 : #include "common/file_utils.h"
132 : #include "miscadmin.h"
133 : #include "pgstat.h"
134 : #include "replication/logical.h"
135 : #include "replication/reorderbuffer.h"
136 : #include "replication/snapbuild.h"
137 : #include "replication/snapbuild_internal.h"
138 : #include "storage/fd.h"
139 : #include "storage/lmgr.h"
140 : #include "storage/proc.h"
141 : #include "storage/procarray.h"
142 : #include "storage/standby.h"
143 : #include "utils/builtins.h"
144 : #include "utils/memutils.h"
145 : #include "utils/snapmgr.h"
146 : #include "utils/snapshot.h"
147 : #include "utils/wait_event.h"
148 :
149 :
/*
 * Starting a transaction -- which we need to do while exporting a snapshot --
 * removes knowledge about the previously used resowner, so we save it here.
 * Only one export may be pending at a time: SnapBuildExportSnapshot() errors
 * out if this is already set.
 */
static ResourceOwner SavedResourceOwnerDuringExport = NULL;

/*
 * True between SnapBuildExportSnapshot() and the next call of
 * SnapBuildResetExportedSnapshotState(); SnapBuildClearExportedSnapshot()
 * returns immediately while it is false.
 */
static bool ExportInProgress = false;

/* ->committed and ->catchange manipulation */
static void SnapBuildPurgeOlderTxn(SnapBuild *builder);

/* snapshot building/manipulation/distribution functions */
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);

static void SnapBuildFreeSnapshot(Snapshot snap);

static void SnapBuildSnapIncRefcount(Snapshot snap);

static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);

static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
												 uint32 xinfo);

/* xlog reading helper functions for SnapBuildProcessRunningXacts */
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn,
								  xl_running_xacts *running);
static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);

/* serialization functions */
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path);
181 :
182 : /*
183 : * Allocate a new snapshot builder.
184 : *
185 : * xmin_horizon is the xid >= which we can be sure no catalog rows have been
186 : * removed, start_lsn is the LSN >= we want to replay commits.
187 : */
188 : SnapBuild *
189 1222 : AllocateSnapshotBuilder(ReorderBuffer *reorder,
190 : TransactionId xmin_horizon,
191 : XLogRecPtr start_lsn,
192 : bool need_full_snapshot,
193 : bool in_slot_creation,
194 : XLogRecPtr two_phase_at)
195 : {
196 : MemoryContext context;
197 : MemoryContext oldcontext;
198 : SnapBuild *builder;
199 :
200 : /* allocate memory in own context, to have better accountability */
201 1222 : context = AllocSetContextCreate(CurrentMemoryContext,
202 : "snapshot builder context",
203 : ALLOCSET_DEFAULT_SIZES);
204 1222 : oldcontext = MemoryContextSwitchTo(context);
205 :
206 1222 : builder = palloc0_object(SnapBuild);
207 :
208 1222 : builder->state = SNAPBUILD_START;
209 1222 : builder->context = context;
210 1222 : builder->reorder = reorder;
211 : /* Other struct members initialized by zeroing via palloc0 above */
212 :
213 1222 : builder->committed.xcnt = 0;
214 1222 : builder->committed.xcnt_space = 128; /* arbitrary number */
215 1222 : builder->committed.xip =
216 1222 : palloc0_array(TransactionId, builder->committed.xcnt_space);
217 1222 : builder->committed.includes_all_transactions = true;
218 :
219 1222 : builder->catchange.xcnt = 0;
220 1222 : builder->catchange.xip = NULL;
221 :
222 1222 : builder->initial_xmin_horizon = xmin_horizon;
223 1222 : builder->start_decoding_at = start_lsn;
224 1222 : builder->in_slot_creation = in_slot_creation;
225 1222 : builder->building_full_snapshot = need_full_snapshot;
226 1222 : builder->two_phase_at = two_phase_at;
227 :
228 1222 : MemoryContextSwitchTo(oldcontext);
229 :
230 1222 : return builder;
231 : }
232 :
233 : /*
234 : * Free a snapshot builder.
235 : */
236 : void
237 948 : FreeSnapshotBuilder(SnapBuild *builder)
238 : {
239 948 : MemoryContext context = builder->context;
240 :
241 : /* free snapshot explicitly, that contains some error checking */
242 948 : if (builder->snapshot != NULL)
243 : {
244 232 : SnapBuildSnapDecRefcount(builder->snapshot);
245 232 : builder->snapshot = NULL;
246 : }
247 :
248 : /* other resources are deallocated via memory context reset */
249 948 : MemoryContextDelete(context);
250 948 : }
251 :
252 : /*
253 : * Free an unreferenced snapshot that has previously been built by us.
254 : */
255 : static void
256 1870 : SnapBuildFreeSnapshot(Snapshot snap)
257 : {
258 : /* make sure we don't get passed an external snapshot */
259 : Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
260 :
261 : /* make sure nobody modified our snapshot */
262 : Assert(snap->curcid == FirstCommandId);
263 : Assert(!snap->suboverflowed);
264 : Assert(!snap->takenDuringRecovery);
265 : Assert(snap->regd_count == 0);
266 :
267 : /* slightly more likely, so it's checked even without c-asserts */
268 1870 : if (snap->copied)
269 0 : elog(ERROR, "cannot free a copied snapshot");
270 :
271 1870 : if (snap->active_count)
272 0 : elog(ERROR, "cannot free an active snapshot");
273 :
274 1870 : pfree(snap);
275 1870 : }
276 :
277 : /*
278 : * In which state of snapshot building are we?
279 : */
280 : SnapBuildState
281 1900758 : SnapBuildCurrentState(SnapBuild *builder)
282 : {
283 1900758 : return builder->state;
284 : }
285 :
286 : /*
287 : * Return the LSN at which the two-phase decoding was first enabled.
288 : */
289 : XLogRecPtr
290 34 : SnapBuildGetTwoPhaseAt(SnapBuild *builder)
291 : {
292 34 : return builder->two_phase_at;
293 : }
294 :
295 : /*
296 : * Set the LSN at which two-phase decoding is enabled.
297 : */
298 : void
299 8 : SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
300 : {
301 8 : builder->two_phase_at = ptr;
302 8 : }
303 :
304 : /*
305 : * Should the contents of transaction ending at 'ptr' be decoded?
306 : */
307 : bool
308 411735 : SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
309 : {
310 411735 : return ptr < builder->start_decoding_at;
311 : }
312 :
313 : /*
314 : * Increase refcount of a snapshot.
315 : *
316 : * This is used when handing out a snapshot to some external resource or when
317 : * adding a Snapshot as builder->snapshot.
318 : */
319 : static void
320 8013 : SnapBuildSnapIncRefcount(Snapshot snap)
321 : {
322 8013 : snap->active_count++;
323 8013 : }
324 :
325 : /*
326 : * Decrease refcount of a snapshot and free if the refcount reaches zero.
327 : *
328 : * Externally visible, so that external resources that have been handed an
329 : * IncRef'ed Snapshot can adjust its refcount easily.
330 : */
331 : void
332 7698 : SnapBuildSnapDecRefcount(Snapshot snap)
333 : {
334 : /* make sure we don't get passed an external snapshot */
335 : Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
336 :
337 : /* make sure nobody modified our snapshot */
338 : Assert(snap->curcid == FirstCommandId);
339 : Assert(!snap->suboverflowed);
340 : Assert(!snap->takenDuringRecovery);
341 :
342 : Assert(snap->regd_count == 0);
343 :
344 : Assert(snap->active_count > 0);
345 :
346 : /* slightly more likely, so it's checked even without casserts */
347 7698 : if (snap->copied)
348 0 : elog(ERROR, "cannot free a copied snapshot");
349 :
350 7698 : snap->active_count--;
351 7698 : if (snap->active_count == 0)
352 1870 : SnapBuildFreeSnapshot(snap);
353 7698 : }
354 :
355 : /*
356 : * Build a new snapshot, based on currently committed catalog-modifying
357 : * transactions.
358 : *
359 : * In-progress transactions with catalog access are *not* allowed to modify
360 : * these snapshots; they have to copy them and fill in appropriate ->curcid
361 : * and ->subxip/subxcnt values.
362 : */
363 : static Snapshot
364 2377 : SnapBuildBuildSnapshot(SnapBuild *builder)
365 : {
366 : Snapshot snapshot;
367 : Size ssize;
368 :
369 : Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
370 :
371 2377 : ssize = sizeof(SnapshotData)
372 2377 : + sizeof(TransactionId) * builder->committed.xcnt
373 2377 : + sizeof(TransactionId) * 1 /* toplevel xid */ ;
374 :
375 2377 : snapshot = MemoryContextAllocZero(builder->context, ssize);
376 :
377 2377 : snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
378 :
379 : /*
380 : * We misuse the original meaning of SnapshotData's xip and subxip fields
381 : * to make the more fitting for our needs.
382 : *
383 : * In the 'xip' array we store transactions that have to be treated as
384 : * committed. Since we will only ever look at tuples from transactions
385 : * that have modified the catalog it's more efficient to store those few
386 : * that exist between xmin and xmax (frequently there are none).
387 : *
388 : * Snapshots that are used in transactions that have modified the catalog
389 : * also use the 'subxip' array to store their toplevel xid and all the
390 : * subtransaction xids so we can recognize when we need to treat rows as
391 : * visible that are not in xip but still need to be visible. Subxip only
392 : * gets filled when the transaction is copied into the context of a
393 : * catalog modifying transaction since we otherwise share a snapshot
394 : * between transactions. As long as a txn hasn't modified the catalog it
395 : * doesn't need to treat any uncommitted rows as visible, so there is no
396 : * need for those xids.
397 : *
398 : * Both arrays are qsort'ed so that we can use bsearch() on them.
399 : */
400 : Assert(TransactionIdIsNormal(builder->xmin));
401 : Assert(TransactionIdIsNormal(builder->xmax));
402 :
403 2377 : snapshot->xmin = builder->xmin;
404 2377 : snapshot->xmax = builder->xmax;
405 :
406 : /* store all transactions to be treated as committed by this snapshot */
407 2377 : snapshot->xip =
408 2377 : (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
409 2377 : snapshot->xcnt = builder->committed.xcnt;
410 2377 : memcpy(snapshot->xip,
411 2377 : builder->committed.xip,
412 2377 : builder->committed.xcnt * sizeof(TransactionId));
413 :
414 : /* sort so we can bsearch() */
415 2377 : qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
416 :
417 : /*
418 : * Initially, subxip is empty, i.e. it's a snapshot to be used by
419 : * transactions that don't modify the catalog. Will be filled by
420 : * ReorderBufferCopySnap() if necessary.
421 : */
422 2377 : snapshot->subxcnt = 0;
423 2377 : snapshot->subxip = NULL;
424 :
425 2377 : snapshot->suboverflowed = false;
426 2377 : snapshot->takenDuringRecovery = false;
427 2377 : snapshot->copied = false;
428 2377 : snapshot->curcid = FirstCommandId;
429 2377 : snapshot->active_count = 0;
430 2377 : snapshot->regd_count = 0;
431 2377 : snapshot->snapXactCompletionCount = 0;
432 :
433 2377 : return snapshot;
434 : }
435 :
436 : /*
437 : * Build the initial slot snapshot and convert it to a normal snapshot that
438 : * is understood by HeapTupleSatisfiesMVCC.
439 : *
440 : * The snapshot will be usable directly in current transaction or exported
441 : * for loading in different transaction.
442 : */
443 : Snapshot
444 221 : SnapBuildInitialSnapshot(SnapBuild *builder)
445 : {
446 : Snapshot snap;
447 : TransactionId xid;
448 : TransactionId safeXid;
449 : TransactionId *newxip;
450 221 : int newxcnt = 0;
451 :
452 : Assert(XactIsoLevel == XACT_REPEATABLE_READ);
453 : Assert(builder->building_full_snapshot);
454 :
455 : /* don't allow older snapshots */
456 221 : InvalidateCatalogSnapshot(); /* about to overwrite MyProc->xmin */
457 221 : if (HaveRegisteredOrActiveSnapshot())
458 0 : elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
459 : Assert(!HistoricSnapshotActive());
460 :
461 221 : if (builder->state != SNAPBUILD_CONSISTENT)
462 0 : elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
463 :
464 221 : if (!builder->committed.includes_all_transactions)
465 0 : elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
466 :
467 : /* so we don't overwrite the existing value */
468 221 : if (TransactionIdIsValid(MyProc->xmin))
469 0 : elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
470 :
471 221 : snap = SnapBuildBuildSnapshot(builder);
472 :
473 : /*
474 : * We know that snap->xmin is alive, enforced by the logical xmin
475 : * mechanism. Due to that we can do this without locks, we're only
476 : * changing our own value.
477 : *
478 : * Building an initial snapshot is expensive and an unenforced xmin
479 : * horizon would have bad consequences, therefore always double-check that
480 : * the horizon is enforced.
481 : */
482 221 : LWLockAcquire(ProcArrayLock, LW_SHARED);
483 221 : safeXid = GetOldestSafeDecodingTransactionId(false);
484 221 : LWLockRelease(ProcArrayLock);
485 :
486 221 : if (TransactionIdFollows(safeXid, snap->xmin))
487 0 : elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
488 : safeXid, snap->xmin);
489 :
490 221 : MyProc->xmin = snap->xmin;
491 :
492 : /* allocate in transaction context */
493 221 : newxip = palloc_array(TransactionId, GetMaxSnapshotXidCount());
494 :
495 : /*
496 : * snapbuild.c builds transactions in an "inverted" manner, which means it
497 : * stores committed transactions in ->xip, not ones in progress. Build a
498 : * classical snapshot by marking all non-committed transactions as
499 : * in-progress. This can be expensive.
500 : */
501 221 : for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
502 : {
503 : void *test;
504 :
505 : /*
506 : * Check whether transaction committed using the decoding snapshot
507 : * meaning of ->xip.
508 : */
509 0 : test = bsearch(&xid, snap->xip, snap->xcnt,
510 : sizeof(TransactionId), xidComparator);
511 :
512 0 : if (test == NULL)
513 : {
514 0 : if (newxcnt >= GetMaxSnapshotXidCount())
515 0 : ereport(ERROR,
516 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
517 : errmsg("initial slot snapshot too large")));
518 :
519 0 : newxip[newxcnt++] = xid;
520 : }
521 :
522 0 : TransactionIdAdvance(xid);
523 : }
524 :
525 : /* adjust remaining snapshot fields as needed */
526 221 : snap->snapshot_type = SNAPSHOT_MVCC;
527 221 : snap->xcnt = newxcnt;
528 221 : snap->xip = newxip;
529 :
530 221 : return snap;
531 : }
532 :
533 : /*
534 : * Export a snapshot so it can be set in another session with SET TRANSACTION
535 : * SNAPSHOT.
536 : *
537 : * For that we need to start a transaction in the current backend as the
538 : * importing side checks whether the source transaction is still open to make
539 : * sure the xmin horizon hasn't advanced since then.
540 : */
541 : const char *
542 1 : SnapBuildExportSnapshot(SnapBuild *builder)
543 : {
544 : Snapshot snap;
545 : char *snapname;
546 :
547 1 : if (IsTransactionOrTransactionBlock())
548 0 : elog(ERROR, "cannot export a snapshot from within a transaction");
549 :
550 1 : if (SavedResourceOwnerDuringExport)
551 0 : elog(ERROR, "can only export one snapshot at a time");
552 :
553 1 : SavedResourceOwnerDuringExport = CurrentResourceOwner;
554 1 : ExportInProgress = true;
555 :
556 1 : StartTransactionCommand();
557 :
558 : /* There doesn't seem to a nice API to set these */
559 1 : XactIsoLevel = XACT_REPEATABLE_READ;
560 1 : XactReadOnly = true;
561 :
562 1 : snap = SnapBuildInitialSnapshot(builder);
563 :
564 : /*
565 : * now that we've built a plain snapshot, make it active and use the
566 : * normal mechanisms for exporting it
567 : */
568 1 : snapname = ExportSnapshot(snap);
569 :
570 1 : ereport(LOG,
571 : (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
572 : "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
573 : snap->xcnt,
574 : snapname, snap->xcnt)));
575 1 : return snapname;
576 : }
577 :
578 : /*
579 : * Ensure there is a snapshot and if not build one for current transaction.
580 : */
581 : Snapshot
582 9 : SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
583 : {
584 : Assert(builder->state == SNAPBUILD_CONSISTENT);
585 :
586 : /* only build a new snapshot if we don't have a prebuilt one */
587 9 : if (builder->snapshot == NULL)
588 : {
589 2 : builder->snapshot = SnapBuildBuildSnapshot(builder);
590 : /* increase refcount for the snapshot builder */
591 2 : SnapBuildSnapIncRefcount(builder->snapshot);
592 : }
593 :
594 9 : return builder->snapshot;
595 : }
596 :
597 : /*
598 : * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
599 : * any. Aborts the previously started transaction and resets the resource
600 : * owner back to its original value.
601 : */
602 : void
603 5915 : SnapBuildClearExportedSnapshot(void)
604 : {
605 : ResourceOwner tmpResOwner;
606 :
607 : /* nothing exported, that is the usual case */
608 5915 : if (!ExportInProgress)
609 5914 : return;
610 :
611 1 : if (!IsTransactionState())
612 0 : elog(ERROR, "clearing exported snapshot in wrong transaction state");
613 :
614 : /*
615 : * AbortCurrentTransaction() takes care of resetting the snapshot state,
616 : * so remember SavedResourceOwnerDuringExport.
617 : */
618 1 : tmpResOwner = SavedResourceOwnerDuringExport;
619 :
620 : /* make sure nothing could have ever happened */
621 1 : AbortCurrentTransaction();
622 :
623 1 : CurrentResourceOwner = tmpResOwner;
624 : }
625 :
626 : /*
627 : * Clear snapshot export state during transaction abort.
628 : */
629 : void
630 35696 : SnapBuildResetExportedSnapshotState(void)
631 : {
632 35696 : SavedResourceOwnerDuringExport = NULL;
633 35696 : ExportInProgress = false;
634 35696 : }
635 :
636 : /*
637 : * Handle the effects of a single heap change, appropriate to the current state
638 : * of the snapshot builder and returns whether changes made at (xid, lsn) can
639 : * be decoded.
640 : */
641 : bool
642 1288211 : SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
643 : {
644 : /*
645 : * We can't handle data in transactions if we haven't built a snapshot
646 : * yet, so don't store them.
647 : */
648 1288211 : if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
649 0 : return false;
650 :
651 : /*
652 : * No point in keeping track of changes in transactions that we don't have
653 : * enough information about to decode. This means that they started before
654 : * we got into the SNAPBUILD_FULL_SNAPSHOT state.
655 : */
656 1288214 : if (builder->state < SNAPBUILD_CONSISTENT &&
657 3 : TransactionIdPrecedes(xid, builder->next_phase_at))
658 0 : return false;
659 :
660 : /*
661 : * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
662 : * be needed to decode the change we're currently processing.
663 : */
664 1288211 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
665 : {
666 : /* only build a new snapshot if we don't have a prebuilt one */
667 4161 : if (builder->snapshot == NULL)
668 : {
669 463 : builder->snapshot = SnapBuildBuildSnapshot(builder);
670 : /* increase refcount for the snapshot builder */
671 463 : SnapBuildSnapIncRefcount(builder->snapshot);
672 : }
673 :
674 : /*
675 : * Increase refcount for the transaction we're handing the snapshot
676 : * out to.
677 : */
678 4161 : SnapBuildSnapIncRefcount(builder->snapshot);
679 4161 : ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
680 : builder->snapshot);
681 : }
682 :
683 1288211 : return true;
684 : }
685 :
686 : /*
687 : * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
688 : * This implies that a transaction has done some form of write to system
689 : * catalogs.
690 : */
691 : void
692 28210 : SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
693 : XLogRecPtr lsn, xl_heap_new_cid *xlrec)
694 : {
695 : CommandId cid;
696 :
697 : /*
698 : * we only log new_cid's if a catalog tuple was modified, so mark the
699 : * transaction as containing catalog modifications
700 : */
701 28210 : ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
702 :
703 28210 : ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
704 : xlrec->target_locator, xlrec->target_tid,
705 : xlrec->cmin, xlrec->cmax,
706 : xlrec->combocid);
707 :
708 : /* figure out new command id */
709 28210 : if (xlrec->cmin != InvalidCommandId &&
710 23791 : xlrec->cmax != InvalidCommandId)
711 3319 : cid = Max(xlrec->cmin, xlrec->cmax);
712 24891 : else if (xlrec->cmax != InvalidCommandId)
713 4419 : cid = xlrec->cmax;
714 20472 : else if (xlrec->cmin != InvalidCommandId)
715 20472 : cid = xlrec->cmin;
716 : else
717 : {
718 0 : cid = InvalidCommandId; /* silence compiler */
719 0 : elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
720 : }
721 :
722 28210 : ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
723 28210 : }
724 :
725 : /*
726 : * Add a new Snapshot and invalidation messages to all transactions we're
727 : * decoding that currently are in-progress so they can see new catalog contents
728 : * made by the transaction that just committed. This is necessary because those
729 : * in-progress transactions will use the new catalog's contents from here on
730 : * (at the very least everything they do needs to be compatible with newer
731 : * catalog contents).
732 : */
733 : static void
734 1683 : SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
735 : {
736 : dlist_iter txn_i;
737 : ReorderBufferTXN *txn;
738 :
739 : /*
740 : * Iterate through all toplevel transactions. This can include
741 : * subtransactions which we just don't yet know to be that, but that's
742 : * fine, they will just get an unnecessary snapshot and invalidations
743 : * queued.
744 : */
745 3401 : dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
746 : {
747 1718 : txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
748 :
749 : Assert(TransactionIdIsValid(txn->xid));
750 :
751 : /*
752 : * If we don't have a base snapshot yet, there are no changes in this
753 : * transaction which in turn implies we don't yet need a snapshot at
754 : * all. We'll add a snapshot when the first change gets queued.
755 : *
756 : * Similarly, we don't need to add invalidations to a transaction
757 : * whose base snapshot is not yet set. Once a base snapshot is built,
758 : * it will include the xids of committed transactions that have
759 : * modified the catalog, thus reflecting the new catalog contents. The
760 : * existing catalog cache will have already been invalidated after
761 : * processing the invalidations in the transaction that modified
762 : * catalogs, ensuring that a fresh cache is constructed during
763 : * decoding.
764 : *
765 : * NB: This works correctly even for subtransactions because
766 : * ReorderBufferAssignChild() takes care to transfer the base snapshot
767 : * to the top-level transaction, and while iterating the changequeue
768 : * we'll get the change from the subtxn.
769 : */
770 1718 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
771 2 : continue;
772 :
773 : /*
774 : * We don't need to add snapshot or invalidations to prepared
775 : * transactions as they should not see the new catalog contents.
776 : */
777 1716 : if (rbtxn_is_prepared(txn))
778 28 : continue;
779 :
780 1688 : elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
781 : txn->xid, LSN_FORMAT_ARGS(lsn));
782 :
783 : /*
784 : * increase the snapshot's refcount for the transaction we are handing
785 : * it out to
786 : */
787 1688 : SnapBuildSnapIncRefcount(builder->snapshot);
788 1688 : ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
789 : builder->snapshot);
790 :
791 : /*
792 : * Add invalidation messages to the reorder buffer of in-progress
793 : * transactions except the current committed transaction, for which we
794 : * will execute invalidations at the end.
795 : *
796 : * It is required, otherwise, we will end up using the stale catcache
797 : * contents built by the current transaction even after its decoding,
798 : * which should have been invalidated due to concurrent catalog
799 : * changing transaction.
800 : *
801 : * Distribute only the invalidation messages generated by the current
802 : * committed transaction. Invalidation messages received from other
803 : * transactions would have already been propagated to the relevant
804 : * in-progress transactions. This transaction would have processed
805 : * those invalidations, ensuring that subsequent transactions observe
806 : * a consistent cache state.
807 : */
808 1688 : if (txn->xid != xid)
809 : {
810 : uint32 ninvalidations;
811 33 : SharedInvalidationMessage *msgs = NULL;
812 :
813 33 : ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
814 : xid, &msgs);
815 :
816 33 : if (ninvalidations > 0)
817 : {
818 : Assert(msgs != NULL);
819 :
820 29 : ReorderBufferAddDistributedInvalidations(builder->reorder,
821 : txn->xid, lsn,
822 : ninvalidations, msgs);
823 : }
824 : }
825 : }
826 1683 : }
827 :
828 : /*
829 : * Keep track of a new catalog changing transaction that has committed.
830 : */
831 : static void
832 1692 : SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
833 : {
834 : Assert(TransactionIdIsValid(xid));
835 :
836 1692 : if (builder->committed.xcnt == builder->committed.xcnt_space)
837 : {
838 0 : builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
839 :
840 0 : elog(DEBUG1, "increasing space for committed transactions to %u",
841 : (uint32) builder->committed.xcnt_space);
842 :
843 0 : builder->committed.xip = repalloc_array(builder->committed.xip,
844 : TransactionId,
845 : builder->committed.xcnt_space);
846 : }
847 :
848 : /*
849 : * TODO: It might make sense to keep the array sorted here instead of
850 : * doing it every time we build a new snapshot. On the other hand this
851 : * gets called repeatedly when a transaction with subtransactions commits.
852 : */
853 1692 : builder->committed.xip[builder->committed.xcnt++] = xid;
854 1692 : }
855 :
856 : /*
857 : * Remove knowledge about transactions we treat as committed or containing catalog
858 : * changes that are smaller than ->xmin. Those won't ever get checked via
859 : * the ->committed or ->catchange array, respectively. The committed xids will
860 : * get checked via the clog machinery.
861 : *
862 : * We can ideally remove the transaction from catchange array once it is
863 : * finished (committed/aborted) but that could be costly as we need to maintain
864 : * the xids order in the array.
865 : */
866 : static void
867 546 : SnapBuildPurgeOlderTxn(SnapBuild *builder)
868 : {
869 : int off;
870 : TransactionId *workspace;
871 546 : int surviving_xids = 0;
872 :
873 : /* not ready yet */
874 546 : if (!TransactionIdIsNormal(builder->xmin))
875 0 : return;
876 :
877 : /* TODO: Neater algorithm than just copying and iterating? */
878 : workspace =
879 546 : MemoryContextAlloc(builder->context,
880 546 : builder->committed.xcnt * sizeof(TransactionId));
881 :
882 : /* copy xids that still are interesting to workspace */
883 924 : for (off = 0; off < builder->committed.xcnt; off++)
884 : {
885 378 : if (NormalTransactionIdPrecedes(builder->committed.xip[off],
886 : builder->xmin))
887 : ; /* remove */
888 : else
889 1 : workspace[surviving_xids++] = builder->committed.xip[off];
890 : }
891 :
892 : /* copy workspace back to persistent state */
893 546 : memcpy(builder->committed.xip, workspace,
894 : surviving_xids * sizeof(TransactionId));
895 :
896 546 : elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
897 : (uint32) builder->committed.xcnt, (uint32) surviving_xids,
898 : builder->xmin, builder->xmax);
899 546 : builder->committed.xcnt = surviving_xids;
900 :
901 546 : pfree(workspace);
902 :
903 : /*
904 : * Purge xids in ->catchange as well. The purged array must also be sorted
905 : * in xidComparator order.
906 : */
907 546 : if (builder->catchange.xcnt > 0)
908 : {
909 : /*
910 : * Since catchange.xip is sorted, we find the lower bound of xids that
911 : * are still interesting.
912 : */
913 9 : for (off = 0; off < builder->catchange.xcnt; off++)
914 : {
915 6 : if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
916 : builder->xmin))
917 1 : break;
918 : }
919 :
920 4 : surviving_xids = builder->catchange.xcnt - off;
921 :
922 4 : if (surviving_xids > 0)
923 : {
924 1 : memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
925 : surviving_xids * sizeof(TransactionId));
926 : }
927 : else
928 : {
929 3 : pfree(builder->catchange.xip);
930 3 : builder->catchange.xip = NULL;
931 : }
932 :
933 4 : elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
934 : (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
935 : builder->xmin, builder->xmax);
936 4 : builder->catchange.xcnt = surviving_xids;
937 : }
938 : }
939 :
940 : /*
941 : * Handle everything that needs to be done when a transaction commits
942 : */
943 : void
944 3986 : SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
945 : int nsubxacts, TransactionId *subxacts, uint32 xinfo)
946 : {
947 : int nxact;
948 :
949 3986 : bool needs_snapshot = false;
950 3986 : bool needs_timetravel = false;
951 3986 : bool sub_needs_timetravel = false;
952 :
953 3986 : TransactionId xmax = xid;
954 :
955 : /*
956 : * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
957 : * will they be part of a snapshot. So we don't need to record anything.
958 : */
959 3986 : if (builder->state == SNAPBUILD_START ||
960 3986 : (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
961 0 : TransactionIdPrecedes(xid, builder->next_phase_at)))
962 : {
963 : /* ensure that only commits after this are getting replayed */
964 0 : if (builder->start_decoding_at <= lsn)
965 0 : builder->start_decoding_at = lsn + 1;
966 0 : return;
967 : }
968 :
969 3986 : if (builder->state < SNAPBUILD_CONSISTENT)
970 : {
971 : /* ensure that only commits after this are getting replayed */
972 5 : if (builder->start_decoding_at <= lsn)
973 2 : builder->start_decoding_at = lsn + 1;
974 :
975 : /*
976 : * If building an exportable snapshot, force xid to be tracked, even
977 : * if the transaction didn't modify the catalog.
978 : */
979 5 : if (builder->building_full_snapshot)
980 : {
981 0 : needs_timetravel = true;
982 : }
983 : }
984 :
985 5203 : for (nxact = 0; nxact < nsubxacts; nxact++)
986 : {
987 1217 : TransactionId subxid = subxacts[nxact];
988 :
989 : /*
990 : * Add subtransaction to base snapshot if catalog modifying, we don't
991 : * distinguish to toplevel transactions there.
992 : */
993 1217 : if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
994 : {
995 9 : sub_needs_timetravel = true;
996 9 : needs_snapshot = true;
997 :
998 9 : elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
999 : xid, subxid);
1000 :
1001 9 : SnapBuildAddCommittedTxn(builder, subxid);
1002 :
1003 9 : if (NormalTransactionIdFollows(subxid, xmax))
1004 9 : xmax = subxid;
1005 : }
1006 :
1007 : /*
1008 : * If we're forcing timetravel we also need visibility information
1009 : * about subtransaction, so keep track of subtransaction's state, even
1010 : * if not catalog modifying. Don't need to distribute a snapshot in
1011 : * that case.
1012 : */
1013 1208 : else if (needs_timetravel)
1014 : {
1015 0 : SnapBuildAddCommittedTxn(builder, subxid);
1016 0 : if (NormalTransactionIdFollows(subxid, xmax))
1017 0 : xmax = subxid;
1018 : }
1019 : }
1020 :
1021 : /* if top-level modified catalog, it'll need a snapshot */
1022 3986 : if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
1023 : {
1024 1682 : elog(DEBUG2, "found top level transaction %u, with catalog changes",
1025 : xid);
1026 1682 : needs_snapshot = true;
1027 1682 : needs_timetravel = true;
1028 1682 : SnapBuildAddCommittedTxn(builder, xid);
1029 : }
1030 2304 : else if (sub_needs_timetravel)
1031 : {
1032 : /* track toplevel txn as well, subxact alone isn't meaningful */
1033 1 : elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
1034 : xid);
1035 1 : needs_timetravel = true;
1036 1 : SnapBuildAddCommittedTxn(builder, xid);
1037 : }
1038 2303 : else if (needs_timetravel)
1039 : {
1040 0 : elog(DEBUG2, "forced transaction %u to do timetravel", xid);
1041 :
1042 0 : SnapBuildAddCommittedTxn(builder, xid);
1043 : }
1044 :
1045 3986 : if (!needs_timetravel)
1046 : {
1047 : /* record that we cannot export a general snapshot anymore */
1048 2303 : builder->committed.includes_all_transactions = false;
1049 : }
1050 :
1051 : Assert(!needs_snapshot || needs_timetravel);
1052 :
1053 : /*
1054 : * Adjust xmax of the snapshot builder, we only do that for committed,
1055 : * catalog modifying, transactions, everything else isn't interesting for
1056 : * us since we'll never look at the respective rows.
1057 : */
1058 3986 : if (needs_timetravel &&
1059 3366 : (!TransactionIdIsValid(builder->xmax) ||
1060 1683 : TransactionIdFollowsOrEquals(xmax, builder->xmax)))
1061 : {
1062 1683 : builder->xmax = xmax;
1063 1683 : TransactionIdAdvance(builder->xmax);
1064 : }
1065 :
1066 : /* if there's any reason to build a historic snapshot, do so now */
1067 3986 : if (needs_snapshot)
1068 : {
1069 : /*
1070 : * If we haven't built a complete snapshot yet there's no need to hand
1071 : * it out, it wouldn't (and couldn't) be used anyway.
1072 : */
1073 1683 : if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1074 0 : return;
1075 :
1076 : /*
1077 : * Decrease the snapshot builder's refcount of the old snapshot, note
1078 : * that it still will be used if it has been handed out to the
1079 : * reorderbuffer earlier.
1080 : */
1081 1683 : if (builder->snapshot)
1082 1683 : SnapBuildSnapDecRefcount(builder->snapshot);
1083 :
1084 1683 : builder->snapshot = SnapBuildBuildSnapshot(builder);
1085 :
1086 : /* we might need to execute invalidations, add snapshot */
1087 1683 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1088 : {
1089 8 : SnapBuildSnapIncRefcount(builder->snapshot);
1090 8 : ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1091 : builder->snapshot);
1092 : }
1093 :
1094 : /* refcount of the snapshot builder for the new snapshot */
1095 1683 : SnapBuildSnapIncRefcount(builder->snapshot);
1096 :
1097 : /*
1098 : * Add a new catalog snapshot and invalidations messages to all
1099 : * currently running transactions.
1100 : */
1101 1683 : SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
1102 : }
1103 : }
1104 :
1105 : /*
1106 : * Check the reorder buffer and the snapshot to see if the given transaction has
1107 : * modified catalogs.
1108 : */
1109 : static inline bool
1110 5203 : SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
1111 : uint32 xinfo)
1112 : {
1113 5203 : if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1114 1687 : return true;
1115 :
1116 : /*
1117 : * The transactions that have changed catalogs must have invalidation
1118 : * info.
1119 : */
1120 3516 : if (!(xinfo & XACT_XINFO_HAS_INVALS))
1121 3508 : return false;
1122 :
1123 : /* Check the catchange XID array */
1124 12 : return ((builder->catchange.xcnt > 0) &&
1125 4 : (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
1126 : sizeof(TransactionId), xidComparator) != NULL));
1127 : }
1128 :
1129 : /* -----------------------------------
1130 : * Snapshot building functions dealing with xlog records
1131 : * -----------------------------------
1132 : */
1133 :
1134 : /*
1135 : * Process a running xacts record, and use its information to first build a
1136 : * historic snapshot and later to release resources that aren't needed
1137 : * anymore.
1138 : */
1139 : void
1140 1710 : SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1141 : {
1142 : ReorderBufferTXN *txn;
1143 : TransactionId xmin;
1144 :
1145 : /*
1146 : * If we're not consistent yet, inspect the record to see whether it
1147 : * allows to get closer to being consistent. If we are consistent, dump
1148 : * our snapshot so others or we, after a restart, can use it.
1149 : */
1150 1710 : if (builder->state < SNAPBUILD_CONSISTENT)
1151 : {
1152 : /* returns false if there's no point in performing cleanup just yet */
1153 1186 : if (!SnapBuildFindSnapshot(builder, lsn, running))
1154 1162 : return;
1155 : }
1156 : else
1157 524 : SnapBuildSerialize(builder, lsn);
1158 :
1159 : /*
1160 : * Update range of interesting xids based on the running xacts
1161 : * information. We don't increase ->xmax using it, because once we are in
1162 : * a consistent state we can do that ourselves and much more efficiently
1163 : * so, because we only need to do it for catalog transactions since we
1164 : * only ever look at those.
1165 : *
1166 : * NB: We only increase xmax when a catalog modifying transaction commits
1167 : * (see SnapBuildCommitTxn). Because of this, xmax can be lower than
1168 : * xmin, which looks odd but is correct and actually more efficient, since
1169 : * we hit fast paths in heapam_visibility.c.
1170 : */
1171 546 : builder->xmin = running->oldestRunningXid;
1172 :
1173 : /* Remove transactions we don't need to keep track off anymore */
1174 546 : SnapBuildPurgeOlderTxn(builder);
1175 :
1176 : /*
1177 : * Advance the xmin limit for the current replication slot, to allow
1178 : * vacuum to clean up the tuples this slot has been protecting.
1179 : *
1180 : * The reorderbuffer might have an xmin among the currently running
1181 : * snapshots; use it if so. If not, we need only consider the snapshots
1182 : * we'll produce later, which can't be less than the oldest running xid in
1183 : * the record we're reading now.
1184 : */
1185 546 : xmin = ReorderBufferGetOldestXmin(builder->reorder);
1186 546 : if (xmin == InvalidTransactionId)
1187 499 : xmin = running->oldestRunningXid;
1188 546 : elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
1189 : builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
1190 546 : LogicalIncreaseXminForSlot(lsn, xmin);
1191 :
1192 : /*
1193 : * Also tell the slot where we can restart decoding from. We don't want to
1194 : * do that after every commit because changing that implies an fsync of
1195 : * the logical slot's state file, so we only do it every time we see a
1196 : * running xacts record.
1197 : *
1198 : * Do so by looking for the oldest in progress transaction (determined by
1199 : * the first LSN of any of its relevant records). Every transaction
1200 : * remembers the last location we stored the snapshot to disk before its
1201 : * beginning. That point is where we can restart from.
1202 : */
1203 :
1204 : /*
1205 : * Can't know about a serialized snapshot's location if we're not
1206 : * consistent.
1207 : */
1208 546 : if (builder->state < SNAPBUILD_CONSISTENT)
1209 17 : return;
1210 :
1211 529 : txn = ReorderBufferGetOldestTXN(builder->reorder);
1212 :
1213 : /*
1214 : * oldest ongoing txn might have started when we didn't yet serialize
1215 : * anything because we hadn't reached a consistent state yet.
1216 : */
1217 529 : if (txn != NULL && XLogRecPtrIsValid(txn->restart_decoding_lsn))
1218 23 : LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1219 :
1220 : /*
1221 : * No in-progress transaction, can reuse the last serialized snapshot if
1222 : * we have one.
1223 : */
1224 506 : else if (txn == NULL &&
1225 473 : XLogRecPtrIsValid(builder->reorder->current_restart_decoding_lsn) &&
1226 471 : XLogRecPtrIsValid(builder->last_serialized_snapshot))
1227 471 : LogicalIncreaseRestartDecodingForSlot(lsn,
1228 : builder->last_serialized_snapshot);
1229 : }
1230 :
1231 :
1232 : /*
1233 : * Build the start of a snapshot that's capable of decoding the catalog.
1234 : *
1235 : * Helper function for SnapBuildProcessRunningXacts() while we're not yet
1236 : * consistent.
1237 : *
1238 : * Returns true if there is a point in performing internal maintenance/cleanup
1239 : * using the xl_running_xacts record.
1240 : */
1241 : static bool
1242 1186 : SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1243 : {
1244 : /* ---
1245 : * Build catalog decoding snapshot incrementally using information about
1246 : * the currently running transactions. There are several ways to do that:
1247 : *
1248 : * a) There were no running transactions when the xl_running_xacts record
1249 : * was inserted, jump to CONSISTENT immediately. We might find such a
1250 : * state while waiting on c)'s sub-states.
1251 : *
1252 : * b) This (in a previous run) or another decoding slot serialized a
1253 : * snapshot to disk that we can use. Can't use this method while finding
1254 : * the start point for decoding changes as the restart LSN would be an
1255 : * arbitrary LSN but we need to find the start point to extract changes
1256 : * where we won't see the data for partial transactions. Also, we cannot
1257 : * use this method when a slot needs a full snapshot for export or direct
1258 : * use, as that snapshot will only contain catalog modifying transactions.
1259 : *
1260 : * c) First incrementally build a snapshot for catalog tuples
1261 : * (BUILDING_SNAPSHOT), that requires all, already in-progress,
1262 : * transactions to finish. Every transaction starting after that
1263 : * (FULL_SNAPSHOT state), has enough information to be decoded. But
1264 : * for older running transactions no viable snapshot exists yet, so
1265 : * CONSISTENT will only be reached once all of those have finished.
1266 : * ---
1267 : */
1268 :
1269 : /*
1270 : * xl_running_xacts record is older than what we can use, we might not
1271 : * have all necessary catalog rows anymore.
1272 : */
1273 1186 : if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
1274 505 : NormalTransactionIdPrecedes(running->oldestRunningXid,
1275 : builder->initial_xmin_horizon))
1276 : {
1277 0 : ereport(DEBUG1,
1278 : errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low",
1279 : LSN_FORMAT_ARGS(lsn)),
1280 : errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1281 : builder->initial_xmin_horizon, running->oldestRunningXid));
1282 :
1283 :
1284 0 : SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
1285 :
1286 0 : return true;
1287 : }
1288 :
1289 : /*
1290 : * a) No transaction were running, we can jump to consistent.
1291 : *
1292 : * This is not affected by races around xl_running_xacts, because we can
1293 : * miss transaction commits, but currently not transactions starting.
1294 : *
1295 : * NB: We might have already started to incrementally assemble a snapshot,
1296 : * so we need to be careful to deal with that.
1297 : */
1298 1186 : if (running->oldestRunningXid == running->nextXid)
1299 : {
1300 1154 : if (!XLogRecPtrIsValid(builder->start_decoding_at) ||
1301 655 : builder->start_decoding_at <= lsn)
1302 : /* can decode everything after this */
1303 500 : builder->start_decoding_at = lsn + 1;
1304 :
1305 : /* As no transactions were running xmin/xmax can be trivially set. */
1306 1154 : builder->xmin = running->nextXid; /* < are finished */
1307 1154 : builder->xmax = running->nextXid; /* >= are running */
1308 :
1309 : /* so we can safely use the faster comparisons */
1310 : Assert(TransactionIdIsNormal(builder->xmin));
1311 : Assert(TransactionIdIsNormal(builder->xmax));
1312 :
1313 1154 : builder->state = SNAPBUILD_CONSISTENT;
1314 1154 : builder->next_phase_at = InvalidTransactionId;
1315 :
1316 1154 : ereport(LogicalDecodingLogLevel(),
1317 : errmsg("logical decoding found consistent point at %X/%08X",
1318 : LSN_FORMAT_ARGS(lsn)),
1319 : errdetail("There are no running transactions."));
1320 :
1321 1154 : return false;
1322 : }
1323 :
1324 : /*
1325 : * b) valid on disk state and while neither building full snapshot nor
1326 : * creating a slot.
1327 : */
1328 32 : else if (!builder->building_full_snapshot &&
1329 50 : !builder->in_slot_creation &&
1330 19 : SnapBuildRestore(builder, lsn))
1331 : {
1332 : /* there won't be any state to cleanup */
1333 8 : return false;
1334 : }
1335 :
1336 : /*
1337 : * c) transition from START to BUILDING_SNAPSHOT.
1338 : *
1339 : * In START state, and a xl_running_xacts record with running xacts is
1340 : * encountered. In that case, switch to BUILDING_SNAPSHOT state, and
1341 : * record xl_running_xacts->nextXid. Once all running xacts have finished
1342 : * (i.e. they're all >= nextXid), we have a complete catalog snapshot. It
1343 : * might look that we could use xl_running_xacts's ->xids information to
1344 : * get there quicker, but that is problematic because transactions marked
1345 : * as running, might already have inserted their commit record - it's
1346 : * infeasible to change that with locking.
1347 : */
1348 24 : else if (builder->state == SNAPBUILD_START)
1349 : {
1350 13 : builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
1351 13 : builder->next_phase_at = running->nextXid;
1352 :
1353 : /*
1354 : * Start with an xmin/xmax that's correct for future, when all the
1355 : * currently running transactions have finished. We'll update both
1356 : * while waiting for the pending transactions to finish.
1357 : */
1358 13 : builder->xmin = running->nextXid; /* < are finished */
1359 13 : builder->xmax = running->nextXid; /* >= are running */
1360 :
1361 : /* so we can safely use the faster comparisons */
1362 : Assert(TransactionIdIsNormal(builder->xmin));
1363 : Assert(TransactionIdIsNormal(builder->xmax));
1364 :
1365 13 : ereport(LOG,
1366 : errmsg("logical decoding found initial starting point at %X/%08X",
1367 : LSN_FORMAT_ARGS(lsn)),
1368 : errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1369 : running->xcnt, running->nextXid));
1370 :
1371 13 : SnapBuildWaitSnapshot(running, running->nextXid);
1372 : }
1373 :
1374 : /*
1375 : * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
1376 : *
1377 : * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
1378 : * is >= than nextXid from when we switched to BUILDING_SNAPSHOT. This
1379 : * means all transactions starting afterwards have enough information to
1380 : * be decoded. Switch to FULL_SNAPSHOT.
1381 : */
1382 17 : else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1383 6 : TransactionIdPrecedesOrEquals(builder->next_phase_at,
1384 : running->oldestRunningXid))
1385 : {
1386 5 : builder->state = SNAPBUILD_FULL_SNAPSHOT;
1387 5 : builder->next_phase_at = running->nextXid;
1388 :
1389 5 : ereport(LOG,
1390 : errmsg("logical decoding found initial consistent point at %X/%08X",
1391 : LSN_FORMAT_ARGS(lsn)),
1392 : errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1393 : running->xcnt, running->nextXid));
1394 :
1395 5 : SnapBuildWaitSnapshot(running, running->nextXid);
1396 : }
1397 :
1398 : /*
1399 : * c) transition from FULL_SNAPSHOT to CONSISTENT.
1400 : *
1401 : * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
1402 : * >= than nextXid from when we switched to FULL_SNAPSHOT. This means all
1403 : * transactions that are currently in progress have a catalog snapshot,
1404 : * and all their changes have been collected. Switch to CONSISTENT.
1405 : */
1406 11 : else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
1407 5 : TransactionIdPrecedesOrEquals(builder->next_phase_at,
1408 : running->oldestRunningXid))
1409 : {
1410 5 : builder->state = SNAPBUILD_CONSISTENT;
1411 5 : builder->next_phase_at = InvalidTransactionId;
1412 :
1413 5 : ereport(LogicalDecodingLogLevel(),
1414 : errmsg("logical decoding found consistent point at %X/%08X",
1415 : LSN_FORMAT_ARGS(lsn)),
1416 : errdetail("There are no old transactions anymore."));
1417 : }
1418 :
1419 : /*
1420 : * We already started to track running xacts and need to wait for all
1421 : * in-progress ones to finish. We fall through to the normal processing of
1422 : * records so incremental cleanup can be performed.
1423 : */
1424 22 : return true;
1425 : }
1426 :
1427 : /* ---
1428 : * Iterate through xids in record, wait for all older than the cutoff to
1429 : * finish. Then, if possible, log a new xl_running_xacts record.
1430 : *
1431 : * This isn't required for the correctness of decoding, but to:
1432 : * a) allow isolationtester to notice that we're currently waiting for
1433 : * something.
1434 : * b) log a new xl_running_xacts record where it'd be helpful, without having
1435 : * to wait for bgwriter or checkpointer.
1436 : * ---
1437 : */
1438 : static void
1439 18 : SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
1440 : {
1441 : int off;
1442 :
1443 34 : for (off = 0; off < running->xcnt; off++)
1444 : {
1445 18 : TransactionId xid = running->xids[off];
1446 :
1447 : /*
1448 : * Upper layers should prevent that we ever need to wait on ourselves.
1449 : * Check anyway, since failing to do so would either result in an
1450 : * endless wait or an Assert() failure.
1451 : */
1452 18 : if (TransactionIdIsCurrentTransactionId(xid))
1453 0 : elog(ERROR, "waiting for ourselves");
1454 :
1455 18 : if (TransactionIdFollows(xid, cutoff))
1456 0 : continue;
1457 :
1458 18 : XactLockTableWait(xid, NULL, NULL, XLTW_None);
1459 : }
1460 :
1461 : /*
1462 : * All transactions we needed to finish finished - try to ensure there is
1463 : * another xl_running_xacts record in a timely manner, without having to
1464 : * wait for bgwriter or checkpointer to log one. During recovery we can't
1465 : * enforce that, so we'll have to wait.
1466 : */
1467 16 : if (!RecoveryInProgress())
1468 : {
1469 16 : LogStandbySnapshot();
1470 : }
1471 16 : }
1472 :
/*
 * Byte offsets within SnapBuildOnDisk. Everything before 'builder' is the
 * constant-size header. Everything before 'version' is excluded from the
 * checksum computed when serializing.
 */
#define SnapBuildOnDiskConstantSize offsetof(SnapBuildOnDisk, builder)
#define SnapBuildOnDiskNotChecksummedSize offsetof(SnapBuildOnDisk, version)

/* identification stamped into every serialized snapshot file */
#define SNAPBUILD_MAGIC 0x51A1E001
#define SNAPBUILD_VERSION 6
1480 :
1481 : /*
1482 : * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1483 : *
1484 : * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1485 : * a record that's a potential location for a serialized snapshot.
1486 : */
1487 : void
1488 93 : SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1489 : {
1490 93 : if (builder->state < SNAPBUILD_CONSISTENT)
1491 0 : SnapBuildRestore(builder, lsn);
1492 : else
1493 93 : SnapBuildSerialize(builder, lsn);
1494 93 : }
1495 :
1496 : /*
1497 : * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
1498 : * been done by another decoding process.
1499 : */
1500 : static void
1501 617 : SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
1502 : {
1503 : Size needed_length;
1504 617 : SnapBuildOnDisk *ondisk = NULL;
1505 617 : TransactionId *catchange_xip = NULL;
1506 : MemoryContext old_ctx;
1507 : size_t catchange_xcnt;
1508 : char *ondisk_c;
1509 : int fd;
1510 : char tmppath[MAXPGPATH];
1511 : char path[MAXPGPATH];
1512 : int ret;
1513 : struct stat stat_buf;
1514 : Size sz;
1515 :
1516 : Assert(XLogRecPtrIsValid(lsn));
1517 : Assert(!XLogRecPtrIsValid(builder->last_serialized_snapshot) ||
1518 : builder->last_serialized_snapshot <= lsn);
1519 :
1520 : /*
1521 : * no point in serializing if we cannot continue to work immediately after
1522 : * restoring the snapshot
1523 : */
1524 617 : if (builder->state < SNAPBUILD_CONSISTENT)
1525 0 : return;
1526 :
1527 : /* consistent snapshots have no next phase */
1528 : Assert(builder->next_phase_at == InvalidTransactionId);
1529 :
1530 : /*
1531 : * We identify snapshots by the LSN they are valid for. We don't need to
1532 : * include timelines in the name as each LSN maps to exactly one timeline
1533 : * unless the user used pg_resetwal or similar. If a user did so, there's
1534 : * no hope continuing to decode anyway.
1535 : */
1536 617 : sprintf(path, "%s/%X-%X.snap",
1537 : PG_LOGICAL_SNAPSHOTS_DIR,
1538 617 : LSN_FORMAT_ARGS(lsn));
1539 :
1540 : /*
1541 : * first check whether some other backend already has written the snapshot
1542 : * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1543 : * as a valid state. Everything else is an unexpected error.
1544 : */
1545 617 : ret = stat(path, &stat_buf);
1546 :
1547 617 : if (ret != 0 && errno != ENOENT)
1548 0 : ereport(ERROR,
1549 : (errcode_for_file_access(),
1550 : errmsg("could not stat file \"%s\": %m", path)));
1551 :
1552 617 : else if (ret == 0)
1553 : {
1554 : /*
1555 : * somebody else has already serialized to this point, don't overwrite
1556 : * but remember location, so we don't need to read old data again.
1557 : *
1558 : * To be sure it has been synced to disk after the rename() from the
1559 : * tempfile filename to the real filename, we just repeat the fsync.
1560 : * That ought to be cheap because in most scenarios it should already
1561 : * be safely on disk.
1562 : */
1563 284 : fsync_fname(path, false);
1564 284 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1565 :
1566 284 : builder->last_serialized_snapshot = lsn;
1567 284 : goto out;
1568 : }
1569 :
1570 : /*
1571 : * there is an obvious race condition here between the time we stat(2) the
1572 : * file and us writing the file. But we rename the file into place
1573 : * atomically and all files created need to contain the same data anyway,
1574 : * so this is perfectly fine, although a bit of a resource waste. Locking
1575 : * seems like pointless complication.
1576 : */
1577 333 : elog(DEBUG1, "serializing snapshot to %s", path);
1578 :
1579 : /* to make sure only we will write to this tempfile, include pid */
1580 333 : sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
1581 : PG_LOGICAL_SNAPSHOTS_DIR,
1582 333 : LSN_FORMAT_ARGS(lsn), MyProcPid);
1583 :
1584 : /*
1585 : * Unlink temporary file if it already exists, needs to have been before a
1586 : * crash/error since we won't enter this function twice from within a
1587 : * single decoding slot/backend and the temporary file contains the pid of
1588 : * the current process.
1589 : */
1590 333 : if (unlink(tmppath) != 0 && errno != ENOENT)
1591 0 : ereport(ERROR,
1592 : (errcode_for_file_access(),
1593 : errmsg("could not remove file \"%s\": %m", tmppath)));
1594 :
1595 333 : old_ctx = MemoryContextSwitchTo(builder->context);
1596 :
1597 : /* Get the catalog modifying transactions that are yet not committed */
1598 333 : catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
1599 333 : catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);
1600 :
1601 333 : needed_length = sizeof(SnapBuildOnDisk) +
1602 333 : sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
1603 :
1604 333 : ondisk_c = palloc0(needed_length);
1605 333 : ondisk = (SnapBuildOnDisk *) ondisk_c;
1606 333 : ondisk->magic = SNAPBUILD_MAGIC;
1607 333 : ondisk->version = SNAPBUILD_VERSION;
1608 333 : ondisk->length = needed_length;
1609 333 : INIT_CRC32C(ondisk->checksum);
1610 333 : COMP_CRC32C(ondisk->checksum,
1611 : ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1612 : SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1613 333 : ondisk_c += sizeof(SnapBuildOnDisk);
1614 :
1615 333 : memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1616 : /* NULL-ify memory-only data */
1617 333 : ondisk->builder.context = NULL;
1618 333 : ondisk->builder.snapshot = NULL;
1619 333 : ondisk->builder.reorder = NULL;
1620 333 : ondisk->builder.committed.xip = NULL;
1621 333 : ondisk->builder.catchange.xip = NULL;
1622 : /* update catchange only on disk data */
1623 333 : ondisk->builder.catchange.xcnt = catchange_xcnt;
1624 :
1625 333 : COMP_CRC32C(ondisk->checksum,
1626 : &ondisk->builder,
1627 : sizeof(SnapBuild));
1628 :
1629 : /* copy committed xacts */
1630 333 : if (builder->committed.xcnt > 0)
1631 : {
1632 60 : sz = sizeof(TransactionId) * builder->committed.xcnt;
1633 60 : memcpy(ondisk_c, builder->committed.xip, sz);
1634 60 : COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1635 60 : ondisk_c += sz;
1636 : }
1637 :
1638 : /* copy catalog modifying xacts */
1639 333 : if (catchange_xcnt > 0)
1640 : {
1641 8 : sz = sizeof(TransactionId) * catchange_xcnt;
1642 8 : memcpy(ondisk_c, catchange_xip, sz);
1643 8 : COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1644 8 : ondisk_c += sz;
1645 : }
1646 :
1647 333 : FIN_CRC32C(ondisk->checksum);
1648 :
1649 : /* we have valid data now, open tempfile and write it there */
1650 333 : fd = OpenTransientFile(tmppath,
1651 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1652 333 : if (fd < 0)
1653 0 : ereport(ERROR,
1654 : (errcode_for_file_access(),
1655 : errmsg("could not open file \"%s\": %m", tmppath)));
1656 :
1657 333 : errno = 0;
1658 333 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
1659 333 : if ((write(fd, ondisk, needed_length)) != needed_length)
1660 : {
1661 0 : int save_errno = errno;
1662 :
1663 0 : CloseTransientFile(fd);
1664 :
1665 : /* if write didn't set errno, assume problem is no disk space */
1666 0 : errno = save_errno ? save_errno : ENOSPC;
1667 0 : ereport(ERROR,
1668 : (errcode_for_file_access(),
1669 : errmsg("could not write to file \"%s\": %m", tmppath)));
1670 : }
1671 333 : pgstat_report_wait_end();
1672 :
1673 : /*
1674 : * fsync the file before renaming so that even if we crash after this we
1675 : * have either a fully valid file or nothing.
1676 : *
1677 : * It's safe to just ERROR on fsync() here because we'll retry the whole
1678 : * operation including the writes.
1679 : *
1680 : * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1681 : * some noticeable overhead since it's performed synchronously during
1682 : * decoding?
1683 : */
1684 333 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
1685 333 : if (pg_fsync(fd) != 0)
1686 : {
1687 0 : int save_errno = errno;
1688 :
1689 0 : CloseTransientFile(fd);
1690 0 : errno = save_errno;
1691 0 : ereport(ERROR,
1692 : (errcode_for_file_access(),
1693 : errmsg("could not fsync file \"%s\": %m", tmppath)));
1694 : }
1695 333 : pgstat_report_wait_end();
1696 :
1697 333 : if (CloseTransientFile(fd) != 0)
1698 0 : ereport(ERROR,
1699 : (errcode_for_file_access(),
1700 : errmsg("could not close file \"%s\": %m", tmppath)));
1701 :
1702 333 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1703 :
1704 : /*
1705 : * We may overwrite the work from some other backend, but that's ok, our
1706 : * snapshot is valid as well, we'll just have done some superfluous work.
1707 : */
1708 333 : if (rename(tmppath, path) != 0)
1709 : {
1710 0 : ereport(ERROR,
1711 : (errcode_for_file_access(),
1712 : errmsg("could not rename file \"%s\" to \"%s\": %m",
1713 : tmppath, path)));
1714 : }
1715 :
1716 : /* make sure we persist */
1717 333 : fsync_fname(path, false);
1718 333 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1719 :
1720 : /*
1721 : * Now there's no way we can lose the dumped state anymore, remember this
1722 : * as a serialization point.
1723 : */
1724 333 : builder->last_serialized_snapshot = lsn;
1725 :
1726 333 : MemoryContextSwitchTo(old_ctx);
1727 :
1728 617 : out:
1729 617 : ReorderBufferSetRestartPoint(builder->reorder,
1730 : builder->last_serialized_snapshot);
1731 : /* be tidy */
1732 617 : if (ondisk)
1733 333 : pfree(ondisk);
1734 617 : if (catchange_xip)
1735 8 : pfree(catchange_xip);
1736 : }
1737 :
1738 : /*
1739 : * Restore the logical snapshot file contents to 'ondisk'.
1740 : *
1741 : * 'context' is the memory context where the catalog modifying/committed xid
1742 : * will live.
1743 : * If 'missing_ok' is true, will not throw an error if the file is not found.
1744 : */
1745 : bool
1746 21 : SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn,
1747 : MemoryContext context, bool missing_ok)
1748 : {
1749 : int fd;
1750 : pg_crc32c checksum;
1751 : Size sz;
1752 : char path[MAXPGPATH];
1753 :
1754 21 : sprintf(path, "%s/%X-%X.snap",
1755 : PG_LOGICAL_SNAPSHOTS_DIR,
1756 21 : LSN_FORMAT_ARGS(lsn));
1757 :
1758 21 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1759 :
1760 21 : if (fd < 0)
1761 : {
1762 11 : if (missing_ok && errno == ENOENT)
1763 11 : return false;
1764 :
1765 0 : ereport(ERROR,
1766 : (errcode_for_file_access(),
1767 : errmsg("could not open file \"%s\": %m", path)));
1768 : }
1769 :
1770 : /* ----
1771 : * Make sure the snapshot had been stored safely to disk, that's normally
1772 : * cheap.
1773 : * Note that we do not need PANIC here, nobody will be able to use the
1774 : * slot without fsyncing, and saving it won't succeed without an fsync()
1775 : * either...
1776 : * ----
1777 : */
1778 10 : fsync_fname(path, false);
1779 10 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1780 :
1781 : /* read statically sized portion of snapshot */
1782 10 : SnapBuildRestoreContents(fd, ondisk, SnapBuildOnDiskConstantSize, path);
1783 :
1784 10 : if (ondisk->magic != SNAPBUILD_MAGIC)
1785 0 : ereport(ERROR,
1786 : (errcode(ERRCODE_DATA_CORRUPTED),
1787 : errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1788 : path, ondisk->magic, SNAPBUILD_MAGIC)));
1789 :
1790 10 : if (ondisk->version != SNAPBUILD_VERSION)
1791 0 : ereport(ERROR,
1792 : (errcode(ERRCODE_DATA_CORRUPTED),
1793 : errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1794 : path, ondisk->version, SNAPBUILD_VERSION)));
1795 :
1796 10 : INIT_CRC32C(checksum);
1797 10 : COMP_CRC32C(checksum,
1798 : ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1799 : SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1800 :
1801 : /* read SnapBuild */
1802 10 : SnapBuildRestoreContents(fd, &ondisk->builder, sizeof(SnapBuild), path);
1803 10 : COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild));
1804 :
1805 : /* restore committed xacts information */
1806 10 : if (ondisk->builder.committed.xcnt > 0)
1807 : {
1808 4 : sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt;
1809 4 : ondisk->builder.committed.xip = MemoryContextAllocZero(context, sz);
1810 4 : SnapBuildRestoreContents(fd, ondisk->builder.committed.xip, sz, path);
1811 4 : COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz);
1812 : }
1813 :
1814 : /* restore catalog modifying xacts information */
1815 10 : if (ondisk->builder.catchange.xcnt > 0)
1816 : {
1817 5 : sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt;
1818 5 : ondisk->builder.catchange.xip = MemoryContextAllocZero(context, sz);
1819 5 : SnapBuildRestoreContents(fd, ondisk->builder.catchange.xip, sz, path);
1820 5 : COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz);
1821 : }
1822 :
1823 10 : if (CloseTransientFile(fd) != 0)
1824 0 : ereport(ERROR,
1825 : (errcode_for_file_access(),
1826 : errmsg("could not close file \"%s\": %m", path)));
1827 :
1828 10 : FIN_CRC32C(checksum);
1829 :
1830 : /* verify checksum of what we've read */
1831 10 : if (!EQ_CRC32C(checksum, ondisk->checksum))
1832 0 : ereport(ERROR,
1833 : (errcode(ERRCODE_DATA_CORRUPTED),
1834 : errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1835 : path, checksum, ondisk->checksum)));
1836 :
1837 10 : return true;
1838 : }
1839 :
1840 : /*
1841 : * Restore a snapshot into 'builder' if previously one has been stored at the
1842 : * location indicated by 'lsn'. Returns true if successful, false otherwise.
1843 : */
1844 : static bool
1845 19 : SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
1846 : {
1847 : SnapBuildOnDisk ondisk;
1848 :
1849 : /* no point in loading a snapshot if we're already there */
1850 19 : if (builder->state == SNAPBUILD_CONSISTENT)
1851 0 : return false;
1852 :
1853 : /* validate and restore the snapshot to 'ondisk' */
1854 19 : if (!SnapBuildRestoreSnapshot(&ondisk, lsn, builder->context, true))
1855 11 : return false;
1856 :
1857 : /*
1858 : * ok, we now have a sensible snapshot here, figure out if it has more
1859 : * information than we have.
1860 : */
1861 :
1862 : /*
1863 : * We are only interested in consistent snapshots for now, comparing
1864 : * whether one incomplete snapshot is more "advanced" seems to be
1865 : * unnecessarily complex.
1866 : */
1867 8 : if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1868 0 : goto snapshot_not_interesting;
1869 :
1870 : /*
1871 : * Don't use a snapshot that requires an xmin that we cannot guarantee to
1872 : * be available.
1873 : */
1874 8 : if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
1875 0 : goto snapshot_not_interesting;
1876 :
1877 : /*
1878 : * Consistent snapshots have no next phase. Reset next_phase_at as it is
1879 : * possible that an old value may remain.
1880 : */
1881 : Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
1882 8 : builder->next_phase_at = InvalidTransactionId;
1883 :
1884 : /* ok, we think the snapshot is sensible, copy over everything important */
1885 8 : builder->xmin = ondisk.builder.xmin;
1886 8 : builder->xmax = ondisk.builder.xmax;
1887 8 : builder->state = ondisk.builder.state;
1888 :
1889 8 : builder->committed.xcnt = ondisk.builder.committed.xcnt;
1890 : /* We only allocated/stored xcnt, not xcnt_space xids ! */
1891 : /* don't overwrite preallocated xip, if we don't have anything here */
1892 8 : if (builder->committed.xcnt > 0)
1893 : {
1894 2 : pfree(builder->committed.xip);
1895 2 : builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1896 2 : builder->committed.xip = ondisk.builder.committed.xip;
1897 : }
1898 8 : ondisk.builder.committed.xip = NULL;
1899 :
1900 : /* set catalog modifying transactions */
1901 8 : if (builder->catchange.xip)
1902 0 : pfree(builder->catchange.xip);
1903 8 : builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
1904 8 : builder->catchange.xip = ondisk.builder.catchange.xip;
1905 8 : ondisk.builder.catchange.xip = NULL;
1906 :
1907 : /* our snapshot is not interesting anymore, build a new one */
1908 8 : if (builder->snapshot != NULL)
1909 : {
1910 0 : SnapBuildSnapDecRefcount(builder->snapshot);
1911 : }
1912 8 : builder->snapshot = SnapBuildBuildSnapshot(builder);
1913 8 : SnapBuildSnapIncRefcount(builder->snapshot);
1914 :
1915 8 : ReorderBufferSetRestartPoint(builder->reorder, lsn);
1916 :
1917 : Assert(builder->state == SNAPBUILD_CONSISTENT);
1918 :
1919 8 : ereport(LogicalDecodingLogLevel(),
1920 : errmsg("logical decoding found consistent point at %X/%08X",
1921 : LSN_FORMAT_ARGS(lsn)),
1922 : errdetail("Logical decoding will begin using saved snapshot."));
1923 8 : return true;
1924 :
1925 0 : snapshot_not_interesting:
1926 0 : if (ondisk.builder.committed.xip != NULL)
1927 0 : pfree(ondisk.builder.committed.xip);
1928 0 : if (ondisk.builder.catchange.xip != NULL)
1929 0 : pfree(ondisk.builder.catchange.xip);
1930 0 : return false;
1931 : }
1932 :
1933 : /*
1934 : * Read the contents of the serialized snapshot to 'dest'.
1935 : */
1936 : static void
1937 29 : SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path)
1938 : {
1939 : int readBytes;
1940 :
1941 29 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1942 29 : readBytes = read(fd, dest, size);
1943 29 : pgstat_report_wait_end();
1944 29 : if (readBytes != size)
1945 : {
1946 0 : int save_errno = errno;
1947 :
1948 0 : CloseTransientFile(fd);
1949 :
1950 0 : if (readBytes < 0)
1951 : {
1952 0 : errno = save_errno;
1953 0 : ereport(ERROR,
1954 : (errcode_for_file_access(),
1955 : errmsg("could not read file \"%s\": %m", path)));
1956 : }
1957 : else
1958 0 : ereport(ERROR,
1959 : (errcode(ERRCODE_DATA_CORRUPTED),
1960 : errmsg("could not read file \"%s\": read %d of %zu",
1961 : path, readBytes, size)));
1962 : }
1963 29 : }
1964 :
1965 : /*
1966 : * Remove all serialized snapshots that are not required anymore because no
1967 : * slot can need them. This doesn't actually have to run during a checkpoint,
1968 : * but it's a convenient point to schedule this.
1969 : *
1970 : * NB: We run this during checkpoints even if logical decoding is disabled so
1971 : * we cleanup old slots at some point after it got disabled.
1972 : */
1973 : void
1974 1952 : CheckPointSnapBuild(void)
1975 : {
1976 : XLogRecPtr cutoff;
1977 : XLogRecPtr redo;
1978 : DIR *snap_dir;
1979 : struct dirent *snap_de;
1980 : char path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)];
1981 :
1982 : /*
1983 : * We start off with a minimum of the last redo pointer. No new
1984 : * replication slot will start before that, so that's a safe upper bound
1985 : * for removal.
1986 : */
1987 1952 : redo = GetRedoRecPtr();
1988 :
1989 : /* now check for the restart ptrs from existing slots */
1990 1952 : cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1991 :
1992 : /* don't start earlier than the restart lsn */
1993 1952 : if (redo < cutoff)
1994 1 : cutoff = redo;
1995 :
1996 1952 : snap_dir = AllocateDir(PG_LOGICAL_SNAPSHOTS_DIR);
1997 6160 : while ((snap_de = ReadDir(snap_dir, PG_LOGICAL_SNAPSHOTS_DIR)) != NULL)
1998 : {
1999 : uint32 hi;
2000 : uint32 lo;
2001 : XLogRecPtr lsn;
2002 : PGFileType de_type;
2003 :
2004 4208 : if (strcmp(snap_de->d_name, ".") == 0 ||
2005 2256 : strcmp(snap_de->d_name, "..") == 0)
2006 3904 : continue;
2007 :
2008 304 : snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name);
2009 304 : de_type = get_dirent_type(path, snap_de, false, DEBUG1);
2010 :
2011 304 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
2012 : {
2013 0 : elog(DEBUG1, "only regular files expected: %s", path);
2014 0 : continue;
2015 : }
2016 :
2017 : /*
2018 : * temporary filenames from SnapBuildSerialize() include the LSN and
2019 : * everything but are postfixed by .$pid.tmp. We can just remove them
2020 : * the same as other files because there can be none that are
2021 : * currently being written that are older than cutoff.
2022 : *
2023 : * We just log a message if a file doesn't fit the pattern, it's
2024 : * probably some editors lock/state file or similar...
2025 : */
2026 304 : if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
2027 : {
2028 0 : ereport(LOG,
2029 : (errmsg("could not parse file name \"%s\"", path)));
2030 0 : continue;
2031 : }
2032 :
2033 304 : lsn = ((uint64) hi) << 32 | lo;
2034 :
2035 : /* check whether we still need it */
2036 304 : if (lsn < cutoff || !XLogRecPtrIsValid(cutoff))
2037 : {
2038 201 : elog(DEBUG1, "removing snapbuild snapshot %s", path);
2039 :
2040 : /*
2041 : * It's not particularly harmful, though strange, if we can't
2042 : * remove the file here. Don't prevent the checkpoint from
2043 : * completing, that'd be a cure worse than the disease.
2044 : */
2045 201 : if (unlink(path) < 0)
2046 : {
2047 0 : ereport(LOG,
2048 : (errcode_for_file_access(),
2049 : errmsg("could not remove file \"%s\": %m",
2050 : path)));
2051 0 : continue;
2052 : }
2053 : }
2054 : }
2055 1952 : FreeDir(snap_dir);
2056 1952 : }
2057 :
2058 : /*
2059 : * Check if a logical snapshot at the specified point has been serialized.
2060 : */
2061 : bool
2062 16 : SnapBuildSnapshotExists(XLogRecPtr lsn)
2063 : {
2064 : char path[MAXPGPATH];
2065 : int ret;
2066 : struct stat stat_buf;
2067 :
2068 16 : sprintf(path, "%s/%X-%X.snap",
2069 : PG_LOGICAL_SNAPSHOTS_DIR,
2070 16 : LSN_FORMAT_ARGS(lsn));
2071 :
2072 16 : ret = stat(path, &stat_buf);
2073 :
2074 16 : if (ret != 0 && errno != ENOENT)
2075 0 : ereport(ERROR,
2076 : (errcode_for_file_access(),
2077 : errmsg("could not stat file \"%s\": %m", path)));
2078 :
2079 16 : return ret == 0;
2080 : }
|