Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * commit_ts.c
4 : * PostgreSQL commit timestamp manager
5 : *
6 : * This module is a pg_xact-like system that stores the commit timestamp
7 : * for each transaction.
8 : *
9 : * XLOG interactions: this module generates an XLOG record whenever a new
10 : * CommitTs page is initialized to zeroes. Other writes of CommitTS come
11 : * from recording of transaction commit in xact.c, which generates its own
12 : * XLOG records for these events and will re-perform the status update on
13 : * redo; so we need make no additional XLOG entry here.
14 : *
15 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
16 : * Portions Copyright (c) 1994, Regents of the University of California
17 : *
18 : * src/backend/access/transam/commit_ts.c
19 : *
20 : *-------------------------------------------------------------------------
21 : */
22 : #include "postgres.h"
23 :
24 : #include "access/commit_ts.h"
25 : #include "access/htup_details.h"
26 : #include "access/slru.h"
27 : #include "access/transam.h"
28 : #include "access/xloginsert.h"
29 : #include "access/xlogutils.h"
30 : #include "funcapi.h"
31 : #include "miscadmin.h"
32 : #include "storage/shmem.h"
33 : #include "utils/fmgrprotos.h"
34 : #include "utils/guc_hooks.h"
35 : #include "utils/timestamp.h"
36 :
37 : /*
38 : * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
39 : * everywhere else in Postgres.
40 : *
41 : * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
42 : * CommitTs page numbering also wraps around at
43 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
44 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
45 : * explicit notice of that fact in this module, except when comparing segment
46 : * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
47 : */
48 :
49 : /*
50 : * We need 8+2 bytes per xact. Note that enlarging this struct might mean
51 : * the largest possible file name is more than 5 chars long; see
52 : * SlruScanDirectory.
53 : */
typedef struct CommitTimestampEntry
{
	TimestampTz time;			/* commit timestamp recorded for the xact */
	ReplOriginId nodeid;		/* replication origin recorded for the xact */
} CommitTimestampEntry;
59 :
/*
 * Number of bytes one entry occupies on a CommitTs page: everything up to
 * and including nodeid, without whatever padding the compiler may append
 * after the last field of the struct.
 */
#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
									sizeof(ReplOriginId))

/* Number of entries that fit on one BLCKSZ-sized CommitTs page. */
#define COMMIT_TS_XACTS_PER_PAGE \
	(BLCKSZ / SizeOfCommitTimestampEntry)
