Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * snapbuild.c
4 : *
5 : * Infrastructure for building historic catalog snapshots based on contents
6 : * of the WAL, for the purpose of decoding heapam.c style values in the
7 : * WAL.
8 : *
9 : * NOTES:
10 : *
11 : * We build snapshots which can *only* be used to read catalog contents and we
12 : * do so by reading and interpreting the WAL stream. The aim is to build a
13 : * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14 : * at the time the XLogRecord was generated.
15 : *
16 : * To build the snapshots we reuse the infrastructure built for Hot
17 : * Standby. The in-memory snapshots we build look different than HS' because
18 : * we have different needs. To successfully decode data from the WAL we only
19 : * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20 : * tables since the data we decode is wholly contained in the WAL
21 : * records. Also, our snapshots need to be different in comparison to normal
22 : * MVCC ones because in contrast to those we cannot fully rely on the clog and
23 : * pg_subtrans for information about committed transactions because they might
24 : * commit in the future from the POV of the WAL entry we're currently
25 : * decoding. This definition has the advantage that we only need to prevent
 * removal of catalog rows, while normal tables' rows can still be
27 : * removed. This is achieved by using the replication slot mechanism.
28 : *
29 : * As the percentage of transactions modifying the catalog normally is fairly
30 : * small in comparisons to ones only manipulating user data, we keep track of
31 : * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32 : * track of all running transactions like it's done in a normal snapshot. Note
33 : * that we're generally only looking at transactions that have acquired an
34 : * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
35 : * that we consider committed, everything else is considered aborted/in
36 : * progress. That also allows us not to care about subtransactions before they
37 : * have committed which means this module, in contrast to HS, doesn't have to
38 : * care about suboverflowed subtransactions and similar.
39 : *
40 : * One complexity of doing this is that to e.g. handle mixed DDL/DML
41 : * transactions we need Snapshots that see intermediate versions of the
42 : * catalog in a transaction. During normal operation this is achieved by using
43 : * CommandIds/cmin/cmax. The problem with that however is that for space
44 : * efficiency reasons, the cmin and cmax are not included in WAL records. We
45 : * cannot read the cmin/cmax from the tuple itself, either, because it is
46 : * reset on crash recovery. Even if we could, we could not decode combocids
47 : * which are only tracked in the original backend's memory. To work around
48 : * that, heapam writes an extra WAL record (XLOG_HEAP2_NEW_CID) every time a
49 : * catalog row is modified, which includes the cmin and cmax of the
50 : * tuple. During decoding, we insert the ctid->(cmin,cmax) mappings into the
51 : * reorder buffer, and use them at visibility checks instead of the cmin/cmax
 * on the tuple itself. See the comment above ResolveCminCmaxDuringDecoding()
 * in reorderbuffer.c for details.
54 : *
55 : * To facilitate all this we need our own visibility routine, as the normal
 * ones are optimized for different use cases.
57 : *
58 : * To replace the normal catalog snapshots with decoding ones use the
59 : * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
60 : *
61 : *
62 : *
63 : * The snapbuild machinery is starting up in several stages, as illustrated
64 : * by the following graph describing the SnapBuild->state transitions:
65 : *
66 : * +-------------------------+
67 : * +----| START |-------------+
68 : * | +-------------------------+ |
69 : * | | |
70 : * | | |
71 : * | running_xacts #1 |
72 : * | | |
73 : * | | |
74 : * | v |
75 : * | +-------------------------+ v
76 : * | | BUILDING_SNAPSHOT |------------>|
77 : * | +-------------------------+ |
78 : * | | |
79 : * | | |
80 : * | running_xacts #2, xacts from #1 finished |
81 : * | | |
82 : * | | |
83 : * | v |
84 : * | +-------------------------+ v
85 : * | | FULL_SNAPSHOT |------------>|
86 : * | +-------------------------+ |
87 : * | | |
88 : * running_xacts | saved snapshot
89 : * with zero xacts | at running_xacts's lsn
90 : * | | |
91 : * | running_xacts with xacts from #2 finished |
92 : * | | |
93 : * | v |
94 : * | +-------------------------+ |
95 : * +--->|SNAPBUILD_CONSISTENT |<------------+
96 : * +-------------------------+
97 : *
98 : * Initially the machinery is in the START stage. When an xl_running_xacts
99 : * record is read that is sufficiently new (above the safe xmin horizon),
100 : * there's a state transition. If there were no running xacts when the
101 : * xl_running_xacts record was generated, we'll directly go into CONSISTENT
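 * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Once all
 * transactions that were running at the time of that first record have
 * finished (as seen in a later xl_running_xacts record), we switch to
 * FULL_SNAPSHOT. Having a full snapshot means that all transactions that
 * start henceforth can be decoded in their entirety, but transactions that
 * started previously can't. In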
105 : * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
106 : * running transactions have committed or aborted.
107 : *
108 : * Only transactions that commit after CONSISTENT state has been reached will
109 : * be replayed, even though they might have started while still in
110 : * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
111 : * changes has been exported, but all the following ones will be. That point
112 : * is a convenient point to initialize replication from, which is why we
113 : * export a snapshot at that point, which *can* be used to read normal data.
114 : *
115 : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
116 : *
117 : * IDENTIFICATION
118 : * src/backend/replication/logical/snapbuild.c
119 : *
120 : *-------------------------------------------------------------------------
121 : */
122 :
123 : #include "postgres.h"
124 :
125 : #include <sys/stat.h>
126 : #include <unistd.h>
127 :
128 : #include "access/heapam_xlog.h"
129 : #include "access/transam.h"
130 : #include "access/xact.h"
131 : #include "common/file_utils.h"
132 : #include "miscadmin.h"
133 : #include "pgstat.h"
134 : #include "replication/logical.h"
135 : #include "replication/reorderbuffer.h"
136 : #include "replication/snapbuild.h"
137 : #include "replication/snapbuild_internal.h"
138 : #include "storage/fd.h"
139 : #include "storage/lmgr.h"
140 : #include "storage/proc.h"
141 : #include "storage/procarray.h"
142 : #include "storage/standby.h"
143 : #include "utils/builtins.h"
144 : #include "utils/memutils.h"
145 : #include "utils/snapmgr.h"
146 : #include "utils/snapshot.h"
147 : #include "utils/wait_event.h"
148 :
149 :
150 : /*
151 : * Starting a transaction -- which we need to do while exporting a snapshot --
152 : * removes knowledge about the previously used resowner, so we save it here.
153 : */
154 : static ResourceOwner SavedResourceOwnerDuringExport = NULL;
155 : static bool ExportInProgress = false;
156 :
157 : /* ->committed and ->catchange manipulation */
158 : static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
159 :
160 : /* snapshot building/manipulation/distribution functions */
161 : static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
162 :
163 : static void SnapBuildFreeSnapshot(Snapshot snap);
164 :
165 : static void SnapBuildSnapIncRefcount(Snapshot snap);
166 :
167 : static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
168 :
169 : static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
170 : uint32 xinfo);
171 :
172 : /* xlog reading helper functions for SnapBuildProcessRunningXacts */
173 : static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
174 : static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
175 :
176 : /* serialization functions */
177 : static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
178 : static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
179 : static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path);
180 :
181 : /*
182 : * Allocate a new snapshot builder.
183 : *
184 : * xmin_horizon is the xid >= which we can be sure no catalog rows have been
185 : * removed, start_lsn is the LSN >= we want to replay commits.
186 : */
187 : SnapBuild *
188 1170 : AllocateSnapshotBuilder(ReorderBuffer *reorder,
189 : TransactionId xmin_horizon,
190 : XLogRecPtr start_lsn,
191 : bool need_full_snapshot,
192 : bool in_slot_creation,
193 : XLogRecPtr two_phase_at)
194 : {
195 : MemoryContext context;
196 : MemoryContext oldcontext;
197 : SnapBuild *builder;
198 :
199 : /* allocate memory in own context, to have better accountability */
200 1170 : context = AllocSetContextCreate(CurrentMemoryContext,
201 : "snapshot builder context",
202 : ALLOCSET_DEFAULT_SIZES);
203 1170 : oldcontext = MemoryContextSwitchTo(context);
204 :
205 1170 : builder = palloc0_object(SnapBuild);
206 :
207 1170 : builder->state = SNAPBUILD_START;
208 1170 : builder->context = context;
209 1170 : builder->reorder = reorder;
210 : /* Other struct members initialized by zeroing via palloc0 above */
211 :
212 1170 : builder->committed.xcnt = 0;
213 1170 : builder->committed.xcnt_space = 128; /* arbitrary number */
214 1170 : builder->committed.xip =
215 1170 : palloc0_array(TransactionId, builder->committed.xcnt_space);
216 1170 : builder->committed.includes_all_transactions = true;
217 :
218 1170 : builder->catchange.xcnt = 0;
219 1170 : builder->catchange.xip = NULL;
220 :
221 1170 : builder->initial_xmin_horizon = xmin_horizon;
222 1170 : builder->start_decoding_at = start_lsn;
223 1170 : builder->in_slot_creation = in_slot_creation;
224 1170 : builder->building_full_snapshot = need_full_snapshot;
225 1170 : builder->two_phase_at = two_phase_at;
226 :
227 1170 : MemoryContextSwitchTo(oldcontext);
228 :
229 1170 : return builder;
230 : }
231 :
232 : /*
233 : * Free a snapshot builder.
234 : */
235 : void
236 924 : FreeSnapshotBuilder(SnapBuild *builder)
237 : {
238 924 : MemoryContext context = builder->context;
239 :
240 : /* free snapshot explicitly, that contains some error checking */
241 924 : if (builder->snapshot != NULL)
242 : {
243 223 : SnapBuildSnapDecRefcount(builder->snapshot);
244 223 : builder->snapshot = NULL;
245 : }
246 :
247 : /* other resources are deallocated via memory context reset */
248 924 : MemoryContextDelete(context);
249 924 : }
250 :
251 : /*
252 : * Free an unreferenced snapshot that has previously been built by us.
253 : */
254 : static void
255 1490 : SnapBuildFreeSnapshot(Snapshot snap)
256 : {
257 : /* make sure we don't get passed an external snapshot */
258 : Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
259 :
260 : /* make sure nobody modified our snapshot */
261 : Assert(snap->curcid == FirstCommandId);
262 : Assert(!snap->suboverflowed);
263 : Assert(!snap->takenDuringRecovery);
264 : Assert(snap->regd_count == 0);
265 :
266 : /* slightly more likely, so it's checked even without c-asserts */
267 1490 : if (snap->copied)
268 0 : elog(ERROR, "cannot free a copied snapshot");
269 :
270 1490 : if (snap->active_count)
271 0 : elog(ERROR, "cannot free an active snapshot");
272 :
273 1490 : pfree(snap);
274 1490 : }
275 :
276 : /*
277 : * In which state of snapshot building are we?
278 : */
279 : SnapBuildState
280 2135448 : SnapBuildCurrentState(SnapBuild *builder)
281 : {
282 2135448 : return builder->state;
283 : }
284 :
285 : /*
286 : * Return the LSN at which the two-phase decoding was first enabled.
287 : */
288 : XLogRecPtr
289 34 : SnapBuildGetTwoPhaseAt(SnapBuild *builder)
290 : {
291 34 : return builder->two_phase_at;
292 : }
293 :
294 : /*
295 : * Set the LSN at which two-phase decoding is enabled.
296 : */
297 : void
298 8 : SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
299 : {
300 8 : builder->two_phase_at = ptr;
301 8 : }
302 :
303 : /*
304 : * Should the contents of transaction ending at 'ptr' be decoded?
305 : */
306 : bool
307 508539 : SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
308 : {
309 508539 : return ptr < builder->start_decoding_at;
310 : }
311 :
312 : /*
313 : * Increase refcount of a snapshot.
314 : *
315 : * This is used when handing out a snapshot to some external resource or when
316 : * adding a Snapshot as builder->snapshot.
317 : */
318 : static void
319 6549 : SnapBuildSnapIncRefcount(Snapshot snap)
320 : {
321 6549 : snap->active_count++;
322 6549 : }
323 :
324 : /*
325 : * Decrease refcount of a snapshot and free if the refcount reaches zero.
326 : *
327 : * Externally visible, so that external resources that have been handed an
328 : * IncRef'ed Snapshot can adjust its refcount easily.
329 : */
330 : void
331 6264 : SnapBuildSnapDecRefcount(Snapshot snap)
332 : {
333 : /* make sure we don't get passed an external snapshot */
334 : Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
335 :
336 : /* make sure nobody modified our snapshot */
337 : Assert(snap->curcid == FirstCommandId);
338 : Assert(!snap->suboverflowed);
339 : Assert(!snap->takenDuringRecovery);
340 :
341 : Assert(snap->regd_count == 0);
342 :
343 : Assert(snap->active_count > 0);
344 :
345 : /* slightly more likely, so it's checked even without casserts */
346 6264 : if (snap->copied)
347 0 : elog(ERROR, "cannot free a copied snapshot");
348 :
349 6264 : snap->active_count--;
350 6264 : if (snap->active_count == 0)
351 1490 : SnapBuildFreeSnapshot(snap);
352 6264 : }
353 :
354 : /*
355 : * Build a new snapshot, based on currently committed catalog-modifying
356 : * transactions.
357 : *
358 : * In-progress transactions with catalog access are *not* allowed to modify
359 : * these snapshots; they have to copy them and fill in appropriate ->curcid
360 : * and ->subxip/subxcnt values.
361 : */
362 : static Snapshot
363 1955 : SnapBuildBuildSnapshot(SnapBuild *builder)
364 : {
365 : Snapshot snapshot;
366 : Size ssize;
367 :
368 : Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
369 :
370 1955 : ssize = sizeof(SnapshotData)
371 1955 : + sizeof(TransactionId) * builder->committed.xcnt
372 1955 : + sizeof(TransactionId) * 1 /* toplevel xid */ ;
373 :
374 1955 : snapshot = MemoryContextAllocZero(builder->context, ssize);
375 :
376 1955 : snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
377 :
378 : /*
379 : * We misuse the original meaning of SnapshotData's xip and subxip fields
380 : * to make the more fitting for our needs.
381 : *
382 : * In the 'xip' array we store transactions that have to be treated as
383 : * committed. Since we will only ever look at tuples from transactions
384 : * that have modified the catalog it's more efficient to store those few
385 : * that exist between xmin and xmax (frequently there are none).
386 : *
387 : * Snapshots that are used in transactions that have modified the catalog
388 : * also use the 'subxip' array to store their toplevel xid and all the
389 : * subtransaction xids so we can recognize when we need to treat rows as
390 : * visible that are not in xip but still need to be visible. Subxip only
391 : * gets filled when the transaction is copied into the context of a
392 : * catalog modifying transaction since we otherwise share a snapshot
393 : * between transactions. As long as a txn hasn't modified the catalog it
394 : * doesn't need to treat any uncommitted rows as visible, so there is no
395 : * need for those xids.
396 : *
397 : * Both arrays are qsort'ed so that we can use bsearch() on them.
398 : */
399 : Assert(TransactionIdIsNormal(builder->xmin));
400 : Assert(TransactionIdIsNormal(builder->xmax));
401 :
402 1955 : snapshot->xmin = builder->xmin;
403 1955 : snapshot->xmax = builder->xmax;
404 :
405 : /* store all transactions to be treated as committed by this snapshot */
406 1955 : snapshot->xip =
407 1955 : (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
408 1955 : snapshot->xcnt = builder->committed.xcnt;
409 1955 : memcpy(snapshot->xip,
410 1955 : builder->committed.xip,
411 1955 : builder->committed.xcnt * sizeof(TransactionId));
412 :
413 : /* sort so we can bsearch() */
414 1955 : qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
415 :
416 : /*
417 : * Initially, subxip is empty, i.e. it's a snapshot to be used by
418 : * transactions that don't modify the catalog. Will be filled by
419 : * ReorderBufferCopySnap() if necessary.
420 : */
421 1955 : snapshot->subxcnt = 0;
422 1955 : snapshot->subxip = NULL;
423 :
424 1955 : snapshot->suboverflowed = false;
425 1955 : snapshot->takenDuringRecovery = false;
426 1955 : snapshot->copied = false;
427 1955 : snapshot->curcid = FirstCommandId;
428 1955 : snapshot->active_count = 0;
429 1955 : snapshot->regd_count = 0;
430 1955 : snapshot->snapXactCompletionCount = 0;
431 :
432 1955 : return snapshot;
433 : }
434 :
435 : /*
436 : * Build the initial slot snapshot and convert it to a normal snapshot that
437 : * is understood by HeapTupleSatisfiesMVCC.
438 : *
439 : * The snapshot will be usable directly in current transaction or exported
440 : * for loading in different transaction.
441 : */
442 : Snapshot
443 210 : SnapBuildInitialSnapshot(SnapBuild *builder)
444 : {
445 : Snapshot snap;
446 : TransactionId xid;
447 : TransactionId safeXid;
448 : TransactionId *newxip;
449 210 : int newxcnt = 0;
450 :
451 : Assert(XactIsoLevel == XACT_REPEATABLE_READ);
452 : Assert(builder->building_full_snapshot);
453 :
454 : /* don't allow older snapshots */
455 210 : InvalidateCatalogSnapshot(); /* about to overwrite MyProc->xmin */
456 210 : if (HaveRegisteredOrActiveSnapshot())
457 0 : elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
458 : Assert(!HistoricSnapshotActive());
459 :
460 210 : if (builder->state != SNAPBUILD_CONSISTENT)
461 0 : elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
462 :
463 210 : if (!builder->committed.includes_all_transactions)
464 0 : elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
465 :
466 : /* so we don't overwrite the existing value */
467 210 : if (TransactionIdIsValid(MyProc->xmin))
468 0 : elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
469 :
470 210 : snap = SnapBuildBuildSnapshot(builder);
471 :
472 : /*
473 : * We know that snap->xmin is alive, enforced by the logical xmin
474 : * mechanism. Due to that we can do this without locks, we're only
475 : * changing our own value.
476 : *
477 : * Building an initial snapshot is expensive and an unenforced xmin
478 : * horizon would have bad consequences, therefore always double-check that
479 : * the horizon is enforced.
480 : */
481 210 : LWLockAcquire(ProcArrayLock, LW_SHARED);
482 210 : safeXid = GetOldestSafeDecodingTransactionId(false);
483 210 : LWLockRelease(ProcArrayLock);
484 :
485 210 : if (TransactionIdFollows(safeXid, snap->xmin))
486 0 : elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
487 : safeXid, snap->xmin);
488 :
489 210 : MyProc->xmin = snap->xmin;
490 :
491 : /* allocate in transaction context */
492 210 : newxip = palloc_array(TransactionId, GetMaxSnapshotXidCount());
493 :
494 : /*
495 : * snapbuild.c builds transactions in an "inverted" manner, which means it
496 : * stores committed transactions in ->xip, not ones in progress. Build a
497 : * classical snapshot by marking all non-committed transactions as
498 : * in-progress. This can be expensive.
499 : */
500 210 : for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
501 : {
502 : void *test;
503 :
504 : /*
505 : * Check whether transaction committed using the decoding snapshot
506 : * meaning of ->xip.
507 : */
508 0 : test = bsearch(&xid, snap->xip, snap->xcnt,
509 : sizeof(TransactionId), xidComparator);
510 :
511 0 : if (test == NULL)
512 : {
513 0 : if (newxcnt >= GetMaxSnapshotXidCount())
514 0 : ereport(ERROR,
515 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
516 : errmsg("initial slot snapshot too large")));
517 :
518 0 : newxip[newxcnt++] = xid;
519 : }
520 :
521 0 : TransactionIdAdvance(xid);
522 : }
523 :
524 : /* adjust remaining snapshot fields as needed */
525 210 : snap->snapshot_type = SNAPSHOT_MVCC;
526 210 : snap->xcnt = newxcnt;
527 210 : snap->xip = newxip;
528 :
529 210 : return snap;
530 : }
531 :
532 : /*
533 : * Export a snapshot so it can be set in another session with SET TRANSACTION
534 : * SNAPSHOT.
535 : *
536 : * For that we need to start a transaction in the current backend as the
537 : * importing side checks whether the source transaction is still open to make
538 : * sure the xmin horizon hasn't advanced since then.
539 : */
540 : const char *
541 1 : SnapBuildExportSnapshot(SnapBuild *builder)
542 : {
543 : Snapshot snap;
544 : char *snapname;
545 :
546 1 : if (IsTransactionOrTransactionBlock())
547 0 : elog(ERROR, "cannot export a snapshot from within a transaction");
548 :
549 1 : if (SavedResourceOwnerDuringExport)
550 0 : elog(ERROR, "can only export one snapshot at a time");
551 :
552 1 : SavedResourceOwnerDuringExport = CurrentResourceOwner;
553 1 : ExportInProgress = true;
554 :
555 1 : StartTransactionCommand();
556 :
557 : /* There doesn't seem to a nice API to set these */
558 1 : XactIsoLevel = XACT_REPEATABLE_READ;
559 1 : XactReadOnly = true;
560 :
561 1 : snap = SnapBuildInitialSnapshot(builder);
562 :
563 : /*
564 : * now that we've built a plain snapshot, make it active and use the
565 : * normal mechanisms for exporting it
566 : */
567 1 : snapname = ExportSnapshot(snap);
568 :
569 1 : ereport(LOG,
570 : (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
571 : "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
572 : snap->xcnt,
573 : snapname, snap->xcnt)));
574 1 : return snapname;
575 : }
576 :
577 : /*
578 : * Ensure there is a snapshot and if not build one for current transaction.
579 : */
580 : Snapshot
581 9 : SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
582 : {
583 : Assert(builder->state == SNAPBUILD_CONSISTENT);
584 :
585 : /* only build a new snapshot if we don't have a prebuilt one */
586 9 : if (builder->snapshot == NULL)
587 : {
588 2 : builder->snapshot = SnapBuildBuildSnapshot(builder);
589 : /* increase refcount for the snapshot builder */
590 2 : SnapBuildSnapIncRefcount(builder->snapshot);
591 : }
592 :
593 9 : return builder->snapshot;
594 : }
595 :
596 : /*
597 : * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
598 : * any. Aborts the previously started transaction and resets the resource
599 : * owner back to its original value.
600 : */
601 : void
602 5671 : SnapBuildClearExportedSnapshot(void)
603 : {
604 : ResourceOwner tmpResOwner;
605 :
606 : /* nothing exported, that is the usual case */
607 5671 : if (!ExportInProgress)
608 5670 : return;
609 :
610 1 : if (!IsTransactionState())
611 0 : elog(ERROR, "clearing exported snapshot in wrong transaction state");
612 :
613 : /*
614 : * AbortCurrentTransaction() takes care of resetting the snapshot state,
615 : * so remember SavedResourceOwnerDuringExport.
616 : */
617 1 : tmpResOwner = SavedResourceOwnerDuringExport;
618 :
619 : /* make sure nothing could have ever happened */
620 1 : AbortCurrentTransaction();
621 :
622 1 : CurrentResourceOwner = tmpResOwner;
623 : }
624 :
625 : /*
626 : * Clear snapshot export state during transaction abort.
627 : */
628 : void
629 26632 : SnapBuildResetExportedSnapshotState(void)
630 : {
631 26632 : SavedResourceOwnerDuringExport = NULL;
632 26632 : ExportInProgress = false;
633 26632 : }
634 :
635 : /*
636 : * Handle the effects of a single heap change, appropriate to the current state
637 : * of the snapshot builder and returns whether changes made at (xid, lsn) can
638 : * be decoded.
639 : */
640 : bool
641 1401008 : SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
642 : {
643 : /*
644 : * We can't handle data in transactions if we haven't built a snapshot
645 : * yet, so don't store them.
646 : */
647 1401008 : if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
648 0 : return false;
649 :
650 : /*
651 : * No point in keeping track of changes in transactions that we don't have
652 : * enough information about to decode. This means that they started before
653 : * we got into the SNAPBUILD_FULL_SNAPSHOT state.
654 : */
655 1401011 : if (builder->state < SNAPBUILD_CONSISTENT &&
656 3 : TransactionIdPrecedes(xid, builder->next_phase_at))
657 0 : return false;
658 :
659 : /*
660 : * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
661 : * be needed to decode the change we're currently processing.
662 : */
663 1401008 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
664 : {
665 : /* only build a new snapshot if we don't have a prebuilt one */
666 3481 : if (builder->snapshot == NULL)
667 : {
668 426 : builder->snapshot = SnapBuildBuildSnapshot(builder);
669 : /* increase refcount for the snapshot builder */
670 426 : SnapBuildSnapIncRefcount(builder->snapshot);
671 : }
672 :
673 : /*
674 : * Increase refcount for the transaction we're handing the snapshot
675 : * out to.
676 : */
677 3481 : SnapBuildSnapIncRefcount(builder->snapshot);
678 3481 : ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
679 : builder->snapshot);
680 : }
681 :
682 1401008 : return true;
683 : }
684 :
685 : /*
686 : * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
687 : * This implies that a transaction has done some form of write to system
688 : * catalogs.
689 : */
690 : void
691 23624 : SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
692 : XLogRecPtr lsn, xl_heap_new_cid *xlrec)
693 : {
694 : CommandId cid;
695 :
696 : /*
697 : * we only log new_cid's if a catalog tuple was modified, so mark the
698 : * transaction as containing catalog modifications
699 : */
700 23624 : ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
701 :
702 23624 : ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
703 : xlrec->target_locator, xlrec->target_tid,
704 : xlrec->cmin, xlrec->cmax,
705 : xlrec->combocid);
706 :
707 : /* figure out new command id */
708 23624 : if (xlrec->cmin != InvalidCommandId &&
709 19621 : xlrec->cmax != InvalidCommandId)
710 3182 : cid = Max(xlrec->cmin, xlrec->cmax);
711 20442 : else if (xlrec->cmax != InvalidCommandId)
712 4003 : cid = xlrec->cmax;
713 16439 : else if (xlrec->cmin != InvalidCommandId)
714 16439 : cid = xlrec->cmin;
715 : else
716 : {
717 0 : cid = InvalidCommandId; /* silence compiler */
718 0 : elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
719 : }
720 :
721 23624 : ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
722 23624 : }
723 :
724 : /*
725 : * Add a new Snapshot and invalidation messages to all transactions we're
726 : * decoding that currently are in-progress so they can see new catalog contents
727 : * made by the transaction that just committed. This is necessary because those
728 : * in-progress transactions will use the new catalog's contents from here on
729 : * (at the very least everything they do needs to be compatible with newer
730 : * catalog contents).
731 : */
732 : static void
733 1310 : SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
734 : {
735 : dlist_iter txn_i;
736 : ReorderBufferTXN *txn;
737 :
738 : /*
739 : * Iterate through all toplevel transactions. This can include
740 : * subtransactions which we just don't yet know to be that, but that's
741 : * fine, they will just get an unnecessary snapshot and invalidations
742 : * queued.
743 : */
744 2655 : dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
745 : {
746 1345 : txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
747 :
748 : Assert(TransactionIdIsValid(txn->xid));
749 :
750 : /*
751 : * If we don't have a base snapshot yet, there are no changes in this
752 : * transaction which in turn implies we don't yet need a snapshot at
753 : * all. We'll add a snapshot when the first change gets queued.
754 : *
755 : * Similarly, we don't need to add invalidations to a transaction
756 : * whose base snapshot is not yet set. Once a base snapshot is built,
757 : * it will include the xids of committed transactions that have
758 : * modified the catalog, thus reflecting the new catalog contents. The
759 : * existing catalog cache will have already been invalidated after
760 : * processing the invalidations in the transaction that modified
761 : * catalogs, ensuring that a fresh cache is constructed during
762 : * decoding.
763 : *
764 : * NB: This works correctly even for subtransactions because
765 : * ReorderBufferAssignChild() takes care to transfer the base snapshot
766 : * to the top-level transaction, and while iterating the changequeue
767 : * we'll get the change from the subtxn.
768 : */
769 1345 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
770 2 : continue;
771 :
772 : /*
773 : * We don't need to add snapshot or invalidations to prepared
774 : * transactions as they should not see the new catalog contents.
775 : */
776 1343 : if (rbtxn_is_prepared(txn))
777 28 : continue;
778 :
779 1315 : elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
780 : txn->xid, LSN_FORMAT_ARGS(lsn));
781 :
782 : /*
783 : * increase the snapshot's refcount for the transaction we are handing
784 : * it out to
785 : */
786 1315 : SnapBuildSnapIncRefcount(builder->snapshot);
787 1315 : ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
788 : builder->snapshot);
789 :
790 : /*
791 : * Add invalidation messages to the reorder buffer of in-progress
792 : * transactions except the current committed transaction, for which we
793 : * will execute invalidations at the end.
794 : *
795 : * It is required, otherwise, we will end up using the stale catcache
796 : * contents built by the current transaction even after its decoding,
797 : * which should have been invalidated due to concurrent catalog
798 : * changing transaction.
799 : *
800 : * Distribute only the invalidation messages generated by the current
801 : * committed transaction. Invalidation messages received from other
802 : * transactions would have already been propagated to the relevant
803 : * in-progress transactions. This transaction would have processed
804 : * those invalidations, ensuring that subsequent transactions observe
805 : * a consistent cache state.
806 : */
807 1315 : if (txn->xid != xid)
808 : {
809 : uint32 ninvalidations;
810 33 : SharedInvalidationMessage *msgs = NULL;
811 :
812 33 : ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
813 : xid, &msgs);
814 :
815 33 : if (ninvalidations > 0)
816 : {
817 : Assert(msgs != NULL);
818 :
819 29 : ReorderBufferAddDistributedInvalidations(builder->reorder,
820 : txn->xid, lsn,
821 : ninvalidations, msgs);
822 : }
823 : }
824 : }
825 1310 : }
826 :
827 : /*
828 : * Keep track of a new catalog changing transaction that has committed.
829 : */
830 : static void
831 1319 : SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
832 : {
833 : Assert(TransactionIdIsValid(xid));
834 :
835 1319 : if (builder->committed.xcnt == builder->committed.xcnt_space)
836 : {
837 0 : builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
838 :
839 0 : elog(DEBUG1, "increasing space for committed transactions to %u",
840 : (uint32) builder->committed.xcnt_space);
841 :
842 0 : builder->committed.xip = repalloc_array(builder->committed.xip,
843 : TransactionId,
844 : builder->committed.xcnt_space);
845 : }
846 :
847 : /*
848 : * TODO: It might make sense to keep the array sorted here instead of
849 : * doing it every time we build a new snapshot. On the other hand this
850 : * gets called repeatedly when a transaction with subtransactions commits.
851 : */
852 1319 : builder->committed.xip[builder->committed.xcnt++] = xid;
853 1319 : }
854 :
855 : /*
856 : * Remove knowledge about transactions we treat as committed or containing catalog
857 : * changes that are smaller than ->xmin. Those won't ever get checked via
858 : * the ->committed or ->catchange array, respectively. The committed xids will
859 : * get checked via the clog machinery.
860 : *
861 : * We can ideally remove the transaction from catchange array once it is
862 : * finished (committed/aborted) but that could be costly as we need to maintain
863 : * the xids order in the array.
864 : */
865 : static void
866 494 : SnapBuildPurgeOlderTxn(SnapBuild *builder)
867 : {
868 : int off;
869 : TransactionId *workspace;
870 494 : int surviving_xids = 0;
871 :
872 : /* not ready yet */
873 494 : if (!TransactionIdIsNormal(builder->xmin))
874 0 : return;
875 :
876 : /* TODO: Neater algorithm than just copying and iterating? */
877 : workspace =
878 494 : MemoryContextAlloc(builder->context,
879 494 : builder->committed.xcnt * sizeof(TransactionId));
880 :
881 : /* copy xids that still are interesting to workspace */
882 752 : for (off = 0; off < builder->committed.xcnt; off++)
883 : {
884 258 : if (NormalTransactionIdPrecedes(builder->committed.xip[off],
885 : builder->xmin))
886 : ; /* remove */
887 : else
888 1 : workspace[surviving_xids++] = builder->committed.xip[off];
889 : }
890 :
891 : /* copy workspace back to persistent state */
892 494 : memcpy(builder->committed.xip, workspace,
893 : surviving_xids * sizeof(TransactionId));
894 :
895 494 : elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
896 : (uint32) builder->committed.xcnt, (uint32) surviving_xids,
897 : builder->xmin, builder->xmax);
898 494 : builder->committed.xcnt = surviving_xids;
899 :
900 494 : pfree(workspace);
901 :
902 : /*
903 : * Purge xids in ->catchange as well. The purged array must also be sorted
904 : * in xidComparator order.
905 : */
906 494 : if (builder->catchange.xcnt > 0)
907 : {
908 : /*
909 : * Since catchange.xip is sorted, we find the lower bound of xids that
910 : * are still interesting.
911 : */
912 7 : for (off = 0; off < builder->catchange.xcnt; off++)
913 : {
914 5 : if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
915 : builder->xmin))
916 1 : break;
917 : }
918 :
919 3 : surviving_xids = builder->catchange.xcnt - off;
920 :
921 3 : if (surviving_xids > 0)
922 : {
923 1 : memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
924 : surviving_xids * sizeof(TransactionId));
925 : }
926 : else
927 : {
928 2 : pfree(builder->catchange.xip);
929 2 : builder->catchange.xip = NULL;
930 : }
931 :
932 3 : elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
933 : (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
934 : builder->xmin, builder->xmax);
935 3 : builder->catchange.xcnt = surviving_xids;
936 : }
937 : }
938 :
939 : /*
940 : * Handle everything that needs to be done when a transaction commits
941 : */
942 : void
943 3332 : SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
944 : int nsubxacts, TransactionId *subxacts, uint32 xinfo)
945 : {
946 : int nxact;
947 :
948 3332 : bool needs_snapshot = false;
949 3332 : bool needs_timetravel = false;
950 3332 : bool sub_needs_timetravel = false;
951 :
952 3332 : TransactionId xmax = xid;
953 :
954 : /*
955 : * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
956 : * will they be part of a snapshot. So we don't need to record anything.
957 : */
958 3332 : if (builder->state == SNAPBUILD_START ||
959 3332 : (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
960 0 : TransactionIdPrecedes(xid, builder->next_phase_at)))
961 : {
962 : /* ensure that only commits after this are getting replayed */
963 0 : if (builder->start_decoding_at <= lsn)
964 0 : builder->start_decoding_at = lsn + 1;
965 0 : return;
966 : }
967 :
968 3332 : if (builder->state < SNAPBUILD_CONSISTENT)
969 : {
970 : /* ensure that only commits after this are getting replayed */
971 5 : if (builder->start_decoding_at <= lsn)
972 2 : builder->start_decoding_at = lsn + 1;
973 :
974 : /*
975 : * If building an exportable snapshot, force xid to be tracked, even
976 : * if the transaction didn't modify the catalog.
977 : */
978 5 : if (builder->building_full_snapshot)
979 : {
980 0 : needs_timetravel = true;
981 : }
982 : }
983 :
984 4541 : for (nxact = 0; nxact < nsubxacts; nxact++)
985 : {
986 1209 : TransactionId subxid = subxacts[nxact];
987 :
988 : /*
989 : * Add subtransaction to base snapshot if catalog modifying, we don't
990 : * distinguish to toplevel transactions there.
991 : */
992 1209 : if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
993 : {
994 9 : sub_needs_timetravel = true;
995 9 : needs_snapshot = true;
996 :
997 9 : elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
998 : xid, subxid);
999 :
1000 9 : SnapBuildAddCommittedTxn(builder, subxid);
1001 :
1002 9 : if (NormalTransactionIdFollows(subxid, xmax))
1003 9 : xmax = subxid;
1004 : }
1005 :
1006 : /*
1007 : * If we're forcing timetravel we also need visibility information
1008 : * about subtransaction, so keep track of subtransaction's state, even
1009 : * if not catalog modifying. Don't need to distribute a snapshot in
1010 : * that case.
1011 : */
1012 1200 : else if (needs_timetravel)
1013 : {
1014 0 : SnapBuildAddCommittedTxn(builder, subxid);
1015 0 : if (NormalTransactionIdFollows(subxid, xmax))
1016 0 : xmax = subxid;
1017 : }
1018 : }
1019 :
1020 : /* if top-level modified catalog, it'll need a snapshot */
1021 3332 : if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
1022 : {
1023 1309 : elog(DEBUG2, "found top level transaction %u, with catalog changes",
1024 : xid);
1025 1309 : needs_snapshot = true;
1026 1309 : needs_timetravel = true;
1027 1309 : SnapBuildAddCommittedTxn(builder, xid);
1028 : }
1029 2023 : else if (sub_needs_timetravel)
1030 : {
1031 : /* track toplevel txn as well, subxact alone isn't meaningful */
1032 1 : elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
1033 : xid);
1034 1 : needs_timetravel = true;
1035 1 : SnapBuildAddCommittedTxn(builder, xid);
1036 : }
1037 2022 : else if (needs_timetravel)
1038 : {
1039 0 : elog(DEBUG2, "forced transaction %u to do timetravel", xid);
1040 :
1041 0 : SnapBuildAddCommittedTxn(builder, xid);
1042 : }
1043 :
1044 3332 : if (!needs_timetravel)
1045 : {
1046 : /* record that we cannot export a general snapshot anymore */
1047 2022 : builder->committed.includes_all_transactions = false;
1048 : }
1049 :
1050 : Assert(!needs_snapshot || needs_timetravel);
1051 :
1052 : /*
1053 : * Adjust xmax of the snapshot builder, we only do that for committed,
1054 : * catalog modifying, transactions, everything else isn't interesting for
1055 : * us since we'll never look at the respective rows.
1056 : */
1057 3332 : if (needs_timetravel &&
1058 2620 : (!TransactionIdIsValid(builder->xmax) ||
1059 1310 : TransactionIdFollowsOrEquals(xmax, builder->xmax)))
1060 : {
1061 1310 : builder->xmax = xmax;
1062 1310 : TransactionIdAdvance(builder->xmax);
1063 : }
1064 :
1065 : /* if there's any reason to build a historic snapshot, do so now */
1066 3332 : if (needs_snapshot)
1067 : {
1068 : /*
1069 : * If we haven't built a complete snapshot yet there's no need to hand
1070 : * it out, it wouldn't (and couldn't) be used anyway.
1071 : */
1072 1310 : if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1073 0 : return;
1074 :
1075 : /*
1076 : * Decrease the snapshot builder's refcount of the old snapshot, note
1077 : * that it still will be used if it has been handed out to the
1078 : * reorderbuffer earlier.
1079 : */
1080 1310 : if (builder->snapshot)
1081 1310 : SnapBuildSnapDecRefcount(builder->snapshot);
1082 :
1083 1310 : builder->snapshot = SnapBuildBuildSnapshot(builder);
1084 :
1085 : /* we might need to execute invalidations, add snapshot */
1086 1310 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1087 : {
1088 8 : SnapBuildSnapIncRefcount(builder->snapshot);
1089 8 : ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1090 : builder->snapshot);
1091 : }
1092 :
1093 : /* refcount of the snapshot builder for the new snapshot */
1094 1310 : SnapBuildSnapIncRefcount(builder->snapshot);
1095 :
1096 : /*
1097 : * Add a new catalog snapshot and invalidations messages to all
1098 : * currently running transactions.
1099 : */
1100 1310 : SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
1101 : }
1102 : }
1103 :
1104 : /*
1105 : * Check the reorder buffer and the snapshot to see if the given transaction has
1106 : * modified catalogs.
1107 : */
1108 : static inline bool
1109 4541 : SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
1110 : uint32 xinfo)
1111 : {
1112 4541 : if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1113 1314 : return true;
1114 :
1115 : /*
1116 : * The transactions that have changed catalogs must have invalidation
1117 : * info.
1118 : */
1119 3227 : if (!(xinfo & XACT_XINFO_HAS_INVALS))
1120 3219 : return false;
1121 :
1122 : /* Check the catchange XID array */
1123 12 : return ((builder->catchange.xcnt > 0) &&
1124 4 : (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
1125 : sizeof(TransactionId), xidComparator) != NULL));
1126 : }
1127 :
1128 : /* -----------------------------------
1129 : * Snapshot building functions dealing with xlog records
1130 : * -----------------------------------
1131 : */
1132 :
1133 : /*
1134 : * Process a running xacts record, and use its information to first build a
1135 : * historic snapshot and later to release resources that aren't needed
1136 : * anymore.
1137 : */
1138 : void
1139 1605 : SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1140 : {
1141 : ReorderBufferTXN *txn;
1142 : TransactionId xmin;
1143 :
1144 : /*
1145 : * If we're not consistent yet, inspect the record to see whether it
1146 : * allows to get closer to being consistent. If we are consistent, dump
1147 : * our snapshot so others or we, after a restart, can use it.
1148 : */
1149 1605 : if (builder->state < SNAPBUILD_CONSISTENT)
1150 : {
1151 : /* returns false if there's no point in performing cleanup just yet */
1152 1132 : if (!SnapBuildFindSnapshot(builder, lsn, running))
1153 1109 : return;
1154 : }
1155 : else
1156 473 : SnapBuildSerialize(builder, lsn);
1157 :
1158 : /*
1159 : * Update range of interesting xids based on the running xacts
1160 : * information. We don't increase ->xmax using it, because once we are in
1161 : * a consistent state we can do that ourselves and much more efficiently
1162 : * so, because we only need to do it for catalog transactions since we
1163 : * only ever look at those.
1164 : *
1165 : * NB: We only increase xmax when a catalog modifying transaction commits
1166 : * (see SnapBuildCommitTxn). Because of this, xmax can be lower than
1167 : * xmin, which looks odd but is correct and actually more efficient, since
1168 : * we hit fast paths in heapam_visibility.c.
1169 : */
1170 494 : builder->xmin = running->oldestRunningXid;
1171 :
1172 : /* Remove transactions we don't need to keep track off anymore */
1173 494 : SnapBuildPurgeOlderTxn(builder);
1174 :
1175 : /*
1176 : * Advance the xmin limit for the current replication slot, to allow
1177 : * vacuum to clean up the tuples this slot has been protecting.
1178 : *
1179 : * The reorderbuffer might have an xmin among the currently running
1180 : * snapshots; use it if so. If not, we need only consider the snapshots
1181 : * we'll produce later, which can't be less than the oldest running xid in
1182 : * the record we're reading now.
1183 : */
1184 494 : xmin = ReorderBufferGetOldestXmin(builder->reorder);
1185 494 : if (xmin == InvalidTransactionId)
1186 448 : xmin = running->oldestRunningXid;
1187 494 : elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
1188 : builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
1189 494 : LogicalIncreaseXminForSlot(lsn, xmin);
1190 :
1191 : /*
1192 : * Also tell the slot where we can restart decoding from. We don't want to
1193 : * do that after every commit because changing that implies an fsync of
1194 : * the logical slot's state file, so we only do it every time we see a
1195 : * running xacts record.
1196 : *
1197 : * Do so by looking for the oldest in progress transaction (determined by
1198 : * the first LSN of any of its relevant records). Every transaction
1199 : * remembers the last location we stored the snapshot to disk before its
1200 : * beginning. That point is where we can restart from.
1201 : */
1202 :
1203 : /*
1204 : * Can't know about a serialized snapshot's location if we're not
1205 : * consistent.
1206 : */
1207 494 : if (builder->state < SNAPBUILD_CONSISTENT)
1208 16 : return;
1209 :
1210 478 : txn = ReorderBufferGetOldestTXN(builder->reorder);
1211 :
1212 : /*
1213 : * oldest ongoing txn might have started when we didn't yet serialize
1214 : * anything because we hadn't reached a consistent state yet.
1215 : */
1216 478 : if (txn != NULL && XLogRecPtrIsValid(txn->restart_decoding_lsn))
1217 24 : LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1218 :
1219 : /*
1220 : * No in-progress transaction, can reuse the last serialized snapshot if
1221 : * we have one.
1222 : */
1223 454 : else if (txn == NULL &&
1224 423 : XLogRecPtrIsValid(builder->reorder->current_restart_decoding_lsn) &&
1225 421 : XLogRecPtrIsValid(builder->last_serialized_snapshot))
1226 421 : LogicalIncreaseRestartDecodingForSlot(lsn,
1227 : builder->last_serialized_snapshot);
1228 : }
1229 :
1230 :
1231 : /*
1232 : * Build the start of a snapshot that's capable of decoding the catalog.
1233 : *
1234 : * Helper function for SnapBuildProcessRunningXacts() while we're not yet
1235 : * consistent.
1236 : *
1237 : * Returns true if there is a point in performing internal maintenance/cleanup
1238 : * using the xl_running_xacts record.
1239 : */
1240 : static bool
1241 1132 : SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1242 : {
1243 : /* ---
1244 : * Build catalog decoding snapshot incrementally using information about
1245 : * the currently running transactions. There are several ways to do that:
1246 : *
1247 : * a) There were no running transactions when the xl_running_xacts record
1248 : * was inserted, jump to CONSISTENT immediately. We might find such a
1249 : * state while waiting on c)'s sub-states.
1250 : *
1251 : * b) This (in a previous run) or another decoding slot serialized a
1252 : * snapshot to disk that we can use. Can't use this method while finding
1253 : * the start point for decoding changes as the restart LSN would be an
1254 : * arbitrary LSN but we need to find the start point to extract changes
1255 : * where we won't see the data for partial transactions. Also, we cannot
1256 : * use this method when a slot needs a full snapshot for export or direct
1257 : * use, as that snapshot will only contain catalog modifying transactions.
1258 : *
1259 : * c) First incrementally build a snapshot for catalog tuples
1260 : * (BUILDING_SNAPSHOT), that requires all, already in-progress,
1261 : * transactions to finish. Every transaction starting after that
1262 : * (FULL_SNAPSHOT state), has enough information to be decoded. But
1263 : * for older running transactions no viable snapshot exists yet, so
1264 : * CONSISTENT will only be reached once all of those have finished.
1265 : * ---
1266 : */
1267 :
1268 : /*
1269 : * xl_running_xacts record is older than what we can use, we might not
1270 : * have all necessary catalog rows anymore.
1271 : */
1272 1132 : if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
1273 492 : NormalTransactionIdPrecedes(running->oldestRunningXid,
1274 : builder->initial_xmin_horizon))
1275 : {
1276 0 : ereport(DEBUG1,
1277 : errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low",
1278 : LSN_FORMAT_ARGS(lsn)),
1279 : errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1280 : builder->initial_xmin_horizon, running->oldestRunningXid));
1281 :
1282 :
1283 0 : SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
1284 :
1285 0 : return true;
1286 : }
1287 :
1288 : /*
1289 : * a) No transaction were running, we can jump to consistent.
1290 : *
1291 : * This is not affected by races around xl_running_xacts, because we can
1292 : * miss transaction commits, but currently not transactions starting.
1293 : *
1294 : * NB: We might have already started to incrementally assemble a snapshot,
1295 : * so we need to be careful to deal with that.
1296 : */
1297 1132 : if (running->oldestRunningXid == running->nextXid)
1298 : {
1299 1102 : if (!XLogRecPtrIsValid(builder->start_decoding_at) ||
1300 616 : builder->start_decoding_at <= lsn)
1301 : /* can decode everything after this */
1302 487 : builder->start_decoding_at = lsn + 1;
1303 :
1304 : /* As no transactions were running xmin/xmax can be trivially set. */
1305 1102 : builder->xmin = running->nextXid; /* < are finished */
1306 1102 : builder->xmax = running->nextXid; /* >= are running */
1307 :
1308 : /* so we can safely use the faster comparisons */
1309 : Assert(TransactionIdIsNormal(builder->xmin));
1310 : Assert(TransactionIdIsNormal(builder->xmax));
1311 :
1312 1102 : builder->state = SNAPBUILD_CONSISTENT;
1313 1102 : builder->next_phase_at = InvalidTransactionId;
1314 :
1315 1102 : ereport(LOG,
1316 : errmsg("logical decoding found consistent point at %X/%08X",
1317 : LSN_FORMAT_ARGS(lsn)),
1318 : errdetail("There are no running transactions."));
1319 :
1320 1102 : return false;
1321 : }
1322 :
1323 : /*
1324 : * b) valid on disk state and while neither building full snapshot nor
1325 : * creating a slot.
1326 : */
1327 30 : else if (!builder->building_full_snapshot &&
1328 48 : !builder->in_slot_creation &&
1329 18 : SnapBuildRestore(builder, lsn))
1330 : {
1331 : /* there won't be any state to cleanup */
1332 7 : return false;
1333 : }
1334 :
1335 : /*
1336 : * c) transition from START to BUILDING_SNAPSHOT.
1337 : *
1338 : * In START state, and a xl_running_xacts record with running xacts is
1339 : * encountered. In that case, switch to BUILDING_SNAPSHOT state, and
1340 : * record xl_running_xacts->nextXid. Once all running xacts have finished
1341 : * (i.e. they're all >= nextXid), we have a complete catalog snapshot. It
1342 : * might look that we could use xl_running_xacts's ->xids information to
1343 : * get there quicker, but that is problematic because transactions marked
1344 : * as running, might already have inserted their commit record - it's
1345 : * infeasible to change that with locking.
1346 : */
1347 23 : else if (builder->state == SNAPBUILD_START)
1348 : {
1349 12 : builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
1350 12 : builder->next_phase_at = running->nextXid;
1351 :
1352 : /*
1353 : * Start with an xmin/xmax that's correct for future, when all the
1354 : * currently running transactions have finished. We'll update both
1355 : * while waiting for the pending transactions to finish.
1356 : */
1357 12 : builder->xmin = running->nextXid; /* < are finished */
1358 12 : builder->xmax = running->nextXid; /* >= are running */
1359 :
1360 : /* so we can safely use the faster comparisons */
1361 : Assert(TransactionIdIsNormal(builder->xmin));
1362 : Assert(TransactionIdIsNormal(builder->xmax));
1363 :
1364 12 : ereport(LOG,
1365 : errmsg("logical decoding found initial starting point at %X/%08X",
1366 : LSN_FORMAT_ARGS(lsn)),
1367 : errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1368 : running->xcnt, running->nextXid));
1369 :
1370 12 : SnapBuildWaitSnapshot(running, running->nextXid);
1371 : }
1372 :
1373 : /*
1374 : * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
1375 : *
1376 : * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
1377 : * is >= than nextXid from when we switched to BUILDING_SNAPSHOT. This
1378 : * means all transactions starting afterwards have enough information to
1379 : * be decoded. Switch to FULL_SNAPSHOT.
1380 : */
1381 17 : else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1382 6 : TransactionIdPrecedesOrEquals(builder->next_phase_at,
1383 : running->oldestRunningXid))
1384 : {
1385 5 : builder->state = SNAPBUILD_FULL_SNAPSHOT;
1386 5 : builder->next_phase_at = running->nextXid;
1387 :
1388 5 : ereport(LOG,
1389 : errmsg("logical decoding found initial consistent point at %X/%08X",
1390 : LSN_FORMAT_ARGS(lsn)),
1391 : errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1392 : running->xcnt, running->nextXid));
1393 :
1394 5 : SnapBuildWaitSnapshot(running, running->nextXid);
1395 : }
1396 :
1397 : /*
1398 : * c) transition from FULL_SNAPSHOT to CONSISTENT.
1399 : *
1400 : * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
1401 : * >= than nextXid from when we switched to FULL_SNAPSHOT. This means all
1402 : * transactions that are currently in progress have a catalog snapshot,
1403 : * and all their changes have been collected. Switch to CONSISTENT.
1404 : */
1405 11 : else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
1406 5 : TransactionIdPrecedesOrEquals(builder->next_phase_at,
1407 : running->oldestRunningXid))
1408 : {
1409 5 : builder->state = SNAPBUILD_CONSISTENT;
1410 5 : builder->next_phase_at = InvalidTransactionId;
1411 :
1412 5 : ereport(LOG,
1413 : errmsg("logical decoding found consistent point at %X/%08X",
1414 : LSN_FORMAT_ARGS(lsn)),
1415 : errdetail("There are no old transactions anymore."));
1416 : }
1417 :
1418 : /*
1419 : * We already started to track running xacts and need to wait for all
1420 : * in-progress ones to finish. We fall through to the normal processing of
1421 : * records so incremental cleanup can be performed.
1422 : */
1423 21 : return true;
1424 : }
1425 :
1426 : /* ---
1427 : * Iterate through xids in record, wait for all older than the cutoff to
1428 : * finish. Then, if possible, log a new xl_running_xacts record.
1429 : *
1430 : * This isn't required for the correctness of decoding, but to:
1431 : * a) allow isolationtester to notice that we're currently waiting for
1432 : * something.
1433 : * b) log a new xl_running_xacts record where it'd be helpful, without having
1434 : * to wait for bgwriter or checkpointer.
1435 : * ---
1436 : */
1437 : static void
1438 17 : SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
1439 : {
1440 : int off;
1441 :
1442 32 : for (off = 0; off < running->xcnt; off++)
1443 : {
1444 17 : TransactionId xid = running->xids[off];
1445 :
1446 : /*
1447 : * Upper layers should prevent that we ever need to wait on ourselves.
1448 : * Check anyway, since failing to do so would either result in an
1449 : * endless wait or an Assert() failure.
1450 : */
1451 17 : if (TransactionIdIsCurrentTransactionId(xid))
1452 0 : elog(ERROR, "waiting for ourselves");
1453 :
1454 17 : if (TransactionIdFollows(xid, cutoff))
1455 0 : continue;
1456 :
1457 17 : XactLockTableWait(xid, NULL, NULL, XLTW_None);
1458 : }
1459 :
1460 : /*
1461 : * All transactions we needed to finish finished - try to ensure there is
1462 : * another xl_running_xacts record in a timely manner, without having to
1463 : * wait for bgwriter or checkpointer to log one. During recovery we can't
1464 : * enforce that, so we'll have to wait.
1465 : */
1466 15 : if (!RecoveryInProgress())
1467 : {
1468 15 : LogStandbySnapshot();
1469 : }
1470 15 : }
1471 :
1472 : #define SnapBuildOnDiskConstantSize \
1473 : offsetof(SnapBuildOnDisk, builder)
1474 : #define SnapBuildOnDiskNotChecksummedSize \
1475 : offsetof(SnapBuildOnDisk, version)
1476 :
1477 : #define SNAPBUILD_MAGIC 0x51A1E001
1478 : #define SNAPBUILD_VERSION 6
1479 :
1480 : /*
1481 : * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1482 : *
1483 : * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1484 : * a record that's a potential location for a serialized snapshot.
1485 : */
1486 : void
1487 74 : SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1488 : {
1489 74 : if (builder->state < SNAPBUILD_CONSISTENT)
1490 0 : SnapBuildRestore(builder, lsn);
1491 : else
1492 74 : SnapBuildSerialize(builder, lsn);
1493 74 : }
1494 :
1495 : /*
1496 : * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
1497 : * been done by another decoding process.
1498 : */
1499 : static void
1500 547 : SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
1501 : {
1502 : Size needed_length;
1503 547 : SnapBuildOnDisk *ondisk = NULL;
1504 547 : TransactionId *catchange_xip = NULL;
1505 : MemoryContext old_ctx;
1506 : size_t catchange_xcnt;
1507 : char *ondisk_c;
1508 : int fd;
1509 : char tmppath[MAXPGPATH];
1510 : char path[MAXPGPATH];
1511 : int ret;
1512 : struct stat stat_buf;
1513 : Size sz;
1514 :
1515 : Assert(XLogRecPtrIsValid(lsn));
1516 : Assert(!XLogRecPtrIsValid(builder->last_serialized_snapshot) ||
1517 : builder->last_serialized_snapshot <= lsn);
1518 :
1519 : /*
1520 : * no point in serializing if we cannot continue to work immediately after
1521 : * restoring the snapshot
1522 : */
1523 547 : if (builder->state < SNAPBUILD_CONSISTENT)
1524 0 : return;
1525 :
1526 : /* consistent snapshots have no next phase */
1527 : Assert(builder->next_phase_at == InvalidTransactionId);
1528 :
1529 : /*
1530 : * We identify snapshots by the LSN they are valid for. We don't need to
1531 : * include timelines in the name as each LSN maps to exactly one timeline
1532 : * unless the user used pg_resetwal or similar. If a user did so, there's
1533 : * no hope continuing to decode anyway.
1534 : */
1535 547 : sprintf(path, "%s/%X-%X.snap",
1536 : PG_LOGICAL_SNAPSHOTS_DIR,
1537 547 : LSN_FORMAT_ARGS(lsn));
1538 :
1539 : /*
1540 : * first check whether some other backend already has written the snapshot
1541 : * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1542 : * as a valid state. Everything else is an unexpected error.
1543 : */
1544 547 : ret = stat(path, &stat_buf);
1545 :
1546 547 : if (ret != 0 && errno != ENOENT)
1547 0 : ereport(ERROR,
1548 : (errcode_for_file_access(),
1549 : errmsg("could not stat file \"%s\": %m", path)));
1550 :
1551 547 : else if (ret == 0)
1552 : {
1553 : /*
1554 : * somebody else has already serialized to this point, don't overwrite
1555 : * but remember location, so we don't need to read old data again.
1556 : *
1557 : * To be sure it has been synced to disk after the rename() from the
1558 : * tempfile filename to the real filename, we just repeat the fsync.
1559 : * That ought to be cheap because in most scenarios it should already
1560 : * be safely on disk.
1561 : */
1562 221 : fsync_fname(path, false);
1563 221 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1564 :
1565 221 : builder->last_serialized_snapshot = lsn;
1566 221 : goto out;
1567 : }
1568 :
1569 : /*
1570 : * there is an obvious race condition here between the time we stat(2) the
1571 : * file and us writing the file. But we rename the file into place
1572 : * atomically and all files created need to contain the same data anyway,
1573 : * so this is perfectly fine, although a bit of a resource waste. Locking
1574 : * seems like pointless complication.
1575 : */
1576 326 : elog(DEBUG1, "serializing snapshot to %s", path);
1577 :
1578 : /* to make sure only we will write to this tempfile, include pid */
1579 326 : sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
1580 : PG_LOGICAL_SNAPSHOTS_DIR,
1581 326 : LSN_FORMAT_ARGS(lsn), MyProcPid);
1582 :
1583 : /*
1584 : * Unlink temporary file if it already exists, needs to have been before a
1585 : * crash/error since we won't enter this function twice from within a
1586 : * single decoding slot/backend and the temporary file contains the pid of
1587 : * the current process.
1588 : */
1589 326 : if (unlink(tmppath) != 0 && errno != ENOENT)
1590 0 : ereport(ERROR,
1591 : (errcode_for_file_access(),
1592 : errmsg("could not remove file \"%s\": %m", tmppath)));
1593 :
1594 326 : old_ctx = MemoryContextSwitchTo(builder->context);
1595 :
1596 : /* Get the catalog modifying transactions that are yet not committed */
1597 326 : catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
1598 326 : catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);
1599 :
1600 326 : needed_length = sizeof(SnapBuildOnDisk) +
1601 326 : sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
1602 :
1603 326 : ondisk_c = palloc0(needed_length);
1604 326 : ondisk = (SnapBuildOnDisk *) ondisk_c;
1605 326 : ondisk->magic = SNAPBUILD_MAGIC;
1606 326 : ondisk->version = SNAPBUILD_VERSION;
1607 326 : ondisk->length = needed_length;
1608 326 : INIT_CRC32C(ondisk->checksum);
1609 326 : COMP_CRC32C(ondisk->checksum,
1610 : ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1611 : SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1612 326 : ondisk_c += sizeof(SnapBuildOnDisk);
1613 :
1614 326 : memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1615 : /* NULL-ify memory-only data */
1616 326 : ondisk->builder.context = NULL;
1617 326 : ondisk->builder.snapshot = NULL;
1618 326 : ondisk->builder.reorder = NULL;
1619 326 : ondisk->builder.committed.xip = NULL;
1620 326 : ondisk->builder.catchange.xip = NULL;
1621 : /* update catchange only on disk data */
1622 326 : ondisk->builder.catchange.xcnt = catchange_xcnt;
1623 :
1624 326 : COMP_CRC32C(ondisk->checksum,
1625 : &ondisk->builder,
1626 : sizeof(SnapBuild));
1627 :
1628 : /* copy committed xacts */
1629 326 : if (builder->committed.xcnt > 0)
1630 : {
1631 60 : sz = sizeof(TransactionId) * builder->committed.xcnt;
1632 60 : memcpy(ondisk_c, builder->committed.xip, sz);
1633 60 : COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1634 60 : ondisk_c += sz;
1635 : }
1636 :
1637 : /* copy catalog modifying xacts */
1638 326 : if (catchange_xcnt > 0)
1639 : {
1640 8 : sz = sizeof(TransactionId) * catchange_xcnt;
1641 8 : memcpy(ondisk_c, catchange_xip, sz);
1642 8 : COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1643 8 : ondisk_c += sz;
1644 : }
1645 :
1646 326 : FIN_CRC32C(ondisk->checksum);
1647 :
1648 : /* we have valid data now, open tempfile and write it there */
1649 326 : fd = OpenTransientFile(tmppath,
1650 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1651 326 : if (fd < 0)
1652 0 : ereport(ERROR,
1653 : (errcode_for_file_access(),
1654 : errmsg("could not open file \"%s\": %m", tmppath)));
1655 :
1656 326 : errno = 0;
1657 326 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
1658 326 : if ((write(fd, ondisk, needed_length)) != needed_length)
1659 : {
1660 0 : int save_errno = errno;
1661 :
1662 0 : CloseTransientFile(fd);
1663 :
1664 : /* if write didn't set errno, assume problem is no disk space */
1665 0 : errno = save_errno ? save_errno : ENOSPC;
1666 0 : ereport(ERROR,
1667 : (errcode_for_file_access(),
1668 : errmsg("could not write to file \"%s\": %m", tmppath)));
1669 : }
1670 326 : pgstat_report_wait_end();
1671 :
1672 : /*
1673 : * fsync the file before renaming so that even if we crash after this we
1674 : * have either a fully valid file or nothing.
1675 : *
1676 : * It's safe to just ERROR on fsync() here because we'll retry the whole
1677 : * operation including the writes.
1678 : *
1679 : * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1680 : * some noticeable overhead since it's performed synchronously during
1681 : * decoding?
1682 : */
1683 326 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
1684 326 : if (pg_fsync(fd) != 0)
1685 : {
1686 0 : int save_errno = errno;
1687 :
1688 0 : CloseTransientFile(fd);
1689 0 : errno = save_errno;
1690 0 : ereport(ERROR,
1691 : (errcode_for_file_access(),
1692 : errmsg("could not fsync file \"%s\": %m", tmppath)));
1693 : }
1694 326 : pgstat_report_wait_end();
1695 :
1696 326 : if (CloseTransientFile(fd) != 0)
1697 0 : ereport(ERROR,
1698 : (errcode_for_file_access(),
1699 : errmsg("could not close file \"%s\": %m", tmppath)));
1700 :
1701 326 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1702 :
1703 : /*
1704 : * We may overwrite the work from some other backend, but that's ok, our
1705 : * snapshot is valid as well, we'll just have done some superfluous work.
1706 : */
1707 326 : if (rename(tmppath, path) != 0)
1708 : {
1709 0 : ereport(ERROR,
1710 : (errcode_for_file_access(),
1711 : errmsg("could not rename file \"%s\" to \"%s\": %m",
1712 : tmppath, path)));
1713 : }
1714 :
1715 : /* make sure we persist */
1716 326 : fsync_fname(path, false);
1717 326 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1718 :
1719 : /*
1720 : * Now there's no way we can lose the dumped state anymore, remember this
1721 : * as a serialization point.
1722 : */
1723 326 : builder->last_serialized_snapshot = lsn;
1724 :
1725 326 : MemoryContextSwitchTo(old_ctx);
1726 :
1727 547 : out:
1728 547 : ReorderBufferSetRestartPoint(builder->reorder,
1729 : builder->last_serialized_snapshot);
1730 : /* be tidy */
1731 547 : if (ondisk)
1732 326 : pfree(ondisk);
1733 547 : if (catchange_xip)
1734 8 : pfree(catchange_xip);
1735 : }
1736 :
1737 : /*
1738 : * Restore the logical snapshot file contents to 'ondisk'.
1739 : *
1740 : * 'context' is the memory context where the catalog modifying/committed xid
1741 : * will live.
1742 : * If 'missing_ok' is true, will not throw an error if the file is not found.
1743 : */
1744 : bool
1745 20 : SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn,
1746 : MemoryContext context, bool missing_ok)
1747 : {
1748 : int fd;
1749 : pg_crc32c checksum;
1750 : Size sz;
1751 : char path[MAXPGPATH];
1752 :
1753 20 : sprintf(path, "%s/%X-%X.snap",
1754 : PG_LOGICAL_SNAPSHOTS_DIR,
1755 20 : LSN_FORMAT_ARGS(lsn));
1756 :
1757 20 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1758 :
1759 20 : if (fd < 0)
1760 : {
1761 11 : if (missing_ok && errno == ENOENT)
1762 11 : return false;
1763 :
1764 0 : ereport(ERROR,
1765 : (errcode_for_file_access(),
1766 : errmsg("could not open file \"%s\": %m", path)));
1767 : }
1768 :
1769 : /* ----
1770 : * Make sure the snapshot had been stored safely to disk, that's normally
1771 : * cheap.
1772 : * Note that we do not need PANIC here, nobody will be able to use the
1773 : * slot without fsyncing, and saving it won't succeed without an fsync()
1774 : * either...
1775 : * ----
1776 : */
1777 9 : fsync_fname(path, false);
1778 9 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1779 :
1780 : /* read statically sized portion of snapshot */
1781 9 : SnapBuildRestoreContents(fd, ondisk, SnapBuildOnDiskConstantSize, path);
1782 :
1783 9 : if (ondisk->magic != SNAPBUILD_MAGIC)
1784 0 : ereport(ERROR,
1785 : (errcode(ERRCODE_DATA_CORRUPTED),
1786 : errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1787 : path, ondisk->magic, SNAPBUILD_MAGIC)));
1788 :
1789 9 : if (ondisk->version != SNAPBUILD_VERSION)
1790 0 : ereport(ERROR,
1791 : (errcode(ERRCODE_DATA_CORRUPTED),
1792 : errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1793 : path, ondisk->version, SNAPBUILD_VERSION)));
1794 :
1795 9 : INIT_CRC32C(checksum);
1796 9 : COMP_CRC32C(checksum,
1797 : ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1798 : SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1799 :
1800 : /* read SnapBuild */
1801 9 : SnapBuildRestoreContents(fd, &ondisk->builder, sizeof(SnapBuild), path);
1802 9 : COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild));
1803 :
1804 : /* restore committed xacts information */
1805 9 : if (ondisk->builder.committed.xcnt > 0)
1806 : {
1807 4 : sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt;
1808 4 : ondisk->builder.committed.xip = MemoryContextAllocZero(context, sz);
1809 4 : SnapBuildRestoreContents(fd, ondisk->builder.committed.xip, sz, path);
1810 4 : COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz);
1811 : }
1812 :
1813 : /* restore catalog modifying xacts information */
1814 9 : if (ondisk->builder.catchange.xcnt > 0)
1815 : {
1816 4 : sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt;
1817 4 : ondisk->builder.catchange.xip = MemoryContextAllocZero(context, sz);
1818 4 : SnapBuildRestoreContents(fd, ondisk->builder.catchange.xip, sz, path);
1819 4 : COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz);
1820 : }
1821 :
1822 9 : if (CloseTransientFile(fd) != 0)
1823 0 : ereport(ERROR,
1824 : (errcode_for_file_access(),
1825 : errmsg("could not close file \"%s\": %m", path)));
1826 :
1827 9 : FIN_CRC32C(checksum);
1828 :
1829 : /* verify checksum of what we've read */
1830 9 : if (!EQ_CRC32C(checksum, ondisk->checksum))
1831 0 : ereport(ERROR,
1832 : (errcode(ERRCODE_DATA_CORRUPTED),
1833 : errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1834 : path, checksum, ondisk->checksum)));
1835 :
1836 9 : return true;
1837 : }
1838 :
1839 : /*
1840 : * Restore a snapshot into 'builder' if previously one has been stored at the
1841 : * location indicated by 'lsn'. Returns true if successful, false otherwise.
1842 : */
1843 : static bool
1844 18 : SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
1845 : {
1846 : SnapBuildOnDisk ondisk;
1847 :
1848 : /* no point in loading a snapshot if we're already there */
1849 18 : if (builder->state == SNAPBUILD_CONSISTENT)
1850 0 : return false;
1851 :
1852 : /* validate and restore the snapshot to 'ondisk' */
1853 18 : if (!SnapBuildRestoreSnapshot(&ondisk, lsn, builder->context, true))
1854 11 : return false;
1855 :
1856 : /*
1857 : * ok, we now have a sensible snapshot here, figure out if it has more
1858 : * information than we have.
1859 : */
1860 :
1861 : /*
1862 : * We are only interested in consistent snapshots for now, comparing
1863 : * whether one incomplete snapshot is more "advanced" seems to be
1864 : * unnecessarily complex.
1865 : */
1866 7 : if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1867 0 : goto snapshot_not_interesting;
1868 :
1869 : /*
1870 : * Don't use a snapshot that requires an xmin that we cannot guarantee to
1871 : * be available.
1872 : */
1873 7 : if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
1874 0 : goto snapshot_not_interesting;
1875 :
1876 : /*
1877 : * Consistent snapshots have no next phase. Reset next_phase_at as it is
1878 : * possible that an old value may remain.
1879 : */
1880 : Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
1881 7 : builder->next_phase_at = InvalidTransactionId;
1882 :
1883 : /* ok, we think the snapshot is sensible, copy over everything important */
1884 7 : builder->xmin = ondisk.builder.xmin;
1885 7 : builder->xmax = ondisk.builder.xmax;
1886 7 : builder->state = ondisk.builder.state;
1887 :
1888 7 : builder->committed.xcnt = ondisk.builder.committed.xcnt;
1889 : /* We only allocated/stored xcnt, not xcnt_space xids ! */
1890 : /* don't overwrite preallocated xip, if we don't have anything here */
1891 7 : if (builder->committed.xcnt > 0)
1892 : {
1893 2 : pfree(builder->committed.xip);
1894 2 : builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1895 2 : builder->committed.xip = ondisk.builder.committed.xip;
1896 : }
1897 7 : ondisk.builder.committed.xip = NULL;
1898 :
1899 : /* set catalog modifying transactions */
1900 7 : if (builder->catchange.xip)
1901 0 : pfree(builder->catchange.xip);
1902 7 : builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
1903 7 : builder->catchange.xip = ondisk.builder.catchange.xip;
1904 7 : ondisk.builder.catchange.xip = NULL;
1905 :
1906 : /* our snapshot is not interesting anymore, build a new one */
1907 7 : if (builder->snapshot != NULL)
1908 : {
1909 0 : SnapBuildSnapDecRefcount(builder->snapshot);
1910 : }
1911 7 : builder->snapshot = SnapBuildBuildSnapshot(builder);
1912 7 : SnapBuildSnapIncRefcount(builder->snapshot);
1913 :
1914 7 : ReorderBufferSetRestartPoint(builder->reorder, lsn);
1915 :
1916 : Assert(builder->state == SNAPBUILD_CONSISTENT);
1917 :
1918 7 : ereport(LOG,
1919 : errmsg("logical decoding found consistent point at %X/%08X",
1920 : LSN_FORMAT_ARGS(lsn)),
1921 : errdetail("Logical decoding will begin using saved snapshot."));
1922 7 : return true;
1923 :
1924 0 : snapshot_not_interesting:
1925 0 : if (ondisk.builder.committed.xip != NULL)
1926 0 : pfree(ondisk.builder.committed.xip);
1927 0 : if (ondisk.builder.catchange.xip != NULL)
1928 0 : pfree(ondisk.builder.catchange.xip);
1929 0 : return false;
1930 : }
1931 :
1932 : /*
1933 : * Read the contents of the serialized snapshot to 'dest'.
1934 : */
1935 : static void
1936 26 : SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path)
1937 : {
1938 : int readBytes;
1939 :
1940 26 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1941 26 : readBytes = read(fd, dest, size);
1942 26 : pgstat_report_wait_end();
1943 26 : if (readBytes != size)
1944 : {
1945 0 : int save_errno = errno;
1946 :
1947 0 : CloseTransientFile(fd);
1948 :
1949 0 : if (readBytes < 0)
1950 : {
1951 0 : errno = save_errno;
1952 0 : ereport(ERROR,
1953 : (errcode_for_file_access(),
1954 : errmsg("could not read file \"%s\": %m", path)));
1955 : }
1956 : else
1957 0 : ereport(ERROR,
1958 : (errcode(ERRCODE_DATA_CORRUPTED),
1959 : errmsg("could not read file \"%s\": read %d of %zu",
1960 : path, readBytes, size)));
1961 : }
1962 26 : }
1963 :
1964 : /*
1965 : * Remove all serialized snapshots that are not required anymore because no
1966 : * slot can need them. This doesn't actually have to run during a checkpoint,
1967 : * but it's a convenient point to schedule this.
1968 : *
1969 : * NB: We run this during checkpoints even if logical decoding is disabled so
1970 : * we cleanup old slots at some point after it got disabled.
1971 : */
1972 : void
1973 1806 : CheckPointSnapBuild(void)
1974 : {
1975 : XLogRecPtr cutoff;
1976 : XLogRecPtr redo;
1977 : DIR *snap_dir;
1978 : struct dirent *snap_de;
1979 : char path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)];
1980 :
1981 : /*
1982 : * We start off with a minimum of the last redo pointer. No new
1983 : * replication slot will start before that, so that's a safe upper bound
1984 : * for removal.
1985 : */
1986 1806 : redo = GetRedoRecPtr();
1987 :
1988 : /* now check for the restart ptrs from existing slots */
1989 1806 : cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1990 :
1991 : /* don't start earlier than the restart lsn */
1992 1806 : if (redo < cutoff)
1993 1 : cutoff = redo;
1994 :
1995 1806 : snap_dir = AllocateDir(PG_LOGICAL_SNAPSHOTS_DIR);
1996 5707 : while ((snap_de = ReadDir(snap_dir, PG_LOGICAL_SNAPSHOTS_DIR)) != NULL)
1997 : {
1998 : uint32 hi;
1999 : uint32 lo;
2000 : XLogRecPtr lsn;
2001 : PGFileType de_type;
2002 :
2003 3901 : if (strcmp(snap_de->d_name, ".") == 0 ||
2004 2095 : strcmp(snap_de->d_name, "..") == 0)
2005 3612 : continue;
2006 :
2007 289 : snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name);
2008 289 : de_type = get_dirent_type(path, snap_de, false, DEBUG1);
2009 :
2010 289 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
2011 : {
2012 0 : elog(DEBUG1, "only regular files expected: %s", path);
2013 0 : continue;
2014 : }
2015 :
2016 : /*
2017 : * temporary filenames from SnapBuildSerialize() include the LSN and
2018 : * everything but are postfixed by .$pid.tmp. We can just remove them
2019 : * the same as other files because there can be none that are
2020 : * currently being written that are older than cutoff.
2021 : *
2022 : * We just log a message if a file doesn't fit the pattern, it's
2023 : * probably some editors lock/state file or similar...
2024 : */
2025 289 : if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
2026 : {
2027 0 : ereport(LOG,
2028 : (errmsg("could not parse file name \"%s\"", path)));
2029 0 : continue;
2030 : }
2031 :
2032 289 : lsn = ((uint64) hi) << 32 | lo;
2033 :
2034 : /* check whether we still need it */
2035 289 : if (lsn < cutoff || !XLogRecPtrIsValid(cutoff))
2036 : {
2037 192 : elog(DEBUG1, "removing snapbuild snapshot %s", path);
2038 :
2039 : /*
2040 : * It's not particularly harmful, though strange, if we can't
2041 : * remove the file here. Don't prevent the checkpoint from
2042 : * completing, that'd be a cure worse than the disease.
2043 : */
2044 192 : if (unlink(path) < 0)
2045 : {
2046 0 : ereport(LOG,
2047 : (errcode_for_file_access(),
2048 : errmsg("could not remove file \"%s\": %m",
2049 : path)));
2050 0 : continue;
2051 : }
2052 : }
2053 : }
2054 1806 : FreeDir(snap_dir);
2055 1806 : }
2056 :
2057 : /*
2058 : * Check if a logical snapshot at the specified point has been serialized.
2059 : */
2060 : bool
2061 13 : SnapBuildSnapshotExists(XLogRecPtr lsn)
2062 : {
2063 : char path[MAXPGPATH];
2064 : int ret;
2065 : struct stat stat_buf;
2066 :
2067 13 : sprintf(path, "%s/%X-%X.snap",
2068 : PG_LOGICAL_SNAPSHOTS_DIR,
2069 13 : LSN_FORMAT_ARGS(lsn));
2070 :
2071 13 : ret = stat(path, &stat_buf);
2072 :
2073 13 : if (ret != 0 && errno != ENOENT)
2074 0 : ereport(ERROR,
2075 : (errcode_for_file_access(),
2076 : errmsg("could not stat file \"%s\": %m", path)));
2077 :
2078 13 : return ret == 0;
2079 : }
|