LCOV - code coverage report
Current view: top level - src/backend/commands - wait.c (source / functions) Coverage Total Hit
Test: PostgreSQL 20devel Lines: 83.3 % 108 90
Test Date: 2026-07-03 19:57:34 Functions: 100.0 % 2 2
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 60.9 % 115 70

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * wait.c
       4                 :             :  *    Implements WAIT FOR, which allows waiting for events such as
       5                 :             :  *    time passing or LSN having been replayed, flushed, or written.
       6                 :             :  *
       7                 :             :  * Portions Copyright (c) 2025-2026, PostgreSQL Global Development Group
       8                 :             :  *
       9                 :             :  * IDENTIFICATION
      10                 :             :  *    src/backend/commands/wait.c
      11                 :             :  *
      12                 :             :  *-------------------------------------------------------------------------
      13                 :             :  */
      14                 :             : #include "postgres.h"
      15                 :             : 
      16                 :             : #include <math.h>
      17                 :             : 
      18                 :             : #include "access/xlog.h"
      19                 :             : #include "access/xlogrecovery.h"
      20                 :             : #include "access/xlogwait.h"
      21                 :             : #include "catalog/pg_type_d.h"
      22                 :             : #include "commands/defrem.h"
      23                 :             : #include "commands/wait.h"
      24                 :             : #include "executor/executor.h"
      25                 :             : #include "parser/parse_node.h"
      26                 :             : #include "storage/proc.h"
      27                 :             : #include "utils/builtins.h"
      28                 :             : #include "utils/guc.h"
      29                 :             : #include "utils/pg_lsn.h"
      30                 :             : #include "utils/snapmgr.h"
      31                 :             : 
      32                 :             : 
      33                 :             : void
      34                 :         252 : ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, bool isTopLevel,
      35                 :             :              DestReceiver *dest)
      36                 :             : {
      37                 :             :     XLogRecPtr  lsn;
      38                 :         252 :     int64       timeout = 0;
      39                 :             :     WaitLSNResult waitLSNResult;
      40                 :         252 :     WaitLSNType lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY; /* default */
      41                 :         252 :     bool        throw = true;
      42                 :             :     TupleDesc   tupdesc;
      43                 :             :     TupOutputState *tstate;
      44                 :         252 :     const char *result = "<unset>";
      45                 :         252 :     bool        timeout_specified = false;
      46                 :         252 :     bool        no_throw_specified = false;
      47                 :         252 :     bool        mode_specified = false;
      48                 :             : 
      49                 :             :     /*
      50                 :             :      * WAIT FOR must not be run as a non-top-level statement (e.g., inside a
      51                 :             :      * function, procedure, or DO block). Forbid this case upfront.
      52                 :             :      */
      53         [ +  + ]:         252 :     if (!isTopLevel)
      54         [ +  - ]:           3 :         ereport(ERROR,
      55                 :             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
      56                 :             :                  errmsg("%s can only be executed as a top-level statement",
      57                 :             :                         "WAIT FOR"),
      58                 :             :                  errdetail("WAIT FOR cannot be used within a function, procedure, or DO block.")));
      59                 :             : 
      60                 :             :     /* Parse and validate the mandatory LSN */
      61                 :         249 :     lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
      62                 :             :                                           CStringGetDatum(stmt->lsn_literal)));
      63                 :             : 
      64   [ +  +  +  +  :        1140 :     foreach_node(DefElem, defel, stmt->options)
                   +  + ]
      65                 :             :     {
      66         [ +  + ]:         660 :         if (strcmp(defel->defname, "mode") == 0)
      67                 :             :         {
      68                 :             :             char       *mode_str;
      69                 :             : 
      70         [ +  + ]:         227 :             if (mode_specified)
      71                 :           1 :                 errorConflictingDefElem(defel, pstate);
      72                 :         226 :             mode_specified = true;
      73                 :             : 
      74                 :         226 :             mode_str = defGetString(defel);
      75                 :             : 
      76         [ +  + ]:         226 :             if (pg_strcasecmp(mode_str, "standby_replay") == 0)
      77                 :         173 :                 lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY;
      78         [ +  + ]:          53 :             else if (pg_strcasecmp(mode_str, "standby_write") == 0)
      79                 :          27 :                 lsnType = WAIT_LSN_TYPE_STANDBY_WRITE;
      80         [ +  + ]:          26 :             else if (pg_strcasecmp(mode_str, "standby_flush") == 0)
      81                 :          16 :                 lsnType = WAIT_LSN_TYPE_STANDBY_FLUSH;
      82         [ +  + ]:          10 :             else if (pg_strcasecmp(mode_str, "primary_flush") == 0)
      83                 :           9 :                 lsnType = WAIT_LSN_TYPE_PRIMARY_FLUSH;
      84                 :             :             else
      85         [ +  - ]:           1 :                 ereport(ERROR,
      86                 :             :                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
      87                 :             :                          errmsg("unrecognized value for %s option \"%s\": \"%s\"",
      88                 :             :                                 "WAIT", defel->defname, mode_str),
      89                 :             :                          parser_errposition(pstate, defel->location)));
      90                 :             :         }
      91         [ +  + ]:         433 :         else if (strcmp(defel->defname, "timeout") == 0)
      92                 :             :         {
      93                 :             :             char       *timeout_str;
      94                 :             :             const char *hintmsg;
      95                 :             :             double      dval;
      96                 :             : 
      97         [ +  + ]:         230 :             if (timeout_specified)
      98                 :           1 :                 errorConflictingDefElem(defel, pstate);
      99                 :         229 :             timeout_specified = true;
     100                 :             : 
     101                 :         229 :             timeout_str = defGetString(defel);
     102                 :             : 
     103         [ +  + ]:         229 :             if (!parse_real(timeout_str, &dval, GUC_UNIT_MS, &hintmsg))
     104                 :             :             {
     105   [ +  -  -  + ]:           1 :                 ereport(ERROR,
     106                 :             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     107                 :             :                         errmsg("invalid timeout value: \"%s\"", timeout_str),
     108                 :             :                         hintmsg ? errhint("%s", _(hintmsg)) : 0);
     109                 :             :             }
     110                 :             : 
     111                 :             :             /*
     112                 :             :              * Get rid of any fractional part in the input. This is so we
     113                 :             :              * don't fail on just-out-of-range values that would round into
     114                 :             :              * range.
     115                 :             :              */
     116                 :         228 :             dval = rint(dval);
     117                 :             : 
     118                 :             :             /* Range check */
     119   [ +  -  +  -  :         228 :             if (unlikely(isnan(dval) || !FLOAT8_FITS_IN_INT64(dval)))
             -  +  -  + ]
     120         [ #  # ]:           0 :                 ereport(ERROR,
     121                 :             :                         errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
     122                 :             :                         errmsg("timeout value is out of range"));
     123                 :             : 
     124         [ +  + ]:         228 :             if (dval < 0)
     125         [ +  - ]:           1 :                 ereport(ERROR,
     126                 :             :                         errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     127                 :             :                         errmsg("timeout cannot be negative"));
     128                 :             : 
     129                 :         227 :             timeout = (int64) dval;
     130                 :             :         }
     131         [ +  + ]:         203 :         else if (strcmp(defel->defname, "no_throw") == 0)
     132                 :             :         {
     133         [ +  + ]:         201 :             if (no_throw_specified)
     134                 :           1 :                 errorConflictingDefElem(defel, pstate);
     135                 :             : 
     136                 :         200 :             no_throw_specified = true;
     137                 :             : 
     138                 :         200 :             throw = !defGetBoolean(defel);
     139                 :             :         }
     140                 :             :         else
     141                 :             :         {
     142         [ +  - ]:           2 :             ereport(ERROR,
     143                 :             :                     errcode(ERRCODE_SYNTAX_ERROR),
     144                 :             :                     errmsg("option \"%s\" not recognized",
     145                 :             :                            defel->defname),
     146                 :             :                     parser_errposition(pstate, defel->location));
     147                 :             :         }
     148                 :             :     }
     149                 :             : 
     150                 :             :     /*
     151                 :             :      * We are going to wait for the LSN.  We should first care that we don't
     152                 :             :      * hold a snapshot and correspondingly our MyProc->xmin is invalid.
     153                 :             :      * Otherwise, our snapshot could prevent the replay of WAL records
     154                 :             :      * implying a kind of self-deadlock.  This is the reason why WAIT FOR is a
     155                 :             :      * command, not a procedure or function.
     156                 :             :      *
     157                 :             :      * Non-top-level contexts are rejected above, but be defensive and pop any
     158                 :             :      * active snapshot if one is present.  PortalRunUtility() can tolerate
     159                 :             :      * utility commands that remove the active snapshot.
     160                 :             :      */
     161         [ -  + ]:         240 :     if (ActiveSnapshotSet())
     162                 :           0 :         PopActiveSnapshot();
     163                 :             : 
     164                 :             :     /*
     165                 :             :      * At second, invalidate a catalog snapshot if any.  And we should be done
     166                 :             :      * with the preparation.
     167                 :             :      */
     168                 :         240 :     InvalidateCatalogSnapshot();
     169                 :             : 
     170                 :             :     /* Give up if there is still an active or registered snapshot. */
     171         [ +  + ]:         240 :     if (HaveRegisteredOrActiveSnapshot())
     172         [ +  - ]:           1 :         ereport(ERROR,
     173                 :             :                 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     174                 :             :                 errmsg("WAIT FOR must be called without an active or registered snapshot"),
     175                 :             :                 errdetail("WAIT FOR cannot be executed within a transaction with an isolation level higher than READ COMMITTED."));
     176                 :             : 
     177                 :             :     /*
     178                 :             :      * As the result we should hold no snapshot, and correspondingly our xmin
     179                 :             :      * should be unset.
     180                 :             :      */
     181                 :             :     Assert(MyProc->xmin == InvalidTransactionId);
     182                 :             : 
     183                 :             :     /*
     184                 :             :      * Validate that the requested mode matches the current server state.
     185                 :             :      * Primary modes can only be used on a primary.
     186                 :             :      */
     187         [ +  + ]:         239 :     if (lsnType == WAIT_LSN_TYPE_PRIMARY_FLUSH)
     188                 :             :     {
     189         [ +  + ]:           9 :         if (RecoveryInProgress())
     190         [ +  - ]:           1 :             ereport(ERROR,
     191                 :             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     192                 :             :                      errmsg("recovery is in progress"),
     193                 :             :                      errhint("Waiting for primary_flush can only be done on a primary server. "
     194                 :             :                              "Use standby_flush mode on a standby server.")));
     195                 :             :     }
     196                 :             : 
     197                 :             :     /* Now wait for the LSN */
     198                 :         238 :     waitLSNResult = WaitForLSN(lsnType, lsn, timeout);
     199                 :             : 
     200                 :             :     /*
     201                 :             :      * Process the result of WaitForLSN().  Throw appropriate error if needed.
     202                 :             :      */
     203   [ +  +  +  - ]:         237 :     switch (waitLSNResult)
     204                 :             :     {
     205                 :         226 :         case WAIT_LSN_RESULT_SUCCESS:
     206                 :             :             /* Nothing to do on success */
     207                 :         226 :             result = "success";
     208                 :         226 :             break;
     209                 :             : 
     210                 :           6 :         case WAIT_LSN_RESULT_TIMEOUT:
     211         [ +  + ]:           6 :             if (throw)
     212                 :             :             {
     213                 :           1 :                 XLogRecPtr  currentLSN = GetCurrentLSNForWaitType(lsnType);
     214                 :             : 
     215   [ +  -  -  -  :           1 :                 switch (lsnType)
                      - ]
     216                 :             :                 {
     217                 :           1 :                     case WAIT_LSN_TYPE_STANDBY_REPLAY:
     218         [ +  - ]:           1 :                         ereport(ERROR,
     219                 :             :                                 errcode(ERRCODE_QUERY_CANCELED),
     220                 :             :                                 errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current standby_replay LSN %X/%08X",
     221                 :             :                                        LSN_FORMAT_ARGS(lsn),
     222                 :             :                                        LSN_FORMAT_ARGS(currentLSN)));
     223                 :             :                         break;
     224                 :             : 
     225                 :           0 :                     case WAIT_LSN_TYPE_STANDBY_WRITE:
     226         [ #  # ]:           0 :                         ereport(ERROR,
     227                 :             :                                 errcode(ERRCODE_QUERY_CANCELED),
     228                 :             :                                 errmsg("timed out while waiting for target LSN %X/%08X to be written; current standby_write LSN %X/%08X",
     229                 :             :                                        LSN_FORMAT_ARGS(lsn),
     230                 :             :                                        LSN_FORMAT_ARGS(currentLSN)));
     231                 :             :                         break;
     232                 :             : 
     233                 :           0 :                     case WAIT_LSN_TYPE_STANDBY_FLUSH:
     234         [ #  # ]:           0 :                         ereport(ERROR,
     235                 :             :                                 errcode(ERRCODE_QUERY_CANCELED),
     236                 :             :                                 errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current standby_flush LSN %X/%08X",
     237                 :             :                                        LSN_FORMAT_ARGS(lsn),
     238                 :             :                                        LSN_FORMAT_ARGS(currentLSN)));
     239                 :             :                         break;
     240                 :             : 
     241                 :           0 :                     case WAIT_LSN_TYPE_PRIMARY_FLUSH:
     242         [ #  # ]:           0 :                         ereport(ERROR,
     243                 :             :                                 errcode(ERRCODE_QUERY_CANCELED),
     244                 :             :                                 errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current primary_flush LSN %X/%08X",
     245                 :             :                                        LSN_FORMAT_ARGS(lsn),
     246                 :             :                                        LSN_FORMAT_ARGS(currentLSN)));
     247                 :             :                         break;
     248                 :             : 
     249                 :           0 :                     default:
     250         [ #  # ]:           0 :                         elog(ERROR, "unexpected wait LSN type %d", lsnType);
     251                 :             :                 }
     252                 :             :             }
     253                 :             :             else
     254                 :           5 :                 result = "timeout";
     255                 :           5 :             break;
     256                 :             : 
     257                 :           5 :         case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
     258         [ +  + ]:           5 :             if (throw)
     259                 :             :             {
     260         [ +  + ]:           4 :                 if (PromoteIsTriggered())
     261                 :             :                 {
     262                 :           3 :                     XLogRecPtr  currentLSN = GetCurrentLSNForWaitType(lsnType);
     263                 :             : 
     264   [ +  +  +  - ]:           3 :                     switch (lsnType)
     265                 :             :                     {
     266                 :           1 :                         case WAIT_LSN_TYPE_STANDBY_REPLAY:
     267         [ +  - ]:           1 :                             ereport(ERROR,
     268                 :             :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     269                 :             :                                     errmsg("recovery is not in progress"),
     270                 :             :                                     errdetail("Recovery ended before target LSN %X/%08X was replayed; last standby_replay LSN %X/%08X.",
     271                 :             :                                               LSN_FORMAT_ARGS(lsn),
     272                 :             :                                               LSN_FORMAT_ARGS(currentLSN)));
     273                 :             :                             break;
     274                 :             : 
     275                 :           1 :                         case WAIT_LSN_TYPE_STANDBY_WRITE:
     276         [ +  - ]:           1 :                             ereport(ERROR,
     277                 :             :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     278                 :             :                                     errmsg("recovery is not in progress"),
     279                 :             :                                     errdetail("Recovery ended before target LSN %X/%08X was written; last standby_write LSN %X/%08X.",
     280                 :             :                                               LSN_FORMAT_ARGS(lsn),
     281                 :             :                                               LSN_FORMAT_ARGS(currentLSN)));
     282                 :             :                             break;
     283                 :             : 
     284                 :           1 :                         case WAIT_LSN_TYPE_STANDBY_FLUSH:
     285         [ +  - ]:           1 :                             ereport(ERROR,
     286                 :             :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     287                 :             :                                     errmsg("recovery is not in progress"),
     288                 :             :                                     errdetail("Recovery ended before target LSN %X/%08X was flushed; last standby_flush LSN %X/%08X.",
     289                 :             :                                               LSN_FORMAT_ARGS(lsn),
     290                 :             :                                               LSN_FORMAT_ARGS(currentLSN)));
     291                 :             :                             break;
     292                 :             : 
     293                 :           0 :                         default:
     294         [ #  # ]:           0 :                             elog(ERROR, "unexpected wait LSN type %d", lsnType);
     295                 :             :                     }
     296                 :             :                 }
     297                 :             :                 else
     298                 :             :                 {
     299   [ -  -  +  - ]:           1 :                     switch (lsnType)
     300                 :             :                     {
     301                 :           0 :                         case WAIT_LSN_TYPE_STANDBY_REPLAY:
     302         [ #  # ]:           0 :                             ereport(ERROR,
     303                 :             :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     304                 :             :                                     errmsg("recovery is not in progress"),
     305                 :             :                                     errhint("Waiting for the standby_replay LSN can only be executed during recovery."));
     306                 :             :                             break;
     307                 :             : 
     308                 :           0 :                         case WAIT_LSN_TYPE_STANDBY_WRITE:
     309         [ #  # ]:           0 :                             ereport(ERROR,
     310                 :             :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     311                 :             :                                     errmsg("recovery is not in progress"),
     312                 :             :                                     errhint("Waiting for the standby_write LSN can only be executed during recovery."));
     313                 :             :                             break;
     314                 :             : 
     315                 :           1 :                         case WAIT_LSN_TYPE_STANDBY_FLUSH:
     316         [ +  - ]:           1 :                             ereport(ERROR,
     317                 :             :                                     errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     318                 :             :                                     errmsg("recovery is not in progress"),
     319                 :             :                                     errhint("Waiting for the standby_flush LSN can only be executed during recovery."));
     320                 :             :                             break;
     321                 :             : 
     322                 :           0 :                         default:
     323         [ #  # ]:           0 :                             elog(ERROR, "unexpected wait LSN type %d", lsnType);
     324                 :             :                     }
     325                 :             :                 }
     326                 :             :             }
     327                 :             :             else
     328                 :           1 :                 result = "not in recovery";
     329                 :           1 :             break;
     330                 :             :     }
     331                 :             : 
     332                 :             :     /* need a tuple descriptor representing a single TEXT column */
     333                 :         232 :     tupdesc = WaitStmtResultDesc(stmt);
     334                 :             : 
     335                 :             :     /* prepare for projection of tuples */
     336                 :         232 :     tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
     337                 :             : 
     338                 :             :     /* Send it */
     339                 :         232 :     do_text_output_oneline(tstate, result);
     340                 :             : 
     341                 :         232 :     end_tup_output(tstate);
     342                 :         232 : }
     343                 :             : 
     344                 :             : TupleDesc
     345                 :         484 : WaitStmtResultDesc(WaitStmt *stmt)
     346                 :             : {
     347                 :             :     TupleDesc   tupdesc;
     348                 :             : 
     349                 :             :     /*
     350                 :             :      * Need a tuple descriptor representing a single TEXT column.
     351                 :             :      *
     352                 :             :      * We use TupleDescInitBuiltinEntry instead of TupleDescInitEntry to avoid
     353                 :             :      * syscache access. This is important because WaitStmtResultDesc may be
     354                 :             :      * called after snapshots have been released, and we must not re-establish
     355                 :             :      * a catalog snapshot which could cause recovery conflicts on a standby.
     356                 :             :      */
     357                 :         484 :     tupdesc = CreateTemplateTupleDesc(1);
     358                 :         484 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "status",
     359                 :             :                               TEXTOID, -1, 0);
     360                 :         484 :     TupleDescFinalize(tupdesc);
     361                 :         484 :     return tupdesc;
     362                 :             : }
        

Generated by: LCOV version 2.0-1