LCOV - code coverage report
Current view: top level - src/backend/access/transam - commit_ts.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 200 282 70.9 %
Date: 2019-11-15 22:06:47 Functions: 24 29 82.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * commit_ts.c
       4             :  *      PostgreSQL commit timestamp manager
       5             :  *
       6             :  * This module is a pg_xact-like system that stores the commit timestamp
       7             :  * for each transaction.
       8             :  *
       9             :  * XLOG interactions: this module generates an XLOG record whenever a new
      10             :  * CommitTs page is initialized to zeroes.  Also, one XLOG record is
      11             :  * generated for setting of values when the caller requests it; this allows
      12             :  * us to support values coming from places other than transaction commit.
      13             :  * Other writes of CommitTS come from recording of transaction commit in
      14             :  * xact.c, which generates its own XLOG records for these events and will
      15             :  * re-perform the status update on redo; so we need make no additional XLOG
      16             :  * entry here.
      17             :  *
      18             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
      19             :  * Portions Copyright (c) 1994, Regents of the University of California
      20             :  *
      21             :  * src/backend/access/transam/commit_ts.c
      22             :  *
      23             :  *-------------------------------------------------------------------------
      24             :  */
      25             : #include "postgres.h"
      26             : 
      27             : #include "access/commit_ts.h"
      28             : #include "access/htup_details.h"
      29             : #include "access/slru.h"
      30             : #include "access/transam.h"
      31             : #include "catalog/pg_type.h"
      32             : #include "funcapi.h"
      33             : #include "miscadmin.h"
      34             : #include "pg_trace.h"
      35             : #include "storage/shmem.h"
      36             : #include "utils/builtins.h"
      37             : #include "utils/snapmgr.h"
      38             : #include "utils/timestamp.h"
      39             : 
      40             : /*
      41             :  * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
      42             :  * everywhere else in Postgres.
      43             :  *
      44             :  * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
      45             :  * CommitTs page numbering also wraps around at
      46             :  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
      47             :  * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
      48             :  * explicit notice of that fact in this module, except when comparing segment
      49             :  * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
      50             :  */
      51             : 
      52             : /*
      53             :  * We need 8+2 bytes per xact.  Note that enlarging this struct might mean
      54             :  * the largest possible file name is more than 5 chars long; see
      55             :  * SlruScanDirectory.
      56             :  */
      57             : typedef struct CommitTimestampEntry
      58             : {
      59             :     TimestampTz time;
      60             :     RepOriginId nodeid;
      61             : } CommitTimestampEntry;
      62             : 
      63             : #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
      64             :                                     sizeof(RepOriginId))
      65             : 
      66             : #define COMMIT_TS_XACTS_PER_PAGE \
      67             :     (BLCKSZ / SizeOfCommitTimestampEntry)
      68             : 
      69             : #define TransactionIdToCTsPage(xid) \
      70             :     ((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
      71             : #define TransactionIdToCTsEntry(xid)    \
      72             :     ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
      73             : 
      74             : /*
      75             :  * Link to shared-memory data structures for CommitTs control
      76             :  */
      77             : static SlruCtlData CommitTsCtlData;
      78             : 
      79             : #define CommitTsCtl (&CommitTsCtlData)
      80             : 
      81             : /*
      82             :  * We keep a cache of the last value set in shared memory.
      83             :  *
      84             :  * This is also a good place to keep the activation status.  We keep this
      85             :  * separate from the GUC so that the standby can activate the module if the
      86             :  * primary has it active independently of the value of the GUC.
      87             :  *
      88             :  * This is protected by CommitTsLock.  In some places, we use commitTsActive
      89             :  * without acquiring the lock; where this happens, a comment explains the
      90             :  * rationale for it.
      91             :  */
      92             : typedef struct CommitTimestampShared
      93             : {
      94             :     TransactionId xidLastCommit;
      95             :     CommitTimestampEntry dataLastCommit;
      96             :     bool        commitTsActive;
      97             : } CommitTimestampShared;
      98             : 
      99             : CommitTimestampShared *commitTsShared;
     100             : 
     101             : 
     102             : /* GUC variable */
     103             : bool        track_commit_timestamp;
     104             : 
     105             : static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
     106             :                                  TransactionId *subxids, TimestampTz ts,
     107             :                                  RepOriginId nodeid, int pageno);
     108             : static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
     109             :                                      RepOriginId nodeid, int slotno);
     110             : static void error_commit_ts_disabled(void);
     111             : static int  ZeroCommitTsPage(int pageno, bool writeXlog);
     112             : static bool CommitTsPagePrecedes(int page1, int page2);
     113             : static void ActivateCommitTs(void);
     114             : static void DeactivateCommitTs(void);
     115             : static void WriteZeroPageXlogRec(int pageno);
     116             : static void WriteTruncateXlogRec(int pageno, TransactionId oldestXid);
     117             : static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
     118             :                                      TransactionId *subxids, TimestampTz timestamp,
     119             :                                      RepOriginId nodeid);
     120             : 
     121             : /*
     122             :  * TransactionTreeSetCommitTsData
     123             :  *
     124             :  * Record the final commit timestamp of transaction entries in the commit log
     125             :  * for a transaction and its subtransaction tree, as efficiently as possible.
     126             :  *
     127             :  * xid is the top level transaction id.
     128             :  *
     129             :  * subxids is an array of xids of length nsubxids, representing subtransactions
     130             :  * in the tree of xid. In various cases nsubxids may be zero.
     131             :  * The reason why tracking just the parent xid commit timestamp is not enough
     132             :  * is that the subtrans SLRU does not stay valid across crashes (it's not
     133             :  * permanent) so we need to keep the information about them here. If the
     134             :  * subtrans implementation changes in the future, we might want to revisit the
     135             :  * decision of storing timestamp info for each subxid.
     136             :  *
     137             :  * The write_xlog parameter tells us whether to include an XLog record of this
     138             :  * or not.  Normally, this is called from transaction commit routines (both
     139             :  * normal and prepared) and the information will be stored in the transaction
     140             :  * commit XLog record, and so they should pass "false" for this.  The XLog redo
     141             :  * code should use "false" here as well.  Other callers probably want to pass
     142             :  * true, so that the given values persist in case of crashes.
     143             :  */
     144             : void
     145      250552 : TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
     146             :                                TransactionId *subxids, TimestampTz timestamp,
     147             :                                RepOriginId nodeid, bool write_xlog)
     148             : {
     149             :     int         i;
     150             :     TransactionId headxid;
     151             :     TransactionId newestXact;
     152             : 
     153             :     /*
     154             :      * No-op if the module is not active.
     155             :      *
     156             :      * An unlocked read here is fine, because in a standby (the only place
     157             :      * where the flag can change in flight) this routine is only called by the
     158             :      * recovery process, which is also the only process which can change the
     159             :      * flag.
     160             :      */
     161      250552 :     if (!commitTsShared->commitTsActive)
     162      250464 :         return;
     163             : 
     164             :     /*
     165             :      * Comply with the WAL-before-data rule: if caller specified it wants this
     166             :      * value to be recorded in WAL, do so before touching the data.
     167             :      */
     168          88 :     if (write_xlog)
     169           0 :         WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, nodeid);
     170             : 
     171             :     /*
     172             :      * Figure out the latest Xid in this batch: either the last subxid if
     173             :      * there's any, otherwise the parent xid.
     174             :      */
     175          88 :     if (nsubxids > 0)
     176           0 :         newestXact = subxids[nsubxids - 1];
     177             :     else
     178          88 :         newestXact = xid;
     179             : 
     180             :     /*
     181             :      * We split the xids whose timestamp is being set into groups, each on a
     182             :      * single SLRU page; the first element in each such set is its head.  The
     183             :      * first group has the main XID as the head; subsequent sets use the first
     184             :      * subxid not on the previous page as head.  This way, we only have to
     185             :      * lock/modify each SLRU page once.
     186             :      */
     187          88 :     for (i = 0, headxid = xid;;)
     188           0 :     {
     189          88 :         int         pageno = TransactionIdToCTsPage(headxid);
     190             :         int         j;
     191             : 
     192          88 :         for (j = i; j < nsubxids; j++)
     193             :         {
     194           0 :             if (TransactionIdToCTsPage(subxids[j]) != pageno)
     195           0 :                 break;
     196             :         }
     197             :         /* subxids[i..j-1] are on the same page as the head */
     198             : 
     199          88 :         SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
     200             :                              pageno);
     201             : 
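     202             :         /* if we wrote out all subxids, we're done.  NOTE(review): when j == nsubxids - 1, subxids[j] is on a new page and has not been written yet, so this test looks like it should be j >= nsubxids -- confirm. */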
     203          88 :         if (j + 1 >= nsubxids)
     204          88 :             break;
     205             : 
     206             :         /*
     207             :          * Set the new head and skip over it, as well as over the subxids we
     208             :          * just wrote.
     209             :          */
     210           0 :         headxid = subxids[j];
     211           0 :         i += j - i + 1;
     212             :     }
     213             : 
     214             :     /* update the cached value in shared memory */
     215          88 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     216          88 :     commitTsShared->xidLastCommit = xid;
     217          88 :     commitTsShared->dataLastCommit.time = timestamp;
     218          88 :     commitTsShared->dataLastCommit.nodeid = nodeid;
     219             : 
     220             :     /* and move forwards our endpoint, if needed */
     221          88 :     if (TransactionIdPrecedes(ShmemVariableCache->newestCommitTsXid, newestXact))
     222          70 :         ShmemVariableCache->newestCommitTsXid = newestXact;
     223          88 :     LWLockRelease(CommitTsLock);
     224             : }
     225             : 
     226             : /*
     227             :  * Record the commit timestamp of transaction entries in the commit log for all
     228             :  * entries on a single page.  Atomic only on this page.
     229             :  */
     230             : static void
     231          88 : SetXidCommitTsInPage(TransactionId xid, int nsubxids,
     232             :                      TransactionId *subxids, TimestampTz ts,
     233             :                      RepOriginId nodeid, int pageno)
     234             : {
     235             :     int         slotno;
     236             :     int         i;
     237             : 
     238          88 :     LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
     239             : 
     240          88 :     slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
     241             : 
     242          88 :     TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
     243          88 :     for (i = 0; i < nsubxids; i++)
     244           0 :         TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
     245             : 
     246          88 :     CommitTsCtl->shared->page_dirty[slotno] = true;
     247             : 
     248          88 :     LWLockRelease(CommitTsControlLock);
     249          88 : }
     250             : 
     251             : /*
     252             :  * Sets the commit timestamp of a single transaction.
     253             :  *
     254             :  * Must be called with CommitTsControlLock held
     255             :  */
     256             : static void
     257          88 : TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
     258             :                          RepOriginId nodeid, int slotno)
     259             : {
     260          88 :     int         entryno = TransactionIdToCTsEntry(xid);
     261             :     CommitTimestampEntry entry;
     262             : 
     263             :     Assert(TransactionIdIsNormal(xid));
     264             : 
     265          88 :     entry.time = ts;
     266          88 :     entry.nodeid = nodeid;
     267             : 
     268         176 :     memcpy(CommitTsCtl->shared->page_buffer[slotno] +
     269          88 :            SizeOfCommitTimestampEntry * entryno,
     270             :            &entry, SizeOfCommitTimestampEntry);
     271          88 : }
     272             : 
     273             : /*
     274             :  * Interrogate the commit timestamp of a transaction.
     275             :  *
     276             :  * The return value indicates whether a commit timestamp record was found for
     277             :  * the given xid.  The timestamp value is returned in *ts (which must not be
     278             :  * NULL), and the origin node for the Xid is returned in *nodeid, if that is
     279             :  * not NULL.
     280             :  */
     281             : bool
     282          54 : TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
     283             :                              RepOriginId *nodeid)
     284             : {
     285          54 :     int         pageno = TransactionIdToCTsPage(xid);
     286          54 :     int         entryno = TransactionIdToCTsEntry(xid);
     287             :     int         slotno;
     288             :     CommitTimestampEntry entry;
     289             :     TransactionId oldestCommitTsXid;
     290             :     TransactionId newestCommitTsXid;
     291             : 
     292          54 :     if (!TransactionIdIsValid(xid))
     293           4 :         ereport(ERROR,
     294             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     295             :                  errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
     296          50 :     else if (!TransactionIdIsNormal(xid))
     297             :     {
     298             :         /* frozen and bootstrap xids are always committed far in the past */
     299           8 :         *ts = 0;
     300           8 :         if (nodeid)
     301           0 :             *nodeid = 0;
     302           8 :         return false;
     303             :     }
     304             : 
     305          42 :     LWLockAcquire(CommitTsLock, LW_SHARED);
     306             : 
     307             :     /* Error if module not enabled */
     308          42 :     if (!commitTsShared->commitTsActive)
     309           6 :         error_commit_ts_disabled();
     310             : 
     311             :     /*
     312             :      * If we're asked for the cached value, return that.  Otherwise, fall
     313             :      * through to read from SLRU.
     314             :      */
     315          36 :     if (commitTsShared->xidLastCommit == xid)
     316             :     {
     317          16 :         *ts = commitTsShared->dataLastCommit.time;
     318          16 :         if (nodeid)
     319           0 :             *nodeid = commitTsShared->dataLastCommit.nodeid;
     320             : 
     321          16 :         LWLockRelease(CommitTsLock);
     322          16 :         return *ts != 0;
     323             :     }
     324             : 
     325          20 :     oldestCommitTsXid = ShmemVariableCache->oldestCommitTsXid;
     326          20 :     newestCommitTsXid = ShmemVariableCache->newestCommitTsXid;
     327             :     /* neither is invalid, or both are */
     328             :     Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
     329          20 :     LWLockRelease(CommitTsLock);
     330             : 
     331             :     /*
     332             :      * Return empty if the requested value is outside our valid range.
     333             :      */
     334          40 :     if (!TransactionIdIsValid(oldestCommitTsXid) ||
     335          34 :         TransactionIdPrecedes(xid, oldestCommitTsXid) ||
     336          14 :         TransactionIdPrecedes(newestCommitTsXid, xid))
     337             :     {
     338           6 :         *ts = 0;
     339           6 :         if (nodeid)
     340           0 :             *nodeid = InvalidRepOriginId;
     341           6 :         return false;
     342             :     }
     343             : 
     344             :     /* lock is acquired by SimpleLruReadPage_ReadOnly */
     345          14 :     slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
     346          14 :     memcpy(&entry,
     347          28 :            CommitTsCtl->shared->page_buffer[slotno] +
     348          14 :            SizeOfCommitTimestampEntry * entryno,
     349             :            SizeOfCommitTimestampEntry);
     350             : 
     351          14 :     *ts = entry.time;
     352          14 :     if (nodeid)
     353           0 :         *nodeid = entry.nodeid;
     354             : 
     355          14 :     LWLockRelease(CommitTsControlLock);
     356          14 :     return *ts != 0;
     357             : }
     358             : 
     359             : /*
     360             :  * Return the Xid of the latest committed transaction.  (As far as this module
     361             :  * is concerned, anyway; it's up to the caller to ensure the value is useful
     362             :  * for its purposes.)
     363             :  *
     364             :  * ts and nodeid are filled with the corresponding data; they can be passed
     365             :  * as NULL if not wanted.
     366             :  */
     367             : TransactionId
     368           2 : GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
     369             : {
     370             :     TransactionId xid;
     371             : 
     372           2 :     LWLockAcquire(CommitTsLock, LW_SHARED);
     373             : 
     374             :     /* Error if module not enabled */
     375           2 :     if (!commitTsShared->commitTsActive)
     376           0 :         error_commit_ts_disabled();
     377             : 
     378           2 :     xid = commitTsShared->xidLastCommit;
     379           2 :     if (ts)
     380           2 :         *ts = commitTsShared->dataLastCommit.time;
     381           2 :     if (nodeid)
     382           0 :         *nodeid = commitTsShared->dataLastCommit.nodeid;
     383           2 :     LWLockRelease(CommitTsLock);
     384             : 
     385           2 :     return xid;
     386             : }
     387             : 
     388             : static void
     389           6 : error_commit_ts_disabled(void)
     390             : {
     391           6 :     ereport(ERROR,
     392             :             (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     393             :              errmsg("could not get commit timestamp data"),
     394             :              RecoveryInProgress() ?
     395             :              errhint("Make sure the configuration parameter \"%s\" is set on the master server.",
     396             :                      "track_commit_timestamp") :
     397             :              errhint("Make sure the configuration parameter \"%s\" is set.",
     398             :                      "track_commit_timestamp")));
     399             : }
     400             : 
     401             : /*
     402             :  * SQL-callable wrapper to obtain commit time of a transaction
     403             :  */
     404             : Datum
     405          54 : pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
     406             : {
     407          54 :     TransactionId xid = PG_GETARG_UINT32(0);
     408             :     TimestampTz ts;
     409             :     bool        found;
     410             : 
     411          54 :     found = TransactionIdGetCommitTsData(xid, &ts, NULL);
     412             : 
     413          44 :     if (!found)
     414          14 :         PG_RETURN_NULL();
     415             : 
     416          30 :     PG_RETURN_TIMESTAMPTZ(ts);
     417             : }
     418             : 
     419             : 
     420             : Datum
     421           2 : pg_last_committed_xact(PG_FUNCTION_ARGS)
     422             : {
     423             :     TransactionId xid;
     424             :     TimestampTz ts;
     425             :     Datum       values[2];
     426             :     bool        nulls[2];
     427             :     TupleDesc   tupdesc;
     428             :     HeapTuple   htup;
     429             : 
     430             :     /* fetch the latest committed xid and its timestamp */
     431           2 :     xid = GetLatestCommitTsData(&ts, NULL);
     432             : 
     433             :     /*
     434             :      * Construct a tuple descriptor for the result row.  This must match this
     435             :      * function's pg_proc entry!
     436             :      */
     437           2 :     tupdesc = CreateTemplateTupleDesc(2);
     438           2 :     TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
     439             :                        XIDOID, -1, 0);
     440           2 :     TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
     441             :                        TIMESTAMPTZOID, -1, 0);
     442           2 :     tupdesc = BlessTupleDesc(tupdesc);
     443             : 
     444           2 :     if (!TransactionIdIsNormal(xid))
     445             :     {
     446           0 :         memset(nulls, true, sizeof(nulls));
     447             :     }
     448             :     else
     449             :     {
     450           2 :         values[0] = TransactionIdGetDatum(xid);
     451           2 :         nulls[0] = false;
     452             : 
     453           2 :         values[1] = TimestampTzGetDatum(ts);
     454           2 :         nulls[1] = false;
     455             :     }
     456             : 
     457           2 :     htup = heap_form_tuple(tupdesc, values, nulls);
     458             : 
     459           2 :     PG_RETURN_DATUM(HeapTupleGetDatum(htup));
     460             : }
     461             : 
     462             : 
     463             : /*
     464             :  * Number of shared CommitTS buffers.
     465             :  *
     466             :  * We use logic very similar to that for the number of CLOG buffers; see
     467             :  * comments in CLOGShmemBuffers.
     468             :  */
     469             : Size
     470        3784 : CommitTsShmemBuffers(void)
     471             : {
     472        3784 :     return Min(16, Max(4, NBuffers / 1024));
     473             : }
     474             : 
     475             : /*
     476             :  * Shared memory sizing for CommitTs
     477             :  */
     478             : Size
     479        1894 : CommitTsShmemSize(void)
     480             : {
     481        1894 :     return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
     482             :         sizeof(CommitTimestampShared);
     483             : }
     484             : 
     485             : /*
     486             :  * Initialize CommitTs at system startup (postmaster start or standalone
     487             :  * backend)
     488             :  */
     489             : void
     490        1890 : CommitTsShmemInit(void)
     491             : {
     492             :     bool        found;
     493             : 
     494        1890 :     CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
     495        1890 :     SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
     496        1890 :                   CommitTsControlLock, "pg_commit_ts",
     497             :                   LWTRANCHE_COMMITTS_BUFFERS);
     498             : 
     499        1890 :     commitTsShared = ShmemInitStruct("CommitTs shared",
     500             :                                      sizeof(CommitTimestampShared),
     501             :                                      &found);
     502             : 
     503        1890 :     if (!IsUnderPostmaster)
     504             :     {
     505             :         Assert(!found);
     506             : 
     507        1890 :         commitTsShared->xidLastCommit = InvalidTransactionId;
     508        1890 :         TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
     509        1890 :         commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
     510        1890 :         commitTsShared->commitTsActive = false;
     511             :     }
     512             :     else
     513             :         Assert(found);
     514        1890 : }
     515             : 
     516             : /*
     517             :  * This function must be called ONCE on system install.
     518             :  *
     519             :  * (The CommitTs directory is assumed to have been created by initdb, and
     520             :  * CommitTsShmemInit must have been called already.)
     521             :  */
     522             : void
     523         322 : BootStrapCommitTs(void)
     524             : {
     525             :     /*
     526             :      * Nothing to do here at present, unlike most other SLRU modules; segments
     527             :      * are created when the server is started with this module enabled. See
     528             :      * ActivateCommitTs.
     529             :      */
     530         322 : }
     531             : 
     532             : /*
     533             :  * Initialize (or reinitialize) a page of CommitTs to zeroes.
     534             :  * If writeXlog is true, also emit an XLOG record saying we did this.
     535             :  *
     536             :  * The page is not actually written, just set up in shared memory.
     537             :  * The slot number of the new page is returned.
     538             :  *
     539             :  * Control lock must be held at entry, and will be held at exit.
     540             :  */
     541             : static int
     542          14 : ZeroCommitTsPage(int pageno, bool writeXlog)
     543             : {
     544             :     int         slotno;
     545             : 
     546          14 :     slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
     547             : 
     548          14 :     if (writeXlog)
     549           0 :         WriteZeroPageXlogRec(pageno);
     550             : 
     551          14 :     return slotno;
     552             : }
     553             : 
     554             : /*
     555             :  * This must be called ONCE during postmaster or standalone-backend startup,
     556             :  * after StartupXLOG has initialized ShmemVariableCache->nextFullXid.
     557             :  */
     558             : void
     559          16 : StartupCommitTs(void)
     560             : {
     561          16 :     ActivateCommitTs();
     562          16 : }
     563             : 
     564             : /*
     565             :  * This must be called ONCE during postmaster or standalone-backend startup,
     566             :  * after recovery has finished.
     567             :  */
     568             : void
     569        1170 : CompleteCommitTsInitialization(void)
     570             : {
     571             :     /*
     572             :      * If the feature is not enabled, turn it off for good.  This also removes
     573             :      * any leftover data.
     574             :      *
     575             :      * Conversely, we activate the module if the feature is enabled.  This is
     576             :      * necessary for primary and standby as the activation depends on the
     577             :      * control file contents at the beginning of recovery or when a
     578             :      * XLOG_PARAMETER_CHANGE is replayed.
     579             :      */
     580        1170 :     if (!track_commit_timestamp)
     581        1148 :         DeactivateCommitTs();
     582             :     else
     583          22 :         ActivateCommitTs();
     584        1170 : }
     585             : 
     586             : /*
     587             :  * Activate or deactivate CommitTs upon reception of an XLOG_PARAMETER_CHANGE
     588             :  * XLog record during recovery.
     589             :  */
     590             : void
     591          14 : CommitTsParameterChange(bool newvalue, bool oldvalue)
     592             : {
     593             :     /*
     594             :      * If the commit_ts module is disabled in this server and we get word from
     595             :      * the master server that it is enabled there, activate it so that we can
     596             :      * replay future WAL records involving it; also mark it as active on
     597             :      * pg_control.  If the old value was already set, we already did this, so
     598             :      * don't do anything.
     599             :      *
     600             :      * If the module is disabled in the master, disable it here too, unless
     601             :      * the module is enabled locally.
     602             :      *
     603             :      * Note this only runs in the recovery process, so an unlocked read is
     604             :      * fine.
     605             :      */
     606          14 :     if (newvalue)
     607             :     {
     608           6 :         if (!commitTsShared->commitTsActive)
     609           2 :             ActivateCommitTs();
     610             :     }
     611           8 :     else if (commitTsShared->commitTsActive)
     612           2 :         DeactivateCommitTs();
     613          14 : }
     614             : 
     615             : /*
     616             :  * Activate this module whenever necessary.
     617             :  *      This must happen during postmaster or standalone-backend startup,
     618             :  *      or during WAL replay anytime the track_commit_timestamp setting is
     619             :  *      changed in the master.
     620             :  *
     621             :  * The reason why this SLRU needs separate activation/deactivation functions is
     622             :  * that it can be enabled/disabled during start and the activation/deactivation
     623             :  * on master is propagated to standby via replay. Other SLRUs don't have this
     624             :  * property and they can be just initialized during normal startup.
     625             :  *
     626             :  * This is in charge of creating the currently active segment, if it's not
     627             :  * already there.  The reason for this is that the server might have been
     628             :  * running with this module disabled for a while and thus might have skipped
     629             :  * the normal creation point.
     630             :  */
     631             : static void
     632          40 : ActivateCommitTs(void)
     633             : {
     634             :     TransactionId xid;
     635             :     int         pageno;
     636             : 
     637             :     /* If we've done this already, there's nothing to do */
     638          40 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     639          40 :     if (commitTsShared->commitTsActive)
     640             :     {
     641           8 :         LWLockRelease(CommitTsLock);
     642           8 :         return;
     643             :     }
     644          32 :     LWLockRelease(CommitTsLock);
     645             : 
     646          32 :     xid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
     647          32 :     pageno = TransactionIdToCTsPage(xid);   /* page that holds the next XID */
     648             : 
     649             :     /*
     650             :      * Re-initialize our idea of the latest page number.
     651             :      */
     652          32 :     LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
     653          32 :     CommitTsCtl->shared->latest_page_number = pageno;
     654          32 :     LWLockRelease(CommitTsControlLock);
     655             : 
     656             :     /*
     657             :      * If CommitTs is enabled, but it wasn't in the previous server run, we
     658             :      * need to set the oldest and newest values to the next Xid; that way, we
     659             :      * will not try to read data that might not have been set.
     660             :      *
     661             :      * XXX does this have a problem if a server is started with commitTs
     662             :      * enabled, then started with commitTs disabled, then restarted with it
     663             :      * enabled again?  It doesn't look like it does, because there should be a
     664             :      * checkpoint that sets the value to InvalidTransactionId at end of
     665             :      * recovery; and so any chance of injecting new transactions without
     666             :      * CommitTs values would occur after the oldestCommitTsXid has been set to
     667             :      * Invalid temporarily.
     668             :      */
     669          32 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     670          32 :     if (ShmemVariableCache->oldestCommitTsXid == InvalidTransactionId)
     671             :     {
     672          40 :         ShmemVariableCache->oldestCommitTsXid =
     673          20 :             ShmemVariableCache->newestCommitTsXid = ReadNewTransactionId();  /* both limits get the same value */
     674             :     }
     675          32 :     LWLockRelease(CommitTsLock);
     676             : 
     677             :     /* Create the current segment file, if necessary */
     678          32 :     if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
     679             :     {
     680             :         int         slotno;
     681             : 
     682          14 :         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
     683          14 :         slotno = ZeroCommitTsPage(pageno, false);   /* false: no ZEROPAGE WAL record for this zeroing */
     684          14 :         SimpleLruWritePage(CommitTsCtl, slotno);    /* write the zeroed page out right away */
     685             :         Assert(!CommitTsCtl->shared->page_dirty[slotno]);
     686          14 :         LWLockRelease(CommitTsControlLock);
     687             :     }
     688             : 
     689             :     /* Change the activation status in shared memory (CommitTsLock was released and re-taken since the check at the top). */
     690          32 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     691          32 :     commitTsShared->commitTsActive = true;
     692          32 :     LWLockRelease(CommitTsLock);
     693             : }
     694             : 
     695             : /*
     696             :  * Deactivate this module.
     697             :  *
     698             :  * This must be called when the track_commit_timestamp parameter is turned off.
     699             :  * This happens during postmaster or standalone-backend startup, or during WAL
     700             :  * replay.
     701             :  *
     702             :  * Resets CommitTs into invalid state to make sure we don't hand back
     703             :  * possibly-invalid data; also removes segments of old data.
     704             :  */
     705             : static void
     706        1150 : DeactivateCommitTs(void)
     707             : {
     708             :     /*
     709             :      * Clean up the status in shared memory.
     710             :      *
     711             :      * We reset everything in the commitTsShared record to prevent users from
     712             :      * getting confusing data about last committed transaction on the standby
     713             :      * when the module was activated repeatedly on the primary.
     714             :      */
     715        1150 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     716             : 
     717        1150 :     commitTsShared->commitTsActive = false;
     718        1150 :     commitTsShared->xidLastCommit = InvalidTransactionId;
     719        1150 :     TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
     720        1150 :     commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
     721             : 
     722        1150 :     ShmemVariableCache->oldestCommitTsXid = InvalidTransactionId;   /* Invalid marks the limits as unset */
     723        1150 :     ShmemVariableCache->newestCommitTsXid = InvalidTransactionId;
     724             : 
     725        1150 :     LWLockRelease(CommitTsLock);
     726             : 
     727             :     /*
     728             :      * Remove *all* files.  This is necessary so that there are no leftover
     729             :      * files; in the case where this feature is later enabled after running
     730             :      * with it disabled for some time there may be a gap in the file sequence.
     731             :      * (We can probably tolerate out-of-sequence files, as they are going to
     732             :      * be overwritten anyway when we wrap around, but it seems better to be
     733             :      * tidy.)
     734             :      */
     735        1150 :     LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
     736        1150 :     (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);    /* scan result deliberately ignored */
     737        1150 :     LWLockRelease(CommitTsControlLock);
     738        1150 : }
     739             : 
     740             : /*
     741             :  * This must be called ONCE during postmaster or standalone-backend shutdown.
     742             :  */
     743             : void
     744        1024 : ShutdownCommitTs(void)
     745             : {
     746             :     /* Flush dirty CommitTs pages to disk */
     747        1024 :     SimpleLruFlush(CommitTsCtl, false);     /* NOTE(review): CheckPointCommitTs passes true here; see slru.c for the flag's meaning */
     748             : 
     749             :     /*
     750             :      * fsync pg_commit_ts to ensure that any files flushed previously are
     751             :      * durably on disk.
     752             :      */
     753        1024 :     fsync_fname("pg_commit_ts", true);
     754        1024 : }
     755             : 
     756             : /*
     757             :  * Perform a checkpoint --- either during shutdown, or on-the-fly.
     758             :  */
     759             : void
     760        2832 : CheckPointCommitTs(void)
     761             : {
     762             :     /* Flush dirty CommitTs pages to disk */
     763        2832 :     SimpleLruFlush(CommitTsCtl, true);      /* NOTE(review): ShutdownCommitTs passes false here; see slru.c for the flag's meaning */
     764             : 
     765             :     /*
     766             :      * fsync pg_commit_ts to ensure that any files flushed previously are
     767             :      * durably on disk.
     768             :      */
     769        2832 :     fsync_fname("pg_commit_ts", true);
     770        2832 : }
     771             : 
     772             : /*
     773             :  * Make sure that CommitTs has room for a newly-allocated XID.
     774             :  *
     775             :  * NB: this is called while holding XidGenLock.  We want it to be very fast
     776             :  * most of the time; even when it's not so fast, no actual I/O need happen
     777             :  * unless we're forced to write out a dirty CommitTs or xlog page to make room
     778             :  * in shared memory.
     779             :  *
     780             :  * NB: the current implementation relies on track_commit_timestamp being
     781             :  * PGC_POSTMASTER.
     782             :  */
     783             : void
     784      255262 : ExtendCommitTs(TransactionId newestXact)
     785             : {
     786             :     int         pageno;
     787             : 
     788             :     /*
     789             :      * Nothing to do if module not enabled.  Note we do an unlocked read of
     790             :      * the flag here, which is okay because this routine is only called from
     791             :      * GetNewTransactionId, which is never called in a standby.
     792             :      */
     793             :     Assert(!InRecovery);
     794      255262 :     if (!commitTsShared->commitTsActive)
     795      255200 :         return;
     796             : 
     797             :     /*
     798             :      * No work except at first XID of a page.  But beware: just after
     799             :      * wraparound, the first XID of page zero is FirstNormalTransactionId.
     800             :      */
     801          62 :     if (TransactionIdToCTsEntry(newestXact) != 0 &&
     802             :         !TransactionIdEquals(newestXact, FirstNormalTransactionId))
     803          62 :         return;
     804             : 
     805           0 :     pageno = TransactionIdToCTsPage(newestXact);
     806             : 
     807           0 :     LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
     808             : 
     809             :     /* Zero the page and make an XLOG entry about it (per the Assert above, InRecovery is expected to be false here) */
     810           0 :     ZeroCommitTsPage(pageno, !InRecovery);  /* returned slot number is not needed */
     811             : 
     812           0 :     LWLockRelease(CommitTsControlLock);
     813             : }
     814             : 
     815             : /*
     816             :  * Remove all CommitTs segments before the one holding the passed
     817             :  * transaction ID.
     818             :  *
     819             :  * Note that we don't need to flush XLOG here.
     820             :  */
     821             : void
     822         700 : TruncateCommitTs(TransactionId oldestXact)
     823             : {
     824             :     int         cutoffPage;
     825             : 
     826             :     /*
     827             :      * The cutoff point is the start of the segment containing oldestXact. We
     828             :      * pass the *page* containing oldestXact to SimpleLruTruncate.
     829             :      */
     830         700 :     cutoffPage = TransactionIdToCTsPage(oldestXact);
     831             : 
     832             :     /* Check to see if there are any files that could be removed */
     833         700 :     if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
     834             :                            &cutoffPage))
     835         700 :         return;                 /* nothing to remove */
     836             : 
     837             :     /* Write XLOG record (done before the files are removed below) */
     838           0 :     WriteTruncateXlogRec(cutoffPage, oldestXact);
     839             : 
     840             :     /* Now we can remove the old CommitTs segment(s) */
     841           0 :     SimpleLruTruncate(CommitTsCtl, cutoffPage);
     842             : }
     843             : 
     844             : /*
     845             :  * Set the limit values between which commit TS can be consulted.
     846             :  */
     847             : void
     848        1514 : SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
     849             : {
     850             :     /*
     851             :      * Be careful not to overwrite values that are either further into the
     852             :      * "future" or signal a disabled CommitTs.
     853             :      */
     854        1514 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     855        1514 :     if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId)
     856             :     {
     857           0 :         if (TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
     858           0 :             ShmemVariableCache->oldestCommitTsXid = oldestXact;     /* oldest only ever moves forward */
     859           0 :         if (TransactionIdPrecedes(newestXact, ShmemVariableCache->newestCommitTsXid))
     860           0 :             ShmemVariableCache->newestCommitTsXid = newestXact;     /* NOTE(review): newest is only ever lowered here, which seems at odds with the comment above; confirm intent */
     861             :     }
     862             :     else
     863             :     {
     864             :         Assert(ShmemVariableCache->newestCommitTsXid == InvalidTransactionId);
     865        1514 :         ShmemVariableCache->oldestCommitTsXid = oldestXact;         /* limits were unset: take the caller's values as-is */
     866        1514 :         ShmemVariableCache->newestCommitTsXid = newestXact;
     867             :     }
     868        1514 :     LWLockRelease(CommitTsLock);
     869        1514 : }
     870             : 
     871             : /*
     872             :  * Move forwards the oldest commitTS value that can be consulted.  Does nothing if the limit is unset (InvalidTransactionId) or does not precede oldestXact.
     873             :  */
     874             : void
     875         700 : AdvanceOldestCommitTsXid(TransactionId oldestXact)
     876             : {
     877         700 :     LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
     878         700 :     if (ShmemVariableCache->oldestCommitTsXid != InvalidTransactionId &&
     879           0 :         TransactionIdPrecedes(ShmemVariableCache->oldestCommitTsXid, oldestXact))
     880           0 :         ShmemVariableCache->oldestCommitTsXid = oldestXact;
     881         700 :     LWLockRelease(CommitTsLock);
     882         700 : }
     883             : 
     884             : 
     885             : /*
     886             :  * Decide which of two commitTS page numbers is "older" for truncation
     887             :  * purposes.  Returns true when page1 is considered older than page2.
     888             :  *
     889             :  * We need to use comparison of TransactionIds here in order to do the right
     890             :  * thing with wraparound XID arithmetic.  However, if we are asked about
     891             :  * page number zero, we don't want to hand InvalidTransactionId to
     892             :  * TransactionIdPrecedes: it'll get weird about permanent xact IDs.  So,
     893             :  * offset both xids by FirstNormalTransactionId to avoid that.
     894             :  */
     895             : static bool
     896           0 : CommitTsPagePrecedes(int page1, int page2)
     897             : {
     898             :     TransactionId xid1;
     899             :     TransactionId xid2;
     900             : 
     901           0 :     xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;     /* presumably the first XID stored on page1 */
     902           0 :     xid1 += FirstNormalTransactionId;                               /* shift away from the special XIDs, see above */
     903           0 :     xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;     /* same mapping for page2 */
     904           0 :     xid2 += FirstNormalTransactionId;
     905             : 
     906           0 :     return TransactionIdPrecedes(xid1, xid2);
     907             : }
     908             : 
     909             : 
     910             : /*
     911             :  * Write a ZEROPAGE xlog record for the given page number.
     912             :  */
     913             : static void
     914           0 : WriteZeroPageXlogRec(int pageno)
     915             : {
     916           0 :     XLogBeginInsert();
     917           0 :     XLogRegisterData((char *) (&pageno), sizeof(int));      /* payload is just the raw int; commit_ts_redo memcpy's it back out */
     918           0 :     (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);  /* XLogInsert's return value is not needed */
     919           0 : }
     920             : 
     921             : /*
     922             :  * Write a TRUNCATE xlog record carrying the cutoff page and the oldest XID.
     923             :  */
     924             : static void
     925           0 : WriteTruncateXlogRec(int pageno, TransactionId oldestXid)
     926             : {
     927             :     xl_commit_ts_truncate xlrec;
     928             : 
     929           0 :     xlrec.pageno = pageno;
     930           0 :     xlrec.oldestXid = oldestXid;
     931             : 
     932           0 :     XLogBeginInsert();
     933           0 :     XLogRegisterData((char *) (&xlrec), SizeOfCommitTsTruncate);
     934           0 :     (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);  /* XLogInsert's return value is not needed */
     935           0 : }
     936             : 
     937             : /*
     938             :  * Write a SETTS xlog record: a fixed part (timestamp, nodeid, mainxid) followed by the subxids array.
     939             :  */
     940             : static void
     941           0 : WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
     942             :                          TransactionId *subxids, TimestampTz timestamp,
     943             :                          RepOriginId nodeid)
     944             : {
     945             :     xl_commit_ts_set record;
     946             : 
     947           0 :     record.timestamp = timestamp;
     948           0 :     record.nodeid = nodeid;
     949           0 :     record.mainxid = mainxid;
     950             : 
     951           0 :     XLogBeginInsert();
     952           0 :     XLogRegisterData((char *) &record,
     953             :                      offsetof(xl_commit_ts_set, mainxid) +
     954             :                      sizeof(TransactionId));    /* struct up to and including mainxid; presumably equal to SizeOfCommitTsSet used by commit_ts_redo (confirm) */
     955           0 :     XLogRegisterData((char *) subxids, nsubxids * sizeof(TransactionId));   /* variable-length tail */
     956           0 :     XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS);
     957           0 : }
     958             : 
     959             : /*
     960             :  * CommitTS resource manager's routines: replay one ZEROPAGE, TRUNCATE or SETTS record; any other op code is a PANIC.
     961             :  */
     962             : void
     963           0 : commit_ts_redo(XLogReaderState *record)
     964             : {
     965           0 :     uint8       info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;     /* clear the XLR_INFO_MASK bits, leaving the op code compared below */
     966             : 
     967             :     /* Backup blocks are not used in commit_ts records */
     968             :     Assert(!XLogRecHasAnyBlockRefs(record));
     969             : 
     970           0 :     if (info == COMMIT_TS_ZEROPAGE)
     971             :     {
     972             :         int         pageno;
     973             :         int         slotno;
     974             : 
     975           0 :         memcpy(&pageno, XLogRecGetData(record), sizeof(int));   /* payload is the raw int page number */
     976             : 
     977           0 :         LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
     978             : 
     979           0 :         slotno = ZeroCommitTsPage(pageno, false);   /* false: do not emit another WAL record while replaying */
     980           0 :         SimpleLruWritePage(CommitTsCtl, slotno);
     981             :         Assert(!CommitTsCtl->shared->page_dirty[slotno]);
     982             : 
     983           0 :         LWLockRelease(CommitTsControlLock);
     984             :     }
     985           0 :     else if (info == COMMIT_TS_TRUNCATE)
     986             :     {
     987           0 :         xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
     988             : 
     989           0 :         AdvanceOldestCommitTsXid(trunc->oldestXid);
     990             : 
     991             :         /*
     992             :          * During XLOG replay, latest_page_number isn't set up yet; insert a
     993             :          * suitable value to bypass the sanity test in SimpleLruTruncate.
     994             :          */
     995           0 :         CommitTsCtl->shared->latest_page_number = trunc->pageno;    /* NOTE(review): written without CommitTsControlLock, unlike in ActivateCommitTs */
     996             : 
     997           0 :         SimpleLruTruncate(CommitTsCtl, trunc->pageno);
     998             :     }
     999           0 :     else if (info == COMMIT_TS_SETTS)
    1000             :     {
    1001           0 :         xl_commit_ts_set *setts = (xl_commit_ts_set *) XLogRecGetData(record);
    1002             :         int         nsubxids;
    1003             :         TransactionId *subxids;
    1004             : 
    1005           0 :         nsubxids = ((XLogRecGetDataLen(record) - SizeOfCommitTsSet) /
    1006             :                     sizeof(TransactionId));     /* whatever follows the fixed part is the subxids array */
    1007           0 :         if (nsubxids > 0)
    1008             :         {
    1009           0 :             subxids = palloc(sizeof(TransactionId) * nsubxids);     /* private copy, freed below */
    1010           0 :             memcpy(subxids,
    1011           0 :                    XLogRecGetData(record) + SizeOfCommitTsSet,
    1012             :                    sizeof(TransactionId) * nsubxids);
    1013             :         }
    1014             :         else
    1015           0 :             subxids = NULL;
    1016             : 
    1017           0 :         TransactionTreeSetCommitTsData(setts->mainxid, nsubxids, subxids,
    1018           0 :                                        setts->timestamp, setts->nodeid, true);  /* NOTE(review): meaning of the final `true` is not visible in this chunk; if it requests WAL logging, confirm that is intended during redo */
    1019           0 :         if (subxids)
    1020           0 :             pfree(subxids);
    1021             :     }
    1022             :     else
    1023           0 :         elog(PANIC, "commit_ts_redo: unknown op code %u", info);
    1024           0 : }

Generated by: LCOV version 1.13