LCOV - code coverage report
Current view: top level - src/backend/commands - wait.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 83.8 % 105 88
Test Date: 2026-03-01 23:14:58 Functions: 100.0 % 2 2
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * wait.c
       4              :  *    Implements WAIT FOR, which allows waiting for events such as
       5              :  *    time passing or LSN having been replayed, flushed, or written.
       6              :  *
       7              :  * Portions Copyright (c) 2025-2026, PostgreSQL Global Development Group
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *    src/backend/commands/wait.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : #include "postgres.h"
      15              : 
      16              : #include <math.h>
      17              : 
      18              : #include "access/xlog.h"
      19              : #include "access/xlogrecovery.h"
      20              : #include "access/xlogwait.h"
      21              : #include "commands/defrem.h"
      22              : #include "commands/wait.h"
      23              : #include "executor/executor.h"
      24              : #include "parser/parse_node.h"
      25              : #include "storage/proc.h"
      26              : #include "utils/builtins.h"
      27              : #include "utils/guc.h"
      28              : #include "utils/pg_lsn.h"
      29              : #include "utils/snapmgr.h"
      30              : 
      31              : 
      32              : void
      33           55 : ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
      34              : {
      35              :     XLogRecPtr  lsn;
      36           55 :     int64       timeout = 0;
      37              :     WaitLSNResult waitLSNResult;
      38           55 :     WaitLSNType lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY; /* default */
      39           55 :     bool        throw = true;
      40              :     TupleDesc   tupdesc;
      41              :     TupOutputState *tstate;
      42           55 :     const char *result = "<unset>";
      43           55 :     bool        timeout_specified = false;
      44           55 :     bool        no_throw_specified = false;
      45           55 :     bool        mode_specified = false;
      46              : 
      47              :     /* Parse and validate the mandatory LSN */
      48           55 :     lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
      49              :                                           CStringGetDatum(stmt->lsn_literal)));
      50              : 
      51          169 :     foreach_node(DefElem, defel, stmt->options)
      52              :     {
      53           77 :         if (strcmp(defel->defname, "mode") == 0)
      54              :         {
      55              :             char       *mode_str;
      56              : 
      57           32 :             if (mode_specified)
      58            1 :                 errorConflictingDefElem(defel, pstate);
      59           31 :             mode_specified = true;
      60              : 
      61           31 :             mode_str = defGetString(defel);
      62              : 
      63           31 :             if (pg_strcasecmp(mode_str, "standby_replay") == 0)
      64            4 :                 lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY;
      65           27 :             else if (pg_strcasecmp(mode_str, "standby_write") == 0)
      66            9 :                 lsnType = WAIT_LSN_TYPE_STANDBY_WRITE;
      67           18 :             else if (pg_strcasecmp(mode_str, "standby_flush") == 0)
      68           10 :                 lsnType = WAIT_LSN_TYPE_STANDBY_FLUSH;
      69            8 :             else if (pg_strcasecmp(mode_str, "primary_flush") == 0)
      70            7 :                 lsnType = WAIT_LSN_TYPE_PRIMARY_FLUSH;
      71              :             else
      72            1 :                 ereport(ERROR,
      73              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      74              :                          errmsg("unrecognized value for %s option \"%s\": \"%s\"",
      75              :                                 "WAIT", defel->defname, mode_str),
      76              :                          parser_errposition(pstate, defel->location)));
      77              :         }
      78           45 :         else if (strcmp(defel->defname, "timeout") == 0)
      79              :         {
      80              :             char       *timeout_str;
      81              :             const char *hintmsg;
      82              :             double      result;
      83              : 
      84           36 :             if (timeout_specified)
      85            1 :                 errorConflictingDefElem(defel, pstate);
      86           35 :             timeout_specified = true;
      87              : 
      88           35 :             timeout_str = defGetString(defel);
      89              : 
      90           35 :             if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg))
      91              :             {
      92            1 :                 ereport(ERROR,
      93              :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      94              :                         errmsg("invalid timeout value: \"%s\"", timeout_str),
      95              :                         hintmsg ? errhint("%s", _(hintmsg)) : 0);
      96              :             }
      97              : 
      98              :             /*
      99              :              * Get rid of any fractional part in the input. This is so we
     100              :              * don't fail on just-out-of-range values that would round into
     101              :              * range.
     102              :              */
     103           34 :             result = rint(result);
     104              : 
     105              :             /* Range check */
     106           34 :             if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result)))
     107            0 :                 ereport(ERROR,
     108              :                         errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
     109              :                         errmsg("timeout value is out of range"));
     110              : 
     111           34 :             if (result < 0)
     112            1 :                 ereport(ERROR,
     113              :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     114              :                         errmsg("timeout cannot be negative"));
     115              : 
     116           33 :             timeout = (int64) result;
     117              :         }
     118            9 :         else if (strcmp(defel->defname, "no_throw") == 0)
     119              :         {
     120            7 :             if (no_throw_specified)
     121            1 :                 errorConflictingDefElem(defel, pstate);
     122              : 
     123            6 :             no_throw_specified = true;
     124              : 
     125            6 :             throw = !defGetBoolean(defel);
     126              :         }
     127              :         else
     128              :         {
     129            2 :             ereport(ERROR,
     130              :                     errcode(ERRCODE_SYNTAX_ERROR),
     131              :                     errmsg("option \"%s\" not recognized",
     132              :                            defel->defname),
     133              :                     parser_errposition(pstate, defel->location));
     134              :         }
     135              :     }
     136              : 
     137              :     /*
     138              :      * We are going to wait for the LSN.  We must first ensure that we don't
     139              :      * hold a snapshot and, correspondingly, that our MyProc->xmin is invalid.
     140              :      * Otherwise, our snapshot could prevent the replay of WAL records,
     141              :      * resulting in a kind of self-deadlock.  This is the reason why WAIT FOR
     142              :      * is a command, not a procedure or function.
     143              :      *
     144              :      * First, we should check that there is no active snapshot.  According to
     145              :      * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
     146              :      * processed with a snapshot.  Thankfully, we can pop this snapshot,
     147              :      * because PortalRunUtility() can tolerate this.
     148              :      */
     149           46 :     if (ActiveSnapshotSet())
     150            1 :         PopActiveSnapshot();
     151              : 
     152              :     /*
     153              :      * Second, invalidate the catalog snapshot, if any.  After that, the
     154              :      * preparation is complete.
     155              :      */
     156           46 :     InvalidateCatalogSnapshot();
     157              : 
     158              :     /* Give up if there is still an active or registered snapshot. */
     159           46 :     if (HaveRegisteredOrActiveSnapshot())
     160            2 :         ereport(ERROR,
     161              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     162              :                 errmsg("WAIT FOR must be called without an active or registered snapshot"),
     163              :                 errdetail("WAIT FOR cannot be executed from a function or procedure, nor within a transaction with an isolation level higher than READ COMMITTED."));
     164              : 
     165              :     /*
     166              :      * As a result, we should hold no snapshot, and correspondingly our xmin
     167              :      * should be unset.
     168              :      */
     169              :     Assert(MyProc->xmin == InvalidTransactionId);
     170              : 
     171              :     /*
     172              :      * Validate that the requested mode matches the current server state.
     173              :      * Primary modes can only be used on a primary.
     174              :      */
     175           44 :     if (lsnType == WAIT_LSN_TYPE_PRIMARY_FLUSH)
     176              :     {
     177            7 :         if (RecoveryInProgress())
     178            1 :             ereport(ERROR,
     179              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     180              :                      errmsg("recovery is in progress"),
     181              :                      errhint("Waiting for primary_flush can only be done on a primary server. "
     182              :                              "Use standby_flush mode on a standby server.")));
     183              :     }
     184              : 
     185              :     /* Now wait for the LSN */
     186           43 :     waitLSNResult = WaitForLSN(lsnType, lsn, timeout);
     187              : 
     188              :     /*
     189              :      * Process the result of WaitForLSN().  Throw appropriate error if needed.
     190              :      */
     191           43 :     switch (waitLSNResult)
     192              :     {
     193           35 :         case WAIT_LSN_RESULT_SUCCESS:
     194              :             /* Nothing to do on success */
     195           35 :             result = "success";
     196           35 :             break;
     197              : 
     198            3 :         case WAIT_LSN_RESULT_TIMEOUT:
     199            3 :             if (throw)
     200              :             {
     201            1 :                 XLogRecPtr  currentLSN = GetCurrentLSNForWaitType(lsnType);
     202              : 
     203            1 :                 switch (lsnType)
     204              :                 {
     205            1 :                     case WAIT_LSN_TYPE_STANDBY_REPLAY:
     206            1 :                         ereport(ERROR,
     207              :                                 errcode(ERRCODE_QUERY_CANCELED),
     208              :                                 errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current standby_replay LSN %X/%08X",
     209              :                                        LSN_FORMAT_ARGS(lsn),
     210              :                                        LSN_FORMAT_ARGS(currentLSN)));
     211              :                         break;
     212              : 
     213            0 :                     case WAIT_LSN_TYPE_STANDBY_WRITE:
     214            0 :                         ereport(ERROR,
     215              :                                 errcode(ERRCODE_QUERY_CANCELED),
     216              :                                 errmsg("timed out while waiting for target LSN %X/%08X to be written; current standby_write LSN %X/%08X",
     217              :                                        LSN_FORMAT_ARGS(lsn),
     218              :                                        LSN_FORMAT_ARGS(currentLSN)));
     219              :                         break;
     220              : 
     221            0 :                     case WAIT_LSN_TYPE_STANDBY_FLUSH:
     222            0 :                         ereport(ERROR,
     223              :                                 errcode(ERRCODE_QUERY_CANCELED),
     224              :                                 errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current standby_flush LSN %X/%08X",
     225              :                                        LSN_FORMAT_ARGS(lsn),
     226              :                                        LSN_FORMAT_ARGS(currentLSN)));
     227              :                         break;
     228              : 
     229            0 :                     case WAIT_LSN_TYPE_PRIMARY_FLUSH:
     230            0 :                         ereport(ERROR,
     231              :                                 errcode(ERRCODE_QUERY_CANCELED),
     232              :                                 errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current primary_flush LSN %X/%08X",
     233              :                                        LSN_FORMAT_ARGS(lsn),
     234              :                                        LSN_FORMAT_ARGS(currentLSN)));
     235              :                         break;
     236              : 
     237            0 :                     default:
     238            0 :                         elog(ERROR, "unexpected wait LSN type %d", lsnType);
     239              :                 }
     240              :             }
     241              :             else
     242            2 :                 result = "timeout";
     243            2 :             break;
     244              : 
     245            5 :         case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
     246            5 :             if (throw)
     247              :             {
     248            4 :                 if (PromoteIsTriggered())
     249              :                 {
     250            3 :                     XLogRecPtr  currentLSN = GetCurrentLSNForWaitType(lsnType);
     251              : 
     252            3 :                     switch (lsnType)
     253              :                     {
     254            1 :                         case WAIT_LSN_TYPE_STANDBY_REPLAY:
     255            1 :                             ereport(ERROR,
     256              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     257              :                                     errmsg("recovery is not in progress"),
     258              :                                     errdetail("Recovery ended before target LSN %X/%08X was replayed; last standby_replay LSN %X/%08X.",
     259              :                                               LSN_FORMAT_ARGS(lsn),
     260              :                                               LSN_FORMAT_ARGS(currentLSN)));
     261              :                             break;
     262              : 
     263            1 :                         case WAIT_LSN_TYPE_STANDBY_WRITE:
     264            1 :                             ereport(ERROR,
     265              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     266              :                                     errmsg("recovery is not in progress"),
     267              :                                     errdetail("Recovery ended before target LSN %X/%08X was written; last standby_write LSN %X/%08X.",
     268              :                                               LSN_FORMAT_ARGS(lsn),
     269              :                                               LSN_FORMAT_ARGS(currentLSN)));
     270              :                             break;
     271              : 
     272            1 :                         case WAIT_LSN_TYPE_STANDBY_FLUSH:
     273            1 :                             ereport(ERROR,
     274              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     275              :                                     errmsg("recovery is not in progress"),
     276              :                                     errdetail("Recovery ended before target LSN %X/%08X was flushed; last standby_flush LSN %X/%08X.",
     277              :                                               LSN_FORMAT_ARGS(lsn),
     278              :                                               LSN_FORMAT_ARGS(currentLSN)));
     279              :                             break;
     280              : 
     281            0 :                         default:
     282            0 :                             elog(ERROR, "unexpected wait LSN type %d", lsnType);
     283              :                     }
     284              :                 }
     285              :                 else
     286              :                 {
     287            1 :                     switch (lsnType)
     288              :                     {
     289            0 :                         case WAIT_LSN_TYPE_STANDBY_REPLAY:
     290            0 :                             ereport(ERROR,
     291              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     292              :                                     errmsg("recovery is not in progress"),
     293              :                                     errhint("Waiting for the standby_replay LSN can only be executed during recovery."));
     294              :                             break;
     295              : 
     296            0 :                         case WAIT_LSN_TYPE_STANDBY_WRITE:
     297            0 :                             ereport(ERROR,
     298              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     299              :                                     errmsg("recovery is not in progress"),
     300              :                                     errhint("Waiting for the standby_write LSN can only be executed during recovery."));
     301              :                             break;
     302              : 
     303            1 :                         case WAIT_LSN_TYPE_STANDBY_FLUSH:
     304            1 :                             ereport(ERROR,
     305              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     306              :                                     errmsg("recovery is not in progress"),
     307              :                                     errhint("Waiting for the standby_flush LSN can only be executed during recovery."));
     308              :                             break;
     309              : 
     310            0 :                         default:
     311            0 :                             elog(ERROR, "unexpected wait LSN type %d", lsnType);
     312              :                     }
     313              :                 }
     314              :             }
     315              :             else
     316            1 :                 result = "not in recovery";
     317            1 :             break;
     318              :     }
     319              : 
     320              :     /* need a tuple descriptor representing a single TEXT column */
     321           38 :     tupdesc = WaitStmtResultDesc(stmt);
     322              : 
     323              :     /* prepare for projection of tuples */
     324           38 :     tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
     325              : 
     326              :     /* Send it */
     327           38 :     do_text_output_oneline(tstate, result);
     328              : 
     329           38 :     end_tup_output(tstate);
     330           38 : }
     331              : 
     332              : TupleDesc
     333           93 : WaitStmtResultDesc(WaitStmt *stmt)
     334              : {
     335              :     TupleDesc   tupdesc;
     336              : 
     337              :     /* Need a tuple descriptor representing a single TEXT column */
     338           93 :     tupdesc = CreateTemplateTupleDesc(1);
     339           93 :     TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
     340              :                        TEXTOID, -1, 0);
     341           93 :     return tupdesc;
     342              : }
        

Generated by: LCOV version 2.0-1