Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * twophase.c
4 : * Two-phase commit support functions.
5 : *
6 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/access/transam/twophase.c
11 : *
12 : * NOTES
13 : * Each global transaction is associated with a global transaction
14 : * identifier (GID). The client assigns a GID to a postgres
15 : * transaction with the PREPARE TRANSACTION command.
16 : *
17 : * We keep all active global transactions in a shared memory array.
18 : * When the PREPARE TRANSACTION command is issued, the GID is
19 : * reserved for the transaction in the array. This is done before
20 : * a WAL entry is made, because the reservation checks for duplicate
21 : * GIDs and aborts the transaction if there already is a global
22 : * transaction in prepared state with the same GID.
23 : *
24 : * A global transaction (gxact) also has dummy PGPROC; this is what keeps
25 : * the XID considered running by TransactionIdIsInProgress. It is also
26 : * convenient as a PGPROC to hook the gxact's locks to.
27 : *
28 : * Information to recover prepared transactions in case of crash is
29 : * now stored in WAL for the common case. In some cases there will be
30 : * an extended period between preparing a GXACT and commit/abort, in
31 : * which case we need to separately record prepared transaction data
32 : * in permanent storage. This includes locking information, pending
33 : * notifications etc. All that state information is written to the
34 : * per-transaction state file in the pg_twophase directory.
35 : * All prepared transactions will be written prior to shutdown.
36 : *
37 : * The life cycle of the state data is as follows:
38 : *
39 : * * On PREPARE TRANSACTION, the backend writes state data only to the WAL
40 : * and stores a pointer to the start of the WAL record in
41 : * gxact->prepare_start_lsn.
42 : * * If COMMIT occurs before a checkpoint, the backend reads the data from
43 : * WAL using prepare_start_lsn.
44 : * * On checkpoint, the state data is copied to files in the pg_twophase
45 : * directory and fsynced.
46 : * * If COMMIT happens after a checkpoint, the backend reads the state data
47 : * from those files.
48 : *
49 : * During replay and replication, TwoPhaseState also holds information
50 : * about active prepared transactions that haven't been moved to disk yet.
51 : *
52 : * Replay of twophase records happens by the following rules:
53 : *
54 : * * At the beginning of recovery, pg_twophase is scanned once, filling
55 : * TwoPhaseState with entries marked with gxact->inredo and
56 : * gxact->ondisk. Two-phase file data older than the XID horizon of
57 : * the redo position are discarded.
58 : * * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59 : * gxact->inredo is set to true for such entries.
60 : * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61 : * that have gxact->inredo set and are behind the redo_horizon. We
62 : * save them to disk and then switch gxact->ondisk to true.
63 : * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64 : * If gxact->ondisk is true, the corresponding entry from the disk
65 : * is additionally deleted.
66 : * * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67 : * and PrescanPreparedTransactions() have been modified to go through
68 : * gxact->inredo entries that have not made it to disk.
69 : *
70 : *-------------------------------------------------------------------------
71 : */
72 : #include "postgres.h"
73 :
74 : #include <fcntl.h>
75 : #include <sys/stat.h>
76 : #include <time.h>
77 : #include <unistd.h>
78 :
79 : #include "access/commit_ts.h"
80 : #include "access/htup_details.h"
81 : #include "access/subtrans.h"
82 : #include "access/transam.h"
83 : #include "access/twophase.h"
84 : #include "access/twophase_rmgr.h"
85 : #include "access/xact.h"
86 : #include "access/xlog.h"
87 : #include "access/xloginsert.h"
88 : #include "access/xlogreader.h"
89 : #include "access/xlogrecovery.h"
90 : #include "access/xlogutils.h"
91 : #include "catalog/pg_type.h"
92 : #include "catalog/storage.h"
93 : #include "funcapi.h"
94 : #include "miscadmin.h"
95 : #include "pg_trace.h"
96 : #include "pgstat.h"
97 : #include "replication/origin.h"
98 : #include "replication/syncrep.h"
99 : #include "storage/fd.h"
100 : #include "storage/ipc.h"
101 : #include "storage/md.h"
102 : #include "storage/predicate.h"
103 : #include "storage/proc.h"
104 : #include "storage/procarray.h"
105 : #include "utils/builtins.h"
106 : #include "utils/memutils.h"
107 : #include "utils/timestamp.h"
108 :
/*
 * Directory, relative to PGDATA, where two-phase commit state files reside.
 * Individual file names are built by TwoPhaseFilePath() from the epoch and
 * XID of the prepared transaction.
 */
#define TWOPHASE_DIR "pg_twophase"

/*
 * GUC variable (max_prepared_transactions); can't be changed after startup.
 * A value of zero disables PREPARE TRANSACTION (see MarkAsPreparing).
 */
int			max_prepared_xacts = 0;
116 :
/*
 * This struct describes one global transaction that is in prepared state
 * or attempting to become prepared.
 *
 * The lifecycle of a global transaction is:
 *
 * 1. After checking that the requested GID is not in use, set up an entry in
 * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
 * and mark it as locked by my backend.
 *
 * 2. After successfully completing prepare, set valid = true and enter the
 * referenced PGPROC into the global ProcArray.
 *
 * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
 * valid and not locked, then mark the entry as locked by storing my current
 * proc number into locking_backend.  This prevents concurrent attempts to
 * commit or rollback the same prepared xact.
 *
 * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
 * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
 * the freelist.
 *
 * Note that if the preparing transaction fails between steps 1 and 2, the
 * entry must be removed so that the GID and the GlobalTransaction struct
 * can be reused.  See AtAbort_Twophase().
 *
 * typedef struct GlobalTransactionData *GlobalTransaction appears in
 * twophase.h
 */

typedef struct GlobalTransactionData
{
	GlobalTransaction next;		/* list link for free list */
	int			pgprocno;		/* ID of associated dummy PGPROC; assigned
								 * once in TwoPhaseShmemInit() */
	TimestampTz prepared_at;	/* time of preparation */

	/*
	 * Note that we need to keep track of two LSNs for each GXACT.  We keep
	 * track of the start LSN because this is the address we must use to read
	 * state data back from WAL when committing a prepared GXACT.  We keep
	 * track of the end LSN because that is the LSN we need to wait for prior
	 * to commit.
	 */
	XLogRecPtr	prepare_start_lsn;	/* XLOG offset of prepare record start */
	XLogRecPtr	prepare_end_lsn;	/* XLOG offset of prepare record end */
	TransactionId xid;			/* The GXACT id */

	Oid			owner;			/* ID of user that executed the xact */
	ProcNumber	locking_backend;	/* backend currently working on the xact,
									 * or INVALID_PROC_NUMBER when unlocked */
	bool		valid;			/* true if PGPROC entry is in proc array */
	bool		ondisk;			/* true if prepare state file is on disk */
	bool		inredo;			/* true if entry was added via xlog_redo */
	char		gid[GIDSIZE];	/* The GID assigned to the prepared xact */
} GlobalTransactionData;
171 :
172 : /*
173 : * Two Phase Commit shared state. Access to this struct is protected
174 : * by TwoPhaseStateLock.
175 : */
176 : typedef struct TwoPhaseStateData
177 : {
178 : /* Head of linked list of free GlobalTransactionData structs */
179 : GlobalTransaction freeGXacts;
180 :
181 : /* Number of valid prepXacts entries. */
182 : int numPrepXacts;
183 :
184 : /* There are max_prepared_xacts items in this array */
185 : GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
186 : } TwoPhaseStateData;
187 :
/* Shared prepared-transaction table; attached by TwoPhaseShmemInit() */
static TwoPhaseStateData *TwoPhaseState;

/*
 * Global transaction entry currently locked by us, if any.  Note that any
 * access to the entry pointed to by this variable must be protected by
 * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
 * (since it's just local memory).
 */
static GlobalTransaction MyLockedGxact = NULL;

/*
 * Whether AtProcExit_Twophase() has been registered as a before_shmem_exit
 * hook in this process yet; registration happens lazily on first use.
 */
static bool twophaseExitRegistered = false;
199 :
/* Forward declarations of file-local (static) functions. */
static void RecordTransactionCommitPrepared(TransactionId xid,
											int nchildren,
											TransactionId *children,
											int nrels,
											RelFileLocator *rels,
											int nstats,
											xl_xact_stats_item *stats,
											int ninvalmsgs,
											SharedInvalidationMessage *invalmsgs,
											bool initfileinval,
											const char *gid);
static void RecordTransactionAbortPrepared(TransactionId xid,
										   int nchildren,
										   TransactionId *children,
										   int nrels,
										   RelFileLocator *rels,
										   int nstats,
										   xl_xact_stats_item *stats,
										   const char *gid);
static void ProcessRecords(char *bufptr, TransactionId xid,
						   const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);

static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
static char *ProcessTwoPhaseBuffer(TransactionId xid,
								   XLogRecPtr prepare_start_lsn,
								   bool fromdisk, bool setParent, bool setNextXid);
static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
								const char *gid, TimestampTz prepared_at, Oid owner,
								Oid databaseid);
static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
232 :
233 : /*
234 : * Initialization of shared memory
235 : */
236 : Size
237 5436 : TwoPhaseShmemSize(void)
238 : {
239 : Size size;
240 :
241 : /* Need the fixed struct, the array of pointers, and the GTD structs */
242 5436 : size = offsetof(TwoPhaseStateData, prepXacts);
243 5436 : size = add_size(size, mul_size(max_prepared_xacts,
244 : sizeof(GlobalTransaction)));
245 5436 : size = MAXALIGN(size);
246 5436 : size = add_size(size, mul_size(max_prepared_xacts,
247 : sizeof(GlobalTransactionData)));
248 :
249 5436 : return size;
250 : }
251 :
252 : void
253 1902 : TwoPhaseShmemInit(void)
254 : {
255 : bool found;
256 :
257 1902 : TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
258 : TwoPhaseShmemSize(),
259 : &found);
260 1902 : if (!IsUnderPostmaster)
261 : {
262 : GlobalTransaction gxacts;
263 : int i;
264 :
265 : Assert(!found);
266 1902 : TwoPhaseState->freeGXacts = NULL;
267 1902 : TwoPhaseState->numPrepXacts = 0;
268 :
269 : /*
270 : * Initialize the linked list of free GlobalTransactionData structs
271 : */
272 1902 : gxacts = (GlobalTransaction)
273 1902 : ((char *) TwoPhaseState +
274 1902 : MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
275 : sizeof(GlobalTransaction) * max_prepared_xacts));
276 3552 : for (i = 0; i < max_prepared_xacts; i++)
277 : {
278 : /* insert into linked list */
279 1650 : gxacts[i].next = TwoPhaseState->freeGXacts;
280 1650 : TwoPhaseState->freeGXacts = &gxacts[i];
281 :
282 : /* associate it with a PGPROC assigned by InitProcGlobal */
283 1650 : gxacts[i].pgprocno = GetNumberFromPGProc(&PreparedXactProcs[i]);
284 : }
285 : }
286 : else
287 : Assert(found);
288 1902 : }
289 :
290 : /*
291 : * Exit hook to unlock the global transaction entry we're working on.
292 : */
293 : static void
294 268 : AtProcExit_Twophase(int code, Datum arg)
295 : {
296 : /* same logic as abort */
297 268 : AtAbort_Twophase();
298 268 : }
299 :
300 : /*
301 : * Abort hook to unlock the global transaction entry we're working on.
302 : */
303 : void
304 46816 : AtAbort_Twophase(void)
305 : {
306 46816 : if (MyLockedGxact == NULL)
307 46812 : return;
308 :
309 : /*
310 : * What to do with the locked global transaction entry? If we were in the
311 : * process of preparing the transaction, but haven't written the WAL
312 : * record and state file yet, the transaction must not be considered as
313 : * prepared. Likewise, if we are in the process of finishing an
314 : * already-prepared transaction, and fail after having already written the
315 : * 2nd phase commit or rollback record to the WAL, the transaction should
316 : * not be considered as prepared anymore. In those cases, just remove the
317 : * entry from shared memory.
318 : *
319 : * Otherwise, the entry must be left in place so that the transaction can
320 : * be finished later, so just unlock it.
321 : *
322 : * If we abort during prepare, after having written the WAL record, we
323 : * might not have transferred all locks and other state to the prepared
324 : * transaction yet. Likewise, if we abort during commit or rollback,
325 : * after having written the WAL record, we might not have released all the
326 : * resources held by the transaction yet. In those cases, the in-memory
327 : * state can be wrong, but it's too late to back out.
328 : */
329 4 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
330 4 : if (!MyLockedGxact->valid)
331 4 : RemoveGXact(MyLockedGxact);
332 : else
333 0 : MyLockedGxact->locking_backend = INVALID_PROC_NUMBER;
334 4 : LWLockRelease(TwoPhaseStateLock);
335 :
336 4 : MyLockedGxact = NULL;
337 : }
338 :
339 : /*
340 : * This is called after we have finished transferring state to the prepared
341 : * PGPROC entry.
342 : */
343 : void
344 822 : PostPrepare_Twophase(void)
345 : {
346 822 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
347 822 : MyLockedGxact->locking_backend = INVALID_PROC_NUMBER;
348 822 : LWLockRelease(TwoPhaseStateLock);
349 :
350 822 : MyLockedGxact = NULL;
351 822 : }
352 :
353 :
354 : /*
355 : * MarkAsPreparing
356 : * Reserve the GID for the given transaction.
357 : */
358 : GlobalTransaction
359 786 : MarkAsPreparing(TransactionId xid, const char *gid,
360 : TimestampTz prepared_at, Oid owner, Oid databaseid)
361 : {
362 : GlobalTransaction gxact;
363 : int i;
364 :
365 786 : if (strlen(gid) >= GIDSIZE)
366 0 : ereport(ERROR,
367 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
368 : errmsg("transaction identifier \"%s\" is too long",
369 : gid)));
370 :
371 : /* fail immediately if feature is disabled */
372 786 : if (max_prepared_xacts == 0)
373 20 : ereport(ERROR,
374 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
375 : errmsg("prepared transactions are disabled"),
376 : errhint("Set \"max_prepared_transactions\" to a nonzero value.")));
377 :
378 : /* on first call, register the exit hook */
379 766 : if (!twophaseExitRegistered)
380 : {
381 144 : before_shmem_exit(AtProcExit_Twophase, 0);
382 144 : twophaseExitRegistered = true;
383 : }
384 :
385 766 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
386 :
387 : /* Check for conflicting GID */
388 1472 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
389 : {
390 710 : gxact = TwoPhaseState->prepXacts[i];
391 710 : if (strcmp(gxact->gid, gid) == 0)
392 : {
393 4 : ereport(ERROR,
394 : (errcode(ERRCODE_DUPLICATE_OBJECT),
395 : errmsg("transaction identifier \"%s\" is already in use",
396 : gid)));
397 : }
398 : }
399 :
400 : /* Get a free gxact from the freelist */
401 762 : if (TwoPhaseState->freeGXacts == NULL)
402 0 : ereport(ERROR,
403 : (errcode(ERRCODE_OUT_OF_MEMORY),
404 : errmsg("maximum number of prepared transactions reached"),
405 : errhint("Increase \"max_prepared_transactions\" (currently %d).",
406 : max_prepared_xacts)));
407 762 : gxact = TwoPhaseState->freeGXacts;
408 762 : TwoPhaseState->freeGXacts = gxact->next;
409 :
410 762 : MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
411 :
412 762 : gxact->ondisk = false;
413 :
414 : /* And insert it into the active array */
415 : Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
416 762 : TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
417 :
418 762 : LWLockRelease(TwoPhaseStateLock);
419 :
420 762 : return gxact;
421 : }
422 :
423 : /*
424 : * MarkAsPreparingGuts
425 : *
426 : * This uses a gxact struct and puts it into the active array.
427 : * NOTE: this is also used when reloading a gxact after a crash; so avoid
428 : * assuming that we can use very much backend context.
429 : *
430 : * Note: This function should be called with appropriate locks held.
431 : */
432 : static void
433 826 : MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char *gid,
434 : TimestampTz prepared_at, Oid owner, Oid databaseid)
435 : {
436 : PGPROC *proc;
437 : int i;
438 :
439 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
440 :
441 : Assert(gxact != NULL);
442 826 : proc = GetPGProcByNumber(gxact->pgprocno);
443 :
444 : /* Initialize the PGPROC entry */
445 86730 : MemSet(proc, 0, sizeof(PGPROC));
446 826 : dlist_node_init(&proc->links);
447 826 : proc->waitStatus = PROC_WAIT_STATUS_OK;
448 826 : if (LocalTransactionIdIsValid(MyProc->vxid.lxid))
449 : {
450 : /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
451 762 : proc->vxid.lxid = MyProc->vxid.lxid;
452 762 : proc->vxid.procNumber = MyProcNumber;
453 : }
454 : else
455 : {
456 : Assert(AmStartupProcess() || !IsPostmasterEnvironment);
457 : /* GetLockConflicts() uses this to specify a wait on the XID */
458 64 : proc->vxid.lxid = xid;
459 64 : proc->vxid.procNumber = INVALID_PROC_NUMBER;
460 : }
461 826 : proc->xid = xid;
462 : Assert(proc->xmin == InvalidTransactionId);
463 826 : proc->delayChkptFlags = 0;
464 826 : proc->statusFlags = 0;
465 826 : proc->pid = 0;
466 826 : proc->databaseId = databaseid;
467 826 : proc->roleId = owner;
468 826 : proc->tempNamespaceId = InvalidOid;
469 826 : proc->isBackgroundWorker = false;
470 826 : proc->lwWaiting = LW_WS_NOT_WAITING;
471 826 : proc->lwWaitMode = 0;
472 826 : proc->waitLock = NULL;
473 826 : proc->waitProcLock = NULL;
474 826 : pg_atomic_init_u64(&proc->waitStart, 0);
475 14042 : for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
476 13216 : dlist_init(&proc->myProcLocks[i]);
477 : /* subxid data must be filled later by GXactLoadSubxactData */
478 826 : proc->subxidStatus.overflowed = false;
479 826 : proc->subxidStatus.count = 0;
480 :
481 826 : gxact->prepared_at = prepared_at;
482 826 : gxact->xid = xid;
483 826 : gxact->owner = owner;
484 826 : gxact->locking_backend = MyProcNumber;
485 826 : gxact->valid = false;
486 826 : gxact->inredo = false;
487 826 : strcpy(gxact->gid, gid);
488 :
489 : /*
490 : * Remember that we have this GlobalTransaction entry locked for us. If we
491 : * abort after this, we must release it.
492 : */
493 826 : MyLockedGxact = gxact;
494 826 : }
495 :
496 : /*
497 : * GXactLoadSubxactData
498 : *
499 : * If the transaction being persisted had any subtransactions, this must
500 : * be called before MarkAsPrepared() to load information into the dummy
501 : * PGPROC.
502 : */
503 : static void
504 348 : GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
505 : TransactionId *children)
506 : {
507 348 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
508 :
509 : /* We need no extra lock since the GXACT isn't valid yet */
510 348 : if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
511 : {
512 8 : proc->subxidStatus.overflowed = true;
513 8 : nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
514 : }
515 348 : if (nsubxacts > 0)
516 : {
517 316 : memcpy(proc->subxids.xids, children,
518 : nsubxacts * sizeof(TransactionId));
519 316 : proc->subxidStatus.count = nsubxacts;
520 : }
521 348 : }
522 :
523 : /*
524 : * MarkAsPrepared
525 : * Mark the GXACT as fully valid, and enter it into the global ProcArray.
526 : *
527 : * lock_held indicates whether caller already holds TwoPhaseStateLock.
528 : */
529 : static void
530 822 : MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
531 : {
532 : /* Lock here may be overkill, but I'm not convinced of that ... */
533 822 : if (!lock_held)
534 758 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
535 : Assert(!gxact->valid);
536 822 : gxact->valid = true;
537 822 : if (!lock_held)
538 758 : LWLockRelease(TwoPhaseStateLock);
539 :
540 : /*
541 : * Put it into the global ProcArray so TransactionIdIsInProgress considers
542 : * the XID as still running.
543 : */
544 822 : ProcArrayAdd(GetPGProcByNumber(gxact->pgprocno));
545 822 : }
546 :
547 : /*
548 : * LockGXact
549 : * Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
550 : */
551 : static GlobalTransaction
552 784 : LockGXact(const char *gid, Oid user)
553 : {
554 : int i;
555 :
556 : /* on first call, register the exit hook */
557 784 : if (!twophaseExitRegistered)
558 : {
559 124 : before_shmem_exit(AtProcExit_Twophase, 0);
560 124 : twophaseExitRegistered = true;
561 : }
562 :
563 784 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
564 :
565 1374 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
566 : {
567 1362 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
568 1362 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
569 :
570 : /* Ignore not-yet-valid GIDs */
571 1362 : if (!gxact->valid)
572 0 : continue;
573 1362 : if (strcmp(gxact->gid, gid) != 0)
574 590 : continue;
575 :
576 : /* Found it, but has someone else got it locked? */
577 772 : if (gxact->locking_backend != INVALID_PROC_NUMBER)
578 0 : ereport(ERROR,
579 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
580 : errmsg("prepared transaction with identifier \"%s\" is busy",
581 : gid)));
582 :
583 772 : if (user != gxact->owner && !superuser_arg(user))
584 0 : ereport(ERROR,
585 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
586 : errmsg("permission denied to finish prepared transaction"),
587 : errhint("Must be superuser or the user that prepared the transaction.")));
588 :
589 : /*
590 : * Note: it probably would be possible to allow committing from
591 : * another database; but at the moment NOTIFY is known not to work and
592 : * there may be some other issues as well. Hence disallow until
593 : * someone gets motivated to make it work.
594 : */
595 772 : if (MyDatabaseId != proc->databaseId)
596 0 : ereport(ERROR,
597 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
598 : errmsg("prepared transaction belongs to another database"),
599 : errhint("Connect to the database where the transaction was prepared to finish it.")));
600 :
601 : /* OK for me to lock it */
602 772 : gxact->locking_backend = MyProcNumber;
603 772 : MyLockedGxact = gxact;
604 :
605 772 : LWLockRelease(TwoPhaseStateLock);
606 :
607 772 : return gxact;
608 : }
609 :
610 12 : LWLockRelease(TwoPhaseStateLock);
611 :
612 12 : ereport(ERROR,
613 : (errcode(ERRCODE_UNDEFINED_OBJECT),
614 : errmsg("prepared transaction with identifier \"%s\" does not exist",
615 : gid)));
616 :
617 : /* NOTREACHED */
618 : return NULL;
619 : }
620 :
621 : /*
622 : * RemoveGXact
623 : * Remove the prepared transaction from the shared memory array.
624 : *
625 : * NB: caller should have already removed it from ProcArray
626 : */
627 : static void
628 888 : RemoveGXact(GlobalTransaction gxact)
629 : {
630 : int i;
631 :
632 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
633 :
634 1470 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
635 : {
636 1470 : if (gxact == TwoPhaseState->prepXacts[i])
637 : {
638 : /* remove from the active array */
639 888 : TwoPhaseState->numPrepXacts--;
640 888 : TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
641 :
642 : /* and put it back in the freelist */
643 888 : gxact->next = TwoPhaseState->freeGXacts;
644 888 : TwoPhaseState->freeGXacts = gxact;
645 :
646 888 : return;
647 : }
648 : }
649 :
650 0 : elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
651 : }
652 :
653 : /*
654 : * Returns an array of all prepared transactions for the user-level
655 : * function pg_prepared_xact.
656 : *
657 : * The returned array and all its elements are copies of internal data
658 : * structures, to minimize the time we need to hold the TwoPhaseStateLock.
659 : *
660 : * WARNING -- we return even those transactions that are not fully prepared
661 : * yet. The caller should filter them out if he doesn't want them.
662 : *
663 : * The returned array is palloc'd.
664 : */
665 : static int
666 204 : GetPreparedTransactionList(GlobalTransaction *gxacts)
667 : {
668 : GlobalTransaction array;
669 : int num;
670 : int i;
671 :
672 204 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
673 :
674 204 : if (TwoPhaseState->numPrepXacts == 0)
675 : {
676 122 : LWLockRelease(TwoPhaseStateLock);
677 :
678 122 : *gxacts = NULL;
679 122 : return 0;
680 : }
681 :
682 82 : num = TwoPhaseState->numPrepXacts;
683 82 : array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
684 82 : *gxacts = array;
685 174 : for (i = 0; i < num; i++)
686 92 : memcpy(array + i, TwoPhaseState->prepXacts[i],
687 : sizeof(GlobalTransactionData));
688 :
689 82 : LWLockRelease(TwoPhaseStateLock);
690 :
691 82 : return num;
692 : }
693 :
694 :
/*
 * Working status for pg_prepared_xact: a private snapshot of the table
 * taken on the first call, plus a cursor into it.
 */
typedef struct
{
	GlobalTransaction array;	/* palloc'd copy, or NULL if table was empty */
	int			ngxacts;		/* number of entries in array */
	int			currIdx;		/* next entry to examine */
} Working_State;
702 :
703 : /*
704 : * pg_prepared_xact
705 : * Produce a view with one row per prepared transaction.
706 : *
707 : * This function is here so we don't have to export the
708 : * GlobalTransactionData struct definition.
709 : */
710 : Datum
711 296 : pg_prepared_xact(PG_FUNCTION_ARGS)
712 : {
713 : FuncCallContext *funcctx;
714 : Working_State *status;
715 :
716 296 : if (SRF_IS_FIRSTCALL())
717 : {
718 : TupleDesc tupdesc;
719 : MemoryContext oldcontext;
720 :
721 : /* create a function context for cross-call persistence */
722 204 : funcctx = SRF_FIRSTCALL_INIT();
723 :
724 : /*
725 : * Switch to memory context appropriate for multiple function calls
726 : */
727 204 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
728 :
729 : /* build tupdesc for result tuples */
730 : /* this had better match pg_prepared_xacts view in system_views.sql */
731 204 : tupdesc = CreateTemplateTupleDesc(5);
732 204 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
733 : XIDOID, -1, 0);
734 204 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
735 : TEXTOID, -1, 0);
736 204 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
737 : TIMESTAMPTZOID, -1, 0);
738 204 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
739 : OIDOID, -1, 0);
740 204 : TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
741 : OIDOID, -1, 0);
742 :
743 204 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
744 :
745 : /*
746 : * Collect all the 2PC status information that we will format and send
747 : * out as a result set.
748 : */
749 204 : status = (Working_State *) palloc(sizeof(Working_State));
750 204 : funcctx->user_fctx = (void *) status;
751 :
752 204 : status->ngxacts = GetPreparedTransactionList(&status->array);
753 204 : status->currIdx = 0;
754 :
755 204 : MemoryContextSwitchTo(oldcontext);
756 : }
757 :
758 296 : funcctx = SRF_PERCALL_SETUP();
759 296 : status = (Working_State *) funcctx->user_fctx;
760 :
761 296 : while (status->array != NULL && status->currIdx < status->ngxacts)
762 : {
763 92 : GlobalTransaction gxact = &status->array[status->currIdx++];
764 92 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
765 92 : Datum values[5] = {0};
766 92 : bool nulls[5] = {0};
767 : HeapTuple tuple;
768 : Datum result;
769 :
770 92 : if (!gxact->valid)
771 0 : continue;
772 :
773 : /*
774 : * Form tuple with appropriate data.
775 : */
776 :
777 92 : values[0] = TransactionIdGetDatum(proc->xid);
778 92 : values[1] = CStringGetTextDatum(gxact->gid);
779 92 : values[2] = TimestampTzGetDatum(gxact->prepared_at);
780 92 : values[3] = ObjectIdGetDatum(gxact->owner);
781 92 : values[4] = ObjectIdGetDatum(proc->databaseId);
782 :
783 92 : tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
784 92 : result = HeapTupleGetDatum(tuple);
785 92 : SRF_RETURN_NEXT(funcctx, result);
786 : }
787 :
788 204 : SRF_RETURN_DONE(funcctx);
789 : }
790 :
791 : /*
792 : * TwoPhaseGetGXact
793 : * Get the GlobalTransaction struct for a prepared transaction
794 : * specified by XID
795 : *
796 : * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
797 : * caller had better hold it.
798 : */
799 : static GlobalTransaction
800 2948 : TwoPhaseGetGXact(TransactionId xid, bool lock_held)
801 : {
802 2948 : GlobalTransaction result = NULL;
803 : int i;
804 :
805 : static TransactionId cached_xid = InvalidTransactionId;
806 : static GlobalTransaction cached_gxact = NULL;
807 :
808 : Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
809 :
810 : /*
811 : * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
812 : * repeatedly for the same XID. We can save work with a simple cache.
813 : */
814 2948 : if (xid == cached_xid)
815 1984 : return cached_gxact;
816 :
817 964 : if (!lock_held)
818 822 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
819 :
820 1676 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
821 : {
822 1676 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
823 :
824 1676 : if (gxact->xid == xid)
825 : {
826 964 : result = gxact;
827 964 : break;
828 : }
829 : }
830 :
831 964 : if (!lock_held)
832 822 : LWLockRelease(TwoPhaseStateLock);
833 :
834 964 : if (result == NULL) /* should not happen */
835 0 : elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
836 :
837 964 : cached_xid = xid;
838 964 : cached_gxact = result;
839 :
840 964 : return result;
841 : }
842 :
843 : /*
844 : * TwoPhaseGetXidByVirtualXID
845 : * Lookup VXID among xacts prepared since last startup.
846 : *
847 : * (This won't find recovered xacts.) If more than one matches, return any
848 : * and set "have_more" to true. To witness multiple matches, a single
849 : * proc number must consume 2^32 LXIDs, with no intervening database restart.
850 : */
851 : TransactionId
852 184 : TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
853 : bool *have_more)
854 : {
855 : int i;
856 184 : TransactionId result = InvalidTransactionId;
857 :
858 : Assert(VirtualTransactionIdIsValid(vxid));
859 184 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
860 :
861 290 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
862 : {
863 106 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
864 : PGPROC *proc;
865 : VirtualTransactionId proc_vxid;
866 :
867 106 : if (!gxact->valid)
868 2 : continue;
869 104 : proc = GetPGProcByNumber(gxact->pgprocno);
870 104 : GET_VXID_FROM_PGPROC(proc_vxid, *proc);
871 104 : if (VirtualTransactionIdEquals(vxid, proc_vxid))
872 : {
873 : /*
874 : * Startup process sets proc->vxid.procNumber to
875 : * INVALID_PROC_NUMBER.
876 : */
877 : Assert(!gxact->inredo);
878 :
879 24 : if (result != InvalidTransactionId)
880 : {
881 0 : *have_more = true;
882 0 : break;
883 : }
884 24 : result = gxact->xid;
885 : }
886 : }
887 :
888 184 : LWLockRelease(TwoPhaseStateLock);
889 :
890 184 : return result;
891 : }
892 :
893 : /*
894 : * TwoPhaseGetDummyProcNumber
895 : * Get the dummy proc number for prepared transaction specified by XID
896 : *
897 : * Dummy proc numbers are similar to proc numbers of real backends. They
898 : * start at MaxBackends, and are unique across all currently active real
899 : * backends and prepared transactions. If lock_held is set to true,
900 : * TwoPhaseStateLock will not be taken, so the caller had better hold it.
901 : */
902 : ProcNumber
903 220 : TwoPhaseGetDummyProcNumber(TransactionId xid, bool lock_held)
904 : {
905 220 : GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
906 :
907 220 : return gxact->pgprocno;
908 : }
909 :
910 : /*
911 : * TwoPhaseGetDummyProc
912 : * Get the PGPROC that represents a prepared transaction specified by XID
913 : *
914 : * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
915 : * caller had better hold it.
916 : */
917 : PGPROC *
918 2728 : TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
919 : {
920 2728 : GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
921 :
922 2728 : return GetPGProcByNumber(gxact->pgprocno);
923 : }
924 :
925 : /************************************************************************/
926 : /* State file support */
927 : /************************************************************************/
928 :
929 : /*
930 : * Compute the FullTransactionId for the given TransactionId.
931 : *
932 : * The wrap logic is safe here because the span of active xids cannot exceed one
933 : * epoch at any given time.
934 : */
935 : static inline FullTransactionId
936 946 : AdjustToFullTransactionId(TransactionId xid)
937 : {
938 : FullTransactionId nextFullXid;
939 : TransactionId nextXid;
940 : uint32 epoch;
941 :
942 : Assert(TransactionIdIsValid(xid));
943 :
944 946 : LWLockAcquire(XidGenLock, LW_SHARED);
945 946 : nextFullXid = TransamVariables->nextXid;
946 946 : LWLockRelease(XidGenLock);
947 :
948 946 : nextXid = XidFromFullTransactionId(nextFullXid);
949 946 : epoch = EpochFromFullTransactionId(nextFullXid);
950 946 : if (unlikely(xid > nextXid))
951 : {
952 : /* Wraparound occurred, must be from a prev epoch. */
953 : Assert(epoch > 0);
954 0 : epoch--;
955 : }
956 :
957 946 : return FullTransactionIdFromEpochAndXid(epoch, xid);
958 : }
959 :
960 : static inline int
961 946 : TwoPhaseFilePath(char *path, TransactionId xid)
962 : {
963 946 : FullTransactionId fxid = AdjustToFullTransactionId(xid);
964 :
965 1892 : return snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X%08X",
966 946 : EpochFromFullTransactionId(fxid),
967 946 : XidFromFullTransactionId(fxid));
968 : }
969 :
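/*
 * 2PC state file format:
 *
 *	1. TwoPhaseFileHeader
 *	2. GID string, including its terminating '\0'
 *	3. TransactionId[] (subtransactions)
 *	4. RelFileLocator[] (files to be deleted at commit)
 *	5. RelFileLocator[] (files to be deleted at abort)
 *	6. xl_xact_stats_item[] (stats to be dropped at commit)
 *	7. xl_xact_stats_item[] (stats to be dropped at abort)
 *	8. SharedInvalidationMessage[] (inval messages to be sent at commit)
 *	9. TwoPhaseRecordOnDisk
 *	10. ...
 *	11. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
 *	12. checksum (CRC-32C)
 *
 * Each segment except the final checksum is MAXALIGN'd.  Segments 3-8 are
 * present only when their count in the header is nonzero.  The checksum is
 * appended only to the on-disk state file; the WAL PREPARE record carrying
 * segments 1-11 relies on WAL's own CRC protection.
 */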
984 : */
985 :
/*
 * Header for a 2PC state file
 *
 * TWOPHASE_MAGIC identifies the format; ReadTwoPhaseFile rejects any file
 * whose header does not carry it.  The header is the same struct as the WAL
 * PREPARE record header (xl_xact_prepare), which is why FinishPreparedTransaction
 * can parse state data the same way whether it came from a file or from WAL.
 */
#define TWOPHASE_MAGIC	0x57F94534	/* format identifier */

typedef xl_xact_prepare TwoPhaseFileHeader;
992 :
/*
 * Header for each record in a state file
 *
 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
 * The rmgr data will be stored starting on a MAXALIGN boundary.
 *
 * RegisterTwoPhaseRecord copies this struct verbatim into the state data,
 * and ProcessRecords reads it back in place, so its layout is part of the
 * persistent WAL/state-file format.  A record with rmid == TWOPHASE_RM_END_ID
 * terminates the record list.
 */
typedef struct TwoPhaseRecordOnDisk
{
	uint32		len;			/* length of rmgr data */
	TwoPhaseRmgrId rmid;		/* resource manager for this record */
	uint16		info;			/* flag bits for use by rmgr */
} TwoPhaseRecordOnDisk;
1005 :
/*
 * During prepare, the state data is assembled in memory before being written
 * to WAL as a single PREPARE record.  An on-disk state file, if one is ever
 * needed, is produced later from that WAL record.  We use a chain of
 * StateFileChunk blocks for the in-memory assembly.
 */
typedef struct StateFileChunk
{
	char	   *data;			/* palloc'd buffer holding this chunk's bytes */
	uint32		len;			/* bytes used in data (a MAXALIGN multiple) */
	struct StateFileChunk *next;	/* next chunk, or NULL at the tail */
} StateFileChunk;
1017 :
/*
 * The chunk chain for the transaction currently being prepared by this
 * backend.  StartPrepare initializes it, save_state_data appends to it, and
 * EndPrepare registers every chunk with XLogInsert and then resets head/tail.
 */
static struct xllist
{
	StateFileChunk *head;		/* first data block in the chain */
	StateFileChunk *tail;		/* last block in chain */
	uint32		num_chunks;		/* number of blocks in the chain */
	uint32		bytes_free;		/* free bytes left in tail block */
	uint32		total_len;		/* total data bytes in chain, incl. padding */
} records;
1026 :
1027 :
1028 : /*
1029 : * Append a block of data to records data structure.
1030 : *
1031 : * NB: each block is padded to a MAXALIGN multiple. This must be
1032 : * accounted for when the file is later read!
1033 : *
1034 : * The data is copied, so the caller is free to modify it afterwards.
1035 : */
1036 : static void
1037 8264 : save_state_data(const void *data, uint32 len)
1038 : {
1039 8264 : uint32 padlen = MAXALIGN(len);
1040 :
1041 8264 : if (padlen > records.bytes_free)
1042 : {
1043 86 : records.tail->next = palloc0(sizeof(StateFileChunk));
1044 86 : records.tail = records.tail->next;
1045 86 : records.tail->len = 0;
1046 86 : records.tail->next = NULL;
1047 86 : records.num_chunks++;
1048 :
1049 86 : records.bytes_free = Max(padlen, 512);
1050 86 : records.tail->data = palloc(records.bytes_free);
1051 : }
1052 :
1053 8264 : memcpy(((char *) records.tail->data) + records.tail->len, data, len);
1054 8264 : records.tail->len += padlen;
1055 8264 : records.bytes_free -= padlen;
1056 8264 : records.total_len += padlen;
1057 8264 : }
1058 :
/*
 * Start preparing a state file.
 *
 * Initializes data structure and inserts the 2PC file header record.
 *
 * Resets the module-level "records" chain and appends, in this order: the
 * TwoPhaseFileHeader, the GID, and the subxact, commit-rel, abort-rel,
 * commit-stats, abort-stats and invalidation-message arrays (each only when
 * its count is nonzero).  This order must match the parsing done in
 * FinishPreparedTransaction.  Resource-manager records and the end sentinel
 * are appended afterwards (RegisterTwoPhaseRecord / EndPrepare).
 */
void
StartPrepare(GlobalTransaction gxact)
{
	PGPROC	   *proc = GetPGProcByNumber(gxact->pgprocno);
	TransactionId xid = gxact->xid;
	TwoPhaseFileHeader hdr;
	TransactionId *children;
	RelFileLocator *commitrels;
	RelFileLocator *abortrels;
	xl_xact_stats_item *abortstats = NULL;
	xl_xact_stats_item *commitstats = NULL;
	SharedInvalidationMessage *invalmsgs;

	/* Initialize linked list */
	records.head = palloc0(sizeof(StateFileChunk));
	records.head->len = 0;
	records.head->next = NULL;

	/* The first chunk is always big enough to hold the whole header. */
	records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
	records.head->data = palloc(records.bytes_free);

	records.tail = records.head;
	records.num_chunks = 1;

	records.total_len = 0;

	/* Create header */
	hdr.magic = TWOPHASE_MAGIC;
	hdr.total_len = 0;			/* EndPrepare will fill this in */
	hdr.xid = xid;
	hdr.database = proc->databaseId;
	hdr.prepared_at = gxact->prepared_at;
	hdr.owner = gxact->owner;
	hdr.nsubxacts = xactGetCommittedChildren(&children);
	hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
	hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
	hdr.ncommitstats =
		pgstat_get_transactional_drops(true, &commitstats);
	hdr.nabortstats =
		pgstat_get_transactional_drops(false, &abortstats);
	hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
														  &hdr.initfileinval);
	hdr.gidlen = strlen(gxact->gid) + 1;	/* Include '\0' */
	/* EndPrepare will fill the origin data, if necessary */
	hdr.origin_lsn = InvalidXLogRecPtr;
	hdr.origin_timestamp = 0;

	/*
	 * The header is copied into the first chunk; EndPrepare later patches
	 * that copy (at records.head->data), not this local struct.
	 */
	save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
	save_state_data(gxact->gid, hdr.gidlen);

	/*
	 * Add the additional info about subxacts, deletable files and cache
	 * invalidation messages.
	 */
	if (hdr.nsubxacts > 0)
	{
		save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
		/* While we have the child-xact data, stuff it in the gxact too */
		GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
	}
	if (hdr.ncommitrels > 0)
	{
		save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileLocator));
		pfree(commitrels);
	}
	if (hdr.nabortrels > 0)
	{
		save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileLocator));
		pfree(abortrels);
	}
	if (hdr.ncommitstats > 0)
	{
		save_state_data(commitstats,
						hdr.ncommitstats * sizeof(xl_xact_stats_item));
		pfree(commitstats);
	}
	if (hdr.nabortstats > 0)
	{
		save_state_data(abortstats,
						hdr.nabortstats * sizeof(xl_xact_stats_item));
		pfree(abortstats);
	}
	if (hdr.ninvalmsgs > 0)
	{
		save_state_data(invalmsgs,
						hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
		pfree(invalmsgs);
	}
}
1153 :
/*
 * Finish preparing state data and writing it to WAL.
 *
 * Appends the end-sentinel record, then patches total_len and, when a
 * replication origin session is active, the origin fields in the header
 * copy held in records.head.  Next it inserts and flushes one
 * XLOG_XACT_PREPARE record built from every chunk, stores the record's
 * start/end LSNs in gxact, and marks gxact prepared.  Finally it waits for
 * synchronous replication and resets the records chain.
 */
void
EndPrepare(GlobalTransaction gxact)
{
	TwoPhaseFileHeader *hdr;
	StateFileChunk *record;
	bool		replorigin;

	/* Add the end sentinel to the list of 2PC records */
	RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
						   NULL, 0);

	/* Go back and fill in total_len in the file header record */
	hdr = (TwoPhaseFileHeader *) records.head->data;
	Assert(hdr->magic == TWOPHASE_MAGIC);
	/* total_len includes the CRC that a state file will carry at its end */
	hdr->total_len = records.total_len + sizeof(pg_crc32c);

	/* Only a real origin session (not "none", not "do not replicate") counts */
	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
				  replorigin_session_origin != DoNotReplicateId);

	if (replorigin)
	{
		hdr->origin_lsn = replorigin_session_origin_lsn;
		hdr->origin_timestamp = replorigin_session_origin_timestamp;
	}

	/*
	 * If the data size exceeds MaxAllocSize, we won't be able to read it in
	 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
	 * where we write data to file and then re-read at commit time.
	 */
	if (hdr->total_len > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("two-phase state file maximum length exceeded")));

	/*
	 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
	 * cover us, so no need to calculate a separate CRC.
	 *
	 * We have to set DELAY_CHKPT_START here, too; otherwise a checkpoint
	 * starting immediately after the WAL record is inserted could complete
	 * without fsync'ing our state file. (This is essentially the same kind
	 * of race condition as the COMMIT-to-clog-write case that
	 * RecordTransactionCommit uses DELAY_CHKPT_START for; see notes there.)
	 *
	 * We save the PREPARE record's location in the gxact for later use by
	 * CheckPointTwoPhase.
	 */
	/* Reserve one data slot per chunk before entering the critical section */
	XLogEnsureRecordSpace(0, records.num_chunks);

	START_CRIT_SECTION();

	Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
	MyProc->delayChkptFlags |= DELAY_CHKPT_START;

	XLogBeginInsert();
	for (record = records.head; record != NULL; record = record->next)
		XLogRegisterData(record->data, record->len);

	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);

	gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);

	if (replorigin)
	{
		/* Move LSNs forward for this replication origin */
		replorigin_session_advance(replorigin_session_origin_lsn,
								   gxact->prepare_end_lsn);
	}

	XLogFlush(gxact->prepare_end_lsn);

	/* If we crash now, we have prepared: WAL replay will fix things */

	/* Store record's start location to read that later on Commit */
	gxact->prepare_start_lsn = ProcLastRecPtr;

	/*
	 * Mark the prepared transaction as valid. As soon as xact.c marks MyProc
	 * as not running our XID (which it will do immediately after this
	 * function returns), others can commit/rollback the xact.
	 *
	 * NB: a side effect of this is to make a dummy ProcArray entry for the
	 * prepared XID. This must happen before we clear the XID from MyProc /
	 * ProcGlobal->xids[], else there is a window where the XID is not running
	 * according to TransactionIdIsInProgress, and onlookers would be entitled
	 * to assume the xact crashed. Instead we have a window where the same
	 * XID appears twice in ProcArray, which is OK.
	 */
	MarkAsPrepared(gxact, false);

	/*
	 * Now we can mark ourselves as out of the commit critical section: a
	 * checkpoint starting after this will certainly see the gxact as a
	 * candidate for fsyncing.
	 */
	MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;

	/*
	 * Remember that we have this GlobalTransaction entry locked for us. If
	 * we crash after this point, it's too late to abort, but we must unlock
	 * it so that the prepared transaction can be committed or rolled back.
	 */
	MyLockedGxact = gxact;

	END_CRIT_SECTION();

	/*
	 * Wait for synchronous replication, if required.
	 *
	 * Note that at this stage we have marked the prepare, but still show as
	 * running in the procarray (twice!) and continue to hold locks.
	 */
	SyncRepWaitForLSN(gxact->prepare_end_lsn, false);

	/* The chunk memory itself is not freed here; only the chain is reset. */
	records.tail = records.head = NULL;
	records.num_chunks = 0;
}
1275 :
1276 : /*
1277 : * Register a 2PC record to be written to state file.
1278 : */
1279 : void
1280 3536 : RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1281 : const void *data, uint32 len)
1282 : {
1283 : TwoPhaseRecordOnDisk record;
1284 :
1285 3536 : record.rmid = rmid;
1286 3536 : record.info = info;
1287 3536 : record.len = len;
1288 3536 : save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1289 3536 : if (len > 0)
1290 2778 : save_state_data(data, len);
1291 3536 : }
1292 :
1293 :
/*
 * Read and validate the state file for xid.
 *
 * Returns the palloc'd contents of the file, including the trailing CRC,
 * once the size, CRC alignment, magic number, recorded total_len and CRC
 * have all been checked.  Any failed check, and any I/O failure, raises
 * ERROR.  The one exception: if missing_ok is true and the file does not
 * exist (ENOENT), NULL is returned instead.  That case can be reached
 * during recovery.
 */
static char *
ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
{
	char		path[MAXPGPATH];
	char	   *buf;
	TwoPhaseFileHeader *hdr;
	int			fd;
	struct stat stat;
	uint32		crc_offset;
	pg_crc32c	calc_crc,
				file_crc;
	int			r;

	TwoPhaseFilePath(path, xid);

	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
	if (fd < 0)
	{
		if (missing_ok && errno == ENOENT)
			return NULL;

		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));
	}

	/*
	 * Check file length. We can determine a lower bound pretty easily. We
	 * set an upper bound to avoid palloc() failure on a corrupt file, though
	 * we can't guarantee that we won't get an out of memory error anyway,
	 * even on a valid file.
	 */
	if (fstat(fd, &stat))
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not stat file \"%s\": %m", path)));

	/* Minimum: header + end-sentinel record + CRC. */
	if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
						MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
						sizeof(pg_crc32c)) ||
		stat.st_size > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg_plural("incorrect size of file \"%s\": %lld byte",
							   "incorrect size of file \"%s\": %lld bytes",
							   (long long int) stat.st_size, path,
							   (long long int) stat.st_size)));

	/* Every segment before the CRC is MAXALIGN'd, so the CRC must be too. */
	crc_offset = stat.st_size - sizeof(pg_crc32c);
	if (crc_offset != MAXALIGN(crc_offset))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("incorrect alignment of CRC offset for file \"%s\"",
						path)));

	/*
	 * OK, slurp in the file.
	 */
	buf = (char *) palloc(stat.st_size);

	pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
	r = read(fd, buf, stat.st_size);
	if (r != stat.st_size)
	{
		if (r < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(ERROR,
					(errmsg("could not read file \"%s\": read %d of %lld",
							path, r, (long long int) stat.st_size)));
	}

	pgstat_report_wait_end();

	if (CloseTransientFile(fd) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));

	hdr = (TwoPhaseFileHeader *) buf;
	if (hdr->magic != TWOPHASE_MAGIC)
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("invalid magic number stored in file \"%s\"",
						path)));

	/* total_len (set by EndPrepare) already includes the CRC bytes. */
	if (hdr->total_len != stat.st_size)
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("invalid size stored in file \"%s\"",
						path)));

	/* The CRC covers everything up to, but not including, itself. */
	INIT_CRC32C(calc_crc);
	COMP_CRC32C(calc_crc, buf, crc_offset);
	FIN_CRC32C(calc_crc);

	file_crc = *((pg_crc32c *) (buf + crc_offset));

	if (!EQ_CRC32C(calc_crc, file_crc))
		ereport(ERROR,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
						path)));

	return buf;
}
1410 :
1411 :
1412 : /*
1413 : * Reads 2PC data from xlog. During checkpoint this data will be moved to
1414 : * twophase files and ReadTwoPhaseFile should be used instead.
1415 : *
1416 : * Note clearly that this function can access WAL during normal operation,
1417 : * similarly to the way WALSender or Logical Decoding would do.
1418 : */
1419 : static void
1420 920 : XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1421 : {
1422 : XLogRecord *record;
1423 : XLogReaderState *xlogreader;
1424 : char *errormsg;
1425 :
1426 920 : xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
1427 920 : XL_ROUTINE(.page_read = &read_local_xlog_page,
1428 : .segment_open = &wal_segment_open,
1429 : .segment_close = &wal_segment_close),
1430 : NULL);
1431 920 : if (!xlogreader)
1432 0 : ereport(ERROR,
1433 : (errcode(ERRCODE_OUT_OF_MEMORY),
1434 : errmsg("out of memory"),
1435 : errdetail("Failed while allocating a WAL reading processor.")));
1436 :
1437 920 : XLogBeginRead(xlogreader, lsn);
1438 920 : record = XLogReadRecord(xlogreader, &errormsg);
1439 :
1440 920 : if (record == NULL)
1441 : {
1442 0 : if (errormsg)
1443 0 : ereport(ERROR,
1444 : (errcode_for_file_access(),
1445 : errmsg("could not read two-phase state from WAL at %X/%X: %s",
1446 : LSN_FORMAT_ARGS(lsn), errormsg)));
1447 : else
1448 0 : ereport(ERROR,
1449 : (errcode_for_file_access(),
1450 : errmsg("could not read two-phase state from WAL at %X/%X",
1451 : LSN_FORMAT_ARGS(lsn))));
1452 : }
1453 :
1454 920 : if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
1455 920 : (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
1456 0 : ereport(ERROR,
1457 : (errcode_for_file_access(),
1458 : errmsg("expected two-phase state data is not present in WAL at %X/%X",
1459 : LSN_FORMAT_ARGS(lsn))));
1460 :
1461 920 : if (len != NULL)
1462 48 : *len = XLogRecGetDataLen(xlogreader);
1463 :
1464 920 : *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
1465 920 : memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
1466 :
1467 920 : XLogReaderFree(xlogreader);
1468 920 : }
1469 :
1470 :
1471 : /*
1472 : * Confirms an xid is prepared, during recovery
1473 : */
1474 : bool
1475 546 : StandbyTransactionIdIsPrepared(TransactionId xid)
1476 : {
1477 : char *buf;
1478 : TwoPhaseFileHeader *hdr;
1479 : bool result;
1480 :
1481 : Assert(TransactionIdIsValid(xid));
1482 :
1483 546 : if (max_prepared_xacts <= 0)
1484 0 : return false; /* nothing to do */
1485 :
1486 : /* Read and validate file */
1487 546 : buf = ReadTwoPhaseFile(xid, true);
1488 546 : if (buf == NULL)
1489 546 : return false;
1490 :
1491 : /* Check header also */
1492 0 : hdr = (TwoPhaseFileHeader *) buf;
1493 0 : result = TransactionIdEquals(hdr->xid, xid);
1494 0 : pfree(buf);
1495 :
1496 0 : return result;
1497 : }
1498 :
/*
 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
 *
 * gid names the prepared transaction.  isCommit selects COMMIT PREPARED
 * (true) or ROLLBACK PREPARED (false).
 *
 * The steps, in order:
 *   1. Load the 2PC state data, from the state file or from the PREPARE
 *      record in WAL.
 *   2. WAL-log the commit or abort, then remove the dummy PGPROC from the
 *      ProcArray.
 *   3. Drop the pending relation files and stats, and on commit send the
 *      invalidation messages.
 *   4. Run the rmgr post-commit/post-abort callbacks, then remove the
 *      gxact's shared-memory entry and any state file.
 */
void
FinishPreparedTransaction(const char *gid, bool isCommit)
{
	GlobalTransaction gxact;
	PGPROC	   *proc;
	TransactionId xid;
	bool		ondisk;
	char	   *buf;
	char	   *bufptr;
	TwoPhaseFileHeader *hdr;
	TransactionId latestXid;
	TransactionId *children;
	RelFileLocator *commitrels;
	RelFileLocator *abortrels;
	RelFileLocator *delrels;
	int			ndelrels;
	xl_xact_stats_item *commitstats;
	xl_xact_stats_item *abortstats;
	SharedInvalidationMessage *invalmsgs;

	/*
	 * Validate the GID, and lock the GXACT to ensure that two backends do not
	 * try to commit the same GID at once.
	 */
	gxact = LockGXact(gid, GetUserId());
	proc = GetPGProcByNumber(gxact->pgprocno);
	xid = gxact->xid;

	/*
	 * Read and validate 2PC state data. State data will typically be stored
	 * in WAL files if the LSN is after the last checkpoint record, or moved
	 * to disk if for some reason they have lived for a long time.
	 */
	if (gxact->ondisk)
		buf = ReadTwoPhaseFile(xid, false);
	else
		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);


	/*
	 * Disassemble the header area
	 *
	 * Segments appear in the order StartPrepare wrote them, each MAXALIGN'd.
	 * The array pointers alias buf, so buf is freed only at the very end.
	 * Afterwards bufptr points at the first TwoPhaseRecordOnDisk.
	 */
	hdr = (TwoPhaseFileHeader *) buf;
	Assert(TransactionIdEquals(hdr->xid, xid));
	bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
	bufptr += MAXALIGN(hdr->gidlen);
	children = (TransactionId *) bufptr;
	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
	commitrels = (RelFileLocator *) bufptr;
	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
	abortrels = (RelFileLocator *) bufptr;
	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
	commitstats = (xl_xact_stats_item *) bufptr;
	bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
	abortstats = (xl_xact_stats_item *) bufptr;
	bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
	invalmsgs = (SharedInvalidationMessage *) bufptr;
	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));

	/* compute latestXid among all children */
	latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);

	/* Prevent cancel/die interrupt while cleaning up */
	HOLD_INTERRUPTS();

	/*
	 * The order of operations here is critical: make the XLOG entry for
	 * commit or abort, then mark the transaction committed or aborted in
	 * pg_xact, then remove its PGPROC from the global ProcArray (which means
	 * TransactionIdIsInProgress will stop saying the prepared xact is in
	 * progress), then run the post-commit or post-abort callbacks. The
	 * callbacks will release the locks the transaction held.
	 */
	if (isCommit)
		RecordTransactionCommitPrepared(xid,
										hdr->nsubxacts, children,
										hdr->ncommitrels, commitrels,
										hdr->ncommitstats,
										commitstats,
										hdr->ninvalmsgs, invalmsgs,
										hdr->initfileinval, gid);
	else
		RecordTransactionAbortPrepared(xid,
									   hdr->nsubxacts, children,
									   hdr->nabortrels, abortrels,
									   hdr->nabortstats,
									   abortstats,
									   gid);

	ProcArrayRemove(proc, latestXid);

	/*
	 * In case we fail while running the callbacks, mark the gxact invalid so
	 * no one else will try to commit/rollback, and so it will be recycled if
	 * we fail after this point. It is still locked by our backend so it
	 * won't go away yet.
	 *
	 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
	 */
	gxact->valid = false;

	/*
	 * We have to remove any files that were supposed to be dropped. For
	 * consistency with the regular xact.c code paths, must do this before
	 * releasing locks, so do it before running the callbacks.
	 *
	 * NB: this code knows that we couldn't be dropping any temp rels ...
	 */
	if (isCommit)
	{
		delrels = commitrels;
		ndelrels = hdr->ncommitrels;
	}
	else
	{
		delrels = abortrels;
		ndelrels = hdr->nabortrels;
	}

	/* Make sure files supposed to be dropped are dropped */
	DropRelationFiles(delrels, ndelrels, false);

	/* Likewise drop the stats entries recorded for this outcome. */
	if (isCommit)
		pgstat_execute_transactional_drops(hdr->ncommitstats, commitstats, false);
	else
		pgstat_execute_transactional_drops(hdr->nabortstats, abortstats, false);

	/*
	 * Handle cache invalidation messages.
	 *
	 * Relcache init file invalidation requires processing both before and
	 * after we send the SI messages, only when committing. See
	 * AtEOXact_Inval().
	 */
	if (isCommit)
	{
		if (hdr->initfileinval)
			RelationCacheInitFilePreInvalidate();
		SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
		if (hdr->initfileinval)
			RelationCacheInitFilePostInvalidate();
	}

	/*
	 * Acquire the two-phase lock. We want to work on the two-phase callbacks
	 * while holding it to avoid potential conflicts with other transactions
	 * attempting to use the same GID, so the lock is released once the shared
	 * memory state is cleared.
	 */
	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);

	/* And now do the callbacks */
	if (isCommit)
		ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
	else
		ProcessRecords(bufptr, xid, twophase_postabort_callbacks);

	PredicateLockTwoPhaseFinish(xid, isCommit);

	/*
	 * Read this value while holding the two-phase lock, as the on-disk 2PC
	 * file is physically removed after the lock is released.
	 */
	ondisk = gxact->ondisk;

	/* Clear shared memory state */
	RemoveGXact(gxact);

	/*
	 * Release the lock as all callbacks are called and shared memory cleanup
	 * is done.
	 */
	LWLockRelease(TwoPhaseStateLock);

	/* Count the prepared xact as committed or aborted */
	AtEOXact_PgStat(isCommit, false);

	/*
	 * And now we can clean up any files we may have left.
	 */
	if (ondisk)
		RemoveTwoPhaseFile(xid, true);

	MyLockedGxact = NULL;

	RESUME_INTERRUPTS();

	/* Safe now: nothing below aliases the arrays carved out of buf. */
	pfree(buf);
}
1691 :
1692 : /*
1693 : * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1694 : */
1695 : static void
1696 4000 : ProcessRecords(char *bufptr, TransactionId xid,
1697 : const TwoPhaseCallback callbacks[])
1698 : {
1699 : for (;;)
1700 3164 : {
1701 4000 : TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1702 :
1703 : Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1704 4000 : if (record->rmid == TWOPHASE_RM_END_ID)
1705 836 : break;
1706 :
1707 3164 : bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1708 :
1709 3164 : if (callbacks[record->rmid] != NULL)
1710 3008 : callbacks[record->rmid] (xid, record->info,
1711 : (void *) bufptr, record->len);
1712 :
1713 3164 : bufptr += MAXALIGN(record->len);
1714 : }
1715 836 : }
1716 :
1717 : /*
1718 : * Remove the 2PC file for the specified XID.
1719 : *
1720 : * If giveWarning is false, do not complain about file-not-present;
1721 : * this is an expected case during WAL replay.
1722 : */
1723 : static void
1724 58 : RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
1725 : {
1726 : char path[MAXPGPATH];
1727 :
1728 58 : TwoPhaseFilePath(path, xid);
1729 58 : if (unlink(path))
1730 0 : if (errno != ENOENT || giveWarning)
1731 0 : ereport(WARNING,
1732 : (errcode_for_file_access(),
1733 : errmsg("could not remove file \"%s\": %m", path)));
1734 58 : }
1735 :
1736 : /*
1737 : * Recreates a state file. This is used in WAL replay and during
1738 : * checkpoint creation.
1739 : *
1740 : * Note: content and len don't include CRC.
1741 : */
1742 : static void
1743 48 : RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
1744 : {
1745 : char path[MAXPGPATH];
1746 : pg_crc32c statefile_crc;
1747 : int fd;
1748 :
1749 : /* Recompute CRC */
1750 48 : INIT_CRC32C(statefile_crc);
1751 48 : COMP_CRC32C(statefile_crc, content, len);
1752 48 : FIN_CRC32C(statefile_crc);
1753 :
1754 48 : TwoPhaseFilePath(path, xid);
1755 :
1756 48 : fd = OpenTransientFile(path,
1757 : O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
1758 48 : if (fd < 0)
1759 0 : ereport(ERROR,
1760 : (errcode_for_file_access(),
1761 : errmsg("could not recreate file \"%s\": %m", path)));
1762 :
1763 : /* Write content and CRC */
1764 48 : errno = 0;
1765 48 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
1766 48 : if (write(fd, content, len) != len)
1767 : {
1768 : /* if write didn't set errno, assume problem is no disk space */
1769 0 : if (errno == 0)
1770 0 : errno = ENOSPC;
1771 0 : ereport(ERROR,
1772 : (errcode_for_file_access(),
1773 : errmsg("could not write file \"%s\": %m", path)));
1774 : }
1775 48 : if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
1776 : {
1777 : /* if write didn't set errno, assume problem is no disk space */
1778 0 : if (errno == 0)
1779 0 : errno = ENOSPC;
1780 0 : ereport(ERROR,
1781 : (errcode_for_file_access(),
1782 : errmsg("could not write file \"%s\": %m", path)));
1783 : }
1784 48 : pgstat_report_wait_end();
1785 :
1786 : /*
1787 : * We must fsync the file because the end-of-replay checkpoint will not do
1788 : * so, there being no GXACT in shared memory yet to tell it to.
1789 : */
1790 48 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
1791 48 : if (pg_fsync(fd) != 0)
1792 0 : ereport(ERROR,
1793 : (errcode_for_file_access(),
1794 : errmsg("could not fsync file \"%s\": %m", path)));
1795 48 : pgstat_report_wait_end();
1796 :
1797 48 : if (CloseTransientFile(fd) != 0)
1798 0 : ereport(ERROR,
1799 : (errcode_for_file_access(),
1800 : errmsg("could not close file \"%s\": %m", path)));
1801 48 : }
1802 :
1803 : /*
1804 : * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1805 : *
1806 : * We must fsync the state file of any GXACT that is valid or has been
1807 : * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1808 : * horizon. (If the gxact isn't valid yet, has not been generated in
1809 : * redo, or has a later LSN, this checkpoint is not responsible for
1810 : * fsyncing it.)
1811 : *
1812 : * This is deliberately run as late as possible in the checkpoint sequence,
1813 : * because GXACTs ordinarily have short lifespans, and so it is quite
1814 : * possible that GXACTs that were valid at checkpoint start will no longer
1815 : * exist if we wait a little bit. With typical checkpoint settings this
1816 : * will be about 3 minutes for an online checkpoint, so as a result we
1817 : * expect that there will be no GXACTs that need to be copied to disk.
1818 : *
1819 : * If a GXACT remains valid across multiple checkpoints, it will already
1820 : * be on disk so we don't bother to repeat that write.
1821 : */
1822 : void
1823 2472 : CheckPointTwoPhase(XLogRecPtr redo_horizon)
1824 : {
1825 : int i;
1826 2472 : int serialized_xacts = 0;
1827 :
1828 2472 : if (max_prepared_xacts <= 0)
1829 1488 : return; /* nothing to do */
1830 :
1831 : TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1832 :
1833 : /*
1834 : * We are expecting there to be zero GXACTs that need to be copied to
1835 : * disk, so we perform all I/O while holding TwoPhaseStateLock for
1836 : * simplicity. This prevents any new xacts from preparing while this
1837 : * occurs, which shouldn't be a problem since the presence of long-lived
1838 : * prepared xacts indicates the transaction manager isn't active.
1839 : *
1840 : * It's also possible to move I/O out of the lock, but on every error we
1841 : * should check whether somebody committed our transaction in different
1842 : * backend. Let's leave this optimization for future, if somebody will
1843 : * spot that this place cause bottleneck.
1844 : *
1845 : * Note that it isn't possible for there to be a GXACT with a
1846 : * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1847 : * because of the efforts with delayChkptFlags.
1848 : */
1849 984 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1850 1054 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1851 : {
1852 : /*
1853 : * Note that we are using gxact not PGPROC so this works in recovery
1854 : * also
1855 : */
1856 70 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1857 :
1858 70 : if ((gxact->valid || gxact->inredo) &&
1859 70 : !gxact->ondisk &&
1860 62 : gxact->prepare_end_lsn <= redo_horizon)
1861 : {
1862 : char *buf;
1863 : int len;
1864 :
1865 48 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
1866 48 : RecreateTwoPhaseFile(gxact->xid, buf, len);
1867 48 : gxact->ondisk = true;
1868 48 : gxact->prepare_start_lsn = InvalidXLogRecPtr;
1869 48 : gxact->prepare_end_lsn = InvalidXLogRecPtr;
1870 48 : pfree(buf);
1871 48 : serialized_xacts++;
1872 : }
1873 : }
1874 984 : LWLockRelease(TwoPhaseStateLock);
1875 :
1876 : /*
1877 : * Flush unconditionally the parent directory to make any information
1878 : * durable on disk. Two-phase files could have been removed and those
1879 : * removals need to be made persistent as well as any files newly created
1880 : * previously since the last checkpoint.
1881 : */
1882 984 : fsync_fname(TWOPHASE_DIR, true);
1883 :
1884 : TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1885 :
1886 984 : if (log_checkpoints && serialized_xacts > 0)
1887 40 : ereport(LOG,
1888 : (errmsg_plural("%u two-phase state file was written "
1889 : "for a long-running prepared transaction",
1890 : "%u two-phase state files were written "
1891 : "for long-running prepared transactions",
1892 : serialized_xacts,
1893 : serialized_xacts)));
1894 : }
1895 :
1896 : /*
1897 : * restoreTwoPhaseData
1898 : *
1899 : * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1900 : * This is called once at the beginning of recovery, saving any extra
1901 : * lookups in the future. Two-phase files that are newer than the
1902 : * minimum XID horizon are discarded on the way.
1903 : */
1904 : void
1905 1634 : restoreTwoPhaseData(void)
1906 : {
1907 : DIR *cldir;
1908 : struct dirent *clde;
1909 :
1910 1634 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1911 1634 : cldir = AllocateDir(TWOPHASE_DIR);
1912 4932 : while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1913 : {
1914 3298 : if (strlen(clde->d_name) == 16 &&
1915 30 : strspn(clde->d_name, "0123456789ABCDEF") == 16)
1916 : {
1917 : TransactionId xid;
1918 : FullTransactionId fxid;
1919 : char *buf;
1920 :
1921 30 : fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16));
1922 30 : xid = XidFromFullTransactionId(fxid);
1923 :
1924 30 : buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
1925 : true, false, false);
1926 30 : if (buf == NULL)
1927 0 : continue;
1928 :
1929 30 : PrepareRedoAdd(buf, InvalidXLogRecPtr,
1930 : InvalidXLogRecPtr, InvalidRepOriginId);
1931 : }
1932 : }
1933 1634 : LWLockRelease(TwoPhaseStateLock);
1934 1634 : FreeDir(cldir);
1935 1634 : }
1936 :
1937 : /*
1938 : * PrescanPreparedTransactions
1939 : *
1940 : * Scan the shared memory entries of TwoPhaseState and determine the range
1941 : * of valid XIDs present. This is run during database startup, after we
1942 : * have completed reading WAL. TransamVariables->nextXid has been set to
1943 : * one more than the highest XID for which evidence exists in WAL.
1944 : *
1945 : * We throw away any prepared xacts with main XID beyond nextXid --- if any
1946 : * are present, it suggests that the DBA has done a PITR recovery to an
1947 : * earlier point in time without cleaning out pg_twophase. We dare not
1948 : * try to recover such prepared xacts since they likely depend on database
1949 : * state that doesn't exist now.
1950 : *
1951 : * However, we will advance nextXid beyond any subxact XIDs belonging to
1952 : * valid prepared xacts. We need to do this since subxact commit doesn't
1953 : * write a WAL entry, and so there might be no evidence in WAL of those
1954 : * subxact XIDs.
1955 : *
1956 : * On corrupted two-phase files, fail immediately. Keeping around broken
1957 : * entries and let replay continue causes harm on the system, and a new
1958 : * backup should be rolled in.
1959 : *
1960 : * Our other responsibility is to determine and return the oldest valid XID
1961 : * among the prepared xacts (if none, return TransamVariables->nextXid).
1962 : * This is needed to synchronize pg_subtrans startup properly.
1963 : *
1964 : * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1965 : * top-level xids is stored in *xids_p. The number of entries in the array
1966 : * is returned in *nxids_p.
1967 : */
1968 : TransactionId
1969 1638 : PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1970 : {
1971 1638 : FullTransactionId nextXid = TransamVariables->nextXid;
1972 1638 : TransactionId origNextXid = XidFromFullTransactionId(nextXid);
1973 1638 : TransactionId result = origNextXid;
1974 1638 : TransactionId *xids = NULL;
1975 1638 : int nxids = 0;
1976 1638 : int allocsize = 0;
1977 : int i;
1978 :
1979 1638 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1980 1740 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1981 : {
1982 : TransactionId xid;
1983 : char *buf;
1984 102 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1985 :
1986 : Assert(gxact->inredo);
1987 :
1988 102 : xid = gxact->xid;
1989 :
1990 102 : buf = ProcessTwoPhaseBuffer(xid,
1991 : gxact->prepare_start_lsn,
1992 102 : gxact->ondisk, false, true);
1993 :
1994 102 : if (buf == NULL)
1995 0 : continue;
1996 :
1997 : /*
1998 : * OK, we think this file is valid. Incorporate xid into the
1999 : * running-minimum result.
2000 : */
2001 102 : if (TransactionIdPrecedes(xid, result))
2002 84 : result = xid;
2003 :
2004 102 : if (xids_p)
2005 : {
2006 38 : if (nxids == allocsize)
2007 : {
2008 30 : if (nxids == 0)
2009 : {
2010 30 : allocsize = 10;
2011 30 : xids = palloc(allocsize * sizeof(TransactionId));
2012 : }
2013 : else
2014 : {
2015 0 : allocsize = allocsize * 2;
2016 0 : xids = repalloc(xids, allocsize * sizeof(TransactionId));
2017 : }
2018 : }
2019 38 : xids[nxids++] = xid;
2020 : }
2021 :
2022 102 : pfree(buf);
2023 : }
2024 1638 : LWLockRelease(TwoPhaseStateLock);
2025 :
2026 1638 : if (xids_p)
2027 : {
2028 110 : *xids_p = xids;
2029 110 : *nxids_p = nxids;
2030 : }
2031 :
2032 1638 : return result;
2033 : }
2034 :
2035 : /*
2036 : * StandbyRecoverPreparedTransactions
2037 : *
2038 : * Scan the shared memory entries of TwoPhaseState and setup all the required
2039 : * information to allow standby queries to treat prepared transactions as still
2040 : * active.
2041 : *
2042 : * This is never called at the end of recovery - we use
2043 : * RecoverPreparedTransactions() at that point.
2044 : *
2045 : * This updates pg_subtrans, so that any subtransactions will be correctly
2046 : * seen as in-progress in snapshots taken during recovery.
2047 : */
2048 : void
2049 110 : StandbyRecoverPreparedTransactions(void)
2050 : {
2051 : int i;
2052 :
2053 110 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2054 148 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2055 : {
2056 : TransactionId xid;
2057 : char *buf;
2058 38 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2059 :
2060 : Assert(gxact->inredo);
2061 :
2062 38 : xid = gxact->xid;
2063 :
2064 38 : buf = ProcessTwoPhaseBuffer(xid,
2065 : gxact->prepare_start_lsn,
2066 38 : gxact->ondisk, true, false);
2067 38 : if (buf != NULL)
2068 38 : pfree(buf);
2069 : }
2070 110 : LWLockRelease(TwoPhaseStateLock);
2071 110 : }
2072 :
2073 : /*
2074 : * RecoverPreparedTransactions
2075 : *
2076 : * Scan the shared memory entries of TwoPhaseState and reload the state for
2077 : * each prepared transaction (reacquire locks, etc).
2078 : *
2079 : * This is run at the end of recovery, but before we allow backends to write
2080 : * WAL.
2081 : *
2082 : * At the end of recovery the way we take snapshots will change. We now need
2083 : * to mark all running transactions with their full SubTransSetParent() info
2084 : * to allow normal snapshots to work correctly if snapshots overflow.
2085 : * We do this here because by definition prepared transactions are the only
2086 : * type of write transaction still running, so this is necessary and
2087 : * complete.
2088 : */
2089 : void
2090 1528 : RecoverPreparedTransactions(void)
2091 : {
2092 : int i;
2093 :
2094 1528 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2095 1592 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2096 : {
2097 : TransactionId xid;
2098 : char *buf;
2099 64 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2100 : char *bufptr;
2101 : TwoPhaseFileHeader *hdr;
2102 : TransactionId *subxids;
2103 : const char *gid;
2104 :
2105 64 : xid = gxact->xid;
2106 :
2107 : /*
2108 : * Reconstruct subtrans state for the transaction --- needed because
2109 : * pg_subtrans is not preserved over a restart. Note that we are
2110 : * linking all the subtransactions directly to the top-level XID;
2111 : * there may originally have been a more complex hierarchy, but
2112 : * there's no need to restore that exactly. It's possible that
2113 : * SubTransSetParent has been set before, if the prepared transaction
2114 : * generated xid assignment records.
2115 : */
2116 64 : buf = ProcessTwoPhaseBuffer(xid,
2117 : gxact->prepare_start_lsn,
2118 64 : gxact->ondisk, true, false);
2119 64 : if (buf == NULL)
2120 0 : continue;
2121 :
2122 64 : ereport(LOG,
2123 : (errmsg("recovering prepared transaction %u from shared memory", xid)));
2124 :
2125 64 : hdr = (TwoPhaseFileHeader *) buf;
2126 : Assert(TransactionIdEquals(hdr->xid, xid));
2127 64 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2128 64 : gid = (const char *) bufptr;
2129 64 : bufptr += MAXALIGN(hdr->gidlen);
2130 64 : subxids = (TransactionId *) bufptr;
2131 64 : bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2132 64 : bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
2133 64 : bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
2134 64 : bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
2135 64 : bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
2136 64 : bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2137 :
2138 : /*
2139 : * Recreate its GXACT and dummy PGPROC. But, check whether it was
2140 : * added in redo and already has a shmem entry for it.
2141 : */
2142 64 : MarkAsPreparingGuts(gxact, xid, gid,
2143 : hdr->prepared_at,
2144 : hdr->owner, hdr->database);
2145 :
2146 : /* recovered, so reset the flag for entries generated by redo */
2147 64 : gxact->inredo = false;
2148 :
2149 64 : GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2150 64 : MarkAsPrepared(gxact, true);
2151 :
2152 64 : LWLockRelease(TwoPhaseStateLock);
2153 :
2154 : /*
2155 : * Recover other state (notably locks) using resource managers.
2156 : */
2157 64 : ProcessRecords(bufptr, xid, twophase_recover_callbacks);
2158 :
2159 : /*
2160 : * Release locks held by the standby process after we process each
2161 : * prepared transaction. As a result, we don't need too many
2162 : * additional locks at any one time.
2163 : */
2164 64 : if (InHotStandby)
2165 12 : StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
2166 :
2167 : /*
2168 : * We're done with recovering this transaction. Clear MyLockedGxact,
2169 : * like we do in PrepareTransaction() during normal operation.
2170 : */
2171 64 : PostPrepare_Twophase();
2172 :
2173 64 : pfree(buf);
2174 :
2175 64 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2176 : }
2177 :
2178 1528 : LWLockRelease(TwoPhaseStateLock);
2179 1528 : }
2180 :
2181 : /*
2182 : * ProcessTwoPhaseBuffer
2183 : *
2184 : * Given a transaction id, read it either from disk or read it directly
2185 : * via shmem xlog record pointer using the provided "prepare_start_lsn".
2186 : *
2187 : * If setParent is true, set up subtransaction parent linkages.
2188 : *
2189 : * If setNextXid is true, set TransamVariables->nextXid to the newest
2190 : * value scanned.
2191 : */
2192 : static char *
2193 234 : ProcessTwoPhaseBuffer(TransactionId xid,
2194 : XLogRecPtr prepare_start_lsn,
2195 : bool fromdisk,
2196 : bool setParent, bool setNextXid)
2197 : {
2198 234 : FullTransactionId nextXid = TransamVariables->nextXid;
2199 234 : TransactionId origNextXid = XidFromFullTransactionId(nextXid);
2200 : TransactionId *subxids;
2201 : char *buf;
2202 : TwoPhaseFileHeader *hdr;
2203 : int i;
2204 :
2205 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2206 :
2207 234 : if (!fromdisk)
2208 : Assert(prepare_start_lsn != InvalidXLogRecPtr);
2209 :
2210 : /* Already processed? */
2211 234 : if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
2212 : {
2213 0 : if (fromdisk)
2214 : {
2215 0 : ereport(WARNING,
2216 : (errmsg("removing stale two-phase state file for transaction %u",
2217 : xid)));
2218 0 : RemoveTwoPhaseFile(xid, true);
2219 : }
2220 : else
2221 : {
2222 0 : ereport(WARNING,
2223 : (errmsg("removing stale two-phase state from memory for transaction %u",
2224 : xid)));
2225 0 : PrepareRedoRemove(xid, true);
2226 : }
2227 0 : return NULL;
2228 : }
2229 :
2230 : /* Reject XID if too new */
2231 234 : if (TransactionIdFollowsOrEquals(xid, origNextXid))
2232 : {
2233 0 : if (fromdisk)
2234 : {
2235 0 : ereport(WARNING,
2236 : (errmsg("removing future two-phase state file for transaction %u",
2237 : xid)));
2238 0 : RemoveTwoPhaseFile(xid, true);
2239 : }
2240 : else
2241 : {
2242 0 : ereport(WARNING,
2243 : (errmsg("removing future two-phase state from memory for transaction %u",
2244 : xid)));
2245 0 : PrepareRedoRemove(xid, true);
2246 : }
2247 0 : return NULL;
2248 : }
2249 :
2250 234 : if (fromdisk)
2251 : {
2252 : /* Read and validate file */
2253 94 : buf = ReadTwoPhaseFile(xid, false);
2254 : }
2255 : else
2256 : {
2257 : /* Read xlog data */
2258 140 : XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2259 : }
2260 :
2261 : /* Deconstruct header */
2262 234 : hdr = (TwoPhaseFileHeader *) buf;
2263 234 : if (!TransactionIdEquals(hdr->xid, xid))
2264 : {
2265 0 : if (fromdisk)
2266 0 : ereport(ERROR,
2267 : (errcode(ERRCODE_DATA_CORRUPTED),
2268 : errmsg("corrupted two-phase state file for transaction %u",
2269 : xid)));
2270 : else
2271 0 : ereport(ERROR,
2272 : (errcode(ERRCODE_DATA_CORRUPTED),
2273 : errmsg("corrupted two-phase state in memory for transaction %u",
2274 : xid)));
2275 : }
2276 :
2277 : /*
2278 : * Examine subtransaction XIDs ... they should all follow main XID, and
2279 : * they may force us to advance nextXid.
2280 : */
2281 234 : subxids = (TransactionId *) (buf +
2282 234 : MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2283 234 : MAXALIGN(hdr->gidlen));
2284 3798 : for (i = 0; i < hdr->nsubxacts; i++)
2285 : {
2286 3564 : TransactionId subxid = subxids[i];
2287 :
2288 : Assert(TransactionIdFollows(subxid, xid));
2289 :
2290 : /* update nextXid if needed */
2291 3564 : if (setNextXid)
2292 1642 : AdvanceNextFullTransactionIdPastXid(subxid);
2293 :
2294 3564 : if (setParent)
2295 1642 : SubTransSetParent(subxid, xid);
2296 : }
2297 :
2298 234 : return buf;
2299 : }
2300 :
2301 :
2302 : /*
2303 : * RecordTransactionCommitPrepared
2304 : *
2305 : * This is basically the same as RecordTransactionCommit (q.v. if you change
2306 : * this function): in particular, we must set DELAY_CHKPT_START to avoid a
2307 : * race condition.
2308 : *
2309 : * We know the transaction made at least one XLOG entry (its PREPARE),
2310 : * so it is never possible to optimize out the commit record.
2311 : */
2312 : static void
2313 694 : RecordTransactionCommitPrepared(TransactionId xid,
2314 : int nchildren,
2315 : TransactionId *children,
2316 : int nrels,
2317 : RelFileLocator *rels,
2318 : int nstats,
2319 : xl_xact_stats_item *stats,
2320 : int ninvalmsgs,
2321 : SharedInvalidationMessage *invalmsgs,
2322 : bool initfileinval,
2323 : const char *gid)
2324 : {
2325 : XLogRecPtr recptr;
2326 694 : TimestampTz committs = GetCurrentTimestamp();
2327 : bool replorigin;
2328 :
2329 : /*
2330 : * Are we using the replication origins feature? Or, in other words, are
2331 : * we replaying remote actions?
2332 : */
2333 736 : replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2334 42 : replorigin_session_origin != DoNotReplicateId);
2335 :
2336 694 : START_CRIT_SECTION();
2337 :
2338 : /* See notes in RecordTransactionCommit */
2339 : Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
2340 694 : MyProc->delayChkptFlags |= DELAY_CHKPT_START;
2341 :
2342 : /*
2343 : * Emit the XLOG commit record. Note that we mark 2PC commits as
2344 : * potentially having AccessExclusiveLocks since we don't know whether or
2345 : * not they do.
2346 : */
2347 694 : recptr = XactLogCommitRecord(committs,
2348 : nchildren, children, nrels, rels,
2349 : nstats, stats,
2350 : ninvalmsgs, invalmsgs,
2351 : initfileinval,
2352 694 : MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2353 : xid, gid);
2354 :
2355 :
2356 694 : if (replorigin)
2357 : /* Move LSNs forward for this replication origin */
2358 42 : replorigin_session_advance(replorigin_session_origin_lsn,
2359 : XactLastRecEnd);
2360 :
2361 : /*
2362 : * Record commit timestamp. The value comes from plain commit timestamp
2363 : * if replorigin is not enabled, or replorigin already set a value for us
2364 : * in replorigin_session_origin_timestamp otherwise.
2365 : *
2366 : * We don't need to WAL-log anything here, as the commit record written
2367 : * above already contains the data.
2368 : */
2369 694 : if (!replorigin || replorigin_session_origin_timestamp == 0)
2370 652 : replorigin_session_origin_timestamp = committs;
2371 :
2372 694 : TransactionTreeSetCommitTsData(xid, nchildren, children,
2373 : replorigin_session_origin_timestamp,
2374 : replorigin_session_origin);
2375 :
2376 : /*
2377 : * We don't currently try to sleep before flush here ... nor is there any
2378 : * support for async commit of a prepared xact (the very idea is probably
2379 : * a contradiction)
2380 : */
2381 :
2382 : /* Flush XLOG to disk */
2383 694 : XLogFlush(recptr);
2384 :
2385 : /* Mark the transaction committed in pg_xact */
2386 694 : TransactionIdCommitTree(xid, nchildren, children);
2387 :
2388 : /* Checkpoint can proceed now */
2389 694 : MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
2390 :
2391 694 : END_CRIT_SECTION();
2392 :
2393 : /*
2394 : * Wait for synchronous replication, if required.
2395 : *
2396 : * Note that at this stage we have marked clog, but still show as running
2397 : * in the procarray and continue to hold locks.
2398 : */
2399 694 : SyncRepWaitForLSN(recptr, true);
2400 694 : }
2401 :
2402 : /*
2403 : * RecordTransactionAbortPrepared
2404 : *
2405 : * This is basically the same as RecordTransactionAbort.
2406 : *
2407 : * We know the transaction made at least one XLOG entry (its PREPARE),
2408 : * so it is never possible to optimize out the abort record.
2409 : */
2410 : static void
2411 78 : RecordTransactionAbortPrepared(TransactionId xid,
2412 : int nchildren,
2413 : TransactionId *children,
2414 : int nrels,
2415 : RelFileLocator *rels,
2416 : int nstats,
2417 : xl_xact_stats_item *stats,
2418 : const char *gid)
2419 : {
2420 : XLogRecPtr recptr;
2421 : bool replorigin;
2422 :
2423 : /*
2424 : * Are we using the replication origins feature? Or, in other words, are
2425 : * we replaying remote actions?
2426 : */
2427 90 : replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2428 12 : replorigin_session_origin != DoNotReplicateId);
2429 :
2430 : /*
2431 : * Catch the scenario where we aborted partway through
2432 : * RecordTransactionCommitPrepared ...
2433 : */
2434 78 : if (TransactionIdDidCommit(xid))
2435 0 : elog(PANIC, "cannot abort transaction %u, it was already committed",
2436 : xid);
2437 :
2438 78 : START_CRIT_SECTION();
2439 :
2440 : /*
2441 : * Emit the XLOG commit record. Note that we mark 2PC aborts as
2442 : * potentially having AccessExclusiveLocks since we don't know whether or
2443 : * not they do.
2444 : */
2445 78 : recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2446 : nchildren, children,
2447 : nrels, rels,
2448 : nstats, stats,
2449 78 : MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2450 : xid, gid);
2451 :
2452 78 : if (replorigin)
2453 : /* Move LSNs forward for this replication origin */
2454 12 : replorigin_session_advance(replorigin_session_origin_lsn,
2455 : XactLastRecEnd);
2456 :
2457 : /* Always flush, since we're about to remove the 2PC state file */
2458 78 : XLogFlush(recptr);
2459 :
2460 : /*
2461 : * Mark the transaction aborted in clog. This is not absolutely necessary
2462 : * but we may as well do it while we are here.
2463 : */
2464 78 : TransactionIdAbortTree(xid, nchildren, children);
2465 :
2466 78 : END_CRIT_SECTION();
2467 :
2468 : /*
2469 : * Wait for synchronous replication, if required.
2470 : *
2471 : * Note that at this stage we have marked clog, but still show as running
2472 : * in the procarray and continue to hold locks.
2473 : */
2474 78 : SyncRepWaitForLSN(recptr, false);
2475 78 : }
2476 :
2477 : /*
2478 : * PrepareRedoAdd
2479 : *
2480 : * Store pointers to the start/end of the WAL record along with the xid in
2481 : * a gxact entry in shared memory TwoPhaseState structure. If caller
2482 : * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2483 : * data, the entry is marked as located on disk.
2484 : */
2485 : void
2486 180 : PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
2487 : XLogRecPtr end_lsn, RepOriginId origin_id)
2488 : {
2489 180 : TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2490 : char *bufptr;
2491 : const char *gid;
2492 : GlobalTransaction gxact;
2493 :
2494 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2495 : Assert(RecoveryInProgress());
2496 :
2497 180 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2498 180 : gid = (const char *) bufptr;
2499 :
2500 : /*
2501 : * Reserve the GID for the given transaction in the redo code path.
2502 : *
2503 : * This creates a gxact struct and puts it into the active array.
2504 : *
2505 : * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2506 : * shared memory. Hence, we only fill up the bare minimum contents here.
2507 : * The gxact also gets marked with gxact->inredo set to true to indicate
2508 : * that it got added in the redo phase
2509 : */
2510 :
2511 : /*
2512 : * In the event of a crash while a checkpoint was running, it may be
2513 : * possible that some two-phase data found its way to disk while its
2514 : * corresponding record needs to be replayed in the follow-up recovery. As
2515 : * the 2PC data was on disk, it has already been restored at the beginning
2516 : * of recovery with restoreTwoPhaseData(), so skip this record to avoid
2517 : * duplicates in TwoPhaseState. If a consistent state has been reached,
2518 : * the record is added to TwoPhaseState and it should have no
2519 : * corresponding file in pg_twophase.
2520 : */
2521 180 : if (!XLogRecPtrIsInvalid(start_lsn))
2522 : {
2523 : char path[MAXPGPATH];
2524 :
2525 150 : TwoPhaseFilePath(path, hdr->xid);
2526 :
2527 150 : if (access(path, F_OK) == 0)
2528 : {
2529 0 : ereport(reachedConsistency ? ERROR : WARNING,
2530 : (errmsg("could not recover two-phase state file for transaction %u",
2531 : hdr->xid),
2532 : errdetail("Two-phase state file has been found in WAL record %X/%X, but this transaction has already been restored from disk.",
2533 : LSN_FORMAT_ARGS(start_lsn))));
2534 0 : return;
2535 : }
2536 :
2537 150 : if (errno != ENOENT)
2538 0 : ereport(ERROR,
2539 : (errcode_for_file_access(),
2540 : errmsg("could not access file \"%s\": %m", path)));
2541 : }
2542 :
2543 : /* Get a free gxact from the freelist */
2544 180 : if (TwoPhaseState->freeGXacts == NULL)
2545 0 : ereport(ERROR,
2546 : (errcode(ERRCODE_OUT_OF_MEMORY),
2547 : errmsg("maximum number of prepared transactions reached"),
2548 : errhint("Increase \"max_prepared_transactions\" (currently %d).",
2549 : max_prepared_xacts)));
2550 180 : gxact = TwoPhaseState->freeGXacts;
2551 180 : TwoPhaseState->freeGXacts = gxact->next;
2552 :
2553 180 : gxact->prepared_at = hdr->prepared_at;
2554 180 : gxact->prepare_start_lsn = start_lsn;
2555 180 : gxact->prepare_end_lsn = end_lsn;
2556 180 : gxact->xid = hdr->xid;
2557 180 : gxact->owner = hdr->owner;
2558 180 : gxact->locking_backend = INVALID_PROC_NUMBER;
2559 180 : gxact->valid = false;
2560 180 : gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
2561 180 : gxact->inredo = true; /* yes, added in redo */
2562 180 : strcpy(gxact->gid, gid);
2563 :
2564 : /* And insert it into the active array */
2565 : Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2566 180 : TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2567 :
2568 180 : if (origin_id != InvalidRepOriginId)
2569 : {
2570 : /* recover apply progress */
2571 26 : replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
2572 : false /* backward */ , false /* WAL */ );
2573 : }
2574 :
2575 180 : elog(DEBUG2, "added 2PC data in shared memory for transaction %u", gxact->xid);
2576 : }
2577 :
2578 : /*
2579 : * PrepareRedoRemove
2580 : *
2581 : * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2582 : * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2583 : *
2584 : * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2585 : * is updated.
2586 : */
2587 : void
2588 130 : PrepareRedoRemove(TransactionId xid, bool giveWarning)
2589 : {
2590 130 : GlobalTransaction gxact = NULL;
2591 : int i;
2592 130 : bool found = false;
2593 :
2594 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2595 : Assert(RecoveryInProgress());
2596 :
2597 130 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2598 : {
2599 112 : gxact = TwoPhaseState->prepXacts[i];
2600 :
2601 112 : if (gxact->xid == xid)
2602 : {
2603 : Assert(gxact->inredo);
2604 112 : found = true;
2605 112 : break;
2606 : }
2607 : }
2608 :
2609 : /*
2610 : * Just leave if there is nothing, this is expected during WAL replay.
2611 : */
2612 130 : if (!found)
2613 18 : return;
2614 :
2615 : /*
2616 : * And now we can clean up any files we may have left.
2617 : */
2618 112 : elog(DEBUG2, "removing 2PC data for transaction %u", xid);
2619 112 : if (gxact->ondisk)
2620 8 : RemoveTwoPhaseFile(xid, giveWarning);
2621 112 : RemoveGXact(gxact);
2622 : }
2623 :
2624 : /*
2625 : * LookupGXact
2626 : * Check if the prepared transaction with the given GID, lsn and timestamp
2627 : * exists.
2628 : *
2629 : * Note that we always compare with the LSN where prepare ends because that is
2630 : * what is stored as origin_lsn in the 2PC file.
2631 : *
2632 : * This function is primarily used to check if the prepared transaction
2633 : * received from the upstream (remote node) already exists. Checking only GID
2634 : * is not sufficient because a different prepared xact with the same GID can
2635 : * exist on the same node. So, we are ensuring to match origin_lsn and
2636 : * origin_timestamp of prepared xact to avoid the possibility of a match of
2637 : * prepared xact from two different nodes.
2638 : */
2639 : bool
2640 10 : LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
2641 : TimestampTz origin_prepare_timestamp)
2642 : {
2643 : int i;
2644 10 : bool found = false;
2645 :
2646 10 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2647 10 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2648 : {
2649 10 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2650 :
2651 : /* Ignore not-yet-valid GIDs. */
2652 10 : if (gxact->valid && strcmp(gxact->gid, gid) == 0)
2653 : {
2654 : char *buf;
2655 : TwoPhaseFileHeader *hdr;
2656 :
2657 : /*
2658 : * We are not expecting collisions of GXACTs (same gid) between
2659 : * publisher and subscribers, so we perform all I/O while holding
2660 : * TwoPhaseStateLock for simplicity.
2661 : *
2662 : * To move the I/O out of the lock, we need to ensure that no
2663 : * other backend commits the prepared xact in the meantime. We can
2664 : * do this optimization if we encounter many collisions in GID
2665 : * between publisher and subscriber.
2666 : */
2667 10 : if (gxact->ondisk)
2668 0 : buf = ReadTwoPhaseFile(gxact->xid, false);
2669 : else
2670 : {
2671 : Assert(gxact->prepare_start_lsn);
2672 10 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
2673 : }
2674 :
2675 10 : hdr = (TwoPhaseFileHeader *) buf;
2676 :
2677 10 : if (hdr->origin_lsn == prepare_end_lsn &&
2678 10 : hdr->origin_timestamp == origin_prepare_timestamp)
2679 : {
2680 10 : found = true;
2681 10 : pfree(buf);
2682 10 : break;
2683 : }
2684 :
2685 0 : pfree(buf);
2686 : }
2687 : }
2688 10 : LWLockRelease(TwoPhaseStateLock);
2689 10 : return found;
2690 : }
2691 :
2692 : /*
2693 : * TwoPhaseTransactionGid
2694 : * Form the prepared transaction GID for two_phase transactions.
2695 : *
2696 : * Return the GID in the supplied buffer.
2697 : */
2698 : void
2699 96 : TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
2700 : {
2701 : Assert(OidIsValid(subid));
2702 :
2703 96 : if (!TransactionIdIsValid(xid))
2704 0 : ereport(ERROR,
2705 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2706 : errmsg_internal("invalid two-phase transaction ID")));
2707 :
2708 96 : snprintf(gid_res, szgid, "pg_gid_%u_%u", subid, xid);
2709 96 : }
2710 :
2711 : /*
2712 : * IsTwoPhaseTransactionGidForSubid
2713 : * Check whether the given GID (as formed by TwoPhaseTransactionGid) is
2714 : * for the specified 'subid'.
2715 : */
2716 : static bool
2717 0 : IsTwoPhaseTransactionGidForSubid(Oid subid, char *gid)
2718 : {
2719 : int ret;
2720 : Oid subid_from_gid;
2721 : TransactionId xid_from_gid;
2722 : char gid_tmp[GIDSIZE];
2723 :
2724 : /* Extract the subid and xid from the given GID */
2725 0 : ret = sscanf(gid, "pg_gid_%u_%u", &subid_from_gid, &xid_from_gid);
2726 :
2727 : /*
2728 : * Check that the given GID has expected format, and at least the subid
2729 : * matches.
2730 : */
2731 0 : if (ret != 2 || subid != subid_from_gid)
2732 0 : return false;
2733 :
2734 : /*
2735 : * Reconstruct a temporary GID based on the subid and xid extracted from
2736 : * the given GID and check whether the temporary GID and the given GID
2737 : * match.
2738 : */
2739 0 : TwoPhaseTransactionGid(subid, xid_from_gid, gid_tmp, sizeof(gid_tmp));
2740 :
2741 0 : return strcmp(gid, gid_tmp) == 0;
2742 : }
2743 :
2744 : /*
2745 : * LookupGXactBySubid
2746 : * Check if the prepared transaction done by apply worker exists.
2747 : */
2748 : bool
2749 2 : LookupGXactBySubid(Oid subid)
2750 : {
2751 2 : bool found = false;
2752 :
2753 2 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2754 2 : for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
2755 : {
2756 0 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2757 :
2758 : /* Ignore not-yet-valid GIDs. */
2759 0 : if (gxact->valid &&
2760 0 : IsTwoPhaseTransactionGidForSubid(subid, gxact->gid))
2761 : {
2762 0 : found = true;
2763 0 : break;
2764 : }
2765 : }
2766 2 : LWLockRelease(TwoPhaseStateLock);
2767 :
2768 2 : return found;
2769 : }
|