Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * commit_ts.c
4 : * PostgreSQL commit timestamp manager
5 : *
6 : * This module is a pg_xact-like system that stores the commit timestamp
7 : * for each transaction.
8 : *
9 : * XLOG interactions: this module generates an XLOG record whenever a new
10 : * CommitTs page is initialized to zeroes. Other writes of CommitTS come
11 : * from recording of transaction commit in xact.c, which generates its own
12 : * XLOG records for these events and will re-perform the status update on
13 : * redo; so we need make no additional XLOG entry here.
14 : *
15 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
16 : * Portions Copyright (c) 1994, Regents of the University of California
17 : *
18 : * src/backend/access/transam/commit_ts.c
19 : *
20 : *-------------------------------------------------------------------------
21 : */
22 : #include "postgres.h"
23 :
24 : #include "access/commit_ts.h"
25 : #include "access/htup_details.h"
26 : #include "access/slru.h"
27 : #include "access/transam.h"
28 : #include "access/xloginsert.h"
29 : #include "access/xlogutils.h"
30 : #include "funcapi.h"
31 : #include "miscadmin.h"
32 : #include "storage/shmem.h"
33 : #include "utils/fmgrprotos.h"
34 : #include "utils/guc_hooks.h"
35 : #include "utils/timestamp.h"
36 :
37 : /*
38 : * Defines for CommitTs page sizes. A page is the same BLCKSZ as is used
39 : * everywhere else in Postgres.
40 : *
41 : * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
42 : * CommitTs page numbering also wraps around at
43 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
44 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT. We need take no
45 : * explicit notice of that fact in this module, except when comparing segment
46 : * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
47 : */
48 :
49 : /*
50 : * We need 8+2 bytes per xact. Note that enlarging this struct might mean
51 : * the largest possible file name is more than 5 chars long; see
52 : * SlruScanDirectory.
53 : */
54 : typedef struct CommitTimestampEntry
55 : {
56 : TimestampTz time;
57 : RepOriginId nodeid;
58 : } CommitTimestampEntry;
59 :
/*
 * Number of bytes copied to/from a page for one entry: everything up to and
 * including nodeid, leaving out any trailing padding of the struct.
 */
#define SizeOfCommitTimestampEntry \
    (sizeof(RepOriginId) + offsetof(CommitTimestampEntry, nodeid))

/* How many entries fit on a single BLCKSZ-sized CommitTs page. */
#define COMMIT_TS_XACTS_PER_PAGE    (BLCKSZ / SizeOfCommitTimestampEntry)
65 :
66 :
67 : /*
68 : * Although we return an int64 the actual value can't currently exceed
69 : * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE.
70 : */
71 : static inline int64
72 1398 : TransactionIdToCTsPage(TransactionId xid)
73 : {
74 1398 : return xid / (int64) COMMIT_TS_XACTS_PER_PAGE;
75 : }
76 :
/* Index of xid's entry within its CommitTs page. */
#define TransactionIdToCTsEntry(xid)    \
    ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
