Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * twophase.c
4 : * Two-phase commit support functions.
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/access/transam/twophase.c
11 : *
12 : * NOTES
13 : * Each global transaction is associated with a global transaction
14 : * identifier (GID). The client assigns a GID to a postgres
15 : * transaction with the PREPARE TRANSACTION command.
16 : *
17 : * We keep all active global transactions in a shared memory array.
18 : * When the PREPARE TRANSACTION command is issued, the GID is
19 : * reserved for the transaction in the array. This is done before
20 : * a WAL entry is made, because the reservation checks for duplicate
21 : * GIDs and aborts the transaction if there already is a global
22 : * transaction in prepared state with the same GID.
23 : *
24 : * A global transaction (gxact) also has a dummy PGPROC; this is what keeps
25 : * the XID considered running by TransactionIdIsInProgress. It is also
26 : * convenient as a PGPROC to hook the gxact's locks to.
27 : *
28 : * Information to recover prepared transactions in case of crash is
29 : * now stored in WAL for the common case. In some cases there will be
30 : * an extended period between preparing a GXACT and commit/abort, in
31 : * which case we need to separately record prepared transaction data
32 : * in permanent storage. This includes locking information, pending
33 : * notifications etc. All that state information is written to the
34 : * per-transaction state file in the pg_twophase directory.
35 : * All prepared transactions will be written prior to shutdown.
36 : *
37 : * The life cycle of the state data is as follows:
38 : *
39 : * * On PREPARE TRANSACTION, the backend writes state data only to the WAL
40 : * and stores a pointer to the start of the WAL record in
41 : * gxact->prepare_start_lsn.
42 : * * If COMMIT occurs before a checkpoint, the backend reads the data from
43 : * WAL using prepare_start_lsn.
44 : * * On checkpoint, the state data is copied to files in the pg_twophase
45 : * directory and fsynced.
46 : * * If COMMIT happens after a checkpoint, the backend reads the state data
47 : * from files.
48 : *
49 : * During replay and replication, TwoPhaseState also holds information
50 : * about active prepared transactions that haven't been moved to disk yet.
51 : *
52 : * Replay of twophase records happens by the following rules:
53 : *
54 : * * At the beginning of recovery, pg_twophase is scanned once, filling
55 : * TwoPhaseState with entries marked with gxact->inredo and
56 : * gxact->ondisk. Two-phase file data older than the XID horizon of
57 : * the redo position are discarded.
58 : * * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59 : * gxact->inredo is set to true for such entries.
60 : * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61 : * that have gxact->inredo set and are behind the redo_horizon. We
62 : * save them to disk and then switch gxact->ondisk to true.
63 : * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64 : * If gxact->ondisk is true, the corresponding entry from the disk
65 : * is additionally deleted.
66 : * * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67 : * and PrescanPreparedTransactions() have been modified to go through
68 : * gxact->inredo entries that have not made it to disk.
69 : *
70 : *-------------------------------------------------------------------------
71 : */
72 : #include "postgres.h"
73 :
74 : #include <fcntl.h>
75 : #include <sys/stat.h>
76 : #include <time.h>
77 : #include <unistd.h>
78 :
79 : #include "access/commit_ts.h"
80 : #include "access/htup_details.h"
81 : #include "access/subtrans.h"
82 : #include "access/transam.h"
83 : #include "access/twophase.h"
84 : #include "access/twophase_rmgr.h"
85 : #include "access/xact.h"
86 : #include "access/xlog.h"
87 : #include "access/xloginsert.h"
88 : #include "access/xlogreader.h"
89 : #include "access/xlogrecovery.h"
90 : #include "access/xlogutils.h"
91 : #include "catalog/pg_type.h"
92 : #include "catalog/storage.h"
93 : #include "funcapi.h"
94 : #include "miscadmin.h"
95 : #include "pg_trace.h"
96 : #include "pgstat.h"
97 : #include "replication/origin.h"
98 : #include "replication/syncrep.h"
99 : #include "storage/fd.h"
100 : #include "storage/ipc.h"
101 : #include "storage/md.h"
102 : #include "storage/predicate.h"
103 : #include "storage/proc.h"
104 : #include "storage/procarray.h"
105 : #include "utils/builtins.h"
106 : #include "utils/memutils.h"
107 : #include "utils/timestamp.h"
108 :
/*
 * Name of the subdirectory of PGDATA in which two-phase state files live.
 */
#define TWOPHASE_DIR "pg_twophase"

/*
 * GUC variable: size of the prepared-transaction table.  It cannot be
 * changed after startup.
 */
