Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * commit_ts.c
4 : * PostgreSQL commit timestamp manager
5 : *
6 : * This module is a pg_xact-like system that stores the commit timestamp
7 : * for each transaction.
8 : *
9 : * XLOG interactions: this module generates an XLOG record whenever a new
10 : * CommitTs page is initialized to zeroes. Other writes of CommitTS come
11 : * from recording of transaction commit in xact.c, which generates its own
12 : * XLOG records for these events and will re-perform the status update on
13 : * redo; so we need make no additional XLOG entry here.
14 : *
15 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
16 : * Portions Copyright (c) 1994, Regents of the University of California
17 : *
18 : * src/backend/access/transam/commit_ts.c
19 : *
20 : *-------------------------------------------------------------------------
21 : */
22 : #include "postgres.h"
23 :
24 : #include "access/commit_ts.h"
25 : #include "access/htup_details.h"
26 : #include "access/slru.h"
27 : #include "access/transam.h"
28 : #include "access/xloginsert.h"
29 : #include "access/xlogutils.h"
30 : #include "funcapi.h"
31 : #include "miscadmin.h"
32 : #include "storage/shmem.h"
33 : #include "storage/subsystems.h"
34 : #include "utils/fmgrprotos.h"
35 : #include "utils/guc_hooks.h"
36 : #include "utils/timestamp.h"
37 :
38 : /*
39 : * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
40 : * everywhere else in Postgres.
41 : *
42 : * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
43 : * CommitTs page numbering also wraps around at
44 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
45 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
46 : * explicit notice of that fact in this module, except when comparing segment
47 : * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
48 : */
49 :
50 : /*
51 : * We need 8+2 bytes per xact. Note that enlarging this struct might mean
52 : * the largest possible file name is more than 5 chars long; see
53 : * SlruScanDirectory.
54 : */
55 : typedef struct CommitTimestampEntry
56 : {
57 : TimestampTz time;
58 : ReplOriginId nodeid;
59 : } CommitTimestampEntry;
60 :
61 : #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
62 : sizeof(ReplOriginId))
63 :
64 : #define COMMIT_TS_XACTS_PER_PAGE \
65 : (BLCKSZ / SizeOfCommitTimestampEntry)
66 :
67 :
68 : /*
69 : * Although we return an int64 the actual value can't currently exceed
70 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE.
71 : */
72 : static inline int64
73 2121 : TransactionIdToCTsPage(TransactionId xid)
74 : {
75 2121 : return xid / (int64) COMMIT_TS_XACTS_PER_PAGE;
76 : }
77 :
78 : #define TransactionIdToCTsEntry(xid) \
79 : ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
80 :
81 : /*
82 : * Link to shared-memory data structures for CommitTs control
83 : */
84 : static void CommitTsShmemRequest(void *arg);
85 : static void CommitTsShmemInit(void *arg);
86 : static bool CommitTsPagePrecedes(int64 page1, int64 page2);
87 : static int commit_ts_errdetail_for_io_error(const void *opaque_data);
88 :
89 : const ShmemCallbacks CommitTsShmemCallbacks = {
90 : .request_fn = CommitTsShmemRequest,
91 : .init_fn = CommitTsShmemInit,
92 : };
93 :
94 : static SlruDesc CommitTsSlruDesc;
95 :
96 : #define CommitTsCtl (&CommitTsSlruDesc)
97 :
98 : /*
99 : * We keep a cache of the last value set in shared memory.
100 : *
101 : * This is also good place to keep the activation status. We keep this
102 : * separate from the GUC so that the standby can activate the module if the
103 : * primary has it active independently of the value of the GUC.
104 : *
105 : * This is protected by CommitTsLock. In some places, we use commitTsActive
106 : * without acquiring the lock; where this happens, a comment explains the
107 : * rationale for it.
108 : */
109 : typedef struct CommitTimestampShared
110 : {
111 : TransactionId xidLastCommit;
112 : CommitTimestampEntry dataLastCommit;
113 : bool commitTsActive;
114 : } CommitTimestampShared;
115 :
116 : static CommitTimestampShared *commitTsShared;
117 :
118 : static void CommitTsShmemInit(void *arg);
119 :
120 : /* GUC variable */
121 : bool track_commit_timestamp;
122 :
123 : static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
124 : TransactionId *subxids, TimestampTz ts,
125 : ReplOriginId nodeid, int64 pageno);
126 : static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
127 : ReplOriginId nodeid, int slotno);
128 : static void error_commit_ts_disabled(void);
129 : static void ActivateCommitTs(void);
130 : static void DeactivateCommitTs(void);
131 : static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
132 :
133 : /*
134 : * TransactionTreeSetCommitTsData
135 : *
136 : * Record the final commit timestamp of transaction entries in the commit log
137 : * for a transaction and its subtransaction tree, as efficiently as possible.
138 : *
139 : * xid is the top level transaction id.
140 : *
141 : * subxids is an array of xids of length nsubxids, representing subtransactions
142 : * in the tree of xid. In various cases nsubxids may be zero.
143 : * The reason why tracking just the parent xid commit timestamp is not enough
144 : * is that the subtrans SLRU does not stay valid across crashes (it's not
145 : * permanent) so we need to keep the information about them here. If the
146 : * subtrans implementation changes in the future, we might want to revisit the
147 : * decision of storing timestamp info for each subxid.
148 : */
149 : void
150 181803 : TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
151 : TransactionId *subxids, TimestampTz timestamp,
152 : ReplOriginId nodeid)
153 : {
154 : int i;
155 : TransactionId headxid;
156 : TransactionId newestXact;
157 :
158 : /*
159 : * No-op if the module is not active.
160 : *
161 : * An unlocked read here is fine, because in a standby (the only place
162 : * where the flag can change in flight) this routine is only called by the
163 : * recovery process, which is also the only process which can change the
164 : * flag.
165 : */
166 181803 : if (!commitTsShared->commitTsActive)
167 180952 : return;
168 :
169 : /*
170 : * Figure out the latest Xid in this batch: either the last subxid if
171 : * there's any, otherwise the parent xid.
172 : */
173 851 : if (nsubxids > 0)
174 0 : newestXact = subxids[nsubxids - 1];
175 : else
176 851 : newestXact = xid;
177 :
178 : /*
179 : * We split the xids to set the timestamp to in groups belonging to the
180 : * same SLRU page; the first element in each such set is its head. The
181 : * first group has the main XID as the head; subsequent sets use the first
182 : * subxid not on the previous page as head. This way, we only have to
183 : * lock/modify each SLRU page once.
184 : */
185 851 : headxid = xid;
186 851 : i = 0;
187 : for (;;)
188 0 : {
189 851 : int64 pageno = TransactionIdToCTsPage(headxid);
190 : int j;
191 :
192 851 : for (j = i; j < nsubxids; j++)
193 : {
194 0 : if (TransactionIdToCTsPage(subxids[j]) != pageno)
195 0 : break;
196 : }
197 : /* subxids[i..j] are on the same page as the head */
198 :
199 851 : SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
200 : pageno);
201 :
202 : /* if we wrote out all subxids, we're done. */
203 851 : if (j >= nsubxids)
204 851 : break;
205 :
206 : /*
207 : * Set the new head and skip over it, as well as over the subxids we
208 : * just wrote.
209 : */
210 0 : headxid = subxids[j];
211 0 : i = j + 1;
212 : }
213 :
214 : /* update the cached value in shared memory */
215 851 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
216 851 : commitTsShared->xidLastCommit = xid;
217 851 : commitTsShared->dataLastCommit.time = timestamp;
218 851 : commitTsShared->dataLastCommit.nodeid = nodeid;
219 :
220 : /* and move forwards our endpoint, if needed */
221 851 : if (TransactionIdPrecedes(TransamVariables->newestCommitTsXid, newestXact))
222 838 : TransamVariables->newestCommitTsXid = newestXact;
223 851 : LWLockRelease(CommitTsLock);
224 : }
225 :
226 : /*
227 : * Record the commit timestamp of transaction entries in the commit log for all
228 : * entries on a single page. Atomic only on this page.
229 : */
230 : static void
231 851 : SetXidCommitTsInPage(TransactionId xid, int nsubxids,
232 : TransactionId *subxids, TimestampTz ts,
233 : ReplOriginId nodeid, int64 pageno)
234 : {
235 851 : LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
236 : int slotno;
237 : int i;
238 :
239 851 : LWLockAcquire(lock, LW_EXCLUSIVE);
240 :
241 851 : slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, &xid);
242 :
243 851 : TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
244 851 : for (i = 0; i < nsubxids; i++)
245 0 : TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
246 :
247 851 : CommitTsCtl->shared->page_dirty[slotno] = true;
248 :
249 851 : LWLockRelease(lock);
250 851 : }
251 :
252 : /*
253 : * Sets the commit timestamp of a single transaction.
254 : *
255 : * Caller must hold the correct SLRU bank lock, will be held at exit
256 : */
257 : static void
258 851 : TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
259 : ReplOriginId nodeid, int slotno)
260 : {
261 851 : int entryno = TransactionIdToCTsEntry(xid);
262 : CommitTimestampEntry entry;
263 :
264 : Assert(TransactionIdIsNormal(xid));
265 :
266 851 : entry.time = ts;
267 851 : entry.nodeid = nodeid;
268 :
269 851 : memcpy(CommitTsCtl->shared->page_buffer[slotno] +
270 851 : SizeOfCommitTimestampEntry * entryno,
271 : &entry, SizeOfCommitTimestampEntry);
272 851 : }
273 :
274 : /*
275 : * Interrogate the commit timestamp of a transaction.
276 : *
277 : * The return value indicates whether a commit timestamp record was found for
278 : * the given xid. The timestamp value is returned in *ts (which may not be
279 : * null), and the origin node for the Xid is returned in *nodeid, if it's not
280 : * null.
281 : */
282 : bool
283 95 : TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
284 : ReplOriginId *nodeid)
285 : {
286 95 : int64 pageno = TransactionIdToCTsPage(xid);
287 95 : int entryno = TransactionIdToCTsEntry(xid);
288 : int slotno;
289 : CommitTimestampEntry entry;
290 : TransactionId oldestCommitTsXid;
291 : TransactionId newestCommitTsXid;
292 :
293 95 : if (!TransactionIdIsValid(xid))
294 3 : ereport(ERROR,
295 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
296 : errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
297 92 : else if (!TransactionIdIsNormal(xid))
298 : {
299 : /* frozen and bootstrap xids are always committed far in the past */
300 6 : *ts = 0;
301 6 : if (nodeid)
302 2 : *nodeid = InvalidReplOriginId;
303 6 : return false;
304 : }
305 :
306 86 : LWLockAcquire(CommitTsLock, LW_SHARED);
307 :
308 : /* Error if module not enabled */
309 86 : if (!commitTsShared->commitTsActive)
310 3 : error_commit_ts_disabled();
311 :
312 : /*
313 : * If we're asked for the cached value, return that. Otherwise, fall
314 : * through to read from SLRU.
315 : */
316 83 : if (commitTsShared->xidLastCommit == xid)
317 : {
318 18 : *ts = commitTsShared->dataLastCommit.time;
319 18 : if (nodeid)
320 10 : *nodeid = commitTsShared->dataLastCommit.nodeid;
321 :
322 18 : LWLockRelease(CommitTsLock);
323 18 : return *ts != 0;
324 : }
325 :
326 65 : oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
327 65 : newestCommitTsXid = TransamVariables->newestCommitTsXid;
328 : /* neither is invalid, or both are */
329 : Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
330 65 : LWLockRelease(CommitTsLock);
331 :
332 : /*
333 : * Return empty if the requested value is outside our valid range.
334 : */
335 130 : if (!TransactionIdIsValid(oldestCommitTsXid) ||
336 79 : TransactionIdPrecedes(xid, oldestCommitTsXid) ||
337 14 : TransactionIdPrecedes(newestCommitTsXid, xid))
338 : {
339 51 : *ts = 0;
340 51 : if (nodeid)
341 48 : *nodeid = InvalidReplOriginId;
342 51 : return false;
343 : }
344 :
345 : /* lock is acquired by SimpleLruReadPage_ReadOnly */
346 14 : slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, &xid);
347 14 : memcpy(&entry,
348 14 : CommitTsCtl->shared->page_buffer[slotno] +
349 14 : SizeOfCommitTimestampEntry * entryno,
350 : SizeOfCommitTimestampEntry);
351 :
352 14 : *ts = entry.time;
353 14 : if (nodeid)
354 7 : *nodeid = entry.nodeid;
355 :
356 14 : LWLockRelease(SimpleLruGetBankLock(CommitTsCtl, pageno));
357 14 : return *ts != 0;
358 : }
359 :
360 : /*
361 : * Return the Xid of the latest committed transaction. (As far as this module
362 : * is concerned, anyway; it's up to the caller to ensure the value is useful
363 : * for its purposes.)
364 : *
365 : * ts and nodeid are filled with the corresponding data; they can be passed
366 : * as NULL if not wanted.
367 : */
368 : TransactionId
369 4 : GetLatestCommitTsData(TimestampTz *ts, ReplOriginId *nodeid)
370 : {
371 : TransactionId xid;
372 :
373 4 : LWLockAcquire(CommitTsLock, LW_SHARED);
374 :
375 : /* Error if module not enabled */
376 4 : if (!commitTsShared->commitTsActive)
377 0 : error_commit_ts_disabled();
378 :
379 4 : xid = commitTsShared->xidLastCommit;
380 4 : if (ts)
381 4 : *ts = commitTsShared->dataLastCommit.time;
382 4 : if (nodeid)
383 4 : *nodeid = commitTsShared->dataLastCommit.nodeid;
384 4 : LWLockRelease(CommitTsLock);
385 :
386 4 : return xid;
387 : }
388 :
389 : static void
390 3 : error_commit_ts_disabled(void)
391 : {
392 3 : ereport(ERROR,
393 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
394 : errmsg("could not get commit timestamp data"),
395 : RecoveryInProgress() ?
396 : errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
397 : "track_commit_timestamp") :
398 : errhint("Make sure the configuration parameter \"%s\" is set.",
399 : "track_commit_timestamp")));
400 : }
401 :
402 : /*
403 : * SQL-callable wrapper to obtain commit time of a transaction
404 : */
405 : Datum
406 27 : pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
407 : {
408 27 : TransactionId xid = PG_GETARG_TRANSACTIONID(0);
409 : TimestampTz ts;
410 : bool found;
411 :
412 27 : found = TransactionIdGetCommitTsData(xid, &ts, NULL);
413 :
414 22 : if (!found)
415 7 : PG_RETURN_NULL();
416 :
417 15 : PG_RETURN_TIMESTAMPTZ(ts);
418 : }
419 :
420 :
421 : /*
422 : * pg_last_committed_xact
423 : *
424 : * SQL-callable wrapper to obtain some information about the latest
425 : * committed transaction: transaction ID, timestamp and replication
426 : * origin.
427 : */
428 : Datum
429 4 : pg_last_committed_xact(PG_FUNCTION_ARGS)
430 : {
431 : TransactionId xid;
432 : ReplOriginId nodeid;
433 : TimestampTz ts;
434 : Datum values[3];
435 : bool nulls[3];
436 : TupleDesc tupdesc;
437 : HeapTuple htup;
438 :
439 : /* and construct a tuple with our data */
440 4 : xid = GetLatestCommitTsData(&ts, &nodeid);
441 :
442 4 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
443 0 : elog(ERROR, "return type must be a row type");
444 :
445 4 : if (!TransactionIdIsNormal(xid))
446 : {
447 0 : memset(nulls, true, sizeof(nulls));
448 : }
449 : else
450 : {
451 4 : values[0] = TransactionIdGetDatum(xid);
452 4 : nulls[0] = false;
453 :
454 4 : values[1] = TimestampTzGetDatum(ts);
455 4 : nulls[1] = false;
456 :
457 4 : values[2] = ObjectIdGetDatum((Oid) nodeid);
458 4 : nulls[2] = false;
459 : }
460 :
461 4 : htup = heap_form_tuple(tupdesc, values, nulls);
462 :
463 4 : PG_RETURN_DATUM(HeapTupleGetDatum(htup));
464 : }
465 :
466 : /*
467 : * pg_xact_commit_timestamp_origin
468 : *
469 : * SQL-callable wrapper to obtain commit timestamp and replication origin
470 : * of a given transaction.
471 : */
472 : Datum
473 5 : pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
474 : {
475 5 : TransactionId xid = PG_GETARG_TRANSACTIONID(0);
476 : ReplOriginId nodeid;
477 : TimestampTz ts;
478 : Datum values[2];
479 : bool nulls[2];
480 : TupleDesc tupdesc;
481 : HeapTuple htup;
482 : bool found;
483 :
484 5 : found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
485 :
486 4 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
487 0 : elog(ERROR, "return type must be a row type");
488 :
489 4 : if (!found)
490 : {
491 2 : memset(nulls, true, sizeof(nulls));
492 : }
493 : else
494 : {
495 2 : values[0] = TimestampTzGetDatum(ts);
496 2 : nulls[0] = false;
497 :
498 2 : values[1] = ObjectIdGetDatum((Oid) nodeid);
499 2 : nulls[1] = false;
500 : }
501 :
502 4 : htup = heap_form_tuple(tupdesc, values, nulls);
503 :
504 4 : PG_RETURN_DATUM(HeapTupleGetDatum(htup));
505 : }
506 :
507 : /*
508 : * Number of shared CommitTS buffers.
509 : *
510 : * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
511 : * Otherwise just cap the configured amount to be between 16 and the maximum
512 : * allowed.
513 : */
514 : static int
515 2491 : CommitTsShmemBuffers(void)
516 : {
517 : /* auto-tune based on shared buffers */
518 2491 : if (commit_timestamp_buffers == 0)
519 1243 : return SimpleLruAutotuneBuffers(512, 1024);
520 :
521 1248 : return Min(Max(16, commit_timestamp_buffers), SLRU_MAX_ALLOWED_BUFFERS);
522 : }
523 :
524 : /*
525 : * Register CommitTs shared memory needs at system startup (postmaster start
526 : * or standalone backend)
527 : */
528 : static void
529 1248 : CommitTsShmemRequest(void *arg)
530 : {
531 : /* If auto-tuning is requested, now is the time to do it */
532 1248 : if (commit_timestamp_buffers == 0)
533 : {
534 : char buf[32];
535 :
536 1243 : snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
537 1243 : SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
538 : PGC_S_DYNAMIC_DEFAULT);
539 :
540 : /*
541 : * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
542 : * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
543 : * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
544 : * that and we must force the matter with PGC_S_OVERRIDE.
545 : */
546 1243 : if (commit_timestamp_buffers == 0) /* failed to apply it? */
547 0 : SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
548 : PGC_S_OVERRIDE);
549 : }
550 : Assert(commit_timestamp_buffers != 0);
551 1248 : SimpleLruRequest(.desc = &CommitTsSlruDesc,
552 : .name = "commit_timestamp",
553 : .Dir = "pg_commit_ts",
554 : .long_segment_names = false,
555 :
556 : .nslots = CommitTsShmemBuffers(),
557 :
558 : .PagePrecedes = CommitTsPagePrecedes,
559 : .errdetail_for_io_error = commit_ts_errdetail_for_io_error,
560 :
561 : .sync_handler = SYNC_HANDLER_COMMIT_TS,
562 : .buffer_tranche_id = LWTRANCHE_COMMITTS_BUFFER,
563 : .bank_tranche_id = LWTRANCHE_COMMITTS_SLRU,
564 : );
565 :
566 1248 : ShmemRequestStruct(.name = "CommitTs shared",
567 : .size = sizeof(CommitTimestampShared),
568 : .ptr = (void **) &commitTsShared,
569 : );
570 1248 : }
571 :
572 : static void
573 1245 : CommitTsShmemInit(void *arg)
574 : {
575 1245 : commitTsShared->xidLastCommit = InvalidTransactionId;
576 1245 : TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
577 1245 : commitTsShared->dataLastCommit.nodeid = InvalidReplOriginId;
578 1245 : commitTsShared->commitTsActive = false;
579 :
580 : SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
581 1245 : }
582 :
583 : /*
584 : * GUC check_hook for commit_timestamp_buffers
585 : */
586 : bool
587 2532 : check_commit_ts_buffers(int *newval, void **extra, GucSource source)
588 : {
589 2532 : return check_slru_buffers("commit_timestamp_buffers", newval);
590 : }
591 :
592 : /*
593 : * This function must be called ONCE on system install.
594 : *
595 : * (The CommitTs directory is assumed to have been created by initdb, and
596 : * CommitTsShmemInit must have been called already.)
597 : */
void
BootStrapCommitTs(void)
{
	/*
	 * Intentionally empty.  Unlike most other SLRU modules, there is nothing
	 * to set up here: segments get created when the server is started with
	 * this module enabled (see ActivateCommitTs).
	 */
}
607 :
608 : /*
609 : * This must be called ONCE during postmaster or standalone-backend startup,
610 : * after StartupXLOG has initialized TransamVariables->nextXid.
611 : */
void
StartupCommitTs(void)
{
	/* all of the startup work is done by the activation routine */
	ActivateCommitTs();
}
617 :
618 : /*
619 : * This must be called ONCE during postmaster or standalone-backend startup,
620 : * after recovery has finished.
621 : */
622 : void
623 1017 : CompleteCommitTsInitialization(void)
624 : {
625 : /*
626 : * If the feature is not enabled, turn it off for good. This also removes
627 : * any leftover data.
628 : *
629 : * Conversely, we activate the module if the feature is enabled. This is
630 : * necessary for primary and standby as the activation depends on the
631 : * control file contents at the beginning of recovery or when a
632 : * XLOG_PARAMETER_CHANGE is replayed.
633 : */
634 1017 : if (!track_commit_timestamp)
635 995 : DeactivateCommitTs();
636 : else
637 22 : ActivateCommitTs();
638 1017 : }
639 :
/*
 * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
 * XLog record during recovery.
 */
