Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * snapbuild.c
4 : *
5 : * Infrastructure for building historic catalog snapshots based on contents
6 : * of the WAL, for the purpose of decoding heapam.c style values in the
7 : * WAL.
8 : *
9 : * NOTES:
10 : *
11 : * We build snapshots which can *only* be used to read catalog contents and we
12 : * do so by reading and interpreting the WAL stream. The aim is to build a
13 : * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14 : * at the time the XLogRecord was generated.
15 : *
16 : * To build the snapshots we reuse the infrastructure built for Hot
17 : * Standby. The in-memory snapshots we build look different than HS' because
18 : * we have different needs. To successfully decode data from the WAL we only
19 : * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20 : * tables since the data we decode is wholly contained in the WAL
21 : * records. Also, our snapshots need to be different in comparison to normal
22 : * MVCC ones because in contrast to those we cannot fully rely on the clog and
23 : * pg_subtrans for information about committed transactions because they might
24 : * commit in the future from the POV of the WAL entry we're currently
25 : * decoding. This definition has the advantage that we only need to prevent
26 : * removal of catalog rows, while normal table's rows can still be
27 : * removed. This is achieved by using the replication slot mechanism.
28 : *
29 : * As the percentage of transactions modifying the catalog normally is fairly
30 : * small in comparison to ones only manipulating user data, we keep track of
31 : * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32 : * track of all running transactions like it's done in a normal snapshot. Note
33 : * that we're generally only looking at transactions that have acquired an
34 : * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
35 : * that we consider committed, everything else is considered aborted/in
36 : * progress. That also allows us not to care about subtransactions before they
37 : * have committed which means this module, in contrast to HS, doesn't have to
38 : * care about suboverflowed subtransactions and similar.
39 : *
40 : * One complexity of doing this is that to e.g. handle mixed DDL/DML
41 : * transactions we need Snapshots that see intermediate versions of the
42 : * catalog in a transaction. During normal operation this is achieved by using
43 : * CommandIds/cmin/cmax. The problem with that however is that for space
44 : * efficiency reasons, the cmin and cmax are not included in WAL records. We
45 : * cannot read the cmin/cmax from the tuple itself, either, because it is
46 : * reset on crash recovery. Even if we could, we could not decode combocids
47 : * which are only tracked in the original backend's memory. To work around
48 : * that, heapam writes an extra WAL record (XLOG_HEAP2_NEW_CID) every time a
49 : * catalog row is modified, which includes the cmin and cmax of the
50 : * tuple. During decoding, we insert the ctid->(cmin,cmax) mappings into the
51 : * reorder buffer, and use them at visibility checks instead of the cmin/cmax
52 : * on the tuple itself. Check the reorderbuffer.c's comment above
53 : * ResolveCminCmaxDuringDecoding() for details.
54 : *
55 : * To facilitate all this we need our own visibility routine, as the normal
56 : * ones are optimized for different usecases.
57 : *
58 : * To replace the normal catalog snapshots with decoding ones use the
59 : * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
60 : *
61 : *
62 : *
63 : * The snapbuild machinery is starting up in several stages, as illustrated
64 : * by the following graph describing the SnapBuild->state transitions:
65 : *
66 : * +-------------------------+
67 : * +----| START |-------------+
68 : * | +-------------------------+ |
69 : * | | |
70 : * | | |
71 : * | running_xacts #1 |
72 : * | | |
73 : * | | |
74 : * | v |
75 : * | +-------------------------+ v
76 : * | | BUILDING_SNAPSHOT |------------>|
77 : * | +-------------------------+ |
78 : * | | |
79 : * | | |
80 : * | running_xacts #2, xacts from #1 finished |
81 : * | | |
82 : * | | |
83 : * | v |
84 : * | +-------------------------+ v
85 : * | | FULL_SNAPSHOT |------------>|
86 : * | +-------------------------+ |
87 : * | | |
88 : * running_xacts | saved snapshot
89 : * with zero xacts | at running_xacts's lsn
90 : * | | |
91 : * | running_xacts with xacts from #2 finished |
92 : * | | |
93 : * | v |
94 : * | +-------------------------+ |
95 : * +--->|SNAPBUILD_CONSISTENT |<------------+
96 : * +-------------------------+
97 : *
98 : * Initially the machinery is in the START stage. When an xl_running_xacts
99 : * record is read that is sufficiently new (above the safe xmin horizon),
100 : * there's a state transition. If there were no running xacts when the
101 : * xl_running_xacts record was generated, we'll directly go into CONSISTENT
102 : * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full
103 : * snapshot means that all transactions that start henceforth can be decoded
104 : * in their entirety, but transactions that started previously can't. In
105 : * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
106 : * running transactions have committed or aborted.
107 : *
108 : * Only transactions that commit after CONSISTENT state has been reached will
109 : * be replayed, even though they might have started while still in
110 : * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
111 : * changes have been exported, but all the following ones will be. That point
112 : * is a convenient point to initialize replication from, which is why we
113 : * export a snapshot at that point, which *can* be used to read normal data.
114 : *
115 : * Copyright (c) 2012-2026, PostgreSQL Global Development Group
116 : *
117 : * IDENTIFICATION
118 : * src/backend/replication/logical/snapbuild.c
119 : *
120 : *-------------------------------------------------------------------------
121 : */
122 :
123 : #include "postgres.h"
124 :
125 : #include <sys/stat.h>
126 : #include <unistd.h>
127 :
128 : #include "access/heapam_xlog.h"
129 : #include "access/transam.h"
130 : #include "access/xact.h"
131 : #include "common/file_utils.h"
132 : #include "miscadmin.h"
133 : #include "pgstat.h"
134 : #include "replication/logical.h"
135 : #include "replication/reorderbuffer.h"
136 : #include "replication/snapbuild.h"
137 : #include "replication/snapbuild_internal.h"
138 : #include "storage/fd.h"
139 : #include "storage/lmgr.h"
140 : #include "storage/proc.h"
141 : #include "storage/procarray.h"
142 : #include "storage/standby.h"
143 : #include "utils/builtins.h"
144 : #include "utils/memutils.h"
145 : #include "utils/snapmgr.h"
146 : #include "utils/snapshot.h"
147 : #include "utils/wait_event.h"
148 :
149 :
150 : /*
151 : * Starting a transaction -- which we need to do while exporting a snapshot --
152 : * removes knowledge about the previously used resowner, so we save it here.
153 : */
154 : static ResourceOwner SavedResourceOwnerDuringExport = NULL;
155 : static bool ExportInProgress = false;
156 :
157 : /*
158 : * If a backend is going to do logical decoding and the output plugin does
159 : * not need to access shared catalogs, setting this variable to false can make
160 : * the decoding startup faster. In particular, the backend will not need to
161 : * wait for completion of already running transactions in other databases.
162 : */
163 : bool accessSharedCatalogsInDecoding = true;
164 :
165 : /* ->committed and ->catchange manipulation */
166 : static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
167 :
168 : /* snapshot building/manipulation/distribution functions */
169 : static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
170 :
171 : static void SnapBuildFreeSnapshot(Snapshot snap);
172 :
173 : static void SnapBuildSnapIncRefcount(Snapshot snap);
174 :
175 : static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
176 :
177 : static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
178 : uint32 xinfo);
179 :
180 : /* xlog reading helper functions for SnapBuildProcessRunningXacts */
181 : static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn,
182 : xl_running_xacts *running);
183 : static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
184 :
185 : /* serialization functions */
186 : static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
187 : static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
188 : static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path);
189 :
190 : /*
191 : * Allocate a new snapshot builder.
192 : *
193 : * xmin_horizon is the xid >= which we can be sure no catalog rows have been
194 : * removed, start_lsn is the LSN >= we want to replay commits.
195 : */
196 : SnapBuild *
197 1220 : AllocateSnapshotBuilder(ReorderBuffer *reorder,
198 : TransactionId xmin_horizon,
199 : XLogRecPtr start_lsn,
200 : bool need_full_snapshot,
201 : bool in_slot_creation,
202 : XLogRecPtr two_phase_at)
203 : {
204 : MemoryContext context;
205 : MemoryContext oldcontext;
206 : SnapBuild *builder;
207 :
208 : /* allocate memory in own context, to have better accountability */
209 1220 : context = AllocSetContextCreate(CurrentMemoryContext,
210 : "snapshot builder context",
211 : ALLOCSET_DEFAULT_SIZES);
212 1220 : oldcontext = MemoryContextSwitchTo(context);
213 :
214 1220 : builder = palloc0_object(SnapBuild);
215 :
216 1220 : builder->state = SNAPBUILD_START;
217 1220 : builder->context = context;
218 1220 : builder->reorder = reorder;
219 : /* Other struct members initialized by zeroing via palloc0 above */
220 :
221 1220 : builder->committed.xcnt = 0;
222 1220 : builder->committed.xcnt_space = 128; /* arbitrary number */
223 1220 : builder->committed.xip =
224 1220 : palloc0_array(TransactionId, builder->committed.xcnt_space);
225 1220 : builder->committed.includes_all_transactions = true;
226 :
227 1220 : builder->catchange.xcnt = 0;
228 1220 : builder->catchange.xip = NULL;
229 :
230 1220 : builder->initial_xmin_horizon = xmin_horizon;
231 1220 : builder->start_decoding_at = start_lsn;
232 1220 : builder->in_slot_creation = in_slot_creation;
233 1220 : builder->building_full_snapshot = need_full_snapshot;
234 1220 : builder->two_phase_at = two_phase_at;
235 :
236 1220 : MemoryContextSwitchTo(oldcontext);
237 :
238 : /* The default is that shared catalog are used. */
239 1220 : accessSharedCatalogsInDecoding = true;
240 :
241 1220 : return builder;
242 : }
243 :
244 : /*
245 : * Free a snapshot builder.
246 : */
247 : void
248 948 : FreeSnapshotBuilder(SnapBuild *builder)
249 : {
250 948 : MemoryContext context = builder->context;
251 :
252 : /* free snapshot explicitly, that contains some error checking */
253 948 : if (builder->snapshot != NULL)
254 : {
255 231 : SnapBuildSnapDecRefcount(builder->snapshot);
256 231 : builder->snapshot = NULL;
257 : }
258 :
259 : /* The default is that shared catalog are used. */
260 948 : accessSharedCatalogsInDecoding = true;
261 :
262 : /* other resources are deallocated via memory context reset */
263 948 : MemoryContextDelete(context);
264 948 : }
265 :
266 : /*
267 : * Free an unreferenced snapshot that has previously been built by us.
268 : */
269 : static void
270 1834 : SnapBuildFreeSnapshot(Snapshot snap)
271 : {
272 : /* make sure we don't get passed an external snapshot */
273 : Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
274 :
275 : /* make sure nobody modified our snapshot */
276 : Assert(snap->curcid == FirstCommandId);
277 : Assert(!snap->suboverflowed);
278 : Assert(!snap->takenDuringRecovery);
279 : Assert(snap->regd_count == 0);
280 :
281 : /* slightly more likely, so it's checked even without c-asserts */
282 1834 : if (snap->copied)
283 0 : elog(ERROR, "cannot free a copied snapshot");
284 :
285 1834 : if (snap->active_count)
286 0 : elog(ERROR, "cannot free an active snapshot");
287 :
288 1834 : pfree(snap);
289 1834 : }
290 :
291 : /*
292 : * In which state of snapshot building are we?
293 : */
294 : SnapBuildState
295 2247182 : SnapBuildCurrentState(SnapBuild *builder)
296 : {
297 2247182 : return builder->state;
298 : }
299 :
300 : /*
301 : * Return the LSN at which the two-phase decoding was first enabled.
302 : */
303 : XLogRecPtr
304 33 : SnapBuildGetTwoPhaseAt(SnapBuild *builder)
305 : {
306 33 : return builder->two_phase_at;
307 : }
308 :
309 : /*
310 : * Set the LSN at which two-phase decoding is enabled.
311 : */
312 : void
313 8 : SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
314 : {
315 8 : builder->two_phase_at = ptr;
316 8 : }
317 :
318 : /*
319 : * Should the contents of transaction ending at 'ptr' be decoded?
320 : */
321 : bool
322 461869 : SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
323 : {
324 461869 : return ptr < builder->start_decoding_at;
325 : }
326 :
327 : /*
328 : * Increase refcount of a snapshot.
329 : *
330 : * This is used when handing out a snapshot to some external resource or when
331 : * adding a Snapshot as builder->snapshot.
332 : */
333 : static void
334 7954 : SnapBuildSnapIncRefcount(Snapshot snap)
335 : {
336 7954 : snap->active_count++;
337 7954 : }
338 :
339 : /*
340 : * Decrease refcount of a snapshot and free if the refcount reaches zero.
341 : *
342 : * Externally visible, so that external resources that have been handed an
343 : * IncRef'ed Snapshot can adjust its refcount easily.
344 : */
345 : void
346 7644 : SnapBuildSnapDecRefcount(Snapshot snap)
347 : {
348 : /* make sure we don't get passed an external snapshot */
349 : Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
350 :
351 : /* make sure nobody modified our snapshot */
352 : Assert(snap->curcid == FirstCommandId);
353 : Assert(!snap->suboverflowed);
354 : Assert(!snap->takenDuringRecovery);
355 :
356 : Assert(snap->regd_count == 0);
357 :
358 : Assert(snap->active_count > 0);
359 :
360 : /* slightly more likely, so it's checked even without casserts */
361 7644 : if (snap->copied)
362 0 : elog(ERROR, "cannot free a copied snapshot");
363 :
364 7644 : snap->active_count--;
365 7644 : if (snap->active_count == 0)
366 1834 : SnapBuildFreeSnapshot(snap);
367 7644 : }
368 :
369 : /*
370 : * Build a new snapshot, based on currently committed catalog-modifying
371 : * transactions.
372 : *
373 : * In-progress transactions with catalog access are *not* allowed to modify
374 : * these snapshots; they have to copy them and fill in appropriate ->curcid
375 : * and ->subxip/subxcnt values.
376 : */
377 : static Snapshot
378 2340 : SnapBuildBuildSnapshot(SnapBuild *builder)
379 : {
380 : Snapshot snapshot;
381 : Size ssize;
382 :
383 : Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
384 :
385 2340 : ssize = sizeof(SnapshotData)
386 2340 : + sizeof(TransactionId) * builder->committed.xcnt
387 2340 : + sizeof(TransactionId) * 1 /* toplevel xid */ ;
388 :
389 2340 : snapshot = MemoryContextAllocZero(builder->context, ssize);
390 :
391 2340 : snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
392 :
393 : /*
394 : * We misuse the original meaning of SnapshotData's xip and subxip fields
395 : * to make the more fitting for our needs.
396 : *
397 : * In the 'xip' array we store transactions that have to be treated as
398 : * committed. Since we will only ever look at tuples from transactions
399 : * that have modified the catalog it's more efficient to store those few
400 : * that exist between xmin and xmax (frequently there are none).
401 : *
402 : * Snapshots that are used in transactions that have modified the catalog
403 : * also use the 'subxip' array to store their toplevel xid and all the
404 : * subtransaction xids so we can recognize when we need to treat rows as
405 : * visible that are not in xip but still need to be visible. Subxip only
406 : * gets filled when the transaction is copied into the context of a
407 : * catalog modifying transaction since we otherwise share a snapshot
408 : * between transactions. As long as a txn hasn't modified the catalog it
409 : * doesn't need to treat any uncommitted rows as visible, so there is no
410 : * need for those xids.
411 : *
412 : * Both arrays are qsort'ed so that we can use bsearch() on them.
413 : */
414 : Assert(TransactionIdIsNormal(builder->xmin));
415 : Assert(TransactionIdIsNormal(builder->xmax));
416 :
417 2340 : snapshot->xmin = builder->xmin;
418 2340 : snapshot->xmax = builder->xmax;
419 :
420 : /* store all transactions to be treated as committed by this snapshot */
421 2340 : snapshot->xip =
422 2340 : (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
423 2340 : snapshot->xcnt = builder->committed.xcnt;
424 2340 : memcpy(snapshot->xip,
425 2340 : builder->committed.xip,
426 2340 : builder->committed.xcnt * sizeof(TransactionId));
427 :
428 : /* sort so we can bsearch() */
429 2340 : qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
430 :
431 : /*
432 : * Initially, subxip is empty, i.e. it's a snapshot to be used by
433 : * transactions that don't modify the catalog. Will be filled by
434 : * ReorderBufferCopySnap() if necessary.
435 : */
436 2340 : snapshot->subxcnt = 0;
437 2340 : snapshot->subxip = NULL;
438 :
439 2340 : snapshot->suboverflowed = false;
440 2340 : snapshot->takenDuringRecovery = false;
441 2340 : snapshot->copied = false;
442 2340 : snapshot->curcid = FirstCommandId;
443 2340 : snapshot->active_count = 0;
444 2340 : snapshot->regd_count = 0;
445 2340 : snapshot->snapXactCompletionCount = 0;
446 :
447 2340 : return snapshot;
448 : }
449 :
450 : /*
451 : * Build the initial slot snapshot and convert it to a normal snapshot that
452 : * is understood by HeapTupleSatisfiesMVCC.
453 : *
454 : * The snapshot will be usable directly in current transaction or exported
455 : * for loading in different transaction.
456 : */
457 : Snapshot
458 222 : SnapBuildInitialSnapshot(SnapBuild *builder)
459 : {
460 : Snapshot snap;
461 : TransactionId xid;
462 : TransactionId safeXid;
463 : TransactionId *newxip;
464 222 : int newxcnt = 0;
465 :
466 : Assert(XactIsoLevel == XACT_REPEATABLE_READ);
467 : Assert(builder->building_full_snapshot);
468 :
469 : /* don't allow older snapshots */
470 222 : InvalidateCatalogSnapshot(); /* about to overwrite MyProc->xmin */
471 222 : if (HaveRegisteredOrActiveSnapshot())
472 0 : elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
473 : Assert(!HistoricSnapshotActive());
474 :
475 222 : if (builder->state != SNAPBUILD_CONSISTENT)
476 0 : elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
477 :
478 222 : if (!builder->committed.includes_all_transactions)
479 0 : elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
480 :
481 : /* so we don't overwrite the existing value */
482 222 : if (TransactionIdIsValid(MyProc->xmin))
483 0 : elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
484 :
485 222 : snap = SnapBuildBuildSnapshot(builder);
486 :
487 : /*
488 : * We know that snap->xmin is alive, enforced by the logical xmin
489 : * mechanism. Due to that we can do this without locks, we're only
490 : * changing our own value.
491 : *
492 : * Building an initial snapshot is expensive and an unenforced xmin
493 : * horizon would have bad consequences, therefore always double-check that
494 : * the horizon is enforced.
495 : */
496 222 : LWLockAcquire(ProcArrayLock, LW_SHARED);
497 222 : safeXid = GetOldestSafeDecodingTransactionId(false);
498 222 : LWLockRelease(ProcArrayLock);
499 :
500 222 : if (TransactionIdFollows(safeXid, snap->xmin))
501 0 : elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
502 : safeXid, snap->xmin);
503 :
504 222 : MyProc->xmin = snap->xmin;
505 :
506 : /* allocate in transaction context */
507 222 : newxip = palloc_array(TransactionId, GetMaxSnapshotXidCount());
508 :
509 : /*
510 : * snapbuild.c builds transactions in an "inverted" manner, which means it
511 : * stores committed transactions in ->xip, not ones in progress. Build a
512 : * classical snapshot by marking all non-committed transactions as
513 : * in-progress. This can be expensive.
514 : */
515 222 : for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
516 : {
517 : void *test;
518 :
519 : /*
520 : * Check whether transaction committed using the decoding snapshot
521 : * meaning of ->xip.
522 : */
523 0 : test = bsearch(&xid, snap->xip, snap->xcnt,
524 : sizeof(TransactionId), xidComparator);
525 :
526 0 : if (test == NULL)
527 : {
528 0 : if (newxcnt >= GetMaxSnapshotXidCount())
529 0 : ereport(ERROR,
530 : (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
531 : errmsg("initial slot snapshot too large")));
532 :
533 0 : newxip[newxcnt++] = xid;
534 : }
535 :
536 0 : TransactionIdAdvance(xid);
537 : }
538 :
539 : /* adjust remaining snapshot fields as needed */
540 222 : snap->snapshot_type = SNAPSHOT_MVCC;
541 222 : snap->xcnt = newxcnt;
542 222 : snap->xip = newxip;
543 :
544 222 : return snap;
545 : }
546 :
547 : /*
548 : * Export a snapshot so it can be set in another session with SET TRANSACTION
549 : * SNAPSHOT.
550 : *
551 : * For that we need to start a transaction in the current backend as the
552 : * importing side checks whether the source transaction is still open to make
553 : * sure the xmin horizon hasn't advanced since then.
554 : */
555 : const char *
556 1 : SnapBuildExportSnapshot(SnapBuild *builder)
557 : {
558 : Snapshot snap;
559 : char *snapname;
560 :
561 1 : if (IsTransactionOrTransactionBlock())
562 0 : elog(ERROR, "cannot export a snapshot from within a transaction");
563 :
564 1 : if (SavedResourceOwnerDuringExport)
565 0 : elog(ERROR, "can only export one snapshot at a time");
566 :
567 1 : SavedResourceOwnerDuringExport = CurrentResourceOwner;
568 1 : ExportInProgress = true;
569 :
570 1 : StartTransactionCommand();
571 :
572 : /* There doesn't seem to a nice API to set these */
573 1 : XactIsoLevel = XACT_REPEATABLE_READ;
574 1 : XactReadOnly = true;
575 :
576 1 : snap = SnapBuildInitialSnapshot(builder);
577 :
578 : /*
579 : * now that we've built a plain snapshot, make it active and use the
580 : * normal mechanisms for exporting it
581 : */
582 1 : snapname = ExportSnapshot(snap);
583 :
584 1 : ereport(LOG,
585 : (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
586 : "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
587 : snap->xcnt,
588 : snapname, snap->xcnt)));
589 1 : return snapname;
590 : }
591 :
592 : /*
593 : * Ensure there is a snapshot and if not build one for current transaction.
594 : */
595 : Snapshot
596 8 : SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
597 : {
598 : Assert(builder->state == SNAPBUILD_CONSISTENT);
599 :
600 : /* only build a new snapshot if we don't have a prebuilt one */
601 8 : if (builder->snapshot == NULL)
602 : {
603 1 : builder->snapshot = SnapBuildBuildSnapshot(builder);
604 : /* increase refcount for the snapshot builder */
605 1 : SnapBuildSnapIncRefcount(builder->snapshot);
606 : }
607 :
608 8 : return builder->snapshot;
609 : }
610 :
611 : /*
612 : * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
613 : * any. Aborts the previously started transaction and resets the resource
614 : * owner back to its original value.
615 : */
616 : void
617 5847 : SnapBuildClearExportedSnapshot(void)
618 : {
619 : ResourceOwner tmpResOwner;
620 :
621 : /* nothing exported, that is the usual case */
622 5847 : if (!ExportInProgress)
623 5846 : return;
624 :
625 1 : if (!IsTransactionState())
626 0 : elog(ERROR, "clearing exported snapshot in wrong transaction state");
627 :
628 : /*
629 : * AbortCurrentTransaction() takes care of resetting the snapshot state,
630 : * so remember SavedResourceOwnerDuringExport.
631 : */
632 1 : tmpResOwner = SavedResourceOwnerDuringExport;
633 :
634 : /* make sure nothing could have ever happened */
635 1 : AbortCurrentTransaction();
636 :
637 1 : CurrentResourceOwner = tmpResOwner;
638 : }
639 :
640 : /*
641 : * Clear snapshot export state during transaction abort.
642 : */
643 : void
644 35446 : SnapBuildResetExportedSnapshotState(void)
645 : {
646 35446 : SavedResourceOwnerDuringExport = NULL;
647 35446 : ExportInProgress = false;
648 35446 : }
649 :
650 : /*
651 : * Handle the effects of a single heap change, appropriate to the current state
652 : * of the snapshot builder and returns whether changes made at (xid, lsn) can
653 : * be decoded.
654 : */
655 : bool
656 1568395 : SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
657 : {
658 : /*
659 : * We can't handle data in transactions if we haven't built a snapshot
660 : * yet, so don't store them.
661 : */
662 1568395 : if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
663 0 : return false;
664 :
665 : /*
666 : * No point in keeping track of changes in transactions that we don't have
667 : * enough information about to decode. This means that they started before
668 : * we got into the SNAPBUILD_FULL_SNAPSHOT state.
669 : */
670 1568398 : if (builder->state < SNAPBUILD_CONSISTENT &&
671 3 : TransactionIdPrecedes(xid, builder->next_phase_at))
672 0 : return false;
673 :
674 : /*
675 : * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
676 : * be needed to decode the change we're currently processing.
677 : */
678 1568395 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
679 : {
680 : /* only build a new snapshot if we don't have a prebuilt one */
681 4177 : if (builder->snapshot == NULL)
682 : {
683 462 : builder->snapshot = SnapBuildBuildSnapshot(builder);
684 : /* increase refcount for the snapshot builder */
685 462 : SnapBuildSnapIncRefcount(builder->snapshot);
686 : }
687 :
688 : /*
689 : * Increase refcount for the transaction we're handing the snapshot
690 : * out to.
691 : */
692 4177 : SnapBuildSnapIncRefcount(builder->snapshot);
693 4177 : ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
694 : builder->snapshot);
695 : }
696 :
697 1568395 : return true;
698 : }
699 :
700 : /*
701 : * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
702 : * This implies that a transaction has done some form of write to system
703 : * catalogs.
704 : */
705 : void
706 28203 : SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
707 : XLogRecPtr lsn, xl_heap_new_cid *xlrec)
708 : {
709 : CommandId cid;
710 :
711 : /*
712 : * we only log new_cid's if a catalog tuple was modified, so mark the
713 : * transaction as containing catalog modifications
714 : */
715 28203 : ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
716 :
717 28203 : ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
718 : xlrec->target_locator, xlrec->target_tid,
719 : xlrec->cmin, xlrec->cmax,
720 : xlrec->combocid);
721 :
722 : /* figure out new command id */
723 28203 : if (xlrec->cmin != InvalidCommandId &&
724 23886 : xlrec->cmax != InvalidCommandId)
725 3323 : cid = Max(xlrec->cmin, xlrec->cmax);
726 24880 : else if (xlrec->cmax != InvalidCommandId)
727 4317 : cid = xlrec->cmax;
728 20563 : else if (xlrec->cmin != InvalidCommandId)
729 20563 : cid = xlrec->cmin;
730 : else
731 : {
732 0 : cid = InvalidCommandId; /* silence compiler */
733 0 : elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
734 : }
735 :
736 28203 : ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
737 28203 : }
738 :
739 : /*
740 : * Add a new Snapshot and invalidation messages to all transactions we're
741 : * decoding that currently are in-progress so they can see new catalog contents
742 : * made by the transaction that just committed. This is necessary because those
743 : * in-progress transactions will use the new catalog's contents from here on
744 : * (at the very least everything they do needs to be compatible with newer
745 : * catalog contents).
746 : */
747 : static void
748 1647 : SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
749 : {
750 : dlist_iter txn_i;
751 : ReorderBufferTXN *txn;
752 :
753 : /*
754 : * Iterate through all toplevel transactions. This can include
755 : * subtransactions which we just don't yet know to be that, but that's
756 : * fine, they will just get an unnecessary snapshot and invalidations
757 : * queued.
758 : */
759 3328 : dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
760 : {
761 1681 : txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
762 :
763 : Assert(TransactionIdIsValid(txn->xid));
764 :
765 : /*
766 : * If we don't have a base snapshot yet, there are no changes in this
767 : * transaction which in turn implies we don't yet need a snapshot at
768 : * all. We'll add a snapshot when the first change gets queued.
769 : *
770 : * Similarly, we don't need to add invalidations to a transaction
771 : * whose base snapshot is not yet set. Once a base snapshot is built,
772 : * it will include the xids of committed transactions that have
773 : * modified the catalog, thus reflecting the new catalog contents. The
774 : * existing catalog cache will have already been invalidated after
775 : * processing the invalidations in the transaction that modified
776 : * catalogs, ensuring that a fresh cache is constructed during
777 : * decoding.
778 : *
779 : * NB: This works correctly even for subtransactions because
780 : * ReorderBufferAssignChild() takes care to transfer the base snapshot
781 : * to the top-level transaction, and while iterating the changequeue
782 : * we'll get the change from the subtxn.
783 : */
784 1681 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
785 2 : continue;
786 :
787 : /*
788 : * We don't need to add snapshot or invalidations to prepared
789 : * transactions as they should not see the new catalog contents.
790 : */
791 1679 : if (rbtxn_is_prepared(txn))
792 28 : continue;
793 :
794 1651 : elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
795 : txn->xid, LSN_FORMAT_ARGS(lsn));
796 :
797 : /*
798 : * increase the snapshot's refcount for the transaction we are handing
799 : * it out to
800 : */
801 1651 : SnapBuildSnapIncRefcount(builder->snapshot);
802 1651 : ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
803 : builder->snapshot);
804 :
805 : /*
806 : * Add invalidation messages to the reorder buffer of in-progress
807 : * transactions except the current committed transaction, for which we
808 : * will execute invalidations at the end.
809 : *
810 : * It is required, otherwise, we will end up using the stale catcache
811 : * contents built by the current transaction even after its decoding,
812 : * which should have been invalidated due to concurrent catalog
813 : * changing transaction.
814 : *
815 : * Distribute only the invalidation messages generated by the current
816 : * committed transaction. Invalidation messages received from other
817 : * transactions would have already been propagated to the relevant
818 : * in-progress transactions. This transaction would have processed
819 : * those invalidations, ensuring that subsequent transactions observe
820 : * a consistent cache state.
821 : */
822 1651 : if (txn->xid != xid)
823 : {
824 : uint32 ninvalidations;
825 32 : SharedInvalidationMessage *msgs = NULL;
826 :
827 32 : ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
828 : xid, &msgs);
829 :
830 32 : if (ninvalidations > 0)
831 : {
832 : Assert(msgs != NULL);
833 :
834 28 : ReorderBufferAddDistributedInvalidations(builder->reorder,
835 : txn->xid, lsn,
836 : ninvalidations, msgs);
837 : }
838 : }
839 : }
840 1647 : }
841 :
842 : /*
843 : * Keep track of a new catalog changing transaction that has committed.
844 : */
845 : static void
846 1656 : SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
847 : {
848 : Assert(TransactionIdIsValid(xid));
849 :
850 1656 : if (builder->committed.xcnt == builder->committed.xcnt_space)
851 : {
852 0 : builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
853 :
854 0 : elog(DEBUG1, "increasing space for committed transactions to %u",
855 : (uint32) builder->committed.xcnt_space);
856 :
857 0 : builder->committed.xip = repalloc_array(builder->committed.xip,
858 : TransactionId,
859 : builder->committed.xcnt_space);
860 : }
861 :
862 : /*
863 : * TODO: It might make sense to keep the array sorted here instead of
864 : * doing it every time we build a new snapshot. On the other hand this
865 : * gets called repeatedly when a transaction with subtransactions commits.
866 : */
867 1656 : builder->committed.xip[builder->committed.xcnt++] = xid;
868 1656 : }
869 :
870 : /*
871 : * Remove knowledge about transactions we treat as committed or containing catalog
872 : * changes that are smaller than ->xmin. Those won't ever get checked via
873 : * the ->committed or ->catchange array, respectively. The committed xids will
874 : * get checked via the clog machinery.
875 : *
876 : * We can ideally remove the transaction from catchange array once it is
877 : * finished (committed/aborted) but that could be costly as we need to maintain
878 : * the xids order in the array.
879 : */
880 : static void
881 549 : SnapBuildPurgeOlderTxn(SnapBuild *builder)
882 : {
883 : int off;
884 : TransactionId *workspace;
885 549 : int surviving_xids = 0;
886 :
887 : /* not ready yet */
888 549 : if (!TransactionIdIsNormal(builder->xmin))
889 0 : return;
890 :
891 : /* TODO: Neater algorithm than just copying and iterating? */
892 : workspace =
893 549 : MemoryContextAlloc(builder->context,
894 549 : builder->committed.xcnt * sizeof(TransactionId));
895 :
896 : /* copy xids that still are interesting to workspace */
897 971 : for (off = 0; off < builder->committed.xcnt; off++)
898 : {
899 422 : if (NormalTransactionIdPrecedes(builder->committed.xip[off],
900 : builder->xmin))
901 : ; /* remove */
902 : else
903 1 : workspace[surviving_xids++] = builder->committed.xip[off];
904 : }
905 :
906 : /* copy workspace back to persistent state */
907 549 : memcpy(builder->committed.xip, workspace,
908 : surviving_xids * sizeof(TransactionId));
909 :
910 549 : elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
911 : (uint32) builder->committed.xcnt, (uint32) surviving_xids,
912 : builder->xmin, builder->xmax);
913 549 : builder->committed.xcnt = surviving_xids;
914 :
915 549 : pfree(workspace);
916 :
917 : /*
918 : * Purge xids in ->catchange as well. The purged array must also be sorted
919 : * in xidComparator order.
920 : */
921 549 : if (builder->catchange.xcnt > 0)
922 : {
923 : /*
924 : * Since catchange.xip is sorted, we find the lower bound of xids that
925 : * are still interesting.
926 : */
927 9 : for (off = 0; off < builder->catchange.xcnt; off++)
928 : {
929 6 : if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
930 : builder->xmin))
931 1 : break;
932 : }
933 :
934 4 : surviving_xids = builder->catchange.xcnt - off;
935 :
936 4 : if (surviving_xids > 0)
937 : {
938 1 : memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
939 : surviving_xids * sizeof(TransactionId));
940 : }
941 : else
942 : {
943 3 : pfree(builder->catchange.xip);
944 3 : builder->catchange.xip = NULL;
945 : }
946 :
947 4 : elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
948 : (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
949 : builder->xmin, builder->xmax);
950 4 : builder->catchange.xcnt = surviving_xids;
951 : }
952 : }
953 :
954 : /*
955 : * Handle everything that needs to be done when a transaction commits
956 : */
957 : void
958 3965 : SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
959 : int nsubxacts, TransactionId *subxacts, uint32 xinfo)
960 : {
961 : int nxact;
962 :
963 3965 : bool needs_snapshot = false;
964 3965 : bool needs_timetravel = false;
965 3965 : bool sub_needs_timetravel = false;
966 :
967 3965 : TransactionId xmax = xid;
968 :
969 : /*
970 : * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
971 : * will they be part of a snapshot. So we don't need to record anything.
972 : */
973 3965 : if (builder->state == SNAPBUILD_START ||
974 3965 : (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
975 0 : TransactionIdPrecedes(xid, builder->next_phase_at)))
976 : {
977 : /* ensure that only commits after this are getting replayed */
978 0 : if (builder->start_decoding_at <= lsn)
979 0 : builder->start_decoding_at = lsn + 1;
980 0 : return;
981 : }
982 :
983 3965 : if (builder->state < SNAPBUILD_CONSISTENT)
984 : {
985 : /* ensure that only commits after this are getting replayed */
986 5 : if (builder->start_decoding_at <= lsn)
987 2 : builder->start_decoding_at = lsn + 1;
988 :
989 : /*
990 : * If building an exportable snapshot, force xid to be tracked, even
991 : * if the transaction didn't modify the catalog.
992 : */
993 5 : if (builder->building_full_snapshot)
994 : {
995 0 : needs_timetravel = true;
996 : }
997 : }
998 :
999 5201 : for (nxact = 0; nxact < nsubxacts; nxact++)
1000 : {
1001 1236 : TransactionId subxid = subxacts[nxact];
1002 :
1003 : /*
1004 : * Add subtransaction to base snapshot if catalog modifying, we don't
1005 : * distinguish to toplevel transactions there.
1006 : */
1007 1236 : if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
1008 : {
1009 9 : sub_needs_timetravel = true;
1010 9 : needs_snapshot = true;
1011 :
1012 9 : elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
1013 : xid, subxid);
1014 :
1015 9 : SnapBuildAddCommittedTxn(builder, subxid);
1016 :
1017 9 : if (NormalTransactionIdFollows(subxid, xmax))
1018 9 : xmax = subxid;
1019 : }
1020 :
1021 : /*
1022 : * If we're forcing timetravel we also need visibility information
1023 : * about subtransaction, so keep track of subtransaction's state, even
1024 : * if not catalog modifying. Don't need to distribute a snapshot in
1025 : * that case.
1026 : */
1027 1227 : else if (needs_timetravel)
1028 : {
1029 0 : SnapBuildAddCommittedTxn(builder, subxid);
1030 0 : if (NormalTransactionIdFollows(subxid, xmax))
1031 0 : xmax = subxid;
1032 : }
1033 : }
1034 :
1035 : /* if top-level modified catalog, it'll need a snapshot */
1036 3965 : if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
1037 : {
1038 1646 : elog(DEBUG2, "found top level transaction %u, with catalog changes",
1039 : xid);
1040 1646 : needs_snapshot = true;
1041 1646 : needs_timetravel = true;
1042 1646 : SnapBuildAddCommittedTxn(builder, xid);
1043 : }
1044 2319 : else if (sub_needs_timetravel)
1045 : {
1046 : /* track toplevel txn as well, subxact alone isn't meaningful */
1047 1 : elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
1048 : xid);
1049 1 : needs_timetravel = true;
1050 1 : SnapBuildAddCommittedTxn(builder, xid);
1051 : }
1052 2318 : else if (needs_timetravel)
1053 : {
1054 0 : elog(DEBUG2, "forced transaction %u to do timetravel", xid);
1055 :
1056 0 : SnapBuildAddCommittedTxn(builder, xid);
1057 : }
1058 :
1059 3965 : if (!needs_timetravel)
1060 : {
1061 : /* record that we cannot export a general snapshot anymore */
1062 2318 : builder->committed.includes_all_transactions = false;
1063 : }
1064 :
1065 : Assert(!needs_snapshot || needs_timetravel);
1066 :
1067 : /*
1068 : * Adjust xmax of the snapshot builder, we only do that for committed,
1069 : * catalog modifying, transactions, everything else isn't interesting for
1070 : * us since we'll never look at the respective rows.
1071 : */
1072 3965 : if (needs_timetravel &&
1073 3294 : (!TransactionIdIsValid(builder->xmax) ||
1074 1647 : TransactionIdFollowsOrEquals(xmax, builder->xmax)))
1075 : {
1076 1647 : builder->xmax = xmax;
1077 1647 : TransactionIdAdvance(builder->xmax);
1078 : }
1079 :
1080 : /* if there's any reason to build a historic snapshot, do so now */
1081 3965 : if (needs_snapshot)
1082 : {
1083 : /*
1084 : * If we haven't built a complete snapshot yet there's no need to hand
1085 : * it out, it wouldn't (and couldn't) be used anyway.
1086 : */
1087 1647 : if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1088 0 : return;
1089 :
1090 : /*
1091 : * Decrease the snapshot builder's refcount of the old snapshot, note
1092 : * that it still will be used if it has been handed out to the
1093 : * reorderbuffer earlier.
1094 : */
1095 1647 : if (builder->snapshot)
1096 1647 : SnapBuildSnapDecRefcount(builder->snapshot);
1097 :
1098 1647 : builder->snapshot = SnapBuildBuildSnapshot(builder);
1099 :
1100 : /* we might need to execute invalidations, add snapshot */
1101 1647 : if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1102 : {
1103 8 : SnapBuildSnapIncRefcount(builder->snapshot);
1104 8 : ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1105 : builder->snapshot);
1106 : }
1107 :
1108 : /* refcount of the snapshot builder for the new snapshot */
1109 1647 : SnapBuildSnapIncRefcount(builder->snapshot);
1110 :
1111 : /*
1112 : * Add a new catalog snapshot and invalidations messages to all
1113 : * currently running transactions.
1114 : */
1115 1647 : SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
1116 : }
1117 : }
1118 :
1119 : /*
1120 : * Check the reorder buffer and the snapshot to see if the given transaction has
1121 : * modified catalogs.
1122 : */
1123 : static inline bool
1124 5201 : SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
1125 : uint32 xinfo)
1126 : {
1127 5201 : if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1128 1651 : return true;
1129 :
1130 : /*
1131 : * The transactions that have changed catalogs must have invalidation
1132 : * info.
1133 : */
1134 3550 : if (!(xinfo & XACT_XINFO_HAS_INVALS))
1135 3542 : return false;
1136 :
1137 : /* Check the catchange XID array */
1138 12 : return ((builder->catchange.xcnt > 0) &&
1139 4 : (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
1140 : sizeof(TransactionId), xidComparator) != NULL));
1141 : }
1142 :
1143 : /* -----------------------------------
1144 : * Snapshot building functions dealing with xlog records
1145 : * -----------------------------------
1146 : */
1147 :
1148 : /*
1149 : * Process a running xacts record, and use its information to first build a
1150 : * historic snapshot and later to release resources that aren't needed
1151 : * anymore.
1152 : */
1153 : void
1154 1714 : SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running,
1155 : bool db_specific)
1156 : {
1157 : ReorderBufferTXN *txn;
1158 : TransactionId xmin;
1159 :
1160 : /*
1161 : * If we're not consistent yet, inspect the record to see whether it
1162 : * allows to get closer to being consistent. If we are consistent, dump
1163 : * our snapshot so others or we, after a restart, can use it.
1164 : */
1165 1714 : if (builder->state < SNAPBUILD_CONSISTENT)
1166 : {
1167 : /*
1168 : * To reduce the potential for unnecessarily waiting for completion of
1169 : * unrelated transactions, the caller can declare that only
1170 : * transactions of the current database are relevant at this stage.
1171 : */
1172 1187 : if (db_specific)
1173 : {
1174 : /*
1175 : * If we must only keep track of transactions running in the
1176 : * current database, we need transaction info from exactly that
1177 : * database.
1178 : */
1179 12 : if (running->dbid != MyDatabaseId)
1180 : {
1181 6 : LogStandbySnapshot(MyDatabaseId);
1182 :
1183 6 : return;
1184 : }
1185 :
1186 : /*
1187 : * We'd better be able to check during scan if the plugin does not
1188 : * lie.
1189 : */
1190 6 : if (accessSharedCatalogsInDecoding)
1191 6 : accessSharedCatalogsInDecoding = false;
1192 : }
1193 :
1194 : /* returns false if there's no point in performing cleanup just yet */
1195 1181 : if (!SnapBuildFindSnapshot(builder, lsn, running))
1196 1157 : return;
1197 : }
1198 : else
1199 527 : SnapBuildSerialize(builder, lsn);
1200 :
1201 : /*
1202 : * Database specific transaction info may exist to reach CONSISTENT state
1203 : * faster, however the code below makes no use of it. Moreover, such
1204 : * record might cause problems because the following normal (cluster-wide)
1205 : * record can have lower value of oldestRunningXid. In that case, let's
1206 : * wait with the cleanup for the next regular cluster-wide record.
1207 : */
1208 549 : if (OidIsValid(running->dbid))
1209 0 : return;
1210 :
1211 : /*
1212 : * Update range of interesting xids based on the running xacts
1213 : * information. We don't increase ->xmax using it, because once we are in
1214 : * a consistent state we can do that ourselves and much more efficiently
1215 : * so, because we only need to do it for catalog transactions since we
1216 : * only ever look at those.
1217 : *
1218 : * NB: We only increase xmax when a catalog modifying transaction commits
1219 : * (see SnapBuildCommitTxn). Because of this, xmax can be lower than
1220 : * xmin, which looks odd but is correct and actually more efficient, since
1221 : * we hit fast paths in heapam_visibility.c.
1222 : */
1223 549 : builder->xmin = running->oldestRunningXid;
1224 :
1225 : /* Remove transactions we don't need to keep track off anymore */
1226 549 : SnapBuildPurgeOlderTxn(builder);
1227 :
1228 : /*
1229 : * Advance the xmin limit for the current replication slot, to allow
1230 : * vacuum to clean up the tuples this slot has been protecting.
1231 : *
1232 : * The reorderbuffer might have an xmin among the currently running
1233 : * snapshots; use it if so. If not, we need only consider the snapshots
1234 : * we'll produce later, which can't be less than the oldest running xid in
1235 : * the record we're reading now.
1236 : */
1237 549 : xmin = ReorderBufferGetOldestXmin(builder->reorder);
1238 549 : if (xmin == InvalidTransactionId)
1239 499 : xmin = running->oldestRunningXid;
1240 549 : elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
1241 : builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
1242 549 : LogicalIncreaseXminForSlot(lsn, xmin);
1243 :
1244 : /*
1245 : * Also tell the slot where we can restart decoding from. We don't want to
1246 : * do that after every commit because changing that implies an fsync of
1247 : * the logical slot's state file, so we only do it every time we see a
1248 : * running xacts record.
1249 : *
1250 : * Do so by looking for the oldest in progress transaction (determined by
1251 : * the first LSN of any of its relevant records). Every transaction
1252 : * remembers the last location we stored the snapshot to disk before its
1253 : * beginning. That point is where we can restart from.
1254 : */
1255 :
1256 : /*
1257 : * Can't know about a serialized snapshot's location if we're not
1258 : * consistent.
1259 : */
1260 549 : if (builder->state < SNAPBUILD_CONSISTENT)
1261 17 : return;
1262 :
1263 532 : txn = ReorderBufferGetOldestTXN(builder->reorder);
1264 :
1265 : /*
1266 : * oldest ongoing txn might have started when we didn't yet serialize
1267 : * anything because we hadn't reached a consistent state yet.
1268 : */
1269 532 : if (txn != NULL && XLogRecPtrIsValid(txn->restart_decoding_lsn))
1270 27 : LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1271 :
1272 : /*
1273 : * No in-progress transaction, can reuse the last serialized snapshot if
1274 : * we have one.
1275 : */
1276 505 : else if (txn == NULL &&
1277 473 : XLogRecPtrIsValid(builder->reorder->current_restart_decoding_lsn) &&
1278 471 : XLogRecPtrIsValid(builder->last_serialized_snapshot))
1279 471 : LogicalIncreaseRestartDecodingForSlot(lsn,
1280 : builder->last_serialized_snapshot);
1281 : }
1282 :
1283 :
1284 : /*
1285 : * Build the start of a snapshot that's capable of decoding the catalog.
1286 : *
1287 : * Helper function for SnapBuildProcessRunningXacts() while we're not yet
1288 : * consistent.
1289 : *
1290 : * Returns true if there is a point in performing internal maintenance/cleanup
1291 : * using the xl_running_xacts record.
1292 : */
1293 : static bool
1294 1181 : SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1295 : {
1296 : /* ---
1297 : * Build catalog decoding snapshot incrementally using information about
1298 : * the currently running transactions. There are several ways to do that:
1299 : *
1300 : * a) There were no running transactions when the xl_running_xacts record
1301 : * was inserted, jump to CONSISTENT immediately. We might find such a
1302 : * state while waiting on c)'s sub-states.
1303 : *
1304 : * b) This (in a previous run) or another decoding slot serialized a
1305 : * snapshot to disk that we can use. Can't use this method while finding
1306 : * the start point for decoding changes as the restart LSN would be an
1307 : * arbitrary LSN but we need to find the start point to extract changes
1308 : * where we won't see the data for partial transactions. Also, we cannot
1309 : * use this method when a slot needs a full snapshot for export or direct
1310 : * use, as that snapshot will only contain catalog modifying transactions.
1311 : *
1312 : * c) First incrementally build a snapshot for catalog tuples
1313 : * (BUILDING_SNAPSHOT), that requires all, already in-progress,
1314 : * transactions to finish. Every transaction starting after that
1315 : * (FULL_SNAPSHOT state), has enough information to be decoded. But
1316 : * for older running transactions no viable snapshot exists yet, so
1317 : * CONSISTENT will only be reached once all of those have finished.
1318 : * ---
1319 : */
1320 :
1321 : /*
1322 : * xl_running_xacts record is older than what we can use, we might not
1323 : * have all necessary catalog rows anymore.
1324 : */
1325 1181 : if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
1326 506 : NormalTransactionIdPrecedes(running->oldestRunningXid,
1327 : builder->initial_xmin_horizon))
1328 : {
1329 0 : ereport(DEBUG1,
1330 : errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low",
1331 : LSN_FORMAT_ARGS(lsn)),
1332 : errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1333 : builder->initial_xmin_horizon, running->oldestRunningXid));
1334 :
1335 :
1336 0 : SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
1337 :
1338 0 : return true;
1339 : }
1340 :
1341 : /*
1342 : * a) No transaction were running, we can jump to consistent.
1343 : *
1344 : * This is not affected by races around xl_running_xacts, because we can
1345 : * miss transaction commits, but currently not transactions starting.
1346 : *
1347 : * NB: We might have already started to incrementally assemble a snapshot,
1348 : * so we need to be careful to deal with that.
1349 : */
1350 1181 : if (running->oldestRunningXid == running->nextXid)
1351 : {
1352 1149 : if (!XLogRecPtrIsValid(builder->start_decoding_at) ||
1353 649 : builder->start_decoding_at <= lsn)
1354 : /* can decode everything after this */
1355 501 : builder->start_decoding_at = lsn + 1;
1356 :
1357 : /* As no transactions were running xmin/xmax can be trivially set. */
1358 1149 : builder->xmin = running->nextXid; /* < are finished */
1359 1149 : builder->xmax = running->nextXid; /* >= are running */
1360 :
1361 : /* so we can safely use the faster comparisons */
1362 : Assert(TransactionIdIsNormal(builder->xmin));
1363 : Assert(TransactionIdIsNormal(builder->xmax));
1364 :
1365 1149 : builder->state = SNAPBUILD_CONSISTENT;
1366 1149 : builder->next_phase_at = InvalidTransactionId;
1367 :
1368 1149 : ereport(LogicalDecodingLogLevel(),
1369 : errmsg("logical decoding found consistent point at %X/%08X",
1370 : LSN_FORMAT_ARGS(lsn)),
1371 : errdetail("There are no running transactions."));
1372 :
1373 1149 : return false;
1374 : }
1375 :
1376 : /*
1377 : * b) valid on disk state and while neither building full snapshot nor
1378 : * creating a slot.
1379 : */
1380 32 : else if (!builder->building_full_snapshot &&
1381 50 : !builder->in_slot_creation &&
1382 19 : SnapBuildRestore(builder, lsn))
1383 : {
1384 : /* there won't be any state to cleanup */
1385 8 : return false;
1386 : }
1387 :
1388 : /*
1389 : * c) transition from START to BUILDING_SNAPSHOT.
1390 : *
1391 : * In START state, and a xl_running_xacts record with running xacts is
1392 : * encountered. In that case, switch to BUILDING_SNAPSHOT state, and
1393 : * record xl_running_xacts->nextXid. Once all running xacts have finished
1394 : * (i.e. they're all >= nextXid), we have a complete catalog snapshot. It
1395 : * might look that we could use xl_running_xacts's ->xids information to
1396 : * get there quicker, but that is problematic because transactions marked
1397 : * as running, might already have inserted their commit record - it's
1398 : * infeasible to change that with locking.
1399 : */
1400 24 : else if (builder->state == SNAPBUILD_START)
1401 : {
1402 13 : builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
1403 13 : builder->next_phase_at = running->nextXid;
1404 :
1405 : /*
1406 : * Start with an xmin/xmax that's correct for future, when all the
1407 : * currently running transactions have finished. We'll update both
1408 : * while waiting for the pending transactions to finish.
1409 : */
1410 13 : builder->xmin = running->nextXid; /* < are finished */
1411 13 : builder->xmax = running->nextXid; /* >= are running */
1412 :
1413 : /* so we can safely use the faster comparisons */
1414 : Assert(TransactionIdIsNormal(builder->xmin));
1415 : Assert(TransactionIdIsNormal(builder->xmax));
1416 :
1417 13 : ereport(LOG,
1418 : errmsg("logical decoding found initial starting point at %X/%08X",
1419 : LSN_FORMAT_ARGS(lsn)),
1420 : errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1421 : running->xcnt, running->nextXid));
1422 :
1423 13 : SnapBuildWaitSnapshot(running, running->nextXid);
1424 : }
1425 :
1426 : /*
1427 : * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
1428 : *
1429 : * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
1430 : * is >= than nextXid from when we switched to BUILDING_SNAPSHOT. This
1431 : * means all transactions starting afterwards have enough information to
1432 : * be decoded. Switch to FULL_SNAPSHOT.
1433 : */
1434 17 : else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1435 6 : TransactionIdPrecedesOrEquals(builder->next_phase_at,
1436 : running->oldestRunningXid))
1437 : {
1438 5 : builder->state = SNAPBUILD_FULL_SNAPSHOT;
1439 5 : builder->next_phase_at = running->nextXid;
1440 :
1441 5 : ereport(LOG,
1442 : errmsg("logical decoding found initial consistent point at %X/%08X",
1443 : LSN_FORMAT_ARGS(lsn)),
1444 : errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1445 : running->xcnt, running->nextXid));
1446 :
1447 5 : SnapBuildWaitSnapshot(running, running->nextXid);
1448 : }
1449 :
1450 : /*
1451 : * c) transition from FULL_SNAPSHOT to CONSISTENT.
1452 : *
1453 : * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
1454 : * >= than nextXid from when we switched to FULL_SNAPSHOT. This means all
1455 : * transactions that are currently in progress have a catalog snapshot,
1456 : * and all their changes have been collected. Switch to CONSISTENT.
1457 : */
1458 11 : else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
1459 5 : TransactionIdPrecedesOrEquals(builder->next_phase_at,
1460 : running->oldestRunningXid))
1461 : {
1462 5 : builder->state = SNAPBUILD_CONSISTENT;
1463 5 : builder->next_phase_at = InvalidTransactionId;
1464 :
1465 5 : ereport(LogicalDecodingLogLevel(),
1466 : errmsg("logical decoding found consistent point at %X/%08X",
1467 : LSN_FORMAT_ARGS(lsn)),
1468 : errdetail("There are no old transactions anymore."));
1469 : }
1470 :
1471 : /*
1472 : * We already started to track running xacts and need to wait for all
1473 : * in-progress ones to finish. We fall through to the normal processing of
1474 : * records so incremental cleanup can be performed.
1475 : */
1476 22 : return true;
1477 : }
1478 :
1479 : /* ---
1480 : * Iterate through xids in record, wait for all older than the cutoff to
1481 : * finish. Then, if possible, log a new xl_running_xacts record.
1482 : *
1483 : * This isn't required for the correctness of decoding, but to:
1484 : * a) allow isolationtester to notice that we're currently waiting for
1485 : * something.
1486 : * b) log a new xl_running_xacts record where it'd be helpful, without having
1487 : * to wait for bgwriter or checkpointer.
1488 : * ---
1489 : */
1490 : static void
1491 18 : SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
1492 : {
1493 : int off;
1494 :
1495 34 : for (off = 0; off < running->xcnt; off++)
1496 : {
1497 18 : TransactionId xid = running->xids[off];
1498 :
1499 : /*
1500 : * Upper layers should prevent that we ever need to wait on ourselves.
1501 : * Check anyway, since failing to do so would either result in an
1502 : * endless wait or an Assert() failure.
1503 : */
1504 18 : if (TransactionIdIsCurrentTransactionId(xid))
1505 0 : elog(ERROR, "waiting for ourselves");
1506 :
1507 18 : if (TransactionIdFollows(xid, cutoff))
1508 0 : continue;
1509 :
1510 18 : XactLockTableWait(xid, NULL, NULL, XLTW_None);
1511 : }
1512 :
1513 : /*
1514 : * All transactions we needed to finish finished - try to ensure there is
1515 : * another xl_running_xacts record in a timely manner, without having to
1516 : * wait for bgwriter or checkpointer to log one. During recovery we can't
1517 : * enforce that, so we'll have to wait.
1518 : */
1519 16 : if (!RecoveryInProgress())
1520 : {
1521 : /*
1522 : * If the last transaction info was about specific database, so needs
1523 : * to be the next one - at least until we're in the CONSISTENT state.
1524 : */
1525 16 : LogStandbySnapshot(running->dbid);
1526 : }
1527 16 : }
1528 :
/* Bytes of a SnapBuildOnDisk that precede its embedded 'builder' member. */
#define SnapBuildOnDiskConstantSize offsetof(SnapBuildOnDisk, builder)

