LCOV - code coverage report
Current view: top level - src/backend/access/transam - commit_ts.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 83.5 % 272 227
Test Date: 2026-03-22 09:16:17 Functions: 86.7 % 30 26
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * commit_ts.c
       4              :  *      PostgreSQL commit timestamp manager
       5              :  *
       6              :  * This module is a pg_xact-like system that stores the commit timestamp
       7              :  * for each transaction.
       8              :  *
       9              :  * XLOG interactions: this module generates an XLOG record whenever a new
      10              :  * CommitTs page is initialized to zeroes.  Other writes of CommitTs come
      11              :  * from recording of transaction commit in xact.c, which generates its own
      12              :  * XLOG records for these events and will re-perform the status update on
      13              :  * redo; so we need make no additional XLOG entry here.
      14              :  *
      15              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      16              :  * Portions Copyright (c) 1994, Regents of the University of California
      17              :  *
      18              :  * src/backend/access/transam/commit_ts.c
      19              :  *
      20              :  *-------------------------------------------------------------------------
      21              :  */
      22              : #include "postgres.h"
      23              : 
      24              : #include "access/commit_ts.h"
      25              : #include "access/htup_details.h"
      26              : #include "access/slru.h"
      27              : #include "access/transam.h"
      28              : #include "access/xloginsert.h"
      29              : #include "access/xlogutils.h"
      30              : #include "funcapi.h"
      31              : #include "miscadmin.h"
      32              : #include "storage/shmem.h"
      33              : #include "utils/fmgrprotos.h"
      34              : #include "utils/guc_hooks.h"
      35              : #include "utils/timestamp.h"
      36              : 
      37              : /*
      38              :  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
      39              :  * everywhere else in Postgres.
      40              :  *
      41              :  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
      42              :  * CommitTs page numbering also wraps around at
      43              :  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
      44              :  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
      45              :  * explicit notice of that fact in this module, except when comparing segment
      46              :  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
      47              :  */
      48              : 
      49              : /*
      50              :  * We need 8+2 bytes per xact.  Note that enlarging this struct might mean
      51              :  * the largest possible file name is more than 5 chars long; see
      52              :  * SlruScanDirectory.
      53              :  */
      54              : typedef struct CommitTimestampEntry
      55              : {
      56              :     TimestampTz time;
      57              :     ReplOriginId nodeid;
      58              : } CommitTimestampEntry;
      59              : 
      60              : #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
      61              :                                     sizeof(ReplOriginId))
      62              : 
      63              : #define COMMIT_TS_XACTS_PER_PAGE \
      64              :     (BLCKSZ / SizeOfCommitTimestampEntry)
      65              : 
      66              : 
      67              : /*
      68              :  * Although we return an int64 the actual value can't currently exceed
      69              :  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE.
      70              :  */
      71              : static inline int64
      72         2043 : TransactionIdToCTsPage(TransactionId xid)
      73              : {
      74         2043 :     return xid / (int64) COMMIT_TS_XACTS_PER_PAGE;
      75              : }
      76              : 
      77              : #define TransactionIdToCTsEntry(xid)    \
      78              :     ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
      79              : 
      80              : /*
      81              :  * Link to shared-memory data structures for CommitTs control
      82              :  */
      83              : static SlruCtlData CommitTsCtlData;
      84              : 
      85              : #define CommitTsCtl (&CommitTsCtlData)
      86              : 
      87              : /*
      88              :  * We keep a cache of the last value set in shared memory.
      89              :  *
      90              :  * This is also a good place to keep the activation status.  We keep this
      91              :  * separate from the GUC so that the standby can activate the module if the
      92              :  * primary has it active independently of the value of the GUC.
      93              :  *
      94              :  * This is protected by CommitTsLock.  In some places, we use commitTsActive
      95              :  * without acquiring the lock; where this happens, a comment explains the
      96              :  * rationale for it.
      97              :  */
      98              : typedef struct CommitTimestampShared
      99              : {
     100              :     TransactionId xidLastCommit;
     101              :     CommitTimestampEntry dataLastCommit;
     102              :     bool        commitTsActive;
     103              : } CommitTimestampShared;
     104              : 
     105              : static CommitTimestampShared *commitTsShared;
     106              : 
     107              : 
     108              : /* GUC variable */
     109              : bool        track_commit_timestamp;
     110              : 
     111              : static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
     112              :                                  TransactionId *subxids, TimestampTz ts,
     113              :                                  ReplOriginId nodeid, int64 pageno);
     114              : static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
     115              :                                      ReplOriginId nodeid, int slotno);
     116              : static void error_commit_ts_disabled(void);
     117              : static bool CommitTsPagePrecedes(int64 page1, int64 page2);
     118              : static int  commit_ts_errdetail_for_io_error(const void *opaque_data);
     119              : static void ActivateCommitTs(void);
     120              : static void DeactivateCommitTs(void);
     121              : static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
     122              : 
     123              : /*
     124              :  * TransactionTreeSetCommitTsData
     125              :  *
     126              :  * Record the final commit timestamp in the commit timestamp SLRU for a
     127              :  * transaction and its subtransaction tree, as efficiently as possible.
     128              :  *
     129              :  * xid is the top level transaction id.
     130              :  *
     131              :  * subxids is an array of xids of length nsubxids, representing subtransactions
     132              :  * in the tree of xid. In various cases nsubxids may be zero.
     133              :  * The reason why tracking just the parent xid commit timestamp is not enough
     134              :  * is that the subtrans SLRU does not stay valid across crashes (it's not
     135              :  * permanent) so we need to keep the information about them here. If the
     136              :  * subtrans implementation changes in the future, we might want to revisit the
     137              :  * decision of storing timestamp info for each subxid.
     138              :  */
     139              : void
     140       174994 : TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
     141              :                                TransactionId *subxids, TimestampTz timestamp,
     142              :                                ReplOriginId nodeid)
     143              : {
     144              :     int         i;
     145              :     TransactionId headxid;
     146              :     TransactionId newestXact;
     147              : 
     148              :     /*
     149              :      * No-op if the module is not active.
     150              :      *
     151              :      * An unlocked read here is fine, because in a standby (the only place
     152              :      * where the flag can change in flight) this routine is only called by the
     153              :      * recovery process, which is also the only process which can change the
     154              :      * flag.
     155              :      */
     156       174994 :     if (!commitTsShared->commitTsActive)
     157       174144 :         return;
     158              : 
     159              :     /*
     160              :      * Figure out the latest Xid in this batch: either the last subxid if
     161              :      * there's any, otherwise the parent xid.
     162              :      */
     163          850 :     if (nsubxids > 0)
     164            0 :         newestXact = subxids[nsubxids - 1];
     165              :     else
     166          850 :         newestXact = xid;
     167              : 
     168              :     /*
     169              :      * We split the xids to set the timestamp to in groups belonging to the
     170              :      * same SLRU page; the first element in each such set is its head.  The
     171              :      * first group has the main XID as the head; subsequent sets use the first
     172              :      * subxid not on the previous page as head.  This way, we only have to
     173              :      * lock/modify each SLRU page once.
     174              :      */
     175          850 :     headxid = xid;
     176          850 :     i = 0;
     177              :     for (;;)
     178            0 :     {
     179          850 :         int64       pageno = TransactionIdToCTsPage(headxid);
     180              :         int         j;
     181              : 
     182          850 :         for (j = i; j < nsubxids; j++)
     183              :         {
     184            0 :             if (TransactionIdToCTsPage(subxids[j]) != pageno)
     185            0 :                 break;
     186              :         }
     187              :         /* subxids[i..j-1] are on the same page as the head */
     188              : 
     189          850 :         SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
     190              :                              pageno);
     191              : 
     192              :         /* if we wrote out all subxids, we're done. */
     193          850 :         if (j >= nsubxids)
     194          850 :             break;
     195              : 
     196              :         /*
     197              :          * Set the new head and skip over it, as well as over the subxids we
     198              :          * just wrote.
     199              :          */
     200            0 :         headxid = subxids[j];
     201            0 :         i = j + 1;
     202              :     }
     203              : 
     204              :     /* update the cached value in shared memory */
     205          850 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     206          850 :     commitTsShared->xidLastCommit = xid;
     207          850 :     commitTsShared->dataLastCommit.time = timestamp;
     208          850 :     commitTsShared->dataLastCommit.nodeid = nodeid;
     209              : 
     210              :     /* and move forwards our endpoint, if needed */
     211          850 :     if (TransactionIdPrecedes(TransamVariables->newestCommitTsXid, newestXact))
     212          837 :         TransamVariables->newestCommitTsXid = newestXact;
     213          850 :     LWLockRelease(CommitTsLock);
     214              : }
     215              : 
     216              : /*
     217              :  * Record the commit timestamp for a set of transaction entries that all live
     218              :  * on a single CommitTs page.  Atomic only on this page.
     219              :  */
     220              : static void
     221          850 : SetXidCommitTsInPage(TransactionId xid, int nsubxids,
     222              :                      TransactionId *subxids, TimestampTz ts,
     223              :                      ReplOriginId nodeid, int64 pageno)
     224              : {
     225          850 :     LWLock     *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
     226              :     int         slotno;
     227              :     int         i;
     228              : 
     229          850 :     LWLockAcquire(lock, LW_EXCLUSIVE);
     230              : 
     231          850 :     slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, &xid);
     232              : 
     233          850 :     TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
     234          850 :     for (i = 0; i < nsubxids; i++)
     235            0 :         TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
     236              : 
     237          850 :     CommitTsCtl->shared->page_dirty[slotno] = true;
     238              : 
     239          850 :     LWLockRelease(lock);
     240          850 : }
     241              : 
     242              : /*
     243              :  * Sets the commit timestamp of a single transaction.
     244              :  *
     245              :  * Caller must hold the correct SLRU bank lock, which is still held at exit.
     246              :  */
     247              : static void
     248          850 : TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
     249              :                          ReplOriginId nodeid, int slotno)
     250              : {
     251          850 :     int         entryno = TransactionIdToCTsEntry(xid);
     252              :     CommitTimestampEntry entry;
     253              : 
     254              :     Assert(TransactionIdIsNormal(xid));
     255              : 
     256          850 :     entry.time = ts;
     257          850 :     entry.nodeid = nodeid;
     258              : 
     259          850 :     memcpy(CommitTsCtl->shared->page_buffer[slotno] +
     260          850 :            SizeOfCommitTimestampEntry * entryno,
     261              :            &entry, SizeOfCommitTimestampEntry);
     262          850 : }
     263              : 
     264              : /*
     265              :  * Interrogate the commit timestamp of a transaction.
     266              :  *
     267              :  * The return value indicates whether a commit timestamp record was found for
     268              :  * the given xid.  The timestamp value is returned in *ts (which may not be
     269              :  * null), and the origin node for the Xid is returned in *nodeid, if it's not
     270              :  * null.
     271              :  */
     272              : bool
     273          104 : TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
     274              :                              ReplOriginId *nodeid)
     275              : {
     276          104 :     int64       pageno = TransactionIdToCTsPage(xid);
     277          104 :     int         entryno = TransactionIdToCTsEntry(xid);
     278              :     int         slotno;
     279              :     CommitTimestampEntry entry;
     280              :     TransactionId oldestCommitTsXid;
     281              :     TransactionId newestCommitTsXid;
     282              : 
     283          104 :     if (!TransactionIdIsValid(xid))
     284            3 :         ereport(ERROR,
     285              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     286              :                  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
     287          101 :     else if (!TransactionIdIsNormal(xid))
     288              :     {
     289              :         /* frozen and bootstrap xids are always committed far in the past */
     290            6 :         *ts = 0;
     291            6 :         if (nodeid)
     292            2 :             *nodeid = 0;
     293            6 :         return false;
     294              :     }
     295              : 
     296           95 :     LWLockAcquire(CommitTsLock, LW_SHARED);
     297              : 
     298              :     /* Error if module not enabled */
     299           95 :     if (!commitTsShared->commitTsActive)
     300            3 :         error_commit_ts_disabled();
     301              : 
     302              :     /*
     303              :      * If we're asked for the cached value, return that.  Otherwise, fall
     304              :      * through to read from SLRU.
     305              :      */
     306           92 :     if (commitTsShared->xidLastCommit == xid)
     307              :     {
     308           19 :         *ts = commitTsShared->dataLastCommit.time;
     309           19 :         if (nodeid)
     310           10 :             *nodeid = commitTsShared->dataLastCommit.nodeid;
     311              : 
     312           19 :         LWLockRelease(CommitTsLock);
     313           19 :         return *ts != 0;
     314              :     }
     315              : 
     316           73 :     oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
     317           73 :     newestCommitTsXid = TransamVariables->newestCommitTsXid;
     318              :     /* neither is invalid, or both are */
     319              :     Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
     320           73 :     LWLockRelease(CommitTsLock);
     321              : 
     322              :     /*
     323              :      * Return empty if the requested value is outside our valid range.
     324              :      */
     325          146 :     if (!TransactionIdIsValid(oldestCommitTsXid) ||
     326           87 :         TransactionIdPrecedes(xid, oldestCommitTsXid) ||
     327           14 :         TransactionIdPrecedes(newestCommitTsXid, xid))
     328              :     {
     329           59 :         *ts = 0;
     330           59 :         if (nodeid)
     331           56 :             *nodeid = InvalidReplOriginId;
     332           59 :         return false;
     333              :     }
     334              : 
     335              :     /* lock is acquired by SimpleLruReadPage_ReadOnly */
     336           14 :     slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, &xid);
     337           14 :     memcpy(&entry,
     338           14 :            CommitTsCtl->shared->page_buffer[slotno] +
     339           14 :            SizeOfCommitTimestampEntry * entryno,
     340              :            SizeOfCommitTimestampEntry);
     341              : 
     342           14 :     *ts = entry.time;
     343           14 :     if (nodeid)
     344            7 :         *nodeid = entry.nodeid;
     345              : 
     346           14 :     LWLockRelease(SimpleLruGetBankLock(CommitTsCtl, pageno));
     347           14 :     return *ts != 0;
     348              : }
     349              : 
     350              : /*
     351              :  * Return the Xid of the latest committed transaction.  (As far as this module
     352              :  * is concerned, anyway; it's up to the caller to ensure the value is useful
     353              :  * for its purposes.)
     354              :  *
     355              :  * ts and nodeid are filled with the corresponding data; they can be passed
     356              :  * as NULL if not wanted.
     357              :  */
     358              : TransactionId
     359            4 : GetLatestCommitTsData(TimestampTz *ts, ReplOriginId *nodeid)
     360              : {
     361              :     TransactionId xid;
     362              : 
     363            4 :     LWLockAcquire(CommitTsLock, LW_SHARED);
     364              : 
     365              :     /* Error if module not enabled */
     366            4 :     if (!commitTsShared->commitTsActive)
     367            0 :         error_commit_ts_disabled();
     368              : 
     369            4 :     xid = commitTsShared->xidLastCommit;
     370            4 :     if (ts)
     371            4 :         *ts = commitTsShared->dataLastCommit.time;
     372            4 :     if (nodeid)
     373            4 :         *nodeid = commitTsShared->dataLastCommit.nodeid;
     374            4 :     LWLockRelease(CommitTsLock);
     375              : 
     376            4 :     return xid;
     377              : }
     378              : 
     379              : static void
     380            3 : error_commit_ts_disabled(void)
     381              : {
     382            3 :     ereport(ERROR,
     383              :             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     384              :              errmsg("could not get commit timestamp data"),
     385              :              RecoveryInProgress() ?
     386              :              errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
     387              :                      "track_commit_timestamp") :
     388              :              errhint("Make sure the configuration parameter \"%s\" is set.",
     389              :                      "track_commit_timestamp")));
     390              : }
     391              : 
     392              : /*
     393              :  * SQL-callable wrapper to obtain commit time of a transaction
     394              :  */
     395              : Datum
     396           28 : pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
     397              : {
     398           28 :     TransactionId xid = PG_GETARG_TRANSACTIONID(0);
     399              :     TimestampTz ts;
     400              :     bool        found;
     401              : 
     402           28 :     found = TransactionIdGetCommitTsData(xid, &ts, NULL);
     403              : 
     404           23 :     if (!found)
     405            7 :         PG_RETURN_NULL();
     406              : 
     407           16 :     PG_RETURN_TIMESTAMPTZ(ts);
     408              : }
     409              : 
     410              : 
     411              : /*
     412              :  * pg_last_committed_xact
     413              :  *
     414              :  * SQL-callable wrapper to obtain some information about the latest
     415              :  * committed transaction: transaction ID, timestamp and replication
     416              :  * origin.
     417              :  */
     418              : Datum
     419            4 : pg_last_committed_xact(PG_FUNCTION_ARGS)
     420              : {
     421              :     TransactionId xid;
     422              :     ReplOriginId nodeid;
     423              :     TimestampTz ts;
     424              :     Datum       values[3];
     425              :     bool        nulls[3];
     426              :     TupleDesc   tupdesc;
     427              :     HeapTuple   htup;
     428              : 
     429              :     /* fetch the latest commit data; the result tuple is built from it below */
     430            4 :     xid = GetLatestCommitTsData(&ts, &nodeid);
     431              : 
     432            4 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     433            0 :         elog(ERROR, "return type must be a row type");
     434              : 
     435            4 :     if (!TransactionIdIsNormal(xid))
     436              :     {
     437            0 :         memset(nulls, true, sizeof(nulls));
     438              :     }
     439              :     else
     440              :     {
     441            4 :         values[0] = TransactionIdGetDatum(xid);
     442            4 :         nulls[0] = false;
     443              : 
     444            4 :         values[1] = TimestampTzGetDatum(ts);
     445            4 :         nulls[1] = false;
     446              : 
     447            4 :         values[2] = ObjectIdGetDatum((Oid) nodeid);
     448            4 :         nulls[2] = false;
     449              :     }
     450              : 
     451            4 :     htup = heap_form_tuple(tupdesc, values, nulls);
     452              : 
     453            4 :     PG_RETURN_DATUM(HeapTupleGetDatum(htup));
     454              : }
     455              : 
     456              : /*
     457              :  * pg_xact_commit_timestamp_origin
     458              :  *
     459              :  * SQL-callable wrapper to obtain commit timestamp and replication origin
     460              :  * of a given transaction.
     461              :  */
     462              : Datum
     463            5 : pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
     464              : {
     465            5 :     TransactionId xid = PG_GETARG_TRANSACTIONID(0);
     466              :     ReplOriginId nodeid;
     467              :     TimestampTz ts;
     468              :     Datum       values[2];
     469              :     bool        nulls[2];
     470              :     TupleDesc   tupdesc;
     471              :     HeapTuple   htup;
     472              :     bool        found;
     473              : 
     474            5 :     found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
     475              : 
     476            4 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     477            0 :         elog(ERROR, "return type must be a row type");
     478              : 
     479            4 :     if (!found)
     480              :     {
     481            2 :         memset(nulls, true, sizeof(nulls));
     482              :     }
     483              :     else
     484              :     {
     485            2 :         values[0] = TimestampTzGetDatum(ts);
     486            2 :         nulls[0] = false;
     487              : 
     488            2 :         values[1] = ObjectIdGetDatum((Oid) nodeid);
     489            2 :         nulls[1] = false;
     490              :     }
     491              : 
     492            4 :     htup = heap_form_tuple(tupdesc, values, nulls);
     493              : 
     494            4 :     PG_RETURN_DATUM(HeapTupleGetDatum(htup));
     495              : }
     496              : 
     497              : /*
     498              :  * Number of shared CommitTS buffers.
     499              :  *
     500              :  * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
     501              :  * Otherwise just cap the configured amount to be between 16 and the maximum
     502              :  * allowed.
     503              :  */
     504              : static int
     505         4562 : CommitTsShmemBuffers(void)
     506              : {
     507              :     /* auto-tune based on shared buffers */
     508         4562 :     if (commit_timestamp_buffers == 0)
     509         3377 :         return SimpleLruAutotuneBuffers(512, 1024);
     510              : 
     511         1185 :     return Min(Max(16, commit_timestamp_buffers), SLRU_MAX_ALLOWED_BUFFERS);
     512              : }
     513              : 
     514              : /*
     515              :  * Shared memory sizing for CommitTs
     516              :  */
     517              : Size
     518         2207 : CommitTsShmemSize(void)
     519              : {
     520         2207 :     return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
     521              :         sizeof(CommitTimestampShared);
     522              : }
     523              : 
     524              : /*
     525              :  * Initialize CommitTs at system startup (postmaster start or standalone
     526              :  * backend)
     527              :  */
     528              : void
     529         1180 : CommitTsShmemInit(void)
     530              : {
     531              :     bool        found;
     532              : 
     533              :     /* If auto-tuning is requested, now is the time to do it */
     534         1180 :     if (commit_timestamp_buffers == 0)
     535              :     {
     536              :         char        buf[32];
     537              : 
     538         1175 :         snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
     539         1175 :         SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
     540              :                         PGC_S_DYNAMIC_DEFAULT);
     541              : 
     542              :         /*
     543              :          * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
     544              :          * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
     545              :          * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
     546              :          * that and we must force the matter with PGC_S_OVERRIDE.
     547              :          */
     548         1175 :         if (commit_timestamp_buffers == 0)  /* failed to apply it? */
     549            0 :             SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
     550              :                             PGC_S_OVERRIDE);
     551              :     }
     552              :     Assert(commit_timestamp_buffers != 0);
     553              : 
     554         1180 :     CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
     555         1180 :     CommitTsCtl->errdetail_for_io_error = commit_ts_errdetail_for_io_error;
     556         1180 :     SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
     557              :                   "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
     558              :                   LWTRANCHE_COMMITTS_SLRU,
     559              :                   SYNC_HANDLER_COMMIT_TS,
     560              :                   false);
     561              :     SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
     562              : 
     563         1180 :     commitTsShared = ShmemInitStruct("CommitTs shared",
     564              :                                      sizeof(CommitTimestampShared),
     565              :                                      &found);
     566              : 
     567         1180 :     if (!IsUnderPostmaster)
     568              :     {
     569              :         Assert(!found);
     570              : 
     571         1180 :         commitTsShared->xidLastCommit = InvalidTransactionId;
     572         1180 :         TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
     573         1180 :         commitTsShared->dataLastCommit.nodeid = InvalidReplOriginId;
     574         1180 :         commitTsShared->commitTsActive = false;
     575              :     }
     576              :     else
     577              :         Assert(found);
     578         1180 : }
     579              : 
     580              : /*
     581              :  * GUC check_hook for commit_timestamp_buffers; returns the verdict of check_slru_buffers() on newval (extra and source are unused)
     582              :  */
     583              : bool
     584         2397 : check_commit_ts_buffers(int *newval, void **extra, GucSource source)
     585              : {
     586         2397 :     return check_slru_buffers("commit_timestamp_buffers", newval);
     587              : }
     588              : 
     589              : /*
     590              :  * This function must be called ONCE on system install; it is currently a no-op.
     591              :  *
     592              :  * (The CommitTs directory is assumed to have been created by initdb, and
     593              :  * CommitTsShmemInit must have been called already.)
     594              :  */
     595              : void
     596           51 : BootStrapCommitTs(void)
     597              : {
     598              :     /*
     599              :      * Nothing to do here at present, unlike most other SLRU modules; segments
     600              :      * are created when the server is started with this module enabled.  See
     601              :      * ActivateCommitTs().
     602              :      */
     603           51 : }
     604              : 
     605              : /*
     606              :  * This must be called ONCE during postmaster or standalone-backend startup,
     607              :  * after StartupXLOG has initialized TransamVariables->nextXid (which ActivateCommitTs() reads).
     608              :  */
     609              : void
     610           14 : StartupCommitTs(void)
     611              : {
     612           14 :     ActivateCommitTs();
     613           14 : }
     614              : 
     615              : /*
     616              :  * This must be called ONCE during postmaster or standalone-backend startup,
     617              :  * after recovery has finished.
     618              :  */
     619              : void
     620          970 : CompleteCommitTsInitialization(void)
     621              : {
     622              :     /*
     623              :      * If the feature is not enabled, turn it off for good.  This also removes
     624              :      * any leftover data.
     625              :      *
     626              :      * Conversely, we activate the module if the feature is enabled.  This is
     627              :      * necessary for primary and standby as the activation depends on the
     628              :      * control file contents at the beginning of recovery or when an
     629              :      * XLOG_PARAMETER_CHANGE record is replayed.
     630              :      */
     631          970 :     if (!track_commit_timestamp)
     632          948 :         DeactivateCommitTs();
     633              :     else
     634           22 :         ActivateCommitTs();
     635          970 : }
     636              : 
     637              : /*
     638              :  * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
     639              :  * XLog record during recovery.
     640              :  */
     641              : void
     642           38 : CommitTsParameterChange(bool newvalue, bool oldvalue)
     643              : {
     644              :     /*
     645              :      * If the commit_ts module is disabled in this server and we get word from
     646              :      * the primary server that it is enabled there, activate it so that we can
     647              :      * replay future WAL records involving it; also mark it as active on
     648              :      * pg_control.  If the old value was already set, we already did this, so
     649              :      * don't do anything (detected via commitTsActive; oldvalue itself is not consulted).
     650              :      *
     651              :      * If the module is disabled in the primary, disable it here too, unless
     652              :      * the module is enabled locally.
     653              :      *
     654              :      * Note this only runs in the recovery process, so an unlocked read is
     655              :      * fine.
     656              :      */
     657           38 :     if (newvalue)
     658              :     {
     659            2 :         if (!commitTsShared->commitTsActive)
     660            0 :             ActivateCommitTs();
     661              :     }
     662           36 :     else if (commitTsShared->commitTsActive)
     663            1 :         DeactivateCommitTs();
     664           38 : }
     665              : 
     666              : /*
     667              :  * Activate this module whenever necessary.
     668              :  *      This must happen during postmaster or standalone-backend startup,
     669              :  *      or during WAL replay anytime the track_commit_timestamp setting is
     670              :  *      changed in the primary.
     671              :  *
     672              :  * The reason why this SLRU needs separate activation/deactivation functions is
     673              :  * that it can be enabled/disabled during start and the activation/deactivation
     674              :  * on the primary is propagated to the standby via replay. Other SLRUs don't
     675              :  * have this property and they can be just initialized during normal startup.
     676              :  *
     677              :  * This is in charge of creating the currently active segment, if it's not
     678              :  * already there.  The reason for this is that the server might have been
     679              :  * running with this module disabled for a while and thus might have skipped
     680              :  * the normal creation point.
     681              :  */
     682              : static void
     683           36 : ActivateCommitTs(void)
     684              : {
     685              :     TransactionId xid;
     686              :     int64       pageno;
     687              : 
     688              :     /*
     689              :      * During bootstrap, we should not register commit timestamps so skip the
     690              :      * activation in this case.
     691              :      */
     692           36 :     if (IsBootstrapProcessingMode())
     693            2 :         return;
     694              : 
     695              :     /* If we've done this already (flag checked under CommitTsLock), there's nothing to do */
     696           34 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     697           34 :     if (commitTsShared->commitTsActive)
     698              :     {
     699            6 :         LWLockRelease(CommitTsLock);
     700            6 :         return;
     701              :     }
     702           28 :     LWLockRelease(CommitTsLock);
     703              :     /* Find the CommitTs page that holds the next XID to be assigned */
     704           28 :     xid = XidFromFullTransactionId(TransamVariables->nextXid);
     705           28 :     pageno = TransactionIdToCTsPage(xid);
     706              : 
     707              :     /*
     708              :      * Re-initialize our idea of the latest page number.
     709              :      */
     710           28 :     pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
     711              : 
     712              :     /*
     713              :      * If CommitTs is enabled, but it wasn't in the previous server run, we
     714              :      * need to set the oldest and newest values to the next Xid; that way, we
     715              :      * will not try to read data that might not have been set.
     716              :      *
     717              :      * XXX does this have a problem if a server is started with commitTs
     718              :      * enabled, then started with commitTs disabled, then restarted with it
     719              :      * enabled again?  It doesn't look like it does, because there should be a
     720              :      * checkpoint that sets the value to InvalidTransactionId at end of
     721              :      * recovery; and so any chance of injecting new transactions without
     722              :      * CommitTs values would occur after the oldestCommitTsXid has been set to
     723              :      * Invalid temporarily.
     724              :      */
     725           28 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     726           28 :     if (TransamVariables->oldestCommitTsXid == InvalidTransactionId)
     727              :     {
     728           16 :         TransamVariables->oldestCommitTsXid =
     729           16 :             TransamVariables->newestCommitTsXid = ReadNextTransactionId();
     730              :     }
     731           28 :     LWLockRelease(CommitTsLock);
     732              : 
     733              :     /* Create the current segment file, if necessary */
     734           28 :     if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
     735           14 :         SimpleLruZeroAndWritePage(CommitTsCtl, pageno);
     736              : 
     737              :     /* Change the activation status in shared memory, as the final step. */
     738           28 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     739           28 :     commitTsShared->commitTsActive = true;
     740           28 :     LWLockRelease(CommitTsLock);
     741              : }
     742              : 
     743              : /*
     744              :  * Deactivate this module.
     745              :  *
     746              :  * This must be called when the track_commit_timestamp parameter is turned off.
     747              :  * This happens during postmaster or standalone-backend startup, or during WAL
     748              :  * replay.
     749              :  *
     750              :  * Resets CommitTs into invalid state to make sure we don't hand back
     751              :  * possibly-invalid data; also removes segments of old data.
     752              :  */
     753              : static void
     754          949 : DeactivateCommitTs(void)
     755              : {
     756              :     /*
     757              :      * Clean up the status in shared memory.
     758              :      *
     759              :      * We reset everything in the commitTsShared record to prevent users from
     760              :      * getting confusing data about the last committed transaction on the standby
     761              :      * when the module was activated repeatedly on the primary.
     762              :      */
     763          949 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     764              : 
     765          949 :     commitTsShared->commitTsActive = false;
     766          949 :     commitTsShared->xidLastCommit = InvalidTransactionId;
     767          949 :     TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
     768          949 :     commitTsShared->dataLastCommit.nodeid = InvalidReplOriginId;
     769              :     /* Invalid limits mark CommitTs data as not consultable */
     770          949 :     TransamVariables->oldestCommitTsXid = InvalidTransactionId;
     771          949 :     TransamVariables->newestCommitTsXid = InvalidTransactionId;
     772              : 
     773              :     /*
     774              :      * Remove *all* files.  This is necessary so that there are no leftover
     775              :      * files; in the case where this feature is later enabled after running
     776              :      * with it disabled for some time there may be a gap in the file sequence.
     777              :      * (We can probably tolerate out-of-sequence files, as they are going to
     778              :      * be overwritten anyway when we wrap around, but it seems better to be
     779              :      * tidy.)
     780              :      *
     781              :      * Note that we do this with CommitTsLock acquired in exclusive mode. This
     782              :      * is very heavy-handed, but since this routine can only be called in the
     783              :      * replica and should happen very rarely, we don't worry too much about
     784              :      * it.  Note also that no process should be consulting this SLRU if we
     785              :      * have just deactivated it.
     786              :      */
     787          949 :     (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
     788              : 
     789          949 :     LWLockRelease(CommitTsLock);
     790          949 : }
     791              : 
     792              : /*
     793              :  * Perform a checkpoint of CommitTs --- either during shutdown, or on-the-fly
     794              :  */
     795              : void
     796         1835 : CheckPointCommitTs(void)
     797              : {
     798              :     /*
     799              :      * Write dirty CommitTs pages to disk.  This may result in sync requests
     800              :      * being queued for later handling by ProcessSyncRequests(), as part of the
     801              :      * checkpoint.
     802              :      */
     803         1835 :     SimpleLruWriteAll(CommitTsCtl, true);
     804         1835 : }
     805              : 
     806              : /*
     807              :  * Make sure that CommitTs has room for a newly-allocated XID.
     808              :  *
     809              :  * NB: this is called while holding XidGenLock.  We want it to be very fast
     810              :  * most of the time; even when it's not so fast, no actual I/O need happen
     811              :  * unless we're forced to write out a dirty CommitTs or xlog page to make room
     812              :  * in shared memory.
     813              :  *
     814              :  * NB: the current implementation relies on track_commit_timestamp being
     815              :  * PGC_POSTMASTER.
     816              :  */
     817              : void
     818     24536200 : ExtendCommitTs(TransactionId newestXact)
     819              : {
     820              :     int64       pageno;
     821              :     LWLock     *lock;
     822              : 
     823              :     /*
     824              :      * Nothing to do if module not enabled.  Note we do an unlocked read of
     825              :      * the flag here, which is okay because this routine is only called from
     826              :      * GetNewTransactionId, which is never called in a standby.
     827              :      */
     828              :     Assert(!InRecovery);
     829     24536200 :     if (!commitTsShared->commitTsActive)
     830     24535331 :         return;
     831              : 
     832              :     /*
     833              :      * No work except at first XID of a page.  But beware: just after
     834              :      * wraparound, the first XID of page zero is FirstNormalTransactionId.
     835              :      */
     836          869 :     if (TransactionIdToCTsEntry(newestXact) != 0 &&
     837              :         !TransactionIdEquals(newestXact, FirstNormalTransactionId))
     838          868 :         return;
     839              : 
     840            1 :     pageno = TransactionIdToCTsPage(newestXact);
     841              :     /* Zeroing and WAL-logging below happen while holding this page's bank lock exclusively */
     842            1 :     lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
     843              : 
     844            1 :     LWLockAcquire(lock, LW_EXCLUSIVE);
     845              : 
     846              :     /* Zero the page ... */
     847            1 :     SimpleLruZeroPage(CommitTsCtl, pageno);
     848              : 
     849              :     /* and make a WAL entry about that, unless we're in REDO (which the Assert above already rules out) */
     850            1 :     if (!InRecovery)
     851            1 :         XLogSimpleInsertInt64(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE, pageno);
     852              : 
     853            1 :     LWLockRelease(lock);
     854              : }
     855              : 
     856              : /*
     857              :  * Remove all CommitTs segments before the one holding the passed
     858              :  * transaction ID.
     859              :  *
     860              :  * Note that we don't need to flush XLOG here.
     861              :  */
     862              : void
     863         1060 : TruncateCommitTs(TransactionId oldestXact)
     864              : {
     865              :     int64       cutoffPage;
     866              : 
     867              :     /*
     868              :      * The cutoff point is the start of the segment containing oldestXact. We
     869              :      * pass the *page* containing oldestXact to SimpleLruTruncate.
     870              :      */
     871         1060 :     cutoffPage = TransactionIdToCTsPage(oldestXact);
     872              : 
     873              :     /* Check to see if there are any files that could be removed */
     874         1060 :     if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
     875              :                            &cutoffPage))
     876         1060 :         return;                 /* nothing to remove */
     877              : 
     878              :     /* Write XLOG record before touching any files */
     879            0 :     WriteTruncateXlogRec(cutoffPage, oldestXact);
     880              : 
     881              :     /* Now we can remove the old CommitTs segment(s) */
     882            0 :     SimpleLruTruncate(CommitTsCtl, cutoffPage);
     883              : }
     884              : 
     885              : /*
     886              :  * Set the limit values between which commit TS can be consulted.
     887              :  */
     888              : void
     889         1084 : SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
     890              : {
     891              :     /*
     892              :      * Be careful not to overwrite values that are either further into the
     893              :      * "future" or signal a disabled CommitTs.  Unset (Invalid) limits are simply installed.
     894              :      */
     895         1084 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     896         1084 :     if (TransamVariables->oldestCommitTsXid != InvalidTransactionId)
     897              :     {
     898            0 :         if (TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
     899            0 :             TransamVariables->oldestCommitTsXid = oldestXact;
     900            0 :         if (TransactionIdPrecedes(newestXact, TransamVariables->newestCommitTsXid))
     901            0 :             TransamVariables->newestCommitTsXid = newestXact;
     902              :     }
     903              :     else
     904              :     {
     905              :         Assert(TransamVariables->newestCommitTsXid == InvalidTransactionId);
     906         1084 :         TransamVariables->oldestCommitTsXid = oldestXact;
     907         1084 :         TransamVariables->newestCommitTsXid = newestXact;
     908              :     }
     909         1084 :     LWLockRelease(CommitTsLock);
     910         1084 : }
     911              : 
     912              : /*
     913              :  * Move forward the oldest CommitTs XID that can be consulted; a no-op if it is unset or does not precede oldestXact
     914              :  */
     915              : void
     916         1060 : AdvanceOldestCommitTsXid(TransactionId oldestXact)
     917              : {
     918         1060 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     919         1061 :     if (TransamVariables->oldestCommitTsXid != InvalidTransactionId &&
     920            1 :         TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
     921            1 :         TransamVariables->oldestCommitTsXid = oldestXact;
     922         1060 :     LWLockRelease(CommitTsLock);
     923         1060 : }
     924              : 
     925              : 
     926              : /*
     927              :  * Decide whether a commitTS page number is "older" for truncation purposes.
     928              :  * Analogous to CLOGPagePrecedes().
     929              :  *
     930              :  * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128.  This
     931              :  * introduces differences compared to CLOG and the other SLRUs having (1 <<
     932              :  * 31) % per_page == 0.  This function never tests exactly
     933              :  * TransactionIdPrecedes(x-2^31, x).  When the system reaches xidStopLimit,
     934              :  * there are two possible counts of page boundaries between oldestXact and the
     935              :  * latest XID assigned, depending on whether oldestXact is within the first
     936              :  * 128 entries of its page.  Since this function doesn't know the location of
     937              :  * oldestXact within page2, it returns false for one page that actually is
     938              :  * expendable.  This is a wider (yet still negligible) version of the
     939              :  * truncation opportunity that CLOGPagePrecedes() cannot recognize.
     940              :  *
     941              :  * For the sake of a worked example, number entries with decimal values such
     942              :  * that page1==1 entries range from 1.0 to 1.999.  Let N+0.15 be the number of
     943              :  * pages that 2^31 entries will span (N is an integer).  If oldestXact=N+2.1,
     944              :  * then the final safe XID assignment leaves newestXact=1.95.  We keep page 2,
     945              :  * because entry=2.85 is the border that toggles whether entries precede the
     946              :  * last entry of the oldestXact page.  While page 2 is expendable at
     947              :  * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
     948              :  */
     949              : static bool
     950            1 : CommitTsPagePrecedes(int64 page1, int64 page2)
     951              : {
     952              :     TransactionId xid1;
     953              :     TransactionId xid2;
     954              :     /* Representative XID per page: page * COMMIT_TS_XACTS_PER_PAGE + FirstNormalTransactionId + 1 */
     955            1 :     xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
     956            1 :     xid1 += FirstNormalTransactionId + 1;
     957            1 :     xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
     958            1 :     xid2 += FirstNormalTransactionId + 1;
     959              :     /* True only if xid1 precedes both xid2 and the XID COMMIT_TS_XACTS_PER_PAGE - 1 entries after it */
     960            1 :     return (TransactionIdPrecedes(xid1, xid2) &&
     961            0 :             TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
     962              : }
     963              : 
     964              : static int
     965            0 : commit_ts_errdetail_for_io_error(const void *opaque_data)
     966              : {
     967            0 :     TransactionId xid = *(const TransactionId *) opaque_data;
     968              :     /* opaque_data is read as a TransactionId; report that XID in the error detail */
     969            0 :     return errdetail("Could not access commit timestamp of transaction %u.", xid);
     970              : }
     971              : 
     972              : /*
     973              :  * Write a COMMIT_TS_TRUNCATE xlog record carrying the cutoff page number and oldestXid
     974              :  */
     975              : static void
     976            0 : WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
     977              : {
     978              :     xl_commit_ts_truncate xlrec;
     979              : 
     980            0 :     xlrec.pageno = pageno;
     981            0 :     xlrec.oldestXid = oldestXid;
     982              :     /* Register the record body, then insert; the value returned by XLogInsert is deliberately ignored */
     983            0 :     XLogBeginInsert();
     984            0 :     XLogRegisterData(&xlrec, SizeOfCommitTsTruncate);
     985            0 :     (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
     986            0 : }
     987              : 
     988              : /*
     989              :  * CommitTS resource manager's redo routine: replays COMMIT_TS_ZEROPAGE and COMMIT_TS_TRUNCATE records, and PANICs on any other op code
     990              :  */
     991              : void
     992            0 : commit_ts_redo(XLogReaderState *record)
     993              : {
     994            0 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
     995              : 
     996              :     /* Backup blocks are not used in commit_ts records */
     997              :     Assert(!XLogRecHasAnyBlockRefs(record));
     998              : 
     999            0 :     if (info == COMMIT_TS_ZEROPAGE)
    1000              :     {
    1001              :         int64       pageno;
    1002              :         /* The record data is the int64 page number; copy it out with memcpy */
    1003            0 :         memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
    1004            0 :         SimpleLruZeroAndWritePage(CommitTsCtl, pageno);
    1005              :     }
    1006            0 :     else if (info == COMMIT_TS_TRUNCATE)
    1007              :     {
    1008            0 :         xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
    1009              : 
    1010            0 :         AdvanceOldestCommitTsXid(trunc->oldestXid);
    1011              : 
    1012              :         /*
    1013              :          * During XLOG replay, latest_page_number isn't set up yet; insert a
    1014              :          * suitable value to bypass the sanity test in SimpleLruTruncate.
    1015              :          */
    1016            0 :         pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
    1017            0 :                             trunc->pageno);
    1018              : 
    1019            0 :         SimpleLruTruncate(CommitTsCtl, trunc->pageno);
    1020              :     }
    1021              :     else
    1022            0 :         elog(PANIC, "commit_ts_redo: unknown op code %u", info);
    1023            0 : }
    1024              : 
    1025              : /*
    1026              :  * Entrypoint for sync.c to sync commit_ts files; forwards to SlruSyncFileTag() for CommitTsCtl and returns its result.
    1027              :  */
    1028              : int
    1029            0 : committssyncfiletag(const FileTag *ftag, char *path)
    1030              : {
    1031            0 :     return SlruSyncFileTag(CommitTsCtl, ftag, path);
    1032              : }
        

Generated by: LCOV version 2.0-1