int			max_prepared_xacts = 0;
116 :
117 : /*
118 : * This struct describes one global transaction that is in prepared state
119 : * or attempting to become prepared.
120 : *
121 : * The lifecycle of a global transaction is:
122 : *
123 : * 1. After checking that the requested GID is not in use, set up an entry in
124 : * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
125 : * and mark it as locked by my backend.
126 : *
127 : * 2. After successfully completing prepare, set valid = true and enter the
128 : * referenced PGPROC into the global ProcArray.
129 : *
130 : * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
131 : * valid and not locked, then mark the entry as locked by storing my current
132 : * proc number into locking_backend. This prevents concurrent attempts to
133 : * commit or rollback the same prepared xact.
134 : *
135 : * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
136 : * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
137 : * the freelist.
138 : *
139 : * Note that if the preparing transaction fails between steps 1 and 2, the
140 : * entry must be removed so that the GID and the GlobalTransaction struct
141 : * can be reused. See AtAbort_Twophase().
142 : *
143 : * typedef struct GlobalTransactionData *GlobalTransaction appears in
144 : * twophase.h
145 : */
146 :
147 : typedef struct GlobalTransactionData
148 : {
149 : GlobalTransaction next; /* list link for free list */
150 : int pgprocno; /* ID of associated dummy PGPROC */
151 : TimestampTz prepared_at; /* time of preparation */
152 :
153 : /*
154 : * Note that we need to keep track of two LSNs for each GXACT. We keep
155 : * track of the start LSN because this is the address we must use to read
156 : * state data back from WAL when committing a prepared GXACT. We keep
157 : * track of the end LSN because that is the LSN we need to wait for prior
158 : * to commit.
159 : */
160 : XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare record start */
161 : XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare record end */
162 : FullTransactionId fxid; /* The GXACT full xid */
163 :
164 : Oid owner; /* ID of user that executed the xact */
165 : ProcNumber locking_backend; /* backend currently working on the xact */
166 : bool valid; /* true if PGPROC entry is in proc array */
167 : bool ondisk; /* true if prepare state file is on disk */
168 : bool inredo; /* true if entry was added via xlog_redo */
169 : char gid[GIDSIZE]; /* The GID assigned to the prepared xact */
170 : } GlobalTransactionData;
171 :
172 : /*
173 : * Two Phase Commit shared state. Access to this struct is protected
174 : * by TwoPhaseStateLock.
175 : */
176 : typedef struct TwoPhaseStateData
177 : {
178 : /* Head of linked list of free GlobalTransactionData structs */
179 : GlobalTransaction freeGXacts;
180 :
181 : /* Number of valid prepXacts entries. */
182 : int numPrepXacts;
183 :
184 : /* There are max_prepared_xacts items in this array */
185 : GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
186 : } TwoPhaseStateData;
187 :
188 : static TwoPhaseStateData *TwoPhaseState;
189 :
190 : /*
191 : * Global transaction entry currently locked by us, if any. Note that any
192 : * access to the entry pointed to by this variable must be protected by
193 : * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
194 : * (since it's just local memory).
195 : */
196 : static GlobalTransaction MyLockedGxact = NULL;
197 :
198 : static bool twophaseExitRegistered = false;
199 :
200 : static void PrepareRedoRemoveFull(FullTransactionId fxid, bool giveWarning);
201 : static void RecordTransactionCommitPrepared(TransactionId xid,
202 : int nchildren,
203 : TransactionId *children,
204 : int nrels,
205 : RelFileLocator *rels,
206 : int nstats,
207 : xl_xact_stats_item *stats,
208 : int ninvalmsgs,
209 : SharedInvalidationMessage *invalmsgs,
210 : bool initfileinval,
211 : const char *gid);
212 : static void RecordTransactionAbortPrepared(TransactionId xid,
213 : int nchildren,
214 : TransactionId *children,
215 : int nrels,
216 : RelFileLocator *rels,
217 : int nstats,
218 : xl_xact_stats_item *stats,
219 : const char *gid);
220 : static void ProcessRecords(char *bufptr, FullTransactionId fxid,
221 : const TwoPhaseCallback callbacks[]);
222 : static void RemoveGXact(GlobalTransaction gxact);
223 :
224 : static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
225 : static char *ProcessTwoPhaseBuffer(FullTransactionId fxid,
226 : XLogRecPtr prepare_start_lsn,
227 : bool fromdisk, bool setParent, bool setNextXid);
228 : static void MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid,
229 : const char *gid, TimestampTz prepared_at, Oid owner,
230 : Oid databaseid);
231 : static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning);
232 : static void RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len);
233 :
234 : /*
235 : * Initialization of shared memory
236 : */
237 : Size
238 6150 : TwoPhaseShmemSize(void)
239 : {
240 : Size size;
241 :
242 : /* Need the fixed struct, the array of pointers, and the GTD structs */
243 6150 : size = offsetof(TwoPhaseStateData, prepXacts);
244 6150 : size = add_size(size, mul_size(max_prepared_xacts,
245 : sizeof(GlobalTransaction)));
246 6150 : size = MAXALIGN(size);
247 6150 : size = add_size(size, mul_size(max_prepared_xacts,
248 : sizeof(GlobalTransactionData)));
249 :
250 6150 : return size;
251 : }
252 :
253 : void
254 2152 : TwoPhaseShmemInit(void)
255 : {
256 : bool found;
257 :
258 2152 : TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
259 : TwoPhaseShmemSize(),
260 : &found);
261 2152 : if (!IsUnderPostmaster)
262 : {
263 : GlobalTransaction gxacts;
264 : int i;
265 :
266 : Assert(!found);
267 2152 : TwoPhaseState->freeGXacts = NULL;
268 2152 : TwoPhaseState->numPrepXacts = 0;
269 :
270 : /*
271 : * Initialize the linked list of free GlobalTransactionData structs
272 : */
273 2152 : gxacts = (GlobalTransaction)
274 2152 : ((char *) TwoPhaseState +
275 2152 : MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
276 : sizeof(GlobalTransaction) * max_prepared_xacts));
277 3838 : for (i = 0; i < max_prepared_xacts; i++)
278 : {
279 : /* insert into linked list */
280 1686 : gxacts[i].next = TwoPhaseState->freeGXacts;
281 1686 : TwoPhaseState->freeGXacts = &gxacts[i];
282 :
283 : /* associate it with a PGPROC assigned by InitProcGlobal */
284 1686 : gxacts[i].pgprocno = GetNumberFromPGProc(&PreparedXactProcs[i]);
285 : }
286 : }
287 : else
288 : Assert(found);
289 2152 : }
290 :
291 : /*
292 : * Exit hook to unlock the global transaction entry we're working on.
293 : */
294 : static void
295 268 : AtProcExit_Twophase(int code, Datum arg)
296 : {
297 : /* same logic as abort */
298 268 : AtAbort_Twophase();
299 268 : }
300 :
301 : /*
302 : * Abort hook to unlock the global transaction entry we're working on.
303 : */
304 : void
305 49594 : AtAbort_Twophase(void)
306 : {
307 49594 : if (MyLockedGxact == NULL)
308 49590 : return;
309 :
310 : /*
311 : * What to do with the locked global transaction entry? If we were in the
312 : * process of preparing the transaction, but haven't written the WAL
313 : * record and state file yet, the transaction must not be considered as
314 : * prepared. Likewise, if we are in the process of finishing an
315 : * already-prepared transaction, and fail after having already written the
316 : * 2nd phase commit or rollback record to the WAL, the transaction should
317 : * not be considered as prepared anymore. In those cases, just remove the
318 : * entry from shared memory.
319 : *
320 : * Otherwise, the entry must be left in place so that the transaction can
321 : * be finished later, so just unlock it.
322 : *
323 : * If we abort during prepare, after having written the WAL record, we
324 : * might not have transferred all locks and other state to the prepared
325 : * transaction yet. Likewise, if we abort during commit or rollback,
326 : * after having written the WAL record, we might not have released all the
327 : * resources held by the transaction yet. In those cases, the in-memory
328 : * state can be wrong, but it's too late to back out.
329 : */
330 4 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
331 4 : if (!MyLockedGxact->valid)
332 4 : RemoveGXact(MyLockedGxact);
333 : else
334 0 : MyLockedGxact->locking_backend = INVALID_PROC_NUMBER;
335 4 : LWLockRelease(TwoPhaseStateLock);
336 :
337 4 : MyLockedGxact = NULL;
338 : }
339 :
340 : /*
341 : * This is called after we have finished transferring state to the prepared
342 : * PGPROC entry.
343 : */
344 : void
345 636 : PostPrepare_Twophase(void)
346 : {
347 636 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
348 636 : MyLockedGxact->locking_backend = INVALID_PROC_NUMBER;
349 636 : LWLockRelease(TwoPhaseStateLock);
350 :
351 636 : MyLockedGxact = NULL;
352 636 : }
353 :
354 :
355 : /*
356 : * MarkAsPreparing
357 : * Reserve the GID for the given transaction.
358 : */
359 : GlobalTransaction
360 598 : MarkAsPreparing(FullTransactionId fxid, const char *gid,
361 : TimestampTz prepared_at, Oid owner, Oid databaseid)
362 : {
363 : GlobalTransaction gxact;
364 : int i;
365 :
366 598 : if (strlen(gid) >= GIDSIZE)
367 0 : ereport(ERROR,
368 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
369 : errmsg("transaction identifier \"%s\" is too long",
370 : gid)));
371 :
372 : /* fail immediately if feature is disabled */
373 598 : if (max_prepared_xacts == 0)
374 20 : ereport(ERROR,
375 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
376 : errmsg("prepared transactions are disabled"),
377 : errhint("Set \"max_prepared_transactions\" to a nonzero value.")));
378 :
379 : /* on first call, register the exit hook */
380 578 : if (!twophaseExitRegistered)
381 : {
382 148 : before_shmem_exit(AtProcExit_Twophase, 0);
383 148 : twophaseExitRegistered = true;
384 : }
385 :
386 578 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
387 :
388 : /* Check for conflicting GID */
389 1034 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
390 : {
391 460 : gxact = TwoPhaseState->prepXacts[i];
392 460 : if (strcmp(gxact->gid, gid) == 0)
393 : {
394 4 : ereport(ERROR,
395 : (errcode(ERRCODE_DUPLICATE_OBJECT),
396 : errmsg("transaction identifier \"%s\" is already in use",
397 : gid)));
398 : }
399 : }
400 :
401 : /* Get a free gxact from the freelist */
402 574 : if (TwoPhaseState->freeGXacts == NULL)
403 0 : ereport(ERROR,
404 : (errcode(ERRCODE_OUT_OF_MEMORY),
405 : errmsg("maximum number of prepared transactions reached"),
406 : errhint("Increase \"max_prepared_transactions\" (currently %d).",
407 : max_prepared_xacts)));
408 574 : gxact = TwoPhaseState->freeGXacts;
409 574 : TwoPhaseState->freeGXacts = gxact->next;
410 :
411 574 : MarkAsPreparingGuts(gxact, fxid, gid, prepared_at, owner, databaseid);
412 :
413 574 : gxact->ondisk = false;
414 :
415 : /* And insert it into the active array */
416 : Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
417 574 : TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
418 :
419 574 : LWLockRelease(TwoPhaseStateLock);
420 :
421 574 : return gxact;
422 : }
423 :
424 : /*
425 : * MarkAsPreparingGuts
426 : *
427 : * This uses a gxact struct and puts it into the active array.
428 : * NOTE: this is also used when reloading a gxact after a crash; so avoid
429 : * assuming that we can use very much backend context.
430 : *
431 : * Note: This function should be called with appropriate locks held.
432 : */
433 : static void
434 640 : MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid,
435 : const char *gid, TimestampTz prepared_at, Oid owner,
436 : Oid databaseid)
437 : {
438 : PGPROC *proc;
439 : int i;
440 640 : TransactionId xid = XidFromFullTransactionId(fxid);
441 :
442 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
443 :
444 : Assert(gxact != NULL);
445 640 : proc = GetPGProcByNumber(gxact->pgprocno);
446 :
447 : /* Initialize the PGPROC entry */
448 67200 : MemSet(proc, 0, sizeof(PGPROC));
449 640 : dlist_node_init(&proc->links);
450 640 : proc->waitStatus = PROC_WAIT_STATUS_OK;
451 640 : if (LocalTransactionIdIsValid(MyProc->vxid.lxid))
452 : {
453 : /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
454 574 : proc->vxid.lxid = MyProc->vxid.lxid;
455 574 : proc->vxid.procNumber = MyProcNumber;
456 : }
457 : else
458 : {
459 : Assert(AmStartupProcess() || !IsPostmasterEnvironment);
460 : /* GetLockConflicts() uses this to specify a wait on the XID */
461 66 : proc->vxid.lxid = xid;
462 66 : proc->vxid.procNumber = INVALID_PROC_NUMBER;
463 : }
464 640 : proc->xid = xid;
465 : Assert(proc->xmin == InvalidTransactionId);
466 640 : proc->delayChkptFlags = 0;
467 640 : proc->statusFlags = 0;
468 640 : proc->pid = 0;
469 640 : proc->databaseId = databaseid;
470 640 : proc->roleId = owner;
471 640 : proc->tempNamespaceId = InvalidOid;
472 640 : proc->isRegularBackend = false;
473 640 : proc->lwWaiting = LW_WS_NOT_WAITING;
474 640 : proc->lwWaitMode = 0;
475 640 : proc->waitLock = NULL;
476 640 : proc->waitProcLock = NULL;
477 640 : pg_atomic_init_u64(&proc->waitStart, 0);
478 10880 : for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
479 10240 : dlist_init(&proc->myProcLocks[i]);
480 : /* subxid data must be filled later by GXactLoadSubxactData */
481 640 : proc->subxidStatus.overflowed = false;
482 640 : proc->subxidStatus.count = 0;
483 :
484 640 : gxact->prepared_at = prepared_at;
485 640 : gxact->fxid = fxid;
486 640 : gxact->owner = owner;
487 640 : gxact->locking_backend = MyProcNumber;
488 640 : gxact->valid = false;
489 640 : gxact->inredo = false;
490 640 : strcpy(gxact->gid, gid);
491 :
492 : /*
493 : * Remember that we have this GlobalTransaction entry locked for us. If we
494 : * abort after this, we must release it.
495 : */
496 640 : MyLockedGxact = gxact;
497 640 : }
498 :
499 : /*
500 : * GXactLoadSubxactData
501 : *
502 : * If the transaction being persisted had any subtransactions, this must
503 : * be called before MarkAsPrepared() to load information into the dummy
504 : * PGPROC.
505 : */
506 : static void
507 268 : GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
508 : TransactionId *children)
509 : {
510 268 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
511 :
512 : /* We need no extra lock since the GXACT isn't valid yet */
513 268 : if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
514 : {
515 8 : proc->subxidStatus.overflowed = true;
516 8 : nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
517 : }
518 268 : if (nsubxacts > 0)
519 : {
520 234 : memcpy(proc->subxids.xids, children,
521 : nsubxacts * sizeof(TransactionId));
522 234 : proc->subxidStatus.count = nsubxacts;
523 : }
524 268 : }
525 :
526 : /*
527 : * MarkAsPrepared
528 : * Mark the GXACT as fully valid, and enter it into the global ProcArray.
529 : *
530 : * lock_held indicates whether caller already holds TwoPhaseStateLock.
531 : */
532 : static void
533 636 : MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
534 : {
535 : /* Lock here may be overkill, but I'm not convinced of that ... */
536 636 : if (!lock_held)
537 570 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
538 : Assert(!gxact->valid);
539 636 : gxact->valid = true;
540 636 : if (!lock_held)
541 570 : LWLockRelease(TwoPhaseStateLock);
542 :
543 : /*
544 : * Put it into the global ProcArray so TransactionIdIsInProgress considers
545 : * the XID as still running.
546 : */
547 636 : ProcArrayAdd(GetPGProcByNumber(gxact->pgprocno));
548 636 : }
549 :
550 : /*
551 : * LockGXact
552 : * Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
553 : */
554 : static GlobalTransaction
555 590 : LockGXact(const char *gid, Oid user)
556 : {
557 : int i;
558 :
559 : /* on first call, register the exit hook */
560 590 : if (!twophaseExitRegistered)
561 : {
562 120 : before_shmem_exit(AtProcExit_Twophase, 0);
563 120 : twophaseExitRegistered = true;
564 : }
565 :
566 590 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
567 :
568 980 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
569 : {
570 968 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
571 968 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
572 :
573 : /* Ignore not-yet-valid GIDs */
574 968 : if (!gxact->valid)
575 6 : continue;
576 962 : if (strcmp(gxact->gid, gid) != 0)
577 384 : continue;
578 :
579 : /* Found it, but has someone else got it locked? */
580 578 : if (gxact->locking_backend != INVALID_PROC_NUMBER)
581 0 : ereport(ERROR,
582 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
583 : errmsg("prepared transaction with identifier \"%s\" is busy",
584 : gid)));
585 :
586 578 : if (user != gxact->owner && !superuser_arg(user))
587 0 : ereport(ERROR,
588 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
589 : errmsg("permission denied to finish prepared transaction"),
590 : errhint("Must be superuser or the user that prepared the transaction.")));
591 :
592 : /*
593 : * Note: it probably would be possible to allow committing from
594 : * another database; but at the moment NOTIFY is known not to work and
595 : * there may be some other issues as well. Hence disallow until
596 : * someone gets motivated to make it work.
597 : */
598 578 : if (MyDatabaseId != proc->databaseId)
599 0 : ereport(ERROR,
600 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
601 : errmsg("prepared transaction belongs to another database"),
602 : errhint("Connect to the database where the transaction was prepared to finish it.")));
603 :
604 : /* OK for me to lock it */
605 578 : gxact->locking_backend = MyProcNumber;
606 578 : MyLockedGxact = gxact;
607 :
608 578 : LWLockRelease(TwoPhaseStateLock);
609 :
610 578 : return gxact;
611 : }
612 :
613 12 : LWLockRelease(TwoPhaseStateLock);
614 :
615 12 : ereport(ERROR,
616 : (errcode(ERRCODE_UNDEFINED_OBJECT),
617 : errmsg("prepared transaction with identifier \"%s\" does not exist",
618 : gid)));
619 :
620 : /* NOTREACHED */
621 : return NULL;
622 : }
623 :
624 : /*
625 : * RemoveGXact
626 : * Remove the prepared transaction from the shared memory array.
627 : *
628 : * NB: caller should have already removed it from ProcArray
629 : */
630 : static void
631 694 : RemoveGXact(GlobalTransaction gxact)
632 : {
633 : int i;
634 :
635 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
636 :
637 1082 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
638 : {
639 1082 : if (gxact == TwoPhaseState->prepXacts[i])
640 : {
641 : /* remove from the active array */
642 694 : TwoPhaseState->numPrepXacts--;
643 694 : TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
644 :
645 : /* and put it back in the freelist */
646 694 : gxact->next = TwoPhaseState->freeGXacts;
647 694 : TwoPhaseState->freeGXacts = gxact;
648 :
649 694 : return;
650 : }
651 : }
652 :
653 0 : elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
654 : }
655 :
656 : /*
657 : * Returns an array of all prepared transactions for the user-level
658 : * function pg_prepared_xact.
659 : *
660 : * The returned array and all its elements are copies of internal data
661 : * structures, to minimize the time we need to hold the TwoPhaseStateLock.
662 : *
663 : * WARNING -- we return even those transactions that are not fully prepared
664 : * yet. The caller should filter them out if he doesn't want them.
665 : *
666 : * The returned array is palloc'd.
667 : */
668 : static int
669 230 : GetPreparedTransactionList(GlobalTransaction *gxacts)
670 : {
671 : GlobalTransaction array;
672 : int num;
673 : int i;
674 :
675 230 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
676 :
677 230 : if (TwoPhaseState->numPrepXacts == 0)
678 : {
679 148 : LWLockRelease(TwoPhaseStateLock);
680 :
681 148 : *gxacts = NULL;
682 148 : return 0;
683 : }
684 :
685 82 : num = TwoPhaseState->numPrepXacts;
686 82 : array = (GlobalTransaction) palloc(sizeof(GlobalTransactionData) * num);
687 82 : *gxacts = array;
688 174 : for (i = 0; i < num; i++)
689 92 : memcpy(array + i, TwoPhaseState->prepXacts[i],
690 : sizeof(GlobalTransactionData));
691 :
692 82 : LWLockRelease(TwoPhaseStateLock);
693 :
694 82 : return num;
695 : }
696 :
697 :
698 : /* Working status for pg_prepared_xact */
699 : typedef struct
700 : {
701 : GlobalTransaction array;
702 : int ngxacts;
703 : int currIdx;
704 : } Working_State;
705 :
706 : /*
707 : * pg_prepared_xact
708 : * Produce a view with one row per prepared transaction.
709 : *
710 : * This function is here so we don't have to export the
711 : * GlobalTransactionData struct definition.
712 : */
713 : Datum
714 322 : pg_prepared_xact(PG_FUNCTION_ARGS)
715 : {
716 : FuncCallContext *funcctx;
717 : Working_State *status;
718 :
719 322 : if (SRF_IS_FIRSTCALL())
720 : {
721 : TupleDesc tupdesc;
722 : MemoryContext oldcontext;
723 :
724 : /* create a function context for cross-call persistence */
725 230 : funcctx = SRF_FIRSTCALL_INIT();
726 :
727 : /*
728 : * Switch to memory context appropriate for multiple function calls
729 : */
730 230 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
731 :
732 : /* build tupdesc for result tuples */
733 : /* this had better match pg_prepared_xacts view in system_views.sql */
734 230 : tupdesc = CreateTemplateTupleDesc(5);
735 230 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
736 : XIDOID, -1, 0);
737 230 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
738 : TEXTOID, -1, 0);
739 230 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
740 : TIMESTAMPTZOID, -1, 0);
741 230 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
742 : OIDOID, -1, 0);
743 230 : TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
744 : OIDOID, -1, 0);
745 :
746 230 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
747 :
748 : /*
749 : * Collect all the 2PC status information that we will format and send
750 : * out as a result set.
751 : */
752 230 : status = (Working_State *) palloc(sizeof(Working_State));
753 230 : funcctx->user_fctx = status;
754 :
755 230 : status->ngxacts = GetPreparedTransactionList(&status->array);
756 230 : status->currIdx = 0;
757 :
758 230 : MemoryContextSwitchTo(oldcontext);
759 : }
760 :
761 322 : funcctx = SRF_PERCALL_SETUP();
762 322 : status = (Working_State *) funcctx->user_fctx;
763 :
764 322 : while (status->array != NULL && status->currIdx < status->ngxacts)
765 : {
766 92 : GlobalTransaction gxact = &status->array[status->currIdx++];
767 92 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
768 92 : Datum values[5] = {0};
769 92 : bool nulls[5] = {0};
770 : HeapTuple tuple;
771 : Datum result;
772 :
773 92 : if (!gxact->valid)
774 0 : continue;
775 :
776 : /*
777 : * Form tuple with appropriate data.
778 : */
779 :
780 92 : values[0] = TransactionIdGetDatum(proc->xid);
781 92 : values[1] = CStringGetTextDatum(gxact->gid);
782 92 : values[2] = TimestampTzGetDatum(gxact->prepared_at);
783 92 : values[3] = ObjectIdGetDatum(gxact->owner);
784 92 : values[4] = ObjectIdGetDatum(proc->databaseId);
785 :
786 92 : tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
787 92 : result = HeapTupleGetDatum(tuple);
788 92 : SRF_RETURN_NEXT(funcctx, result);
789 : }
790 :
791 230 : SRF_RETURN_DONE(funcctx);
792 : }
793 :
794 : /*
795 : * TwoPhaseGetGXact
796 : * Get the GlobalTransaction struct for a prepared transaction
797 : * specified by XID
798 : *
799 : * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
800 : * caller had better hold it.
801 : */
802 : static GlobalTransaction
803 2376 : TwoPhaseGetGXact(FullTransactionId fxid, bool lock_held)
804 : {
805 2376 : GlobalTransaction result = NULL;
806 : int i;
807 :
808 : static FullTransactionId cached_fxid = {InvalidTransactionId};
809 : static GlobalTransaction cached_gxact = NULL;
810 :
811 : Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
812 :
813 : /*
814 : * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
815 : * repeatedly for the same XID. We can save work with a simple cache.
816 : */
817 2376 : if (FullTransactionIdEquals(fxid, cached_fxid))
818 1602 : return cached_gxact;
819 :
820 774 : if (!lock_held)
821 636 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
822 :
823 1242 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
824 : {
825 1242 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
826 :
827 1242 : if (FullTransactionIdEquals(gxact->fxid, fxid))
828 : {
829 774 : result = gxact;
830 774 : break;
831 : }
832 : }
833 :
834 774 : if (!lock_held)
835 636 : LWLockRelease(TwoPhaseStateLock);
836 :
837 774 : if (result == NULL) /* should not happen */
838 0 : elog(ERROR, "failed to find GlobalTransaction for xid %u",
839 : XidFromFullTransactionId(fxid));
840 :
841 774 : cached_fxid = fxid;
842 774 : cached_gxact = result;
843 :
844 774 : return result;
845 : }
846 :
847 : /*
848 : * TwoPhaseGetXidByVirtualXID
849 : * Lookup VXID among xacts prepared since last startup.
850 : *
851 : * (This won't find recovered xacts.) If more than one matches, return any
852 : * and set "have_more" to true. To witness multiple matches, a single
853 : * proc number must consume 2^32 LXIDs, with no intervening database restart.
854 : */
855 : TransactionId
856 178 : TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
857 : bool *have_more)
858 : {
859 : int i;
860 178 : TransactionId result = InvalidTransactionId;
861 :
862 : Assert(VirtualTransactionIdIsValid(vxid));
863 178 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
864 :
865 286 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
866 : {
867 108 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
868 : PGPROC *proc;
869 : VirtualTransactionId proc_vxid;
870 :
871 108 : if (!gxact->valid)
872 0 : continue;
873 108 : proc = GetPGProcByNumber(gxact->pgprocno);
874 108 : GET_VXID_FROM_PGPROC(proc_vxid, *proc);
875 108 : if (VirtualTransactionIdEquals(vxid, proc_vxid))
876 : {
877 : /*
878 : * Startup process sets proc->vxid.procNumber to
879 : * INVALID_PROC_NUMBER.
880 : */
881 : Assert(!gxact->inredo);
882 :
883 34 : if (result != InvalidTransactionId)
884 : {
885 0 : *have_more = true;
886 0 : break;
887 : }
888 34 : result = XidFromFullTransactionId(gxact->fxid);
889 : }
890 : }
891 :
892 178 : LWLockRelease(TwoPhaseStateLock);
893 :
894 178 : return result;
895 : }
896 :
897 : /*
898 : * TwoPhaseGetDummyProcNumber
899 : * Get the dummy proc number for prepared transaction
900 : *
901 : * Dummy proc numbers are similar to proc numbers of real backends. They
902 : * start at MaxBackends, and are unique across all currently active real
903 : * backends and prepared transactions. If lock_held is set to true,
904 : * TwoPhaseStateLock will not be taken, so the caller had better hold it.
905 : */
906 : ProcNumber
907 220 : TwoPhaseGetDummyProcNumber(FullTransactionId fxid, bool lock_held)
908 : {
909 220 : GlobalTransaction gxact = TwoPhaseGetGXact(fxid, lock_held);
910 :
911 220 : return gxact->pgprocno;
912 : }
913 :
914 : /*
915 : * TwoPhaseGetDummyProc
916 : * Get the PGPROC that represents a prepared transaction
917 : *
918 : * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
919 : * caller had better hold it.
920 : */
921 : PGPROC *
922 2156 : TwoPhaseGetDummyProc(FullTransactionId fxid, bool lock_held)
923 : {
924 2156 : GlobalTransaction gxact = TwoPhaseGetGXact(fxid, lock_held);
925 :
926 2156 : return GetPGProcByNumber(gxact->pgprocno);
927 : }
928 :
929 : /************************************************************************/
930 : /* State file support */
931 : /************************************************************************/
932 :
933 : /*
934 : * Compute the FullTransactionId for the given TransactionId.
935 : *
936 : * This is safe if the xid has not yet reached COMMIT PREPARED or ROLLBACK
937 : * PREPARED. After those commands, concurrent vac_truncate_clog() may make
938 : * the xid cease to qualify as allowable. XXX Not all callers limit their
939 : * calls accordingly.
940 : */
941 : static inline FullTransactionId
942 646 : AdjustToFullTransactionId(TransactionId xid)
943 : {
944 : Assert(TransactionIdIsValid(xid));
945 646 : return FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid);
946 : }
947 :
948 : static inline int
949 1052 : TwoPhaseFilePath(char *path, FullTransactionId fxid)
950 : {
951 2104 : return snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X%08X",
952 1052 : EpochFromFullTransactionId(fxid),
953 1052 : XidFromFullTransactionId(fxid));
954 : }
955 :
956 : /*
957 : * 2PC state file format:
958 : *
959 : * 1. TwoPhaseFileHeader
960 : * 2. TransactionId[] (subtransactions)
961 : * 3. RelFileLocator[] (files to be deleted at commit)
962 : * 4. RelFileLocator[] (files to be deleted at abort)
963 : * 5. SharedInvalidationMessage[] (inval messages to be sent at commit)
964 : * 6. TwoPhaseRecordOnDisk
965 : * 7. ...
966 : * 8. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
967 : * 9. checksum (CRC-32C)
968 : *
969 : * Each segment except the final checksum is MAXALIGN'd.
970 : */
971 :
/*
 * Header for a 2PC state file
 *
 * The header is the first item of the state data and is simply an alias of
 * xl_xact_prepare.  Its magic field is set to TWOPHASE_MAGIC by
 * StartPrepare() and verified by ReadTwoPhaseFile(); its total_len field is
 * filled in by EndPrepare().
 */
