LCOV - code coverage report
Current view: top level - src/backend/commands - wait.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 85 102 83.3 %
Date: 2026-02-07 13:18:11 Functions: 2 2 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * wait.c
       4             :  *    Implements WAIT FOR, which allows waiting for events such as
       5             :  *    time passing or LSN having been replayed, flushed, or written.
       6             :  *
       7             :  * Portions Copyright (c) 2025-2026, PostgreSQL Global Development Group
       8             :  *
       9             :  * IDENTIFICATION
      10             :  *    src/backend/commands/wait.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : #include "postgres.h"
      15             : 
      16             : #include <math.h>
      17             : 
      18             : #include "access/xlog.h"
      19             : #include "access/xlogrecovery.h"
      20             : #include "access/xlogwait.h"
      21             : #include "commands/defrem.h"
      22             : #include "commands/wait.h"
      23             : #include "executor/executor.h"
      24             : #include "parser/parse_node.h"
      25             : #include "storage/proc.h"
      26             : #include "utils/builtins.h"
      27             : #include "utils/guc.h"
      28             : #include "utils/pg_lsn.h"
      29             : #include "utils/snapmgr.h"
      30             : 
      31             : 
      32             : void
      33         110 : ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
      34             : {
      35             :     XLogRecPtr  lsn;
      36         110 :     int64       timeout = 0;
      37             :     WaitLSNResult waitLSNResult;
      38         110 :     WaitLSNType lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY; /* default */
      39         110 :     bool        throw = true;
      40             :     TupleDesc   tupdesc;
      41             :     TupOutputState *tstate;
      42         110 :     const char *result = "<unset>";
      43         110 :     bool        timeout_specified = false;
      44         110 :     bool        no_throw_specified = false;
      45         110 :     bool        mode_specified = false;
      46             : 
      47             :     /* Parse and validate the mandatory LSN */
      48         110 :     lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
      49             :                                           CStringGetDatum(stmt->lsn_literal)));
      50             : 
      51         338 :     foreach_node(DefElem, defel, stmt->options)
      52             :     {
      53         154 :         if (strcmp(defel->defname, "mode") == 0)
      54             :         {
      55             :             char       *mode_str;
      56             : 
      57          64 :             if (mode_specified)
      58           2 :                 errorConflictingDefElem(defel, pstate);
      59          62 :             mode_specified = true;
      60             : 
      61          62 :             mode_str = defGetString(defel);
      62             : 
      63          62 :             if (pg_strcasecmp(mode_str, "standby_replay") == 0)
      64           8 :                 lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY;
      65          54 :             else if (pg_strcasecmp(mode_str, "standby_write") == 0)
      66          18 :                 lsnType = WAIT_LSN_TYPE_STANDBY_WRITE;
      67          36 :             else if (pg_strcasecmp(mode_str, "standby_flush") == 0)
      68          20 :                 lsnType = WAIT_LSN_TYPE_STANDBY_FLUSH;
      69          16 :             else if (pg_strcasecmp(mode_str, "primary_flush") == 0)
      70          14 :                 lsnType = WAIT_LSN_TYPE_PRIMARY_FLUSH;
      71             :             else
      72           2 :                 ereport(ERROR,
      73             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      74             :                          errmsg("unrecognized value for %s option \"%s\": \"%s\"",
      75             :                                 "WAIT", defel->defname, mode_str),
      76             :                          parser_errposition(pstate, defel->location)));
      77             :         }
      78          90 :         else if (strcmp(defel->defname, "timeout") == 0)
      79             :         {
      80             :             char       *timeout_str;
      81             :             const char *hintmsg;
      82             :             double      result;
      83             : 
      84          72 :             if (timeout_specified)
      85           2 :                 errorConflictingDefElem(defel, pstate);
      86          70 :             timeout_specified = true;
      87             : 
      88          70 :             timeout_str = defGetString(defel);
      89             : 
      90          70 :             if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg))
      91             :             {
      92           2 :                 ereport(ERROR,
      93             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      94             :                         errmsg("invalid timeout value: \"%s\"", timeout_str),
      95             :                         hintmsg ? errhint("%s", _(hintmsg)) : 0);
      96             :             }
      97             : 
      98             :             /*
      99             :              * Get rid of any fractional part in the input. This is so we
     100             :              * don't fail on just-out-of-range values that would round into
     101             :              * range.
     102             :              */
     103          68 :             result = rint(result);
     104             : 
     105             :             /* Range check */
     106          68 :             if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result)))
     107           0 :                 ereport(ERROR,
     108             :                         errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
     109             :                         errmsg("timeout value is out of range"));
     110             : 
     111          68 :             if (result < 0)
     112           2 :                 ereport(ERROR,
     113             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     114             :                         errmsg("timeout cannot be negative"));
     115             : 
     116          66 :             timeout = (int64) result;
     117             :         }
     118          18 :         else if (strcmp(defel->defname, "no_throw") == 0)
     119             :         {
     120          14 :             if (no_throw_specified)
     121           2 :                 errorConflictingDefElem(defel, pstate);
     122             : 
     123          12 :             no_throw_specified = true;
     124             : 
     125          12 :             throw = !defGetBoolean(defel);
     126             :         }
     127             :         else
     128             :         {
     129           4 :             ereport(ERROR,
     130             :                     errcode(ERRCODE_SYNTAX_ERROR),
     131             :                     errmsg("option \"%s\" not recognized",
     132             :                            defel->defname),
     133             :                     parser_errposition(pstate, defel->location));
     134             :         }
     135             :     }
     136             : 
     137             :     /*
     138             :      * We are going to wait for the LSN.  We should first care that we don't
     139             :      * hold a snapshot and correspondingly our MyProc->xmin is invalid.
     140             :      * Otherwise, our snapshot could prevent the replay of WAL records
     141             :      * implying a kind of self-deadlock.  This is the reason why WAIT FOR is a
     142             :      * command, not a procedure or function.
     143             :      *
     144             :      * At first, we should check there is no active snapshot.  According to
     145             :      * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
     146             :      * processed with a snapshot.  Thankfully, we can pop this snapshot,
     147             :      * because PortalRunUtility() can tolerate this.
     148             :      */
     149          92 :     if (ActiveSnapshotSet())
     150           2 :         PopActiveSnapshot();
     151             : 
     152             :     /*
     153             :      * At second, invalidate a catalog snapshot if any.  And we should be done
     154             :      * with the preparation.
     155             :      */
     156          92 :     InvalidateCatalogSnapshot();
     157             : 
     158             :     /* Give up if there is still an active or registered snapshot. */
     159          92 :     if (HaveRegisteredOrActiveSnapshot())
     160           4 :         ereport(ERROR,
     161             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     162             :                 errmsg("WAIT FOR must be called without an active or registered snapshot"),
     163             :                 errdetail("WAIT FOR cannot be executed from a function or procedure, nor within a transaction with an isolation level higher than READ COMMITTED."));
     164             : 
     165             :     /*
     166             :      * As the result we should hold no snapshot, and correspondingly our xmin
     167             :      * should be unset.
     168             :      */
     169             :     Assert(MyProc->xmin == InvalidTransactionId);
     170             : 
     171             :     /*
     172             :      * Validate that the requested mode matches the current server state.
     173             :      * Primary modes can only be used on a primary.
     174             :      */
     175          88 :     if (lsnType == WAIT_LSN_TYPE_PRIMARY_FLUSH)
     176             :     {
     177          14 :         if (RecoveryInProgress())
     178           2 :             ereport(ERROR,
     179             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     180             :                      errmsg("recovery is in progress"),
     181             :                      errhint("Waiting for primary_flush can only be done on a primary server. "
     182             :                              "Use standby_flush mode on a standby server.")));
     183             :     }
     184             : 
     185             :     /* Now wait for the LSN */
     186          86 :     waitLSNResult = WaitForLSN(lsnType, lsn, timeout);
     187             : 
     188             :     /*
     189             :      * Process the result of WaitForLSN().  Throw appropriate error if needed.
     190             :      */
     191          86 :     switch (waitLSNResult)
     192             :     {
     193          70 :         case WAIT_LSN_RESULT_SUCCESS:
     194             :             /* Nothing to do on success */
     195          70 :             result = "success";
     196          70 :             break;
     197             : 
     198           6 :         case WAIT_LSN_RESULT_TIMEOUT:
     199           6 :             if (throw)
     200             :             {
     201           2 :                 XLogRecPtr  currentLSN = GetCurrentLSNForWaitType(lsnType);
     202             : 
     203             :                 switch (lsnType)
     204             :                 {
     205           2 :                     case WAIT_LSN_TYPE_STANDBY_REPLAY:
     206           2 :                         ereport(ERROR,
     207             :                                 errcode(ERRCODE_QUERY_CANCELED),
     208             :                                 errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current standby_replay LSN %X/%08X",
     209             :                                        LSN_FORMAT_ARGS(lsn),
     210             :                                        LSN_FORMAT_ARGS(currentLSN)));
     211             :                         break;
     212             : 
     213           0 :                     case WAIT_LSN_TYPE_STANDBY_WRITE:
     214           0 :                         ereport(ERROR,
     215             :                                 errcode(ERRCODE_QUERY_CANCELED),
     216             :                                 errmsg("timed out while waiting for target LSN %X/%08X to be written; current standby_write LSN %X/%08X",
     217             :                                        LSN_FORMAT_ARGS(lsn),
     218             :                                        LSN_FORMAT_ARGS(currentLSN)));
     219             :                         break;
     220             : 
     221           0 :                     case WAIT_LSN_TYPE_STANDBY_FLUSH:
     222           0 :                         ereport(ERROR,
     223             :                                 errcode(ERRCODE_QUERY_CANCELED),
     224             :                                 errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current standby_flush LSN %X/%08X",
     225             :                                        LSN_FORMAT_ARGS(lsn),
     226             :                                        LSN_FORMAT_ARGS(currentLSN)));
     227             :                         break;
     228             : 
     229           0 :                     case WAIT_LSN_TYPE_PRIMARY_FLUSH:
     230           0 :                         ereport(ERROR,
     231             :                                 errcode(ERRCODE_QUERY_CANCELED),
     232             :                                 errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current primary_flush LSN %X/%08X",
     233             :                                        LSN_FORMAT_ARGS(lsn),
     234             :                                        LSN_FORMAT_ARGS(currentLSN)));
     235             :                         break;
     236             : 
     237           0 :                     default:
     238           0 :                         elog(ERROR, "unexpected wait LSN type %d", lsnType);
     239             :                 }
     240             :             }
     241             :             else
     242           4 :                 result = "timeout";
     243           4 :             break;
     244             : 
     245          10 :         case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
     246          10 :             if (throw)
     247             :             {
     248           8 :                 if (PromoteIsTriggered())
     249             :                 {
     250           6 :                     XLogRecPtr  currentLSN = GetCurrentLSNForWaitType(lsnType);
     251             : 
     252             :                     switch (lsnType)
     253             :                     {
     254           2 :                         case WAIT_LSN_TYPE_STANDBY_REPLAY:
     255           2 :                             ereport(ERROR,
     256             :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     257             :                                     errmsg("recovery is not in progress"),
     258             :                                     errdetail("Recovery ended before target LSN %X/%08X was replayed; last standby_replay LSN %X/%08X.",
     259             :                                               LSN_FORMAT_ARGS(lsn),
     260             :                                               LSN_FORMAT_ARGS(currentLSN)));
     261             :                             break;
     262             : 
     263           2 :                         case WAIT_LSN_TYPE_STANDBY_WRITE:
     264           2 :                             ereport(ERROR,
     265             :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     266             :                                     errmsg("recovery is not in progress"),
     267             :                                     errdetail("Recovery ended before target LSN %X/%08X was written; last standby_write LSN %X/%08X.",
     268             :                                               LSN_FORMAT_ARGS(lsn),
     269             :                                               LSN_FORMAT_ARGS(currentLSN)));
     270             :                             break;
     271             : 
     272           2 :                         case WAIT_LSN_TYPE_STANDBY_FLUSH:
     273           2 :                             ereport(ERROR,
     274             :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     275             :                                     errmsg("recovery is not in progress"),
     276             :                                     errdetail("Recovery ended before target LSN %X/%08X was flushed; last standby_flush LSN %X/%08X.",
     277             :                                               LSN_FORMAT_ARGS(lsn),
     278             :                                               LSN_FORMAT_ARGS(currentLSN)));
     279             :                             break;
     280             : 
     281           0 :                         default:
     282           0 :                             elog(ERROR, "unexpected wait LSN type %d", lsnType);
     283             :                     }
     284             :                 }
     285             :                 else
     286             :                 {
     287             :                     switch (lsnType)
     288             :                     {
     289           0 :                         case WAIT_LSN_TYPE_STANDBY_REPLAY:
     290           0 :                             ereport(ERROR,
     291             :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     292             :                                     errmsg("recovery is not in progress"),
     293             :                                     errhint("Waiting for the standby_replay LSN can only be executed during recovery."));
     294             :                             break;
     295             : 
     296           0 :                         case WAIT_LSN_TYPE_STANDBY_WRITE:
     297           0 :                             ereport(ERROR,
     298             :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     299             :                                     errmsg("recovery is not in progress"),
     300             :                                     errhint("Waiting for the standby_write LSN can only be executed during recovery."));
     301             :                             break;
     302             : 
     303           2 :                         case WAIT_LSN_TYPE_STANDBY_FLUSH:
     304           2 :                             ereport(ERROR,
     305             :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     306             :                                     errmsg("recovery is not in progress"),
     307             :                                     errhint("Waiting for the standby_flush LSN can only be executed during recovery."));
     308             :                             break;
     309             : 
     310           0 :                         default:
     311           0 :                             elog(ERROR, "unexpected wait LSN type %d", lsnType);
     312             :                     }
     313             :                 }
     314             :             }
     315             :             else
     316           2 :                 result = "not in recovery";
     317           2 :             break;
     318             :     }
     319             : 
     320             :     /* need a tuple descriptor representing a single TEXT column */
     321          76 :     tupdesc = WaitStmtResultDesc(stmt);
     322             : 
     323             :     /* prepare for projection of tuples */
     324          76 :     tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
     325             : 
     326             :     /* Send it */
     327          76 :     do_text_output_oneline(tstate, result);
     328             : 
     329          76 :     end_tup_output(tstate);
     330          76 : }
     331             : 
     332             : TupleDesc
     333         186 : WaitStmtResultDesc(WaitStmt *stmt)
     334             : {
     335             :     TupleDesc   tupdesc;
     336             : 
     337             :     /* Need a tuple descriptor representing a single TEXT  column */
     338         186 :     tupdesc = CreateTemplateTupleDesc(1);
     339         186 :     TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
     340             :                        TEXTOID, -1, 0);
     341         186 :     return tupdesc;
     342             : }

Generated by: LCOV version 1.16