Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * twophase.c
4 : * Two-phase commit support functions.
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * IDENTIFICATION
10 : * src/backend/access/transam/twophase.c
11 : *
12 : * NOTES
13 : * Each global transaction is associated with a global transaction
14 : * identifier (GID). The client assigns a GID to a postgres
15 : * transaction with the PREPARE TRANSACTION command.
16 : *
17 : * We keep all active global transactions in a shared memory array.
18 : * When the PREPARE TRANSACTION command is issued, the GID is
19 : * reserved for the transaction in the array. This is done before
20 : * a WAL entry is made, because the reservation checks for duplicate
21 : * GIDs and aborts the transaction if there already is a global
22 : * transaction in prepared state with the same GID.
23 : *
24 : * A global transaction (gxact) also has dummy PGPROC; this is what keeps
25 : * the XID considered running by TransactionIdIsInProgress. It is also
26 : * convenient as a PGPROC to hook the gxact's locks to.
27 : *
28 : * Information to recover prepared transactions in case of crash is
29 : * now stored in WAL for the common case. In some cases there will be
30 : * an extended period between preparing a GXACT and commit/abort, in
31 : * which case we need to separately record prepared transaction data
32 : * in permanent storage. This includes locking information, pending
33 : * notifications etc. All that state information is written to the
34 : * per-transaction state file in the pg_twophase directory.
35 : * All prepared transactions will be written prior to shutdown.
36 : *
 * The life cycle of the state data is as follows:
 *
 * * On PREPARE TRANSACTION, the backend writes the state data only to WAL
 *   and stores a pointer to the start of the WAL record in
 *   gxact->prepare_start_lsn.
 * * If COMMIT occurs before a checkpoint, the backend reads the data back
 *   from WAL using prepare_start_lsn.
 * * At checkpoint, the state data is copied to files in the pg_twophase
 *   directory and fsynced.
 * * If COMMIT happens after a checkpoint, the backend reads the state data
 *   from those files.
48 : *
49 : * During replay and replication, TwoPhaseState also holds information
50 : * about active prepared transactions that haven't been moved to disk yet.
51 : *
52 : * Replay of twophase records happens by the following rules:
53 : *
54 : * * At the beginning of recovery, pg_twophase is scanned once, filling
55 : * TwoPhaseState with entries marked with gxact->inredo and
56 : * gxact->ondisk. Two-phase file data older than the XID horizon of
57 : * the redo position are discarded.
58 : * * On PREPARE redo, the transaction is added to TwoPhaseState->prepXacts.
59 : * gxact->inredo is set to true for such entries.
60 : * * On Checkpoint we iterate through TwoPhaseState->prepXacts entries
61 : * that have gxact->inredo set and are behind the redo_horizon. We
62 : * save them to disk and then switch gxact->ondisk to true.
63 : * * On COMMIT/ABORT we delete the entry from TwoPhaseState->prepXacts.
64 : * If gxact->ondisk is true, the corresponding entry from the disk
65 : * is additionally deleted.
66 : * * RecoverPreparedTransactions(), StandbyRecoverPreparedTransactions()
67 : * and PrescanPreparedTransactions() have been modified to go through
68 : * gxact->inredo entries that have not made it to disk.
69 : *
70 : *-------------------------------------------------------------------------
71 : */
72 : #include "postgres.h"
73 :
74 : #include <fcntl.h>
75 : #include <sys/stat.h>
76 : #include <time.h>
77 : #include <unistd.h>
78 :
79 : #include "access/commit_ts.h"
80 : #include "access/htup_details.h"
81 : #include "access/subtrans.h"
82 : #include "access/transam.h"
83 : #include "access/twophase.h"
84 : #include "access/twophase_rmgr.h"
85 : #include "access/xact.h"
86 : #include "access/xlog.h"
87 : #include "access/xloginsert.h"
88 : #include "access/xlogreader.h"
89 : #include "access/xlogrecovery.h"
90 : #include "access/xlogutils.h"
91 : #include "catalog/pg_type.h"
92 : #include "catalog/storage.h"
93 : #include "funcapi.h"
94 : #include "miscadmin.h"
95 : #include "pg_trace.h"
96 : #include "pgstat.h"
97 : #include "replication/origin.h"
98 : #include "replication/syncrep.h"
99 : #include "storage/fd.h"
100 : #include "storage/ipc.h"
101 : #include "storage/md.h"
102 : #include "storage/predicate.h"
103 : #include "storage/proc.h"
104 : #include "storage/procarray.h"
105 : #include "utils/builtins.h"
106 : #include "utils/injection_point.h"
107 : #include "utils/memutils.h"
108 : #include "utils/timestamp.h"
109 : #include "utils/wait_event.h"
110 :
/*
 * Directory where Two-phase commit files reside within PGDATA.
 * State file names are built relative to it (see TwoPhaseFilePath()).
 */
#define TWOPHASE_DIR "pg_twophase"

/*
 * GUC variable ("max_prepared_transactions"), can't be changed after startup.
 * It sizes the shared TwoPhaseState arrays; a value of zero makes
 * MarkAsPreparing() reject PREPARE TRANSACTION.
 */
int			max_prepared_xacts = 0;
118 :
/*
 * This struct describes one global transaction that is in prepared state
 * or attempting to become prepared.
 *
 * The lifecycle of a global transaction is:
 *
 * 1. After checking that the requested GID is not in use, set up an entry in
 * the TwoPhaseState->prepXacts array with the correct GID and valid = false,
 * and mark it as locked by my backend.
 *
 * 2. After successfully completing prepare, set valid = true and enter the
 * referenced PGPROC into the global ProcArray.
 *
 * 3. To begin COMMIT PREPARED or ROLLBACK PREPARED, check that the entry is
 * valid and not locked, then mark the entry as locked by storing my current
 * proc number into locking_backend.  This prevents concurrent attempts to
 * commit or rollback the same prepared xact.
 *
 * 4. On completion of COMMIT PREPARED or ROLLBACK PREPARED, remove the entry
 * from the ProcArray and the TwoPhaseState->prepXacts array and return it to
 * the freelist.
 *
 * Note that if the preparing transaction fails between steps 1 and 2, the
 * entry must be removed so that the GID and the GlobalTransaction struct
 * can be reused.  See AtAbort_Twophase().
 *
 * Each struct's pgprocno is assigned once in TwoPhaseShmemInit() and is kept
 * when the struct goes back to the freelist, so a given struct always
 * uses the same dummy PGPROC.
 *
 * typedef struct GlobalTransactionData *GlobalTransaction appears in
 * twophase.h
 */

typedef struct GlobalTransactionData
{
	GlobalTransaction next;		/* list link for free list */
	int			pgprocno;		/* ID of associated dummy PGPROC */
	TimestampTz prepared_at;	/* time of preparation */

	/*
	 * Note that we need to keep track of two LSNs for each GXACT. We keep
	 * track of the start LSN because this is the address we must use to read
	 * state data back from WAL when committing a prepared GXACT. We keep
	 * track of the end LSN because that is the LSN we need to wait for prior
	 * to commit.
	 */
	XLogRecPtr	prepare_start_lsn;	/* XLOG offset of prepare record start */
	XLogRecPtr	prepare_end_lsn;	/* XLOG offset of prepare record end */
	FullTransactionId fxid;		/* The GXACT full xid */

	Oid			owner;			/* ID of user that executed the xact */
	ProcNumber	locking_backend;	/* backend currently working on the xact;
									 * INVALID_PROC_NUMBER when unlocked */
	bool		valid;			/* true if PGPROC entry is in proc array */
	bool		ondisk;			/* true if prepare state file is on disk */
	bool		inredo;			/* true if entry was added via xlog_redo */
	char		gid[GIDSIZE];	/* The GID assigned to the prepared xact */
} GlobalTransactionData;
173 :
/*
 * Two Phase Commit shared state.  Access to this struct is protected
 * by TwoPhaseStateLock.
 *
 * The GlobalTransactionData structs themselves live in the same shared
 * memory chunk, right after the MAXALIGN'd prepXacts[] pointer array (see
 * TwoPhaseShmemSize() and TwoPhaseShmemInit()).
 */
typedef struct TwoPhaseStateData
{
	/* Head of linked list of free GlobalTransactionData structs */
	GlobalTransaction freeGXacts;

	/*
	 * Number of in-use entries at the front of prepXacts[].  This also counts
	 * entries whose gxact->valid is still false (reserved by
	 * MarkAsPreparing() but not yet fully prepared), so readers that only
	 * want prepared transactions must check gxact->valid themselves.
	 */
	int			numPrepXacts;

	/* There are max_prepared_xacts items in this array */
	GlobalTransaction prepXacts[FLEXIBLE_ARRAY_MEMBER];
} TwoPhaseStateData;
189 :
/* Pointer to the shared 2PC state; attached by TwoPhaseShmemInit() */
static TwoPhaseStateData *TwoPhaseState;

/*
 * Global transaction entry currently locked by us, if any.  Note that any
 * access to the entry pointed to by this variable must be protected by
 * TwoPhaseStateLock, though obviously the pointer itself doesn't need to be
 * (since it's just local memory).
 */
static GlobalTransaction MyLockedGxact = NULL;

/*
 * Whether AtProcExit_Twophase has been registered with before_shmem_exit()
 * in this process yet (done lazily by MarkAsPreparing() / LockGXact()).
 */
static bool twophaseExitRegistered = false;
201 :
/* Forward declarations of file-local (static) helpers. */
static void PrepareRedoRemoveFull(FullTransactionId fxid, bool giveWarning);
static void RecordTransactionCommitPrepared(TransactionId xid,
											int nchildren,
											TransactionId *children,
											int nrels,
											RelFileLocator *rels,
											int nstats,
											xl_xact_stats_item *stats,
											int ninvalmsgs,
											SharedInvalidationMessage *invalmsgs,
											bool initfileinval,
											const char *gid);
static void RecordTransactionAbortPrepared(TransactionId xid,
										   int nchildren,
										   TransactionId *children,
										   int nrels,
										   RelFileLocator *rels,
										   int nstats,
										   xl_xact_stats_item *stats,
										   const char *gid);
static void ProcessRecords(char *bufptr, FullTransactionId fxid,
						   const TwoPhaseCallback callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);

static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
static char *ProcessTwoPhaseBuffer(FullTransactionId fxid,
								   XLogRecPtr prepare_start_lsn,
								   bool fromdisk, bool setParent, bool setNextXid);
static void MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid,
								const char *gid, TimestampTz prepared_at, Oid owner,
								Oid databaseid);
static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning);
static void RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len);
235 :
236 : /*
237 : * Initialization of shared memory
238 : */
239 : Size
240 3393 : TwoPhaseShmemSize(void)
241 : {
242 : Size size;
243 :
244 : /* Need the fixed struct, the array of pointers, and the GTD structs */
245 3393 : size = offsetof(TwoPhaseStateData, prepXacts);
246 3393 : size = add_size(size, mul_size(max_prepared_xacts,
247 : sizeof(GlobalTransaction)));
248 3393 : size = MAXALIGN(size);
249 3393 : size = add_size(size, mul_size(max_prepared_xacts,
250 : sizeof(GlobalTransactionData)));
251 :
252 3393 : return size;
253 : }
254 :
255 : void
256 1182 : TwoPhaseShmemInit(void)
257 : {
258 : bool found;
259 :
260 1182 : TwoPhaseState = ShmemInitStruct("Prepared Transaction Table",
261 : TwoPhaseShmemSize(),
262 : &found);
263 1182 : if (!IsUnderPostmaster)
264 : {
265 : GlobalTransaction gxacts;
266 : int i;
267 :
268 : Assert(!found);
269 1182 : TwoPhaseState->freeGXacts = NULL;
270 1182 : TwoPhaseState->numPrepXacts = 0;
271 :
272 : /*
273 : * Initialize the linked list of free GlobalTransactionData structs
274 : */
275 1182 : gxacts = (GlobalTransaction)
276 1182 : ((char *) TwoPhaseState +
277 1182 : MAXALIGN(offsetof(TwoPhaseStateData, prepXacts) +
278 : sizeof(GlobalTransaction) * max_prepared_xacts));
279 2054 : for (i = 0; i < max_prepared_xacts; i++)
280 : {
281 : /* insert into linked list */
282 872 : gxacts[i].next = TwoPhaseState->freeGXacts;
283 872 : TwoPhaseState->freeGXacts = &gxacts[i];
284 :
285 : /* associate it with a PGPROC assigned by InitProcGlobal */
286 872 : gxacts[i].pgprocno = GetNumberFromPGProc(&PreparedXactProcs[i]);
287 : }
288 : }
289 : else
290 : Assert(found);
291 1182 : }
292 :
/*
 * Exit hook to unlock the global transaction entry we're working on.
 *
 * Registered with before_shmem_exit() the first time this process calls
 * MarkAsPreparing() or LockGXact().  The code and arg parameters are unused.
 */