#define TWOPHASE_MAGIC 0x57F94534 /* format identifier */

typedef xl_xact_prepare TwoPhaseFileHeader;
978 :
/*
 * Header for each record in a state file
 *
 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
 * The rmgr data will be stored starting on a MAXALIGN boundary.
 *
 * The list of records is terminated by a header whose rmid is
 * TWOPHASE_RM_END_ID and whose len is 0 (see EndPrepare() and
 * ProcessRecords()).
 */
typedef struct TwoPhaseRecordOnDisk
{
	uint32 len; /* length of rmgr data */
	TwoPhaseRmgrId rmid; /* resource manager for this record */
	uint16 info; /* flag bits for use by rmgr */
} TwoPhaseRecordOnDisk;
991 :
/*
 * During prepare, the state file is assembled in memory before writing it
 * to WAL and the actual state file.  We use a chain of StateFileChunk blocks
 * for that.
 *
 * Chunks are filled by save_state_data() and handed to XLogRegisterData()
 * one by one in EndPrepare().
 */
typedef struct StateFileChunk
{
	char *data;					/* palloc'd buffer holding the chunk's bytes */
	uint32 len;					/* bytes used in data, including alignment
								 * padding */
	struct StateFileChunk *next;	/* next chunk in chain, or NULL */
} StateFileChunk;
1003 :
/*
 * State data being assembled by the current backend.  Initialized by
 * StartPrepare(), appended to by save_state_data(), and consumed and reset
 * by EndPrepare().
 */