79 :
80 : /*
81 : * Link to shared-memory data structures for CommitTs control
82 : */
83 : static SlruCtlData CommitTsCtlData;
84 :
85 : #define CommitTsCtl (&CommitTsCtlData)
86 :
87 : /*
88 : * We keep a cache of the last value set in shared memory.
89 : *
90 : * This is also good place to keep the activation status. We keep this
91 : * separate from the GUC so that the standby can activate the module if the
92 : * primary has it active independently of the value of the GUC.
93 : *
94 : * This is protected by CommitTsLock. In some places, we use commitTsActive
95 : * without acquiring the lock; where this happens, a comment explains the
96 : * rationale for it.
97 : */
98 : typedef struct CommitTimestampShared
99 : {
100 : TransactionId xidLastCommit;
101 : CommitTimestampEntry dataLastCommit;
102 : bool commitTsActive;
103 : } CommitTimestampShared;
104 :
105 : static CommitTimestampShared *commitTsShared;
106 :
107 :
108 : /* GUC variable */
109 : bool track_commit_timestamp;
110 :
111 : static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
112 : TransactionId *subxids, TimestampTz ts,
113 : RepOriginId nodeid, int64 pageno);
114 : static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
115 : RepOriginId nodeid, int slotno);
116 : static void error_commit_ts_disabled(void);
117 : static int ZeroCommitTsPage(int64 pageno, bool writeXlog);
118 : static bool CommitTsPagePrecedes(int64 page1, int64 page2);
119 : static void ActivateCommitTs(void);
120 : static void DeactivateCommitTs(void);
121 : static void WriteZeroPageXlogRec(int64 pageno);
122 : static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
123 :
124 : /*
125 : * TransactionTreeSetCommitTsData
126 : *
127 : * Record the final commit timestamp of transaction entries in the commit log
128 : * for a transaction and its subtransaction tree, as efficiently as possible.
129 : *
130 : * xid is the top level transaction id.
131 : *
132 : * subxids is an array of xids of length nsubxids, representing subtransactions
133 : * in the tree of xid. In various cases nsubxids may be zero.
134 : * The reason why tracking just the parent xid commit timestamp is not enough
135 : * is that the subtrans SLRU does not stay valid across crashes (it's not
136 : * permanent) so we need to keep the information about them here. If the
137 : * subtrans implementation changes in the future, we might want to revisit the
138 : * decision of storing timestamp info for each subxid.
139 : */
140 : void
141 275762 : TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
142 : TransactionId *subxids, TimestampTz timestamp,
143 : RepOriginId nodeid)
144 : {
145 : int i;
146 : TransactionId headxid;
147 : TransactionId newestXact;
148 :
149 : /*
150 : * No-op if the module is not active.
151 : *
152 : * An unlocked read here is fine, because in a standby (the only place
153 : * where the flag can change in flight) this routine is only called by the
154 : * recovery process, which is also the only process which can change the
155 : * flag.
156 : */
157 275762 : if (!commitTsShared->commitTsActive)
158 275556 : return;
159 :
160 : /*
161 : * Figure out the latest Xid in this batch: either the last subxid if
162 : * there's any, otherwise the parent xid.
163 : */
164 206 : if (nsubxids > 0)
165 0 : newestXact = subxids[nsubxids - 1];
166 : else
167 206 : newestXact = xid;
168 :
169 : /*
170 : * We split the xids to set the timestamp to in groups belonging to the
171 : * same SLRU page; the first element in each such set is its head. The
172 : * first group has the main XID as the head; subsequent sets use the first
173 : * subxid not on the previous page as head. This way, we only have to
174 : * lock/modify each SLRU page once.
175 : */
176 206 : headxid = xid;
177 206 : i = 0;
178 : for (;;)
179 0 : {
180 206 : int64 pageno = TransactionIdToCTsPage(headxid);
181 : int j;
182 :
183 206 : for (j = i; j < nsubxids; j++)
184 : {
185 0 : if (TransactionIdToCTsPage(subxids[j]) != pageno)
186 0 : break;
187 : }
188 : /* subxids[i..j] are on the same page as the head */
189 :
190 206 : SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
191 : pageno);
192 :
193 : /* if we wrote out all subxids, we're done. */
194 206 : if (j >= nsubxids)
195 206 : break;
196 :
197 : /*
198 : * Set the new head and skip over it, as well as over the subxids we
199 : * just wrote.
200 : */
201 0 : headxid = subxids[j];
202 0 : i = j + 1;
203 : }
204 :
205 : /* update the cached value in shared memory */
206 206 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
207 206 : commitTsShared->xidLastCommit = xid;
208 206 : commitTsShared->dataLastCommit.time = timestamp;
209 206 : commitTsShared->dataLastCommit.nodeid = nodeid;
210 :
211 : /* and move forwards our endpoint, if needed */
212 206 : if (TransactionIdPrecedes(TransamVariables->newestCommitTsXid, newestXact))
213 180 : TransamVariables->newestCommitTsXid = newestXact;
214 206 : LWLockRelease(CommitTsLock);
215 : }
216 :
217 : /*
218 : * Record the commit timestamp of transaction entries in the commit log for all
219 : * entries on a single page. Atomic only on this page.
220 : */
221 : static void
222 206 : SetXidCommitTsInPage(TransactionId xid, int nsubxids,
223 : TransactionId *subxids, TimestampTz ts,
224 : RepOriginId nodeid, int64 pageno)
225 : {
226 206 : LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
227 : int slotno;
228 : int i;
229 :
230 206 : LWLockAcquire(lock, LW_EXCLUSIVE);
231 :
232 206 : slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
233 :
234 206 : TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
235 206 : for (i = 0; i < nsubxids; i++)
236 0 : TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
237 :
238 206 : CommitTsCtl->shared->page_dirty[slotno] = true;
239 :
240 206 : LWLockRelease(lock);
241 206 : }
242 :
243 : /*
244 : * Sets the commit timestamp of a single transaction.
245 : *
246 : * Caller must hold the correct SLRU bank lock, will be held at exit
247 : */
248 : static void
249 206 : TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
250 : RepOriginId nodeid, int slotno)
251 : {
252 206 : int entryno = TransactionIdToCTsEntry(xid);
253 : CommitTimestampEntry entry;
254 :
255 : Assert(TransactionIdIsNormal(xid));
256 :
257 206 : entry.time = ts;
258 206 : entry.nodeid = nodeid;
259 :
260 206 : memcpy(CommitTsCtl->shared->page_buffer[slotno] +
261 206 : SizeOfCommitTimestampEntry * entryno,
262 : &entry, SizeOfCommitTimestampEntry);
263 206 : }
264 :
265 : /*
266 : * Interrogate the commit timestamp of a transaction.
267 : *
268 : * The return value indicates whether a commit timestamp record was found for
269 : * the given xid. The timestamp value is returned in *ts (which may not be
270 : * null), and the origin node for the Xid is returned in *nodeid, if it's not
271 : * null.
272 : */
273 : bool
274 82 : TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
275 : RepOriginId *nodeid)
276 : {
277 82 : int64 pageno = TransactionIdToCTsPage(xid);
278 82 : int entryno = TransactionIdToCTsEntry(xid);
279 : int slotno;
280 : CommitTimestampEntry entry;
281 : TransactionId oldestCommitTsXid;
282 : TransactionId newestCommitTsXid;
283 :
284 82 : if (!TransactionIdIsValid(xid))
285 6 : ereport(ERROR,
286 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
287 : errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
288 76 : else if (!TransactionIdIsNormal(xid))
289 : {
290 : /* frozen and bootstrap xids are always committed far in the past */
291 12 : *ts = 0;
292 12 : if (nodeid)
293 4 : *nodeid = 0;
294 12 : return false;
295 : }
296 :
297 64 : LWLockAcquire(CommitTsLock, LW_SHARED);
298 :
299 : /* Error if module not enabled */
300 64 : if (!commitTsShared->commitTsActive)
301 6 : error_commit_ts_disabled();
302 :
303 : /*
304 : * If we're asked for the cached value, return that. Otherwise, fall
305 : * through to read from SLRU.
306 : */
307 58 : if (commitTsShared->xidLastCommit == xid)
308 : {
309 30 : *ts = commitTsShared->dataLastCommit.time;
310 30 : if (nodeid)
311 14 : *nodeid = commitTsShared->dataLastCommit.nodeid;
312 :
313 30 : LWLockRelease(CommitTsLock);
314 30 : return *ts != 0;
315 : }
316 :
317 28 : oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
318 28 : newestCommitTsXid = TransamVariables->newestCommitTsXid;
319 : /* neither is invalid, or both are */
320 : Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
321 28 : LWLockRelease(CommitTsLock);
322 :
323 : /*
324 : * Return empty if the requested value is outside our valid range.
325 : */
326 56 : if (!TransactionIdIsValid(oldestCommitTsXid) ||
327 50 : TransactionIdPrecedes(xid, oldestCommitTsXid) ||
328 22 : TransactionIdPrecedes(newestCommitTsXid, xid))
329 : {
330 6 : *ts = 0;
331 6 : if (nodeid)
332 0 : *nodeid = InvalidRepOriginId;
333 6 : return false;
334 : }
335 :
336 : /* lock is acquired by SimpleLruReadPage_ReadOnly */
337 22 : slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
338 22 : memcpy(&entry,
339 22 : CommitTsCtl->shared->page_buffer[slotno] +
340 22 : SizeOfCommitTimestampEntry * entryno,
341 : SizeOfCommitTimestampEntry);
342 :
343 22 : *ts = entry.time;
344 22 : if (nodeid)
345 8 : *nodeid = entry.nodeid;
346 :
347 22 : LWLockRelease(SimpleLruGetBankLock(CommitTsCtl, pageno));
348 22 : return *ts != 0;
349 : }
350 :
351 : /*
352 : * Return the Xid of the latest committed transaction. (As far as this module
353 : * is concerned, anyway; it's up to the caller to ensure the value is useful
354 : * for its purposes.)
355 : *
356 : * ts and nodeid are filled with the corresponding data; they can be passed
357 : * as NULL if not wanted.
358 : */
359 : TransactionId
360 6 : GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
361 : {
362 : TransactionId xid;
363 :
364 6 : LWLockAcquire(CommitTsLock, LW_SHARED);
365 :
366 : /* Error if module not enabled */
367 6 : if (!commitTsShared->commitTsActive)
368 0 : error_commit_ts_disabled();
369 :
370 6 : xid = commitTsShared->xidLastCommit;
371 6 : if (ts)
372 6 : *ts = commitTsShared->dataLastCommit.time;
373 6 : if (nodeid)
374 6 : *nodeid = commitTsShared->dataLastCommit.nodeid;
375 6 : LWLockRelease(CommitTsLock);
376 :
377 6 : return xid;
378 : }
379 :
380 : static void
381 6 : error_commit_ts_disabled(void)
382 : {
383 6 : ereport(ERROR,
384 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
385 : errmsg("could not get commit timestamp data"),
386 : RecoveryInProgress() ?
387 : errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
388 : "track_commit_timestamp") :
389 : errhint("Make sure the configuration parameter \"%s\" is set.",
390 : "track_commit_timestamp")));
391 : }
392 :
393 : /*
394 : * SQL-callable wrapper to obtain commit time of a transaction
395 : */
396 : Datum
397 54 : pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
398 : {
399 54 : TransactionId xid = PG_GETARG_TRANSACTIONID(0);
400 : TimestampTz ts;
401 : bool found;
402 :
403 54 : found = TransactionIdGetCommitTsData(xid, &ts, NULL);
404 :
405 44 : if (!found)
406 14 : PG_RETURN_NULL();
407 :
408 30 : PG_RETURN_TIMESTAMPTZ(ts);
409 : }
410 :
411 :
412 : /*
413 : * pg_last_committed_xact
414 : *
415 : * SQL-callable wrapper to obtain some information about the latest
416 : * committed transaction: transaction ID, timestamp and replication
417 : * origin.
418 : */
419 : Datum
420 6 : pg_last_committed_xact(PG_FUNCTION_ARGS)
421 : {
422 : TransactionId xid;
423 : RepOriginId nodeid;
424 : TimestampTz ts;
425 : Datum values[3];
426 : bool nulls[3];
427 : TupleDesc tupdesc;
428 : HeapTuple htup;
429 :
430 : /* and construct a tuple with our data */
431 6 : xid = GetLatestCommitTsData(&ts, &nodeid);
432 :
433 6 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
434 0 : elog(ERROR, "return type must be a row type");
435 :
436 6 : if (!TransactionIdIsNormal(xid))
437 : {
438 0 : memset(nulls, true, sizeof(nulls));
439 : }
440 : else
441 : {
442 6 : values[0] = TransactionIdGetDatum(xid);
443 6 : nulls[0] = false;
444 :
445 6 : values[1] = TimestampTzGetDatum(ts);
446 6 : nulls[1] = false;
447 :
448 6 : values[2] = ObjectIdGetDatum((Oid) nodeid);
449 6 : nulls[2] = false;
450 : }
451 :
452 6 : htup = heap_form_tuple(tupdesc, values, nulls);
453 :
454 6 : PG_RETURN_DATUM(HeapTupleGetDatum(htup));
455 : }
456 :
457 : /*
458 : * pg_xact_commit_timestamp_origin
459 : *
460 : * SQL-callable wrapper to obtain commit timestamp and replication origin
461 : * of a given transaction.
462 : */
463 : Datum
464 10 : pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
465 : {
466 10 : TransactionId xid = PG_GETARG_TRANSACTIONID(0);
467 : RepOriginId nodeid;
468 : TimestampTz ts;
469 : Datum values[2];
470 : bool nulls[2];
471 : TupleDesc tupdesc;
472 : HeapTuple htup;
473 : bool found;
474 :
475 10 : found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
476 :
477 8 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
478 0 : elog(ERROR, "return type must be a row type");
479 :
480 8 : if (!found)
481 : {
482 4 : memset(nulls, true, sizeof(nulls));
483 : }
484 : else
485 : {
486 4 : values[0] = TimestampTzGetDatum(ts);
487 4 : nulls[0] = false;
488 :
489 4 : values[1] = ObjectIdGetDatum((Oid) nodeid);
490 4 : nulls[1] = false;
491 : }
492 :
493 8 : htup = heap_form_tuple(tupdesc, values, nulls);
494 :
495 8 : PG_RETURN_DATUM(HeapTupleGetDatum(htup));
496 : }
497 :
498 : /*
499 : * Number of shared CommitTS buffers.
500 : *
501 : * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
502 : * Otherwise just cap the configured amount to be between 16 and the maximum
503 : * allowed.
504 : */
505 : static int
506 7392 : CommitTsShmemBuffers(void)
507 : {
508 : /* auto-tune based on shared buffers */
509 7392 : if (commit_timestamp_buffers == 0)
510 5464 : return SimpleLruAutotuneBuffers(512, 1024);
511 :
512 1928 : return Min(Max(16, commit_timestamp_buffers), SLRU_MAX_ALLOWED_BUFFERS);
513 : }
514 :
515 : /*
516 : * Shared memory sizing for CommitTs
517 : */
518 : Size
519 3566 : CommitTsShmemSize(void)
520 : {
521 3566 : return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
522 : sizeof(CommitTimestampShared);
523 : }
524 :
525 : /*
526 : * Initialize CommitTs at system startup (postmaster start or standalone
527 : * backend)
528 : */
529 : void
530 1918 : CommitTsShmemInit(void)
531 : {
532 : bool found;
533 :
534 : /* If auto-tuning is requested, now is the time to do it */
535 1918 : if (commit_timestamp_buffers == 0)
536 : {
537 : char buf[32];
538 :
539 1908 : snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
540 1908 : SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
541 : PGC_S_DYNAMIC_DEFAULT);
542 :
543 : /*
544 : * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
545 : * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
546 : * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
547 : * that and we must force the matter with PGC_S_OVERRIDE.
548 : */
549 1908 : if (commit_timestamp_buffers == 0) /* failed to apply it? */
550 0 : SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
551 : PGC_S_OVERRIDE);
552 : }
553 : Assert(commit_timestamp_buffers != 0);
554 :
555 1918 : CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
556 1918 : SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
557 : "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
558 : LWTRANCHE_COMMITTS_SLRU,
559 : SYNC_HANDLER_COMMIT_TS,
560 : false);
561 : SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
562 :
563 1918 : commitTsShared = ShmemInitStruct("CommitTs shared",
564 : sizeof(CommitTimestampShared),
565 : &found);
566 :
567 1918 : if (!IsUnderPostmaster)
568 : {
569 : Assert(!found);
570 :
571 1918 : commitTsShared->xidLastCommit = InvalidTransactionId;
572 1918 : TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
573 1918 : commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
574 1918 : commitTsShared->commitTsActive = false;
575 : }
576 : else
577 : Assert(found);
578 1918 : }
579 :
580 : /*
581 : * GUC check_hook for commit_timestamp_buffers
582 : */
583 : bool
584 3890 : check_commit_ts_buffers(int *newval, void **extra, GucSource source)
585 : {
586 3890 : return check_slru_buffers("commit_timestamp_buffers", newval);
587 : }
588 :
/*
 * This function must be called ONCE on system install.
 *
 * (The CommitTs directory is assumed to have been created by initdb, and
 * CommitTsShmemInit must have been called already.)
 */