static void
AtProcExit_Twophase(int code, Datum arg)
{
	/* same logic as abort */
	AtAbort_Twophase();
}
302 :
303 : /*
304 : * Abort hook to unlock the global transaction entry we're working on.
305 : */
306 : void
307 34560 : AtAbort_Twophase(void)
308 : {
309 34560 : if (MyLockedGxact == NULL)
310 34558 : return;
311 :
312 : /*
313 : * What to do with the locked global transaction entry? If we were in the
314 : * process of preparing the transaction, but haven't written the WAL
315 : * record and state file yet, the transaction must not be considered as
316 : * prepared. Likewise, if we are in the process of finishing an
317 : * already-prepared transaction, and fail after having already written the
318 : * 2nd phase commit or rollback record to the WAL, the transaction should
319 : * not be considered as prepared anymore. In those cases, just remove the
320 : * entry from shared memory.
321 : *
322 : * Otherwise, the entry must be left in place so that the transaction can
323 : * be finished later, so just unlock it.
324 : *
325 : * If we abort during prepare, after having written the WAL record, we
326 : * might not have transferred all locks and other state to the prepared
327 : * transaction yet. Likewise, if we abort during commit or rollback,
328 : * after having written the WAL record, we might not have released all the
329 : * resources held by the transaction yet. In those cases, the in-memory
330 : * state can be wrong, but it's too late to back out.
331 : */
332 2 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
333 2 : if (!MyLockedGxact->valid)
334 2 : RemoveGXact(MyLockedGxact);
335 : else
336 0 : MyLockedGxact->locking_backend = INVALID_PROC_NUMBER;
337 2 : LWLockRelease(TwoPhaseStateLock);
338 :
339 2 : MyLockedGxact = NULL;
340 : }
341 :
342 : /*
343 : * This is called after we have finished transferring state to the prepared
344 : * PGPROC entry.
345 : */
346 : void
347 358 : PostPrepare_Twophase(void)
348 : {
349 358 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
350 358 : MyLockedGxact->locking_backend = INVALID_PROC_NUMBER;
351 358 : LWLockRelease(TwoPhaseStateLock);
352 :
353 358 : MyLockedGxact = NULL;
354 358 : }
355 :
356 :
357 : /*
358 : * MarkAsPreparing
359 : * Reserve the GID for the given transaction.
360 : */
361 : GlobalTransaction
362 346 : MarkAsPreparing(FullTransactionId fxid, const char *gid,
363 : TimestampTz prepared_at, Oid owner, Oid databaseid)
364 : {
365 : GlobalTransaction gxact;
366 : int i;
367 :
368 346 : if (strlen(gid) >= GIDSIZE)
369 0 : ereport(ERROR,
370 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
371 : errmsg("transaction identifier \"%s\" is too long",
372 : gid)));
373 :
374 : /* fail immediately if feature is disabled */
375 346 : if (max_prepared_xacts == 0)
376 17 : ereport(ERROR,
377 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
378 : errmsg("prepared transactions are disabled"),
379 : errhint("Set \"max_prepared_transactions\" to a nonzero value.")));
380 :
381 : /* on first call, register the exit hook */
382 329 : if (!twophaseExitRegistered)
383 : {
384 83 : before_shmem_exit(AtProcExit_Twophase, 0);
385 83 : twophaseExitRegistered = true;
386 : }
387 :
388 329 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
389 :
390 : /* Check for conflicting GID */
391 587 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
392 : {
393 260 : gxact = TwoPhaseState->prepXacts[i];
394 260 : if (strcmp(gxact->gid, gid) == 0)
395 : {
396 2 : ereport(ERROR,
397 : (errcode(ERRCODE_DUPLICATE_OBJECT),
398 : errmsg("transaction identifier \"%s\" is already in use",
399 : gid)));
400 : }
401 : }
402 :
403 : /* Get a free gxact from the freelist */
404 327 : if (TwoPhaseState->freeGXacts == NULL)
405 0 : ereport(ERROR,
406 : (errcode(ERRCODE_OUT_OF_MEMORY),
407 : errmsg("maximum number of prepared transactions reached"),
408 : errhint("Increase \"max_prepared_transactions\" (currently %d).",
409 : max_prepared_xacts)));
410 327 : gxact = TwoPhaseState->freeGXacts;
411 327 : TwoPhaseState->freeGXacts = gxact->next;
412 :
413 327 : MarkAsPreparingGuts(gxact, fxid, gid, prepared_at, owner, databaseid);
414 :
415 327 : gxact->ondisk = false;
416 :
417 : /* And insert it into the active array */
418 : Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
419 327 : TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
420 :
421 327 : LWLockRelease(TwoPhaseStateLock);
422 :
423 327 : return gxact;
424 : }
425 :
426 : /*
427 : * MarkAsPreparingGuts
428 : *
429 : * This uses a gxact struct and puts it into the active array.
430 : * NOTE: this is also used when reloading a gxact after a crash; so avoid
431 : * assuming that we can use very much backend context.
432 : *
433 : * Note: This function should be called with appropriate locks held.
434 : */
435 : static void
436 360 : MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid,
437 : const char *gid, TimestampTz prepared_at, Oid owner,
438 : Oid databaseid)
439 : {
440 : PGPROC *proc;
441 : int i;
442 360 : TransactionId xid = XidFromFullTransactionId(fxid);
443 :
444 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
445 :
446 : Assert(gxact != NULL);
447 360 : proc = GetPGProcByNumber(gxact->pgprocno);
448 :
449 : /* Initialize the PGPROC entry */
450 40680 : MemSet(proc, 0, sizeof(PGPROC));
451 360 : proc->waitStatus = PROC_WAIT_STATUS_OK;
452 360 : if (LocalTransactionIdIsValid(MyProc->vxid.lxid))
453 : {
454 : /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
455 327 : proc->vxid.lxid = MyProc->vxid.lxid;
456 327 : proc->vxid.procNumber = MyProcNumber;
457 : }
458 : else
459 : {
460 : Assert(AmStartupProcess() || !IsPostmasterEnvironment);
461 : /* GetLockConflicts() uses this to specify a wait on the XID */
462 33 : proc->vxid.lxid = xid;
463 33 : proc->vxid.procNumber = INVALID_PROC_NUMBER;
464 : }
465 360 : proc->xid = xid;
466 : Assert(proc->xmin == InvalidTransactionId);
467 360 : proc->delayChkptFlags = 0;
468 360 : proc->statusFlags = 0;
469 360 : proc->pid = 0;
470 360 : proc->databaseId = databaseid;
471 360 : proc->roleId = owner;
472 360 : proc->tempNamespaceId = InvalidOid;
473 360 : proc->backendType = B_INVALID;
474 360 : proc->lwWaiting = LW_WS_NOT_WAITING;
475 360 : proc->lwWaitMode = 0;
476 360 : proc->waitLock = NULL;
477 360 : dlist_node_init(&proc->waitLink);
478 360 : proc->waitProcLock = NULL;
479 360 : pg_atomic_init_u64(&proc->waitStart, 0);
480 6120 : for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
481 5760 : dlist_init(&proc->myProcLocks[i]);
482 : /* subxid data must be filled later by GXactLoadSubxactData */
483 360 : proc->subxidStatus.overflowed = false;
484 360 : proc->subxidStatus.count = 0;
485 :
486 360 : gxact->prepared_at = prepared_at;
487 360 : gxact->fxid = fxid;
488 360 : gxact->owner = owner;
489 360 : gxact->locking_backend = MyProcNumber;
490 360 : gxact->valid = false;
491 360 : gxact->inredo = false;
492 360 : strcpy(gxact->gid, gid);
493 :
494 : /*
495 : * Remember that we have this GlobalTransaction entry locked for us. If we
496 : * abort after this, we must release it.
497 : */
498 360 : MyLockedGxact = gxact;
499 360 : }
500 :
501 : /*
502 : * GXactLoadSubxactData
503 : *
504 : * If the transaction being persisted had any subtransactions, this must
505 : * be called before MarkAsPrepared() to load information into the dummy
506 : * PGPROC.
507 : */
508 : static void
509 142 : GXactLoadSubxactData(GlobalTransaction gxact, int nsubxacts,
510 : TransactionId *children)
511 : {
512 142 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
513 :
514 : /* We need no extra lock since the GXACT isn't valid yet */
515 142 : if (nsubxacts > PGPROC_MAX_CACHED_SUBXIDS)
516 : {
517 4 : proc->subxidStatus.overflowed = true;
518 4 : nsubxacts = PGPROC_MAX_CACHED_SUBXIDS;
519 : }
520 142 : if (nsubxacts > 0)
521 : {
522 125 : memcpy(proc->subxids.xids, children,
523 : nsubxacts * sizeof(TransactionId));
524 125 : proc->subxidStatus.count = nsubxacts;
525 : }
526 142 : }
527 :
528 : /*
529 : * MarkAsPrepared
530 : * Mark the GXACT as fully valid, and enter it into the global ProcArray.
531 : *
532 : * lock_held indicates whether caller already holds TwoPhaseStateLock.
533 : */
534 : static void
535 358 : MarkAsPrepared(GlobalTransaction gxact, bool lock_held)
536 : {
537 : /* Lock here may be overkill, but I'm not convinced of that ... */
538 358 : if (!lock_held)
539 325 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
540 : Assert(!gxact->valid);
541 358 : gxact->valid = true;
542 358 : if (!lock_held)
543 325 : LWLockRelease(TwoPhaseStateLock);
544 :
545 : /*
546 : * Put it into the global ProcArray so TransactionIdIsInProgress considers
547 : * the XID as still running.
548 : */
549 358 : ProcArrayAdd(GetPGProcByNumber(gxact->pgprocno));
550 358 : }
551 :
552 : /*
553 : * LockGXact
554 : * Locate the prepared transaction and mark it busy for COMMIT or PREPARE.
555 : */
556 : static GlobalTransaction
557 344 : LockGXact(const char *gid, Oid user)
558 : {
559 : int i;
560 :
561 : /* on first call, register the exit hook */
562 344 : if (!twophaseExitRegistered)
563 : {
564 65 : before_shmem_exit(AtProcExit_Twophase, 0);
565 65 : twophaseExitRegistered = true;
566 : }
567 :
568 344 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
569 :
570 557 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
571 : {
572 543 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
573 543 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
574 :
575 : /* Ignore not-yet-valid GIDs */
576 543 : if (!gxact->valid)
577 1 : continue;
578 542 : if (strcmp(gxact->gid, gid) != 0)
579 212 : continue;
580 :
581 : /* Found it, but has someone else got it locked? */
582 330 : if (gxact->locking_backend != INVALID_PROC_NUMBER)
583 0 : ereport(ERROR,
584 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
585 : errmsg("prepared transaction with identifier \"%s\" is busy",
586 : gid)));
587 :
588 330 : if (user != gxact->owner && !superuser_arg(user))
589 0 : ereport(ERROR,
590 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
591 : errmsg("permission denied to finish prepared transaction"),
592 : errhint("Must be superuser or the user that prepared the transaction.")));
593 :
594 : /*
595 : * Note: it probably would be possible to allow committing from
596 : * another database; but at the moment NOTIFY is known not to work and
597 : * there may be some other issues as well. Hence disallow until
598 : * someone gets motivated to make it work.
599 : */
600 330 : if (MyDatabaseId != proc->databaseId)
601 0 : ereport(ERROR,
602 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
603 : errmsg("prepared transaction belongs to another database"),
604 : errhint("Connect to the database where the transaction was prepared to finish it.")));
605 :
606 : /* OK for me to lock it */
607 330 : gxact->locking_backend = MyProcNumber;
608 330 : MyLockedGxact = gxact;
609 :
610 330 : LWLockRelease(TwoPhaseStateLock);
611 :
612 330 : return gxact;
613 : }
614 :
615 14 : LWLockRelease(TwoPhaseStateLock);
616 :
617 14 : ereport(ERROR,
618 : (errcode(ERRCODE_UNDEFINED_OBJECT),
619 : errmsg("prepared transaction with identifier \"%s\" does not exist",
620 : gid)));
621 :
622 : /* NOTREACHED */
623 : return NULL;
624 : }
625 :
626 : /*
627 : * RemoveGXact
628 : * Remove the prepared transaction from the shared memory array.
629 : *
630 : * NB: caller should have already removed it from ProcArray
631 : */
632 : static void
633 395 : RemoveGXact(GlobalTransaction gxact)
634 : {
635 : int i;
636 :
637 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
638 :
639 599 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
640 : {
641 599 : if (gxact == TwoPhaseState->prepXacts[i])
642 : {
643 : /* remove from the active array */
644 395 : TwoPhaseState->numPrepXacts--;
645 395 : TwoPhaseState->prepXacts[i] = TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts];
646 :
647 : /* and put it back in the freelist */
648 395 : gxact->next = TwoPhaseState->freeGXacts;
649 395 : TwoPhaseState->freeGXacts = gxact;
650 :
651 395 : return;
652 : }
653 : }
654 :
655 0 : elog(ERROR, "failed to find %p in GlobalTransaction array", gxact);
656 : }
657 :
658 : /*
659 : * Returns an array of all prepared transactions for the user-level
660 : * function pg_prepared_xact.
661 : *
662 : * The returned array and all its elements are copies of internal data
663 : * structures, to minimize the time we need to hold the TwoPhaseStateLock.
664 : *
665 : * WARNING -- we return even those transactions that are not fully prepared
666 : * yet. The caller should filter them out if he doesn't want them.
667 : *
668 : * The returned array is palloc'd.
669 : */
670 : static int
671 111 : GetPreparedTransactionList(GlobalTransaction *gxacts)
672 : {
673 : GlobalTransaction array;
674 : int num;
675 : int i;
676 :
677 111 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
678 :
679 111 : if (TwoPhaseState->numPrepXacts == 0)
680 : {
681 70 : LWLockRelease(TwoPhaseStateLock);
682 :
683 70 : *gxacts = NULL;
684 70 : return 0;
685 : }
686 :
687 41 : num = TwoPhaseState->numPrepXacts;
688 41 : array = palloc_array(GlobalTransactionData, num);
689 41 : *gxacts = array;
690 87 : for (i = 0; i < num; i++)
691 46 : memcpy(array + i, TwoPhaseState->prepXacts[i],
692 : sizeof(GlobalTransactionData));
693 :
694 41 : LWLockRelease(TwoPhaseStateLock);
695 :
696 41 : return num;
697 : }
698 :
699 :
/*
 * Working status for pg_prepared_xact, kept in multi-call memory between
 * calls of the set-returning function.
 */
