Line data Source code
1 : /*-------------------------------------------------------------------------
2 : * xid8funcs.c
3 : *
4 : * Export internal transaction IDs to user level.
5 : *
6 : * Note that only top-level transaction IDs are exposed to user sessions.
7 : * This is important because xid8s frequently persist beyond the global
8 : * xmin horizon, or may even be shipped to other machines, so we cannot
9 : * rely on being able to correlate subtransaction IDs with their parents
10 : * via functions such as SubTransGetTopmostTransaction().
11 : *
12 : * These functions are used to support the txid_XXX functions and the newer
13 : * pg_current_xact, pg_current_snapshot and related fmgr functions, since the
14 : * only difference between them is whether they expose xid8 or int8 values to
15 : * users. The txid_XXX variants should eventually be dropped.
16 : *
17 : *
18 : * Copyright (c) 2003-2022, PostgreSQL Global Development Group
19 : * Author: Jan Wieck, Afilias USA INC.
20 : * 64-bit txids: Marko Kreen, Skype Technologies
21 : *
22 : * src/backend/utils/adt/xid8funcs.c
23 : *
24 : *-------------------------------------------------------------------------
25 : */
26 :
27 : #include "postgres.h"
28 :
29 : #include "access/clog.h"
30 : #include "access/transam.h"
31 : #include "access/xact.h"
32 : #include "access/xlog.h"
33 : #include "funcapi.h"
34 : #include "lib/qunique.h"
35 : #include "libpq/pqformat.h"
36 : #include "miscadmin.h"
37 : #include "postmaster/postmaster.h"
38 : #include "storage/lwlock.h"
39 : #include "utils/builtins.h"
40 : #include "utils/memutils.h"
41 : #include "utils/snapmgr.h"
42 : #include "utils/xid8.h"
43 :
44 :
/*
 * Threshold at which is_visible_fxid() switches from a linear scan of the
 * xip array to bsearch(): snapshots holding more than this many in-progress
 * xid8s are searched with bsearch().  If this macro is not defined, the
 * linear scan is always used.
 */
#define USE_BSEARCH_IF_NXIP_GREATER 30
50 :
51 :
/*
 * Snapshot containing FullTransactionIds.
 *
 * This is a variable-length value: a 4-byte length word, a fixed header
 * (nxip, xmin, xmax), and then nxip entries of in-progress xid8s.  Use
 * PG_SNAPSHOT_SIZE() to compute the total size for a given entry count.
 */
typedef struct
{
	/*
	 * 4-byte length hdr, should not be touched directly.
	 *
	 * Explicit embedding is ok as we want always correct alignment anyway.
	 */
	int32		__varsz;

	uint32		nxip;			/* number of fxids in xip array */
	FullTransactionId xmin;		/* lower bound of the snapshot */
	FullTransactionId xmax;		/* upper bound of the snapshot */
	/* in-progress fxids, xmin <= xip[i] < xmax: */
	FullTransactionId xip[FLEXIBLE_ARRAY_MEMBER];
} pg_snapshot;
70 :
/* Total size in bytes of a pg_snapshot holding nxip in-progress entries. */
#define PG_SNAPSHOT_SIZE(nxip) \
	(offsetof(pg_snapshot, xip) + sizeof(FullTransactionId) * (nxip))
/* Largest nxip for which PG_SNAPSHOT_SIZE() still fits in MaxAllocSize. */
#define PG_SNAPSHOT_MAX_NXIP \
	((MaxAllocSize - offsetof(pg_snapshot, xip)) / sizeof(FullTransactionId))
