LCOV - code coverage report
Current view: top level - src/backend/access/transam - commit_ts.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 83.3 % 269 224
Test Date: 2026-05-05 12:17:12 Functions: 86.7 % 30 26
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * commit_ts.c
       4              :  *      PostgreSQL commit timestamp manager
       5              :  *
       6              :  * This module is a pg_xact-like system that stores the commit timestamp
       7              :  * for each transaction.
       8              :  *
       9              :  * XLOG interactions: this module generates an XLOG record whenever a new
      10              :  * CommitTs page is initialized to zeroes.  Other writes of CommitTS come
      11              :  * from recording of transaction commit in xact.c, which generates its own
      12              :  * XLOG records for these events and will re-perform the status update on
      13              :  * redo; so we need make no additional XLOG entry here.
      14              :  *
      15              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      16              :  * Portions Copyright (c) 1994, Regents of the University of California
      17              :  *
      18              :  * src/backend/access/transam/commit_ts.c
      19              :  *
      20              :  *-------------------------------------------------------------------------
      21              :  */
      22              : #include "postgres.h"
      23              : 
      24              : #include "access/commit_ts.h"
      25              : #include "access/htup_details.h"
      26              : #include "access/slru.h"
      27              : #include "access/transam.h"
      28              : #include "access/xloginsert.h"
      29              : #include "access/xlogutils.h"
      30              : #include "funcapi.h"
      31              : #include "miscadmin.h"
      32              : #include "storage/shmem.h"
      33              : #include "storage/subsystems.h"
      34              : #include "utils/fmgrprotos.h"
      35              : #include "utils/guc_hooks.h"
      36              : #include "utils/timestamp.h"
      37              : 
      38              : /*
      39              :  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
      40              :  * everywhere else in Postgres.
      41              :  *
      42              :  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
      43              :  * CommitTs page numbering also wraps around at
      44              :  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
      45              :  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
      46              :  * explicit notice of that fact in this module, except when comparing segment
      47              :  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
      48              :  */
      49              : 
      50              : /*
                      :  * Per-transaction entry kept in CommitTs pages: the commit timestamp and
                      :  * the replication origin ("nodeid") recorded with it.
                      :  *
      51              :  * We need 8+2 bytes per xact.  Note that enlarging this struct might mean
      52              :  * the largest possible file name is more than 5 chars long; see
      53              :  * SlruScanDirectory.
                      :  *
                      :  * SizeOfCommitTimestampEntry is offsetof(nodeid) + sizeof(ReplOriginId)
                      :  * rather than sizeof(CommitTimestampEntry), so trailing struct padding is
                      :  * not counted; entries are laid out at that stride within a page buffer.
                      :  * COMMIT_TS_XACTS_PER_PAGE is the number of such entries per BLCKSZ page.
      54              :  */
      55              : typedef struct CommitTimestampEntry
      56              : {
      57              :     TimestampTz time;
      58              :     ReplOriginId nodeid;
      59              : } CommitTimestampEntry;
      60              : 
      61              : #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
      62              :                                     sizeof(ReplOriginId))
      63              : 
      64              : #define COMMIT_TS_XACTS_PER_PAGE \
      65              :     (BLCKSZ / SizeOfCommitTimestampEntry)
      66              : 
      67              : 
      68              : /*
                      :  * Map a transaction ID to the number of the CommitTs page holding its
                      :  * entry.  The TransactionIdToCTsEntry macro that follows gives the index
                      :  * of the entry within that page.
                      :  *
      69              :  * Although we return an int64 the actual value can't currently exceed
      70              :  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE.
      71              :  */
      72              : static inline int64
      73         2121 : TransactionIdToCTsPage(TransactionId xid)
      74              : {
      75         2121 :     return xid / (int64) COMMIT_TS_XACTS_PER_PAGE;
      76              : }
      77              : 
      78              : #define TransactionIdToCTsEntry(xid)    \
      79              :     ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
      80              : 
      81              : /*
      82              :  * Link to shared-memory data structures for CommitTs control
                      :  *
                      :  * CommitTsShmemCallbacks bundles this module's shared-memory request and
                      :  * init functions.  CommitTsSlruDesc is the SLRU descriptor handed to
                      :  * SimpleLruRequest in CommitTsShmemRequest; the rest of the file refers to
                      :  * it through the CommitTsCtl pointer macro.
      83              :  */
      84              : static void CommitTsShmemRequest(void *arg);
      85              : static void CommitTsShmemInit(void *arg);
      86              : static bool CommitTsPagePrecedes(int64 page1, int64 page2);
      87              : static int  commit_ts_errdetail_for_io_error(const void *opaque_data);
      88              : 
      89              : const ShmemCallbacks CommitTsShmemCallbacks = {
      90              :     .request_fn = CommitTsShmemRequest,
      91              :     .init_fn = CommitTsShmemInit,
      92              : };
      93              : 
      94              : static SlruDesc CommitTsSlruDesc;
      95              : 
      96              : #define CommitTsCtl (&CommitTsSlruDesc)
      97              : 
      98              : /*
      99              :  * We keep, in shared memory, a cache of the most recently set value.
     100              :  *
     101              :  * This is also a good place to keep the activation status.  We keep this
     102              :  * separate from the GUC so that the standby can activate the module if the
     103              :  * primary has it active independently of the value of the GUC.
     104              :  *
     105              :  * This is protected by CommitTsLock.  In some places, we use commitTsActive
     106              :  * without acquiring the lock; where this happens, a comment explains the
     107              :  * rationale for it.
     108              :  */
     109              : typedef struct CommitTimestampShared
     110              : {
     111              :     TransactionId xidLastCommit;    /* xid the cached data belongs to */
     112              :     CommitTimestampEntry dataLastCommit;    /* its timestamp and origin */
     113              :     bool        commitTsActive; /* is the module currently active? */
     114              : } CommitTimestampShared;
     115              : 
     116              : static CommitTimestampShared *commitTsShared;   /* filled in through ShmemRequestStruct in CommitTsShmemRequest */
     117              : 
     118              : static void CommitTsShmemInit(void *arg);   /* NOTE(review): repeats the prototype already declared earlier in this file */
     119              : 
     120              : /* GUC variable; the lookup paths in this file test commitTsShared->commitTsActive, not this */
     121              : bool        track_commit_timestamp;
     122              : 
     123              : static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
     124              :                                  TransactionId *subxids, TimestampTz ts,
     125              :                                  ReplOriginId nodeid, int64 pageno);
     126              : static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
     127              :                                      ReplOriginId nodeid, int slotno);
     128              : static void error_commit_ts_disabled(void);
     129              : static void ActivateCommitTs(void);
     130              : static void DeactivateCommitTs(void);
     131              : static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
     132              : 
     133              : /*
     134              :  * TransactionTreeSetCommitTsData
     135              :  *
     136              :  * Record the final commit timestamp of transaction entries in the CommitTs
     137              :  * SLRU for a transaction and its subtransaction tree, as efficiently as possible.
     138              :  *
     139              :  * xid is the top level transaction id.
     140              :  *
     141              :  * subxids is an array of xids of length nsubxids, representing subtransactions
     142              :  * in the tree of xid. In various cases nsubxids may be zero.
     143              :  * The reason why tracking just the parent xid commit timestamp is not enough
     144              :  * is that the subtrans SLRU does not stay valid across crashes (it's not
     145              :  * permanent) so we need to keep the information about them here. If the
     146              :  * subtrans implementation changes in the future, we might want to revisit the
     147              :  * decision of storing timestamp info for each subxid.
                      :  *
                      :  * timestamp and nodeid are the commit time and replication origin that get
                      :  * stored for every xid in the tree.
                      :  *
                      :  * This is a no-op when the module is not active.  Otherwise, besides
                      :  * writing the per-xid entries, it caches (xid, timestamp, nodeid) as the
                      :  * last commit in shared memory and advances
                      :  * TransamVariables->newestCommitTsXid when the newest xid of this batch
                      :  * follows it.
     148              :  */
     149              : void
     150       181803 : TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
     151              :                                TransactionId *subxids, TimestampTz timestamp,
     152              :                                ReplOriginId nodeid)
     153              : {
     154              :     int         i;
     155              :     TransactionId headxid;
     156              :     TransactionId newestXact;
     157              : 
     158              :     /*
     159              :      * No-op if the module is not active.
     160              :      *
     161              :      * An unlocked read here is fine, because in a standby (the only place
     162              :      * where the flag can change in flight) this routine is only called by the
     163              :      * recovery process, which is also the only process which can change the
     164              :      * flag.
     165              :      */
     166       181803 :     if (!commitTsShared->commitTsActive)
     167       180952 :         return;
     168              : 
     169              :     /*
     170              :      * Figure out the latest Xid in this batch: either the last subxid if
     171              :      * there's any, otherwise the parent xid.
     172              :      */
     173          851 :     if (nsubxids > 0)
     174            0 :         newestXact = subxids[nsubxids - 1];
     175              :     else
     176          851 :         newestXact = xid;
     177              : 
     178              :     /*
     179              :      * We split the xids to set the timestamp to in groups belonging to the
     180              :      * same SLRU page; the first element in each such set is its head.  The
     181              :      * first group has the main XID as the head; subsequent sets use the first
     182              :      * subxid not on the previous page as head.  This way, we only have to
     183              :      * lock/modify each SLRU page once.
     184              :      */
     185          851 :     headxid = xid;
     186          851 :     i = 0;
     187              :     for (;;)
     188            0 :     {
     189          851 :         int64       pageno = TransactionIdToCTsPage(headxid);
     190              :         int         j;
     191              : 
     192          851 :         for (j = i; j < nsubxids; j++)
     193              :         {
     194            0 :             if (TransactionIdToCTsPage(subxids[j]) != pageno)
     195            0 :                 break;
     196              :         }
     197              :         /* subxids[i..j-1] are on the same page as the head */
     198              : 
     199          851 :         SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
     200              :                              pageno);
     201              : 
     202              :         /* if we wrote out all subxids, we're done. */
     203          851 :         if (j >= nsubxids)
     204          851 :             break;
     205              : 
     206              :         /*
     207              :          * Set the new head and skip over it, as well as over the subxids we
     208              :          * just wrote.
     209              :          */
     210            0 :         headxid = subxids[j];
     211            0 :         i = j + 1;
     212              :     }
     213              : 
     214              :     /* update the cached value in shared memory */
     215          851 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     216          851 :     commitTsShared->xidLastCommit = xid;
     217          851 :     commitTsShared->dataLastCommit.time = timestamp;
     218          851 :     commitTsShared->dataLastCommit.nodeid = nodeid;
     219              : 
     220              :     /* and move forwards our endpoint, if needed */
     221          851 :     if (TransactionIdPrecedes(TransamVariables->newestCommitTsXid, newestXact))
     222          838 :         TransamVariables->newestCommitTsXid = newestXact;
     223          851 :     LWLockRelease(CommitTsLock);
     224              : }
     225              : 
     226              : /*
     227              :  * Record the commit timestamp of transaction entries in the CommitTs SLRU for
     228              :  * all entries on a single page.  Atomic only on this page.
                      :  *
                      :  * xid and the nsubxids elements of subxids are all written into the buffer
                      :  * slot read for page pageno, so the caller must pass only xids that live
                      :  * on that page.  The page's bank lock is held exclusively for the whole
                      :  * operation, and the page is marked dirty after the entries are written.
     229              :  */
     230              : static void
     231          851 : SetXidCommitTsInPage(TransactionId xid, int nsubxids,
     232              :                      TransactionId *subxids, TimestampTz ts,
     233              :                      ReplOriginId nodeid, int64 pageno)
     234              : {
     235          851 :     LWLock     *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
     236              :     int         slotno;
     237              :     int         i;
     238              : 
     239          851 :     LWLockAcquire(lock, LW_EXCLUSIVE);
     240              : 
     241          851 :     slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, &xid);
     242              : 
     243          851 :     TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
     244          851 :     for (i = 0; i < nsubxids; i++)
     245            0 :         TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
     246              : 
     247          851 :     CommitTsCtl->shared->page_dirty[slotno] = true;
     248              : 
     249          851 :     LWLockRelease(lock);
     250          851 : }
     251              : 
     252              : /*
     253              :  * Sets the commit timestamp of a single transaction.
     254              :  *
     255              :  * Caller must hold the correct SLRU bank lock; it is still held at exit.
                      :  *
                      :  * slotno is the buffer slot holding xid's page.  The (ts, nodeid) entry is
                      :  * copied to byte offset SizeOfCommitTimestampEntry * entryno in that
                      :  * buffer.  Marking the page dirty is left to the caller.
     256              :  */
     257              : static void
     258          851 : TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
     259              :                          ReplOriginId nodeid, int slotno)
     260              : {
     261          851 :     int         entryno = TransactionIdToCTsEntry(xid);
     262              :     CommitTimestampEntry entry;
     263              : 
     264              :     Assert(TransactionIdIsNormal(xid));
     265              : 
     266          851 :     entry.time = ts;
     267          851 :     entry.nodeid = nodeid;
     268              : 
     269          851 :     memcpy(CommitTsCtl->shared->page_buffer[slotno] +
     270          851 :            SizeOfCommitTimestampEntry * entryno,
     271              :            &entry, SizeOfCommitTimestampEntry);
     272          851 : }
     273              : 
     274              : /*
     275              :  * Interrogate the commit timestamp of a transaction.
     276              :  *
     277              :  * The return value indicates whether a commit timestamp record was found for
     278              :  * the given xid.  The timestamp value is returned in *ts (which may not be
     279              :  * null), and the origin node for the Xid is returned in *nodeid, if it's not
     280              :  * null.
                      :  *
                      :  * An invalid xid raises an error.  Other non-normal xids return false with
                      :  * *ts = 0 and an invalid origin; that check comes before the activation
                      :  * check, so such xids never hit the "module disabled" error.  For normal
                      :  * xids an error is raised if the module is not active.  Xids outside the
                      :  * range oldestCommitTsXid..newestCommitTsXid also return false with *ts = 0.
                      :  * A stored timestamp of zero is reported as "not found" as well.
     281              :  */
     282              : bool
     283           95 : TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
     284              :                              ReplOriginId *nodeid)
     285              : {
     286           95 :     int64       pageno = TransactionIdToCTsPage(xid);
     287           95 :     int         entryno = TransactionIdToCTsEntry(xid);
     288              :     int         slotno;
     289              :     CommitTimestampEntry entry;
     290              :     TransactionId oldestCommitTsXid;
     291              :     TransactionId newestCommitTsXid;
     292              : 
     293           95 :     if (!TransactionIdIsValid(xid))
     294            3 :         ereport(ERROR,
     295              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     296              :                  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
     297           92 :     else if (!TransactionIdIsNormal(xid))
     298              :     {
     299              :         /* frozen and bootstrap xids are always committed far in the past */
     300            6 :         *ts = 0;
     301            6 :         if (nodeid)
     302            2 :             *nodeid = InvalidReplOriginId;
     303            6 :         return false;
     304              :     }
     305              : 
     306           86 :     LWLockAcquire(CommitTsLock, LW_SHARED);
     307              : 
     308              :     /* Error if module not enabled (raised with CommitTsLock still held) */
     309           86 :     if (!commitTsShared->commitTsActive)
     310            3 :         error_commit_ts_disabled();
     311              : 
     312              :     /*
     313              :      * If we're asked for the cached value, return that.  Otherwise, fall
     314              :      * through to read from SLRU.
     315              :      */
     316           83 :     if (commitTsShared->xidLastCommit == xid)
     317              :     {
     318           18 :         *ts = commitTsShared->dataLastCommit.time;
     319           18 :         if (nodeid)
     320           10 :             *nodeid = commitTsShared->dataLastCommit.nodeid;
     321              : 
     322           18 :         LWLockRelease(CommitTsLock);
     323           18 :         return *ts != 0;
     324              :     }
     325              : 
     326           65 :     oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
     327           65 :     newestCommitTsXid = TransamVariables->newestCommitTsXid;
     328              :     /* neither is invalid, or both are */
     329              :     Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
     330           65 :     LWLockRelease(CommitTsLock);
     331              : 
     332              :     /*
     333              :      * Return empty if the requested value is outside our valid range.
     334              :      */
     335          130 :     if (!TransactionIdIsValid(oldestCommitTsXid) ||
     336           79 :         TransactionIdPrecedes(xid, oldestCommitTsXid) ||
     337           14 :         TransactionIdPrecedes(newestCommitTsXid, xid))
     338              :     {
     339           51 :         *ts = 0;
     340           51 :         if (nodeid)
     341           48 :             *nodeid = InvalidReplOriginId;
     342           51 :         return false;
     343              :     }
     344              : 
     345              :     /* lock is acquired by SimpleLruReadPage_ReadOnly */
     346           14 :     slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, &xid);
     347           14 :     memcpy(&entry,
     348           14 :            CommitTsCtl->shared->page_buffer[slotno] +
     349           14 :            SizeOfCommitTimestampEntry * entryno,
     350              :            SizeOfCommitTimestampEntry);
     351              : 
     352           14 :     *ts = entry.time;
     353           14 :     if (nodeid)
     354            7 :         *nodeid = entry.nodeid;
     355              : 
     356           14 :     LWLockRelease(SimpleLruGetBankLock(CommitTsCtl, pageno));
     357           14 :     return *ts != 0;
     358              : }
     359              : 
     360              : /*
     361              :  * Return the Xid of the latest committed transaction.  (As far as this module
     362              :  * is concerned, anyway; it's up to the caller to ensure the value is useful
     363              :  * for its purposes.)
     364              :  *
     365              :  * ts and nodeid are filled with the corresponding data; they can be passed
     366              :  * as NULL if not wanted.
                      :  *
                      :  * The values come from the shared-memory cache, read while holding
                      :  * CommitTsLock in shared mode.  An error is raised if the module is not
                      :  * active.
     367              :  */
     368              : TransactionId
     369            4 : GetLatestCommitTsData(TimestampTz *ts, ReplOriginId *nodeid)
     370              : {
     371              :     TransactionId xid;
     372              : 
     373            4 :     LWLockAcquire(CommitTsLock, LW_SHARED);
     374              : 
     375              :     /* Error if module not enabled */
     376            4 :     if (!commitTsShared->commitTsActive)
     377            0 :         error_commit_ts_disabled();
     378              : 
     379            4 :     xid = commitTsShared->xidLastCommit;
     380            4 :     if (ts)
     381            4 :         *ts = commitTsShared->dataLastCommit.time;
     382            4 :     if (nodeid)
     383            4 :         *nodeid = commitTsShared->dataLastCommit.nodeid;
     384            4 :     LWLockRelease(CommitTsLock);
     385              : 
     386            4 :     return xid;
     387              : }
     388              : /*
                      :  * Raise the error used when commit timestamp data is requested but the
                      :  * module is not active.  The hint depends on RecoveryInProgress(): during
                      :  * recovery it points at the setting on the primary server, otherwise at
                      :  * the local setting.
                      :  */
     389              : static void
     390            3 : error_commit_ts_disabled(void)
     391              : {
     392            3 :     ereport(ERROR,
     393              :             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     394              :              errmsg("could not get commit timestamp data"),
     395              :              RecoveryInProgress() ?
     396              :              errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
     397              :                      "track_commit_timestamp") :
     398              :              errhint("Make sure the configuration parameter \"%s\" is set.",
     399              :                      "track_commit_timestamp")));
     400              : }
     401              : 
     402              : /*
     403              :  * SQL-callable wrapper to obtain commit time of a transaction
                      :  *
                      :  * Takes the transaction ID as argument 0.  Returns SQL NULL when
                      :  * TransactionIdGetCommitTsData reports that no timestamp was found.
     404              :  */
     405              : Datum
     406           27 : pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
     407              : {
     408           27 :     TransactionId xid = PG_GETARG_TRANSACTIONID(0);
     409              :     TimestampTz ts;
     410              :     bool        found;
     411              : 
     412           27 :     found = TransactionIdGetCommitTsData(xid, &ts, NULL);
     413              : 
     414           22 :     if (!found)
     415            7 :         PG_RETURN_NULL();
     416              : 
     417           15 :     PG_RETURN_TIMESTAMPTZ(ts);
     418              : }
     419              : 
     420              : 
     421              : /*
     422              :  * pg_last_committed_xact
     423              :  *
     424              :  * SQL-callable wrapper to obtain some information about the latest
     425              :  * committed transaction: transaction ID, timestamp and replication
     426              :  * origin.
                      :  *
                      :  * The result is a three-column row.  If the latest xid is not a normal
                      :  * transaction ID, all three columns are NULL (values[] is then left unset
                      :  * and only nulls[] is filled in).
     427              :  */
     428              : Datum
     429            4 : pg_last_committed_xact(PG_FUNCTION_ARGS)
     430              : {
     431              :     TransactionId xid;
     432              :     ReplOriginId nodeid;
     433              :     TimestampTz ts;
     434              :     Datum       values[3];
     435              :     bool        nulls[3];
     436              :     TupleDesc   tupdesc;
     437              :     HeapTuple   htup;
     438              : 
     439              :     /* fetch the latest commit data; the tuple is built from it below */
     440            4 :     xid = GetLatestCommitTsData(&ts, &nodeid);
     441              : 
     442            4 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     443            0 :         elog(ERROR, "return type must be a row type");
     444              : 
     445            4 :     if (!TransactionIdIsNormal(xid))
     446              :     {
     447            0 :         memset(nulls, true, sizeof(nulls));
     448              :     }
     449              :     else
     450              :     {
     451            4 :         values[0] = TransactionIdGetDatum(xid);
     452            4 :         nulls[0] = false;
     453              : 
     454            4 :         values[1] = TimestampTzGetDatum(ts);
     455            4 :         nulls[1] = false;
     456              : 
     457            4 :         values[2] = ObjectIdGetDatum((Oid) nodeid);
     458            4 :         nulls[2] = false;
     459              :     }
     460              : 
     461            4 :     htup = heap_form_tuple(tupdesc, values, nulls);
     462              : 
     463            4 :     PG_RETURN_DATUM(HeapTupleGetDatum(htup));
     464              : }
     465              : 
     466              : /*
     467              :  * pg_xact_commit_timestamp_origin
     468              :  *
     469              :  * SQL-callable wrapper to obtain commit timestamp and replication origin
     470              :  * of a given transaction.
                      :  *
                      :  * Takes the transaction ID as argument 0 and returns a two-column row
                      :  * (timestamp, origin).  Both columns are NULL when
                      :  * TransactionIdGetCommitTsData reports that nothing was found.
     471              :  */
     472              : Datum
     473            5 : pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
     474              : {
     475            5 :     TransactionId xid = PG_GETARG_TRANSACTIONID(0);
     476              :     ReplOriginId nodeid;
     477              :     TimestampTz ts;
     478              :     Datum       values[2];
     479              :     bool        nulls[2];
     480              :     TupleDesc   tupdesc;
     481              :     HeapTuple   htup;
     482              :     bool        found;
     483              : 
     484            5 :     found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
     485              : 
     486            4 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     487            0 :         elog(ERROR, "return type must be a row type");
     488              : 
     489            4 :     if (!found)
     490              :     {
     491            2 :         memset(nulls, true, sizeof(nulls));
     492              :     }
     493              :     else
     494              :     {
     495            2 :         values[0] = TimestampTzGetDatum(ts);
     496            2 :         nulls[0] = false;
     497              : 
     498            2 :         values[1] = ObjectIdGetDatum((Oid) nodeid);
     499            2 :         nulls[1] = false;
     500              :     }
     501              : 
     502            4 :     htup = heap_form_tuple(tupdesc, values, nulls);
     503              : 
     504            4 :     PG_RETURN_DATUM(HeapTupleGetDatum(htup));
     505              : }
     506              : 
     507              : /*
     508              :  * Number of shared CommitTS buffers.
     509              :  *
     510              :  * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
     511              :  * Otherwise just clamp the configured amount to be between 16 and the maximum
     512              :  * allowed.
                      :  *
                      :  * Autotuning is requested by commit_timestamp_buffers == 0 and is
                      :  * delegated to SimpleLruAutotuneBuffers(512, 1024); the upper clamp is
                      :  * SLRU_MAX_ALLOWED_BUFFERS.
     513              :  */
     514              : static int
     515         2491 : CommitTsShmemBuffers(void)
     516              : {
     517              :     /* auto-tune based on shared buffers */
     518         2491 :     if (commit_timestamp_buffers == 0)
     519         1243 :         return SimpleLruAutotuneBuffers(512, 1024);
     520              : 
     521         1248 :     return Min(Max(16, commit_timestamp_buffers), SLRU_MAX_ALLOWED_BUFFERS);
     522              : }
     523              : 
     524              : /*
     525              :  * Register CommitTs shared memory needs at system startup (postmaster start
     526              :  * or standalone backend)
                      :  *
                      :  * If commit_timestamp_buffers is 0, the auto-tuned value is first written
                      :  * back into the GUC.  Then the "commit_timestamp" SLRU (directory
                      :  * "pg_commit_ts") and the CommitTimestampShared struct are requested.
                      :  * arg is unused.
     527              :  */
     528              : static void
     529         1248 : CommitTsShmemRequest(void *arg)
     530              : {
     531              :     /* If auto-tuning is requested, now is the time to do it */
     532         1248 :     if (commit_timestamp_buffers == 0)
     533              :     {
     534              :         char        buf[32];
     535              : 
     536         1243 :         snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
     537         1243 :         SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
     538              :                         PGC_S_DYNAMIC_DEFAULT);
     539              : 
     540              :         /*
     541              :          * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
     542              :          * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
     543              :          * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
     544              :          * that and we must force the matter with PGC_S_OVERRIDE.
     545              :          */
     546         1243 :         if (commit_timestamp_buffers == 0)  /* failed to apply it? */
     547            0 :             SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
     548              :                             PGC_S_OVERRIDE);
     549              :     }
     550              :     Assert(commit_timestamp_buffers != 0);
     551         1248 :     SimpleLruRequest(.desc = &CommitTsSlruDesc,
     552              :                      .name = "commit_timestamp",
     553              :                      .Dir = "pg_commit_ts",
     554              :                      .long_segment_names = false,
     555              : 
     556              :                      .nslots = CommitTsShmemBuffers(),
     557              : 
     558              :                      .PagePrecedes = CommitTsPagePrecedes,
     559              :                      .errdetail_for_io_error = commit_ts_errdetail_for_io_error,
     560              : 
     561              :                      .sync_handler = SYNC_HANDLER_COMMIT_TS,
     562              :                      .buffer_tranche_id = LWTRANCHE_COMMITTS_BUFFER,
     563              :                      .bank_tranche_id = LWTRANCHE_COMMITTS_SLRU,
     564              :         );
     565              : 
     566         1248 :     ShmemRequestStruct(.name = "CommitTs shared",
     567              :                        .size = sizeof(CommitTimestampShared),
     568              :                        .ptr = (void **) &commitTsShared,
     569              :         );
     570         1248 : }
     571              : /*
                      :  * Initialize the CommitTs shared state: no cached last commit (invalid
                      :  * xid, timestamp set via TIMESTAMP_NOBEGIN, invalid origin) and the module
                      :  * marked inactive.  Also invokes SlruPagePrecedesUnitTests for this SLRU.
                      :  * arg is unused.
                      :  */
     572              : static void
     573         1245 : CommitTsShmemInit(void *arg)
     574              : {
     575         1245 :     commitTsShared->xidLastCommit = InvalidTransactionId;
     576         1245 :     TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
     577         1245 :     commitTsShared->dataLastCommit.nodeid = InvalidReplOriginId;
     578         1245 :     commitTsShared->commitTsActive = false;
     579              : 
     580              :     SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
     581         1245 : }
     582              : 
     583              : /*
     584              :  * GUC check_hook for commit_timestamp_buffers
     585              :  */
     586              : bool
     587         2532 : check_commit_ts_buffers(int *newval, void **extra, GucSource source)
     588              : {
     589         2532 :     return check_slru_buffers("commit_timestamp_buffers", newval);
     590              : }
     591              : 
     592              : /*
     593              :  * This function must be called ONCE on system install.
     594              :  *
     595              :  * (The CommitTs directory is assumed to have been created by initdb, and
     596              :  * CommitTsShmemInit must have been called already.)
     597              :  */
     598              : void
     599           57 : BootStrapCommitTs(void)
     600              : {
     601              :     /*
     602              :      * Nothing to do here at present, unlike most other SLRU modules; segments
     603              :      * are created when the server is started with this module enabled. See
     604              :      * ActivateCommitTs.
     605              :      */
     606           57 : }
     607              : 
     608              : /*
     609              :  * This must be called ONCE during postmaster or standalone-backend startup,
     610              :  * after StartupXLOG has initialized TransamVariables->nextXid.
     611              :  */
     612              : void
     613           14 : StartupCommitTs(void)
     614              : {
     615           14 :     ActivateCommitTs();
     616           14 : }
     617              : 
     618              : /*
     619              :  * This must be called ONCE during postmaster or standalone-backend startup,
     620              :  * after recovery has finished.
     621              :  */
     622              : void
     623         1017 : CompleteCommitTsInitialization(void)
     624              : {
     625              :     /*
     626              :      * If the feature is not enabled, turn it off for good.  This also removes
     627              :      * any leftover data.
     628              :      *
     629              :      * Conversely, we activate the module if the feature is enabled.  This is
     630              :      * necessary for primary and standby as the activation depends on the
     631              :      * control file contents at the beginning of recovery or when an
     632              :      * XLOG_PARAMETER_CHANGE is replayed.
     633              :      */
     634         1017 :     if (!track_commit_timestamp)
     635          995 :         DeactivateCommitTs();
     636              :     else
     637           22 :         ActivateCommitTs();
     638         1017 : }
     639              : 
     640              : /*
     641              :  * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
     642              :  * XLog record during recovery.
     643              :  */
     644              : void
     645           38 : CommitTsParameterChange(bool newvalue, bool oldvalue)
     646              : {
     647              :     /*
     648              :      * If the commit_ts module is disabled in this server and we get word from
     649              :      * the primary server that it is enabled there, activate it so that we can
     650              :      * replay future WAL records involving it; also mark it as active on
     651              :      * pg_control.  If the old value was already set, we already did this, so
     652              :      * don't do anything.
     653              :      *
     654              :      * If the module is disabled in the primary, disable it here too, unless
     655              :      * the module is enabled locally.
     656              :      *
     657              :      * Note this only runs in the recovery process, so an unlocked read is
     658              :      * fine.
     659              :      */
     660           38 :     if (newvalue)
     661              :     {
     662            2 :         if (!commitTsShared->commitTsActive)
     663            0 :             ActivateCommitTs();
     664              :     }
     665           36 :     else if (commitTsShared->commitTsActive)
     666            1 :         DeactivateCommitTs();
     667           38 : }
     668              : 
     669              : /*
     670              :  * Activate this module whenever necessary.
     671              :  *      This must happen during postmaster or standalone-backend startup,
     672              :  *      or during WAL replay anytime the track_commit_timestamp setting is
     673              :  *      changed in the primary.
     674              :  *
     675              :  * The reason why this SLRU needs separate activation/deactivation functions is
     676              :  * that it can be enabled/disabled during start and the activation/deactivation
     677              :  * on the primary is propagated to the standby via replay. Other SLRUs don't
     678              :  * have this property and they can be just initialized during normal startup.
     679              :  *
     680              :  * This is in charge of creating the currently active segment, if it's not
     681              :  * already there.  The reason for this is that the server might have been
     682              :  * running with this module disabled for a while and thus might have skipped
     683              :  * the normal creation point.
     684              :  */
     685              : static void
     686           36 : ActivateCommitTs(void)
     687              : {
     688              :     TransactionId xid;
     689              :     int64       pageno;
     690              : 
     691              :     /*
     692              :      * During bootstrap, we should not register commit timestamps so skip the
     693              :      * activation in this case.
     694              :      */
     695           36 :     if (IsBootstrapProcessingMode())
     696            2 :         return;
     697              : 
     698              :     /* If we've done this already, there's nothing to do */
     699           34 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     700           34 :     if (commitTsShared->commitTsActive)
     701              :     {
     702            6 :         LWLockRelease(CommitTsLock);
     703            6 :         return;
     704              :     }
     705           28 :     LWLockRelease(CommitTsLock);
     706              : 
     707           28 :     xid = XidFromFullTransactionId(TransamVariables->nextXid);
     708           28 :     pageno = TransactionIdToCTsPage(xid);
     709              : 
     710              :     /*
     711              :      * Re-Initialize our idea of the latest page number.
     712              :      */
     713           28 :     pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
     714              : 
     715              :     /*
     716              :      * If CommitTs is enabled, but it wasn't in the previous server run, we
     717              :      * need to set the oldest and newest values to the next Xid; that way, we
     718              :      * will not try to read data that might not have been set.
     719              :      *
     720              :      * XXX does this have a problem if a server is started with commitTs
     721              :      * enabled, then started with commitTs disabled, then restarted with it
     722              :      * enabled again?  It doesn't look like it does, because there should be a
     723              :      * checkpoint that sets the value to InvalidTransactionId at end of
     724              :      * recovery; and so any chance of injecting new transactions without
     725              :      * CommitTs values would occur after the oldestCommitTsXid has been set to
     726              :      * Invalid temporarily.
     727              :      */
     728           28 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     729           28 :     if (TransamVariables->oldestCommitTsXid == InvalidTransactionId)
     730              :     {
     731           16 :         TransamVariables->oldestCommitTsXid =
     732           16 :             TransamVariables->newestCommitTsXid = ReadNextTransactionId();
     733              :     }
     734           28 :     LWLockRelease(CommitTsLock);
     735              : 
     736              :     /* Create the current segment file, if necessary */
     737           28 :     if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
     738           14 :         SimpleLruZeroAndWritePage(CommitTsCtl, pageno);
     739              : 
     740              :     /* Change the activation status in shared memory. */
     741           28 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     742           28 :     commitTsShared->commitTsActive = true;
     743           28 :     LWLockRelease(CommitTsLock);
     744              : }
     745              : 
     746              : /*
     747              :  * Deactivate this module.
     748              :  *
     749              :  * This must be called when the track_commit_timestamp parameter is turned off.
     750              :  * This happens during postmaster or standalone-backend startup, or during WAL
     751              :  * replay.
     752              :  *
     753              :  * Resets CommitTs into invalid state to make sure we don't hand back
     754              :  * possibly-invalid data; also removes segments of old data.
     755              :  */
     756              : static void
     757          996 : DeactivateCommitTs(void)
     758              : {
     759              :     /*
     760              :      * Cleanup the status in the shared memory.
     761              :      *
     762              :      * We reset everything in the commitTsShared record to prevent user from
     763              :      * getting confusing data about last committed transaction on the standby
     764              :      * when the module was activated repeatedly on the primary.
     765              :      */
     766          996 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     767              : 
     768          996 :     commitTsShared->commitTsActive = false;
     769          996 :     commitTsShared->xidLastCommit = InvalidTransactionId;
     770          996 :     TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
     771          996 :     commitTsShared->dataLastCommit.nodeid = InvalidReplOriginId;
     772              : 
     773          996 :     TransamVariables->oldestCommitTsXid = InvalidTransactionId;
     774          996 :     TransamVariables->newestCommitTsXid = InvalidTransactionId;
     775              : 
     776              :     /*
     777              :      * Remove *all* files.  This is necessary so that there are no leftover
     778              :      * files; in the case where this feature is later enabled after running
     779              :      * with it disabled for some time there may be a gap in the file sequence.
     780              :      * (We can probably tolerate out-of-sequence files, as they are going to
     781              :      * be overwritten anyway when we wrap around, but it seems better to be
     782              :      * tidy.)
     783              :      *
     784              :      * Note that we do this with CommitTsLock acquired in exclusive mode. This
     785              :      * is very heavy-handed, but since this routine is only called at startup
     786              :      * or during WAL replay and should happen very rarely, we don't worry too
     787              :      * much about it.  Note also that no process should be consulting this
     788              :      * SLRU if we have just deactivated it.
     789              :      */
     790          996 :     (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
     791              : 
     792          996 :     LWLockRelease(CommitTsLock);
     793          996 : }
     794              : 
     795              : /*
     796              :  * Perform a checkpoint --- either during shutdown, or on-the-fly
     797              :  */
     798              : void
     799         1942 : CheckPointCommitTs(void)
     800              : {
     801              :     /*
     802              :      * Write dirty CommitTs pages to disk.  This may result in sync requests
     803              :      * queued for later handling by ProcessSyncRequests(), as part of the
     804              :      * checkpoint.
     805              :      */
     806         1942 :     SimpleLruWriteAll(CommitTsCtl, true);
     807         1942 : }
     808              : 
     809              : /*
     810              :  * Make sure that CommitTs has room for a newly-allocated XID.
     811              :  *
     812              :  * NB: this is called while holding XidGenLock.  We want it to be very fast
     813              :  * most of the time; even when it's not so fast, no actual I/O need happen
     814              :  * unless we're forced to write out a dirty CommitTs or xlog page to make room
     815              :  * in shared memory.
     816              :  *
     817              :  * NB: the current implementation relies on track_commit_timestamp being
     818              :  * PGC_POSTMASTER.
     819              :  */
     820              : void
     821     24543159 : ExtendCommitTs(TransactionId newestXact)
     822              : {
     823              :     int64       pageno;
     824              :     LWLock     *lock;
     825              : 
     826              :     /*
     827              :      * Nothing to do if module not enabled.  Note we do an unlocked read of
     828              :      * the flag here, which is okay because this routine is only called from
     829              :      * GetNewTransactionId, which is never called in a standby.
     830              :      */
     831              :     Assert(!InRecovery);
     832     24543159 :     if (!commitTsShared->commitTsActive)
     833     24542293 :         return;
     834              : 
     835              :     /*
     836              :      * No work except at first XID of a page.  But beware: just after
     837              :      * wraparound, the first XID of page zero is FirstNormalTransactionId.
     838              :      */
     839          866 :     if (TransactionIdToCTsEntry(newestXact) != 0 &&
     840              :         !TransactionIdEquals(newestXact, FirstNormalTransactionId))
     841          865 :         return;
     842              : 
     843            1 :     pageno = TransactionIdToCTsPage(newestXact);
     844              : 
     845            1 :     lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
     846              : 
     847            1 :     LWLockAcquire(lock, LW_EXCLUSIVE);
     848              : 
     849              :     /* Zero the page ... */
     850            1 :     SimpleLruZeroPage(CommitTsCtl, pageno);
     851              : 
     852              :     /* and make a WAL entry about that, unless we're in REDO */
     853            1 :     if (!InRecovery)
     854            1 :         XLogSimpleInsertInt64(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE, pageno);
     855              : 
     856            1 :     LWLockRelease(lock);
     857              : }
     858              : 
     859              : /*
     860              :  * Remove all CommitTs segments before the one holding the passed
     861              :  * transaction ID.
     862              :  *
     863              :  * Note that we don't need to flush XLOG here.
     864              :  */
     865              : void
     866         1146 : TruncateCommitTs(TransactionId oldestXact)
     867              : {
     868              :     int64       cutoffPage;
     869              : 
     870              :     /*
     871              :      * The cutoff point is the start of the segment containing oldestXact. We
     872              :      * pass the *page* containing oldestXact to SimpleLruTruncate.
     873              :      */
     874         1146 :     cutoffPage = TransactionIdToCTsPage(oldestXact);
     875              : 
     876              :     /* Check to see if there are any files that could be removed */
     877         1146 :     if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
     878              :                            &cutoffPage))
     879         1146 :         return;                 /* nothing to remove */
     880              : 
     881              :     /* Write XLOG record */
     882            0 :     WriteTruncateXlogRec(cutoffPage, oldestXact);
     883              : 
     884              :     /* Now we can remove the old CommitTs segment(s) */
     885            0 :     SimpleLruTruncate(CommitTsCtl, cutoffPage);
     886              : }
     887              : 
     888              : /*
     889              :  * Set the limit values between which commit TS can be consulted.
     890              :  */
     891              : void
     892         1143 : SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
     893              : {
     894              :     /*
     895              :      * Be careful not to overwrite values that are either further into the
     896              :      * "future" or signal a disabled committs.
     897              :      */
     898         1143 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     899         1143 :     if (TransamVariables->oldestCommitTsXid != InvalidTransactionId)
     900              :     {
     901            0 :         if (TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
     902            0 :             TransamVariables->oldestCommitTsXid = oldestXact;
     903            0 :         if (TransactionIdPrecedes(newestXact, TransamVariables->newestCommitTsXid))
     904            0 :             TransamVariables->newestCommitTsXid = newestXact;
     905              :     }
     906              :     else
     907              :     {
     908              :         Assert(TransamVariables->newestCommitTsXid == InvalidTransactionId);
     909         1143 :         TransamVariables->oldestCommitTsXid = oldestXact;
     910         1143 :         TransamVariables->newestCommitTsXid = newestXact;
     911              :     }
     912         1143 :     LWLockRelease(CommitTsLock);
     913         1143 : }
     914              : 
     915              : /*
     916              :  * Move forwards the oldest commitTS value that can be consulted
     917              :  */
     918              : void
     919         1146 : AdvanceOldestCommitTsXid(TransactionId oldestXact)
     920              : {
     921         1146 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     922         1147 :     if (TransamVariables->oldestCommitTsXid != InvalidTransactionId &&
     923            1 :         TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
     924            1 :         TransamVariables->oldestCommitTsXid = oldestXact;
     925         1146 :     LWLockRelease(CommitTsLock);
     926         1146 : }
     927              : 
     928              : 
     929              : /*
     930              :  * Decide whether a commitTS page number is "older" for truncation purposes.
     931              :  * Analogous to CLOGPagePrecedes().
     932              :  *
     933              :  * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128.  This
     934              :  * introduces differences compared to CLOG and the other SLRUs having (1 <<
     935              :  * 31) % per_page == 0.  This function never tests exactly
     936              :  * TransactionIdPrecedes(x-2^31, x).  When the system reaches xidStopLimit,
     937              :  * there are two possible counts of page boundaries between oldestXact and the
     938              :  * latest XID assigned, depending on whether oldestXact is within the first
     939              :  * 128 entries of its page.  Since this function doesn't know the location of
     940              :  * oldestXact within page2, it returns false for one page that actually is
     941              :  * expendable.  This is a wider (yet still negligible) version of the
     942              :  * truncation opportunity that CLOGPagePrecedes() cannot recognize.
     943              :  *
     944              :  * For the sake of a worked example, number entries with decimal values such
     945              :  * that page1==1 entries range from 1.0 to 1.999.  Let N+0.15 be the number of
     946              :  * pages that 2^31 entries will span (N is an integer).  If oldestXact=N+2.1,
     947              :  * then the final safe XID assignment leaves newestXact=1.95.  We keep page 2,
     948              :  * because entry=2.85 is the border that toggles whether entries precede the
     949              :  * last entry of the oldestXact page.  While page 2 is expendable at
     950              :  * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
     951              :  */
     952              : static bool
     953            1 : CommitTsPagePrecedes(int64 page1, int64 page2)
     954              : {
     955              :     TransactionId xid1;
     956              :     TransactionId xid2;
     957              : 
     958            1 :     xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
     959            1 :     xid1 += FirstNormalTransactionId + 1;
     960            1 :     xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
     961            1 :     xid2 += FirstNormalTransactionId + 1;
     962              : 
     963            1 :     return (TransactionIdPrecedes(xid1, xid2) &&
     964            0 :             TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
     965              : }
     966              : 
     967              : static int
     968            0 : commit_ts_errdetail_for_io_error(const void *opaque_data)
     969              : {
     970            0 :     TransactionId xid = *(const TransactionId *) opaque_data;
     971              : 
     972            0 :     return errdetail("Could not access commit timestamp of transaction %u.", xid);
     973              : }
     974              : 
     975              : /*
     976              :  * Write a TRUNCATE xlog record
     977              :  */
     978              : static void
     979            0 : WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
     980              : {
     981              :     xl_commit_ts_truncate xlrec;
     982              : 
     983            0 :     xlrec.pageno = pageno;
     984            0 :     xlrec.oldestXid = oldestXid;
     985              : 
     986            0 :     XLogBeginInsert();
     987            0 :     XLogRegisterData(&xlrec, SizeOfCommitTsTruncate);
     988            0 :     (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
     989            0 : }
     990              : 
     991              : /*
     992              :  * CommitTS resource manager's routines
     993              :  */
     994              : void
     995            0 : commit_ts_redo(XLogReaderState *record)
     996              : {
     997            0 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     998              : 
     999              :     /* Backup blocks are not used in commit_ts records */
    1000              :     Assert(!XLogRecHasAnyBlockRefs(record));
    1001              : 
    1002            0 :     if (info == COMMIT_TS_ZEROPAGE)
    1003              :     {
    1004              :         int64       pageno;
    1005              : 
    1006            0 :         memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
    1007            0 :         SimpleLruZeroAndWritePage(CommitTsCtl, pageno);
    1008              :     }
    1009            0 :     else if (info == COMMIT_TS_TRUNCATE)
    1010              :     {
    1011            0 :         xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
    1012              : 
    1013            0 :         AdvanceOldestCommitTsXid(trunc->oldestXid);
    1014              : 
    1015              :         /*
    1016              :          * During XLOG replay, latest_page_number isn't set up yet; insert a
    1017              :          * suitable value to bypass the sanity test in SimpleLruTruncate.
    1018              :          */
    1019            0 :         pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
    1020            0 :                             trunc->pageno);
    1021              : 
    1022            0 :         SimpleLruTruncate(CommitTsCtl, trunc->pageno);
    1023              :     }
    1024              :     else
    1025            0 :         elog(PANIC, "commit_ts_redo: unknown op code %u", info);
    1026            0 : }
    1027              : 
    1028              : /*
    1029              :  * Entrypoint for sync.c to sync commit_ts files.
    1030              :  */
    1031              : int
    1032            0 : committssyncfiletag(const FileTag *ftag, char *path)
    1033              : {
    1034            0 :     return SlruSyncFileTag(CommitTsCtl, ftag, path);
    1035              : }
        

Generated by: LCOV version 2.0-1