typedef struct
{
	GlobalTransaction array;	/* copies from GetPreparedTransactionList(),
								 * or NULL if there were none */
	int			ngxacts;		/* number of entries in array */
	int			currIdx;		/* index of next entry to examine */
} Working_State;
707 :
708 : /*
709 : * pg_prepared_xact
710 : * Produce a view with one row per prepared transaction.
711 : *
712 : * This function is here so we don't have to export the
713 : * GlobalTransactionData struct definition.
714 : */
715 : Datum
716 157 : pg_prepared_xact(PG_FUNCTION_ARGS)
717 : {
718 : FuncCallContext *funcctx;
719 : Working_State *status;
720 :
721 157 : if (SRF_IS_FIRSTCALL())
722 : {
723 : TupleDesc tupdesc;
724 : MemoryContext oldcontext;
725 :
726 : /* create a function context for cross-call persistence */
727 111 : funcctx = SRF_FIRSTCALL_INIT();
728 :
729 : /*
730 : * Switch to memory context appropriate for multiple function calls
731 : */
732 111 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
733 :
734 : /* build tupdesc for result tuples */
735 : /* this had better match pg_prepared_xacts view in system_views.sql */
736 111 : tupdesc = CreateTemplateTupleDesc(5);
737 111 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "transaction",
738 : XIDOID, -1, 0);
739 111 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "gid",
740 : TEXTOID, -1, 0);
741 111 : TupleDescInitEntry(tupdesc, (AttrNumber) 3, "prepared",
742 : TIMESTAMPTZOID, -1, 0);
743 111 : TupleDescInitEntry(tupdesc, (AttrNumber) 4, "ownerid",
744 : OIDOID, -1, 0);
745 111 : TupleDescInitEntry(tupdesc, (AttrNumber) 5, "dbid",
746 : OIDOID, -1, 0);
747 :
748 111 : TupleDescFinalize(tupdesc);
749 111 : funcctx->tuple_desc = BlessTupleDesc(tupdesc);
750 :
751 : /*
752 : * Collect all the 2PC status information that we will format and send
753 : * out as a result set.
754 : */
755 111 : status = palloc_object(Working_State);
756 111 : funcctx->user_fctx = status;
757 :
758 111 : status->ngxacts = GetPreparedTransactionList(&status->array);
759 111 : status->currIdx = 0;
760 :
761 111 : MemoryContextSwitchTo(oldcontext);
762 : }
763 :
764 157 : funcctx = SRF_PERCALL_SETUP();
765 157 : status = (Working_State *) funcctx->user_fctx;
766 :
767 157 : while (status->array != NULL && status->currIdx < status->ngxacts)
768 : {
769 46 : GlobalTransaction gxact = &status->array[status->currIdx++];
770 46 : PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
771 46 : Datum values[5] = {0};
772 46 : bool nulls[5] = {0};
773 : HeapTuple tuple;
774 : Datum result;
775 :
776 46 : if (!gxact->valid)
777 0 : continue;
778 :
779 : /*
780 : * Form tuple with appropriate data.
781 : */
782 :
783 46 : values[0] = TransactionIdGetDatum(proc->xid);
784 46 : values[1] = CStringGetTextDatum(gxact->gid);
785 46 : values[2] = TimestampTzGetDatum(gxact->prepared_at);
786 46 : values[3] = ObjectIdGetDatum(gxact->owner);
787 46 : values[4] = ObjectIdGetDatum(proc->databaseId);
788 :
789 46 : tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls);
790 46 : result = HeapTupleGetDatum(tuple);
791 46 : SRF_RETURN_NEXT(funcctx, result);
792 : }
793 :
794 111 : SRF_RETURN_DONE(funcctx);
795 : }
796 :
797 : /*
798 : * TwoPhaseGetGXact
799 : * Get the GlobalTransaction struct for a prepared transaction
800 : * specified by XID
801 : *
802 : * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
803 : * caller had better hold it.
804 : */
805 : static GlobalTransaction
806 1431 : TwoPhaseGetGXact(FullTransactionId fxid, bool lock_held)
807 : {
808 1431 : GlobalTransaction result = NULL;
809 : int i;
810 :
811 : static FullTransactionId cached_fxid = {InvalidTransactionId};
812 : static GlobalTransaction cached_gxact = NULL;
813 :
814 : Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
815 :
816 : /*
817 : * During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be called
818 : * repeatedly for the same XID. We can save work with a simple cache.
819 : */
820 1431 : if (FullTransactionIdEquals(fxid, cached_fxid))
821 995 : return cached_gxact;
822 :
823 436 : if (!lock_held)
824 358 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
825 :
826 696 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
827 : {
828 696 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
829 :
830 696 : if (FullTransactionIdEquals(gxact->fxid, fxid))
831 : {
832 436 : result = gxact;
833 436 : break;
834 : }
835 : }
836 :
837 436 : if (!lock_held)
838 358 : LWLockRelease(TwoPhaseStateLock);
839 :
840 436 : if (result == NULL) /* should not happen */
841 0 : elog(ERROR, "failed to find GlobalTransaction for xid %u",
842 : XidFromFullTransactionId(fxid));
843 :
844 436 : cached_fxid = fxid;
845 436 : cached_gxact = result;
846 :
847 436 : return result;
848 : }
849 :
850 : /*
851 : * TwoPhaseGetXidByVirtualXID
852 : * Lookup VXID among xacts prepared since last startup.
853 : *
854 : * (This won't find recovered xacts.) If more than one matches, return any
855 : * and set "have_more" to true. To witness multiple matches, a single
856 : * proc number must consume 2^32 LXIDs, with no intervening database restart.
857 : */
858 : TransactionId
859 84 : TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
860 : bool *have_more)
861 : {
862 : int i;
863 84 : TransactionId result = InvalidTransactionId;
864 :
865 : Assert(VirtualTransactionIdIsValid(vxid));
866 84 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
867 :
868 144 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
869 : {
870 60 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
871 : PGPROC *proc;
872 : VirtualTransactionId proc_vxid;
873 :
874 60 : if (!gxact->valid)
875 1 : continue;
876 59 : proc = GetPGProcByNumber(gxact->pgprocno);
877 59 : GET_VXID_FROM_PGPROC(proc_vxid, *proc);
878 59 : if (VirtualTransactionIdEquals(vxid, proc_vxid))
879 : {
880 : /*
881 : * Startup process sets proc->vxid.procNumber to
882 : * INVALID_PROC_NUMBER.
883 : */
884 : Assert(!gxact->inredo);
885 :
886 16 : if (result != InvalidTransactionId)
887 : {
888 0 : *have_more = true;
889 0 : break;
890 : }
891 16 : result = XidFromFullTransactionId(gxact->fxid);
892 : }
893 : }
894 :
895 84 : LWLockRelease(TwoPhaseStateLock);
896 :
897 84 : return result;
898 : }
899 :
900 : /*
901 : * TwoPhaseGetDummyProcNumber
902 : * Get the dummy proc number for prepared transaction
903 : *
904 : * Dummy proc numbers are similar to proc numbers of real backends. They
905 : * start at FIRST_PREPARED_XACT_PROC_NUMBER, and are unique across all
906 : * currently active real backends and prepared transactions. If lock_held is
907 : * set to true, TwoPhaseStateLock will not be taken, so the caller had better
908 : * hold it.
909 : */
910 : ProcNumber
911 150 : TwoPhaseGetDummyProcNumber(FullTransactionId fxid, bool lock_held)
912 : {
913 150 : GlobalTransaction gxact = TwoPhaseGetGXact(fxid, lock_held);
914 :
915 150 : return gxact->pgprocno;
916 : }
917 :
918 : /*
919 : * TwoPhaseGetDummyProc
920 : * Get the PGPROC that represents a prepared transaction
921 : *
922 : * If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
923 : * caller had better hold it.
924 : */
925 : PGPROC *
926 1281 : TwoPhaseGetDummyProc(FullTransactionId fxid, bool lock_held)
927 : {
928 1281 : GlobalTransaction gxact = TwoPhaseGetGXact(fxid, lock_held);
929 :
930 1281 : return GetPGProcByNumber(gxact->pgprocno);
931 : }
932 :
933 : /************************************************************************/
934 : /* State file support */
935 : /************************************************************************/
936 :
937 : /*
938 : * Compute the FullTransactionId for the given TransactionId.
939 : *
940 : * This is safe if the xid has not yet reached COMMIT PREPARED or ROLLBACK
941 : * PREPARED. After those commands, concurrent vac_truncate_clog() may make
942 : * the xid cease to qualify as allowable. XXX Not all callers limit their
943 : * calls accordingly.
944 : */
945 : static inline FullTransactionId
946 292 : AdjustToFullTransactionId(TransactionId xid)
947 : {
948 : Assert(TransactionIdIsValid(xid));
949 292 : return FullTransactionIdFromAllowableAt(ReadNextFullTransactionId(), xid);
950 : }
951 :
952 : static inline int
953 511 : TwoPhaseFilePath(char *path, FullTransactionId fxid)
954 : {
955 1022 : return snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X%08X",
956 511 : EpochFromFullTransactionId(fxid),
957 511 : XidFromFullTransactionId(fxid));
958 : }
959 :
/*
 * 2PC state file format:
 *
 *	1. TwoPhaseFileHeader
 *	2. GID string (hdr.gidlen bytes, including the terminating '\0')
 *	3. TransactionId[] (subtransactions)
 *	4. RelFileLocator[] (files to be deleted at commit)
 *	5. RelFileLocator[] (files to be deleted at abort)
 *	6. xl_xact_stats_item[] (stats to be dropped at commit)
 *	7. xl_xact_stats_item[] (stats to be dropped at abort)
 *	8. SharedInvalidationMessage[] (inval messages to be sent at commit)
 *	9. TwoPhaseRecordOnDisk, followed by its rmgr data
 *	10. ...
 *	11. TwoPhaseRecordOnDisk (end sentinel, rmid == TWOPHASE_RM_END_ID)
 *	12. checksum (CRC-32C)
 *
 * Each segment except the final checksum is MAXALIGN'd.  The payload of the
 * XLOG_XACT_PREPARE WAL record has this same layout, minus the checksum.
 */
975 :
/*
 * Header for a 2PC state file.
 *
 * TWOPHASE_MAGIC goes into the header's magic field.  EndPrepare asserts it,
 * and ReadTwoPhaseFile rejects any file whose magic does not match.
 */
#define TWOPHASE_MAGIC	0x57F94534	/* format identifier */

/*
 * The state file header is simply xl_xact_prepare.  It is the first segment
 * serialized by StartPrepare, so it also begins the XLOG_XACT_PREPARE record
 * payload.
 */
typedef xl_xact_prepare TwoPhaseFileHeader;
982 :
/*
 * Header for each record in a state file
 *
 * NOTE: len counts only the rmgr data, not the TwoPhaseRecordOnDisk header.
 * The rmgr data will be stored starting on a MAXALIGN boundary.
 *
 * RegisterTwoPhaseRecord copies this struct byte-for-byte into the serialized
 * state, and ProcessRecords reads it back by casting.  Its field order and
 * sizes are therefore part of the WAL / state-file format.
 */
typedef struct TwoPhaseRecordOnDisk
{
	uint32		len;			/* length of rmgr data */
	TwoPhaseRmgrId rmid;		/* resource manager for this record */
	uint16		info;			/* flag bits for use by rmgr */
} TwoPhaseRecordOnDisk;
995 :
/*
 * During prepare, the 2PC state data is assembled in memory before being
 * written to WAL as one XLOG_XACT_PREPARE record.  We use a chain of
 * StateFileChunk blocks for that.  No state file is written at PREPARE time.
 * When a file is needed, it is later recreated from that WAL record
 * (RecreateTwoPhaseFile).
 */
typedef struct StateFileChunk
{
	char	   *data;			/* buffer holding this chunk's bytes */
	uint32		len;			/* bytes used in data, including MAXALIGN
								 * padding */
	struct StateFileChunk *next;	/* next chunk, or NULL for the tail */
} StateFileChunk;
1007 :
/*
 * State data being assembled for the PREPARE in progress.  This is
 * file-level static state:
 *  - StartPrepare (re)initializes it.
 *  - save_state_data appends to it.
 *  - EndPrepare writes the chain to WAL and then clears head/tail.
 */
static struct xllist
{
	StateFileChunk *head;		/* first data block in the chain */
	StateFileChunk *tail;		/* last block in chain */
	uint32		num_chunks;		/* number of blocks in the chain */
	uint32		bytes_free;		/* free bytes left in tail block */
	uint32		total_len;		/* total data bytes in chain */
} records;
1016 :
1017 :
1018 : /*
1019 : * Append a block of data to records data structure.
1020 : *
1021 : * NB: each block is padded to a MAXALIGN multiple. This must be
1022 : * accounted for when the file is later read!
1023 : *
1024 : * The data is copied, so the caller is free to modify it afterwards.
1025 : */
1026 : static void
1027 4031 : save_state_data(const void *data, uint32 len)
1028 : {
1029 4031 : uint32 padlen = MAXALIGN(len);
1030 :
1031 4031 : if (padlen > records.bytes_free)
1032 : {
1033 85 : records.tail->next = palloc0_object(StateFileChunk);
1034 85 : records.tail = records.tail->next;
1035 85 : records.tail->len = 0;
1036 85 : records.tail->next = NULL;
1037 85 : records.num_chunks++;
1038 :
1039 85 : records.bytes_free = Max(padlen, 512);
1040 85 : records.tail->data = palloc(records.bytes_free);
1041 : }
1042 :
1043 4031 : memcpy(records.tail->data + records.tail->len, data, len);
1044 4031 : records.tail->len += padlen;
1045 4031 : records.bytes_free -= padlen;
1046 4031 : records.total_len += padlen;
1047 4031 : }
1048 :
/*
 * Start preparing a state file.
 *
 * Initializes the file-level records chain and serializes into it the 2PC
 * file header.  The header is followed by the GID and the variable-length
 * arrays (subxacts, commit/abort rels, commit/abort stats, invalidation
 * messages); only non-empty arrays are written.
 *
 * Callers then add rmgr records with RegisterTwoPhaseRecord().  EndPrepare()
 * appends the end sentinel and writes everything to WAL.
 *
 * The order of the save_state_data() calls below defines the layout that
 * FinishPreparedTransaction() decodes; the two must be kept in sync.
 */