65 :
66 :
67 : /*
68 : * Although we return an int64 the actual value can't currently exceed
69 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE.
70 : */
71 : static inline int64
72 2043 : TransactionIdToCTsPage(TransactionId xid)
73 : {
74 2043 : return xid / (int64) COMMIT_TS_XACTS_PER_PAGE;
75 : }
76 :
/* Position of xid's entry within the CommitTs page that holds it. */
#define TransactionIdToCTsEntry(xid)	\
	((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
79 :
80 : /*
81 : * Link to shared-memory data structures for CommitTs control
82 : */
83 : static SlruCtlData CommitTsCtlData;
84 :
85 : #define CommitTsCtl (&CommitTsCtlData)
86 :
/*
 * We keep a cache of the last value set in shared memory.
 *
 * This is also a good place to keep the activation status.  We keep this
 * separate from the GUC so that the standby can activate the module if the
 * primary has it active independently of the value of the GUC.
 *
 * This is protected by CommitTsLock.  In some places, we use commitTsActive
 * without acquiring the lock; where this happens, a comment explains the
 * rationale for it.
 */
typedef struct CommitTimestampShared
{
	/* xid whose data is cached in dataLastCommit (Invalid when reset) */
	TransactionId xidLastCommit;
	/* timestamp and origin most recently recorded, for xidLastCommit */
	CommitTimestampEntry dataLastCommit;
	/* true while the module is activated, independently of the GUC */
	bool		commitTsActive;
} CommitTimestampShared;
104 :
105 : static CommitTimestampShared *commitTsShared;
106 :
107 :
108 : /* GUC variable */
109 : bool track_commit_timestamp;
110 :
111 : static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
112 : TransactionId *subxids, TimestampTz ts,
113 : ReplOriginId nodeid, int64 pageno);
114 : static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
115 : ReplOriginId nodeid, int slotno);
116 : static void error_commit_ts_disabled(void);
117 : static bool CommitTsPagePrecedes(int64 page1, int64 page2);
118 : static int commit_ts_errdetail_for_io_error(const void *opaque_data);
119 : static void ActivateCommitTs(void);
120 : static void DeactivateCommitTs(void);
121 : static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
122 :
123 : /*
124 : * TransactionTreeSetCommitTsData
125 : *
126 : * Record the final commit timestamp of transaction entries in the commit log
127 : * for a transaction and its subtransaction tree, as efficiently as possible.
128 : *
129 : * xid is the top level transaction id.
130 : *
131 : * subxids is an array of xids of length nsubxids, representing subtransactions
132 : * in the tree of xid. In various cases nsubxids may be zero.
133 : * The reason why tracking just the parent xid commit timestamp is not enough
134 : * is that the subtrans SLRU does not stay valid across crashes (it's not
135 : * permanent) so we need to keep the information about them here. If the
136 : * subtrans implementation changes in the future, we might want to revisit the
137 : * decision of storing timestamp info for each subxid.
138 : */
139 : void
140 174994 : TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
141 : TransactionId *subxids, TimestampTz timestamp,
142 : ReplOriginId nodeid)
143 : {
144 : int i;
145 : TransactionId headxid;
146 : TransactionId newestXact;
147 :
148 : /*
149 : * No-op if the module is not active.
150 : *
151 : * An unlocked read here is fine, because in a standby (the only place
152 : * where the flag can change in flight) this routine is only called by the
153 : * recovery process, which is also the only process which can change the
154 : * flag.
155 : */
156 174994 : if (!commitTsShared->commitTsActive)
157 174144 : return;
158 :
159 : /*
160 : * Figure out the latest Xid in this batch: either the last subxid if
161 : * there's any, otherwise the parent xid.
162 : */
163 850 : if (nsubxids > 0)
164 0 : newestXact = subxids[nsubxids - 1];
165 : else
166 850 : newestXact = xid;
167 :
168 : /*
169 : * We split the xids to set the timestamp to in groups belonging to the
170 : * same SLRU page; the first element in each such set is its head. The
171 : * first group has the main XID as the head; subsequent sets use the first
172 : * subxid not on the previous page as head. This way, we only have to
173 : * lock/modify each SLRU page once.
174 : */
175 850 : headxid = xid;
176 850 : i = 0;
177 : for (;;)
178 0 : {
179 850 : int64 pageno = TransactionIdToCTsPage(headxid);
180 : int j;
181 :
182 850 : for (j = i; j < nsubxids; j++)
183 : {
184 0 : if (TransactionIdToCTsPage(subxids[j]) != pageno)
185 0 : break;
186 : }
187 : /* subxids[i..j] are on the same page as the head */
188 :
189 850 : SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
190 : pageno);
191 :
192 : /* if we wrote out all subxids, we're done. */
193 850 : if (j >= nsubxids)
194 850 : break;
195 :
196 : /*
197 : * Set the new head and skip over it, as well as over the subxids we
198 : * just wrote.
199 : */
200 0 : headxid = subxids[j];
201 0 : i = j + 1;
202 : }
203 :
204 : /* update the cached value in shared memory */
205 850 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
206 850 : commitTsShared->xidLastCommit = xid;
207 850 : commitTsShared->dataLastCommit.time = timestamp;
208 850 : commitTsShared->dataLastCommit.nodeid = nodeid;
209 :
210 : /* and move forwards our endpoint, if needed */
211 850 : if (TransactionIdPrecedes(TransamVariables->newestCommitTsXid, newestXact))
212 837 : TransamVariables->newestCommitTsXid = newestXact;
213 850 : LWLockRelease(CommitTsLock);
214 : }
215 :
216 : /*
217 : * Record the commit timestamp of transaction entries in the commit log for all
218 : * entries on a single page. Atomic only on this page.
219 : */
220 : static void
221 850 : SetXidCommitTsInPage(TransactionId xid, int nsubxids,
222 : TransactionId *subxids, TimestampTz ts,
223 : ReplOriginId nodeid, int64 pageno)
224 : {
225 850 : LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
226 : int slotno;
227 : int i;
228 :
229 850 : LWLockAcquire(lock, LW_EXCLUSIVE);
230 :
231 850 : slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, &xid);
232 :
233 850 : TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
234 850 : for (i = 0; i < nsubxids; i++)
235 0 : TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
236 :
237 850 : CommitTsCtl->shared->page_dirty[slotno] = true;
238 :
239 850 : LWLockRelease(lock);
240 850 : }
241 :
242 : /*
243 : * Sets the commit timestamp of a single transaction.
244 : *
245 : * Caller must hold the correct SLRU bank lock, will be held at exit
246 : */
247 : static void
248 850 : TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
249 : ReplOriginId nodeid, int slotno)
250 : {
251 850 : int entryno = TransactionIdToCTsEntry(xid);
252 : CommitTimestampEntry entry;
253 :
254 : Assert(TransactionIdIsNormal(xid));
255 :
256 850 : entry.time = ts;
257 850 : entry.nodeid = nodeid;
258 :
259 850 : memcpy(CommitTsCtl->shared->page_buffer[slotno] +
260 850 : SizeOfCommitTimestampEntry * entryno,
261 : &entry, SizeOfCommitTimestampEntry);
262 850 : }
263 :
264 : /*
265 : * Interrogate the commit timestamp of a transaction.
266 : *
267 : * The return value indicates whether a commit timestamp record was found for
268 : * the given xid. The timestamp value is returned in *ts (which may not be
269 : * null), and the origin node for the Xid is returned in *nodeid, if it's not
270 : * null.
271 : */
272 : bool
273 104 : TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
274 : ReplOriginId *nodeid)
275 : {
276 104 : int64 pageno = TransactionIdToCTsPage(xid);
277 104 : int entryno = TransactionIdToCTsEntry(xid);
278 : int slotno;
279 : CommitTimestampEntry entry;
280 : TransactionId oldestCommitTsXid;
281 : TransactionId newestCommitTsXid;
282 :
283 104 : if (!TransactionIdIsValid(xid))
284 3 : ereport(ERROR,
285 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
286 : errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
287 101 : else if (!TransactionIdIsNormal(xid))
288 : {
289 : /* frozen and bootstrap xids are always committed far in the past */
290 6 : *ts = 0;
291 6 : if (nodeid)
292 2 : *nodeid = 0;
293 6 : return false;
294 : }
295 :
296 95 : LWLockAcquire(CommitTsLock, LW_SHARED);
297 :
298 : /* Error if module not enabled */
299 95 : if (!commitTsShared->commitTsActive)
300 3 : error_commit_ts_disabled();
301 :
302 : /*
303 : * If we're asked for the cached value, return that. Otherwise, fall
304 : * through to read from SLRU.
305 : */
306 92 : if (commitTsShared->xidLastCommit == xid)
307 : {
308 19 : *ts = commitTsShared->dataLastCommit.time;
309 19 : if (nodeid)
310 10 : *nodeid = commitTsShared->dataLastCommit.nodeid;
311 :
312 19 : LWLockRelease(CommitTsLock);
313 19 : return *ts != 0;
314 : }
315 :
316 73 : oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
317 73 : newestCommitTsXid = TransamVariables->newestCommitTsXid;
318 : /* neither is invalid, or both are */
319 : Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
320 73 : LWLockRelease(CommitTsLock);
321 :
322 : /*
323 : * Return empty if the requested value is outside our valid range.
324 : */
325 146 : if (!TransactionIdIsValid(oldestCommitTsXid) ||
326 87 : TransactionIdPrecedes(xid, oldestCommitTsXid) ||
327 14 : TransactionIdPrecedes(newestCommitTsXid, xid))
328 : {
329 59 : *ts = 0;
330 59 : if (nodeid)
331 56 : *nodeid = InvalidReplOriginId;
332 59 : return false;
333 : }
334 :
335 : /* lock is acquired by SimpleLruReadPage_ReadOnly */
336 14 : slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, &xid);
337 14 : memcpy(&entry,
338 14 : CommitTsCtl->shared->page_buffer[slotno] +
339 14 : SizeOfCommitTimestampEntry * entryno,
340 : SizeOfCommitTimestampEntry);
341 :
342 14 : *ts = entry.time;
343 14 : if (nodeid)
344 7 : *nodeid = entry.nodeid;
345 :
346 14 : LWLockRelease(SimpleLruGetBankLock(CommitTsCtl, pageno));
347 14 : return *ts != 0;
348 : }
349 :
350 : /*
351 : * Return the Xid of the latest committed transaction. (As far as this module
352 : * is concerned, anyway; it's up to the caller to ensure the value is useful
353 : * for its purposes.)
354 : *
355 : * ts and nodeid are filled with the corresponding data; they can be passed
356 : * as NULL if not wanted.
357 : */
358 : TransactionId
359 4 : GetLatestCommitTsData(TimestampTz *ts, ReplOriginId *nodeid)
360 : {
361 : TransactionId xid;
362 :
363 4 : LWLockAcquire(CommitTsLock, LW_SHARED);
364 :
365 : /* Error if module not enabled */
366 4 : if (!commitTsShared->commitTsActive)
367 0 : error_commit_ts_disabled();
368 :
369 4 : xid = commitTsShared->xidLastCommit;
370 4 : if (ts)
371 4 : *ts = commitTsShared->dataLastCommit.time;
372 4 : if (nodeid)
373 4 : *nodeid = commitTsShared->dataLastCommit.nodeid;
374 4 : LWLockRelease(CommitTsLock);
375 :
376 4 : return xid;
377 : }
378 :
379 : static void
380 3 : error_commit_ts_disabled(void)
381 : {
382 3 : ereport(ERROR,
383 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
384 : errmsg("could not get commit timestamp data"),
385 : RecoveryInProgress() ?
386 : errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
387 : "track_commit_timestamp") :
388 : errhint("Make sure the configuration parameter \"%s\" is set.",
389 : "track_commit_timestamp")));
390 : }
391 :
392 : /*
393 : * SQL-callable wrapper to obtain commit time of a transaction
394 : */
395 : Datum
396 28 : pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
397 : {
398 28 : TransactionId xid = PG_GETARG_TRANSACTIONID(0);
399 : TimestampTz ts;
400 : bool found;
401 :
402 28 : found = TransactionIdGetCommitTsData(xid, &ts, NULL);
403 :
404 23 : if (!found)
405 7 : PG_RETURN_NULL();
406 :
407 16 : PG_RETURN_TIMESTAMPTZ(ts);
408 : }
409 :
410 :
411 : /*
412 : * pg_last_committed_xact
413 : *
414 : * SQL-callable wrapper to obtain some information about the latest
415 : * committed transaction: transaction ID, timestamp and replication
416 : * origin.
417 : */
418 : Datum
419 4 : pg_last_committed_xact(PG_FUNCTION_ARGS)
420 : {
421 : TransactionId xid;
422 : ReplOriginId nodeid;
423 : TimestampTz ts;
424 : Datum values[3];
425 : bool nulls[3];
426 : TupleDesc tupdesc;
427 : HeapTuple htup;
428 :
429 : /* and construct a tuple with our data */
430 4 : xid = GetLatestCommitTsData(&ts, &nodeid);
431 :
432 4 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
433 0 : elog(ERROR, "return type must be a row type");
434 :
435 4 : if (!TransactionIdIsNormal(xid))
436 : {
437 0 : memset(nulls, true, sizeof(nulls));
438 : }
439 : else
440 : {
441 4 : values[0] = TransactionIdGetDatum(xid);
442 4 : nulls[0] = false;
443 :
444 4 : values[1] = TimestampTzGetDatum(ts);
445 4 : nulls[1] = false;
446 :
447 4 : values[2] = ObjectIdGetDatum((Oid) nodeid);
448 4 : nulls[2] = false;
449 : }
450 :
451 4 : htup = heap_form_tuple(tupdesc, values, nulls);
452 :
453 4 : PG_RETURN_DATUM(HeapTupleGetDatum(htup));
454 : }
455 :
456 : /*
457 : * pg_xact_commit_timestamp_origin
458 : *
459 : * SQL-callable wrapper to obtain commit timestamp and replication origin
460 : * of a given transaction.
461 : */
462 : Datum
463 5 : pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
464 : {
465 5 : TransactionId xid = PG_GETARG_TRANSACTIONID(0);
466 : ReplOriginId nodeid;
467 : TimestampTz ts;
468 : Datum values[2];
469 : bool nulls[2];
470 : TupleDesc tupdesc;
471 : HeapTuple htup;
472 : bool found;
473 :
474 5 : found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
475 :
476 4 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
477 0 : elog(ERROR, "return type must be a row type");
478 :
479 4 : if (!found)
480 : {
481 2 : memset(nulls, true, sizeof(nulls));
482 : }
483 : else
484 : {
485 2 : values[0] = TimestampTzGetDatum(ts);
486 2 : nulls[0] = false;
487 :
488 2 : values[1] = ObjectIdGetDatum((Oid) nodeid);
489 2 : nulls[1] = false;
490 : }
491 :
492 4 : htup = heap_form_tuple(tupdesc, values, nulls);
493 :
494 4 : PG_RETURN_DATUM(HeapTupleGetDatum(htup));
495 : }
496 :
497 : /*
498 : * Number of shared CommitTS buffers.
499 : *
500 : * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
501 : * Otherwise just cap the configured amount to be between 16 and the maximum
502 : * allowed.
503 : */
504 : static int
505 4562 : CommitTsShmemBuffers(void)
506 : {
507 : /* auto-tune based on shared buffers */
508 4562 : if (commit_timestamp_buffers == 0)
509 3377 : return SimpleLruAutotuneBuffers(512, 1024);
510 :
511 1185 : return Min(Max(16, commit_timestamp_buffers), SLRU_MAX_ALLOWED_BUFFERS);
512 : }
513 :
514 : /*
515 : * Shared memory sizing for CommitTs
516 : */
517 : Size
518 2207 : CommitTsShmemSize(void)
519 : {
520 2207 : return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
521 : sizeof(CommitTimestampShared);
522 : }
523 :
524 : /*
525 : * Initialize CommitTs at system startup (postmaster start or standalone
526 : * backend)
527 : */
528 : void
529 1180 : CommitTsShmemInit(void)
530 : {
531 : bool found;
532 :
533 : /* If auto-tuning is requested, now is the time to do it */
534 1180 : if (commit_timestamp_buffers == 0)
535 : {
536 : char buf[32];
537 :
538 1175 : snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
539 1175 : SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
540 : PGC_S_DYNAMIC_DEFAULT);
541 :
542 : /*
543 : * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
544 : * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
545 : * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
546 : * that and we must force the matter with PGC_S_OVERRIDE.
547 : */
548 1175 : if (commit_timestamp_buffers == 0) /* failed to apply it? */
549 0 : SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
550 : PGC_S_OVERRIDE);
551 : }
552 : Assert(commit_timestamp_buffers != 0);
553 :
554 1180 : CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
555 1180 : CommitTsCtl->errdetail_for_io_error = commit_ts_errdetail_for_io_error;
556 1180 : SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
557 : "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
558 : LWTRANCHE_COMMITTS_SLRU,
559 : SYNC_HANDLER_COMMIT_TS,
560 : false);
561 : SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
562 :
563 1180 : commitTsShared = ShmemInitStruct("CommitTs shared",
564 : sizeof(CommitTimestampShared),
565 : &found);
566 :
567 1180 : if (!IsUnderPostmaster)
568 : {
569 : Assert(!found);
570 :
571 1180 : commitTsShared->xidLastCommit = InvalidTransactionId;
572 1180 : TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
573 1180 : commitTsShared->dataLastCommit.nodeid = InvalidReplOriginId;
574 1180 : commitTsShared->commitTsActive = false;
575 : }
576 : else
577 : Assert(found);
578 1180 : }
579 :
580 : /*
581 : * GUC check_hook for commit_timestamp_buffers
582 : */
583 : bool
584 2397 : check_commit_ts_buffers(int *newval, void **extra, GucSource source)
585 : {
586 2397 : return check_slru_buffers("commit_timestamp_buffers", newval);
587 : }
588 :
589 : /*
590 : * This function must be called ONCE on system install.
591 : *
592 : * (The CommitTs directory is assumed to have been created by initdb, and
593 : * CommitTsShmemInit must have been called already.)
594 : */
void
BootStrapCommitTs(void)
{
	/*
	 * Intentionally empty.  Unlike most other SLRU modules there is nothing
	 * to do here at present: segments are created when the server is started
	 * with this module enabled.  See ActivateCommitTs.
	 */
}
604 :
605 : /*
606 : * This must be called ONCE during postmaster or standalone-backend startup,
607 : * after StartupXLOG has initialized TransamVariables->nextXid.
608 : */
void
StartupCommitTs(void)
{
	/*
	 * All the work is in ActivateCommitTs, which itself does nothing in
	 * bootstrap mode or when the module is already active.
	 */
	ActivateCommitTs();
}
614 :
615 : /*
616 : * This must be called ONCE during postmaster or standalone-backend startup,
617 : * after recovery has finished.
618 : */
619 : void
620 970 : CompleteCommitTsInitialization(void)
621 : {
622 : /*
623 : * If the feature is not enabled, turn it off for good. This also removes
624 : * any leftover data.
625 : *
626 : * Conversely, we activate the module if the feature is enabled. This is
627 : * necessary for primary and standby as the activation depends on the
628 : * control file contents at the beginning of recovery or when a
629 : * XLOG_PARAMETER_CHANGE is replayed.
630 : */
631 970 : if (!track_commit_timestamp)
632 948 : DeactivateCommitTs();
633 : else
634 22 : ActivateCommitTs();
635 970 : }
636 :
/*
 * Activate or deactivate CommitTs upon reception of a XLOG_PARAMETER_CHANGE
 * XLog record during recovery.
 */
