LCOV - code coverage report
Current view: top level - src/backend/commands - wait.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 83.3 % 108 90
Test Date: 2026-05-02 16:16:32 Functions: 100.0 % 2 2
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * wait.c
       4              :  *    Implements WAIT FOR, which allows waiting for events such as
       5              :  *    time passing or LSN having been replayed, flushed, or written.
       6              :  *
       7              :  * Portions Copyright (c) 2025-2026, PostgreSQL Global Development Group
       8              :  *
       9              :  * IDENTIFICATION
      10              :  *    src/backend/commands/wait.c
      11              :  *
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : #include "postgres.h"
      15              : 
      16              : #include <math.h>
      17              : 
      18              : #include "access/xlog.h"
      19              : #include "access/xlogrecovery.h"
      20              : #include "access/xlogwait.h"
      21              : #include "catalog/pg_type_d.h"
      22              : #include "commands/defrem.h"
      23              : #include "commands/wait.h"
      24              : #include "executor/executor.h"
      25              : #include "parser/parse_node.h"
      26              : #include "storage/proc.h"
      27              : #include "utils/builtins.h"
      28              : #include "utils/guc.h"
      29              : #include "utils/pg_lsn.h"
      30              : #include "utils/snapmgr.h"
      31              : 
      32              : 
      33              : void
      34          228 : ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, bool isTopLevel,
      35              :              DestReceiver *dest)
      36              : {
      37              :     XLogRecPtr  lsn;
      38          228 :     int64       timeout = 0;
      39              :     WaitLSNResult waitLSNResult;
      40          228 :     WaitLSNType lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY; /* default */
      41          228 :     bool        throw = true;
      42              :     TupleDesc   tupdesc;
      43              :     TupOutputState *tstate;
      44          228 :     const char *result = "<unset>";
      45          228 :     bool        timeout_specified = false;
      46          228 :     bool        no_throw_specified = false;
      47          228 :     bool        mode_specified = false;
      48              : 
      49              :     /*
      50              :      * WAIT FOR must not be run as a non-top-level statement (e.g., inside a
      51              :      * function, procedure, or DO block). Forbid this case upfront.
      52              :      */
      53          228 :     if (!isTopLevel)
      54            3 :         ereport(ERROR,
      55              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
      56              :                  errmsg("%s can only be executed as a top-level statement",
      57              :                         "WAIT FOR"),
      58              :                  errdetail("WAIT FOR cannot be used within a function, procedure, or DO block.")));
      59              : 
      60              :     /* Parse and validate the mandatory LSN */
      61          225 :     lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
      62              :                                           CStringGetDatum(stmt->lsn_literal)));
      63              : 
      64         1022 :     foreach_node(DefElem, defel, stmt->options)
      65              :     {
      66          590 :         if (strcmp(defel->defname, "mode") == 0)
      67              :         {
      68              :             char       *mode_str;
      69              : 
      70          203 :             if (mode_specified)
      71            1 :                 errorConflictingDefElem(defel, pstate);
      72          202 :             mode_specified = true;
      73              : 
      74          202 :             mode_str = defGetString(defel);
      75              : 
      76          202 :             if (pg_strcasecmp(mode_str, "standby_replay") == 0)
      77          158 :                 lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY;
      78           44 :             else if (pg_strcasecmp(mode_str, "standby_write") == 0)
      79           26 :                 lsnType = WAIT_LSN_TYPE_STANDBY_WRITE;
      80           18 :             else if (pg_strcasecmp(mode_str, "standby_flush") == 0)
      81           10 :                 lsnType = WAIT_LSN_TYPE_STANDBY_FLUSH;
      82            8 :             else if (pg_strcasecmp(mode_str, "primary_flush") == 0)
      83            7 :                 lsnType = WAIT_LSN_TYPE_PRIMARY_FLUSH;
      84              :             else
      85            1 :                 ereport(ERROR,
      86              :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      87              :                          errmsg("unrecognized value for %s option \"%s\": \"%s\"",
      88              :                                 "WAIT", defel->defname, mode_str),
      89              :                          parser_errposition(pstate, defel->location)));
      90              :         }
      91          387 :         else if (strcmp(defel->defname, "timeout") == 0)
      92              :         {
      93              :             char       *timeout_str;
      94              :             const char *hintmsg;
      95              :             double      dval;
      96              : 
      97          207 :             if (timeout_specified)
      98            1 :                 errorConflictingDefElem(defel, pstate);
      99          206 :             timeout_specified = true;
     100              : 
     101          206 :             timeout_str = defGetString(defel);
     102              : 
     103          206 :             if (!parse_real(timeout_str, &dval, GUC_UNIT_MS, &hintmsg))
     104              :             {
     105            1 :                 ereport(ERROR,
     106              :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     107              :                         errmsg("invalid timeout value: \"%s\"", timeout_str),
     108              :                         hintmsg ? errhint("%s", _(hintmsg)) : 0);
     109              :             }
     110              : 
     111              :             /*
     112              :              * Get rid of any fractional part in the input. This is so we
     113              :              * don't fail on just-out-of-range values that would round into
     114              :              * range.
     115              :              */
     116          205 :             dval = rint(dval);
     117              : 
     118              :             /* Range check */
     119          205 :             if (unlikely(isnan(dval) || !FLOAT8_FITS_IN_INT64(dval)))
     120            0 :                 ereport(ERROR,
     121              :                         errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
     122              :                         errmsg("timeout value is out of range"));
     123              : 
     124          205 :             if (dval < 0)
     125            1 :                 ereport(ERROR,
     126              :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     127              :                         errmsg("timeout cannot be negative"));
     128              : 
     129          204 :             timeout = (int64) dval;
     130              :         }
     131          180 :         else if (strcmp(defel->defname, "no_throw") == 0)
     132              :         {
     133          178 :             if (no_throw_specified)
     134            1 :                 errorConflictingDefElem(defel, pstate);
     135              : 
     136          177 :             no_throw_specified = true;
     137              : 
     138          177 :             throw = !defGetBoolean(defel);
     139              :         }
     140              :         else
     141              :         {
     142            2 :             ereport(ERROR,
     143              :                     errcode(ERRCODE_SYNTAX_ERROR),
     144              :                     errmsg("option \"%s\" not recognized",
     145              :                            defel->defname),
     146              :                     parser_errposition(pstate, defel->location));
     147              :         }
     148              :     }
     149              : 
     150              :     /*
     151              :      * We are going to wait for the LSN.  We should first care that we don't
     152              :      * hold a snapshot and correspondingly our MyProc->xmin is invalid.
     153              :      * Otherwise, our snapshot could prevent the replay of WAL records
     154              :      * implying a kind of self-deadlock.  This is the reason why WAIT FOR is a
     155              :      * command, not a procedure or function.
     156              :      *
     157              :      * Non-top-level contexts are rejected above, but be defensive and pop any
     158              :      * active snapshot if one is present.  PortalRunUtility() can tolerate
     159              :      * utility commands that remove the active snapshot.
     160              :      */
     161          216 :     if (ActiveSnapshotSet())
     162            0 :         PopActiveSnapshot();
     163              : 
     164              :     /*
     165              :      * At second, invalidate a catalog snapshot if any.  And we should be done
     166              :      * with the preparation.
     167              :      */
     168          216 :     InvalidateCatalogSnapshot();
     169              : 
     170              :     /* Give up if there is still an active or registered snapshot. */
     171          216 :     if (HaveRegisteredOrActiveSnapshot())
     172            1 :         ereport(ERROR,
     173              :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     174              :                 errmsg("WAIT FOR must be called without an active or registered snapshot"),
     175              :                 errdetail("WAIT FOR cannot be executed within a transaction with an isolation level higher than READ COMMITTED."));
     176              : 
     177              :     /*
     178              :      * As the result we should hold no snapshot, and correspondingly our xmin
     179              :      * should be unset.
     180              :      */
     181              :     Assert(MyProc->xmin == InvalidTransactionId);
     182              : 
     183              :     /*
     184              :      * Validate that the requested mode matches the current server state.
     185              :      * Primary modes can only be used on a primary.
     186              :      */
     187          215 :     if (lsnType == WAIT_LSN_TYPE_PRIMARY_FLUSH)
     188              :     {
     189            7 :         if (RecoveryInProgress())
     190            1 :             ereport(ERROR,
     191              :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     192              :                      errmsg("recovery is in progress"),
     193              :                      errhint("Waiting for primary_flush can only be done on a primary server. "
     194              :                              "Use standby_flush mode on a standby server.")));
     195              :     }
     196              : 
     197              :     /* Now wait for the LSN */
     198          214 :     waitLSNResult = WaitForLSN(lsnType, lsn, timeout);
     199              : 
     200              :     /*
     201              :      * Process the result of WaitForLSN().  Throw appropriate error if needed.
     202              :      */
     203          214 :     switch (waitLSNResult)
     204              :     {
     205          206 :         case WAIT_LSN_RESULT_SUCCESS:
     206              :             /* Nothing to do on success */
     207          206 :             result = "success";
     208          206 :             break;
     209              : 
     210            3 :         case WAIT_LSN_RESULT_TIMEOUT:
     211            3 :             if (throw)
     212              :             {
     213            1 :                 XLogRecPtr  currentLSN = GetCurrentLSNForWaitType(lsnType);
     214              : 
     215            1 :                 switch (lsnType)
     216              :                 {
     217            1 :                     case WAIT_LSN_TYPE_STANDBY_REPLAY:
     218            1 :                         ereport(ERROR,
     219              :                                 errcode(ERRCODE_QUERY_CANCELED),
     220              :                                 errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current standby_replay LSN %X/%08X",
     221              :                                        LSN_FORMAT_ARGS(lsn),
     222              :                                        LSN_FORMAT_ARGS(currentLSN)));
     223              :                         break;
     224              : 
     225            0 :                     case WAIT_LSN_TYPE_STANDBY_WRITE:
     226            0 :                         ereport(ERROR,
     227              :                                 errcode(ERRCODE_QUERY_CANCELED),
     228              :                                 errmsg("timed out while waiting for target LSN %X/%08X to be written; current standby_write LSN %X/%08X",
     229              :                                        LSN_FORMAT_ARGS(lsn),
     230              :                                        LSN_FORMAT_ARGS(currentLSN)));
     231              :                         break;
     232              : 
     233            0 :                     case WAIT_LSN_TYPE_STANDBY_FLUSH:
     234            0 :                         ereport(ERROR,
     235              :                                 errcode(ERRCODE_QUERY_CANCELED),
     236              :                                 errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current standby_flush LSN %X/%08X",
     237              :                                        LSN_FORMAT_ARGS(lsn),
     238              :                                        LSN_FORMAT_ARGS(currentLSN)));
     239              :                         break;
     240              : 
     241            0 :                     case WAIT_LSN_TYPE_PRIMARY_FLUSH:
     242            0 :                         ereport(ERROR,
     243              :                                 errcode(ERRCODE_QUERY_CANCELED),
     244              :                                 errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current primary_flush LSN %X/%08X",
     245              :                                        LSN_FORMAT_ARGS(lsn),
     246              :                                        LSN_FORMAT_ARGS(currentLSN)));
     247              :                         break;
     248              : 
     249            0 :                     default:
     250            0 :                         elog(ERROR, "unexpected wait LSN type %d", lsnType);
     251              :                 }
     252              :             }
     253              :             else
     254            2 :                 result = "timeout";
     255            2 :             break;
     256              : 
     257            5 :         case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
     258            5 :             if (throw)
     259              :             {
     260            4 :                 if (PromoteIsTriggered())
     261              :                 {
     262            3 :                     XLogRecPtr  currentLSN = GetCurrentLSNForWaitType(lsnType);
     263              : 
     264            3 :                     switch (lsnType)
     265              :                     {
     266            1 :                         case WAIT_LSN_TYPE_STANDBY_REPLAY:
     267            1 :                             ereport(ERROR,
     268              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     269              :                                     errmsg("recovery is not in progress"),
     270              :                                     errdetail("Recovery ended before target LSN %X/%08X was replayed; last standby_replay LSN %X/%08X.",
     271              :                                               LSN_FORMAT_ARGS(lsn),
     272              :                                               LSN_FORMAT_ARGS(currentLSN)));
     273              :                             break;
     274              : 
     275            1 :                         case WAIT_LSN_TYPE_STANDBY_WRITE:
     276            1 :                             ereport(ERROR,
     277              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     278              :                                     errmsg("recovery is not in progress"),
     279              :                                     errdetail("Recovery ended before target LSN %X/%08X was written; last standby_write LSN %X/%08X.",
     280              :                                               LSN_FORMAT_ARGS(lsn),
     281              :                                               LSN_FORMAT_ARGS(currentLSN)));
     282              :                             break;
     283              : 
     284            1 :                         case WAIT_LSN_TYPE_STANDBY_FLUSH:
     285            1 :                             ereport(ERROR,
     286              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     287              :                                     errmsg("recovery is not in progress"),
     288              :                                     errdetail("Recovery ended before target LSN %X/%08X was flushed; last standby_flush LSN %X/%08X.",
     289              :                                               LSN_FORMAT_ARGS(lsn),
     290              :                                               LSN_FORMAT_ARGS(currentLSN)));
     291              :                             break;
     292              : 
     293            0 :                         default:
     294            0 :                             elog(ERROR, "unexpected wait LSN type %d", lsnType);
     295              :                     }
     296              :                 }
     297              :                 else
     298              :                 {
     299            1 :                     switch (lsnType)
     300              :                     {
     301            0 :                         case WAIT_LSN_TYPE_STANDBY_REPLAY:
     302            0 :                             ereport(ERROR,
     303              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     304              :                                     errmsg("recovery is not in progress"),
     305              :                                     errhint("Waiting for the standby_replay LSN can only be executed during recovery."));
     306              :                             break;
     307              : 
     308            0 :                         case WAIT_LSN_TYPE_STANDBY_WRITE:
     309            0 :                             ereport(ERROR,
     310              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     311              :                                     errmsg("recovery is not in progress"),
     312              :                                     errhint("Waiting for the standby_write LSN can only be executed during recovery."));
     313              :                             break;
     314              : 
     315            1 :                         case WAIT_LSN_TYPE_STANDBY_FLUSH:
     316            1 :                             ereport(ERROR,
     317              :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     318              :                                     errmsg("recovery is not in progress"),
     319              :                                     errhint("Waiting for the standby_flush LSN can only be executed during recovery."));
     320              :                             break;
     321              : 
     322            0 :                         default:
     323            0 :                             elog(ERROR, "unexpected wait LSN type %d", lsnType);
     324              :                     }
     325              :                 }
     326              :             }
     327              :             else
     328            1 :                 result = "not in recovery";
     329            1 :             break;
     330              :     }
     331              : 
     332              :     /* need a tuple descriptor representing a single TEXT column */
     333          209 :     tupdesc = WaitStmtResultDesc(stmt);
     334              : 
     335              :     /* prepare for projection of tuples */
     336          209 :     tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
     337              : 
     338              :     /* Send it */
     339          209 :     do_text_output_oneline(tstate, result);
     340              : 
     341          209 :     end_tup_output(tstate);
     342          209 : }
     343              : 
     344              : TupleDesc
     345          437 : WaitStmtResultDesc(WaitStmt *stmt)
     346              : {
     347              :     TupleDesc   tupdesc;
     348              : 
     349              :     /*
     350              :      * Need a tuple descriptor representing a single TEXT column.
     351              :      *
     352              :      * We use TupleDescInitBuiltinEntry instead of TupleDescInitEntry to avoid
     353              :      * syscache access. This is important because WaitStmtResultDesc may be
     354              :      * called after snapshots have been released, and we must not re-establish
     355              :      * a catalog snapshot which could cause recovery conflicts on a standby.
     356              :      */
     357          437 :     tupdesc = CreateTemplateTupleDesc(1);
     358          437 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "status",
     359              :                               TEXTOID, -1, 0);
     360          437 :     TupleDescFinalize(tupdesc);
     361          437 :     return tupdesc;
     362              : }
        

Generated by: LCOV version 2.0-1