/*
 * Bytes of a SnapBuildOnDisk that precede its 'version' member; checksumming
 * of the serialized data starts at this offset.
 */
#define SnapBuildOnDiskNotChecksummedSize offsetof(SnapBuildOnDisk, version)

/* Values stored in the 'magic' and 'version' fields of a serialized snapshot */
#define SNAPBUILD_MAGIC 0x51A1E001
#define SNAPBUILD_VERSION 6
1536 :
1537 : /*
1538 : * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1539 : *
1540 : * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1541 : * a record that's a potential location for a serialized snapshot.
1542 : */
1543 : void
1544 96 : SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1545 : {
1546 96 : if (builder->state < SNAPBUILD_CONSISTENT)
1547 0 : SnapBuildRestore(builder, lsn);
1548 : else
1549 96 : SnapBuildSerialize(builder, lsn);
1550 96 : }
1551 :
1552 : /*
1553 : * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
1554 : * been done by another decoding process.
1555 : */
1556 : static void
1557 623 : SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
1558 : {
1559 : Size needed_length;
1560 623 : SnapBuildOnDisk *ondisk = NULL;
1561 623 : TransactionId *catchange_xip = NULL;
1562 : MemoryContext old_ctx;
1563 : size_t catchange_xcnt;
1564 : char *ondisk_c;
1565 : int fd;
1566 : char tmppath[MAXPGPATH];
1567 : char path[MAXPGPATH];
1568 : int ret;
1569 : struct stat stat_buf;
1570 : Size sz;
1571 :
1572 : Assert(XLogRecPtrIsValid(lsn));
1573 : Assert(!XLogRecPtrIsValid(builder->last_serialized_snapshot) ||
1574 : builder->last_serialized_snapshot <= lsn);
1575 :
1576 : /*
1577 : * no point in serializing if we cannot continue to work immediately after
1578 : * restoring the snapshot
1579 : */
1580 623 : if (builder->state < SNAPBUILD_CONSISTENT)
1581 0 : return;
1582 :
1583 : /* consistent snapshots have no next phase */
1584 : Assert(builder->next_phase_at == InvalidTransactionId);
1585 :
1586 : /*
1587 : * We identify snapshots by the LSN they are valid for. We don't need to
1588 : * include timelines in the name as each LSN maps to exactly one timeline
1589 : * unless the user used pg_resetwal or similar. If a user did so, there's
1590 : * no hope continuing to decode anyway.
1591 : */
1592 623 : sprintf(path, "%s/%X-%X.snap",
1593 : PG_LOGICAL_SNAPSHOTS_DIR,
1594 623 : LSN_FORMAT_ARGS(lsn));
1595 :
1596 : /*
1597 : * first check whether some other backend already has written the snapshot
1598 : * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1599 : * as a valid state. Everything else is an unexpected error.
1600 : */
1601 623 : ret = stat(path, &stat_buf);
1602 :
1603 623 : if (ret != 0 && errno != ENOENT)
1604 0 : ereport(ERROR,
1605 : (errcode_for_file_access(),
1606 : errmsg("could not stat file \"%s\": %m", path)));
1607 :
1608 623 : else if (ret == 0)
1609 : {
1610 : /*
1611 : * somebody else has already serialized to this point, don't overwrite
1612 : * but remember location, so we don't need to read old data again.
1613 : *
1614 : * To be sure it has been synced to disk after the rename() from the
1615 : * tempfile filename to the real filename, we just repeat the fsync.
1616 : * That ought to be cheap because in most scenarios it should already
1617 : * be safely on disk.
1618 : */
1619 288 : fsync_fname(path, false);
1620 288 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1621 :
1622 288 : builder->last_serialized_snapshot = lsn;
1623 288 : goto out;
1624 : }
1625 :
1626 : /*
1627 : * there is an obvious race condition here between the time we stat(2) the
1628 : * file and us writing the file. But we rename the file into place
1629 : * atomically and all files created need to contain the same data anyway,
1630 : * so this is perfectly fine, although a bit of a resource waste. Locking
1631 : * seems like pointless complication.
1632 : */
1633 335 : elog(DEBUG1, "serializing snapshot to %s", path);
1634 :
1635 : /* to make sure only we will write to this tempfile, include pid */
1636 335 : sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
1637 : PG_LOGICAL_SNAPSHOTS_DIR,
1638 335 : LSN_FORMAT_ARGS(lsn), MyProcPid);
1639 :
1640 : /*
1641 : * Unlink temporary file if it already exists, needs to have been before a
1642 : * crash/error since we won't enter this function twice from within a
1643 : * single decoding slot/backend and the temporary file contains the pid of
1644 : * the current process.
1645 : */
1646 335 : if (unlink(tmppath) != 0 && errno != ENOENT)
1647 0 : ereport(ERROR,
1648 : (errcode_for_file_access(),
1649 : errmsg("could not remove file \"%s\": %m", tmppath)));
1650 :
1651 335 : old_ctx = MemoryContextSwitchTo(builder->context);
1652 :
1653 : /* Get the catalog modifying transactions that are yet not committed */
1654 335 : catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
1655 335 : catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);
1656 :
1657 335 : needed_length = sizeof(SnapBuildOnDisk) +
1658 335 : sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
1659 :
1660 335 : ondisk_c = palloc0(needed_length);
1661 335 : ondisk = (SnapBuildOnDisk *) ondisk_c;
1662 335 : ondisk->magic = SNAPBUILD_MAGIC;
1663 335 : ondisk->version = SNAPBUILD_VERSION;
1664 335 : ondisk->length = needed_length;
1665 335 : INIT_CRC32C(ondisk->checksum);
1666 335 : COMP_CRC32C(ondisk->checksum,
1667 : ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1668 : SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1669 335 : ondisk_c += sizeof(SnapBuildOnDisk);
1670 :
1671 335 : memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1672 : /* NULL-ify memory-only data */
1673 335 : ondisk->builder.context = NULL;
1674 335 : ondisk->builder.snapshot = NULL;
1675 335 : ondisk->builder.reorder = NULL;
1676 335 : ondisk->builder.committed.xip = NULL;
1677 335 : ondisk->builder.catchange.xip = NULL;
1678 : /* update catchange only on disk data */
1679 335 : ondisk->builder.catchange.xcnt = catchange_xcnt;
1680 :
1681 335 : COMP_CRC32C(ondisk->checksum,
1682 : &ondisk->builder,
1683 : sizeof(SnapBuild));
1684 :
1685 : /* copy committed xacts */
1686 335 : if (builder->committed.xcnt > 0)
1687 : {
1688 62 : sz = sizeof(TransactionId) * builder->committed.xcnt;
1689 62 : memcpy(ondisk_c, builder->committed.xip, sz);
1690 62 : COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1691 62 : ondisk_c += sz;
1692 : }
1693 :
1694 : /* copy catalog modifying xacts */
1695 335 : if (catchange_xcnt > 0)
1696 : {
1697 9 : sz = sizeof(TransactionId) * catchange_xcnt;
1698 9 : memcpy(ondisk_c, catchange_xip, sz);
1699 9 : COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1700 9 : ondisk_c += sz;
1701 : }
1702 :
1703 335 : FIN_CRC32C(ondisk->checksum);
1704 :
1705 : /* we have valid data now, open tempfile and write it there */
1706 335 : fd = OpenTransientFile(tmppath,
1707 : O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1708 335 : if (fd < 0)
1709 0 : ereport(ERROR,
1710 : (errcode_for_file_access(),
1711 : errmsg("could not open file \"%s\": %m", tmppath)));
1712 :
1713 335 : errno = 0;
1714 335 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
1715 335 : if ((write(fd, ondisk, needed_length)) != needed_length)
1716 : {
1717 0 : int save_errno = errno;
1718 :
1719 0 : CloseTransientFile(fd);
1720 :
1721 : /* if write didn't set errno, assume problem is no disk space */
1722 0 : errno = save_errno ? save_errno : ENOSPC;
1723 0 : ereport(ERROR,
1724 : (errcode_for_file_access(),
1725 : errmsg("could not write to file \"%s\": %m", tmppath)));
1726 : }
1727 335 : pgstat_report_wait_end();
1728 :
1729 : /*
1730 : * fsync the file before renaming so that even if we crash after this we
1731 : * have either a fully valid file or nothing.
1732 : *
1733 : * It's safe to just ERROR on fsync() here because we'll retry the whole
1734 : * operation including the writes.
1735 : *
1736 : * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1737 : * some noticeable overhead since it's performed synchronously during
1738 : * decoding?
1739 : */
1740 335 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
1741 335 : if (pg_fsync(fd) != 0)
1742 : {
1743 0 : int save_errno = errno;
1744 :
1745 0 : CloseTransientFile(fd);
1746 0 : errno = save_errno;
1747 0 : ereport(ERROR,
1748 : (errcode_for_file_access(),
1749 : errmsg("could not fsync file \"%s\": %m", tmppath)));
1750 : }
1751 335 : pgstat_report_wait_end();
1752 :
1753 335 : if (CloseTransientFile(fd) != 0)
1754 0 : ereport(ERROR,
1755 : (errcode_for_file_access(),
1756 : errmsg("could not close file \"%s\": %m", tmppath)));
1757 :
1758 335 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1759 :
1760 : /*
1761 : * We may overwrite the work from some other backend, but that's ok, our
1762 : * snapshot is valid as well, we'll just have done some superfluous work.
1763 : */
1764 335 : if (rename(tmppath, path) != 0)
1765 : {
1766 0 : ereport(ERROR,
1767 : (errcode_for_file_access(),
1768 : errmsg("could not rename file \"%s\" to \"%s\": %m",
1769 : tmppath, path)));
1770 : }
1771 :
1772 : /* make sure we persist */
1773 335 : fsync_fname(path, false);
1774 335 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1775 :
1776 : /*
1777 : * Now there's no way we can lose the dumped state anymore, remember this
1778 : * as a serialization point.
1779 : */
1780 335 : builder->last_serialized_snapshot = lsn;
1781 :
1782 335 : MemoryContextSwitchTo(old_ctx);
1783 :
1784 623 : out:
1785 623 : ReorderBufferSetRestartPoint(builder->reorder,
1786 : builder->last_serialized_snapshot);
1787 : /* be tidy */
1788 623 : if (ondisk)
1789 335 : pfree(ondisk);
1790 623 : if (catchange_xip)
1791 9 : pfree(catchange_xip);
1792 : }
1793 :
1794 : /*
1795 : * Restore the logical snapshot file contents to 'ondisk'.
1796 : *
1797 : * 'context' is the memory context where the catalog modifying/committed xid
1798 : * will live.
1799 : * If 'missing_ok' is true, will not throw an error if the file is not found.
1800 : */
1801 : bool
1802 21 : SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn,
1803 : MemoryContext context, bool missing_ok)
1804 : {
1805 : int fd;
1806 : pg_crc32c checksum;
1807 : Size sz;
1808 : char path[MAXPGPATH];
1809 :
1810 21 : sprintf(path, "%s/%X-%X.snap",
1811 : PG_LOGICAL_SNAPSHOTS_DIR,
1812 21 : LSN_FORMAT_ARGS(lsn));
1813 :
1814 21 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1815 :
1816 21 : if (fd < 0)
1817 : {
1818 11 : if (missing_ok && errno == ENOENT)
1819 11 : return false;
1820 :
1821 0 : ereport(ERROR,
1822 : (errcode_for_file_access(),
1823 : errmsg("could not open file \"%s\": %m", path)));
1824 : }
1825 :
1826 : /* ----
1827 : * Make sure the snapshot had been stored safely to disk, that's normally
1828 : * cheap.
1829 : * Note that we do not need PANIC here, nobody will be able to use the
1830 : * slot without fsyncing, and saving it won't succeed without an fsync()
1831 : * either...
1832 : * ----
1833 : */
1834 10 : fsync_fname(path, false);
1835 10 : fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1836 :
1837 : /* read statically sized portion of snapshot */
1838 10 : SnapBuildRestoreContents(fd, ondisk, SnapBuildOnDiskConstantSize, path);
1839 :
1840 10 : if (ondisk->magic != SNAPBUILD_MAGIC)
1841 0 : ereport(ERROR,
1842 : (errcode(ERRCODE_DATA_CORRUPTED),
1843 : errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1844 : path, ondisk->magic, SNAPBUILD_MAGIC)));
1845 :
1846 10 : if (ondisk->version != SNAPBUILD_VERSION)
1847 0 : ereport(ERROR,
1848 : (errcode(ERRCODE_DATA_CORRUPTED),
1849 : errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1850 : path, ondisk->version, SNAPBUILD_VERSION)));
1851 :
1852 10 : INIT_CRC32C(checksum);
1853 10 : COMP_CRC32C(checksum,
1854 : ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1855 : SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1856 :
1857 : /* read SnapBuild */
1858 10 : SnapBuildRestoreContents(fd, &ondisk->builder, sizeof(SnapBuild), path);
1859 10 : COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild));
1860 :
1861 : /* restore committed xacts information */
1862 10 : if (ondisk->builder.committed.xcnt > 0)
1863 : {
1864 4 : sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt;
1865 4 : ondisk->builder.committed.xip = MemoryContextAllocZero(context, sz);
1866 4 : SnapBuildRestoreContents(fd, ondisk->builder.committed.xip, sz, path);
1867 4 : COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz);
1868 : }
1869 :
1870 : /* restore catalog modifying xacts information */
1871 10 : if (ondisk->builder.catchange.xcnt > 0)
1872 : {
1873 5 : sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt;
1874 5 : ondisk->builder.catchange.xip = MemoryContextAllocZero(context, sz);
1875 5 : SnapBuildRestoreContents(fd, ondisk->builder.catchange.xip, sz, path);
1876 5 : COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz);
1877 : }
1878 :
1879 10 : if (CloseTransientFile(fd) != 0)
1880 0 : ereport(ERROR,
1881 : (errcode_for_file_access(),
1882 : errmsg("could not close file \"%s\": %m", path)));
1883 :
1884 10 : FIN_CRC32C(checksum);
1885 :
1886 : /* verify checksum of what we've read */
1887 10 : if (!EQ_CRC32C(checksum, ondisk->checksum))
1888 0 : ereport(ERROR,
1889 : (errcode(ERRCODE_DATA_CORRUPTED),
1890 : errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1891 : path, checksum, ondisk->checksum)));
1892 :
1893 10 : return true;
1894 : }
1895 :
1896 : /*
1897 : * Restore a snapshot into 'builder' if previously one has been stored at the
1898 : * location indicated by 'lsn'. Returns true if successful, false otherwise.
1899 : */
1900 : static bool
1901 19 : SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
1902 : {
1903 : SnapBuildOnDisk ondisk;
1904 :
1905 : /* no point in loading a snapshot if we're already there */
1906 19 : if (builder->state == SNAPBUILD_CONSISTENT)
1907 0 : return false;
1908 :
1909 : /* validate and restore the snapshot to 'ondisk' */
1910 19 : if (!SnapBuildRestoreSnapshot(&ondisk, lsn, builder->context, true))
1911 11 : return false;
1912 :
1913 : /*
1914 : * ok, we now have a sensible snapshot here, figure out if it has more
1915 : * information than we have.
1916 : */
1917 :
1918 : /*
1919 : * We are only interested in consistent snapshots for now, comparing
1920 : * whether one incomplete snapshot is more "advanced" seems to be
1921 : * unnecessarily complex.
1922 : */
1923 8 : if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1924 0 : goto snapshot_not_interesting;
1925 :
1926 : /*
1927 : * Don't use a snapshot that requires an xmin that we cannot guarantee to
1928 : * be available.
1929 : */
1930 8 : if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
1931 0 : goto snapshot_not_interesting;
1932 :
1933 : /*
1934 : * Consistent snapshots have no next phase. Reset next_phase_at as it is
1935 : * possible that an old value may remain.
1936 : */
1937 : Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
1938 8 : builder->next_phase_at = InvalidTransactionId;
1939 :
1940 : /* ok, we think the snapshot is sensible, copy over everything important */
1941 8 : builder->xmin = ondisk.builder.xmin;
1942 8 : builder->xmax = ondisk.builder.xmax;
1943 8 : builder->state = ondisk.builder.state;
1944 :
1945 8 : builder->committed.xcnt = ondisk.builder.committed.xcnt;
1946 : /* We only allocated/stored xcnt, not xcnt_space xids ! */
1947 : /* don't overwrite preallocated xip, if we don't have anything here */
1948 8 : if (builder->committed.xcnt > 0)
1949 : {
1950 2 : pfree(builder->committed.xip);
1951 2 : builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1952 2 : builder->committed.xip = ondisk.builder.committed.xip;
1953 : }
1954 8 : ondisk.builder.committed.xip = NULL;
1955 :
1956 : /* set catalog modifying transactions */
1957 8 : if (builder->catchange.xip)
1958 0 : pfree(builder->catchange.xip);
1959 8 : builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
1960 8 : builder->catchange.xip = ondisk.builder.catchange.xip;
1961 8 : ondisk.builder.catchange.xip = NULL;
1962 :
1963 : /* our snapshot is not interesting anymore, build a new one */
1964 8 : if (builder->snapshot != NULL)
1965 : {
1966 0 : SnapBuildSnapDecRefcount(builder->snapshot);
1967 : }
1968 8 : builder->snapshot = SnapBuildBuildSnapshot(builder);
1969 8 : SnapBuildSnapIncRefcount(builder->snapshot);
1970 :
1971 8 : ReorderBufferSetRestartPoint(builder->reorder, lsn);
1972 :
1973 : Assert(builder->state == SNAPBUILD_CONSISTENT);
1974 :
1975 8 : ereport(LogicalDecodingLogLevel(),
1976 : errmsg("logical decoding found consistent point at %X/%08X",
1977 : LSN_FORMAT_ARGS(lsn)),
1978 : errdetail("Logical decoding will begin using saved snapshot."));
1979 8 : return true;
1980 :
1981 0 : snapshot_not_interesting:
1982 0 : if (ondisk.builder.committed.xip != NULL)
1983 0 : pfree(ondisk.builder.committed.xip);
1984 0 : if (ondisk.builder.catchange.xip != NULL)
1985 0 : pfree(ondisk.builder.catchange.xip);
1986 0 : return false;
1987 : }
1988 :
1989 : /*
1990 : * Read the contents of the serialized snapshot to 'dest'.
1991 : */
1992 : static void
1993 29 : SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path)
1994 : {
1995 : int readBytes;
1996 :
1997 29 : pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1998 29 : readBytes = read(fd, dest, size);
1999 29 : pgstat_report_wait_end();
2000 29 : if (readBytes != size)
2001 : {
2002 0 : int save_errno = errno;
2003 :
2004 0 : CloseTransientFile(fd);
2005 :
2006 0 : if (readBytes < 0)
2007 : {
2008 0 : errno = save_errno;
2009 0 : ereport(ERROR,
2010 : (errcode_for_file_access(),
2011 : errmsg("could not read file \"%s\": %m", path)));
2012 : }
2013 : else
2014 0 : ereport(ERROR,
2015 : (errcode(ERRCODE_DATA_CORRUPTED),
2016 : errmsg("could not read file \"%s\": read %d of %zu",
2017 : path, readBytes, size)));
2018 : }
2019 29 : }
2020 :
2021 : /*
2022 : * Remove all serialized snapshots that are not required anymore because no
2023 : * slot can need them. This doesn't actually have to run during a checkpoint,
2024 : * but it's a convenient point to schedule this.
2025 : *
2026 : * NB: We run this during checkpoints even if logical decoding is disabled so
2027 : * we cleanup old slots at some point after it got disabled.
2028 : */
2029 : void
2030 1939 : CheckPointSnapBuild(void)
2031 : {
2032 : XLogRecPtr cutoff;
2033 : XLogRecPtr redo;
2034 : DIR *snap_dir;
2035 : struct dirent *snap_de;
2036 : char path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)];
2037 :
2038 : /*
2039 : * We start off with a minimum of the last redo pointer. No new
2040 : * replication slot will start before that, so that's a safe upper bound
2041 : * for removal.
2042 : */
2043 1939 : redo = GetRedoRecPtr();
2044 :
2045 : /* now check for the restart ptrs from existing slots */
2046 1939 : cutoff = ReplicationSlotsComputeLogicalRestartLSN();
2047 :
2048 : /* don't start earlier than the restart lsn */
2049 1939 : if (redo < cutoff)
2050 1 : cutoff = redo;
2051 :
2052 1939 : snap_dir = AllocateDir(PG_LOGICAL_SNAPSHOTS_DIR);
2053 6124 : while ((snap_de = ReadDir(snap_dir, PG_LOGICAL_SNAPSHOTS_DIR)) != NULL)
2054 : {
2055 : uint32 hi;
2056 : uint32 lo;
2057 : XLogRecPtr lsn;
2058 : PGFileType de_type;
2059 :
2060 4185 : if (strcmp(snap_de->d_name, ".") == 0 ||
2061 2246 : strcmp(snap_de->d_name, "..") == 0)
2062 3878 : continue;
2063 :
2064 307 : snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name);
2065 307 : de_type = get_dirent_type(path, snap_de, false, DEBUG1);
2066 :
2067 307 : if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
2068 : {
2069 0 : elog(DEBUG1, "only regular files expected: %s", path);
2070 0 : continue;
2071 : }
2072 :
2073 : /*
2074 : * temporary filenames from SnapBuildSerialize() include the LSN and
2075 : * everything but are postfixed by .$pid.tmp. We can just remove them
2076 : * the same as other files because there can be none that are
2077 : * currently being written that are older than cutoff.
2078 : *
2079 : * We just log a message if a file doesn't fit the pattern, it's
2080 : * probably some editors lock/state file or similar...
2081 : */
2082 307 : if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
2083 : {
2084 0 : ereport(LOG,
2085 : (errmsg("could not parse file name \"%s\"", path)));
2086 0 : continue;
2087 : }
2088 :
2089 307 : lsn = ((uint64) hi) << 32 | lo;
2090 :
2091 : /* check whether we still need it */
2092 307 : if (lsn < cutoff || !XLogRecPtrIsValid(cutoff))
2093 : {
2094 203 : elog(DEBUG1, "removing snapbuild snapshot %s", path);
2095 :
2096 : /*
2097 : * It's not particularly harmful, though strange, if we can't
2098 : * remove the file here. Don't prevent the checkpoint from
2099 : * completing, that'd be a cure worse than the disease.
2100 : */
2101 203 : if (unlink(path) < 0)
2102 : {
2103 0 : ereport(LOG,
2104 : (errcode_for_file_access(),
2105 : errmsg("could not remove file \"%s\": %m",
2106 : path)));
2107 0 : continue;
2108 : }
2109 : }
2110 : }
2111 1939 : FreeDir(snap_dir);
2112 1939 : }
2113 :
2114 : /*
2115 : * Check if a logical snapshot at the specified point has been serialized.
2116 : */
2117 : bool
2118 14 : SnapBuildSnapshotExists(XLogRecPtr lsn)
2119 : {
2120 : char path[MAXPGPATH];
2121 : int ret;
2122 : struct stat stat_buf;
2123 :
2124 14 : sprintf(path, "%s/%X-%X.snap",
2125 : PG_LOGICAL_SNAPSHOTS_DIR,
2126 14 : LSN_FORMAT_ARGS(lsn));
2127 :
2128 14 : ret = stat(path, &stat_buf);
2129 :
2130 14 : if (ret != 0 && errno != ENOENT)
2131 0 : ereport(ERROR,
2132 : (errcode_for_file_access(),
2133 : errmsg("could not stat file \"%s\": %m", path)));
2134 :
2135 14 : return ret == 0;
2136 : }
|