641 : void
642 38 : CommitTsParameterChange(bool newvalue, bool oldvalue)
643 : {
644 : /*
645 : * If the commit_ts module is disabled in this server and we get word from
646 : * the primary server that it is enabled there, activate it so that we can
647 : * replay future WAL records involving it; also mark it as active on
648 : * pg_control. If the old value was already set, we already did this, so
649 : * don't do anything.
650 : *
651 : * If the module is disabled in the primary, disable it here too, unless
652 : * the module is enabled locally.
653 : *
654 : * Note this only runs in the recovery process, so an unlocked read is
655 : * fine.
656 : */
657 38 : if (newvalue)
658 : {
659 2 : if (!commitTsShared->commitTsActive)
660 0 : ActivateCommitTs();
661 : }
662 36 : else if (commitTsShared->commitTsActive)
663 1 : DeactivateCommitTs();
664 38 : }
665 :
666 : /*
667 : * Activate this module whenever necessary.
668 : * This must happen during postmaster or standalone-backend startup,
669 : * or during WAL replay anytime the track_commit_timestamp setting is
670 : * changed in the primary.
671 : *
672 : * The reason why this SLRU needs separate activation/deactivation functions is
673 : * that it can be enabled/disabled during start and the activation/deactivation
674 : * on the primary is propagated to the standby via replay. Other SLRUs don't
675 : * have this property and they can be just initialized during normal startup.
676 : *
677 : * This is in charge of creating the currently active segment, if it's not
678 : * already there. The reason for this is that the server might have been
679 : * running with this module disabled for a while and thus might have skipped
680 : * the normal creation point.
681 : */
682 : static void
683 36 : ActivateCommitTs(void)
684 : {
685 : TransactionId xid;
686 : int64 pageno;
687 :
688 : /*
689 : * During bootstrap, we should not register commit timestamps so skip the
690 : * activation in this case.
691 : */
692 36 : if (IsBootstrapProcessingMode())
693 2 : return;
694 :
695 : /* If we've done this already, there's nothing to do */
696 34 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
697 34 : if (commitTsShared->commitTsActive)
698 : {
699 6 : LWLockRelease(CommitTsLock);
700 6 : return;
701 : }
702 28 : LWLockRelease(CommitTsLock);
703 :
704 28 : xid = XidFromFullTransactionId(TransamVariables->nextXid);
705 28 : pageno = TransactionIdToCTsPage(xid);
706 :
707 : /*
708 : * Re-Initialize our idea of the latest page number.
709 : */
710 28 : pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
711 :
712 : /*
713 : * If CommitTs is enabled, but it wasn't in the previous server run, we
714 : * need to set the oldest and newest values to the next Xid; that way, we
715 : * will not try to read data that might not have been set.
716 : *
717 : * XXX does this have a problem if a server is started with commitTs
718 : * enabled, then started with commitTs disabled, then restarted with it
719 : * enabled again? It doesn't look like it does, because there should be a
720 : * checkpoint that sets the value to InvalidTransactionId at end of
721 : * recovery; and so any chance of injecting new transactions without
722 : * CommitTs values would occur after the oldestCommitTsXid has been set to
723 : * Invalid temporarily.
724 : */
725 28 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
726 28 : if (TransamVariables->oldestCommitTsXid == InvalidTransactionId)
727 : {
728 16 : TransamVariables->oldestCommitTsXid =
729 16 : TransamVariables->newestCommitTsXid = ReadNextTransactionId();
730 : }
731 28 : LWLockRelease(CommitTsLock);
732 :
733 : /* Create the current segment file, if necessary */
734 28 : if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
735 14 : SimpleLruZeroAndWritePage(CommitTsCtl, pageno);
736 :
737 : /* Change the activation status in shared memory. */
738 28 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
739 28 : commitTsShared->commitTsActive = true;
740 28 : LWLockRelease(CommitTsLock);
741 : }
742 :
743 : /*
744 : * Deactivate this module.
745 : *
746 : * This must be called when the track_commit_timestamp parameter is turned off.
747 : * This happens during postmaster or standalone-backend startup, or during WAL
748 : * replay.
749 : *
750 : * Resets CommitTs into invalid state to make sure we don't hand back
751 : * possibly-invalid data; also removes segments of old data.
752 : */
753 : static void
754 949 : DeactivateCommitTs(void)
755 : {
756 : /*
757 : * Cleanup the status in the shared memory.
758 : *
759 : * We reset everything in the commitTsShared record to prevent user from
760 : * getting confusing data about last committed transaction on the standby
761 : * when the module was activated repeatedly on the primary.
762 : */
763 949 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
764 :
765 949 : commitTsShared->commitTsActive = false;
766 949 : commitTsShared->xidLastCommit = InvalidTransactionId;
767 949 : TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
768 949 : commitTsShared->dataLastCommit.nodeid = InvalidReplOriginId;
769 :
770 949 : TransamVariables->oldestCommitTsXid = InvalidTransactionId;
771 949 : TransamVariables->newestCommitTsXid = InvalidTransactionId;
772 :
773 : /*
774 : * Remove *all* files. This is necessary so that there are no leftover
775 : * files; in the case where this feature is later enabled after running
776 : * with it disabled for some time there may be a gap in the file sequence.
777 : * (We can probably tolerate out-of-sequence files, as they are going to
778 : * be overwritten anyway when we wrap around, but it seems better to be
779 : * tidy.)
780 : *
781 : * Note that we do this with CommitTsLock acquired in exclusive mode. This
782 : * is very heavy-handed, but since this routine can only be called in the
783 : * replica and should happen very rarely, we don't worry too much about
784 : * it. Note also that no process should be consulting this SLRU if we
785 : * have just deactivated it.
786 : */
787 949 : (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
788 :
789 949 : LWLockRelease(CommitTsLock);
790 949 : }
791 :
792 : /*
793 : * Perform a checkpoint --- either during shutdown, or on-the-fly
794 : */
void
CheckPointCommitTs(void)
{
	/*
	 * Write out every dirty CommitTs page.  This may queue sync requests
	 * that ProcessSyncRequests() handles later, as part of the checkpoint.
	 */
	SimpleLruWriteAll(CommitTsCtl, true);
}
805 :
806 : /*
807 : * Make sure that CommitTs has room for a newly-allocated XID.
808 : *
809 : * NB: this is called while holding XidGenLock. We want it to be very fast
810 : * most of the time; even when it's not so fast, no actual I/O need happen
811 : * unless we're forced to write out a dirty CommitTs or xlog page to make room
812 : * in shared memory.
813 : *
814 : * NB: the current implementation relies on track_commit_timestamp being
815 : * PGC_POSTMASTER.
816 : */
817 : void
818 24536200 : ExtendCommitTs(TransactionId newestXact)
819 : {
820 : int64 pageno;
821 : LWLock *lock;
822 :
823 : /*
824 : * Nothing to do if module not enabled. Note we do an unlocked read of
825 : * the flag here, which is okay because this routine is only called from
826 : * GetNewTransactionId, which is never called in a standby.
827 : */
828 : Assert(!InRecovery);
829 24536200 : if (!commitTsShared->commitTsActive)
830 24535331 : return;
831 :
832 : /*
833 : * No work except at first XID of a page. But beware: just after
834 : * wraparound, the first XID of page zero is FirstNormalTransactionId.
835 : */
836 869 : if (TransactionIdToCTsEntry(newestXact) != 0 &&
837 : !TransactionIdEquals(newestXact, FirstNormalTransactionId))
838 868 : return;
839 :
840 1 : pageno = TransactionIdToCTsPage(newestXact);
841 :
842 1 : lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
843 :
844 1 : LWLockAcquire(lock, LW_EXCLUSIVE);
845 :
846 : /* Zero the page ... */
847 1 : SimpleLruZeroPage(CommitTsCtl, pageno);
848 :
849 : /* and make a WAL entry about that, unless we're in REDO */
850 1 : if (!InRecovery)
851 1 : XLogSimpleInsertInt64(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE, pageno);
852 :
853 1 : LWLockRelease(lock);
854 : }
855 :
856 : /*
857 : * Remove all CommitTs segments before the one holding the passed
858 : * transaction ID.
859 : *
860 : * Note that we don't need to flush XLOG here.
861 : */
862 : void
863 1060 : TruncateCommitTs(TransactionId oldestXact)
864 : {
865 : int64 cutoffPage;
866 :
867 : /*
868 : * The cutoff point is the start of the segment containing oldestXact. We
869 : * pass the *page* containing oldestXact to SimpleLruTruncate.
870 : */
871 1060 : cutoffPage = TransactionIdToCTsPage(oldestXact);
872 :
873 : /* Check to see if there's any files that could be removed */
874 1060 : if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
875 : &cutoffPage))
876 1060 : return; /* nothing to remove */
877 :
878 : /* Write XLOG record */
879 0 : WriteTruncateXlogRec(cutoffPage, oldestXact);
880 :
881 : /* Now we can remove the old CommitTs segment(s) */
882 0 : SimpleLruTruncate(CommitTsCtl, cutoffPage);
883 : }
884 :
885 : /*
886 : * Set the limit values between which commit TS can be consulted.
887 : */
888 : void
889 1084 : SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
890 : {
891 : /*
892 : * Be careful not to overwrite values that are either further into the
893 : * "future" or signal a disabled committs.
894 : */
895 1084 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
896 1084 : if (TransamVariables->oldestCommitTsXid != InvalidTransactionId)
897 : {
898 0 : if (TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
899 0 : TransamVariables->oldestCommitTsXid = oldestXact;
900 0 : if (TransactionIdPrecedes(newestXact, TransamVariables->newestCommitTsXid))
901 0 : TransamVariables->newestCommitTsXid = newestXact;
902 : }
903 : else
904 : {
905 : Assert(TransamVariables->newestCommitTsXid == InvalidTransactionId);
906 1084 : TransamVariables->oldestCommitTsXid = oldestXact;
907 1084 : TransamVariables->newestCommitTsXid = newestXact;
908 : }
909 1084 : LWLockRelease(CommitTsLock);
910 1084 : }
911 :
912 : /*
913 : * Move forwards the oldest commitTS value that can be consulted
914 : */
915 : void
916 1060 : AdvanceOldestCommitTsXid(TransactionId oldestXact)
917 : {
918 1060 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
919 1061 : if (TransamVariables->oldestCommitTsXid != InvalidTransactionId &&
920 1 : TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
921 1 : TransamVariables->oldestCommitTsXid = oldestXact;
922 1060 : LWLockRelease(CommitTsLock);
923 1060 : }
924 :
925 :
926 : /*
927 : * Decide whether a commitTS page number is "older" for truncation purposes.
928 : * Analogous to CLOGPagePrecedes().
929 : *
930 : * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
931 : * introduces differences compared to CLOG and the other SLRUs having (1 <<
932 : * 31) % per_page == 0. This function never tests exactly
933 : * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
934 : * there are two possible counts of page boundaries between oldestXact and the
935 : * latest XID assigned, depending on whether oldestXact is within the first
936 : * 128 entries of its page. Since this function doesn't know the location of
937 : * oldestXact within page2, it returns false for one page that actually is
938 : * expendable. This is a wider (yet still negligible) version of the
939 : * truncation opportunity that CLOGPagePrecedes() cannot recognize.
940 : *
941 : * For the sake of a worked example, number entries with decimal values such
942 : * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
943 : * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
944 : * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
945 : * because entry=2.85 is the border that toggles whether entries precede the
946 : * last entry of the oldestXact page. While page 2 is expendable at
947 : * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
948 : */
949 : static bool
950 1 : CommitTsPagePrecedes(int64 page1, int64 page2)
951 : {
952 : TransactionId xid1;
953 : TransactionId xid2;
954 :
955 1 : xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
956 1 : xid1 += FirstNormalTransactionId + 1;
957 1 : xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
958 1 : xid2 += FirstNormalTransactionId + 1;
959 :
960 1 : return (TransactionIdPrecedes(xid1, xid2) &&
961 0 : TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
962 : }
963 :
964 : static int
965 0 : commit_ts_errdetail_for_io_error(const void *opaque_data)
966 : {
967 0 : TransactionId xid = *(const TransactionId *) opaque_data;
968 :
969 0 : return errdetail("Could not access commit timestamp of transaction %u.", xid);
970 : }
971 :
972 : /*
973 : * Write a TRUNCATE xlog record
974 : */
975 : static void
976 0 : WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
977 : {
978 : xl_commit_ts_truncate xlrec;
979 :
980 0 : xlrec.pageno = pageno;
981 0 : xlrec.oldestXid = oldestXid;
982 :
983 0 : XLogBeginInsert();
984 0 : XLogRegisterData(&xlrec, SizeOfCommitTsTruncate);
985 0 : (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
986 0 : }
987 :
988 : /*
989 : * CommitTS resource manager's routines
990 : */
991 : void
992 0 : commit_ts_redo(XLogReaderState *record)
993 : {
994 0 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
995 :
996 : /* Backup blocks are not used in commit_ts records */
997 : Assert(!XLogRecHasAnyBlockRefs(record));
998 :
999 0 : if (info == COMMIT_TS_ZEROPAGE)
1000 : {
1001 : int64 pageno;
1002 :
1003 0 : memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
1004 0 : SimpleLruZeroAndWritePage(CommitTsCtl, pageno);
1005 : }
1006 0 : else if (info == COMMIT_TS_TRUNCATE)
1007 : {
1008 0 : xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
1009 :
1010 0 : AdvanceOldestCommitTsXid(trunc->oldestXid);
1011 :
1012 : /*
1013 : * During XLOG replay, latest_page_number isn't set up yet; insert a
1014 : * suitable value to bypass the sanity test in SimpleLruTruncate.
1015 : */
1016 0 : pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
1017 0 : trunc->pageno);
1018 :
1019 0 : SimpleLruTruncate(CommitTsCtl, trunc->pageno);
1020 : }
1021 : else
1022 0 : elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1023 0 : }
1024 :
1025 : /*
1026 : * Entrypoint for sync.c to sync commit_ts files.
1027 : */
int
committssyncfiletag(const FileTag *ftag, char *path)
{
	/* thin wrapper: hand over to the generic SLRU routine for CommitTs */
	return SlruSyncFileTag(CommitTsCtl, ftag, path);
}
|