LCOV - code coverage report
Current view: top level - src/backend/commands - wait.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 84.0 % 106 89
Test Date: 2026-03-22 08:15:57 Functions: 100.0 % 2 2
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * wait.c
       4              :  *    Implements WAIT FOR, which allows waiting for events such as
       5              :  *    time passing or LSN having been replayed, flushed, or written.
       6              :  *
       7              :  * Portions Copyright (c) 2025-2026, PostgreSQL Global Development Group
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *    src/backend/commands/wait.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : #include "postgres.h"
      15              : 
      16              : #include <math.h>
      17              : 
      18              : #include "access/xlog.h"
      19              : #include "access/xlogrecovery.h"
      20              : #include "access/xlogwait.h"
      21              : #include "catalog/pg_type_d.h"
      22              : #include "commands/defrem.h"
      23              : #include "commands/wait.h"
      24              : #include "executor/executor.h"
      25              : #include "parser/parse_node.h"
      26              : #include "storage/proc.h"
      27              : #include "utils/builtins.h"
      28              : #include "utils/guc.h"
      29              : #include "utils/pg_lsn.h"
      30              : #include "utils/snapmgr.h"
      31              : 
      32              : 
      33              : void
      34           55 : ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
      35              : {
      36              :     XLogRecPtr  lsn;
      37           55 :     int64       timeout = 0;
      38              :     WaitLSNResult waitLSNResult;
      39           55 :     WaitLSNType lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY; /* default */
      40           55 :     bool        throw = true;
      41              :     TupleDesc   tupdesc;
      42              :     TupOutputState *tstate;
      43           55 :     const char *result = "<unset>";
      44           55 :     bool        timeout_specified = false;
      45           55 :     bool        no_throw_specified = false;
      46           55 :     bool        mode_specified = false;
      47              : 
      48              :     /* Parse and validate the mandatory LSN */
      49           55 :     lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
      50              :                                           CStringGetDatum(stmt->lsn_literal)));
      51              : 
      52          169 :     foreach_node(DefElem, defel, stmt->options)
      53              :     {
      54           77 :         if (strcmp(defel->defname, "mode") == 0)
      55              :         {
      56              :             char       *mode_str;
      57              : 
      58           32 :             if (mode_specified)
      59            1 :                 errorConflictingDefElem(defel, pstate);
      60           31 :             mode_specified = true;
      61              : 
      62           31 :             mode_str = defGetString(defel);
      63              : 
      64           31 :             if (pg_strcasecmp(mode_str, "standby_replay") == 0)
      65            4 :                 lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY;
      66           27 :             else if (pg_strcasecmp(mode_str, "standby_write") == 0)
      67            9 :                 lsnType = WAIT_LSN_TYPE_STANDBY_WRITE;
      68           18 :             else if (pg_strcasecmp(mode_str, "standby_flush") == 0)
      69           10 :                 lsnType = WAIT_LSN_TYPE_STANDBY_FLUSH;
      70            8 :             else if (pg_strcasecmp(mode_str, "primary_flush") == 0)
      71            7 :                 lsnType = WAIT_LSN_TYPE_PRIMARY_FLUSH;
      72              :             else
      73            1 :                 ereport(ERROR,
      74              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      75              :                          errmsg("unrecognized value for %s option \"%s\": \"%s\"",
      76              :                                 "WAIT", defel->defname, mode_str),
      77              :                          parser_errposition(pstate, defel->location)));
      78              :         }
      79           45 :         else if (strcmp(defel->defname, "timeout") == 0)
      80              :         {
      81              :             char       *timeout_str;
      82              :             const char *hintmsg;
      83              :             double      result;
      84              : 
      85           36 :             if (timeout_specified)
      86            1 :                 errorConflictingDefElem(defel, pstate);
      87           35 :             timeout_specified = true;
      88              : 
      89           35 :             timeout_str = defGetString(defel);
      90              : 
      91           35 :             if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg))
      92              :             {
      93            1 :                 ereport(ERROR,
      94              :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      95              :                         errmsg("invalid timeout value: \"%s\"", timeout_str),
      96              :                         hintmsg ? errhint("%s", _(hintmsg)) : 0);
      97              :             }
      98              : 
      99              :             /*
     100              :              * Get rid of any fractional part in the input. This is so we
     101              :              * don't fail on just-out-of-range values that would round into
     102              :              * range.
     103              :              */
     104           34 :             result = rint(result);
     105              : 
     106              :             /* Range check */
     107           34 :             if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result)))
     108            0 :                 ereport(ERROR,
     109              :                         errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
     110              :                         errmsg("timeout value is out of range"));
     111              : 
     112           34 :             if (result < 0)
     113            1 :                 ereport(ERROR,
     114              :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     115              :                         errmsg("timeout cannot be negative"));
     116              : 
     117           33 :             timeout = (int64) result;
     118              :         }
     119            9 :         else if (strcmp(defel->defname, "no_throw") == 0)
     120              :         {
     121            7 :             if (no_throw_specified)
     122            1 :                 errorConflictingDefElem(defel, pstate);
     123              : 
     124            6 :             no_throw_specified = true;
     125              : 
     126            6 :             throw = !defGetBoolean(defel);
     127              :         }
     128              :         else
     129              :         {
     130            2 :             ereport(ERROR,
     131              :                     errcode(ERRCODE_SYNTAX_ERROR),
     132              :                     errmsg("option \"%s\" not recognized",
     133              :                            defel->defname),
     134              :                     parser_errposition(pstate, defel->location));
     135              :         }
     136              :     }
     137              : 
     138              :     /*
     139              :      * We are going to wait for the LSN.  We should first care that we don't
     140              :      * hold a snapshot and correspondingly our MyProc->xmin is invalid.
     141              :      * Otherwise, our snapshot could prevent the replay of WAL records
     142              :      * implying a kind of self-deadlock.  This is the reason why WAIT FOR is a
     143              :      * command, not a procedure or function.
     144              :      *
     145              :      * At first, we should check there is no active snapshot.  According to
     146              :      * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
     147              :      * processed with a snapshot.  Thankfully, we can pop this snapshot,
     148              :      * because PortalRunUtility() can tolerate this.
     149              :      */
     150           46 :     if (ActiveSnapshotSet())
     151            1 :         PopActiveSnapshot();
     152              : 
     153              :     /*
     154              :      * At second, invalidate a catalog snapshot if any.  And we should be done
     155              :      * with the preparation.
     156              :      */
     157           46 :     InvalidateCatalogSnapshot();
     158              : 
     159              :     /* Give up if there is still an active or registered snapshot. */
     160           46 :     if (HaveRegisteredOrActiveSnapshot())
     161            2 :         ereport(ERROR,
     162              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     163              :                 errmsg("WAIT FOR must be called without an active or registered snapshot"),
     164              :                 errdetail("WAIT FOR cannot be executed from a function or procedure, nor within a transaction with an isolation level higher than READ COMMITTED."));
     165              : 
     166              :     /*
     167              :      * As the result we should hold no snapshot, and correspondingly our xmin
     168              :      * should be unset.
     169              :      */
     170              :     Assert(MyProc->xmin == InvalidTransactionId);
     171              : 
     172              :     /*
     173              :      * Validate that the requested mode matches the current server state.
     174              :      * Primary modes can only be used on a primary.
     175              :      */
     176           44 :     if (lsnType == WAIT_LSN_TYPE_PRIMARY_FLUSH)
     177              :     {
     178            7 :         if (RecoveryInProgress())
     179            1 :             ereport(ERROR,
     180              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     181              :                      errmsg("recovery is in progress"),
     182              :                      errhint("Waiting for primary_flush can only be done on a primary server. "
     183              :                              "Use standby_flush mode on a standby server.")));
     184              :     }
     185              : 
     186              :     /* Now wait for the LSN */
     187           43 :     waitLSNResult = WaitForLSN(lsnType, lsn, timeout);
     188              : 
     189              :     /*
     190              :      * Process the result of WaitForLSN().  Throw appropriate error if needed.
     191              :      */
     192           43 :     switch (waitLSNResult)
     193              :     {
     194           35 :         case WAIT_LSN_RESULT_SUCCESS:
     195              :             /* Nothing to do on success */
     196           35 :             result = "success";
     197           35 :             break;
     198              : 
     199            3 :         case WAIT_LSN_RESULT_TIMEOUT:
     200            3 :             if (throw)
     201              :             {
     202            1 :                 XLogRecPtr  currentLSN = GetCurrentLSNForWaitType(lsnType);
     203              : 
     204            1 :                 switch (lsnType)
     205              :                 {
     206            1 :                     case WAIT_LSN_TYPE_STANDBY_REPLAY:
     207            1 :                         ereport(ERROR,
     208              :                                 errcode(ERRCODE_QUERY_CANCELED),
     209              :                                 errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current standby_replay LSN %X/%08X",
     210              :                                        LSN_FORMAT_ARGS(lsn),
     211              :                                        LSN_FORMAT_ARGS(currentLSN)));
     212              :                         break;
     213              : 
     214            0 :                     case WAIT_LSN_TYPE_STANDBY_WRITE:
     215            0 :                         ereport(ERROR,
     216              :                                 errcode(ERRCODE_QUERY_CANCELED),
     217              :                                 errmsg("timed out while waiting for target LSN %X/%08X to be written; current standby_write LSN %X/%08X",
     218              :                                        LSN_FORMAT_ARGS(lsn),
     219              :                                        LSN_FORMAT_ARGS(currentLSN)));
     220              :                         break;
     221              : 
     222            0 :                     case WAIT_LSN_TYPE_STANDBY_FLUSH:
     223            0 :                         ereport(ERROR,
     224              :                                 errcode(ERRCODE_QUERY_CANCELED),
     225              :                                 errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current standby_flush LSN %X/%08X",
     226              :                                        LSN_FORMAT_ARGS(lsn),
     227              :                                        LSN_FORMAT_ARGS(currentLSN)));
     228              :                         break;
     229              : 
     230            0 :                     case WAIT_LSN_TYPE_PRIMARY_FLUSH:
     231            0 :                         ereport(ERROR,
     232              :                                 errcode(ERRCODE_QUERY_CANCELED),
     233              :                                 errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current primary_flush LSN %X/%08X",
     234              :                                        LSN_FORMAT_ARGS(lsn),
     235              :                                        LSN_FORMAT_ARGS(currentLSN)));
     236              :                         break;
     237              : 
     238            0 :                     default:
     239            0 :                         elog(ERROR, "unexpected wait LSN type %d", lsnType);
     240              :                 }
     241              :             }
     242              :             else
     243            2 :                 result = "timeout";
     244            2 :             break;
     245              : 
     246            5 :         case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
     247            5 :             if (throw)
     248              :             {
     249            4 :                 if (PromoteIsTriggered())
     250              :                 {
     251            3 :                     XLogRecPtr  currentLSN = GetCurrentLSNForWaitType(lsnType);
     252              : 
     253            3 :                     switch (lsnType)
     254              :                     {
     255            1 :                         case WAIT_LSN_TYPE_STANDBY_REPLAY:
     256            1 :                             ereport(ERROR,
     257              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     258              :                                     errmsg("recovery is not in progress"),
     259              :                                     errdetail("Recovery ended before target LSN %X/%08X was replayed; last standby_replay LSN %X/%08X.",
     260              :                                               LSN_FORMAT_ARGS(lsn),
     261              :                                               LSN_FORMAT_ARGS(currentLSN)));
     262              :                             break;
     263              : 
     264            1 :                         case WAIT_LSN_TYPE_STANDBY_WRITE:
     265            1 :                             ereport(ERROR,
     266              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     267              :                                     errmsg("recovery is not in progress"),
     268              :                                     errdetail("Recovery ended before target LSN %X/%08X was written; last standby_write LSN %X/%08X.",
     269              :                                               LSN_FORMAT_ARGS(lsn),
     270              :                                               LSN_FORMAT_ARGS(currentLSN)));
     271              :                             break;
     272              : 
     273            1 :                         case WAIT_LSN_TYPE_STANDBY_FLUSH:
     274            1 :                             ereport(ERROR,
     275              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     276              :                                     errmsg("recovery is not in progress"),
     277              :                                     errdetail("Recovery ended before target LSN %X/%08X was flushed; last standby_flush LSN %X/%08X.",
     278              :                                               LSN_FORMAT_ARGS(lsn),
     279              :                                               LSN_FORMAT_ARGS(currentLSN)));
     280              :                             break;
     281              : 
     282            0 :                         default:
     283            0 :                             elog(ERROR, "unexpected wait LSN type %d", lsnType);
     284              :                     }
     285              :                 }
     286              :                 else
     287              :                 {
     288            1 :                     switch (lsnType)
     289              :                     {
     290            0 :                         case WAIT_LSN_TYPE_STANDBY_REPLAY:
     291            0 :                             ereport(ERROR,
     292              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     293              :                                     errmsg("recovery is not in progress"),
     294              :                                     errhint("Waiting for the standby_replay LSN can only be executed during recovery."));
     295              :                             break;
     296              : 
     297            0 :                         case WAIT_LSN_TYPE_STANDBY_WRITE:
     298            0 :                             ereport(ERROR,
     299              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     300              :                                     errmsg("recovery is not in progress"),
     301              :                                     errhint("Waiting for the standby_write LSN can only be executed during recovery."));
     302              :                             break;
     303              : 
     304            1 :                         case WAIT_LSN_TYPE_STANDBY_FLUSH:
     305            1 :                             ereport(ERROR,
     306              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     307              :                                     errmsg("recovery is not in progress"),
     308              :                                     errhint("Waiting for the standby_flush LSN can only be executed during recovery."));
     309              :                             break;
     310              : 
     311            0 :                         default:
     312            0 :                             elog(ERROR, "unexpected wait LSN type %d", lsnType);
     313              :                     }
     314              :                 }
     315              :             }
     316              :             else
     317            1 :                 result = "not in recovery";
     318            1 :             break;
     319              :     }
     320              : 
     321              :     /* need a tuple descriptor representing a single TEXT column */
     322           38 :     tupdesc = WaitStmtResultDesc(stmt);
     323              : 
     324              :     /* prepare for projection of tuples */
     325           38 :     tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
     326              : 
     327              :     /* Send it */
     328           38 :     do_text_output_oneline(tstate, result);
     329              : 
     330           38 :     end_tup_output(tstate);
     331           38 : }
     332              : 
     333              : TupleDesc
     334           93 : WaitStmtResultDesc(WaitStmt *stmt)
     335              : {
     336              :     TupleDesc   tupdesc;
     337              : 
     338              :     /* Need a tuple descriptor representing a single TEXT  column */
     339           93 :     tupdesc = CreateTemplateTupleDesc(1);
     340           93 :     TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
     341              :                        TEXTOID, -1, 0);
     342           93 :     TupleDescFinalize(tupdesc);
     343           93 :     return tupdesc;
     344              : }
        

Generated by: LCOV version 2.0-1