static struct xllist
{
	StateFileChunk *head; /* first data block in the chain */
	StateFileChunk *tail; /* last block in chain */
	uint32 num_chunks;			/* number of blocks in the chain */
	uint32 bytes_free; /* free bytes left in tail block */
	uint32 total_len; /* total data bytes in chain */
} records;
1012 :
1013 :
1014 : /*
1015 : * Append a block of data to records data structure.
1016 : *
1017 : * NB: each block is padded to a MAXALIGN multiple. This must be
1018 : * accounted for when the file is later read!
1019 : *
1020 : * The data is copied, so the caller is free to modify it afterwards.
1021 : */
1022 : static void
1023 6490 : save_state_data(const void *data, uint32 len)
1024 : {
1025 6490 : uint32 padlen = MAXALIGN(len);
1026 :
1027 6490 : if (padlen > records.bytes_free)
1028 : {
1029 86 : records.tail->next = palloc0(sizeof(StateFileChunk));
1030 86 : records.tail = records.tail->next;
1031 86 : records.tail->len = 0;
1032 86 : records.tail->next = NULL;
1033 86 : records.num_chunks++;
1034 :
1035 86 : records.bytes_free = Max(padlen, 512);
1036 86 : records.tail->data = palloc(records.bytes_free);
1037 : }
1038 :
1039 6490 : memcpy(((char *) records.tail->data) + records.tail->len, data, len);
1040 6490 : records.tail->len += padlen;
1041 6490 : records.bytes_free -= padlen;
1042 6490 : records.total_len += padlen;
1043 6490 : }
1044 :
1045 : /*
1046 : * Start preparing a state file.
1047 : *
1048 : * Initializes data structure and inserts the 2PC file header record.
1049 : */
1050 : void
1051 574 : StartPrepare(GlobalTransaction gxact)
1052 : {
1053 574 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
1054 574 : TransactionId xid = XidFromFullTransactionId(gxact->fxid);
1055 : TwoPhaseFileHeader hdr;
1056 : TransactionId *children;
1057 : RelFileLocator *commitrels;
1058 : RelFileLocator *abortrels;
1059 574 : xl_xact_stats_item *abortstats = NULL;
1060 574 : xl_xact_stats_item *commitstats = NULL;
1061 : SharedInvalidationMessage *invalmsgs;
1062 :
1063 : /* Initialize linked list */
1064 574 : records.head = palloc0(sizeof(StateFileChunk));
1065 574 : records.head->len = 0;
1066 574 : records.head->next = NULL;
1067 :
1068 574 : records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
1069 574 : records.head->data = palloc(records.bytes_free);
1070 :
1071 574 : records.tail = records.head;
1072 574 : records.num_chunks = 1;
1073 :
1074 574 : records.total_len = 0;
1075 :
1076 : /* Create header */
1077 574 : hdr.magic = TWOPHASE_MAGIC;
1078 574 : hdr.total_len = 0; /* EndPrepare will fill this in */
1079 574 : hdr.xid = xid;
1080 574 : hdr.database = proc->databaseId;
1081 574 : hdr.prepared_at = gxact->prepared_at;
1082 574 : hdr.owner = gxact->owner;
1083 574 : hdr.nsubxacts = xactGetCommittedChildren(&children);
1084 574 : hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
1085 574 : hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
1086 574 : hdr.ncommitstats =
1087 574 : pgstat_get_transactional_drops(true, &commitstats);
1088 574 : hdr.nabortstats =
1089 574 : pgstat_get_transactional_drops(false, &abortstats);
1090 574 : hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
1091 : &hdr.initfileinval);
1092 574 : hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */
1093 : /* EndPrepare will fill the origin data, if necessary */
1094 574 : hdr.origin_lsn = InvalidXLogRecPtr;
1095 574 : hdr.origin_timestamp = 0;
1096 :
1097 574 : save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
1098 574 : save_state_data(gxact->gid, hdr.gidlen);
1099 :
1100 : /*
1101 : * Add the additional info about subxacts, deletable files and cache
1102 : * invalidation messages.
1103 : */
1104 574 : if (hdr.nsubxacts > 0)
1105 : {
1106 202 : save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
1107 : /* While we have the child-xact data, stuff it in the gxact too */
1108 202 : GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
1109 : }
1110 574 : if (hdr.ncommitrels > 0)
1111 : {
1112 18 : save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileLocator));
1113 18 : pfree(commitrels);
1114 : }
1115 574 : if (hdr.nabortrels > 0)
1116 : {
1117 34 : save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileLocator));
1118 34 : pfree(abortrels);
1119 : }
1120 574 : if (hdr.ncommitstats > 0)
1121 : {
1122 18 : save_state_data(commitstats,
1123 18 : hdr.ncommitstats * sizeof(xl_xact_stats_item));
1124 18 : pfree(commitstats);
1125 : }
1126 574 : if (hdr.nabortstats > 0)
1127 : {
1128 26 : save_state_data(abortstats,
1129 26 : hdr.nabortstats * sizeof(xl_xact_stats_item));
1130 26 : pfree(abortstats);
1131 : }
1132 574 : if (hdr.ninvalmsgs > 0)
1133 : {
1134 46 : save_state_data(invalmsgs,
1135 46 : hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
1136 46 : pfree(invalmsgs);
1137 : }
1138 574 : }
1139 :
/*
 * Finish preparing state data and writing it to WAL.
 *
 * Appends the end-sentinel record, completes the file header that
 * StartPrepare() stored at the start of the chunk chain, inserts and
 * flushes the PREPARE WAL record, marks the gxact as prepared, and finally
 * waits for synchronous replication.  On return the chunk chain is no
 * longer referenced by "records".
 */
void
EndPrepare(GlobalTransaction gxact)
{
	TwoPhaseFileHeader *hdr;
	StateFileChunk *record;
	bool		replorigin;

	/* Add the end sentinel to the list of 2PC records */
	RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
						   NULL, 0);

	/*
	 * Go back and fill in total_len in the file header record.  The stored
	 * length accounts for the CRC that follows the data in a state file,
	 * even though the CRC itself is not part of the chunk chain.
	 */
	hdr = (TwoPhaseFileHeader *) records.head->data;
	Assert(hdr->magic == TWOPHASE_MAGIC);
	hdr->total_len = records.total_len + sizeof(pg_crc32c);

	/* Is this session replaying changes from a real replication origin? */
	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
				  replorigin_session_origin != DoNotReplicateId);

	if (replorigin)
	{
		hdr->origin_lsn = replorigin_session_origin_lsn;
		hdr->origin_timestamp = replorigin_session_origin_timestamp;
	}

	/*
	 * If the data size exceeds MaxAllocSize, we won't be able to read it in
	 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
	 * where we write data to file and then re-read at commit time.
	 */
	if (hdr->total_len > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("two-phase state file maximum length exceeded")));

	/*
	 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
	 * cover us, so no need to calculate a separate CRC.
	 *
	 * We have to set DELAY_CHKPT_START here, too; otherwise a checkpoint
	 * starting immediately after the WAL record is inserted could complete
	 * without fsync'ing our state file.  (This is essentially the same kind
	 * of race condition as the COMMIT-to-clog-write case that
	 * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes
	 * there.)  Note that DELAY_CHKPT_IN_COMMIT is used to find transactions in
	 * the critical commit section.  We need to know about such transactions
	 * for conflict detection in logical replication.  See
	 * GetOldestActiveTransactionId(true, false) and its use.
	 *
	 * We save the PREPARE record's location in the gxact for later use by
	 * CheckPointTwoPhase.
	 */
	XLogEnsureRecordSpace(0, records.num_chunks);

	START_CRIT_SECTION();

	Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
	MyProc->delayChkptFlags |= DELAY_CHKPT_START;

	/* Each chunk of the chain becomes one data segment of the WAL record */
	XLogBeginInsert();
	for (record = records.head; record != NULL; record = record->next)
		XLogRegisterData(record->data, record->len);

	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);

	gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);

	if (replorigin)
	{
		/* Move LSNs forward for this replication origin */
		replorigin_session_advance(replorigin_session_origin_lsn,
								   gxact->prepare_end_lsn);
	}

	XLogFlush(gxact->prepare_end_lsn);

	/* If we crash now, we have prepared: WAL replay will fix things */

	/* Store record's start location to read that later on Commit */
	gxact->prepare_start_lsn = ProcLastRecPtr;

	/*
	 * Mark the prepared transaction as valid.  As soon as xact.c marks MyProc
	 * as not running our XID (which it will do immediately after this
	 * function returns), others can commit/rollback the xact.
	 *
	 * NB: a side effect of this is to make a dummy ProcArray entry for the
	 * prepared XID.  This must happen before we clear the XID from MyProc /
	 * ProcGlobal->xids[], else there is a window where the XID is not running
	 * according to TransactionIdIsInProgress, and onlookers would be entitled
	 * to assume the xact crashed.  Instead we have a window where the same
	 * XID appears twice in ProcArray, which is OK.
	 */
	MarkAsPrepared(gxact, false);

	/*
	 * Now we can mark ourselves as out of the commit critical section: a
	 * checkpoint starting after this will certainly see the gxact as a
	 * candidate for fsyncing.
	 */
	MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;

	/*
	 * Remember that we have this GlobalTransaction entry locked for us.  If
	 * we crash after this point, it's too late to abort, but we must unlock
	 * it so that the prepared transaction can be committed or rolled back.
	 */
	MyLockedGxact = gxact;

	END_CRIT_SECTION();

	/*
	 * Wait for synchronous replication, if required.
	 *
	 * Note that at this stage we have marked the prepare, but still show as
	 * running in the procarray (twice!) and continue to hold locks.
	 */
	SyncRepWaitForLSN(gxact->prepare_end_lsn, false);

	/*
	 * Forget the chunk chain.  The chunks are not pfree'd here; presumably
	 * they are reclaimed along with their memory context (TODO confirm).
	 */
	records.tail = records.head = NULL;
	records.num_chunks = 0;
}
1265 :
1266 : /*
1267 : * Register a 2PC record to be written to state file.
1268 : */
1269 : void
1270 2784 : RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1271 : const void *data, uint32 len)
1272 : {
1273 : TwoPhaseRecordOnDisk record;
1274 :
1275 2784 : record.rmid = rmid;
1276 2784 : record.info = info;
1277 2784 : record.len = len;
1278 2784 : save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1279 2784 : if (len > 0)
1280 2214 : save_state_data(data, len);
1281 2784 : }
1282 :
1283 :
1284 : /*
1285 : * Read and validate the state file for xid.
1286 : *
1287 : * If it looks OK (has a valid magic number and CRC), return the palloc'd
1288 : * contents of the file, issuing an error when finding corrupted data. If
1289 : * missing_ok is true, which indicates that missing files can be safely
1290 : * ignored, then return NULL. This state can be reached when doing recovery
1291 : * after discarding two-phase files from frozen epochs.
1292 : */
1293 : static char *
1294 788 : ReadTwoPhaseFile(FullTransactionId fxid, bool missing_ok)
1295 : {
1296 : char path[MAXPGPATH];
1297 : char *buf;
1298 : TwoPhaseFileHeader *hdr;
1299 : int fd;
1300 : struct stat stat;
1301 : uint32 crc_offset;
1302 : pg_crc32c calc_crc,
1303 : file_crc;
1304 : int r;
1305 :
1306 788 : TwoPhaseFilePath(path, fxid);
1307 :
1308 788 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1309 788 : if (fd < 0)
1310 : {
1311 646 : if (missing_ok && errno == ENOENT)
1312 646 : return NULL;
1313 :
1314 0 : ereport(ERROR,
1315 : (errcode_for_file_access(),
1316 : errmsg("could not open file \"%s\": %m", path)));
1317 : }
1318 :
1319 : /*
1320 : * Check file length. We can determine a lower bound pretty easily. We
1321 : * set an upper bound to avoid palloc() failure on a corrupt file, though
1322 : * we can't guarantee that we won't get an out of memory error anyway,
1323 : * even on a valid file.
1324 : */
1325 142 : if (fstat(fd, &stat))
1326 0 : ereport(ERROR,
1327 : (errcode_for_file_access(),
1328 : errmsg("could not stat file \"%s\": %m", path)));
1329 :
1330 142 : if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1331 : MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1332 142 : sizeof(pg_crc32c)) ||
1333 142 : stat.st_size > MaxAllocSize)
1334 0 : ereport(ERROR,
1335 : (errcode(ERRCODE_DATA_CORRUPTED),
1336 : errmsg_plural("incorrect size of file \"%s\": %lld byte",
1337 : "incorrect size of file \"%s\": %lld bytes",
1338 : (long long int) stat.st_size, path,
1339 : (long long int) stat.st_size)));
1340 :
1341 142 : crc_offset = stat.st_size - sizeof(pg_crc32c);
1342 142 : if (crc_offset != MAXALIGN(crc_offset))
1343 0 : ereport(ERROR,
1344 : (errcode(ERRCODE_DATA_CORRUPTED),
1345 : errmsg("incorrect alignment of CRC offset for file \"%s\"",
1346 : path)));
1347 :
1348 : /*
1349 : * OK, slurp in the file.
1350 : */
1351 142 : buf = (char *) palloc(stat.st_size);
1352 :
1353 142 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
1354 142 : r = read(fd, buf, stat.st_size);
1355 142 : if (r != stat.st_size)
1356 : {
1357 0 : if (r < 0)
1358 0 : ereport(ERROR,
1359 : (errcode_for_file_access(),
1360 : errmsg("could not read file \"%s\": %m", path)));
1361 : else
1362 0 : ereport(ERROR,
1363 : (errmsg("could not read file \"%s\": read %d of %lld",
1364 : path, r, (long long int) stat.st_size)));
1365 : }
1366 :
1367 142 : pgstat_report_wait_end();
1368 :
1369 142 : if (CloseTransientFile(fd) != 0)
1370 0 : ereport(ERROR,
1371 : (errcode_for_file_access(),
1372 : errmsg("could not close file \"%s\": %m", path)));
1373 :
1374 142 : hdr = (TwoPhaseFileHeader *) buf;
1375 142 : if (hdr->magic != TWOPHASE_MAGIC)
1376 0 : ereport(ERROR,
1377 : (errcode(ERRCODE_DATA_CORRUPTED),
1378 : errmsg("invalid magic number stored in file \"%s\"",
1379 : path)));
1380 :
1381 142 : if (hdr->total_len != stat.st_size)
1382 0 : ereport(ERROR,
1383 : (errcode(ERRCODE_DATA_CORRUPTED),
1384 : errmsg("invalid size stored in file \"%s\"",
1385 : path)));
1386 :
1387 142 : INIT_CRC32C(calc_crc);
1388 142 : COMP_CRC32C(calc_crc, buf, crc_offset);
1389 142 : FIN_CRC32C(calc_crc);
1390 :
1391 142 : file_crc = *((pg_crc32c *) (buf + crc_offset));
1392 :
1393 142 : if (!EQ_CRC32C(calc_crc, file_crc))
1394 0 : ereport(ERROR,
1395 : (errcode(ERRCODE_DATA_CORRUPTED),
1396 : errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
1397 : path)));
1398 :
1399 142 : return buf;
1400 : }
1401 :
1402 :
1403 : /*
1404 : * Reads 2PC data from xlog. During checkpoint this data will be moved to
1405 : * twophase files and ReadTwoPhaseFile should be used instead.
1406 : *
1407 : * Note clearly that this function can access WAL during normal operation,
1408 : * similarly to the way WALSender or Logical Decoding would do.
1409 : */
1410 : static void
1411 738 : XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1412 : {
1413 : XLogRecord *record;
1414 : XLogReaderState *xlogreader;
1415 : char *errormsg;
1416 :
1417 738 : xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
1418 738 : XL_ROUTINE(.page_read = &read_local_xlog_page,
1419 : .segment_open = &wal_segment_open,
1420 : .segment_close = &wal_segment_close),
1421 : NULL);
1422 738 : if (!xlogreader)
1423 0 : ereport(ERROR,
1424 : (errcode(ERRCODE_OUT_OF_MEMORY),
1425 : errmsg("out of memory"),
1426 : errdetail("Failed while allocating a WAL reading processor.")));
1427 :
1428 738 : XLogBeginRead(xlogreader, lsn);
1429 738 : record = XLogReadRecord(xlogreader, &errormsg);
1430 :
1431 738 : if (record == NULL)
1432 : {
1433 0 : if (errormsg)
1434 0 : ereport(ERROR,
1435 : (errcode_for_file_access(),
1436 : errmsg("could not read two-phase state from WAL at %X/%08X: %s",
1437 : LSN_FORMAT_ARGS(lsn), errormsg)));
1438 : else
1439 0 : ereport(ERROR,
1440 : (errcode_for_file_access(),
1441 : errmsg("could not read two-phase state from WAL at %X/%08X",
1442 : LSN_FORMAT_ARGS(lsn))));
1443 : }
1444 :
1445 738 : if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
1446 738 : (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
1447 0 : ereport(ERROR,
1448 : (errcode_for_file_access(),
1449 : errmsg("expected two-phase state data is not present in WAL at %X/%08X",
1450 : LSN_FORMAT_ARGS(lsn))));
1451 :
1452 738 : if (len != NULL)
1453 54 : *len = XLogRecGetDataLen(xlogreader);
1454 :
1455 738 : *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader));
1456 738 : memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
1457 :
1458 738 : XLogReaderFree(xlogreader);
1459 738 : }
1460 :
1461 :
1462 : /*
1463 : * Confirms an xid is prepared, during recovery
1464 : */
1465 : bool
1466 646 : StandbyTransactionIdIsPrepared(TransactionId xid)
1467 : {
1468 : char *buf;
1469 : TwoPhaseFileHeader *hdr;
1470 : bool result;
1471 : FullTransactionId fxid;
1472 :
1473 : Assert(TransactionIdIsValid(xid));
1474 :
1475 646 : if (max_prepared_xacts <= 0)
1476 0 : return false; /* nothing to do */
1477 :
1478 : /* Read and validate file */
1479 646 : fxid = AdjustToFullTransactionId(xid);
1480 646 : buf = ReadTwoPhaseFile(fxid, true);
1481 646 : if (buf == NULL)
1482 646 : return false;
1483 :
1484 : /* Check header also */
1485 0 : hdr = (TwoPhaseFileHeader *) buf;
1486 0 : result = TransactionIdEquals(hdr->xid, xid);
1487 0 : pfree(buf);
1488 :
1489 0 : return result;
1490 : }
1491 :
/*
 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
 *
 * gid identifies the prepared transaction; isCommit selects between commit
 * (true) and rollback (false).
 *
 * The gxact is locked via LockGXact(), its 2PC state data is loaded either
 * from the state file or from WAL, the commit/abort is recorded, and then
 * the cleanup steps run: removal from the ProcArray, dropping of relation
 * files and stats entries, invalidation messages (commit only), the
 * per-record callbacks, removal of the shared-memory entry and, if one
 * exists, of the on-disk state file.
 */