void
StartPrepare(GlobalTransaction gxact)
{
	PGPROC	   *proc = GetPGProcByNumber(gxact->pgprocno);
	TransactionId xid = XidFromFullTransactionId(gxact->fxid);
	TwoPhaseFileHeader hdr;
	TransactionId *children;
	RelFileLocator *commitrels;
	RelFileLocator *abortrels;
	xl_xact_stats_item *abortstats = NULL;
	xl_xact_stats_item *commitstats = NULL;
	SharedInvalidationMessage *invalmsgs;

	/* Initialize linked list */
	records.head = palloc0_object(StateFileChunk);
	records.head->len = 0;
	records.head->next = NULL;

	/*
	 * Size the first chunk to at least sizeof(TwoPhaseFileHeader).  EndPrepare
	 * expects to find the header at the start of records.head->data.
	 */
	records.bytes_free = Max(sizeof(TwoPhaseFileHeader), 512);
	records.head->data = palloc(records.bytes_free);

	records.tail = records.head;
	records.num_chunks = 1;

	records.total_len = 0;

	/* Create header */
	hdr.magic = TWOPHASE_MAGIC;
	hdr.total_len = 0;			/* EndPrepare will fill this in */
	hdr.xid = xid;
	hdr.database = proc->databaseId;
	hdr.prepared_at = gxact->prepared_at;
	hdr.owner = gxact->owner;
	hdr.nsubxacts = xactGetCommittedChildren(&children);
	hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels);
	hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels);
	hdr.ncommitstats =
		pgstat_get_transactional_drops(true, &commitstats);
	hdr.nabortstats =
		pgstat_get_transactional_drops(false, &abortstats);
	hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs,
														  &hdr.initfileinval);
	hdr.gidlen = strlen(gxact->gid) + 1;	/* Include '\0' */
	/* EndPrepare will fill the origin data, if necessary */
	hdr.origin_lsn = InvalidXLogRecPtr;
	hdr.origin_timestamp = 0;

	/* Header first, then the NUL-terminated GID */
	save_state_data(&hdr, sizeof(TwoPhaseFileHeader));
	save_state_data(gxact->gid, hdr.gidlen);

	/*
	 * Add the additional info about subxacts, deletable files and cache
	 * invalidation messages.  Empty arrays are skipped entirely; readers use
	 * the counts in hdr to know how much to step over.
	 */
	if (hdr.nsubxacts > 0)
	{
		save_state_data(children, hdr.nsubxacts * sizeof(TransactionId));
		/* While we have the child-xact data, stuff it in the gxact too */
		GXactLoadSubxactData(gxact, hdr.nsubxacts, children);
	}
	if (hdr.ncommitrels > 0)
	{
		save_state_data(commitrels, hdr.ncommitrels * sizeof(RelFileLocator));
		pfree(commitrels);
	}
	if (hdr.nabortrels > 0)
	{
		save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileLocator));
		pfree(abortrels);
	}
	if (hdr.ncommitstats > 0)
	{
		save_state_data(commitstats,
						hdr.ncommitstats * sizeof(xl_xact_stats_item));
		pfree(commitstats);
	}
	if (hdr.nabortstats > 0)
	{
		save_state_data(abortstats,
						hdr.nabortstats * sizeof(xl_xact_stats_item));
		pfree(abortstats);
	}
	if (hdr.ninvalmsgs > 0)
	{
		save_state_data(invalmsgs,
						hdr.ninvalmsgs * sizeof(SharedInvalidationMessage));
		pfree(invalmsgs);
	}
}
1143 :
/*
 * Finish preparing state data and writing it to WAL.
 *
 * Steps, in order:
 *  1. Append the end-sentinel record and patch the header's total_len and
 *     replication-origin fields.
 *  2. Reject state too large to be read back.
 *  3. Inside a critical section with DELAY_CHKPT_START set, insert and flush
 *     the XLOG_XACT_PREPARE record, then mark the gxact prepared.
 *  4. Wait for synchronous replication.
 *  5. Reset the records chain pointers.
 */
void
EndPrepare(GlobalTransaction gxact)
{
	TwoPhaseFileHeader *hdr;
	StateFileChunk *record;
	bool		replorigin;

	/* Add the end sentinel to the list of 2PC records */
	RegisterTwoPhaseRecord(TWOPHASE_RM_END_ID, 0,
						   NULL, 0);

	/*
	 * Go back and fill in total_len in the file header record.  total_len
	 * describes the state *file*, which carries a trailing CRC that the WAL
	 * record does not; ReadTwoPhaseFile checks it against the file size.
	 */
	hdr = (TwoPhaseFileHeader *) records.head->data;
	Assert(hdr->magic == TWOPHASE_MAGIC);
	hdr->total_len = records.total_len + sizeof(pg_crc32c);

	replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
				  replorigin_xact_state.origin != DoNotReplicateId);

	if (replorigin)
	{
		hdr->origin_lsn = replorigin_xact_state.origin_lsn;
		hdr->origin_timestamp = replorigin_xact_state.origin_timestamp;
	}

	/*
	 * If the data size exceeds MaxAllocSize, we won't be able to read it in
	 * ReadTwoPhaseFile. Check for that now, rather than fail in the case
	 * where we write data to file and then re-read at commit time.
	 */
	if (hdr->total_len > MaxAllocSize)
		ereport(ERROR,
				(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
				 errmsg("two-phase state file maximum length exceeded")));

	/*
	 * Now writing 2PC state data to WAL. We let the WAL's CRC protection
	 * cover us, so no need to calculate a separate CRC.
	 *
	 * We have to set DELAY_CHKPT_START here, too; otherwise a checkpoint
	 * starting immediately after the WAL record is inserted could complete
	 * without fsync'ing our state file. (This is essentially the same kind
	 * of race condition as the COMMIT-to-clog-write case that
	 * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes
	 * there.) Note that DELAY_CHKPT_IN_COMMIT is used to find transactions in
	 * the critical commit section. We need to know about such transactions
	 * for conflict detection in logical replication. See
	 * GetOldestActiveTransactionId(true, false) and its use.
	 *
	 * We save the PREPARE record's location in the gxact for later use by
	 * CheckPointTwoPhase.
	 *
	 * One XLogRegisterData() call is made per chunk, so reserve room for
	 * records.num_chunks data items before entering the critical section.
	 */
	XLogEnsureRecordSpace(0, records.num_chunks);

	START_CRIT_SECTION();

	Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
	MyProc->delayChkptFlags |= DELAY_CHKPT_START;

	XLogBeginInsert();
	for (record = records.head; record != NULL; record = record->next)
		XLogRegisterData(record->data, record->len);

	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);

	gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);

	if (replorigin)
	{
		/* Move LSNs forward for this replication origin */
		replorigin_session_advance(replorigin_xact_state.origin_lsn,
								   gxact->prepare_end_lsn);
	}

	XLogFlush(gxact->prepare_end_lsn);

	/* If we crash now, we have prepared: WAL replay will fix things */

	/* Store record's start location to read that later on Commit */
	gxact->prepare_start_lsn = ProcLastRecPtr;

	/*
	 * Mark the prepared transaction as valid. As soon as xact.c marks MyProc
	 * as not running our XID (which it will do immediately after this
	 * function returns), others can commit/rollback the xact.
	 *
	 * NB: a side effect of this is to make a dummy ProcArray entry for the
	 * prepared XID. This must happen before we clear the XID from MyProc /
	 * ProcGlobal->xids[], else there is a window where the XID is not running
	 * according to TransactionIdIsInProgress, and onlookers would be entitled
	 * to assume the xact crashed. Instead we have a window where the same
	 * XID appears twice in ProcArray, which is OK.
	 */
	MarkAsPrepared(gxact, false);

	/*
	 * Now we can mark ourselves as out of the commit critical section: a
	 * checkpoint starting after this will certainly see the gxact as a
	 * candidate for fsyncing.
	 */
	MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;

	/*
	 * Remember that we have this GlobalTransaction entry locked for us. If
	 * we crash after this point, it's too late to abort, but we must unlock
	 * it so that the prepared transaction can be committed or rolled back.
	 */
	MyLockedGxact = gxact;

	END_CRIT_SECTION();

	/*
	 * Wait for synchronous replication, if required.
	 *
	 * Note that at this stage we have marked the prepare, but still show as
	 * running in the procarray (twice!) and continue to hold locks.
	 */
	SyncRepWaitForLSN(gxact->prepare_end_lsn, false);

	/*
	 * Forget the chain.  The chunk memory itself is not pfree'd here, and
	 * StartPrepare reinitializes bytes_free and total_len on the next use.
	 */
	records.tail = records.head = NULL;
	records.num_chunks = 0;
}
1269 :
1270 : /*
1271 : * Register a 2PC record to be written to state file.
1272 : */
1273 : void
1274 1727 : RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
1275 : const void *data, uint32 len)
1276 : {
1277 : TwoPhaseRecordOnDisk record;
1278 :
1279 1727 : record.rmid = rmid;
1280 1727 : record.info = info;
1281 1727 : record.len = len;
1282 1727 : save_state_data(&record, sizeof(TwoPhaseRecordOnDisk));
1283 1727 : if (len > 0)
1284 1402 : save_state_data(data, len);
1285 1727 : }
1286 :
1287 :
1288 : /*
1289 : * Read and validate the state file for xid.
1290 : *
1291 : * If it looks OK (has a valid magic number and CRC), return the palloc'd
1292 : * contents of the file, issuing an error when finding corrupted data. If
1293 : * missing_ok is true, which indicates that missing files can be safely
1294 : * ignored, then return NULL. This state can be reached when doing recovery
1295 : * after discarding two-phase files from frozen epochs.
1296 : */
1297 : static char *
1298 370 : ReadTwoPhaseFile(FullTransactionId fxid, bool missing_ok)
1299 : {
1300 : char path[MAXPGPATH];
1301 : char *buf;
1302 : TwoPhaseFileHeader *hdr;
1303 : int fd;
1304 : struct stat stat;
1305 : uint32 crc_offset;
1306 : pg_crc32c calc_crc,
1307 : file_crc;
1308 : int r;
1309 :
1310 370 : TwoPhaseFilePath(path, fxid);
1311 :
1312 370 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1313 370 : if (fd < 0)
1314 : {
1315 292 : if (missing_ok && errno == ENOENT)
1316 292 : return NULL;
1317 :
1318 0 : ereport(ERROR,
1319 : (errcode_for_file_access(),
1320 : errmsg("could not open file \"%s\": %m", path)));
1321 : }
1322 :
1323 : /*
1324 : * Check file length. We can determine a lower bound pretty easily. We
1325 : * set an upper bound to avoid palloc() failure on a corrupt file, though
1326 : * we can't guarantee that we won't get an out of memory error anyway,
1327 : * even on a valid file.
1328 : */
1329 78 : if (fstat(fd, &stat))
1330 0 : ereport(ERROR,
1331 : (errcode_for_file_access(),
1332 : errmsg("could not stat file \"%s\": %m", path)));
1333 :
1334 78 : if (stat.st_size < (MAXALIGN(sizeof(TwoPhaseFileHeader)) +
1335 : MAXALIGN(sizeof(TwoPhaseRecordOnDisk)) +
1336 78 : sizeof(pg_crc32c)) ||
1337 78 : stat.st_size > MaxAllocSize)
1338 0 : ereport(ERROR,
1339 : (errcode(ERRCODE_DATA_CORRUPTED),
1340 : errmsg_plural("incorrect size of file \"%s\": %lld byte",
1341 : "incorrect size of file \"%s\": %lld bytes",
1342 : (long long int) stat.st_size, path,
1343 : (long long int) stat.st_size)));
1344 :
1345 78 : crc_offset = stat.st_size - sizeof(pg_crc32c);
1346 78 : if (crc_offset != MAXALIGN(crc_offset))
1347 0 : ereport(ERROR,
1348 : (errcode(ERRCODE_DATA_CORRUPTED),
1349 : errmsg("incorrect alignment of CRC offset for file \"%s\"",
1350 : path)));
1351 :
1352 : /*
1353 : * OK, slurp in the file.
1354 : */
1355 78 : buf = (char *) palloc(stat.st_size);
1356 :
1357 78 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_READ);
1358 78 : r = read(fd, buf, stat.st_size);
1359 78 : if (r != stat.st_size)
1360 : {
1361 0 : if (r < 0)
1362 0 : ereport(ERROR,
1363 : (errcode_for_file_access(),
1364 : errmsg("could not read file \"%s\": %m", path)));
1365 : else
1366 0 : ereport(ERROR,
1367 : (errmsg("could not read file \"%s\": read %d of %lld",
1368 : path, r, (long long int) stat.st_size)));
1369 : }
1370 :
1371 78 : pgstat_report_wait_end();
1372 :
1373 78 : if (CloseTransientFile(fd) != 0)
1374 0 : ereport(ERROR,
1375 : (errcode_for_file_access(),
1376 : errmsg("could not close file \"%s\": %m", path)));
1377 :
1378 78 : hdr = (TwoPhaseFileHeader *) buf;
1379 78 : if (hdr->magic != TWOPHASE_MAGIC)
1380 0 : ereport(ERROR,
1381 : (errcode(ERRCODE_DATA_CORRUPTED),
1382 : errmsg("invalid magic number stored in file \"%s\"",
1383 : path)));
1384 :
1385 78 : if (hdr->total_len != stat.st_size)
1386 0 : ereport(ERROR,
1387 : (errcode(ERRCODE_DATA_CORRUPTED),
1388 : errmsg("invalid size stored in file \"%s\"",
1389 : path)));
1390 :
1391 78 : INIT_CRC32C(calc_crc);
1392 78 : COMP_CRC32C(calc_crc, buf, crc_offset);
1393 78 : FIN_CRC32C(calc_crc);
1394 :
1395 78 : file_crc = *((pg_crc32c *) (buf + crc_offset));
1396 :
1397 78 : if (!EQ_CRC32C(calc_crc, file_crc))
1398 0 : ereport(ERROR,
1399 : (errcode(ERRCODE_DATA_CORRUPTED),
1400 : errmsg("calculated CRC checksum does not match value stored in file \"%s\"",
1401 : path)));
1402 :
1403 78 : return buf;
1404 : }
1405 :
1406 :
1407 : /*
1408 : * Reads 2PC data from xlog. During checkpoint this data will be moved to
1409 : * twophase files and ReadTwoPhaseFile should be used instead.
1410 : *
1411 : * Note clearly that this function can access WAL during normal operation,
1412 : * similarly to the way WALSender or Logical Decoding would do.
1413 : */
1414 : static void
1415 407 : XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
1416 : {
1417 : XLogRecord *record;
1418 : XLogReaderState *xlogreader;
1419 : char *errormsg;
1420 :
1421 407 : xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
1422 407 : XL_ROUTINE(.page_read = &read_local_xlog_page,
1423 : .segment_open = &wal_segment_open,
1424 : .segment_close = &wal_segment_close),
1425 : NULL);
1426 407 : if (!xlogreader)
1427 0 : ereport(ERROR,
1428 : (errcode(ERRCODE_OUT_OF_MEMORY),
1429 : errmsg("out of memory"),
1430 : errdetail("Failed while allocating a WAL reading processor.")));
1431 :
1432 407 : XLogBeginRead(xlogreader, lsn);
1433 407 : record = XLogReadRecord(xlogreader, &errormsg);
1434 :
1435 407 : if (record == NULL)
1436 : {
1437 0 : if (errormsg)
1438 0 : ereport(ERROR,
1439 : (errcode_for_file_access(),
1440 : errmsg("could not read two-phase state from WAL at %X/%08X: %s",
1441 : LSN_FORMAT_ARGS(lsn), errormsg)));
1442 : else
1443 0 : ereport(ERROR,
1444 : (errcode_for_file_access(),
1445 : errmsg("could not read two-phase state from WAL at %X/%08X",
1446 : LSN_FORMAT_ARGS(lsn))));
1447 : }
1448 :
1449 407 : if (XLogRecGetRmid(xlogreader) != RM_XACT_ID ||
1450 407 : (XLogRecGetInfo(xlogreader) & XLOG_XACT_OPMASK) != XLOG_XACT_PREPARE)
1451 0 : ereport(ERROR,
1452 : (errcode_for_file_access(),
1453 : errmsg("expected two-phase state data is not present in WAL at %X/%08X",
1454 : LSN_FORMAT_ARGS(lsn))));
1455 :
1456 407 : if (len != NULL)
1457 26 : *len = XLogRecGetDataLen(xlogreader);
1458 :
1459 407 : *buf = palloc_array(char, XLogRecGetDataLen(xlogreader));
1460 407 : memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader));
1461 :
1462 407 : XLogReaderFree(xlogreader);
1463 407 : }
1464 :
1465 :
1466 : /*
1467 : * Confirms an xid is prepared, during recovery
1468 : */
1469 : bool
1470 292 : StandbyTransactionIdIsPrepared(TransactionId xid)
1471 : {
1472 : char *buf;
1473 : TwoPhaseFileHeader *hdr;
1474 : bool result;
1475 : FullTransactionId fxid;
1476 :
1477 : Assert(TransactionIdIsValid(xid));
1478 :
1479 292 : if (max_prepared_xacts <= 0)
1480 0 : return false; /* nothing to do */
1481 :
1482 : /* Read and validate file */
1483 292 : fxid = AdjustToFullTransactionId(xid);
1484 292 : buf = ReadTwoPhaseFile(fxid, true);
1485 292 : if (buf == NULL)
1486 292 : return false;
1487 :
1488 : /* Check header also */
1489 0 : hdr = (TwoPhaseFileHeader *) buf;
1490 0 : result = TransactionIdEquals(hdr->xid, xid);
1491 0 : pfree(buf);
1492 :
1493 0 : return result;
1494 : }
1495 :
/*
 * FinishPreparedTransaction: execute COMMIT PREPARED or ROLLBACK PREPARED
 *
 * gid names the prepared transaction.  isCommit selects COMMIT PREPARED
 * (true) or ROLLBACK PREPARED (false).  LockGXact() validates the GID for
 * the current user and locks the entry.  Its failure behavior is defined
 * elsewhere: presumably it raises an ERROR for an unknown or inaccessible
 * GID — confirm there.
 *
 * The 2PC state is read back from the state file (if the gxact was moved to
 * disk) or else from the PREPARE WAL record, and then decoded.  It drives,
 * in order:
 *  1. the commit/abort WAL record;
 *  2. removal of the dummy PGPROC from the ProcArray;
 *  3. relation-file and stats drops;
 *  4. invalidation messages (commit only);
 *  5. the rmgr post-commit/post-abort callbacks;
 *  6. release of the gxact.
 */
