Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * wait.c
4 : * Implements WAIT FOR, which allows waiting for events such as
5 : * time passing or LSN having been replayed, flushed, or written.
6 : *
7 : * Portions Copyright (c) 2025-2026, PostgreSQL Global Development Group
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/wait.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 : #include "postgres.h"
15 :
16 : #include <math.h>
17 :
18 : #include "access/xlog.h"
19 : #include "access/xlogrecovery.h"
20 : #include "access/xlogwait.h"
21 : #include "catalog/pg_type_d.h"
22 : #include "commands/defrem.h"
23 : #include "commands/wait.h"
24 : #include "executor/executor.h"
25 : #include "parser/parse_node.h"
26 : #include "storage/proc.h"
27 : #include "utils/builtins.h"
28 : #include "utils/guc.h"
29 : #include "utils/pg_lsn.h"
30 : #include "utils/snapmgr.h"
31 :
32 :
33 : void
34 228 : ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, bool isTopLevel,
35 : DestReceiver *dest)
36 : {
37 : XLogRecPtr lsn;
38 228 : int64 timeout = 0;
39 : WaitLSNResult waitLSNResult;
40 228 : WaitLSNType lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY; /* default */
41 228 : bool throw = true;
42 : TupleDesc tupdesc;
43 : TupOutputState *tstate;
44 228 : const char *result = "<unset>";
45 228 : bool timeout_specified = false;
46 228 : bool no_throw_specified = false;
47 228 : bool mode_specified = false;
48 :
49 : /*
50 : * WAIT FOR must not be run as a non-top-level statement (e.g., inside a
51 : * function, procedure, or DO block). Forbid this case upfront.
52 : */
53 228 : if (!isTopLevel)
54 3 : ereport(ERROR,
55 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
56 : errmsg("%s can only be executed as a top-level statement",
57 : "WAIT FOR"),
58 : errdetail("WAIT FOR cannot be used within a function, procedure, or DO block.")));
59 :
60 : /* Parse and validate the mandatory LSN */
61 225 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
62 : CStringGetDatum(stmt->lsn_literal)));
63 :
64 1022 : foreach_node(DefElem, defel, stmt->options)
65 : {
66 590 : if (strcmp(defel->defname, "mode") == 0)
67 : {
68 : char *mode_str;
69 :
70 203 : if (mode_specified)
71 1 : errorConflictingDefElem(defel, pstate);
72 202 : mode_specified = true;
73 :
74 202 : mode_str = defGetString(defel);
75 :
76 202 : if (pg_strcasecmp(mode_str, "standby_replay") == 0)
77 158 : lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY;
78 44 : else if (pg_strcasecmp(mode_str, "standby_write") == 0)
79 26 : lsnType = WAIT_LSN_TYPE_STANDBY_WRITE;
80 18 : else if (pg_strcasecmp(mode_str, "standby_flush") == 0)
81 10 : lsnType = WAIT_LSN_TYPE_STANDBY_FLUSH;
82 8 : else if (pg_strcasecmp(mode_str, "primary_flush") == 0)
83 7 : lsnType = WAIT_LSN_TYPE_PRIMARY_FLUSH;
84 : else
85 1 : ereport(ERROR,
86 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
87 : errmsg("unrecognized value for %s option \"%s\": \"%s\"",
88 : "WAIT", defel->defname, mode_str),
89 : parser_errposition(pstate, defel->location)));
90 : }
91 387 : else if (strcmp(defel->defname, "timeout") == 0)
92 : {
93 : char *timeout_str;
94 : const char *hintmsg;
95 : double dval;
96 :
97 207 : if (timeout_specified)
98 1 : errorConflictingDefElem(defel, pstate);
99 206 : timeout_specified = true;
100 :
101 206 : timeout_str = defGetString(defel);
102 :
103 206 : if (!parse_real(timeout_str, &dval, GUC_UNIT_MS, &hintmsg))
104 : {
105 1 : ereport(ERROR,
106 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
107 : errmsg("invalid timeout value: \"%s\"", timeout_str),
108 : hintmsg ? errhint("%s", _(hintmsg)) : 0);
109 : }
110 :
111 : /*
112 : * Get rid of any fractional part in the input. This is so we
113 : * don't fail on just-out-of-range values that would round into
114 : * range.
115 : */
116 205 : dval = rint(dval);
117 :
118 : /* Range check */
119 205 : if (unlikely(isnan(dval) || !FLOAT8_FITS_IN_INT64(dval)))
120 0 : ereport(ERROR,
121 : errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
122 : errmsg("timeout value is out of range"));
123 :
124 205 : if (dval < 0)
125 1 : ereport(ERROR,
126 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
127 : errmsg("timeout cannot be negative"));
128 :
129 204 : timeout = (int64) dval;
130 : }
131 180 : else if (strcmp(defel->defname, "no_throw") == 0)
132 : {
133 178 : if (no_throw_specified)
134 1 : errorConflictingDefElem(defel, pstate);
135 :
136 177 : no_throw_specified = true;
137 :
138 177 : throw = !defGetBoolean(defel);
139 : }
140 : else
141 : {
142 2 : ereport(ERROR,
143 : errcode(ERRCODE_SYNTAX_ERROR),
144 : errmsg("option \"%s\" not recognized",
145 : defel->defname),
146 : parser_errposition(pstate, defel->location));
147 : }
148 : }
149 :
150 : /*
151 : * We are going to wait for the LSN. We should first care that we don't
152 : * hold a snapshot and correspondingly our MyProc->xmin is invalid.
153 : * Otherwise, our snapshot could prevent the replay of WAL records
154 : * implying a kind of self-deadlock. This is the reason why WAIT FOR is a
155 : * command, not a procedure or function.
156 : *
157 : * Non-top-level contexts are rejected above, but be defensive and pop any
158 : * active snapshot if one is present. PortalRunUtility() can tolerate
159 : * utility commands that remove the active snapshot.
160 : */
161 216 : if (ActiveSnapshotSet())
162 0 : PopActiveSnapshot();
163 :
164 : /*
165 : * At second, invalidate a catalog snapshot if any. And we should be done
166 : * with the preparation.
167 : */
168 216 : InvalidateCatalogSnapshot();
169 :
170 : /* Give up if there is still an active or registered snapshot. */
171 216 : if (HaveRegisteredOrActiveSnapshot())
172 1 : ereport(ERROR,
173 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
174 : errmsg("WAIT FOR must be called without an active or registered snapshot"),
175 : errdetail("WAIT FOR cannot be executed within a transaction with an isolation level higher than READ COMMITTED."));
176 :
177 : /*
178 : * As the result we should hold no snapshot, and correspondingly our xmin
179 : * should be unset.
180 : */
181 : Assert(MyProc->xmin == InvalidTransactionId);
182 :
183 : /*
184 : * Validate that the requested mode matches the current server state.
185 : * Primary modes can only be used on a primary.
186 : */
187 215 : if (lsnType == WAIT_LSN_TYPE_PRIMARY_FLUSH)
188 : {
189 7 : if (RecoveryInProgress())
190 1 : ereport(ERROR,
191 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
192 : errmsg("recovery is in progress"),
193 : errhint("Waiting for primary_flush can only be done on a primary server. "
194 : "Use standby_flush mode on a standby server.")));
195 : }
196 :
197 : /* Now wait for the LSN */
198 214 : waitLSNResult = WaitForLSN(lsnType, lsn, timeout);
199 :
200 : /*
201 : * Process the result of WaitForLSN(). Throw appropriate error if needed.
202 : */
203 214 : switch (waitLSNResult)
204 : {
205 206 : case WAIT_LSN_RESULT_SUCCESS:
206 : /* Nothing to do on success */
207 206 : result = "success";
208 206 : break;
209 :
210 3 : case WAIT_LSN_RESULT_TIMEOUT:
211 3 : if (throw)
212 : {
213 1 : XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType);
214 :
215 1 : switch (lsnType)
216 : {
217 1 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
218 1 : ereport(ERROR,
219 : errcode(ERRCODE_QUERY_CANCELED),
220 : errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current standby_replay LSN %X/%08X",
221 : LSN_FORMAT_ARGS(lsn),
222 : LSN_FORMAT_ARGS(currentLSN)));
223 : break;
224 :
225 0 : case WAIT_LSN_TYPE_STANDBY_WRITE:
226 0 : ereport(ERROR,
227 : errcode(ERRCODE_QUERY_CANCELED),
228 : errmsg("timed out while waiting for target LSN %X/%08X to be written; current standby_write LSN %X/%08X",
229 : LSN_FORMAT_ARGS(lsn),
230 : LSN_FORMAT_ARGS(currentLSN)));
231 : break;
232 :
233 0 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
234 0 : ereport(ERROR,
235 : errcode(ERRCODE_QUERY_CANCELED),
236 : errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current standby_flush LSN %X/%08X",
237 : LSN_FORMAT_ARGS(lsn),
238 : LSN_FORMAT_ARGS(currentLSN)));
239 : break;
240 :
241 0 : case WAIT_LSN_TYPE_PRIMARY_FLUSH:
242 0 : ereport(ERROR,
243 : errcode(ERRCODE_QUERY_CANCELED),
244 : errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current primary_flush LSN %X/%08X",
245 : LSN_FORMAT_ARGS(lsn),
246 : LSN_FORMAT_ARGS(currentLSN)));
247 : break;
248 :
249 0 : default:
250 0 : elog(ERROR, "unexpected wait LSN type %d", lsnType);
251 : }
252 : }
253 : else
254 2 : result = "timeout";
255 2 : break;
256 :
257 5 : case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
258 5 : if (throw)
259 : {
260 4 : if (PromoteIsTriggered())
261 : {
262 3 : XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType);
263 :
264 3 : switch (lsnType)
265 : {
266 1 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
267 1 : ereport(ERROR,
268 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
269 : errmsg("recovery is not in progress"),
270 : errdetail("Recovery ended before target LSN %X/%08X was replayed; last standby_replay LSN %X/%08X.",
271 : LSN_FORMAT_ARGS(lsn),
272 : LSN_FORMAT_ARGS(currentLSN)));
273 : break;
274 :
275 1 : case WAIT_LSN_TYPE_STANDBY_WRITE:
276 1 : ereport(ERROR,
277 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
278 : errmsg("recovery is not in progress"),
279 : errdetail("Recovery ended before target LSN %X/%08X was written; last standby_write LSN %X/%08X.",
280 : LSN_FORMAT_ARGS(lsn),
281 : LSN_FORMAT_ARGS(currentLSN)));
282 : break;
283 :
284 1 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
285 1 : ereport(ERROR,
286 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
287 : errmsg("recovery is not in progress"),
288 : errdetail("Recovery ended before target LSN %X/%08X was flushed; last standby_flush LSN %X/%08X.",
289 : LSN_FORMAT_ARGS(lsn),
290 : LSN_FORMAT_ARGS(currentLSN)));
291 : break;
292 :
293 0 : default:
294 0 : elog(ERROR, "unexpected wait LSN type %d", lsnType);
295 : }
296 : }
297 : else
298 : {
299 1 : switch (lsnType)
300 : {
301 0 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
302 0 : ereport(ERROR,
303 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
304 : errmsg("recovery is not in progress"),
305 : errhint("Waiting for the standby_replay LSN can only be executed during recovery."));
306 : break;
307 :
308 0 : case WAIT_LSN_TYPE_STANDBY_WRITE:
309 0 : ereport(ERROR,
310 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
311 : errmsg("recovery is not in progress"),
312 : errhint("Waiting for the standby_write LSN can only be executed during recovery."));
313 : break;
314 :
315 1 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
316 1 : ereport(ERROR,
317 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
318 : errmsg("recovery is not in progress"),
319 : errhint("Waiting for the standby_flush LSN can only be executed during recovery."));
320 : break;
321 :
322 0 : default:
323 0 : elog(ERROR, "unexpected wait LSN type %d", lsnType);
324 : }
325 : }
326 : }
327 : else
328 1 : result = "not in recovery";
329 1 : break;
330 : }
331 :
332 : /* need a tuple descriptor representing a single TEXT column */
333 209 : tupdesc = WaitStmtResultDesc(stmt);
334 :
335 : /* prepare for projection of tuples */
336 209 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
337 :
338 : /* Send it */
339 209 : do_text_output_oneline(tstate, result);
340 :
341 209 : end_tup_output(tstate);
342 209 : }
343 :
344 : TupleDesc
345 437 : WaitStmtResultDesc(WaitStmt *stmt)
346 : {
347 : TupleDesc tupdesc;
348 :
349 : /*
350 : * Need a tuple descriptor representing a single TEXT column.
351 : *
352 : * We use TupleDescInitBuiltinEntry instead of TupleDescInitEntry to avoid
353 : * syscache access. This is important because WaitStmtResultDesc may be
354 : * called after snapshots have been released, and we must not re-establish
355 : * a catalog snapshot which could cause recovery conflicts on a standby.
356 : */
357 437 : tupdesc = CreateTemplateTupleDesc(1);
358 437 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "status",
359 : TEXTOID, -1, 0);
360 437 : TupleDescFinalize(tupdesc);
361 437 : return tupdesc;
362 : }
|