void
BootStrapCommitTs(void)
{
    /*
     * Intentionally empty.  Unlike most other SLRU modules, there is nothing
     * to create at bootstrap time: segments are created when the server is
     * started with this module enabled.  See ActivateCommitTs.
     */
}
604 :
605 : /*
606 : * Initialize (or reinitialize) a page of CommitTs to zeroes.
607 : * If writeXlog is true, also emit an XLOG record saying we did this.
608 : *
609 : * The page is not actually written, just set up in shared memory.
610 : * The slot number of the new page is returned.
611 : *
612 : * Control lock must be held at entry, and will be held at exit.
613 : */
614 : static int
615 24 : ZeroCommitTsPage(int64 pageno, bool writeXlog)
616 : {
617 : int slotno;
618 :
619 24 : slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
620 :
621 24 : if (writeXlog)
622 0 : WriteZeroPageXlogRec(pageno);
623 :
624 24 : return slotno;
625 : }
626 :
/*
 * This must be called ONCE during postmaster or standalone-backend startup,
 * after StartupXLOG has initialized TransamVariables->nextXid.
 *
 * All the work is done by ActivateCommitTs.
 */
void
StartupCommitTs(void)
{
    ActivateCommitTs();
}
636 :
637 : /*
638 : * This must be called ONCE during postmaster or standalone-backend startup,
639 : * after recovery has finished.
640 : */
641 : void
642 1544 : CompleteCommitTsInitialization(void)
643 : {
644 : /*
645 : * If the feature is not enabled, turn it off for good. This also removes
646 : * any leftover data.
647 : *
648 : * Conversely, we activate the module if the feature is enabled. This is
649 : * necessary for primary and standby as the activation depends on the
650 : * control file contents at the beginning of recovery or when a
651 : * XLOG_PARAMETER_CHANGE is replayed.
652 : */
653 1544 : if (!track_commit_timestamp)
654 1510 : DeactivateCommitTs();
655 : else
656 34 : ActivateCommitTs();
657 1544 : }
658 :
659 : /*
660 : * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
661 : * XLog record during recovery.
662 : */
663 : void
664 60 : CommitTsParameterChange(bool newvalue, bool oldvalue)
665 : {
666 : /*
667 : * If the commit_ts module is disabled in this server and we get word from
668 : * the primary server that it is enabled there, activate it so that we can
669 : * replay future WAL records involving it; also mark it as active on
670 : * pg_control. If the old value was already set, we already did this, so
671 : * don't do anything.
672 : *
673 : * If the module is disabled in the primary, disable it here too, unless
674 : * the module is enabled locally.
675 : *
676 : * Note this only runs in the recovery process, so an unlocked read is
677 : * fine.
678 : */
679 60 : if (newvalue)
680 : {
681 4 : if (!commitTsShared->commitTsActive)
682 0 : ActivateCommitTs();
683 : }
684 56 : else if (commitTsShared->commitTsActive)
685 2 : DeactivateCommitTs();
686 60 : }
687 :
688 : /*
689 : * Activate this module whenever necessary.
690 : * This must happen during postmaster or standalone-backend startup,
691 : * or during WAL replay anytime the track_commit_timestamp setting is
692 : * changed in the primary.
693 : *
694 : * The reason why this SLRU needs separate activation/deactivation functions is
695 : * that it can be enabled/disabled during start and the activation/deactivation
696 : * on the primary is propagated to the standby via replay. Other SLRUs don't
697 : * have this property and they can be just initialized during normal startup.
698 : *
699 : * This is in charge of creating the currently active segment, if it's not
700 : * already there. The reason for this is that the server might have been
701 : * running with this module disabled for a while and thus might have skipped
702 : * the normal creation point.
703 : */
704 : static void
705 54 : ActivateCommitTs(void)
706 : {
707 : TransactionId xid;
708 : int64 pageno;
709 :
710 : /* If we've done this already, there's nothing to do */
711 54 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
712 54 : if (commitTsShared->commitTsActive)
713 : {
714 6 : LWLockRelease(CommitTsLock);
715 6 : return;
716 : }
717 48 : LWLockRelease(CommitTsLock);
718 :
719 48 : xid = XidFromFullTransactionId(TransamVariables->nextXid);
720 48 : pageno = TransactionIdToCTsPage(xid);
721 :
722 : /*
723 : * Re-Initialize our idea of the latest page number.
724 : */
725 48 : pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
726 :
727 : /*
728 : * If CommitTs is enabled, but it wasn't in the previous server run, we
729 : * need to set the oldest and newest values to the next Xid; that way, we
730 : * will not try to read data that might not have been set.
731 : *
732 : * XXX does this have a problem if a server is started with commitTs
733 : * enabled, then started with commitTs disabled, then restarted with it
734 : * enabled again? It doesn't look like it does, because there should be a
735 : * checkpoint that sets the value to InvalidTransactionId at end of
736 : * recovery; and so any chance of injecting new transactions without
737 : * CommitTs values would occur after the oldestCommitTsXid has been set to
738 : * Invalid temporarily.
739 : */
740 48 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
741 48 : if (TransamVariables->oldestCommitTsXid == InvalidTransactionId)
742 : {
743 30 : TransamVariables->oldestCommitTsXid =
744 30 : TransamVariables->newestCommitTsXid = ReadNextTransactionId();
745 : }
746 48 : LWLockRelease(CommitTsLock);
747 :
748 : /* Create the current segment file, if necessary */
749 48 : if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
750 : {
751 24 : LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
752 : int slotno;
753 :
754 24 : LWLockAcquire(lock, LW_EXCLUSIVE);
755 24 : slotno = ZeroCommitTsPage(pageno, false);
756 24 : SimpleLruWritePage(CommitTsCtl, slotno);
757 : Assert(!CommitTsCtl->shared->page_dirty[slotno]);
758 24 : LWLockRelease(lock);
759 : }
760 :
761 : /* Change the activation status in shared memory. */
762 48 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
763 48 : commitTsShared->commitTsActive = true;
764 48 : LWLockRelease(CommitTsLock);
765 : }
766 :
767 : /*
768 : * Deactivate this module.
769 : *
770 : * This must be called when the track_commit_timestamp parameter is turned off.
771 : * This happens during postmaster or standalone-backend startup, or during WAL
772 : * replay.
773 : *
774 : * Resets CommitTs into invalid state to make sure we don't hand back
775 : * possibly-invalid data; also removes segments of old data.
776 : */
777 : static void
778 1512 : DeactivateCommitTs(void)
779 : {
780 : /*
781 : * Cleanup the status in the shared memory.
782 : *
783 : * We reset everything in the commitTsShared record to prevent user from
784 : * getting confusing data about last committed transaction on the standby
785 : * when the module was activated repeatedly on the primary.
786 : */
787 1512 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
788 :
789 1512 : commitTsShared->commitTsActive = false;
790 1512 : commitTsShared->xidLastCommit = InvalidTransactionId;
791 1512 : TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
792 1512 : commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
793 :
794 1512 : TransamVariables->oldestCommitTsXid = InvalidTransactionId;
795 1512 : TransamVariables->newestCommitTsXid = InvalidTransactionId;
796 :
797 : /*
798 : * Remove *all* files. This is necessary so that there are no leftover
799 : * files; in the case where this feature is later enabled after running
800 : * with it disabled for some time there may be a gap in the file sequence.
801 : * (We can probably tolerate out-of-sequence files, as they are going to
802 : * be overwritten anyway when we wrap around, but it seems better to be
803 : * tidy.)
804 : *
805 : * Note that we do this with CommitTsLock acquired in exclusive mode. This
806 : * is very heavy-handed, but since this routine can only be called in the
807 : * replica and should happen very rarely, we don't worry too much about
808 : * it. Note also that no process should be consulting this SLRU if we
809 : * have just deactivated it.
810 : */
811 1512 : (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
812 :
813 1512 : LWLockRelease(CommitTsLock);
814 1512 : }
815 :
816 : /*
817 : * Perform a checkpoint --- either during shutdown, or on-the-fly
818 : */
819 : void
820 2476 : CheckPointCommitTs(void)
821 : {
822 : /*
823 : * Write dirty CommitTs pages to disk. This may result in sync requests
824 : * queued for later handling by ProcessSyncRequests(), as part of the
825 : * checkpoint.
826 : */
827 2476 : SimpleLruWriteAll(CommitTsCtl, true);
828 2476 : }
829 :
830 : /*
831 : * Make sure that CommitTs has room for a newly-allocated XID.
832 : *
833 : * NB: this is called while holding XidGenLock. We want it to be very fast
834 : * most of the time; even when it's not so fast, no actual I/O need happen
835 : * unless we're forced to write out a dirty CommitTs or xlog page to make room
836 : * in shared memory.
837 : *
838 : * NB: the current implementation relies on track_commit_timestamp being
839 : * PGC_POSTMASTER.
840 : */
841 : void
842 48977616 : ExtendCommitTs(TransactionId newestXact)
843 : {
844 : int64 pageno;
845 : LWLock *lock;
846 :
847 : /*
848 : * Nothing to do if module not enabled. Note we do an unlocked read of
849 : * the flag here, which is okay because this routine is only called from
850 : * GetNewTransactionId, which is never called in a standby.
851 : */
852 : Assert(!InRecovery);
853 48977616 : if (!commitTsShared->commitTsActive)
854 48977428 : return;
855 :
856 : /*
857 : * No work except at first XID of a page. But beware: just after
858 : * wraparound, the first XID of page zero is FirstNormalTransactionId.
859 : */
860 188 : if (TransactionIdToCTsEntry(newestXact) != 0 &&
861 : !TransactionIdEquals(newestXact, FirstNormalTransactionId))
862 188 : return;
863 :
864 0 : pageno = TransactionIdToCTsPage(newestXact);
865 :
866 0 : lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
867 :
868 0 : LWLockAcquire(lock, LW_EXCLUSIVE);
869 :
870 : /* Zero the page and make an XLOG entry about it */
871 0 : ZeroCommitTsPage(pageno, !InRecovery);
872 :
873 0 : LWLockRelease(lock);
874 : }
875 :
876 : /*
877 : * Remove all CommitTs segments before the one holding the passed
878 : * transaction ID.
879 : *
880 : * Note that we don't need to flush XLOG here.
881 : */
882 : void
883 1062 : TruncateCommitTs(TransactionId oldestXact)
884 : {
885 : int64 cutoffPage;
886 :
887 : /*
888 : * The cutoff point is the start of the segment containing oldestXact. We
889 : * pass the *page* containing oldestXact to SimpleLruTruncate.
890 : */
891 1062 : cutoffPage = TransactionIdToCTsPage(oldestXact);
892 :
893 : /* Check to see if there's any files that could be removed */
894 1062 : if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
895 : &cutoffPage))
896 1062 : return; /* nothing to remove */
897 :
898 : /* Write XLOG record */
899 0 : WriteTruncateXlogRec(cutoffPage, oldestXact);
900 :
901 : /* Now we can remove the old CommitTs segment(s) */
902 0 : SimpleLruTruncate(CommitTsCtl, cutoffPage);
903 : }
904 :
905 : /*
906 : * Set the limit values between which commit TS can be consulted.
907 : */
908 : void
909 1740 : SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
910 : {
911 : /*
912 : * Be careful not to overwrite values that are either further into the
913 : * "future" or signal a disabled committs.
914 : */
915 1740 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
916 1740 : if (TransamVariables->oldestCommitTsXid != InvalidTransactionId)
917 : {
918 0 : if (TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
919 0 : TransamVariables->oldestCommitTsXid = oldestXact;
920 0 : if (TransactionIdPrecedes(newestXact, TransamVariables->newestCommitTsXid))
921 0 : TransamVariables->newestCommitTsXid = newestXact;
922 : }
923 : else
924 : {
925 : Assert(TransamVariables->newestCommitTsXid == InvalidTransactionId);
926 1740 : TransamVariables->oldestCommitTsXid = oldestXact;
927 1740 : TransamVariables->newestCommitTsXid = newestXact;
928 : }
929 1740 : LWLockRelease(CommitTsLock);
930 1740 : }
931 :
932 : /*
933 : * Move forwards the oldest commitTS value that can be consulted
934 : */
935 : void
936 1062 : AdvanceOldestCommitTsXid(TransactionId oldestXact)
937 : {
938 1062 : LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
939 1062 : if (TransamVariables->oldestCommitTsXid != InvalidTransactionId &&
940 0 : TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
941 0 : TransamVariables->oldestCommitTsXid = oldestXact;
942 1062 : LWLockRelease(CommitTsLock);
943 1062 : }
944 :
945 :
946 : /*
947 : * Decide whether a commitTS page number is "older" for truncation purposes.
948 : * Analogous to CLOGPagePrecedes().
949 : *
950 : * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128. This
951 : * introduces differences compared to CLOG and the other SLRUs having (1 <<
952 : * 31) % per_page == 0. This function never tests exactly
953 : * TransactionIdPrecedes(x-2^31, x). When the system reaches xidStopLimit,
954 : * there are two possible counts of page boundaries between oldestXact and the
955 : * latest XID assigned, depending on whether oldestXact is within the first
956 : * 128 entries of its page. Since this function doesn't know the location of
957 : * oldestXact within page2, it returns false for one page that actually is
958 : * expendable. This is a wider (yet still negligible) version of the
959 : * truncation opportunity that CLOGPagePrecedes() cannot recognize.
960 : *
961 : * For the sake of a worked example, number entries with decimal values such
962 : * that page1==1 entries range from 1.0 to 1.999. Let N+0.15 be the number of
963 : * pages that 2^31 entries will span (N is an integer). If oldestXact=N+2.1,
964 : * then the final safe XID assignment leaves newestXact=1.95. We keep page 2,
965 : * because entry=2.85 is the border that toggles whether entries precede the
966 : * last entry of the oldestXact page. While page 2 is expendable at
967 : * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
968 : */
969 : static bool
970 0 : CommitTsPagePrecedes(int64 page1, int64 page2)
971 : {
972 : TransactionId xid1;
973 : TransactionId xid2;
974 :
975 0 : xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
976 0 : xid1 += FirstNormalTransactionId + 1;
977 0 : xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
978 0 : xid2 += FirstNormalTransactionId + 1;
979 :
980 0 : return (TransactionIdPrecedes(xid1, xid2) &&
981 0 : TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
982 : }
983 :
984 :
985 : /*
986 : * Write a ZEROPAGE xlog record
987 : */
988 : static void
989 0 : WriteZeroPageXlogRec(int64 pageno)
990 : {
991 0 : XLogBeginInsert();
992 0 : XLogRegisterData((char *) (&pageno), sizeof(pageno));
993 0 : (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
994 0 : }
995 :
996 : /*
997 : * Write a TRUNCATE xlog record
998 : */
999 : static void
1000 0 : WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
1001 : {
1002 : xl_commit_ts_truncate xlrec;
1003 :
1004 0 : xlrec.pageno = pageno;
1005 0 : xlrec.oldestXid = oldestXid;
1006 :
1007 0 : XLogBeginInsert();
1008 0 : XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
1009 0 : (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
1010 0 : }
1011 :
1012 : /*
1013 : * CommitTS resource manager's routines
1014 : */
1015 : void
1016 0 : commit_ts_redo(XLogReaderState *record)
1017 : {
1018 0 : uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1019 :
1020 : /* Backup blocks are not used in commit_ts records */
1021 : Assert(!XLogRecHasAnyBlockRefs(record));
1022 :
1023 0 : if (info == COMMIT_TS_ZEROPAGE)
1024 : {
1025 : int64 pageno;
1026 : int slotno;
1027 : LWLock *lock;
1028 :
1029 0 : memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
1030 :
1031 0 : lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
1032 0 : LWLockAcquire(lock, LW_EXCLUSIVE);
1033 :
1034 0 : slotno = ZeroCommitTsPage(pageno, false);
1035 0 : SimpleLruWritePage(CommitTsCtl, slotno);
1036 : Assert(!CommitTsCtl->shared->page_dirty[slotno]);
1037 :
1038 0 : LWLockRelease(lock);
1039 : }
1040 0 : else if (info == COMMIT_TS_TRUNCATE)
1041 : {
1042 0 : xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
1043 :
1044 0 : AdvanceOldestCommitTsXid(trunc->oldestXid);
1045 :
1046 : /*
1047 : * During XLOG replay, latest_page_number isn't set up yet; insert a
1048 : * suitable value to bypass the sanity test in SimpleLruTruncate.
1049 : */
1050 0 : pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
1051 0 : trunc->pageno);
1052 :
1053 0 : SimpleLruTruncate(CommitTsCtl, trunc->pageno);
1054 : }
1055 : else
1056 0 : elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1057 0 : }
1058 :
1059 : /*
1060 : * Entrypoint for sync.c to sync commit_ts files.
1061 : */
1062 : int
1063 0 : committssyncfiletag(const FileTag *ftag, char *path)
1064 : {
1065 0 : return SlruSyncFileTag(CommitTsCtl, ftag, path);
1066 : }
|