75 :
76 : /*
77 : * Helper to get a TransactionId from a 64-bit xid with wraparound detection.
78 : *
79 : * It is an ERROR if the xid is in the future. Otherwise, returns true if
80 : * the transaction is still new enough that we can determine whether it
81 : * committed and false otherwise. If *extracted_xid is not NULL, it is set
82 : * to the low 32 bits of the transaction ID (i.e. the actual XID, without the
83 : * epoch).
84 : *
85 : * The caller must hold XactTruncationLock since it's dealing with arbitrary
86 : * XIDs, and must continue to hold it until it's done with any clog lookups
87 : * relating to those XIDs.
88 : */
89 : static bool
90 88 : TransactionIdInRecentPast(FullTransactionId fxid, TransactionId *extracted_xid)
91 : {
92 88 : uint32 xid_epoch = EpochFromFullTransactionId(fxid);
93 88 : TransactionId xid = XidFromFullTransactionId(fxid);
94 : uint32 now_epoch;
95 : TransactionId now_epoch_next_xid;
96 : FullTransactionId now_fullxid;
97 :
98 88 : now_fullxid = ReadNextFullTransactionId();
99 88 : now_epoch_next_xid = XidFromFullTransactionId(now_fullxid);
100 88 : now_epoch = EpochFromFullTransactionId(now_fullxid);
101 :
102 88 : if (extracted_xid != NULL)
103 88 : *extracted_xid = xid;
104 :
105 88 : if (!TransactionIdIsValid(xid))
106 0 : return false;
107 :
108 : /* For non-normal transaction IDs, we can ignore the epoch. */
109 88 : if (!TransactionIdIsNormal(xid))
110 24 : return true;
111 :
112 : /* If the transaction ID is in the future, throw an error. */
113 64 : if (!FullTransactionIdPrecedes(fxid, now_fullxid))
114 12 : ereport(ERROR,
115 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
116 : errmsg("transaction ID %llu is in the future",
117 : (unsigned long long) U64FromFullTransactionId(fxid))));
118 :
119 : /*
120 : * ShmemVariableCache->oldestClogXid is protected by XactTruncationLock,
121 : * but we don't acquire that lock here. Instead, we require the caller to
122 : * acquire it, because the caller is presumably going to look up the
123 : * returned XID. If we took and released the lock within this function, a
124 : * CLOG truncation could occur before the caller finished with the XID.
125 : */
126 : Assert(LWLockHeldByMe(XactTruncationLock));
127 :
128 : /*
129 : * If the transaction ID has wrapped around, it's definitely too old to
130 : * determine the commit status. Otherwise, we can compare it to
131 : * ShmemVariableCache->oldestClogXid to determine whether the relevant
132 : * CLOG entry is guaranteed to still exist.
133 : */
134 52 : if (xid_epoch + 1 < now_epoch
135 52 : || (xid_epoch + 1 == now_epoch && xid < now_epoch_next_xid)
136 52 : || TransactionIdPrecedes(xid, ShmemVariableCache->oldestClogXid))
137 12 : return false;
138 :
139 40 : return true;
140 : }
141 :
142 : /*
143 : * Convert a TransactionId obtained from a snapshot held by the caller to a
144 : * FullTransactionId. Use next_fxid as a reference FullTransactionId, so that
145 : * we can compute the high order bits. It must have been obtained by the
146 : * caller with ReadNextFullTransactionId() after the snapshot was created.
147 : */
148 : static FullTransactionId
149 60 : widen_snapshot_xid(TransactionId xid, FullTransactionId next_fxid)
150 : {
151 60 : TransactionId next_xid = XidFromFullTransactionId(next_fxid);
152 60 : uint32 epoch = EpochFromFullTransactionId(next_fxid);
153 :
154 : /* Special transaction ID. */
155 60 : if (!TransactionIdIsNormal(xid))
156 0 : return FullTransactionIdFromEpochAndXid(0, xid);
157 :
158 : /*
159 : * The 64 bit result must be <= next_fxid, since next_fxid hadn't been
160 : * issued yet when the snapshot was created. Every TransactionId in the
161 : * snapshot must therefore be from the same epoch as next_fxid, or the
162 : * epoch before. We know this because next_fxid is never allow to get
163 : * more than one epoch ahead of the TransactionIds in any snapshot.
164 : */
165 60 : if (xid > next_xid)
166 0 : epoch--;
167 :
168 60 : return FullTransactionIdFromEpochAndXid(epoch, xid);
169 : }
170 :
171 : /*
172 : * txid comparator for qsort/bsearch
173 : */
174 : static int
175 2696 : cmp_fxid(const void *aa, const void *bb)
176 : {
177 2696 : FullTransactionId a = *(const FullTransactionId *) aa;
178 2696 : FullTransactionId b = *(const FullTransactionId *) bb;
179 :
180 2696 : if (FullTransactionIdPrecedes(a, b))
181 648 : return -1;
182 2048 : if (FullTransactionIdPrecedes(b, a))
183 1676 : return 1;
184 372 : return 0;
185 : }
186 :
187 : /*
188 : * Sort a snapshot's txids, so we can use bsearch() later. Also remove
189 : * any duplicates.
190 : *
191 : * For consistency of on-disk representation, we always sort even if bsearch
192 : * will not be used.
193 : */
194 : static void
195 24 : sort_snapshot(pg_snapshot *snap)
196 : {
197 24 : if (snap->nxip > 1)
198 : {
199 4 : qsort(snap->xip, snap->nxip, sizeof(FullTransactionId), cmp_fxid);
200 4 : snap->nxip = qunique(snap->xip, snap->nxip, sizeof(FullTransactionId),
201 : cmp_fxid);
202 : }
203 24 : }
204 :
205 : /*
206 : * check fxid visibility.
207 : */
208 : static bool
209 1020 : is_visible_fxid(FullTransactionId value, const pg_snapshot *snap)
210 : {
211 1020 : if (FullTransactionIdPrecedes(value, snap->xmin))
212 132 : return true;
213 888 : else if (!FullTransactionIdPrecedes(value, snap->xmax))
214 168 : return false;
215 : #ifdef USE_BSEARCH_IF_NXIP_GREATER
216 720 : else if (snap->nxip > USE_BSEARCH_IF_NXIP_GREATER)
217 : {
218 : void *res;
219 :
220 600 : res = bsearch(&value, snap->xip, snap->nxip, sizeof(FullTransactionId),
221 : cmp_fxid);
222 : /* if found, transaction is still in progress */
223 600 : return (res) ? false : true;
224 : }
225 : #endif
226 : else
227 : {
228 : uint32 i;
229 :
230 360 : for (i = 0; i < snap->nxip; i++)
231 : {
232 288 : if (FullTransactionIdEquals(value, snap->xip[i]))
233 48 : return false;
234 : }
235 72 : return true;
236 : }
237 : }
238 :
239 : /*
240 : * helper functions to use StringInfo for pg_snapshot creation.
241 : */
242 :
243 : static StringInfo
244 168 : buf_init(FullTransactionId xmin, FullTransactionId xmax)
245 : {
246 : pg_snapshot snap;
247 : StringInfo buf;
248 :
249 168 : snap.xmin = xmin;
250 168 : snap.xmax = xmax;
251 168 : snap.nxip = 0;
252 :
253 168 : buf = makeStringInfo();
254 168 : appendBinaryStringInfo(buf, (char *) &snap, PG_SNAPSHOT_SIZE(0));
255 168 : return buf;
256 : }
257 :
258 : static void
259 612 : buf_add_txid(StringInfo buf, FullTransactionId fxid)
260 : {
261 612 : pg_snapshot *snap = (pg_snapshot *) buf->data;
262 :
263 : /* do this before possible realloc */
264 612 : snap->nxip++;
265 :
266 612 : appendBinaryStringInfo(buf, (char *) &fxid, sizeof(fxid));
267 612 : }
268 :
269 : static pg_snapshot *
270 144 : buf_finalize(StringInfo buf)
271 : {
272 144 : pg_snapshot *snap = (pg_snapshot *) buf->data;
273 :
274 144 : SET_VARSIZE(snap, buf->len);
275 :
276 : /* buf is not needed anymore */
277 144 : buf->data = NULL;
278 144 : pfree(buf);
279 :
280 144 : return snap;
281 : }
282 :
283 : /*
284 : * parse snapshot from cstring
285 : */
286 : static pg_snapshot *
287 204 : parse_snapshot(const char *str)
288 : {
289 : FullTransactionId xmin;
290 : FullTransactionId xmax;
291 204 : FullTransactionId last_val = InvalidFullTransactionId;
292 : FullTransactionId val;
293 204 : const char *str_start = str;
294 : char *endp;
295 : StringInfo buf;
296 :
297 204 : xmin = FullTransactionIdFromU64(strtou64(str, &endp, 10));
298 204 : if (*endp != ':')
299 0 : goto bad_format;
300 204 : str = endp + 1;
301 :
302 204 : xmax = FullTransactionIdFromU64(strtou64(str, &endp, 10));
303 204 : if (*endp != ':')
304 0 : goto bad_format;
305 204 : str = endp + 1;
306 :
307 : /* it should look sane */
308 204 : if (!FullTransactionIdIsValid(xmin) ||
309 192 : !FullTransactionIdIsValid(xmax) ||
310 180 : FullTransactionIdPrecedes(xmax, xmin))
311 36 : goto bad_format;
312 :
313 : /* allocate buffer */
314 168 : buf = buf_init(xmin, xmax);
315 :
316 : /* loop over values */
317 792 : while (*str != '\0')
318 : {
319 : /* read next value */
320 648 : val = FullTransactionIdFromU64(strtou64(str, &endp, 10));
321 648 : str = endp;
322 :
323 : /* require the input to be in order */
324 648 : if (FullTransactionIdPrecedes(val, xmin) ||
325 636 : FullTransactionIdFollowsOrEquals(val, xmax) ||
326 636 : FullTransactionIdPrecedes(val, last_val))
327 24 : goto bad_format;
328 :
329 : /* skip duplicates */
330 624 : if (!FullTransactionIdEquals(val, last_val))
331 612 : buf_add_txid(buf, val);
332 624 : last_val = val;
333 :
334 624 : if (*str == ',')
335 504 : str++;
336 120 : else if (*str != '\0')
337 0 : goto bad_format;
338 : }
339 :
340 144 : return buf_finalize(buf);
341 :
342 60 : bad_format:
343 60 : ereport(ERROR,
344 : (errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
345 : errmsg("invalid input syntax for type %s: \"%s\"",
346 : "pg_snapshot", str_start)));
347 : return NULL; /* keep compiler quiet */
348 : }
349 :
350 : /*
351 : * pg_current_xact_id() returns xid8
352 : *
353 : * Return the current toplevel full transaction ID.
354 : * If the current transaction does not have one, one is assigned.
355 : */
356 : Datum
357 5868 : pg_current_xact_id(PG_FUNCTION_ARGS)
358 : {
359 : /*
360 : * Must prevent during recovery because if an xid is not assigned we try
361 : * to assign one, which would fail. Programs already rely on this function
362 : * to always return a valid current xid, so we should not change this to
363 : * return NULL or similar invalid xid.
364 : */
365 5868 : PreventCommandDuringRecovery("pg_current_xact_id()");
366 :
367 5868 : PG_RETURN_FULLTRANSACTIONID(GetTopFullTransactionId());
368 : }
369 :
370 : /*
371 : * Same as pg_current_xact_id() but doesn't assign a new xid if there
372 : * isn't one yet.
373 : */
374 : Datum
375 24 : pg_current_xact_id_if_assigned(PG_FUNCTION_ARGS)
376 : {
377 24 : FullTransactionId topfxid = GetTopFullTransactionIdIfAny();
378 :
379 24 : if (!FullTransactionIdIsValid(topfxid))
380 12 : PG_RETURN_NULL();
381 :
382 12 : PG_RETURN_FULLTRANSACTIONID(topfxid);
383 : }
384 :
385 : /*
386 : * pg_current_snapshot() returns pg_snapshot
387 : *
388 : * Return current snapshot
389 : *
390 : * Note that only top-transaction XIDs are included in the snapshot.
391 : */
392 : Datum
393 24 : pg_current_snapshot(PG_FUNCTION_ARGS)
394 : {
395 : pg_snapshot *snap;
396 : uint32 nxip,
397 : i;
398 : Snapshot cur;
399 24 : FullTransactionId next_fxid = ReadNextFullTransactionId();
400 :
401 24 : cur = GetActiveSnapshot();
402 24 : if (cur == NULL)
403 0 : elog(ERROR, "no active snapshot set");
404 :
405 : /*
406 : * Compile-time limits on the procarray (MAX_BACKENDS processes plus
407 : * MAX_BACKENDS prepared transactions) guarantee nxip won't be too large.
408 : */
409 : StaticAssertStmt(MAX_BACKENDS * 2 <= PG_SNAPSHOT_MAX_NXIP,
410 : "possible overflow in pg_current_snapshot()");
411 :
412 : /* allocate */
413 24 : nxip = cur->xcnt;
414 24 : snap = palloc(PG_SNAPSHOT_SIZE(nxip));
415 :
416 : /* fill */
417 24 : snap->xmin = widen_snapshot_xid(cur->xmin, next_fxid);
418 24 : snap->xmax = widen_snapshot_xid(cur->xmax, next_fxid);
419 24 : snap->nxip = nxip;
420 36 : for (i = 0; i < nxip; i++)
421 12 : snap->xip[i] = widen_snapshot_xid(cur->xip[i], next_fxid);
422 :
423 : /*
424 : * We want them guaranteed to be in ascending order. This also removes
425 : * any duplicate xids. Normally, an XID can only be assigned to one
426 : * backend, but when preparing a transaction for two-phase commit, there
427 : * is a transient state when both the original backend and the dummy
428 : * PGPROC entry reserved for the prepared transaction hold the same XID.
429 : */
430 24 : sort_snapshot(snap);
431 :
432 : /* set size after sorting, because it may have removed duplicate xips */
433 24 : SET_VARSIZE(snap, PG_SNAPSHOT_SIZE(snap->nxip));
434 :
435 24 : PG_RETURN_POINTER(snap);
436 : }
437 :
438 : /*
439 : * pg_snapshot_in(cstring) returns pg_snapshot
440 : *
441 : * input function for type pg_snapshot
442 : */
443 : Datum
444 204 : pg_snapshot_in(PG_FUNCTION_ARGS)
445 : {
446 204 : char *str = PG_GETARG_CSTRING(0);
447 : pg_snapshot *snap;
448 :
449 204 : snap = parse_snapshot(str);
450 :
451 144 : PG_RETURN_POINTER(snap);
452 : }
453 :
454 : /*
455 : * pg_snapshot_out(pg_snapshot) returns cstring
456 : *
457 : * output function for type pg_snapshot
458 : */
459 : Datum
460 124 : pg_snapshot_out(PG_FUNCTION_ARGS)
461 : {
462 124 : pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
463 : StringInfoData str;
464 : uint32 i;
465 :
466 124 : initStringInfo(&str);
467 :
468 124 : appendStringInfo(&str, UINT64_FORMAT ":",
469 : U64FromFullTransactionId(snap->xmin));
470 124 : appendStringInfo(&str, UINT64_FORMAT ":",
471 : U64FromFullTransactionId(snap->xmax));
472 :
473 688 : for (i = 0; i < snap->nxip; i++)
474 : {
475 564 : if (i > 0)
476 464 : appendStringInfoChar(&str, ',');
477 564 : appendStringInfo(&str, UINT64_FORMAT,
478 : U64FromFullTransactionId(snap->xip[i]));
479 : }
480 :
481 124 : PG_RETURN_CSTRING(str.data);
482 : }
483 :
484 : /*
485 : * pg_snapshot_recv(internal) returns pg_snapshot
486 : *
487 : * binary input function for type pg_snapshot
488 : *
489 : * format: int4 nxip, int8 xmin, int8 xmax, int8 xip
490 : */
491 : Datum
492 0 : pg_snapshot_recv(PG_FUNCTION_ARGS)
493 : {
494 0 : StringInfo buf = (StringInfo) PG_GETARG_POINTER(0);
495 : pg_snapshot *snap;
496 0 : FullTransactionId last = InvalidFullTransactionId;
497 : int nxip;
498 : int i;
499 : FullTransactionId xmin;
500 : FullTransactionId xmax;
501 :
502 : /* load and validate nxip */
503 0 : nxip = pq_getmsgint(buf, 4);
504 0 : if (nxip < 0 || nxip > PG_SNAPSHOT_MAX_NXIP)
505 0 : goto bad_format;
506 :
507 0 : xmin = FullTransactionIdFromU64((uint64) pq_getmsgint64(buf));
508 0 : xmax = FullTransactionIdFromU64((uint64) pq_getmsgint64(buf));
509 0 : if (!FullTransactionIdIsValid(xmin) ||
510 0 : !FullTransactionIdIsValid(xmax) ||
511 0 : FullTransactionIdPrecedes(xmax, xmin))
512 0 : goto bad_format;
513 :
514 0 : snap = palloc(PG_SNAPSHOT_SIZE(nxip));
515 0 : snap->xmin = xmin;
516 0 : snap->xmax = xmax;
517 :
518 0 : for (i = 0; i < nxip; i++)
519 : {
520 : FullTransactionId cur =
521 0 : FullTransactionIdFromU64((uint64) pq_getmsgint64(buf));
522 :
523 0 : if (FullTransactionIdPrecedes(cur, last) ||
524 0 : FullTransactionIdPrecedes(cur, xmin) ||
525 0 : FullTransactionIdPrecedes(xmax, cur))
526 0 : goto bad_format;
527 :
528 : /* skip duplicate xips */
529 0 : if (FullTransactionIdEquals(cur, last))
530 : {
531 0 : i--;
532 0 : nxip--;
533 0 : continue;
534 : }
535 :
536 0 : snap->xip[i] = cur;
537 0 : last = cur;
538 : }
539 0 : snap->nxip = nxip;
540 0 : SET_VARSIZE(snap, PG_SNAPSHOT_SIZE(nxip));
541 0 : PG_RETURN_POINTER(snap);
542 :
543 0 : bad_format:
544 0 : ereport(ERROR,
545 : (errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
546 : errmsg("invalid external pg_snapshot data")));
547 : PG_RETURN_POINTER(NULL); /* keep compiler quiet */
548 : }
549 :
550 : /*
551 : * pg_snapshot_send(pg_snapshot) returns bytea
552 : *
553 : * binary output function for type pg_snapshot
554 : *
555 : * format: int4 nxip, u64 xmin, u64 xmax, u64 xip...
556 : */
557 : Datum
558 0 : pg_snapshot_send(PG_FUNCTION_ARGS)
559 : {
560 0 : pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
561 : StringInfoData buf;
562 : uint32 i;
563 :
564 0 : pq_begintypsend(&buf);
565 0 : pq_sendint32(&buf, snap->nxip);
566 0 : pq_sendint64(&buf, (int64) U64FromFullTransactionId(snap->xmin));
567 0 : pq_sendint64(&buf, (int64) U64FromFullTransactionId(snap->xmax));
568 0 : for (i = 0; i < snap->nxip; i++)
569 0 : pq_sendint64(&buf, (int64) U64FromFullTransactionId(snap->xip[i]));
570 0 : PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
571 : }
572 :
573 : /*
574 : * pg_visible_in_snapshot(xid8, pg_snapshot) returns bool
575 : *
576 : * is txid visible in snapshot ?
577 : */
578 : Datum
579 1020 : pg_visible_in_snapshot(PG_FUNCTION_ARGS)
580 : {
581 1020 : FullTransactionId value = PG_GETARG_FULLTRANSACTIONID(0);
582 1020 : pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(1);
583 :
584 1020 : PG_RETURN_BOOL(is_visible_fxid(value, snap));
585 : }
586 :
587 : /*
588 : * pg_snapshot_xmin(pg_snapshot) returns xid8
589 : *
590 : * return snapshot's xmin
591 : */
592 : Datum
593 60 : pg_snapshot_xmin(PG_FUNCTION_ARGS)
594 : {
595 60 : pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
596 :
597 60 : PG_RETURN_FULLTRANSACTIONID(snap->xmin);
598 : }
599 :
600 : /*
601 : * pg_snapshot_xmax(pg_snapshot) returns xid8
602 : *
603 : * return snapshot's xmax
604 : */
605 : Datum
606 48 : pg_snapshot_xmax(PG_FUNCTION_ARGS)
607 : {
608 48 : pg_snapshot *snap = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
609 :
610 48 : PG_RETURN_FULLTRANSACTIONID(snap->xmax);
611 : }
612 :
613 : /*
614 : * pg_snapshot_xip(pg_snapshot) returns setof xid8
615 : *
616 : * return in-progress xid8s in snapshot.
617 : */
618 : Datum
619 492 : pg_snapshot_xip(PG_FUNCTION_ARGS)
620 : {
621 : FuncCallContext *fctx;
622 : pg_snapshot *snap;
623 : FullTransactionId value;
624 :
625 : /* on first call initialize fctx and get copy of snapshot */
626 492 : if (SRF_IS_FIRSTCALL())
627 : {
628 48 : pg_snapshot *arg = (pg_snapshot *) PG_GETARG_VARLENA_P(0);
629 :
630 48 : fctx = SRF_FIRSTCALL_INIT();
631 :
632 : /* make a copy of user snapshot */
633 48 : snap = MemoryContextAlloc(fctx->multi_call_memory_ctx, VARSIZE(arg));
634 48 : memcpy(snap, arg, VARSIZE(arg));
635 :
636 48 : fctx->user_fctx = snap;
637 : }
638 :
639 : /* return values one-by-one */
640 492 : fctx = SRF_PERCALL_SETUP();
641 492 : snap = fctx->user_fctx;
642 492 : if (fctx->call_cntr < snap->nxip)
643 : {
644 444 : value = snap->xip[fctx->call_cntr];
645 444 : SRF_RETURN_NEXT(fctx, FullTransactionIdGetDatum(value));
646 : }
647 : else
648 : {
649 48 : SRF_RETURN_DONE(fctx);
650 : }
651 : }
652 :
653 : /*
654 : * Report the status of a recent transaction ID, or null for wrapped,
655 : * truncated away or otherwise too old XIDs.
656 : *
657 : * The passed epoch-qualified xid is treated as a normal xid, not a
658 : * multixact id.
659 : *
660 : * If it points to a committed subxact the result is the subxact status even
661 : * though the parent xact may still be in progress or may have aborted.
662 : */
663 : Datum
664 88 : pg_xact_status(PG_FUNCTION_ARGS)
665 : {
666 : const char *status;
667 88 : FullTransactionId fxid = PG_GETARG_FULLTRANSACTIONID(0);
668 : TransactionId xid;
669 :
670 : /*
671 : * We must protect against concurrent truncation of clog entries to avoid
672 : * an I/O error on SLRU lookup.
673 : */
674 88 : LWLockAcquire(XactTruncationLock, LW_SHARED);
675 88 : if (TransactionIdInRecentPast(fxid, &xid))
676 : {
677 : Assert(TransactionIdIsValid(xid));
678 :
679 64 : if (TransactionIdIsCurrentTransactionId(xid))
680 12 : status = "in progress";
681 52 : else if (TransactionIdDidCommit(xid))
682 36 : status = "committed";
683 16 : else if (TransactionIdDidAbort(xid))
684 12 : status = "aborted";
685 : else
686 : {
687 : /*
688 : * The xact is not marked as either committed or aborted in clog.
689 : *
690 : * It could be a transaction that ended without updating clog or
691 : * writing an abort record due to a crash. We can safely assume
692 : * it's aborted if it isn't committed and is older than our
693 : * snapshot xmin.
694 : *
695 : * Otherwise it must be in-progress (or have been at the time we
696 : * checked commit/abort status).
697 : */
698 4 : if (TransactionIdPrecedes(xid, GetActiveSnapshot()->xmin))
699 2 : status = "aborted";
700 : else
701 2 : status = "in progress";
702 : }
703 : }
704 : else
705 : {
706 12 : status = NULL;
707 : }
708 76 : LWLockRelease(XactTruncationLock);
709 :
710 76 : if (status == NULL)
711 12 : PG_RETURN_NULL();
712 : else
713 64 : PG_RETURN_TEXT_P(cstring_to_text(status));
714 : }
|