644 : void
645 38 : CommitTsParameterChange(bool newvalue, bool oldvalue)
646 : {
647 : /*
648 : * If the commit_ts module is disabled in this server and we get word from
649 : * the primary server that it is enabled there, activate it so that we can
650 : * replay future WAL records involving it; also mark it as active on
651 : * pg_control. If the old value was already set, we already did this, so
652 : * don't do anything.
653 : *
654 : * If the module is disabled in the primary, disable it here too, unless
655 : * the module is enabled locally.
656 : *
657 : * Note this only runs in the recovery process, so an unlocked read is
658 : * fine.
659 : */
660 38 : if (newvalue)
661 : {
662 2 : if (!commitTsShared->commitTsActive)
663 0 : ActivateCommitTs();
664 : }
665 36 : else if (commitTsShared->commitTsActive)
666 1 : DeactivateCommitTs();
667 38 : }
668 :
669 : /*
670 : * Activate this module whenever necessary.
671 : * This must happen during postmaster or standalone-backend startup,
672 : * or during WAL replay anytime the track_commit_timestamp setting is
673 : * changed in the primary.
674 : *
675 : * The reason why this SLRU needs separate activation/deactivation functions is
676 : * that it can be enabled/disabled during start and the activation/deactivation
677 : * on the primary is propagated to the standby via replay. Other SLRUs don't
678 : * have this property and they can be just initialized during normal startup.
679 : *
680 : * This is in charge of creating the currently active segment, if it's not
681 : * already there. The reason for this is that the server might have been
682 : * running with this module disabled for a while and thus might have skipped
683 : * the normal creation point.
684 : */
685 : static void
686 36 : ActivateCommitTs(void)
687 : {
688 : TransactionId xid;
689 : int64 pageno;
690 :
691 : /*
692 : * During bootstrap, we should not register commit timestamps so skip the
693 : * activation in this case.
694 : */
695 36 : if (IsBootstrapProcessingMode())
696 2 : return;
697 :
698 : /* If we've done this already, there's nothing to do */
699 34 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
700 34 : if (commitTsShared->commitTsActive)
701 : {
702 6 : LWLockRelease(CommitTsLock);
703 6 : return;
704 : }
705 28 : LWLockRelease(CommitTsLock);
706 :
707 28 : xid = XidFromFullTransactionId(TransamVariables->nextXid);
708 28 : pageno = TransactionIdToCTsPage(xid);
709 :
710 : /*
711 : * Re-Initialize our idea of the latest page number.
712 : */
713 28 : pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
714 :
715 : /*
716 : * If CommitTs is enabled, but it wasn't in the previous server run, we
717 : * need to set the oldest and newest values to the next Xid; that way, we
718 : * will not try to read data that might not have been set.
719 : *
720 : * XXX does this have a problem if a server is started with commitTs
721 : * enabled, then started with commitTs disabled, then restarted with it
722 : * enabled again? It doesn't look like it does, because there should be a
723 : * checkpoint that sets the value to InvalidTransactionId at end of
724 : * recovery; and so any chance of injecting new transactions without
725 : * CommitTs values would occur after the oldestCommitTsXid has been set to
726 : * Invalid temporarily.
727 : */
728 28 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
729 28 : if (TransamVariables->oldestCommitTsXid == InvalidTransactionId)
730 : {
731 16 : TransamVariables->oldestCommitTsXid =
732 16 : TransamVariables->newestCommitTsXid = ReadNextTransactionId();
733 : }
734 28 : LWLockRelease(CommitTsLock);
735 :
736 : /* Create the current segment file, if necessary */
737 28 : if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
738 14 : SimpleLruZeroAndWritePage(CommitTsCtl, pageno);
739 :
740 : /* Change the activation status in shared memory. */
741 28 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
742 28 : commitTsShared->commitTsActive = true;
743 28 : LWLockRelease(CommitTsLock);
744 : }
745 :
746 : /*
747 : * Deactivate this module.
748 : *
749 : * This must be called when the track_commit_timestamp parameter is turned off.
750 : * This happens during postmaster or standalone-backend startup, or during WAL
751 : * replay.
752 : *
753 : * Resets CommitTs into invalid state to make sure we don't hand back
754 : * possibly-invalid data; also removes segments of old data.
755 : */
756 : static void
757 996 : DeactivateCommitTs(void)
758 : {
759 : /*
760 : * Cleanup the status in the shared memory.
761 : *
762 : * We reset everything in the commitTsShared record to prevent user from
763 : * getting confusing data about last committed transaction on the standby
764 : * when the module was activated repeatedly on the primary.
765 : */
766 996 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
767 :
768 996 : commitTsShared->commitTsActive = false;
769 996 : commitTsShared->xidLastCommit = InvalidTransactionId;
770 996 : TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
771 996 : commitTsShared->dataLastCommit.nodeid = InvalidReplOriginId;
772 :
773 996 : TransamVariables->oldestCommitTsXid = InvalidTransactionId;
774 996 : TransamVariables->newestCommitTsXid = InvalidTransactionId;
775 :
776 : /*
777 : * Remove *all* files. This is necessary so that there are no leftover
778 : * files; in the case where this feature is later enabled after running
779 : * with it disabled for some time there may be a gap in the file sequence.
780 : * (We can probably tolerate out-of-sequence files, as they are going to
781 : * be overwritten anyway when we wrap around, but it seems better to be
782 : * tidy.)
783 : *
784 : * Note that we do this with CommitTsLock acquired in exclusive mode. This
785 : * is very heavy-handed, but since this routine can only be called in the
786 : * replica and should happen very rarely, we don't worry too much about
787 : * it. Note also that no process should be consulting this SLRU if we
788 : * have just deactivated it.
789 : */
790 996 : (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
791 :
792 996 : LWLockRelease(CommitTsLock);
793 996 : }
794 :
795 : /*
796 : * Perform a checkpoint --- either during shutdown, or on-the-fly
797 : */
798 : void
799 1942 : CheckPointCommitTs(void)
800 : {
801 : /*
802 : * Write dirty CommitTs pages to disk. This may result in sync requests
803 : * queued for later handling by ProcessSyncRequests(), as part of the
804 : * checkpoint.
805 : */
806 1942 : SimpleLruWriteAll(CommitTsCtl, true);
807 1942 : }
808 :
809 : /*
810 : * Make sure that CommitTs has room for a newly-allocated XID.
811 : *
812 : * NB: this is called while holding XidGenLock. We want it to be very fast
813 : * most of the time; even when it's not so fast, no actual I/O need happen
814 : * unless we're forced to write out a dirty CommitTs or xlog page to make room
815 : * in shared memory.
816 : *
817 : * NB: the current implementation relies on track_commit_timestamp being
818 : * PGC_POSTMASTER.
819 : */
820 : void
821 24543159 : ExtendCommitTs(TransactionId newestXact)
822 : {
823 : int64 pageno;
824 : LWLock *lock;
825 :
826 : /*
827 : * Nothing to do if module not enabled. Note we do an unlocked read of
828 : * the flag here, which is okay because this routine is only called from
829 : * GetNewTransactionId, which is never called in a standby.
830 : */
831 : Assert(!InRecovery);
832 24543159 : if (!commitTsShared->commitTsActive)
833 24542293 : return;
834 :
835 : /*
836 : * No work except at first XID of a page. But beware: just after
837 : * wraparound, the first XID of page zero is FirstNormalTransactionId.
838 : */
839 866 : if (TransactionIdToCTsEntry(newestXact) != 0 &&
840 : !TransactionIdEquals(newestXact, FirstNormalTransactionId))
841 865 : return;
842 :
843 1 : pageno = TransactionIdToCTsPage(newestXact);
844 :
845 1 : lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
846 :
847 1 : LWLockAcquire(lock, LW_EXCLUSIVE);
848 :
849 : /* Zero the page ... */
850 1 : SimpleLruZeroPage(CommitTsCtl, pageno);
851 :
852 : /* and make a WAL entry about that, unless we're in REDO */
853 1 : if (!InRecovery)
854 1 : XLogSimpleInsertInt64(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE, pageno);
855 :
856 1 : LWLockRelease(lock);
857 : }
858 :
859 : /*
860 : * Remove all CommitTs segments before the one holding the passed
861 : * transaction ID.
862 : *
863 : * Note that we don't need to flush XLOG here.
864 : */
865 : void
866 1146 : TruncateCommitTs(TransactionId oldestXact)
867 : {
868 : int64 cutoffPage;
869 :
870 : /*
871 : * The cutoff point is the start of the segment containing oldestXact. We
872 : * pass the *page* containing oldestXact to SimpleLruTruncate.
873 : */
874 1146 : cutoffPage = TransactionIdToCTsPage(oldestXact);
875 :
876 : /* Check to see if there's any files that could be removed */
877 1146 : if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
878 : &cutoffPage))
879 1146 : return; /* nothing to remove */
880 :
881 : /* Write XLOG record */
882 0 : WriteTruncateXlogRec(cutoffPage, oldestXact);
883 :
884 : /* Now we can remove the old CommitTs segment(s) */
885 0 : SimpleLruTruncate(CommitTsCtl, cutoffPage);
886 : }
887 :
888 : /*
889 : * Set the limit values between which commit TS can be consulted.
890 : */
891 : void
892 1143 : SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
893 : {
894 : /*
895 : * Be careful not to overwrite values that are either further into the
896 : * "future" or signal a disabled committs.
897 : */
898 1143 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
899 1143 : if (TransamVariables->oldestCommitTsXid != InvalidTransactionId)
900 : {
901 0 : if (TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
902 0 : TransamVariables->oldestCommitTsXid = oldestXact;
903 0 : if (TransactionIdPrecedes(newestXact, TransamVariables->newestCommitTsXid))
904 0 : TransamVariables->newestCommitTsXid = newestXact;
905 : }
906 : else
907 : {
908 : Assert(TransamVariables->newestCommitTsXid == InvalidTransactionId);
909 1143 : TransamVariables->oldestCommitTsXid = oldestXact;
910 1143 : TransamVariables->newestCommitTsXid = newestXact;
911 : }
912 1143 : LWLockRelease(CommitTsLock);
913 1143 : }
914 :
915 : /*
916 : * Move forwards the oldest commitTS value that can be consulted
917 : */
918 : void
919 1146 : AdvanceOldestCommitTsXid(TransactionId oldestXact)
920 : {
921 1146 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
922 1147 : if (TransamVariables->oldestCommitTsXid != InvalidTransactionId &&
923 1 : TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
924 1 : TransamVariables->oldestCommitTsXid = oldestXact;
925 1146 : LWLockRelease(CommitTsLock);
926 1146 : }
927 :
928 :
929 : /*
930 : * Decide whether a commitTS page number is "older" for truncation purposes.
931 : * Analogous to CLOGPagePrecedes().
932 : *
933 : * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
934 : * introduces differences compared to CLOG and the other SLRUs having (1 <<
935 : * 31) % per_page == 0. This function never tests exactly
936 : * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
937 : * there are two possible counts of page boundaries between oldestXact and the
938 : * latest XID assigned, depending on whether oldestXact is within the first
939 : * 128 entries of its page. Since this function doesn't know the location of
940 : * oldestXact within page2, it returns false for one page that actually is
941 : * expendable. This is a wider (yet still negligible) version of the
942 : * truncation opportunity that CLOGPagePrecedes() cannot recognize.
943 : *
944 : * For the sake of a worked example, number entries with decimal values such
945 : * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
946 : * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
947 : * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
948 : * because entry=2.85 is the border that toggles whether entries precede the
949 : * last entry of the oldestXact page. While page 2 is expendable at
950 : * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
951 : */
952 : static bool
953 1 : CommitTsPagePrecedes(int64 page1, int64 page2)
954 : {
955 : TransactionId xid1;
956 : TransactionId xid2;
957 :
958 1 : xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
959 1 : xid1 += FirstNormalTransactionId + 1;
960 1 : xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
961 1 : xid2 += FirstNormalTransactionId + 1;
962 :
963 1 : return (TransactionIdPrecedes(xid1, xid2) &&
964 0 : TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
965 : }
966 :
967 : static int
968 0 : commit_ts_errdetail_for_io_error(const void *opaque_data)
969 : {
970 0 : TransactionId xid = *(const TransactionId *) opaque_data;
971 :
972 0 : return errdetail("Could not access commit timestamp of transaction %u.", xid);
973 : }
974 :
975 : /*
976 : * Write a TRUNCATE xlog record
977 : */
978 : static void
979 0 : WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
980 : {
981 : xl_commit_ts_truncate xlrec;
982 :
983 0 : xlrec.pageno = pageno;
984 0 : xlrec.oldestXid = oldestXid;
985 :
986 0 : XLogBeginInsert();
987 0 : XLogRegisterData(&xlrec, SizeOfCommitTsTruncate);
988 0 : (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
989 0 : }
990 :
991 : /*
992 : * CommitTS resource manager's routines
993 : */
994 : void
995 0 : commit_ts_redo(XLogReaderState *record)
996 : {
997 0 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
998 :
999 : /* Backup blocks are not used in commit_ts records */
1000 : Assert(!XLogRecHasAnyBlockRefs(record));
1001 :
1002 0 : if (info == COMMIT_TS_ZEROPAGE)
1003 : {
1004 : int64 pageno;
1005 :
1006 0 : memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
1007 0 : SimpleLruZeroAndWritePage(CommitTsCtl, pageno);
1008 : }
1009 0 : else if (info == COMMIT_TS_TRUNCATE)
1010 : {
1011 0 : xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
1012 :
1013 0 : AdvanceOldestCommitTsXid(trunc->oldestXid);
1014 :
1015 : /*
1016 : * During XLOG replay, latest_page_number isn't set up yet; insert a
1017 : * suitable value to bypass the sanity test in SimpleLruTruncate.
1018 : */
1019 0 : pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
1020 0 : trunc->pageno);
1021 :
1022 0 : SimpleLruTruncate(CommitTsCtl, trunc->pageno);
1023 : }
1024 : else
1025 0 : elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1026 0 : }
1027 :
1028 : /*
1029 : * Entrypoint for sync.c to sync commit_ts files.
1030 : */
1031 : int
1032 0 : committssyncfiletag(const FileTag *ftag, char *path)
1033 : {
1034 0 : return SlruSyncFileTag(CommitTsCtl, ftag, path);
1035 : }
|