void
FinishPreparedTransaction(const char *gid, bool isCommit)
{
	GlobalTransaction gxact;
	PGPROC	   *proc;
	FullTransactionId fxid;
	TransactionId xid;
	bool		ondisk;
	char	   *buf;
	char	   *bufptr;
	TwoPhaseFileHeader *hdr;
	TransactionId latestXid;
	TransactionId *children;
	RelFileLocator *commitrels;
	RelFileLocator *abortrels;
	RelFileLocator *delrels;
	int			ndelrels;
	xl_xact_stats_item *commitstats;
	xl_xact_stats_item *abortstats;
	SharedInvalidationMessage *invalmsgs;

	/*
	 * Validate the GID, and lock the GXACT to ensure that two backends do not
	 * try to commit the same GID at once.
	 */
	gxact = LockGXact(gid, GetUserId());
	proc = GetPGProcByNumber(gxact->pgprocno);
	fxid = gxact->fxid;
	xid = XidFromFullTransactionId(fxid);

	/*
	 * Read and validate 2PC state data. State data will typically be stored
	 * in WAL files if the LSN is after the last checkpoint record, or moved
	 * to disk if for some reason they have lived for a long time.
	 */
	if (gxact->ondisk)
		buf = ReadTwoPhaseFile(fxid, false);
	else
		XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);


	/*
	 * Disassemble the header area.  Segments are laid out in the order
	 * StartPrepare wrote them, each MAXALIGN-padded; an empty array occupies
	 * zero bytes.  Afterwards bufptr points at the first TwoPhaseRecordOnDisk.
	 */
	hdr = (TwoPhaseFileHeader *) buf;
	Assert(TransactionIdEquals(hdr->xid, xid));
	bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
	bufptr += MAXALIGN(hdr->gidlen);
	children = (TransactionId *) bufptr;
	bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
	commitrels = (RelFileLocator *) bufptr;
	bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
	abortrels = (RelFileLocator *) bufptr;
	bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
	commitstats = (xl_xact_stats_item *) bufptr;
	bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
	abortstats = (xl_xact_stats_item *) bufptr;
	bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
	invalmsgs = (SharedInvalidationMessage *) bufptr;
	bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));

	/* compute latestXid among all children */
	latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children);

	/* Prevent cancel/die interrupt while cleaning up */
	HOLD_INTERRUPTS();

	/*
	 * The order of operations here is critical: make the XLOG entry for
	 * commit or abort, then mark the transaction committed or aborted in
	 * pg_xact, then remove its PGPROC from the global ProcArray (which means
	 * TransactionIdIsInProgress will stop saying the prepared xact is in
	 * progress), then run the post-commit or post-abort callbacks. The
	 * callbacks will release the locks the transaction held.
	 */
	if (isCommit)
		RecordTransactionCommitPrepared(xid,
										hdr->nsubxacts, children,
										hdr->ncommitrels, commitrels,
										hdr->ncommitstats,
										commitstats,
										hdr->ninvalmsgs, invalmsgs,
										hdr->initfileinval, gid);
	else
		RecordTransactionAbortPrepared(xid,
									   hdr->nsubxacts, children,
									   hdr->nabortrels, abortrels,
									   hdr->nabortstats,
									   abortstats,
									   gid);

	ProcArrayRemove(proc, latestXid);

	/*
	 * In case we fail while running the callbacks, mark the gxact invalid so
	 * no one else will try to commit/rollback, and so it will be recycled if
	 * we fail after this point. It is still locked by our backend so it
	 * won't go away yet.
	 *
	 * (We assume it's safe to do this without taking TwoPhaseStateLock.)
	 */
	gxact->valid = false;

	/*
	 * We have to remove any files that were supposed to be dropped. For
	 * consistency with the regular xact.c code paths, must do this before
	 * releasing locks, so do it before running the callbacks.
	 *
	 * NB: this code knows that we couldn't be dropping any temp rels ...
	 */
	if (isCommit)
	{
		delrels = commitrels;
		ndelrels = hdr->ncommitrels;
	}
	else
	{
		delrels = abortrels;
		ndelrels = hdr->nabortrels;
	}

	/* Make sure files supposed to be dropped are dropped */
	DropRelationFiles(delrels, ndelrels, false);

	if (isCommit)
		pgstat_execute_transactional_drops(hdr->ncommitstats, commitstats, false);
	else
		pgstat_execute_transactional_drops(hdr->nabortstats, abortstats, false);

	/*
	 * Handle cache invalidation messages.
	 *
	 * Relcache init file invalidation requires processing both before and
	 * after we send the SI messages, only when committing. See
	 * AtEOXact_Inval().
	 */
	if (isCommit)
	{
		if (hdr->initfileinval)
			RelationCacheInitFilePreInvalidate();
		SendSharedInvalidMessages(invalmsgs, hdr->ninvalmsgs);
		if (hdr->initfileinval)
			RelationCacheInitFilePostInvalidate();
	}

	/*
	 * Acquire the two-phase lock. We want to work on the two-phase callbacks
	 * while holding it to avoid potential conflicts with other transactions
	 * attempting to use the same GID, so the lock is released once the shared
	 * memory state is cleared.
	 */
	LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);

	/* And now do the callbacks */
	if (isCommit)
		ProcessRecords(bufptr, fxid, twophase_postcommit_callbacks);
	else
		ProcessRecords(bufptr, fxid, twophase_postabort_callbacks);

	PredicateLockTwoPhaseFinish(fxid, isCommit);

	/*
	 * Read this value while holding the two-phase lock, as the on-disk 2PC
	 * file is physically removed after the lock is released.
	 */
	ondisk = gxact->ondisk;

	/* Clear shared memory state */
	RemoveGXact(gxact);

	/*
	 * Release the lock as all callbacks are called and shared memory cleanup
	 * is done.
	 */
	LWLockRelease(TwoPhaseStateLock);

	/* Count the prepared xact as committed or aborted */
	AtEOXact_PgStat(isCommit, false);

	/*
	 * And now we can clean up any files we may have left.
	 */
	if (ondisk)
		RemoveTwoPhaseFile(fxid, true);

	MyLockedGxact = NULL;

	RESUME_INTERRUPTS();

	/* All decoded pointers above point into buf, so free it last */
	pfree(buf);
}
1690 :
1691 : /*
1692 : * Scan 2PC state data in memory and call the indicated callbacks for each 2PC record.
1693 : */
1694 : static void
1695 363 : ProcessRecords(char *bufptr, FullTransactionId fxid,
1696 : const TwoPhaseCallback callbacks[])
1697 : {
1698 : for (;;)
1699 1570 : {
1700 1933 : TwoPhaseRecordOnDisk *record = (TwoPhaseRecordOnDisk *) bufptr;
1701 :
1702 : Assert(record->rmid <= TWOPHASE_RM_MAX_ID);
1703 1933 : if (record->rmid == TWOPHASE_RM_END_ID)
1704 363 : break;
1705 :
1706 1570 : bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
1707 :
1708 1570 : if (callbacks[record->rmid] != NULL)
1709 1491 : callbacks[record->rmid] (fxid, record->info, bufptr, record->len);
1710 :
1711 1570 : bufptr += MAXALIGN(record->len);
1712 : }
1713 363 : }
1714 :
1715 : /*
1716 : * Remove the 2PC file.
1717 : *
1718 : * If giveWarning is false, do not complain about file-not-present;
1719 : * this is an expected case during WAL replay.
1720 : *
1721 : * This routine is used at early stages at recovery where future and
1722 : * past orphaned files are checked, hence the FullTransactionId to build
1723 : * a complete file name fit for the removal.
1724 : */
1725 : static void
1726 33 : RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning)
1727 : {
1728 : char path[MAXPGPATH];
1729 :
1730 33 : TwoPhaseFilePath(path, fxid);
1731 33 : if (unlink(path))
1732 0 : if (errno != ENOENT || giveWarning)
1733 0 : ereport(WARNING,
1734 : (errcode_for_file_access(),
1735 : errmsg("could not remove file \"%s\": %m", path)));
1736 33 : }
1737 :
1738 : /*
1739 : * Recreates a state file. This is used in WAL replay and during
1740 : * checkpoint creation.
1741 : *
1742 : * Note: content and len don't include CRC.
1743 : */
1744 : static void
1745 26 : RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len)
1746 : {
1747 : char path[MAXPGPATH];
1748 : pg_crc32c statefile_crc;
1749 : int fd;
1750 :
1751 : /* Recompute CRC */
1752 26 : INIT_CRC32C(statefile_crc);
1753 26 : COMP_CRC32C(statefile_crc, content, len);
1754 26 : FIN_CRC32C(statefile_crc);
1755 :
1756 26 : TwoPhaseFilePath(path, fxid);
1757 :
1758 26 : fd = OpenTransientFile(path,
1759 : O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
1760 26 : if (fd < 0)
1761 0 : ereport(ERROR,
1762 : (errcode_for_file_access(),
1763 : errmsg("could not recreate file \"%s\": %m", path)));
1764 :
1765 : /* Write content and CRC */
1766 26 : errno = 0;
1767 26 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_WRITE);
1768 26 : if (write(fd, content, len) != len)
1769 : {
1770 : /* if write didn't set errno, assume problem is no disk space */
1771 0 : if (errno == 0)
1772 0 : errno = ENOSPC;
1773 0 : ereport(ERROR,
1774 : (errcode_for_file_access(),
1775 : errmsg("could not write file \"%s\": %m", path)));
1776 : }
1777 26 : if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c))
1778 : {
1779 : /* if write didn't set errno, assume problem is no disk space */
1780 0 : if (errno == 0)
1781 0 : errno = ENOSPC;
1782 0 : ereport(ERROR,
1783 : (errcode_for_file_access(),
1784 : errmsg("could not write file \"%s\": %m", path)));
1785 : }
1786 26 : pgstat_report_wait_end();
1787 :
1788 : /*
1789 : * We must fsync the file because the end-of-replay checkpoint will not do
1790 : * so, there being no GXACT in shared memory yet to tell it to.
1791 : */
1792 26 : pgstat_report_wait_start(WAIT_EVENT_TWOPHASE_FILE_SYNC);
1793 26 : if (pg_fsync(fd) != 0)
1794 0 : ereport(ERROR,
1795 : (errcode_for_file_access(),
1796 : errmsg("could not fsync file \"%s\": %m", path)));
1797 26 : pgstat_report_wait_end();
1798 :
1799 26 : if (CloseTransientFile(fd) != 0)
1800 0 : ereport(ERROR,
1801 : (errcode_for_file_access(),
1802 : errmsg("could not close file \"%s\": %m", path)));
1803 26 : }
1804 :
1805 : /*
1806 : * CheckPointTwoPhase -- handle 2PC component of checkpointing.
1807 : *
1808 : * We must fsync the state file of any GXACT that is valid or has been
1809 : * generated during redo and has a PREPARE LSN <= the checkpoint's redo
1810 : * horizon. (If the gxact isn't valid yet, has not been generated in
1811 : * redo, or has a later LSN, this checkpoint is not responsible for
1812 : * fsyncing it.)
1813 : *
1814 : * This is deliberately run as late as possible in the checkpoint sequence,
1815 : * because GXACTs ordinarily have short lifespans, and so it is quite
1816 : * possible that GXACTs that were valid at checkpoint start will no longer
1817 : * exist if we wait a little bit. With typical checkpoint settings this
1818 : * will be about 3 minutes for an online checkpoint, so as a result we
1819 : * expect that there will be no GXACTs that need to be copied to disk.
1820 : *
1821 : * If a GXACT remains valid across multiple checkpoints, it will already
1822 : * be on disk so we don't bother to repeat that write.
1823 : */
1824 : void
1825 1844 : CheckPointTwoPhase(XLogRecPtr redo_horizon)
1826 : {
1827 : int i;
1828 1844 : int serialized_xacts = 0;
1829 :
1830 1844 : if (max_prepared_xacts <= 0)
1831 1289 : return; /* nothing to do */
1832 :
1833 : TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_START();
1834 :
1835 : /*
1836 : * We are expecting there to be zero GXACTs that need to be copied to
1837 : * disk, so we perform all I/O while holding TwoPhaseStateLock for
1838 : * simplicity. This prevents any new xacts from preparing while this
1839 : * occurs, which shouldn't be a problem since the presence of long-lived
1840 : * prepared xacts indicates the transaction manager isn't active.
1841 : *
1842 : * It's also possible to move I/O out of the lock, but on every error we
1843 : * should check whether somebody committed our transaction in different
1844 : * backend. Let's leave this optimization for future, if somebody will
1845 : * spot that this place cause bottleneck.
1846 : *
1847 : * Note that it isn't possible for there to be a GXACT with a
1848 : * prepare_end_lsn set prior to the last checkpoint yet is marked invalid,
1849 : * because of the efforts with delayChkptFlags.
1850 : */
1851 555 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
1852 590 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1853 : {
1854 : /*
1855 : * Note that we are using gxact not PGPROC so this works in recovery
1856 : * also
1857 : */
1858 35 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1859 :
1860 35 : if ((gxact->valid || gxact->inredo) &&
1861 35 : !gxact->ondisk &&
1862 31 : gxact->prepare_end_lsn <= redo_horizon)
1863 : {
1864 : char *buf;
1865 : int len;
1866 :
1867 26 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, &len);
1868 26 : RecreateTwoPhaseFile(gxact->fxid, buf, len);
1869 26 : gxact->ondisk = true;
1870 26 : gxact->prepare_start_lsn = InvalidXLogRecPtr;
1871 26 : gxact->prepare_end_lsn = InvalidXLogRecPtr;
1872 26 : pfree(buf);
1873 26 : serialized_xacts++;
1874 : }
1875 : }
1876 555 : LWLockRelease(TwoPhaseStateLock);
1877 :
1878 : /*
1879 : * Flush unconditionally the parent directory to make any information
1880 : * durable on disk. Two-phase files could have been removed and those
1881 : * removals need to be made persistent as well as any files newly created
1882 : * previously since the last checkpoint.
1883 : */
1884 555 : fsync_fname(TWOPHASE_DIR, true);
1885 :
1886 : TRACE_POSTGRESQL_TWOPHASE_CHECKPOINT_DONE();
1887 :
1888 555 : if (log_checkpoints && serialized_xacts > 0)
1889 22 : ereport(LOG,
1890 : (errmsg_plural("%u two-phase state file was written "
1891 : "for a long-running prepared transaction",
1892 : "%u two-phase state files were written "
1893 : "for long-running prepared transactions",
1894 : serialized_xacts,
1895 : serialized_xacts)));
1896 : }
1897 :
1898 : /*
1899 : * restoreTwoPhaseData
1900 : *
1901 : * Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
1902 : * This is called once at the beginning of recovery, saving any extra
1903 : * lookups in the future. Two-phase files that are newer than the
1904 : * minimum XID horizon are discarded on the way.
1905 : */
1906 : void
1907 1034 : restoreTwoPhaseData(void)
1908 : {
1909 : DIR *cldir;
1910 : struct dirent *clde;
1911 :
1912 1034 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1913 1034 : cldir = AllocateDir(TWOPHASE_DIR);
1914 3118 : while ((clde = ReadDir(cldir, TWOPHASE_DIR)) != NULL)
1915 : {
1916 2084 : if (strlen(clde->d_name) == 16 &&
1917 16 : strspn(clde->d_name, "0123456789ABCDEF") == 16)
1918 : {
1919 : FullTransactionId fxid;
1920 : char *buf;
1921 :
1922 16 : fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16));
1923 :
1924 16 : buf = ProcessTwoPhaseBuffer(fxid, InvalidXLogRecPtr,
1925 : true, false, false);
1926 16 : if (buf == NULL)
1927 0 : continue;
1928 :
1929 16 : PrepareRedoAdd(fxid, buf, InvalidXLogRecPtr,
1930 : InvalidXLogRecPtr, InvalidReplOriginId);
1931 : }
1932 : }
1933 1034 : LWLockRelease(TwoPhaseStateLock);
1934 1034 : FreeDir(cldir);
1935 1034 : }
1936 :
1937 : /*
1938 : * PrescanPreparedTransactions
1939 : *
1940 : * Scan the shared memory entries of TwoPhaseState and determine the range
1941 : * of valid XIDs present. This is run during database startup, after we
1942 : * have completed reading WAL. TransamVariables->nextXid has been set to
1943 : * one more than the highest XID for which evidence exists in WAL.
1944 : *
1945 : * We throw away any prepared xacts with main XID beyond nextXid --- if any
1946 : * are present, it suggests that the DBA has done a PITR recovery to an
1947 : * earlier point in time without cleaning out pg_twophase. We dare not
1948 : * try to recover such prepared xacts since they likely depend on database
1949 : * state that doesn't exist now.
1950 : *
1951 : * However, we will advance nextXid beyond any subxact XIDs belonging to
1952 : * valid prepared xacts. We need to do this since subxact commit doesn't
1953 : * write a WAL entry, and so there might be no evidence in WAL of those
1954 : * subxact XIDs.
1955 : *
1956 : * On corrupted two-phase files, fail immediately. Keeping around broken
1957 : * entries and let replay continue causes harm on the system, and a new
1958 : * backup should be rolled in.
1959 : *
1960 : * Our other responsibility is to determine and return the oldest valid XID
1961 : * among the prepared xacts (if none, return TransamVariables->nextXid).
1962 : * This is needed to synchronize pg_subtrans startup properly.
1963 : *
1964 : * If xids_p and nxids_p are not NULL, pointer to a palloc'd array of all
1965 : * top-level xids is stored in *xids_p. The number of entries in the array
1966 : * is returned in *nxids_p.
1967 : */
1968 : TransactionId
1969 1037 : PrescanPreparedTransactions(TransactionId **xids_p, int *nxids_p)
1970 : {
1971 1037 : FullTransactionId nextXid = TransamVariables->nextXid;
1972 1037 : TransactionId origNextXid = XidFromFullTransactionId(nextXid);
1973 1037 : TransactionId result = origNextXid;
1974 1037 : TransactionId *xids = NULL;
1975 1037 : int nxids = 0;
1976 1037 : int allocsize = 0;
1977 : int i;
1978 :
1979 1037 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
1980 1091 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
1981 : {
1982 : TransactionId xid;
1983 : char *buf;
1984 54 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
1985 :
1986 : Assert(gxact->inredo);
1987 :
1988 54 : buf = ProcessTwoPhaseBuffer(gxact->fxid,
1989 : gxact->prepare_start_lsn,
1990 54 : gxact->ondisk, false, true);
1991 :
1992 54 : if (buf == NULL)
1993 0 : continue;
1994 :
1995 : /*
1996 : * OK, we think this file is valid. Incorporate xid into the
1997 : * running-minimum result.
1998 : */
1999 54 : xid = XidFromFullTransactionId(gxact->fxid);
2000 54 : if (TransactionIdPrecedes(xid, result))
2001 46 : result = xid;
2002 :
2003 54 : if (xids_p)
2004 : {
2005 21 : if (nxids == allocsize)
2006 : {
2007 17 : if (nxids == 0)
2008 : {
2009 17 : allocsize = 10;
2010 17 : xids = palloc(allocsize * sizeof(TransactionId));
2011 : }
2012 : else
2013 : {
2014 0 : allocsize = allocsize * 2;
2015 0 : xids = repalloc(xids, allocsize * sizeof(TransactionId));
2016 : }
2017 : }
2018 21 : xids[nxids++] = xid;
2019 : }
2020 :
2021 54 : pfree(buf);
2022 : }
2023 1037 : LWLockRelease(TwoPhaseStateLock);
2024 :
2025 1037 : if (xids_p)
2026 : {
2027 65 : *xids_p = xids;
2028 65 : *nxids_p = nxids;
2029 : }
2030 :
2031 1037 : return result;
2032 : }
2033 :
2034 : /*
2035 : * StandbyRecoverPreparedTransactions
2036 : *
2037 : * Scan the shared memory entries of TwoPhaseState and setup all the required
2038 : * information to allow standby queries to treat prepared transactions as still
2039 : * active.
2040 : *
2041 : * This is never called at the end of recovery - we use
2042 : * RecoverPreparedTransactions() at that point.
2043 : *
2044 : * This updates pg_subtrans, so that any subtransactions will be correctly
2045 : * seen as in-progress in snapshots taken during recovery.
2046 : */
2047 : void
2048 65 : StandbyRecoverPreparedTransactions(void)
2049 : {
2050 : int i;
2051 :
2052 65 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2053 86 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2054 : {
2055 : char *buf;
2056 21 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2057 :
2058 : Assert(gxact->inredo);
2059 :
2060 21 : buf = ProcessTwoPhaseBuffer(gxact->fxid,
2061 : gxact->prepare_start_lsn,
2062 21 : gxact->ondisk, true, false);
2063 21 : if (buf != NULL)
2064 21 : pfree(buf);
2065 : }
2066 65 : LWLockRelease(TwoPhaseStateLock);
2067 65 : }
2068 :
2069 : /*
2070 : * RecoverPreparedTransactions
2071 : *
2072 : * Scan the shared memory entries of TwoPhaseState and reload the state for
2073 : * each prepared transaction (reacquire locks, etc).
2074 : *
2075 : * This is run at the end of recovery, but before we allow backends to write
2076 : * WAL.
2077 : *
2078 : * At the end of recovery the way we take snapshots will change. We now need
2079 : * to mark all running transactions with their full SubTransSetParent() info
2080 : * to allow normal snapshots to work correctly if snapshots overflow.
2081 : * We do this here because by definition prepared transactions are the only
2082 : * type of write transaction still running, so this is necessary and
2083 : * complete.
2084 : */
2085 : void
2086 972 : RecoverPreparedTransactions(void)
2087 : {
2088 : int i;
2089 :
2090 972 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2091 1005 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2092 : {
2093 : char *buf;
2094 33 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2095 33 : FullTransactionId fxid = gxact->fxid;
2096 : char *bufptr;
2097 : TwoPhaseFileHeader *hdr;
2098 : TransactionId *subxids;
2099 : const char *gid;
2100 :
2101 : /*
2102 : * Reconstruct subtrans state for the transaction --- needed because
2103 : * pg_subtrans is not preserved over a restart. Note that we are
2104 : * linking all the subtransactions directly to the top-level XID;
2105 : * there may originally have been a more complex hierarchy, but
2106 : * there's no need to restore that exactly. It's possible that
2107 : * SubTransSetParent has been set before, if the prepared transaction
2108 : * generated xid assignment records.
2109 : */
2110 33 : buf = ProcessTwoPhaseBuffer(gxact->fxid,
2111 : gxact->prepare_start_lsn,
2112 33 : gxact->ondisk, true, false);
2113 33 : if (buf == NULL)
2114 0 : continue;
2115 :
2116 33 : ereport(LOG,
2117 : (errmsg("recovering prepared transaction %u of epoch %u from shared memory",
2118 : XidFromFullTransactionId(gxact->fxid),
2119 : EpochFromFullTransactionId(gxact->fxid))));
2120 :
2121 33 : hdr = (TwoPhaseFileHeader *) buf;
2122 : Assert(TransactionIdEquals(hdr->xid,
2123 : XidFromFullTransactionId(gxact->fxid)));
2124 33 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2125 33 : gid = (const char *) bufptr;
2126 33 : bufptr += MAXALIGN(hdr->gidlen);
2127 33 : subxids = (TransactionId *) bufptr;
2128 33 : bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId));
2129 33 : bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileLocator));
2130 33 : bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileLocator));
2131 33 : bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item));
2132 33 : bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item));
2133 33 : bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
2134 :
2135 : /*
2136 : * Recreate its GXACT and dummy PGPROC. But, check whether it was
2137 : * added in redo and already has a shmem entry for it.
2138 : */
2139 33 : MarkAsPreparingGuts(gxact, gxact->fxid, gid,
2140 : hdr->prepared_at,
2141 : hdr->owner, hdr->database);
2142 :
2143 : /* recovered, so reset the flag for entries generated by redo */
2144 33 : gxact->inredo = false;
2145 :
2146 33 : GXactLoadSubxactData(gxact, hdr->nsubxacts, subxids);
2147 33 : MarkAsPrepared(gxact, true);
2148 :
2149 33 : LWLockRelease(TwoPhaseStateLock);
2150 :
2151 : /*
2152 : * Recover other state (notably locks) using resource managers.
2153 : */
2154 33 : ProcessRecords(bufptr, fxid, twophase_recover_callbacks);
2155 :
2156 : /*
2157 : * Release locks held by the standby process after we process each
2158 : * prepared transaction. As a result, we don't need too many
2159 : * additional locks at any one time.
2160 : */
2161 33 : if (InHotStandby)
2162 7 : StandbyReleaseLockTree(hdr->xid, hdr->nsubxacts, subxids);
2163 :
2164 : /*
2165 : * We're done with recovering this transaction. Clear MyLockedGxact,
2166 : * like we do in PrepareTransaction() during normal operation.
2167 : */
2168 33 : PostPrepare_Twophase();
2169 :
2170 33 : pfree(buf);
2171 :
2172 33 : LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
2173 : }
2174 :
2175 972 : LWLockRelease(TwoPhaseStateLock);
2176 972 : }
2177 :
2178 : /*
2179 : * ProcessTwoPhaseBuffer
2180 : *
2181 : * Given a FullTransactionId, read it either from disk or read it directly
2182 : * via shmem xlog record pointer using the provided "prepare_start_lsn".
2183 : *
2184 : * If setParent is true, set up subtransaction parent linkages.
2185 : *
2186 : * If setNextXid is true, set TransamVariables->nextXid to the newest
2187 : * value scanned.
2188 : */
2189 : static char *
2190 124 : ProcessTwoPhaseBuffer(FullTransactionId fxid,
2191 : XLogRecPtr prepare_start_lsn,
2192 : bool fromdisk,
2193 : bool setParent, bool setNextXid)
2194 : {
2195 124 : FullTransactionId nextXid = TransamVariables->nextXid;
2196 : TransactionId *subxids;
2197 : char *buf;
2198 : TwoPhaseFileHeader *hdr;
2199 : int i;
2200 :
2201 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2202 :
2203 124 : if (!fromdisk)
2204 : Assert(XLogRecPtrIsValid(prepare_start_lsn));
2205 :
2206 : /* Already processed? */
2207 248 : if (TransactionIdDidCommit(XidFromFullTransactionId(fxid)) ||
2208 124 : TransactionIdDidAbort(XidFromFullTransactionId(fxid)))
2209 : {
2210 0 : if (fromdisk)
2211 : {
2212 0 : ereport(WARNING,
2213 : (errmsg("removing stale two-phase state file for transaction %u of epoch %u",
2214 : XidFromFullTransactionId(fxid),
2215 : EpochFromFullTransactionId(fxid))));
2216 0 : RemoveTwoPhaseFile(fxid, true);
2217 : }
2218 : else
2219 : {
2220 0 : ereport(WARNING,
2221 : (errmsg("removing stale two-phase state from memory for transaction %u of epoch %u",
2222 : XidFromFullTransactionId(fxid),
2223 : EpochFromFullTransactionId(fxid))));
2224 0 : PrepareRedoRemoveFull(fxid, true);
2225 : }
2226 0 : return NULL;
2227 : }
2228 :
2229 : /* Reject XID if too new */
2230 124 : if (FullTransactionIdFollowsOrEquals(fxid, nextXid))
2231 : {
2232 0 : if (fromdisk)
2233 : {
2234 0 : ereport(WARNING,
2235 : (errmsg("removing future two-phase state file for transaction %u of epoch %u",
2236 : XidFromFullTransactionId(fxid),
2237 : EpochFromFullTransactionId(fxid))));
2238 0 : RemoveTwoPhaseFile(fxid, true);
2239 : }
2240 : else
2241 : {
2242 0 : ereport(WARNING,
2243 : (errmsg("removing future two-phase state from memory for transaction %u of epoch %u",
2244 : XidFromFullTransactionId(fxid),
2245 : EpochFromFullTransactionId(fxid))));
2246 0 : PrepareRedoRemoveFull(fxid, true);
2247 : }
2248 0 : return NULL;
2249 : }
2250 :
2251 124 : if (fromdisk)
2252 : {
2253 : /* Read and validate file */
2254 50 : buf = ReadTwoPhaseFile(fxid, false);
2255 : }
2256 : else
2257 : {
2258 : /* Read xlog data */
2259 74 : XlogReadTwoPhaseData(prepare_start_lsn, &buf, NULL);
2260 : }
2261 :
2262 : /* Deconstruct header */
2263 124 : hdr = (TwoPhaseFileHeader *) buf;
2264 124 : if (!TransactionIdEquals(hdr->xid, XidFromFullTransactionId(fxid)))
2265 : {
2266 0 : if (fromdisk)
2267 0 : ereport(ERROR,
2268 : (errcode(ERRCODE_DATA_CORRUPTED),
2269 : errmsg("corrupted two-phase state file for transaction %u of epoch %u",
2270 : XidFromFullTransactionId(fxid),
2271 : EpochFromFullTransactionId(fxid))));
2272 : else
2273 0 : ereport(ERROR,
2274 : (errcode(ERRCODE_DATA_CORRUPTED),
2275 : errmsg("corrupted two-phase state in memory for transaction %u of epoch %u",
2276 : XidFromFullTransactionId(fxid),
2277 : EpochFromFullTransactionId(fxid))));
2278 : }
2279 :
2280 : /*
2281 : * Examine subtransaction XIDs ... they should all follow main XID, and
2282 : * they may force us to advance nextXid.
2283 : */
2284 124 : subxids = (TransactionId *) (buf +
2285 124 : MAXALIGN(sizeof(TwoPhaseFileHeader)) +
2286 124 : MAXALIGN(hdr->gidlen));
2287 1911 : for (i = 0; i < hdr->nsubxacts; i++)
2288 : {
2289 1787 : TransactionId subxid = subxids[i];
2290 :
2291 : Assert(TransactionIdFollows(subxid, XidFromFullTransactionId(fxid)));
2292 :
2293 : /* update nextXid if needed */
2294 1787 : if (setNextXid)
2295 823 : AdvanceNextFullTransactionIdPastXid(subxid);
2296 :
2297 1787 : if (setParent)
2298 823 : SubTransSetParent(subxid, XidFromFullTransactionId(fxid));
2299 : }
2300 :
2301 124 : return buf;
2302 : }
2303 :
2304 :
2305 : /*
2306 : * RecordTransactionCommitPrepared
2307 : *
2308 : * This is basically the same as RecordTransactionCommit (q.v. if you change
2309 : * this function): in particular, we must set DELAY_CHKPT_IN_COMMIT to avoid a
2310 : * race condition.
2311 : *
2312 : * We know the transaction made at least one XLOG entry (its PREPARE),
2313 : * so it is never possible to optimize out the commit record.
2314 : */
2315 : static void
2316 281 : RecordTransactionCommitPrepared(TransactionId xid,
2317 : int nchildren,
2318 : TransactionId *children,
2319 : int nrels,
2320 : RelFileLocator *rels,
2321 : int nstats,
2322 : xl_xact_stats_item *stats,
2323 : int ninvalmsgs,
2324 : SharedInvalidationMessage *invalmsgs,
2325 : bool initfileinval,
2326 : const char *gid)
2327 : {
2328 : XLogRecPtr recptr;
2329 : TimestampTz committs;
2330 : bool replorigin;
2331 :
2332 : /*
2333 : * Are we using the replication origins feature? Or, in other words, are
2334 : * we replaying remote actions?
2335 : */
2336 304 : replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
2337 23 : replorigin_xact_state.origin != DoNotReplicateId);
2338 :
2339 : /* Load the injection point before entering the critical section */
2340 281 : INJECTION_POINT_LOAD("commit-after-delay-checkpoint");
2341 :
2342 281 : START_CRIT_SECTION();
2343 :
2344 : /* See notes in RecordTransactionCommit */
2345 : Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0);
2346 281 : MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT;
2347 :
2348 281 : INJECTION_POINT_CACHED("commit-after-delay-checkpoint", NULL);
2349 :
2350 : /*
2351 : * Ensures the DELAY_CHKPT_IN_COMMIT flag write is globally visible before
2352 : * commit time is written.
2353 : */
2354 281 : pg_write_barrier();
2355 :
2356 : /*
2357 : * Note it is important to set committs value after marking ourselves as
2358 : * in the commit critical section (DELAY_CHKPT_IN_COMMIT). This is because
2359 : * we want to ensure all transactions that have acquired commit timestamp
2360 : * are finished before we allow the logical replication client to advance
2361 : * its xid which is used to hold back dead rows for conflict detection.
2362 : * See comments atop worker.c.
2363 : */
2364 281 : committs = GetCurrentTimestamp();
2365 :
2366 : /*
2367 : * Emit the XLOG commit record. Note that we mark 2PC commits as
2368 : * potentially having AccessExclusiveLocks since we don't know whether or
2369 : * not they do.
2370 : */
2371 281 : recptr = XactLogCommitRecord(committs,
2372 : nchildren, children, nrels, rels,
2373 : nstats, stats,
2374 : ninvalmsgs, invalmsgs,
2375 : initfileinval,
2376 281 : MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2377 : xid, gid);
2378 :
2379 :
2380 281 : if (replorigin)
2381 : /* Move LSNs forward for this replication origin */
2382 23 : replorigin_session_advance(replorigin_xact_state.origin_lsn,
2383 : XactLastRecEnd);
2384 :
2385 : /*
2386 : * Record commit timestamp. The value comes from plain commit timestamp
2387 : * if replorigin is not enabled, or replorigin already set a value for us
2388 : * in replorigin_xact_state.origin_timestamp otherwise.
2389 : *
2390 : * We don't need to WAL-log anything here, as the commit record written
2391 : * above already contains the data.
2392 : */
2393 281 : if (!replorigin || replorigin_xact_state.origin_timestamp == 0)
2394 258 : replorigin_xact_state.origin_timestamp = committs;
2395 :
2396 281 : TransactionTreeSetCommitTsData(xid, nchildren, children,
2397 : replorigin_xact_state.origin_timestamp,
2398 281 : replorigin_xact_state.origin);
2399 :
2400 : /*
2401 : * We don't currently try to sleep before flush here ... nor is there any
2402 : * support for async commit of a prepared xact (the very idea is probably
2403 : * a contradiction)
2404 : */
2405 :
2406 : /* Flush XLOG to disk */
2407 281 : XLogFlush(recptr);
2408 :
2409 : /* Mark the transaction committed in pg_xact */
2410 281 : TransactionIdCommitTree(xid, nchildren, children);
2411 :
2412 : /* Checkpoint can proceed now */
2413 281 : MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT;
2414 :
2415 281 : END_CRIT_SECTION();
2416 :
2417 : /*
2418 : * Wait for synchronous replication, if required.
2419 : *
2420 : * Note that at this stage we have marked clog, but still show as running
2421 : * in the procarray and continue to hold locks.
2422 : */
2423 281 : SyncRepWaitForLSN(recptr, true);
2424 281 : }
2425 :
2426 : /*
2427 : * RecordTransactionAbortPrepared
2428 : *
2429 : * This is basically the same as RecordTransactionAbort.
2430 : *
2431 : * We know the transaction made at least one XLOG entry (its PREPARE),
2432 : * so it is never possible to optimize out the abort record.
2433 : */
2434 : static void
2435 49 : RecordTransactionAbortPrepared(TransactionId xid,
2436 : int nchildren,
2437 : TransactionId *children,
2438 : int nrels,
2439 : RelFileLocator *rels,
2440 : int nstats,
2441 : xl_xact_stats_item *stats,
2442 : const char *gid)
2443 : {
2444 : XLogRecPtr recptr;
2445 : bool replorigin;
2446 :
2447 : /*
2448 : * Are we using the replication origins feature? Or, in other words, are
2449 : * we replaying remote actions?
2450 : */
2451 55 : replorigin = (replorigin_xact_state.origin != InvalidReplOriginId &&
2452 6 : replorigin_xact_state.origin != DoNotReplicateId);
2453 :
2454 : /*
2455 : * Catch the scenario where we aborted partway through
2456 : * RecordTransactionCommitPrepared ...
2457 : */
2458 49 : if (TransactionIdDidCommit(xid))
2459 0 : elog(PANIC, "cannot abort transaction %u, it was already committed",
2460 : xid);
2461 :
2462 49 : START_CRIT_SECTION();
2463 :
2464 : /*
2465 : * Emit the XLOG commit record. Note that we mark 2PC aborts as
2466 : * potentially having AccessExclusiveLocks since we don't know whether or
2467 : * not they do.
2468 : */
2469 49 : recptr = XactLogAbortRecord(GetCurrentTimestamp(),
2470 : nchildren, children,
2471 : nrels, rels,
2472 : nstats, stats,
2473 49 : MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
2474 : xid, gid);
2475 :
2476 49 : if (replorigin)
2477 : /* Move LSNs forward for this replication origin */
2478 6 : replorigin_session_advance(replorigin_xact_state.origin_lsn,
2479 : XactLastRecEnd);
2480 :
2481 : /* Always flush, since we're about to remove the 2PC state file */
2482 49 : XLogFlush(recptr);
2483 :
2484 : /*
2485 : * Mark the transaction aborted in clog. This is not absolutely necessary
2486 : * but we may as well do it while we are here.
2487 : */
2488 49 : TransactionIdAbortTree(xid, nchildren, children);
2489 :
2490 49 : END_CRIT_SECTION();
2491 :
2492 : /*
2493 : * Wait for synchronous replication, if required.
2494 : *
2495 : * Note that at this stage we have marked clog, but still show as running
2496 : * in the procarray and continue to hold locks.
2497 : */
2498 49 : SyncRepWaitForLSN(recptr, false);
2499 49 : }
2500 :
2501 : /*
2502 : * PrepareRedoAdd
2503 : *
2504 : * Store pointers to the start/end of the WAL record along with the xid in
2505 : * a gxact entry in shared memory TwoPhaseState structure. If caller
2506 : * specifies InvalidXLogRecPtr as WAL location to fetch the two-phase
2507 : * data, the entry is marked as located on disk.
2508 : */
2509 : void
2510 98 : PrepareRedoAdd(FullTransactionId fxid, char *buf,
2511 : XLogRecPtr start_lsn, XLogRecPtr end_lsn,
2512 : ReplOriginId origin_id)
2513 : {
2514 98 : TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
2515 : char *bufptr;
2516 : const char *gid;
2517 : GlobalTransaction gxact;
2518 :
2519 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2520 : Assert(RecoveryInProgress());
2521 :
2522 98 : if (!FullTransactionIdIsValid(fxid))
2523 : {
2524 : Assert(InRecovery);
2525 82 : fxid = FullTransactionIdFromAllowableAt(TransamVariables->nextXid,
2526 : hdr->xid);
2527 : }
2528 :
2529 98 : bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
2530 98 : gid = (const char *) bufptr;
2531 :
2532 : /*
2533 : * Reserve the GID for the given transaction in the redo code path.
2534 : *
2535 : * This creates a gxact struct and puts it into the active array.
2536 : *
2537 : * In redo, this struct is mainly used to track PREPARE/COMMIT entries in
2538 : * shared memory. Hence, we only fill up the bare minimum contents here.
2539 : * The gxact also gets marked with gxact->inredo set to true to indicate
2540 : * that it got added in the redo phase
2541 : */
2542 :
2543 : /*
2544 : * In the event of a crash while a checkpoint was running, it may be
2545 : * possible that some two-phase data found its way to disk while its
2546 : * corresponding record needs to be replayed in the follow-up recovery. As
2547 : * the 2PC data was on disk, it has already been restored at the beginning
2548 : * of recovery with restoreTwoPhaseData(), so skip this record to avoid
2549 : * duplicates in TwoPhaseState. If a consistent state has been reached,
2550 : * the record is added to TwoPhaseState and it should have no
2551 : * corresponding file in pg_twophase.
2552 : */
2553 98 : if (XLogRecPtrIsValid(start_lsn))
2554 : {
2555 : char path[MAXPGPATH];
2556 :
2557 : Assert(InRecovery);
2558 82 : TwoPhaseFilePath(path, fxid);
2559 :
2560 82 : if (access(path, F_OK) == 0)
2561 : {
2562 0 : ereport(reachedConsistency ? ERROR : WARNING,
2563 : (errmsg("could not recover two-phase state file for transaction %u",
2564 : hdr->xid),
2565 : errdetail("Two-phase state file has been found in WAL record %X/%08X, but this transaction has already been restored from disk.",
2566 : LSN_FORMAT_ARGS(start_lsn))));
2567 0 : return;
2568 : }
2569 :
2570 82 : if (errno != ENOENT)
2571 0 : ereport(ERROR,
2572 : (errcode_for_file_access(),
2573 : errmsg("could not access file \"%s\": %m", path)));
2574 : }
2575 :
2576 : /* Get a free gxact from the freelist */
2577 98 : if (TwoPhaseState->freeGXacts == NULL)
2578 0 : ereport(ERROR,
2579 : (errcode(ERRCODE_OUT_OF_MEMORY),
2580 : errmsg("maximum number of prepared transactions reached"),
2581 : errhint("Increase \"max_prepared_transactions\" (currently %d).",
2582 : max_prepared_xacts)));
2583 98 : gxact = TwoPhaseState->freeGXacts;
2584 98 : TwoPhaseState->freeGXacts = gxact->next;
2585 :
2586 98 : gxact->prepared_at = hdr->prepared_at;
2587 98 : gxact->prepare_start_lsn = start_lsn;
2588 98 : gxact->prepare_end_lsn = end_lsn;
2589 98 : gxact->fxid = fxid;
2590 98 : gxact->owner = hdr->owner;
2591 98 : gxact->locking_backend = INVALID_PROC_NUMBER;
2592 98 : gxact->valid = false;
2593 98 : gxact->ondisk = !XLogRecPtrIsValid(start_lsn);
2594 98 : gxact->inredo = true; /* yes, added in redo */
2595 98 : strcpy(gxact->gid, gid);
2596 :
2597 : /* And insert it into the active array */
2598 : Assert(TwoPhaseState->numPrepXacts < max_prepared_xacts);
2599 98 : TwoPhaseState->prepXacts[TwoPhaseState->numPrepXacts++] = gxact;
2600 :
2601 98 : if (origin_id != InvalidReplOriginId)
2602 : {
2603 : /* recover apply progress */
2604 13 : replorigin_advance(origin_id, hdr->origin_lsn, end_lsn,
2605 : false /* backward */ , false /* WAL */ );
2606 : }
2607 :
2608 98 : elog(DEBUG2, "added 2PC data in shared memory for transaction %u of epoch %u",
2609 : XidFromFullTransactionId(gxact->fxid),
2610 : EpochFromFullTransactionId(gxact->fxid));
2611 : }
2612 :
2613 : /*
2614 : * PrepareRedoRemoveFull
2615 : *
2616 : * Remove the corresponding gxact entry from TwoPhaseState. Also remove
2617 : * the 2PC file if a prepared transaction was saved via an earlier checkpoint.
2618 : *
2619 : * Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
2620 : * is updated.
2621 : */
2622 : static void
2623 72 : PrepareRedoRemoveFull(FullTransactionId fxid, bool giveWarning)
2624 : {
2625 72 : GlobalTransaction gxact = NULL;
2626 : int i;
2627 72 : bool found = false;
2628 :
2629 : Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
2630 : Assert(RecoveryInProgress());
2631 :
2632 72 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2633 : {
2634 63 : gxact = TwoPhaseState->prepXacts[i];
2635 :
2636 63 : if (FullTransactionIdEquals(gxact->fxid, fxid))
2637 : {
2638 : Assert(gxact->inredo);
2639 63 : found = true;
2640 63 : break;
2641 : }
2642 : }
2643 :
2644 : /*
2645 : * Just leave if there is nothing, this is expected during WAL replay.
2646 : */
2647 72 : if (!found)
2648 9 : return;
2649 :
2650 : /*
2651 : * And now we can clean up any files we may have left.
2652 : */
2653 63 : elog(DEBUG2, "removing 2PC data for transaction %u of epoch %u ",
2654 : XidFromFullTransactionId(fxid),
2655 : EpochFromFullTransactionId(fxid));
2656 :
2657 63 : if (gxact->ondisk)
2658 5 : RemoveTwoPhaseFile(fxid, giveWarning);
2659 :
2660 63 : RemoveGXact(gxact);
2661 : }
2662 :
2663 : /*
2664 : * Wrapper of PrepareRedoRemoveFull(), for TransactionIds.
2665 : */
2666 : void
2667 72 : PrepareRedoRemove(TransactionId xid, bool giveWarning)
2668 : {
2669 : FullTransactionId fxid =
2670 72 : FullTransactionIdFromAllowableAt(TransamVariables->nextXid, xid);
2671 :
2672 72 : PrepareRedoRemoveFull(fxid, giveWarning);
2673 72 : }
2674 :
2675 : /*
2676 : * LookupGXact
2677 : * Check if the prepared transaction with the given GID, lsn and timestamp
2678 : * exists.
2679 : *
2680 : * Note that we always compare with the LSN where prepare ends because that is
2681 : * what is stored as origin_lsn in the 2PC file.
2682 : *
2683 : * This function is primarily used to check if the prepared transaction
2684 : * received from the upstream (remote node) already exists. Checking only GID
2685 : * is not sufficient because a different prepared xact with the same GID can
2686 : * exist on the same node. So, we are ensuring to match origin_lsn and
2687 : * origin_timestamp of prepared xact to avoid the possibility of a match of
2688 : * prepared xact from two different nodes.
2689 : */
2690 : bool
2691 5 : LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
2692 : TimestampTz origin_prepare_timestamp)
2693 : {
2694 : int i;
2695 5 : bool found = false;
2696 :
2697 5 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2698 5 : for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
2699 : {
2700 5 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2701 :
2702 : /* Ignore not-yet-valid GIDs. */
2703 5 : if (gxact->valid && strcmp(gxact->gid, gid) == 0)
2704 : {
2705 : char *buf;
2706 : TwoPhaseFileHeader *hdr;
2707 :
2708 : /*
2709 : * We are not expecting collisions of GXACTs (same gid) between
2710 : * publisher and subscribers, so we perform all I/O while holding
2711 : * TwoPhaseStateLock for simplicity.
2712 : *
2713 : * To move the I/O out of the lock, we need to ensure that no
2714 : * other backend commits the prepared xact in the meantime. We can
2715 : * do this optimization if we encounter many collisions in GID
2716 : * between publisher and subscriber.
2717 : */
2718 5 : if (gxact->ondisk)
2719 0 : buf = ReadTwoPhaseFile(gxact->fxid, false);
2720 : else
2721 : {
2722 : Assert(gxact->prepare_start_lsn);
2723 5 : XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
2724 : }
2725 :
2726 5 : hdr = (TwoPhaseFileHeader *) buf;
2727 :
2728 5 : if (hdr->origin_lsn == prepare_end_lsn &&
2729 5 : hdr->origin_timestamp == origin_prepare_timestamp)
2730 : {
2731 5 : found = true;
2732 5 : pfree(buf);
2733 5 : break;
2734 : }
2735 :
2736 0 : pfree(buf);
2737 : }
2738 : }
2739 5 : LWLockRelease(TwoPhaseStateLock);
2740 5 : return found;
2741 : }
2742 :
2743 : /*
2744 : * TwoPhaseTransactionGid
2745 : * Form the prepared transaction GID for two_phase transactions.
2746 : *
2747 : * Return the GID in the supplied buffer.
2748 : */
2749 : void
2750 54 : TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res, int szgid)
2751 : {
2752 : Assert(OidIsValid(subid));
2753 :
2754 54 : if (!TransactionIdIsValid(xid))
2755 0 : ereport(ERROR,
2756 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2757 : errmsg_internal("invalid two-phase transaction ID")));
2758 :
2759 54 : snprintf(gid_res, szgid, "pg_gid_%u_%u", subid, xid);
2760 54 : }
2761 :
2762 : /*
2763 : * IsTwoPhaseTransactionGidForSubid
2764 : * Check whether the given GID (as formed by TwoPhaseTransactionGid) is
2765 : * for the specified 'subid'.
2766 : */
2767 : static bool
2768 0 : IsTwoPhaseTransactionGidForSubid(Oid subid, char *gid)
2769 : {
2770 : int ret;
2771 : Oid subid_from_gid;
2772 : TransactionId xid_from_gid;
2773 : char gid_tmp[GIDSIZE];
2774 :
2775 : /* Extract the subid and xid from the given GID */
2776 0 : ret = sscanf(gid, "pg_gid_%u_%u", &subid_from_gid, &xid_from_gid);
2777 :
2778 : /*
2779 : * Check that the given GID has expected format, and at least the subid
2780 : * matches.
2781 : */
2782 0 : if (ret != 2 || subid != subid_from_gid)
2783 0 : return false;
2784 :
2785 : /*
2786 : * Reconstruct a temporary GID based on the subid and xid extracted from
2787 : * the given GID and check whether the temporary GID and the given GID
2788 : * match.
2789 : */
2790 0 : TwoPhaseTransactionGid(subid, xid_from_gid, gid_tmp, sizeof(gid_tmp));
2791 :
2792 0 : return strcmp(gid, gid_tmp) == 0;
2793 : }
2794 :
2795 : /*
2796 : * LookupGXactBySubid
2797 : * Check if the prepared transaction done by apply worker exists.
2798 : */
2799 : bool
2800 1 : LookupGXactBySubid(Oid subid)
2801 : {
2802 1 : bool found = false;
2803 :
2804 1 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2805 1 : for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
2806 : {
2807 0 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2808 :
2809 : /* Ignore not-yet-valid GIDs. */
2810 0 : if (gxact->valid &&
2811 0 : IsTwoPhaseTransactionGidForSubid(subid, gxact->gid))
2812 : {
2813 0 : found = true;
2814 0 : break;
2815 : }
2816 : }
2817 1 : LWLockRelease(TwoPhaseStateLock);
2818 :
2819 1 : return found;
2820 : }
2821 :
2822 : /*
2823 : * TwoPhaseGetOldestXidInCommit
2824 : * Return the oldest transaction ID from prepared transactions that are
2825 : * currently in the commit critical section.
2826 : *
2827 : * This function only considers transactions in the currently connected
2828 : * database. If no matching transactions are found, it returns
2829 : * InvalidTransactionId.
2830 : */
2831 : TransactionId
2832 6686 : TwoPhaseGetOldestXidInCommit(void)
2833 : {
2834 6686 : TransactionId oldestRunningXid = InvalidTransactionId;
2835 :
2836 6686 : LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2837 :
2838 12777 : for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
2839 : {
2840 6091 : GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2841 : PGPROC *commitproc;
2842 : TransactionId xid;
2843 :
2844 6091 : if (!gxact->valid)
2845 0 : continue;
2846 :
2847 6091 : if (gxact->locking_backend == INVALID_PROC_NUMBER)
2848 0 : continue;
2849 :
2850 : /*
2851 : * Get the backend that is handling the transaction. It's safe to
2852 : * access this backend while holding TwoPhaseStateLock, as the backend
2853 : * can only be destroyed after either removing or unlocking the
2854 : * current global transaction, both of which require an exclusive
2855 : * TwoPhaseStateLock.
2856 : */
2857 6091 : commitproc = GetPGProcByNumber(gxact->locking_backend);
2858 :
2859 6091 : if (MyDatabaseId != commitproc->databaseId)
2860 0 : continue;
2861 :
2862 6091 : if ((commitproc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0)
2863 0 : continue;
2864 :
2865 6091 : xid = XidFromFullTransactionId(gxact->fxid);
2866 :
2867 6091 : if (!TransactionIdIsValid(oldestRunningXid) ||
2868 0 : TransactionIdPrecedes(xid, oldestRunningXid))
2869 6091 : oldestRunningXid = xid;
2870 : }
2871 :
2872 6686 : LWLockRelease(TwoPhaseStateLock);
2873 :
2874 6686 : return oldestRunningXid;
2875 : }
|