LCOV - code coverage report
Current view: top level - src/backend/access/transam - commit_ts.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 216 282 76.6 %
Date: 2025-01-18 04:15:08 Functions: 26 31 83.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * commit_ts.c
       4             :  *      PostgreSQL commit timestamp manager
       5             :  *
       6             :  * This module is a pg_xact-like system that stores the commit timestamp
       7             :  * for each transaction.
       8             :  *
       9             :  * XLOG interactions: this module generates an XLOG record whenever a new
      10             :  * CommitTs page is initialized to zeroes.  Other writes of CommitTS come
      11             :  * from recording of transaction commit in xact.c, which generates its own
      12             :  * XLOG records for these events and will re-perform the status update on
      13             :  * redo; so we need make no additional XLOG entry here.
      14             :  *
      15             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      16             :  * Portions Copyright (c) 1994, Regents of the University of California
      17             :  *
      18             :  * src/backend/access/transam/commit_ts.c
      19             :  *
      20             :  *-------------------------------------------------------------------------
      21             :  */
      22             : #include "postgres.h"
      23             : 
      24             : #include "access/commit_ts.h"
      25             : #include "access/htup_details.h"
      26             : #include "access/slru.h"
      27             : #include "access/transam.h"
      28             : #include "access/xloginsert.h"
      29             : #include "access/xlogutils.h"
      30             : #include "funcapi.h"
      31             : #include "miscadmin.h"
      32             : #include "storage/shmem.h"
      33             : #include "utils/fmgrprotos.h"
      34             : #include "utils/guc_hooks.h"
      35             : #include "utils/timestamp.h"
      36             : 
      37             : /*
      38             :  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
      39             :  * everywhere else in Postgres.
      40             :  *
      41             :  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
      42             :  * CommitTs page numbering also wraps around at
      43             :  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
      44             :  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
      45             :  * explicit notice of that fact in this module, except when comparing segment
      46             :  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
      47             :  */
      48             : 
      49             : /*
      50             :  * We need 8+2 bytes per xact.  Note that enlarging this struct might mean
      51             :  * the largest possible file name is more than 5 chars long; see
      52             :  * SlruScanDirectory.
      53             :  */
      54             : typedef struct CommitTimestampEntry
      55             : {
      56             :     TimestampTz time;
      57             :     RepOriginId nodeid;
      58             : } CommitTimestampEntry;
      59             : 
      60             : #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
      61             :                                     sizeof(RepOriginId))
      62             : 
      63             : #define COMMIT_TS_XACTS_PER_PAGE \
      64             :     (BLCKSZ / SizeOfCommitTimestampEntry)
      65             : 
      66             : 
      67             : /*
      68             :  * Although we return an int64 the actual value can't currently exceed
      69             :  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE.
      70             :  */
      71             : static inline int64
      72        1398 : TransactionIdToCTsPage(TransactionId xid)
      73             : {
      74        1398 :     return xid / (int64) COMMIT_TS_XACTS_PER_PAGE;
      75             : }
      76             : 
      77             : #define TransactionIdToCTsEntry(xid)    \
      78             :     ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
      79             : 
      80             : /*
      81             :  * Link to shared-memory data structures for CommitTs control
      82             :  */
      83             : static SlruCtlData CommitTsCtlData;
      84             : 
      85             : #define CommitTsCtl (&CommitTsCtlData)
      86             : 
      87             : /*
      88             :  * We keep a cache of the last value set in shared memory.
      89             :  *
      90             :  * This is also a good place to keep the activation status.  We keep this
      91             :  * separate from the GUC so that the standby can activate the module if the
      92             :  * primary has it active independently of the value of the GUC.
      93             :  *
      94             :  * This is protected by CommitTsLock.  In some places, we use commitTsActive
      95             :  * without acquiring the lock; where this happens, a comment explains the
      96             :  * rationale for it.
      97             :  */
      98             : typedef struct CommitTimestampShared
      99             : {
     100             :     TransactionId xidLastCommit;
     101             :     CommitTimestampEntry dataLastCommit;
     102             :     bool        commitTsActive;
     103             : } CommitTimestampShared;
     104             : 
     105             : static CommitTimestampShared *commitTsShared;
     106             : 
     107             : 
     108             : /* GUC variable */
     109             : bool        track_commit_timestamp;
     110             : 
     111             : static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
     112             :                                  TransactionId *subxids, TimestampTz ts,
     113             :                                  RepOriginId nodeid, int64 pageno);
     114             : static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
     115             :                                      RepOriginId nodeid, int slotno);
     116             : static void error_commit_ts_disabled(void);
     117             : static int  ZeroCommitTsPage(int64 pageno, bool writeXlog);
     118             : static bool CommitTsPagePrecedes(int64 page1, int64 page2);
     119             : static void ActivateCommitTs(void);
     120             : static void DeactivateCommitTs(void);
     121             : static void WriteZeroPageXlogRec(int64 pageno);
     122             : static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
     123             : 
     124             : /*
     125             :  * TransactionTreeSetCommitTsData
     126             :  *
     127             :  * Record the final commit timestamp of transaction entries in the commit log
     128             :  * for a transaction and its subtransaction tree, as efficiently as possible.
     129             :  *
     130             :  * xid is the top level transaction id.
     131             :  *
     132             :  * subxids is an array of xids of length nsubxids, representing subtransactions
     133             :  * in the tree of xid. In various cases nsubxids may be zero.
     134             :  * The reason why tracking just the parent xid commit timestamp is not enough
     135             :  * is that the subtrans SLRU does not stay valid across crashes (it's not
     136             :  * permanent) so we need to keep the information about them here. If the
     137             :  * subtrans implementation changes in the future, we might want to revisit the
     138             :  * decision of storing timestamp info for each subxid.
     139             :  */
     140             : void
     141      275762 : TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
     142             :                                TransactionId *subxids, TimestampTz timestamp,
     143             :                                RepOriginId nodeid)
     144             : {
     145             :     int         i;
     146             :     TransactionId headxid;
     147             :     TransactionId newestXact;
     148             : 
     149             :     /*
     150             :      * No-op if the module is not active.
     151             :      *
     152             :      * An unlocked read here is fine, because in a standby (the only place
     153             :      * where the flag can change in flight) this routine is only called by the
     154             :      * recovery process, which is also the only process which can change the
     155             :      * flag.
     156             :      */
     157      275762 :     if (!commitTsShared->commitTsActive)
     158      275556 :         return;
     159             : 
     160             :     /*
     161             :      * Figure out the latest Xid in this batch: either the last subxid if
     162             :      * there's any, otherwise the parent xid.
     163             :      */
     164         206 :     if (nsubxids > 0)
     165           0 :         newestXact = subxids[nsubxids - 1];
     166             :     else
     167         206 :         newestXact = xid;
     168             : 
     169             :     /*
     170             :      * We split the xids to set the timestamp to in groups belonging to the
     171             :      * same SLRU page; the first element in each such set is its head.  The
     172             :      * first group has the main XID as the head; subsequent sets use the first
     173             :      * subxid not on the previous page as head.  This way, we only have to
     174             :      * lock/modify each SLRU page once.
     175             :      */
     176         206 :     headxid = xid;
     177         206 :     i = 0;
     178             :     for (;;)
     179           0 :     {
     180         206 :         int64       pageno = TransactionIdToCTsPage(headxid);
     181             :         int         j;
     182             : 
     183         206 :         for (j = i; j < nsubxids; j++)
     184             :         {
     185           0 :             if (TransactionIdToCTsPage(subxids[j]) != pageno)
     186           0 :                 break;
     187             :         }
     188             :         /* subxids[i..j-1] are on the same page as the head */
     189             : 
     190         206 :         SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
     191             :                              pageno);
     192             : 
     193             :         /* if we wrote out all subxids, we're done. */
     194         206 :         if (j >= nsubxids)
     195         206 :             break;
     196             : 
     197             :         /*
     198             :          * Set the new head and skip over it, as well as over the subxids we
     199             :          * just wrote.
     200             :          */
     201           0 :         headxid = subxids[j];
     202           0 :         i = j + 1;
     203             :     }
     204             : 
     205             :     /* update the cached value in shared memory */
     206         206 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     207         206 :     commitTsShared->xidLastCommit = xid;
     208         206 :     commitTsShared->dataLastCommit.time = timestamp;
     209         206 :     commitTsShared->dataLastCommit.nodeid = nodeid;
     210             : 
     211             :     /* and move forwards our endpoint, if needed */
     212         206 :     if (TransactionIdPrecedes(TransamVariables->newestCommitTsXid, newestXact))
     213         180 :         TransamVariables->newestCommitTsXid = newestXact;
     214         206 :     LWLockRelease(CommitTsLock);
     215             : }
     216             : 
     217             : /*
     218             :  * Record the commit timestamp of transaction entries in the commit log for all
     219             :  * entries on a single page.  Atomic only on this page.
     220             :  */
     221             : static void
     222         206 : SetXidCommitTsInPage(TransactionId xid, int nsubxids,
     223             :                      TransactionId *subxids, TimestampTz ts,
     224             :                      RepOriginId nodeid, int64 pageno)
     225             : {
     226         206 :     LWLock     *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
     227             :     int         slotno;
     228             :     int         i;
     229             : 
     230         206 :     LWLockAcquire(lock, LW_EXCLUSIVE);
     231             : 
     232         206 :     slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
     233             : 
     234         206 :     TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
     235         206 :     for (i = 0; i < nsubxids; i++)
     236           0 :         TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
     237             : 
     238         206 :     CommitTsCtl->shared->page_dirty[slotno] = true;
     239             : 
     240         206 :     LWLockRelease(lock);
     241         206 : }
     242             : 
     243             : /*
     244             :  * Sets the commit timestamp of a single transaction.
     245             :  *
     246             :  * Caller must hold the correct SLRU bank lock; it will still be held at exit.
     247             :  */
     248             : static void
     249         206 : TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
     250             :                          RepOriginId nodeid, int slotno)
     251             : {
     252         206 :     int         entryno = TransactionIdToCTsEntry(xid);
     253             :     CommitTimestampEntry entry;
     254             : 
     255             :     Assert(TransactionIdIsNormal(xid));
     256             : 
     257         206 :     entry.time = ts;
     258         206 :     entry.nodeid = nodeid;
     259             : 
     260         206 :     memcpy(CommitTsCtl->shared->page_buffer[slotno] +
     261         206 :            SizeOfCommitTimestampEntry * entryno,
     262             :            &entry, SizeOfCommitTimestampEntry);
     263         206 : }
     264             : 
     265             : /*
     266             :  * Interrogate the commit timestamp of a transaction.
     267             :  *
     268             :  * The return value indicates whether a commit timestamp record was found for
     269             :  * the given xid.  The timestamp value is returned in *ts (which must not be
     270             :  * null), and the origin node for the Xid is returned in *nodeid, if it's not
     271             :  * null.
     272             :  */
     273             : bool
     274          82 : TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
     275             :                              RepOriginId *nodeid)
     276             : {
     277          82 :     int64       pageno = TransactionIdToCTsPage(xid);
     278          82 :     int         entryno = TransactionIdToCTsEntry(xid);
     279             :     int         slotno;
     280             :     CommitTimestampEntry entry;
     281             :     TransactionId oldestCommitTsXid;
     282             :     TransactionId newestCommitTsXid;
     283             : 
     284          82 :     if (!TransactionIdIsValid(xid))
     285           6 :         ereport(ERROR,
     286             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     287             :                  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
     288          76 :     else if (!TransactionIdIsNormal(xid))
     289             :     {
     290             :         /* frozen and bootstrap xids are always committed far in the past */
     291          12 :         *ts = 0;
     292          12 :         if (nodeid)
     293           4 :             *nodeid = 0;
     294          12 :         return false;
     295             :     }
     296             : 
     297          64 :     LWLockAcquire(CommitTsLock, LW_SHARED);
     298             : 
     299             :     /* Error if module not enabled */
     300          64 :     if (!commitTsShared->commitTsActive)
     301           6 :         error_commit_ts_disabled();
     302             : 
     303             :     /*
     304             :      * If we're asked for the cached value, return that.  Otherwise, fall
     305             :      * through to read from SLRU.
     306             :      */
     307          58 :     if (commitTsShared->xidLastCommit == xid)
     308             :     {
     309          30 :         *ts = commitTsShared->dataLastCommit.time;
     310          30 :         if (nodeid)
     311          14 :             *nodeid = commitTsShared->dataLastCommit.nodeid;
     312             : 
     313          30 :         LWLockRelease(CommitTsLock);
     314          30 :         return *ts != 0;
     315             :     }
     316             : 
     317          28 :     oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
     318          28 :     newestCommitTsXid = TransamVariables->newestCommitTsXid;
     319             :     /* neither is invalid, or both are */
     320             :     Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
     321          28 :     LWLockRelease(CommitTsLock);
     322             : 
     323             :     /*
     324             :      * Return empty if the requested value is outside our valid range.
     325             :      */
     326          56 :     if (!TransactionIdIsValid(oldestCommitTsXid) ||
     327          50 :         TransactionIdPrecedes(xid, oldestCommitTsXid) ||
     328          22 :         TransactionIdPrecedes(newestCommitTsXid, xid))
     329             :     {
     330           6 :         *ts = 0;
     331           6 :         if (nodeid)
     332           0 :             *nodeid = InvalidRepOriginId;
     333           6 :         return false;
     334             :     }
     335             : 
     336             :     /* lock is acquired by SimpleLruReadPage_ReadOnly */
     337          22 :     slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
     338          22 :     memcpy(&entry,
     339          22 :            CommitTsCtl->shared->page_buffer[slotno] +
     340          22 :            SizeOfCommitTimestampEntry * entryno,
     341             :            SizeOfCommitTimestampEntry);
     342             : 
     343          22 :     *ts = entry.time;
     344          22 :     if (nodeid)
     345           8 :         *nodeid = entry.nodeid;
     346             : 
     347          22 :     LWLockRelease(SimpleLruGetBankLock(CommitTsCtl, pageno));
     348          22 :     return *ts != 0;
     349             : }
     350             : 
     351             : /*
     352             :  * Return the Xid of the latest committed transaction.  (As far as this module
     353             :  * is concerned, anyway; it's up to the caller to ensure the value is useful
     354             :  * for its purposes.)
     355             :  *
     356             :  * ts and nodeid are filled with the corresponding data; they can be passed
     357             :  * as NULL if not wanted.
     358             :  */
     359             : TransactionId
     360           6 : GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
     361             : {
     362             :     TransactionId xid;
     363             : 
     364           6 :     LWLockAcquire(CommitTsLock, LW_SHARED);
     365             : 
     366             :     /* Error if module not enabled */
     367           6 :     if (!commitTsShared->commitTsActive)
     368           0 :         error_commit_ts_disabled();
     369             : 
     370           6 :     xid = commitTsShared->xidLastCommit;
     371           6 :     if (ts)
     372           6 :         *ts = commitTsShared->dataLastCommit.time;
     373           6 :     if (nodeid)
     374           6 :         *nodeid = commitTsShared->dataLastCommit.nodeid;
     375           6 :     LWLockRelease(CommitTsLock);
     376             : 
     377           6 :     return xid;
     378             : }
     379             : 
     380             : static void
     381           6 : error_commit_ts_disabled(void)
     382             : {
     383           6 :     ereport(ERROR,
     384             :             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     385             :              errmsg("could not get commit timestamp data"),
     386             :              RecoveryInProgress() ?
     387             :              errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
     388             :                      "track_commit_timestamp") :
     389             :              errhint("Make sure the configuration parameter \"%s\" is set.",
     390             :                      "track_commit_timestamp")));
     391             : }
     392             : 
     393             : /*
     394             :  * SQL-callable wrapper to obtain commit time of a transaction
     395             :  */
     396             : Datum
     397          54 : pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
     398             : {
     399          54 :     TransactionId xid = PG_GETARG_TRANSACTIONID(0);
     400             :     TimestampTz ts;
     401             :     bool        found;
     402             : 
     403          54 :     found = TransactionIdGetCommitTsData(xid, &ts, NULL);
     404             : 
     405          44 :     if (!found)
     406          14 :         PG_RETURN_NULL();
     407             : 
     408          30 :     PG_RETURN_TIMESTAMPTZ(ts);
     409             : }
     410             : 
     411             : 
     412             : /*
     413             :  * pg_last_committed_xact
     414             :  *
     415             :  * SQL-callable wrapper to obtain some information about the latest
     416             :  * committed transaction: transaction ID, timestamp and replication
     417             :  * origin.
     418             :  */
     419             : Datum
     420           6 : pg_last_committed_xact(PG_FUNCTION_ARGS)
     421             : {
     422             :     TransactionId xid;
     423             :     RepOriginId nodeid;
     424             :     TimestampTz ts;
     425             :     Datum       values[3];
     426             :     bool        nulls[3];
     427             :     TupleDesc   tupdesc;
     428             :     HeapTuple   htup;
     429             : 
     430             :     /* fetch the latest commit data, from which we construct a tuple below */
     431           6 :     xid = GetLatestCommitTsData(&ts, &nodeid);
     432             : 
     433           6 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     434           0 :         elog(ERROR, "return type must be a row type");
     435             : 
     436           6 :     if (!TransactionIdIsNormal(xid))
     437             :     {
     438           0 :         memset(nulls, true, sizeof(nulls));
     439             :     }
     440             :     else
     441             :     {
     442           6 :         values[0] = TransactionIdGetDatum(xid);
     443           6 :         nulls[0] = false;
     444             : 
     445           6 :         values[1] = TimestampTzGetDatum(ts);
     446           6 :         nulls[1] = false;
     447             : 
     448           6 :         values[2] = ObjectIdGetDatum((Oid) nodeid);
     449           6 :         nulls[2] = false;
     450             :     }
     451             : 
     452           6 :     htup = heap_form_tuple(tupdesc, values, nulls);
     453             : 
     454           6 :     PG_RETURN_DATUM(HeapTupleGetDatum(htup));
     455             : }
     456             : 
     457             : /*
     458             :  * pg_xact_commit_timestamp_origin
     459             :  *
     460             :  * SQL-callable wrapper to obtain commit timestamp and replication origin
     461             :  * of a given transaction.
     462             :  */
     463             : Datum
     464          10 : pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
     465             : {
     466          10 :     TransactionId xid = PG_GETARG_TRANSACTIONID(0);
     467             :     RepOriginId nodeid;
     468             :     TimestampTz ts;
     469             :     Datum       values[2];
     470             :     bool        nulls[2];
     471             :     TupleDesc   tupdesc;
     472             :     HeapTuple   htup;
     473             :     bool        found;
     474             : 
     475          10 :     found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
     476             : 
     477           8 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
     478           0 :         elog(ERROR, "return type must be a row type");
     479             : 
     480           8 :     if (!found)
     481             :     {
     482           4 :         memset(nulls, true, sizeof(nulls));
     483             :     }
     484             :     else
     485             :     {
     486           4 :         values[0] = TimestampTzGetDatum(ts);
     487           4 :         nulls[0] = false;
     488             : 
     489           4 :         values[1] = ObjectIdGetDatum((Oid) nodeid);
     490           4 :         nulls[1] = false;
     491             :     }
     492             : 
     493           8 :     htup = heap_form_tuple(tupdesc, values, nulls);
     494             : 
     495           8 :     PG_RETURN_DATUM(HeapTupleGetDatum(htup));
     496             : }
     497             : 
     498             : /*
     499             :  * Number of shared CommitTS buffers.
     500             :  *
     501             :  * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
     502             :  * Otherwise just cap the configured amount to be between 16 and the maximum
     503             :  * allowed.
     504             :  */
     505             : static int
     506        7392 : CommitTsShmemBuffers(void)
     507             : {
     508             :     /* auto-tune based on shared buffers */
     509        7392 :     if (commit_timestamp_buffers == 0)
     510        5464 :         return SimpleLruAutotuneBuffers(512, 1024);
     511             : 
     512        1928 :     return Min(Max(16, commit_timestamp_buffers), SLRU_MAX_ALLOWED_BUFFERS);
     513             : }
     514             : 
     515             : /*
     516             :  * Shared memory sizing for CommitTs
     517             :  */
     518             : Size
     519        3566 : CommitTsShmemSize(void)
     520             : {
     521        3566 :     return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
     522             :         sizeof(CommitTimestampShared);
     523             : }
     524             : 
     525             : /*
     526             :  * Initialize CommitTs at system startup (postmaster start or standalone
     527             :  * backend)
     528             :  */
     529             : void
     530        1918 : CommitTsShmemInit(void)
     531             : {
     532             :     bool        found;
     533             : 
     534             :     /* If auto-tuning is requested, now is the time to do it */
     535        1918 :     if (commit_timestamp_buffers == 0)
     536             :     {
     537             :         char        buf[32];
     538             : 
     539        1908 :         snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
     540        1908 :         SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
     541             :                         PGC_S_DYNAMIC_DEFAULT);
     542             : 
     543             :         /*
     544             :          * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
     545             :          * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
     546             :          * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
     547             :          * that and we must force the matter with PGC_S_OVERRIDE.
     548             :          */
     549        1908 :         if (commit_timestamp_buffers == 0)  /* failed to apply it? */
     550           0 :             SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
     551             :                             PGC_S_OVERRIDE);
     552             :     }
     553             :     Assert(commit_timestamp_buffers != 0);
     554             : 
     555        1918 :     CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
     556        1918 :     SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
     557             :                   "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
     558             :                   LWTRANCHE_COMMITTS_SLRU,
     559             :                   SYNC_HANDLER_COMMIT_TS,
     560             :                   false);
     561             :     SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
     562             : 
     563        1918 :     commitTsShared = ShmemInitStruct("CommitTs shared",
     564             :                                      sizeof(CommitTimestampShared),
     565             :                                      &found);
     566             : 
     567        1918 :     if (!IsUnderPostmaster)
     568             :     {
     569             :         Assert(!found);
     570             : 
     571        1918 :         commitTsShared->xidLastCommit = InvalidTransactionId;
     572        1918 :         TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
     573        1918 :         commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
     574        1918 :         commitTsShared->commitTsActive = false;
     575             :     }
     576             :     else
     577             :         Assert(found);
     578        1918 : }
     579             : 
     580             : /*
     581             :  * GUC check_hook for commit_timestamp_buffers
     582             :  */
     583             : bool
     584        3890 : check_commit_ts_buffers(int *newval, void **extra, GucSource source)
     585             : {
     586        3890 :     return check_slru_buffers("commit_timestamp_buffers", newval);
     587             : }
     588             : 
     589             : /*
     590             :  * Bootstrap hook for CommitTs; must be called ONCE on system install.
     591             :  *
     592             :  * (The CommitTs directory is assumed to have been created by initdb, and
     593             :  * CommitTsShmemInit must have been called already.)
     594             :  */
     595             : void
     596          90 : BootStrapCommitTs(void)
     597             : {
     598             :     /*
     599             :      * Intentionally a no-op, unlike most other SLRU modules: segments are
     600             :      * created when the server is started with this module enabled; see
     601             :      * ActivateCommitTs.
     602             :      */
     603          90 : }
     604             : 
     605             : /*
     606             :  * Initialize (or reinitialize) a page of CommitTs to zeroes.
     607             :  * If writeXlog is true, also emit an XLOG record saying we did this.
     608             :  *
     609             :  * The page is not actually written, just set up in shared memory.
     610             :  * The slot number of the new page is returned.
     611             :  *
     612             :  * Callers hold the SLRU bank lock for pageno at entry; still held at exit.
     613             :  */
     614             : static int
     615          24 : ZeroCommitTsPage(int64 pageno, bool writeXlog)
     616             : {
     617             :     int         slotno;
     618             : 
     619          24 :     slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
     620             : 
     621          24 :     if (writeXlog)
     622           0 :         WriteZeroPageXlogRec(pageno);
     623             : 
     624          24 :     return slotno;
     625             : }
     626             : 
     627             : /*
     628             :  * Activate CommitTs.  Call ONCE during postmaster or standalone-backend
     629             :  * startup, after StartupXLOG has initialized TransamVariables->nextXid.
     630             :  */
     631             : void
     632          20 : StartupCommitTs(void)
     633             : {
     634          20 :     ActivateCommitTs();
     635          20 : }
     636             : 
     637             : /*
     638             :  * Settle the CommitTs state per track_commit_timestamp.  Call ONCE during
     639             :  * postmaster or standalone-backend startup, after recovery has finished.
     640             :  */
     641             : void
     642        1544 : CompleteCommitTsInitialization(void)
     643             : {
     644             :     /*
     645             :      * If the feature is not enabled, turn it off for good.  This also removes
     646             :      * any leftover data.
     647             :      *
     648             :      * Conversely, we activate the module if the feature is enabled.  This is
     649             :      * necessary for primary and standby as the activation depends on the
     650             :      * control file contents at the beginning of recovery or when an
     651             :      * XLOG_PARAMETER_CHANGE record is replayed.
     652             :      */
     653        1544 :     if (!track_commit_timestamp)
     654        1510 :         DeactivateCommitTs();
     655             :     else
     656          34 :         ActivateCommitTs();
     657        1544 : }
     658             : 
     659             : /*
     660             :  * Activate or deactivate CommitTs upon replay of an XLOG_PARAMETER_CHANGE
     661             :  * XLog record during recovery.  (oldvalue is not consulted here.)
     662             :  */
     663             : void
     664          60 : CommitTsParameterChange(bool newvalue, bool oldvalue)
     665             : {
     666             :     /*
     667             :      * If the commit_ts module is not active in this server and we get word
     668             :      * from the primary server that it is enabled there, activate it so that we
     669             :      * can replay future WAL records involving it; also mark it as active on
     670             :      * pg_control.  If the module is already active here, we already did this,
     671             :      * so don't do anything.
     672             :      *
     673             :      * If the module is disabled in the primary, deactivate it here too, but
     674             :      * only if it is currently active.
     675             :      *
     676             :      * Note this only runs in the recovery process, so an unlocked read of
     677             :      * commitTsActive is fine.
     678             :      */
     679          60 :     if (newvalue)
     680             :     {
     681           4 :         if (!commitTsShared->commitTsActive)
     682           0 :             ActivateCommitTs();
     683             :     }
     684          56 :     else if (commitTsShared->commitTsActive)
     685           2 :         DeactivateCommitTs();
     686          60 : }
     687             : 
     688             : /*
     689             :  * Activate this module whenever necessary.
     690             :  *      This must happen during postmaster or standalone-backend startup,
     691             :  *      or during WAL replay anytime the track_commit_timestamp setting is
     692             :  *      changed in the primary.
     693             :  *
     694             :  * The reason why this SLRU needs separate activation/deactivation functions is
     695             :  * that it can be enabled/disabled during start and the activation/deactivation
     696             :  * on the primary is propagated to the standby via replay. Other SLRUs don't
     697             :  * have this property and they can be just initialized during normal startup.
     698             :  *
     699             :  * This is in charge of creating the currently active segment, if it's not
     700             :  * already there.  The reason for this is that the server might have been
     701             :  * running with this module disabled for a while and thus might have skipped
     702             :  * the normal creation point.
     703             :  */
     704             : static void
     705          54 : ActivateCommitTs(void)
     706             : {
     707             :     TransactionId xid;
     708             :     int64       pageno;
     709             : 
     710             :     /* If the module is already active, there's nothing to do */
     711          54 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     712          54 :     if (commitTsShared->commitTsActive)
     713             :     {
     714           6 :         LWLockRelease(CommitTsLock);
     715           6 :         return;
     716             :     }
     717          48 :     LWLockRelease(CommitTsLock);
     718             : 
     719          48 :     xid = XidFromFullTransactionId(TransamVariables->nextXid);
     720          48 :     pageno = TransactionIdToCTsPage(xid);
     721             : 
     722             :     /*
     723             :      * Re-initialize our idea of the latest page number.
     724             :      */
     725          48 :     pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
     726             : 
     727             :     /*
     728             :      * If CommitTs is enabled, but it wasn't in the previous server run, we
     729             :      * need to set the oldest and newest values to the next Xid; that way, we
     730             :      * will not try to read data that might not have been set.
     731             :      *
     732             :      * XXX does this have a problem if a server is started with commitTs
     733             :      * enabled, then started with commitTs disabled, then restarted with it
     734             :      * enabled again?  It doesn't look like it does, because there should be a
     735             :      * checkpoint that sets the value to InvalidTransactionId at end of
     736             :      * recovery; and so any chance of injecting new transactions without
     737             :      * CommitTs values would occur after the oldestCommitTsXid has been set to
     738             :      * Invalid temporarily.
     739             :      */
     740          48 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     741          48 :     if (TransamVariables->oldestCommitTsXid == InvalidTransactionId)
     742             :     {
     743          30 :         TransamVariables->oldestCommitTsXid =
     744          30 :             TransamVariables->newestCommitTsXid = ReadNextTransactionId();
     745             :     }
     746          48 :     LWLockRelease(CommitTsLock);
     747             : 
     748             :     /* Create the current segment file, if necessary */
     749          48 :     if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
     750             :     {
     751          24 :         LWLock     *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
     752             :         int         slotno;
     753             : 
     754          24 :         LWLockAcquire(lock, LW_EXCLUSIVE);
     755          24 :         slotno = ZeroCommitTsPage(pageno, false);
     756          24 :         SimpleLruWritePage(CommitTsCtl, slotno);
     757             :         Assert(!CommitTsCtl->shared->page_dirty[slotno]);
     758          24 :         LWLockRelease(lock);
     759             :     }
     760             : 
     761             :     /* Change the activation status in shared memory. */
     762          48 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     763          48 :     commitTsShared->commitTsActive = true;
     764          48 :     LWLockRelease(CommitTsLock);
     765             : }
     766             : 
     767             : /*
     768             :  * Deactivate this module.
     769             :  *
     770             :  * This must be called when the track_commit_timestamp parameter is turned off.
     771             :  * This happens during postmaster or standalone-backend startup, or during WAL
     772             :  * replay.
     773             :  *
     774             :  * Resets CommitTs into invalid state to make sure we don't hand back
     775             :  * possibly-invalid data; also removes segments of old data.
     776             :  */
     777             : static void
     778        1512 : DeactivateCommitTs(void)
     779             : {
     780             :     /*
     781             :      * Cleanup the status in the shared memory.
     782             :      *
     783             :      * We reset everything in the commitTsShared record to prevent the user
     784             :      * from getting confusing data about last committed transaction on the
     785             :      * standby when the module was activated repeatedly on the primary.
     786             :      */
     787        1512 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     788             : 
     789        1512 :     commitTsShared->commitTsActive = false;
     790        1512 :     commitTsShared->xidLastCommit = InvalidTransactionId;
     791        1512 :     TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
     792        1512 :     commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
     793             : 
     794        1512 :     TransamVariables->oldestCommitTsXid = InvalidTransactionId;
     795        1512 :     TransamVariables->newestCommitTsXid = InvalidTransactionId;
     796             : 
     797             :     /*
     798             :      * Remove *all* files.  This is necessary so that there are no leftover
     799             :      * files; in the case where this feature is later enabled after running
     800             :      * with it disabled for some time there may be a gap in the file sequence.
     801             :      * (We can probably tolerate out-of-sequence files, as they are going to
     802             :      * be overwritten anyway when we wrap around, but it seems better to be
     803             :      * tidy.)
     804             :      *
     805             :      * Note that we do this with CommitTsLock acquired in exclusive mode. This
     806             :      * is very heavy-handed, but since this routine is only called at startup
     807             :      * or during WAL replay and should happen very rarely, we don't worry too
     808             :      * much about it.  Note also that no process should be consulting this
     809             :      * SLRU if we have just deactivated it.
     810             :      */
     811        1512 :     (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
     812             : 
     813        1512 :     LWLockRelease(CommitTsLock);
     814        1512 : }
     815             : 
     816             : /*
     817             :  * Flush CommitTs for a checkpoint --- either during shutdown, or on-the-fly
     818             :  */
     819             : void
     820        2476 : CheckPointCommitTs(void)
     821             : {
     822             :     /*
     823             :      * Write dirty CommitTs pages to disk.  This may result in sync requests
     824             :      * queued for later handling by ProcessSyncRequests(), as part of the
     825             :      * checkpoint.
     826             :      */
     827        2476 :     SimpleLruWriteAll(CommitTsCtl, true);
     828        2476 : }
     829             : 
     830             : /*
     831             :  * Make sure that CommitTs has room for a newly-allocated XID.
     832             :  *
     833             :  * NB: this is called while holding XidGenLock.  We want it to be very fast
     834             :  * most of the time; even when it's not so fast, no actual I/O need happen
     835             :  * unless we're forced to write out a dirty CommitTs or xlog page to make room
     836             :  * in shared memory.
     837             :  *
     838             :  * NB: the current implementation relies on track_commit_timestamp being
     839             :  * PGC_POSTMASTER.
     840             :  */
     841             : void
     842    48977616 : ExtendCommitTs(TransactionId newestXact)
     843             : {
     844             :     int64       pageno;
     845             :     LWLock     *lock;
     846             : 
     847             :     /*
     848             :      * Nothing to do if module not enabled.  Note we do an unlocked read of
     849             :      * the flag here, which is okay because this routine is only called from
     850             :      * GetNewTransactionId, which is never called in a standby.
     851             :      */
     852             :     Assert(!InRecovery);
     853    48977616 :     if (!commitTsShared->commitTsActive)
     854    48977428 :         return;
     855             : 
     856             :     /*
     857             :      * No work except at first XID of a page.  But beware: just after
     858             :      * wraparound, the first XID of page zero is FirstNormalTransactionId.
     859             :      */
     860         188 :     if (TransactionIdToCTsEntry(newestXact) != 0 &&
     861             :         !TransactionIdEquals(newestXact, FirstNormalTransactionId))
     862         188 :         return;
     863             : 
     864           0 :     pageno = TransactionIdToCTsPage(newestXact);
     865             : 
     866           0 :     lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
     867             : 
     868           0 :     LWLockAcquire(lock, LW_EXCLUSIVE);
     869             : 
     870             :     /* Zero the page and XLOG it; InRecovery was asserted false above */
     871           0 :     ZeroCommitTsPage(pageno, !InRecovery);
     872             : 
     873           0 :     LWLockRelease(lock);
     874             : }
     875             : 
     876             : /*
     877             :  * Remove all CommitTs segments before the one holding the passed
     878             :  * transaction ID.
     879             :  *
     880             :  * Note that we don't need to flush XLOG here.
     881             :  */
     882             : void
     883        1062 : TruncateCommitTs(TransactionId oldestXact)
     884             : {
     885             :     int64       cutoffPage;
     886             : 
     887             :     /*
     888             :      * The cutoff point is the start of the segment containing oldestXact. We
     889             :      * pass the *page* containing oldestXact to SimpleLruTruncate.
     890             :      */
     891        1062 :     cutoffPage = TransactionIdToCTsPage(oldestXact);
     892             : 
     893             :     /* Check to see if there are any files that could be removed */
     894        1062 :     if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
     895             :                            &cutoffPage))
     896        1062 :         return;                 /* nothing to remove */
     897             : 
     898             :     /* Write XLOG record */
     899           0 :     WriteTruncateXlogRec(cutoffPage, oldestXact);
     900             : 
     901             :     /* Now we can remove the old CommitTs segment(s) */
     902           0 :     SimpleLruTruncate(CommitTsCtl, cutoffPage);
     903             : }
     904             : 
     905             : /*
     906             :  * Set the limit values between which CommitTs can be consulted.
     907             :  */
     908             : void
     909        1740 : SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
     910             : {
     911             :     /*
     912             :      * Be careful not to overwrite values that are either further into the
     913             :      * "future" or signal a disabled CommitTs.
     914             :      */
     915        1740 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     916        1740 :     if (TransamVariables->oldestCommitTsXid != InvalidTransactionId)
     917             :     {
     918           0 :         if (TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
     919           0 :             TransamVariables->oldestCommitTsXid = oldestXact;
     920           0 :         if (TransactionIdPrecedes(newestXact, TransamVariables->newestCommitTsXid))
     921           0 :             TransamVariables->newestCommitTsXid = newestXact;
     922             :     }
     923             :     else
     924             :     {
     925             :         Assert(TransamVariables->newestCommitTsXid == InvalidTransactionId);
     926        1740 :         TransamVariables->oldestCommitTsXid = oldestXact;
     927        1740 :         TransamVariables->newestCommitTsXid = newestXact;
     928             :     }
     929        1740 :     LWLockRelease(CommitTsLock);
     930        1740 : }
     931             : 
     932             : /*
     933             :  * Advance the oldest CommitTs XID that can be consulted (never moves it back)
     934             :  */
     935             : void
     936        1062 : AdvanceOldestCommitTsXid(TransactionId oldestXact)
     937             : {
     938        1062 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     939        1062 :     if (TransamVariables->oldestCommitTsXid != InvalidTransactionId &&
     940           0 :         TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
     941           0 :         TransamVariables->oldestCommitTsXid = oldestXact;
     942        1062 :     LWLockRelease(CommitTsLock);
     943        1062 : }
     944             : 
     945             : 
     946             : /*
     947             :  * Decide whether a CommitTs page number is "older" for truncation purposes.
     948             :  * Analogous to CLOGPagePrecedes().
     949             :  *
     950             :  * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128.  This
     951             :  * introduces differences compared to CLOG and the other SLRUs having
     952             :  * (1 << 31) % per_page == 0.  This function never tests exactly
     953             :  * TransactionIdPrecedes(x-2^31, x).  When the system reaches xidStopLimit,
     954             :  * there are two possible counts of page boundaries between oldestXact and the
     955             :  * latest XID assigned, depending on whether oldestXact is within the first
     956             :  * 128 entries of its page.  Since this function doesn't know the location of
     957             :  * oldestXact within page2, it returns false for one page that actually is
     958             :  * expendable.  This is a wider (yet still negligible) version of the
     959             :  * truncation opportunity that CLOGPagePrecedes() cannot recognize.
     960             :  *
     961             :  * For the sake of a worked example, number entries with decimal values such
     962             :  * that page1==1 entries range from 1.0 to 1.999.  Let N+0.15 be the number of
     963             :  * pages that 2^31 entries will span (N is an integer).  If oldestXact=N+2.1,
     964             :  * then the final safe XID assignment leaves newestXact=1.95.  We keep page 2,
     965             :  * because entry=2.85 is the border that toggles whether entries precede the
     966             :  * last entry of the oldestXact page.  While page 2 is expendable at
     967             :  * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
     968             :  */
     969             : static bool
     970           0 : CommitTsPagePrecedes(int64 page1, int64 page2)
     971             : {
     972             :     TransactionId xid1;
     973             :     TransactionId xid2;
     974             : 
     975           0 :     xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
     976           0 :     xid1 += FirstNormalTransactionId + 1;
     977           0 :     xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
     978           0 :     xid2 += FirstNormalTransactionId + 1;
     979             : 
     980           0 :     return (TransactionIdPrecedes(xid1, xid2) &&
     981           0 :             TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
     982             : }
     983             : 
     984             : 
     985             : /*
     986             :  * Write a COMMIT_TS_ZEROPAGE xlog record whose payload is the page number
     987             :  */
     988             : static void
     989           0 : WriteZeroPageXlogRec(int64 pageno)
     990             : {
     991           0 :     XLogBeginInsert();
     992           0 :     XLogRegisterData((char *) (&pageno), sizeof(pageno));
     993           0 :     (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
     994           0 : }
     995             : 
     996             : /*
     997             :  * Write a COMMIT_TS_TRUNCATE xlog record carrying pageno and oldestXid
     998             :  */
     999             : static void
    1000           0 : WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
    1001             : {
    1002             :     xl_commit_ts_truncate xlrec;
    1003             : 
    1004           0 :     xlrec.pageno = pageno;
    1005           0 :     xlrec.oldestXid = oldestXid;
    1006             : 
    1007           0 :     XLogBeginInsert();
    1008           0 :     XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
    1009           0 :     (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
    1010           0 : }
    1011             : 
    1012             : /*
    1013             :  * CommitTs resource manager's redo routine (ZEROPAGE and TRUNCATE records)
    1014             :  */
    1015             : void
    1016           0 : commit_ts_redo(XLogReaderState *record)
    1017             : {
    1018           0 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
    1019             : 
    1020             :     /* Backup blocks are not used in commit_ts records */
    1021             :     Assert(!XLogRecHasAnyBlockRefs(record));
    1022             : 
    1023           0 :     if (info == COMMIT_TS_ZEROPAGE)
    1024             :     {
    1025             :         int64       pageno;
    1026             :         int         slotno;
    1027             :         LWLock     *lock;
    1028             : 
    1029           0 :         memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
    1030             : 
    1031           0 :         lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
    1032           0 :         LWLockAcquire(lock, LW_EXCLUSIVE);
    1033             : 
    1034           0 :         slotno = ZeroCommitTsPage(pageno, false);
    1035           0 :         SimpleLruWritePage(CommitTsCtl, slotno);
    1036             :         Assert(!CommitTsCtl->shared->page_dirty[slotno]);
    1037             : 
    1038           0 :         LWLockRelease(lock);
    1039             :     }
    1040           0 :     else if (info == COMMIT_TS_TRUNCATE)
    1041             :     {
    1042           0 :         xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
    1043             : 
    1044           0 :         AdvanceOldestCommitTsXid(trunc->oldestXid);
    1045             : 
    1046             :         /*
    1047             :          * During XLOG replay, latest_page_number isn't set up yet; insert a
    1048             :          * suitable value to bypass the sanity test in SimpleLruTruncate.
    1049             :          */
    1050           0 :         pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
    1051           0 :                             trunc->pageno);
    1052             : 
    1053           0 :         SimpleLruTruncate(CommitTsCtl, trunc->pageno);
    1054             :     }
    1055             :     else
    1056           0 :         elog(PANIC, "commit_ts_redo: unknown op code %u", info);
    1057           0 : }
    1058             : 
    1059             : /*
    1060             :  * Entrypoint for sync.c to sync commit_ts files; wraps SlruSyncFileTag().
    1061             :  */
    1062             : int
    1063           0 : committssyncfiletag(const FileTag *ftag, char *path)
    1064             : {
    1065           0 :     return SlruSyncFileTag(CommitTsCtl, ftag, path);
    1066             : }

Generated by: LCOV version 1.14