void
FinishPreparedTransaction(const char *gid, bool isCommit)
{
	GlobalTransaction gxact;
	PGPROC	   *proc;
	FullTransactionId fxid;
	TransactionId xid;
	bool		ondisk;
	char	   *buf;
	char	   *bufptr;
	TwoPhaseFileHeader *hdr;
	TransactionId latestXid;
	TransactionId *children;
	RelFileLocator *commitrels;
	RelFileLocator *abortrels;
	RelFileLocator *delrels;
	int			ndelrels;
	xl_xact_stats_item *commitstats;
	xl_xact_stats_item *abortstats;
	SharedInvalidationMessage *invalmsgs;

	/*
	 * Validate the GID, and lock the GXACT to ensure that two backends do not
	 * try to commit the same GID at once.
	 */
	gxact = LockGXact(gid, GetUserId());
	proc = GetPGProcByNumber(gxact->pgprocno);
	fxid = gxact->fxid;
	xid = XidFromFullTransactionId(fxid);

	/*
	 * Read and validate 2PC state data. State data will typically be stored
	 * in WAL files if the LSN is after the last checkpoint record, or moved
	 * to disk if for some reason they have lived for a long time.
	 */
	if (gxact->ondisk)
		buf = ReadTwoPhaseFile(fxid, false);
	else
		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);


	/*
	 * Disassemble the header area.  The sections follow the header in the
	 * order they were saved by StartPrepare(), each one padded to a MAXALIGN
	 * multiple.  An empty section occupies no space, so its pointer simply
	 * coincides with the start of the next section.  After this, bufptr
	 * points at the first TwoPhaseRecordOnDisk.
	 */
	hdr = (TwoPhaseFileHeader *) buf;
	Assert(TransactionIdEquals(hdr->xid, xid));
	bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
	bufptr += MAXALIGN(hdr->gidlen);
	children = (TransactionId *) bufptr;
	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
	commitrels = (RelFileLocator *) bufptr;
	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
	abortrels = (RelFileLocator *) bufptr;
	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
	commitstats = (xl_xact_stats_item *) bufptr;
	bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
	abortstats = (xl_xact_stats_item *) bufptr;
	bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
	invalmsgs = (SharedInvalidationMessage *) bufptr;
	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));

	/* compute latestXid among all children */
	latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);

	/* Prevent cancel/die interrupt while cleaning up */
	HOLD_INTERRUPTS();

	/*
	 * The order of operations here is critical: make the XLOG entry for
	 * commit or abort, then mark the transaction committed or aborted in
	 * pg_xact, then remove its PGPROC from the global ProcArray (which means
	 * TransactionIdIsInProgress will stop saying the prepared xact is in
	 * progress), then run the post-commit or post-abort callbacks. The
	 * callbacks will release the locks the transaction held.
	 */
	if (isCommit)
		RecordTransactionCommitPrepared(xid,
										hdr->nsubxacts, children,
										hdr->ncommitrels, commitrels,
										hdr->ncommitstats,
										commitstats,
										hdr->ninvalmsgs, invalmsgs,
										hdr->initfileinval, gid);
	else
		RecordTransactionAbortPrepared(xid,
									   hdr->nsubxacts, children,
									   hdr->nabortrels, abortrels,
									   hdr->nabortstats,
									   abortstats,
									   gid);

	ProcArrayRemove(proc, latestXid);

	/*
	 * In case we fail while running the callbacks, mark the gxact invalid so
	 * no one else will try to commit/rollback, and so it will be recycled if
	 * we fail after this point.  It is still locked by our backend so it
	 * won't go away yet.
	 *
	 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
	 */
	gxact->valid = false;

	/*
	 * We have to remove any files that were supposed to be dropped. For
	 * consistency with the regular xact.c code paths, must do this before
	 * releasing locks, so do it before running the callbacks.
	 *
	 * NB: this code knows that we couldn't be dropping any temp rels ...
	 */
	if (isCommit)
	{
		delrels = commitrels;
		ndelrels = hdr->ncommitrels;
	}
	else
	{
		delrels = abortrels;
		ndelrels = hdr->nabortrels;
	}

	/* Make sure files supposed to be dropped are dropped */
	DropRelationFiles(delrels, ndelrels, false);

	/* Likewise drop the stats entries recorded for this outcome */
	if (isCommit)
		pgstat_execute_transactional_drops(hdr->ncommitstats, commitstats, false);
	else
		pgstat_execute_transactional_drops(hdr->nabortstats, abortstats, false);

	/*
	 * Handle cache invalidation messages.
	 *
	 * Relcache init file invalidation requires processing both before and
	 * after we send the SI messages, only when committing.  See
	 * AtEOXact_Inval().
	 */
	if (isCommit)
	{
		if (hdr->initfileinval)
			RelationCacheInitFilePreInvalidate();
		SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
		if (hdr->initfileinval)
			RelationCacheInitFilePostInvalidate();
	}

	/*
	 * Acquire the two-phase lock.  We want to work on the two-phase callbacks
	 * while holding it to avoid potential conflicts with other transactions
	 * attempting to use the same GID, so the lock is released once the shared
	 * memory state is cleared.
	 */
	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);

	/* And now do the callbacks */
	if (isCommit)
		ProcessRecords(bufptr, fxid, twophase_postcommit_callbacks);
	else
		ProcessRecords(bufptr, fxid, twophase_postabort_callbacks);

	PredicateLockTwoPhaseFinish(fxid, isCommit);

	/*
	 * Read this value while holding the two-phase lock, as the on-disk 2PC
	 * file is physically removed after the lock is released.
	 */
	ondisk = gxact->ondisk;

	/* Clear shared memory state */
	RemoveGXact(gxact);

	/*
	 * Release the lock as all callbacks are called and shared memory cleanup
	 * is done.
	 */
	LWLockRelease(TwoPhaseStateLock);

	/* Count the prepared xact as committed or aborted */
	AtEOXact_PgStat(isCommit, false);

	/*
	 * And now we can clean up any files we may have left.
	 */
	if (ondisk)
		RemoveTwoPhaseFile(fxid, true);

	/* The gxact is gone, so we no longer hold it locked */
	MyLockedGxact = NULL;

	RESUME_INTERRUPTS();

	pfree(buf);
}
1686 :
1687 : /*
1688 : * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1689 : */
1690 : static void
1691 644 : ProcessRecords(char *bufptr, FullTransactionId fxid,
1692 : const TwoPhaseCallback callbacks[])
1693 : {
1694 : for (;;)
1695 2588 : {
1696 3232 : TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1697 :
1698 : Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1699 3232 : if (record->rmid == TWOPHASE_RM_END_ID)
1700 644 : break;
1701 :
1702 2588 : bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1703 :
1704 2588 : if (callbacks[record->rmid] != NULL)
1705 2430 : callbacks[record->rmid] (fxid, record->info, bufptr, record->len);
1706 :
1707 2588 : bufptr += MAXALIGN(record->len);
1708 : }
1709 644 : }
1710 :
1711 : /*
1712 : * Remove the 2PC file.
1713 : *
1714 : * If giveWarning is false, do not complain about file-not-present;
1715 : * this is an expected case during WAL replay.
1716 : *
1717 : * This routine is used at early stages at recovery where future and
1718 : * past orphaned files are checked, hence the FullTransactionId to build
1719 : * a complete file name fit for the removal.
1720 : */
1721 : static void
1722 58 : RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning)
1723 : {
1724 : char path[MAXPGPATH];
1725 :
1726 58 : TwoPhaseFilePath(path, fxid);
1727 58 : if (unlink(path))
1728 0 : if (errno != ENOENT || giveWarning)
1729 0 : ereport(WARNING,
1730 : (errcode_for_file_access(),
1731 : errmsg("could not remove file \"%s\": %m", path)));
1732 58 : }
1733 :
1734 : /*
1735 : * Recreates a state file. This is used in WAL replay and during
1736 : * checkpoint creation.
1737 : *
1738 : * Note: content and len don't include CRC.
1739 : */
1740 : static void
1741 54 : RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len)
1742 : {
1743 : char path[MAXPGPATH];
1744 : pg_crc32c statefile_crc;
1745 : int fd;
1746 :
1747 : /* Recompute CRC */
1748 54 : INIT_CRC32C(statefile_crc);
1749 54 : COMP_CRC32C(statefile_crc, content, len);
1750 54 : FIN_CRC32C(statefile_crc);
1751 :
1752 54 : TwoPhaseFilePath(path, fxid);
1753 :
1754 54 : fd = OpenTransientFile(path,
1755 : O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
1756 54 : if (fd < 0)
1757 0 : ereport(ERROR,
1758 : (errcode_for_file_access(),
1759 : errmsg("could not recreate file \"%s\": %m", path)));
1760 :
1761 : /* Write content and CRC */
1762 54 : errno = 0;
1763 54 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
1764 54 : if (write(fd, content, len) != len)
1765 : {
1766 : /* if write didn't set errno, assume problem is no disk space */
1767 0 : if (errno == 0)
1768 0 : errno = ENOSPC;
1769 0 : ereport(ERROR,
1770 : (errcode_for_file_access(),
1771 : errmsg("could not write file \"%s\": %m", path)));
1772 : }
1773 54 : if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
1774 : {
1775 : /* if write didn't set errno, assume problem is no disk space */
1776 0 : if (errno == 0)
1777 0 : errno = ENOSPC;
1778 0 : ereport(ERROR,
1779 : (errcode_for_file_access(),
1780 : errmsg("could not write file \"%s\": %m", path)));
1781 : }
1782 54 : pgstat_report_wait_end();
1783 :
1784 : /*
1785 : * We must fsync the file because the end-of-replay checkpoint will not do
1786 : * so, there being no GXACT in shared memory yet to tell it to.
1787 : */
1788 54 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
1789 54 : if (pg_fsync(fd) != 0)
1790 0 : ereport(ERROR,
1791 : (errcode_for_file_access(),
1792 : errmsg("could not fsync file \"%s\": %m", path)));
1793 54 : pgstat_report_wait_end();
1794 :
1795 54 : if (CloseTransientFile(fd) != 0)
1796 0 : ereport(ERROR,
1797 : (errcode_for_file_access(),
1798 : errmsg("could not close file \"%s\": %m", path)));
1799 54 : }
1800 :
1801 : /*
1802 : * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1803 : *
1804 : * We must fsync the state file of any GXACT that is valid or has been
1805 : * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1806 : * horizon. (If the gxact isn't valid yet, has not been generated in
1807 : * redo, or has a later LSN, this checkpoint is not responsible for
1808 : * fsyncing it.)
1809 : *
1810 : * This is deliberately run as late as possible in the checkpoint sequence,
1811 : * because GXACTs ordinarily have short lifespans, and so it is quite
1812 : * possible that GXACTs that were valid at checkpoint start will no longer
1813 : * exist if we wait a little bit. With typical checkpoint settings this
1814 : * will be about 3 minutes for an online checkpoint, so as a result we
1815 : * expect that there will be no GXACTs that need to be copied to disk.
1816 : *
1817 : * If a GXACT remains valid across multiple checkpoints, it will already
1818 : * be on disk so we don't bother to repeat that write.
1819 : */
1820 : void
1821 3380 : CheckPointTwoPhase(XLogRecPtr redo_horizon)
1822 : {
1823 : int i;
1824 3380 : int serialized_xacts = 0;
1825 :
1826 3380 : if (max_prepared_xacts <= 0)
1827 2360 : return; /* nothing to do */
1828 :
1829 : TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1830 :
1831 : /*
1832 : * We are expecting there to be zero GXACTs that need to be copied to
1833 : * disk, so we perform all I/O while holding TwoPhaseStateLock for
1834 : * simplicity. This prevents any new xacts from preparing while this
1835 : * occurs, which shouldn't be a problem since the presence of long-lived
1836 : * prepared xacts indicates the transaction manager isn't active.
1837 : *
1838 : * It's also possible to move I/O out of the lock, but on every error we
1839 : * should check whether somebody committed our transaction in different
1840 : * backend. Let's leave this optimization for future, if somebody will
1841 : * spot that this place cause bottleneck.
1842 : *
1843 : * Note that it isn't possible for there to be a GXACT with a
1844 : * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1845 : * because of the efforts with delayChkptFlags.
1846 : */
1847 1020 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1848 1086 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1849 : {
1850 : /*
1851 : * Note that we are using gxact not PGPROC so this works in recovery
1852 : * also
1853 : */
1854 66 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1855 :
1856 66 : if ((gxact->valid || gxact->inredo) &&
1857 66 : !gxact->ondisk &&
1858 58 : gxact->prepare_end_lsn <= redo_horizon)
1859 : {
1860 : char *buf;
1861 : int len;
1862 :
1863 54 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
1864 54 : RecreateTwoPhaseFile(gxact->fxid, buf, len);
1865 54 : gxact->ondisk = true;
1866 54 : gxact->prepare_start_lsn = InvalidXLogRecPtr;
1867 54 : gxact->prepare_end_lsn = InvalidXLogRecPtr;
1868 54 : pfree(buf);
1869 54 : serialized_xacts++;
1870 : }
1871 : }
1872 1020 : LWLockRelease(TwoPhaseStateLock);
1873 :
1874 : /*
1875 : * Flush unconditionally the parent directory to make any information
1876 : * durable on disk. Two-phase files could have been removed and those
1877 : * removals need to be made persistent as well as any files newly created
1878 : * previously since the last checkpoint.
1879 : */
1880 1020 : fsync_fname(TWOPHASE_DIR, true);
1881 :
1882 : TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1883 :
1884 1020 : if (log_checkpoints && serialized_xacts > 0)
1885 46 : ereport(LOG,
1886 : (errmsg_plural("%u two-phase state file was written "
1887 : "for a long-running prepared transaction",
1888 : "%u two-phase state files were written "
1889 : "for long-running prepared transactions",
1890 : serialized_xacts,
1891 : serialized_xacts)));
1892 : }
1893 :
1894 : /*
1895 : * restoreTwoPhaseData
1896 : *
1897 : * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1898 : * This is called once at the beginning of recovery, saving any extra
1899 : * lookups in the future. Two-phase files that are newer than the
1900 : * minimum XID horizon are discarded on the way.
1901 : */
1902 : void
1903 1862 : restoreTwoPhaseData(void)
1904 : {
1905 : DIR *cldir;
1906 : struct dirent *clde;
1907 :
1908 1862 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1909 1862 : cldir = AllocateDir(TWOPHASE_DIR);
1910 5616 : while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1911 : {
1912 3754 : if (strlen(clde->d_name) == 16 &&
1913 30 : strspn(clde->d_name, "0123456789ABCDEF") == 16)
1914 : {
1915 : FullTransactionId fxid;
1916 : char *buf;
1917 :
1918 30 : fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16));
1919 :
1920 30 : buf = ProcessTwoPhaseBuffer(fxid, InvalidXLogRecPtr,
1921 : true, false, false);
1922 30 : if (buf == NULL)
1923 0 : continue;
1924 :
1925 30 : PrepareRedoAdd(fxid, buf, InvalidXLogRecPtr,
1926 : InvalidXLogRecPtr, InvalidRepOriginId);
1927 : }
1928 : }
1929 1862 : LWLockRelease(TwoPhaseStateLock);
1930 1862 : FreeDir(cldir);
1931 1862 : }
1932 :
1933 : /*
1934 : * PrescanPreparedTransactions
1935 : *
1936 : * Scan the shared memory entries of TwoPhaseState and determine the range
1937 : * of valid XIDs present. This is run during database startup, after we
1938 : * have completed reading WAL. TransamVariables->nextXid has been set to
1939 : * one more than the highest XID for which evidence exists in WAL.
1940 : *
1941 : * We throw away any prepared xacts with main XID beyond nextXid --- if any
1942 : * are present, it suggests that the DBA has done a PITR recovery to an
1943 : * earlier point in time without cleaning out pg_twophase. We dare not
1944 : * try to recover such prepared xacts since they likely depend on database
1945 : * state that doesn't exist now.
1946 : *
1947 : * However, we will advance nextXid beyond any subxact XIDs belonging to
1948 : * valid prepared xacts. We need to do this since subxact commit doesn't
1949 : * write a WAL entry, and so there might be no evidence in WAL of those
1950 : * subxact XIDs.
1951 : *
1952 : * On corrupted two-phase files, fail immediately. Keeping around broken
1953 : * entries and let replay continue causes harm on the system, and a new
1954 : * backup should be rolled in.
1955 : *
1956 : * Our other responsibility is to determine and return the oldest valid XID
1957 : * among the prepared xacts (if none, return TransamVariables->nextXid).
1958 : * This is needed to synchronize pg_subtrans startup properly.
1959 : *
1960 : * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1961 : * top-level xids is stored in *xids_p. The number of entries in the array
1962 : * is returned in *nxids_p.
1963 : */
1964 : TransactionId
1965 1862 : PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1966 : {
1967 1862 : FullTransactionId nextXid = TransamVariables->nextXid;
1968 1862 : TransactionId origNextXid = XidFromFullTransactionId(nextXid);
1969 1862 : TransactionId result = origNextXid;
1970 1862 : TransactionId *xids = NULL;
1971 1862 : int nxids = 0;
1972 1862 : int allocsize = 0;
1973 : int i;
1974 :
1975 1862 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1976 1966 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1977 : {
1978 : TransactionId xid;
1979 : char *buf;
1980 104 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1981 :
1982 : Assert(gxact->inredo);
1983 :
1984 104 : buf = ProcessTwoPhaseBuffer(gxact->fxid,
1985 : gxact->prepare_start_lsn,
1986 104 : gxact->ondisk, false, true);
1987 :
1988 104 : if (buf == NULL)
1989 0 : continue;
1990 :
1991 : /*
1992 : * OK, we think this file is valid. Incorporate xid into the
1993 : * running-minimum result.
1994 : */
1995 104 : xid = XidFromFullTransactionId(gxact->fxid);
1996 104 : if (TransactionIdPrecedes(xid, result))
1997 90 : result = xid;
1998 :
1999 104 : if (xids_p)
2000 : {
2001 38 : if (nxids == allocsize)
2002 : {
2003 30 : if (nxids == 0)
2004 : {
2005 30 : allocsize = 10;
2006 30 : xids = palloc(allocsize * sizeof(TransactionId));
2007 : }
2008 : else
2009 : {
2010 0 : allocsize = allocsize * 2;
2011 0 : xids = repalloc(xids, allocsize * sizeof(TransactionId));
2012 : }
2013 : }
2014 38 : xids[nxids++] = xid;
2015 : }
2016 :
2017 104 : pfree(buf);
2018 : }
2019 1862 : LWLockRelease(TwoPhaseStateLock);
2020 :
2021 1862 : if (xids_p)
2022 : {
2023 116 : *xids_p = xids;
2024 116 : *nxids_p = nxids;
2025 : }
2026 :
2027 1862 : return result;
2028 : }
2029 :
2030 : /*
2031 : * StandbyRecoverPreparedTransactions
2032 : *
2033 : * Scan the shared memory entries of TwoPhaseState and setup all the required
2034 : * information to allow standby queries to treat prepared transactions as still
2035 : * active.
2036 : *
2037 : * This is never called at the end of recovery - we use
2038 : * RecoverPreparedTransactions() at that point.
2039 : *
2040 : * This updates pg_subtrans, so that any subtransactions will be correctly
2041 : * seen as in-progress in snapshots taken during recovery.
2042 : */
2043 : void
2044 116 : StandbyRecoverPreparedTransactions(void)
2045 : {
2046 : int i;
2047 :
2048 116 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2049 154 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2050 : {
2051 : char *buf;
2052 38 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2053 :
2054 : Assert(gxact->inredo);
2055 :
2056 38 : buf = ProcessTwoPhaseBuffer(gxact->fxid,
2057 : gxact->prepare_start_lsn,
2058 38 : gxact->ondisk, true, false);
2059 38 : if (buf != NULL)
2060 38 : pfree(buf);
2061 : }
2062 116 : LWLockRelease(TwoPhaseStateLock);
2063 116 : }
2064 :
2065 : /*
2066 : * RecoverPreparedTransactions
2067 : *
2068 : * Scan the shared memory entries of TwoPhaseState and reload the state for
2069 : * each prepared transaction (reacquire locks, etc).
2070 : *
2071 : * This is run at the end of recovery, but before we allow backends to write
2072 : * WAL.
2073 : *
2074 : * At the end of recovery the way we take snapshots will change. We now need
2075 : * to mark all running transactions with their full SubTransSetParent() info
2076 : * to allow normal snapshots to work correctly if snapshots overflow.
2077 : * We do this here because by definition prepared transactions are the only
2078 : * type of write transaction still running, so this is necessary and
2079 : * complete.
2080 : */
2081 : void
2082 1746 : RecoverPreparedTransactions(void)
2083 : {
2084 : int i;
2085 :
2086 1746 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2087 1812 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2088 : {
2089 : char *buf;
2090 66 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2091 66 : FullTransactionId fxid = gxact->fxid;
2092 : char *bufptr;
2093 : TwoPhaseFileHeader *hdr;
2094 : TransactionId *subxids;
2095 : const char *gid;
2096 :
2097 : /*
2098 : * Reconstruct subtrans state for the transaction --- needed because
2099 : * pg_subtrans is not preserved over a restart. Note that we are
2100 : * linking all the subtransactions directly to the top-level XID;
2101 : * there may originally have been a more complex hierarchy, but
2102 : * there's no need to restore that exactly. It's possible that
2103 : * SubTransSetParent has been set before, if the prepared transaction
2104 : * generated xid assignment records.
2105 : */
2106 66 : buf = ProcessTwoPhaseBuffer(gxact->fxid,
2107 : gxact->prepare_start_lsn,
2108 66 : gxact->ondisk, true, false);
2109 66 : if (buf == NULL)
2110 0 : continue;
2111 :
2112 66 : ereport(LOG,
2113 : (errmsg("recovering prepared transaction %u of epoch %u from shared memory",
2114 : XidFromFullTransactionId(gxact->fxid),
2115 : EpochFromFullTransactionId(gxact->fxid))));
2116 :
2117 66 : hdr = (TwoPhaseFileHeader *) buf;
2118 : Assert(TransactionIdEquals(hdr->xid,
2119 : XidFromFullTransactionId(gxact->fxid)));
2120 66 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2121 66 : gid = (const char *) bufptr;
2122 66 : bufptr += MAXALIGN(hdr->gidlen);
2123 66 : subxids = (TransactionId *) bufptr;
2124 66 : bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2125 66 : bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
2126 66 : bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
2127 66 : bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
2128 66 : bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
2129 66 : bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2130 :
2131 : /*
2132 : * Recreate its GXACT and dummy PGPROC. But, check whether it was
2133 : * added in redo and already has a shmem entry for it.
2134 : */
2135 66 : MarkAsPreparingGuts(gxact, gxact->fxid, gid,
2136 : hdr->prepared_at,
2137 : hdr->owner, hdr->database);
2138 :
2139 : /* recovered, so reset the flag for entries generated by redo */
2140 66 : gxact->inredo = false;
2141 :
2142 66 : GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2143 66 : MarkAsPrepared(gxact, true);
2144 :
2145 66 : LWLockRelease(TwoPhaseStateLock);
2146 :
2147 : /*
2148 : * Recover other state (notably locks) using resource managers.
2149 : */
2150 66 : ProcessRecords(bufptr, fxid, twophase_recover_callbacks);
2151 :
2152 : /*
2153 : * Release locks held by the standby process after we process each
2154 : * prepared transaction. As a result, we don't need too many
2155 : * additional locks at any one time.
2156 : */
2157 66 : if (InHotStandby)
2158 14 : StandbyReleaseLockTree(hdr->xid, hdr->nsubxacts, subxids);
2159 :
2160 : /*
2161 : * We're done with recovering this transaction. Clear MyLockedGxact,
2162 : * like we do in PrepareTransaction() during normal operation.
2163 : */
2164 66 : PostPrepare_Twophase();
2165 :
2166 66 : pfree(buf);
2167 :
2168 66 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2169 : }
2170 :
2171 1746 : LWLockRelease(TwoPhaseStateLock);
2172 1746 : }
2173 :
2174 : /*
2175 : * ProcessTwoPhaseBuffer
2176 : *
2177 : * Given a FullTransactionId, read it either from disk or read it directly
2178 : * via shmem xlog record pointer using the provided "prepare_start_lsn".
2179 : *
2180 : * If setParent is true, set up subtransaction parent linkages.
2181 : *
2182 : * If setNextXid is true, set TransamVariables->nextXid to the newest
2183 : * value scanned.
2184 : */
2185 : static char *
2186 238 : ProcessTwoPhaseBuffer(FullTransactionId fxid,
2187 : XLogRecPtr prepare_start_lsn,
2188 : bool fromdisk,
2189 : bool setParent, bool setNextXid)
2190 : {
2191 238 : FullTransactionId nextXid = TransamVariables->nextXid;
2192 : TransactionId *subxids;
2193 : char *buf;
2194 : TwoPhaseFileHeader *hdr;
2195 : int i;
2196 :
2197 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2198 :
2199 238 : if (!fromdisk)
2200 : Assert(prepare_start_lsn != InvalidXLogRecPtr);
2201 :
2202 : /* Already processed? */
2203 476 : if (TransactionIdDidCommit(XidFromFullTransactionId(fxid)) ||
2204 238 : TransactionIdDidAbort(XidFromFullTransactionId(fxid)))
2205 : {
2206 0 : if (fromdisk)
2207 : {
2208 0 : ereport(WARNING,
2209 : (errmsg("removing stale two-phase state file for transaction %u of epoch %u",
2210 : XidFromFullTransactionId(fxid),
2211 : EpochFromFullTransactionId(fxid))));
2212 0 : RemoveTwoPhaseFile(fxid, true);
2213 : }
2214 : else
2215 : {
2216 0 : ereport(WARNING,
2217 : (errmsg("removing stale two-phase state from memory for transaction %u of epoch %u",
2218 : XidFromFullTransactionId(fxid),
2219 : EpochFromFullTransactionId(fxid))));
2220 0 : PrepareRedoRemoveFull(fxid, true);
2221 : }
2222 0 : return NULL;
2223 : }
2224 :
2225 : /* Reject XID if too new */
2226 238 : if (FullTransactionIdFollowsOrEquals(fxid, nextXid))
2227 : {
2228 0 : if (fromdisk)
2229 : {
2230 0 : ereport(WARNING,
2231 : (errmsg("removing future two-phase state file for transaction %u of epoch %u",
2232 : XidFromFullTransactionId(fxid),
2233 : EpochFromFullTransactionId(fxid))));
2234 0 : RemoveTwoPhaseFile(fxid, true);
2235 : }
2236 : else
2237 : {
2238 0 : ereport(WARNING,
2239 : (errmsg("removing future two-phase state from memory for transaction %u of epoch %u",
2240 : XidFromFullTransactionId(fxid),
2241 : EpochFromFullTransactionId(fxid))));
2242 0 : PrepareRedoRemoveFull(fxid, true);
2243 : }
2244 0 : return NULL;
2245 : }
2246 :
2247 238 : if (fromdisk)
2248 : {
2249 : /* Read and validate file */
2250 94 : buf = ReadTwoPhaseFile(fxid, false);
2251 : }
2252 : else
2253 : {
2254 : /* Read xlog data */
2255 144 : XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2256 : }
2257 :
2258 : /* Deconstruct header */
2259 238 : hdr = (TwoPhaseFileHeader *) buf;
2260 238 : if (!TransactionIdEquals(hdr->xid, XidFromFullTransactionId(fxid)))
2261 : {
2262 0 : if (fromdisk)
2263 0 : ereport(ERROR,
2264 : (errcode(ERRCODE_DATA_CORRUPTED),
2265 : errmsg("corrupted two-phase state file for transaction %u of epoch %u",
2266 : XidFromFullTransactionId(fxid),
2267 : EpochFromFullTransactionId(fxid))));
2268 : else
2269 0 : ereport(ERROR,
2270 : (errcode(ERRCODE_DATA_CORRUPTED),
2271 : errmsg("corrupted two-phase state in memory for transaction %u of epoch %u",
2272 : XidFromFullTransactionId(fxid),
2273 : EpochFromFullTransactionId(fxid))));
2274 : }
2275 :
2276 : /*
2277 : * Examine subtransaction XIDs ... they should all follow main XID, and
2278 : * they may force us to advance nextXid.
2279 : */
2280 238 : subxids = (TransactionId *) (buf +
2281 238 : MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2282 238 : MAXALIGN(hdr->gidlen));
2283 3802 : for (i = 0; i < hdr->nsubxacts; i++)
2284 : {
2285 3564 : TransactionId subxid = subxids[i];
2286 :
2287 : Assert(TransactionIdFollows(subxid, XidFromFullTransactionId(fxid)));
2288 :
2289 : /* update nextXid if needed */
2290 3564 : if (setNextXid)
2291 1642 : AdvanceNextFullTransactionIdPastXid(subxid);
2292 :
2293 3564 : if (setParent)
2294 1642 : SubTransSetParent(subxid, XidFromFullTransactionId(fxid));
2295 : }
2296 :
2297 238 : return buf;
2298 : }
2299 :
2300 :
2301 : /*
2302 : * RecordTransactionCommitPrepared
2303 : *
2304 : * This is basically the same as RecordTransactionCommit (q.v. if you change
2305 : * this function): in particular, we must set DELAY_CHKPT_IN_COMMIT to avoid a
2306 : * race condition.
2307 : *
2308 : * We know the transaction made at least one XLOG entry (its PREPARE),
2309 : * so it is never possible to optimize out the commit record.
2310 : */
2311 : static void
2312 502 : RecordTransactionCommitPrepared(TransactionId xid,
2313 : int nchildren,
2314 : TransactionId *children,
2315 : int nrels,
2316 : RelFileLocator *rels,
2317 : int nstats,
2318 : xl_xact_stats_item *stats,
2319 : int ninvalmsgs,
2320 : SharedInvalidationMessage *invalmsgs,
2321 : bool initfileinval,
2322 : const char *gid)
2323 : {
2324 : XLogRecPtr recptr;
2325 : TimestampTz committs;
2326 : bool replorigin;
2327 :
2328 : /*
2329 : * Are we using the replication origins feature? Or, in other words, are
2330 : * we replaying remote actions?
2331 : */
2332 544 : replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2333 42 : replorigin_session_origin != DoNotReplicateId);
2334 :
2335 502 : START_CRIT_SECTION();
2336 :
2337 : /* See notes in RecordTransactionCommit */
2338 : Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0);
2339 502 : MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT;
2340 :
2341 : /*
2342 : * Ensures the DELAY_CHKPT_IN_COMMIT flag write is globally visible before
2343 : * commit time is written.
2344 : */
2345 502 : pg_write_barrier();
2346 :
2347 : /*
2348 : * Note it is important to set committs value after marking ourselves as
2349 : * in the commit critical section (DELAY_CHKPT_IN_COMMIT). This is because
2350 : * we want to ensure all transactions that have acquired commit timestamp
2351 : * are finished before we allow the logical replication client to advance
2352 : * its xid which is used to hold back dead rows for conflict detection.
2353 : * See comments atop worker.c.
2354 : */
2355 502 : committs = GetCurrentTimestamp();
2356 :
2357 : /*
2358 : * Emit the XLOG commit record. Note that we mark 2PC commits as
2359 : * potentially having AccessExclusiveLocks since we don't know whether or
2360 : * not they do.
2361 : */
2362 502 : recptr = XactLogCommitRecord(committs,
2363 : nchildren, children, nrels, rels,
2364 : nstats, stats,
2365 : ninvalmsgs, invalmsgs,
2366 : initfileinval,
2367 502 : MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2368 : xid, gid);
2369 :
2370 :
2371 502 : if (replorigin)
2372 : /* Move LSNs forward for this replication origin */
2373 42 : replorigin_session_advance(replorigin_session_origin_lsn,
2374 : XactLastRecEnd);
2375 :
2376 : /*
2377 : * Record commit timestamp. The value comes from plain commit timestamp
2378 : * if replorigin is not enabled, or replorigin already set a value for us
2379 : * in replorigin_session_origin_timestamp otherwise.
2380 : *
2381 : * We don't need to WAL-log anything here, as the commit record written
2382 : * above already contains the data.
2383 : */
2384 502 : if (!replorigin || replorigin_session_origin_timestamp == 0)
2385 460 : replorigin_session_origin_timestamp = committs;
2386 :
2387 502 : TransactionTreeSetCommitTsData(xid, nchildren, children,
2388 : replorigin_session_origin_timestamp,
2389 : replorigin_session_origin);
2390 :
2391 : /*
2392 : * We don't currently try to sleep before flush here ... nor is there any
2393 : * support for async commit of a prepared xact (the very idea is probably
2394 : * a contradiction)
2395 : */
2396 :
2397 : /* Flush XLOG to disk */
2398 502 : XLogFlush(recptr);
2399 :
2400 : /* Mark the transaction committed in pg_xact */
2401 502 : TransactionIdCommitTree(xid, nchildren, children);
2402 :
2403 : /* Checkpoint can proceed now */
2404 502 : MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT;
2405 :
2406 502 : END_CRIT_SECTION();
2407 :
2408 : /*
2409 : * Wait for synchronous replication, if required.
2410 : *
2411 : * Note that at this stage we have marked clog, but still show as running
2412 : * in the procarray and continue to hold locks.
2413 : */
2414 502 : SyncRepWaitForLSN(recptr, true);
2415 502 : }
2416 :
2417 : /*
2418 : * RecordTransactionAbortPrepared
2419 : *
2420 : * This is basically the same as RecordTransactionAbort.
2421 : *
2422 : * We know the transaction made at least one XLOG entry (its PREPARE),
2423 : * so it is never possible to optimize out the abort record.
2424 : */
2425 : static void
2426 76 : RecordTransactionAbortPrepared(TransactionId xid,
2427 : int nchildren,
2428 : TransactionId *children,
2429 : int nrels,
2430 : RelFileLocator *rels,
2431 : int nstats,
2432 : xl_xact_stats_item *stats,
2433 : const char *gid)
2434 : {
2435 : XLogRecPtr recptr;
2436 : bool replorigin;
2437 :
2438 : /*
2439 : * Are we using the replication origins feature? Or, in other words, are
2440 : * we replaying remote actions?
2441 : */
2442 88 : replorigin = (replorigin_session_origin != InvalidRepOriginId &&
2443 12 : replorigin_session_origin != DoNotReplicateId);
2444 :
2445 : /*
2446 : * Catch the scenario where we aborted partway through
2447 : * RecordTransactionCommitPrepared ...
2448 : */
2449 76 : if (TransactionIdDidCommit(xid))
2450 0 : elog(PANIC, "cannot abort transaction %u, it was already committed",
2451 : xid);
2452 :
2453 76 : START_CRIT_SECTION();
2454 :
2455 : /*
2456 : * Emit the XLOG commit record. Note that we mark 2PC aborts as
2457 : * potentially having AccessExclusiveLocks since we don't know whether or
2458 : * not they do.
2459 : */
2460 76 : recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2461 : nchildren, children,
2462 : nrels, rels,
2463 : nstats, stats,
2464 76 : MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2465 : xid, gid);
2466 :
2467 76 : if (replorigin)
2468 : /* Move LSNs forward for this replication origin */
2469 12 : replorigin_session_advance(replorigin_session_origin_lsn,
2470 : XactLastRecEnd);
2471 :
2472 : /* Always flush, since we're about to remove the 2PC state file */
2473 76 : XLogFlush(recptr);
2474 :
2475 : /*
2476 : * Mark the transaction aborted in clog. This is not absolutely necessary
2477 : * but we may as well do it while we are here.
2478 : */
2479 76 : TransactionIdAbortTree(xid, nchildren, children);
2480 :
2481 76 : END_CRIT_SECTION();
2482 :
2483 : /*
2484 : * Wait for synchronous replication, if required.
2485 : *
2486 : * Note that at this stage we have marked clog, but still show as running
2487 : * in the procarray and continue to hold locks.
2488 : */
2489 76 : SyncRepWaitForLSN(recptr, false);
2490 76 : }
2491 :
2492 : /*
2493 : * PrepareRedoAdd
2494 : *
2495 : * Store pointers to the start/end of the WAL record along with the xid in
2496 : * a gxact entry in shared memory TwoPhaseState structure. If caller
2497 : * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2498 : * data, the entry is marked as located on disk.
2499 : */
2500 : void
2501 182 : PrepareRedoAdd(FullTransactionId fxid, char *buf,
2502 : XLogRecPtr start_lsn, XLogRecPtr end_lsn,
2503 : RepOriginId origin_id)
2504 : {
2505 182 : TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2506 : char *bufptr;
2507 : const char *gid;
2508 : GlobalTransaction gxact;
2509 :
2510 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2511 : Assert(RecoveryInProgress());
2512 :
2513 182 : if (!FullTransactionIdIsValid(fxid))
2514 : {
2515 : Assert(InRecovery);
2516 152 : fxid = FullTransactionIdFromAllowableAt(TransamVariables->nextXid,
2517 : hdr->xid);
2518 : }
2519 :
2520 182 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2521 182 : gid = (const char *) bufptr;
2522 :
2523 : /*
2524 : * Reserve the GID for the given transaction in the redo code path.
2525 : *
2526 : * This creates a gxact struct and puts it into the active array.
2527 : *
2528 : * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2529 : * shared memory. Hence, we only fill up the bare minimum contents here.
2530 : * The gxact also gets marked with gxact->inredo set to true to indicate
2531 : * that it got added in the redo phase
2532 : */
2533 :
2534 : /*
2535 : * In the event of a crash while a checkpoint was running, it may be
2536 : * possible that some two-phase data found its way to disk while its
2537 : * corresponding record needs to be replayed in the follow-up recovery. As
2538 : * the 2PC data was on disk, it has already been restored at the beginning
2539 : * of recovery with restoreTwoPhaseData(), so skip this record to avoid
2540 : * duplicates in TwoPhaseState. If a consistent state has been reached,
2541 : * the record is added to TwoPhaseState and it should have no
2542 : * corresponding file in pg_twophase.
2543 : */
2544 182 : if (!XLogRecPtrIsInvalid(start_lsn))
2545 : {
2546 : char path[MAXPGPATH];
2547 :
2548 : Assert(InRecovery);
2549 152 : TwoPhaseFilePath(path, fxid);
2550 :
2551 152 : if (access(path, F_OK) == 0)
2552 : {
2553 0 : ereport(reachedConsistency ? ERROR : WARNING,
2554 : (errmsg("could not recover two-phase state file for transaction %u",
2555 : hdr->xid),
2556 : errdetail("Two-phase state file has been found in WAL record %X/%08X, but this transaction has already been restored from disk.",
2557 : LSN_FORMAT_ARGS(start_lsn))));
2558 0 : return;
2559 : }
2560 :
2561 152 : if (errno != ENOENT)
2562 0 : ereport(ERROR,
2563 : (errcode_for_file_access(),
2564 : errmsg("could not access file \"%s\": %m", path)));
2565 : }
2566 :
2567 : /* Get a free gxact from the freelist */
2568 182 : if (TwoPhaseState->freeGXacts == NULL)
2569 0 : ereport(ERROR,
2570 : (errcode(ERRCODE_OUT_OF_MEMORY),
2571 : errmsg("maximum number of prepared transactions reached"),
2572 : errhint("Increase \"max_prepared_transactions\" (currently %d).",
2573 : max_prepared_xacts)));
2574 182 : gxact = TwoPhaseState->freeGXacts;
2575 182 : TwoPhaseState->freeGXacts = gxact->next;
2576 :
2577 182 : gxact->prepared_at = hdr->prepared_at;
2578 182 : gxact->prepare_start_lsn = start_lsn;
2579 182 : gxact->prepare_end_lsn = end_lsn;
2580 182 : gxact->fxid = fxid;
2581 182 : gxact->owner = hdr->owner;
2582 182 : gxact->locking_backend = INVALID_PROC_NUMBER;
2583 182 : gxact->valid = false;
2584 182 : gxact->ondisk = XLogRecPtrIsInvalid(start_lsn);
2585 182 : gxact->inredo = true; /* yes, added in redo */
2586 182 : strcpy(gxact->gid, gid);
2587 :
2588 : /* And insert it into the active array */
2589 : Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2590 182 : TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2591 :
2592 182 : if (origin_id != InvalidRepOriginId)
2593 : {
2594 : /* recover apply progress */
2595 26 : replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
2596 : false /* backward */ , false /* WAL */ );
2597 : }
2598 :
2599 182 : elog(DEBUG2, "added 2PC data in shared memory for transaction %u of epoch %u",
2600 : XidFromFullTransactionId(gxact->fxid),
2601 : EpochFromFullTransactionId(gxact->fxid));
2602 : }
2603 :
2604 : /*
2605 : * PrepareRedoRemoveFull
2606 : *
2607 : * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2608 : * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2609 : *
2610 : * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2611 : * is updated.
2612 : */
2613 : static void
2614 130 : PrepareRedoRemoveFull(FullTransactionId fxid, bool giveWarning)
2615 : {
2616 130 : GlobalTransaction gxact = NULL;
2617 : int i;
2618 130 : bool found = false;
2619 :
2620 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2621 : Assert(RecoveryInProgress());
2622 :
2623 130 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2624 : {
2625 112 : gxact = TwoPhaseState->prepXacts[i];
2626 :
2627 112 : if (FullTransactionIdEquals(gxact->fxid, fxid))
2628 : {
2629 : Assert(gxact->inredo);
2630 112 : found = true;
2631 112 : break;
2632 : }
2633 : }
2634 :
2635 : /*
2636 : * Just leave if there is nothing, this is expected during WAL replay.
2637 : */
2638 130 : if (!found)
2639 18 : return;
2640 :
2641 : /*
2642 : * And now we can clean up any files we may have left.
2643 : */
2644 112 : elog(DEBUG2, "removing 2PC data for transaction %u of epoch %u ",
2645 : XidFromFullTransactionId(fxid),
2646 : EpochFromFullTransactionId(fxid));
2647 :
2648 112 : if (gxact->ondisk)
2649 8 : RemoveTwoPhaseFile(fxid, giveWarning);
2650 :
2651 112 : RemoveGXact(gxact);
2652 : }
2653 :
2654 : /*
2655 : * Wrapper of PrepareRedoRemoveFull(), for TransactionIds.
2656 : */
2657 : void
2658 130 : PrepareRedoRemove(TransactionId xid, bool giveWarning)
2659 : {
2660 : FullTransactionId fxid =
2661 130 : FullTransactionIdFromAllowableAt(TransamVariables->nextXid, xid);
2662 :
2663 130 : PrepareRedoRemoveFull(fxid, giveWarning);
2664 130 : }
2665 :
2666 : /*
2667 : * LookupGXact
2668 : * Check if the prepared transaction with the given GID, lsn and timestamp
2669 : * exists.
2670 : *
2671 : * Note that we always compare with the LSN where prepare ends because that is
2672 : * what is stored as origin_lsn in the 2PC file.
2673 : *
2674 : * This function is primarily used to check if the prepared transaction
2675 : * received from the upstream (remote node) already exists. Checking only GID
2676 : * is not sufficient because a different prepared xact with the same GID can
2677 : * exist on the same node. So, we are ensuring to match origin_lsn and
2678 : * origin_timestamp of prepared xact to avoid the possibility of a match of
2679 : * prepared xact from two different nodes.
2680 : */
2681 : bool
2682 10 : LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
2683 : TimestampTz origin_prepare_timestamp)
2684 : {
2685 : int i;
2686 10 : bool found = false;
2687 :
2688 10 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2689 10 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2690 : {
2691 10 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2692 :
2693 : /* Ignore not-yet-valid GIDs. */
2694 10 : if (gxact->valid && strcmp(gxact->gid, gid) == 0)
2695 : {
2696 : char *buf;
2697 : TwoPhaseFileHeader *hdr;
2698 :
2699 : /*
2700 : * We are not expecting collisions of GXACTs (same gid) between
2701 : * publisher and subscribers, so we perform all I/O while holding
2702 : * TwoPhaseStateLock for simplicity.
2703 : *
2704 : * To move the I/O out of the lock, we need to ensure that no
2705 : * other backend commits the prepared xact in the meantime. We can
2706 : * do this optimization if we encounter many collisions in GID
2707 : * between publisher and subscriber.
2708 : */
2709 10 : if (gxact->ondisk)
2710 0 : buf = ReadTwoPhaseFile(gxact->fxid, false);
2711 : else
2712 : {
2713 : Assert(gxact->prepare_start_lsn);
2714 10 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
2715 : }
2716 :
2717 10 : hdr = (TwoPhaseFileHeader *) buf;
2718 :
2719 10 : if (hdr->origin_lsn == prepare_end_lsn &&
2720 10 : hdr->origin_timestamp == origin_prepare_timestamp)
2721 : {
2722 10 : found = true;
2723 10 : pfree(buf);
2724 10 : break;
2725 : }
2726 :
2727 0 : pfree(buf);
2728 : }
2729 : }
2730 10 : LWLockRelease(TwoPhaseStateLock);
2731 10 : return found;
2732 : }
2733 :
2734 : /*
2735 : * TwoPhaseTransactionGid
2736 : * Form the prepared transaction GID for two_phase transactions.
2737 : *
2738 : * Return the GID in the supplied buffer.
2739 : */
2740 : void
2741 96 : TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
2742 : {
2743 : Assert(OidIsValid(subid));
2744 :
2745 96 : if (!TransactionIdIsValid(xid))
2746 0 : ereport(ERROR,
2747 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2748 : errmsg_internal("invalid two-phase transaction ID")));
2749 :
2750 96 : snprintf(gid_res, szgid, "pg_gid_%u_%u", subid, xid);
2751 96 : }
2752 :
2753 : /*
2754 : * IsTwoPhaseTransactionGidForSubid
2755 : * Check whether the given GID (as formed by TwoPhaseTransactionGid) is
2756 : * for the specified 'subid'.
2757 : */
2758 : static bool
2759 0 : IsTwoPhaseTransactionGidForSubid(Oid subid, char *gid)
2760 : {
2761 : int ret;
2762 : Oid subid_from_gid;
2763 : TransactionId xid_from_gid;
2764 : char gid_tmp[GIDSIZE];
2765 :
2766 : /* Extract the subid and xid from the given GID */
2767 0 : ret = sscanf(gid, "pg_gid_%u_%u", &subid_from_gid, &xid_from_gid);
2768 :
2769 : /*
2770 : * Check that the given GID has expected format, and at least the subid
2771 : * matches.
2772 : */
2773 0 : if (ret != 2 || subid != subid_from_gid)
2774 0 : return false;
2775 :
2776 : /*
2777 : * Reconstruct a temporary GID based on the subid and xid extracted from
2778 : * the given GID and check whether the temporary GID and the given GID
2779 : * match.
2780 : */
2781 0 : TwoPhaseTransactionGid(subid, xid_from_gid, gid_tmp, sizeof(gid_tmp));
2782 :
2783 0 : return strcmp(gid, gid_tmp) == 0;
2784 : }
2785 :
2786 : /*
2787 : * LookupGXactBySubid
2788 : * Check if the prepared transaction done by apply worker exists.
2789 : */
2790 : bool
2791 2 : LookupGXactBySubid(Oid subid)
2792 : {
2793 2 : bool found = false;
2794 :
2795 2 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2796 2 : for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
2797 : {
2798 0 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2799 :
2800 : /* Ignore not-yet-valid GIDs. */
2801 0 : if (gxact->valid &&
2802 0 : IsTwoPhaseTransactionGidForSubid(subid, gxact->gid))
2803 : {
2804 0 : found = true;
2805 0 : break;
2806 : }
2807 : }
2808 2 : LWLockRelease(TwoPhaseStateLock);
2809 :
2810 2 : return found;
2811 : }
|