Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * wait.c
4 : * Implements WAIT FOR, which allows waiting for events such as
5 : * time passing or LSN having been replayed, flushed, or written.
6 : *
7 : * Portions Copyright (c) 2025-2026, PostgreSQL Global Development Group
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/wait.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 : #include "postgres.h"
15 :
16 : #include <math.h>
17 :
18 : #include "access/xlog.h"
19 : #include "access/xlogrecovery.h"
20 : #include "access/xlogwait.h"
21 : #include "catalog/pg_type_d.h"
22 : #include "commands/defrem.h"
23 : #include "commands/wait.h"
24 : #include "executor/executor.h"
25 : #include "parser/parse_node.h"
26 : #include "storage/proc.h"
27 : #include "utils/builtins.h"
28 : #include "utils/guc.h"
29 : #include "utils/pg_lsn.h"
30 : #include "utils/snapmgr.h"
31 :
32 :
33 : void
34 55 : ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
35 : {
36 : XLogRecPtr lsn;
37 55 : int64 timeout = 0;
38 : WaitLSNResult waitLSNResult;
39 55 : WaitLSNType lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY; /* default */
40 55 : bool throw = true;
41 : TupleDesc tupdesc;
42 : TupOutputState *tstate;
43 55 : const char *result = "<unset>";
44 55 : bool timeout_specified = false;
45 55 : bool no_throw_specified = false;
46 55 : bool mode_specified = false;
47 :
48 : /* Parse and validate the mandatory LSN */
49 55 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
50 : CStringGetDatum(stmt->lsn_literal)));
51 :
52 169 : foreach_node(DefElem, defel, stmt->options)
53 : {
54 77 : if (strcmp(defel->defname, "mode") == 0)
55 : {
56 : char *mode_str;
57 :
58 32 : if (mode_specified)
59 1 : errorConflictingDefElem(defel, pstate);
60 31 : mode_specified = true;
61 :
62 31 : mode_str = defGetString(defel);
63 :
64 31 : if (pg_strcasecmp(mode_str, "standby_replay") == 0)
65 4 : lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY;
66 27 : else if (pg_strcasecmp(mode_str, "standby_write") == 0)
67 9 : lsnType = WAIT_LSN_TYPE_STANDBY_WRITE;
68 18 : else if (pg_strcasecmp(mode_str, "standby_flush") == 0)
69 10 : lsnType = WAIT_LSN_TYPE_STANDBY_FLUSH;
70 8 : else if (pg_strcasecmp(mode_str, "primary_flush") == 0)
71 7 : lsnType = WAIT_LSN_TYPE_PRIMARY_FLUSH;
72 : else
73 1 : ereport(ERROR,
74 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
75 : errmsg("unrecognized value for %s option \"%s\": \"%s\"",
76 : "WAIT", defel->defname, mode_str),
77 : parser_errposition(pstate, defel->location)));
78 : }
79 45 : else if (strcmp(defel->defname, "timeout") == 0)
80 : {
81 : char *timeout_str;
82 : const char *hintmsg;
83 : double result;
84 :
85 36 : if (timeout_specified)
86 1 : errorConflictingDefElem(defel, pstate);
87 35 : timeout_specified = true;
88 :
89 35 : timeout_str = defGetString(defel);
90 :
91 35 : if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg))
92 : {
93 1 : ereport(ERROR,
94 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
95 : errmsg("invalid timeout value: \"%s\"", timeout_str),
96 : hintmsg ? errhint("%s", _(hintmsg)) : 0);
97 : }
98 :
99 : /*
100 : * Get rid of any fractional part in the input. This is so we
101 : * don't fail on just-out-of-range values that would round into
102 : * range.
103 : */
104 34 : result = rint(result);
105 :
106 : /* Range check */
107 34 : if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result)))
108 0 : ereport(ERROR,
109 : errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
110 : errmsg("timeout value is out of range"));
111 :
112 34 : if (result < 0)
113 1 : ereport(ERROR,
114 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
115 : errmsg("timeout cannot be negative"));
116 :
117 33 : timeout = (int64) result;
118 : }
119 9 : else if (strcmp(defel->defname, "no_throw") == 0)
120 : {
121 7 : if (no_throw_specified)
122 1 : errorConflictingDefElem(defel, pstate);
123 :
124 6 : no_throw_specified = true;
125 :
126 6 : throw = !defGetBoolean(defel);
127 : }
128 : else
129 : {
130 2 : ereport(ERROR,
131 : errcode(ERRCODE_SYNTAX_ERROR),
132 : errmsg("option \"%s\" not recognized",
133 : defel->defname),
134 : parser_errposition(pstate, defel->location));
135 : }
136 : }
137 :
138 : /*
139 : * We are going to wait for the LSN. We should first care that we don't
140 : * hold a snapshot and correspondingly our MyProc->xmin is invalid.
141 : * Otherwise, our snapshot could prevent the replay of WAL records
142 : * implying a kind of self-deadlock. This is the reason why WAIT FOR is a
143 : * command, not a procedure or function.
144 : *
145 : * At first, we should check there is no active snapshot. According to
146 : * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
147 : * processed with a snapshot. Thankfully, we can pop this snapshot,
148 : * because PortalRunUtility() can tolerate this.
149 : */
150 46 : if (ActiveSnapshotSet())
151 1 : PopActiveSnapshot();
152 :
153 : /*
154 : * At second, invalidate a catalog snapshot if any. And we should be done
155 : * with the preparation.
156 : */
157 46 : InvalidateCatalogSnapshot();
158 :
159 : /* Give up if there is still an active or registered snapshot. */
160 46 : if (HaveRegisteredOrActiveSnapshot())
161 2 : ereport(ERROR,
162 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
163 : errmsg("WAIT FOR must be called without an active or registered snapshot"),
164 : errdetail("WAIT FOR cannot be executed from a function or procedure, nor within a transaction with an isolation level higher than READ COMMITTED."));
165 :
166 : /*
167 : * As the result we should hold no snapshot, and correspondingly our xmin
168 : * should be unset.
169 : */
170 : Assert(MyProc->xmin == InvalidTransactionId);
171 :
172 : /*
173 : * Validate that the requested mode matches the current server state.
174 : * Primary modes can only be used on a primary.
175 : */
176 44 : if (lsnType == WAIT_LSN_TYPE_PRIMARY_FLUSH)
177 : {
178 7 : if (RecoveryInProgress())
179 1 : ereport(ERROR,
180 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
181 : errmsg("recovery is in progress"),
182 : errhint("Waiting for primary_flush can only be done on a primary server. "
183 : "Use standby_flush mode on a standby server.")));
184 : }
185 :
186 : /* Now wait for the LSN */
187 43 : waitLSNResult = WaitForLSN(lsnType, lsn, timeout);
188 :
189 : /*
190 : * Process the result of WaitForLSN(). Throw appropriate error if needed.
191 : */
192 43 : switch (waitLSNResult)
193 : {
194 35 : case WAIT_LSN_RESULT_SUCCESS:
195 : /* Nothing to do on success */
196 35 : result = "success";
197 35 : break;
198 :
199 3 : case WAIT_LSN_RESULT_TIMEOUT:
200 3 : if (throw)
201 : {
202 1 : XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType);
203 :
204 1 : switch (lsnType)
205 : {
206 1 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
207 1 : ereport(ERROR,
208 : errcode(ERRCODE_QUERY_CANCELED),
209 : errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current standby_replay LSN %X/%08X",
210 : LSN_FORMAT_ARGS(lsn),
211 : LSN_FORMAT_ARGS(currentLSN)));
212 : break;
213 :
214 0 : case WAIT_LSN_TYPE_STANDBY_WRITE:
215 0 : ereport(ERROR,
216 : errcode(ERRCODE_QUERY_CANCELED),
217 : errmsg("timed out while waiting for target LSN %X/%08X to be written; current standby_write LSN %X/%08X",
218 : LSN_FORMAT_ARGS(lsn),
219 : LSN_FORMAT_ARGS(currentLSN)));
220 : break;
221 :
222 0 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
223 0 : ereport(ERROR,
224 : errcode(ERRCODE_QUERY_CANCELED),
225 : errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current standby_flush LSN %X/%08X",
226 : LSN_FORMAT_ARGS(lsn),
227 : LSN_FORMAT_ARGS(currentLSN)));
228 : break;
229 :
230 0 : case WAIT_LSN_TYPE_PRIMARY_FLUSH:
231 0 : ereport(ERROR,
232 : errcode(ERRCODE_QUERY_CANCELED),
233 : errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current primary_flush LSN %X/%08X",
234 : LSN_FORMAT_ARGS(lsn),
235 : LSN_FORMAT_ARGS(currentLSN)));
236 : break;
237 :
238 0 : default:
239 0 : elog(ERROR, "unexpected wait LSN type %d", lsnType);
240 : }
241 : }
242 : else
243 2 : result = "timeout";
244 2 : break;
245 :
246 5 : case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
247 5 : if (throw)
248 : {
249 4 : if (PromoteIsTriggered())
250 : {
251 3 : XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType);
252 :
253 3 : switch (lsnType)
254 : {
255 1 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
256 1 : ereport(ERROR,
257 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
258 : errmsg("recovery is not in progress"),
259 : errdetail("Recovery ended before target LSN %X/%08X was replayed; last standby_replay LSN %X/%08X.",
260 : LSN_FORMAT_ARGS(lsn),
261 : LSN_FORMAT_ARGS(currentLSN)));
262 : break;
263 :
264 1 : case WAIT_LSN_TYPE_STANDBY_WRITE:
265 1 : ereport(ERROR,
266 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
267 : errmsg("recovery is not in progress"),
268 : errdetail("Recovery ended before target LSN %X/%08X was written; last standby_write LSN %X/%08X.",
269 : LSN_FORMAT_ARGS(lsn),
270 : LSN_FORMAT_ARGS(currentLSN)));
271 : break;
272 :
273 1 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
274 1 : ereport(ERROR,
275 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
276 : errmsg("recovery is not in progress"),
277 : errdetail("Recovery ended before target LSN %X/%08X was flushed; last standby_flush LSN %X/%08X.",
278 : LSN_FORMAT_ARGS(lsn),
279 : LSN_FORMAT_ARGS(currentLSN)));
280 : break;
281 :
282 0 : default:
283 0 : elog(ERROR, "unexpected wait LSN type %d", lsnType);
284 : }
285 : }
286 : else
287 : {
288 1 : switch (lsnType)
289 : {
290 0 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
291 0 : ereport(ERROR,
292 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
293 : errmsg("recovery is not in progress"),
294 : errhint("Waiting for the standby_replay LSN can only be executed during recovery."));
295 : break;
296 :
297 0 : case WAIT_LSN_TYPE_STANDBY_WRITE:
298 0 : ereport(ERROR,
299 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
300 : errmsg("recovery is not in progress"),
301 : errhint("Waiting for the standby_write LSN can only be executed during recovery."));
302 : break;
303 :
304 1 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
305 1 : ereport(ERROR,
306 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
307 : errmsg("recovery is not in progress"),
308 : errhint("Waiting for the standby_flush LSN can only be executed during recovery."));
309 : break;
310 :
311 0 : default:
312 0 : elog(ERROR, "unexpected wait LSN type %d", lsnType);
313 : }
314 : }
315 : }
316 : else
317 1 : result = "not in recovery";
318 1 : break;
319 : }
320 :
321 : /* need a tuple descriptor representing a single TEXT column */
322 38 : tupdesc = WaitStmtResultDesc(stmt);
323 :
324 : /* prepare for projection of tuples */
325 38 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
326 :
327 : /* Send it */
328 38 : do_text_output_oneline(tstate, result);
329 :
330 38 : end_tup_output(tstate);
331 38 : }
332 :
333 : TupleDesc
334 93 : WaitStmtResultDesc(WaitStmt *stmt)
335 : {
336 : TupleDesc tupdesc;
337 :
338 : /* Need a tuple descriptor representing a single TEXT column */
339 93 : tupdesc = CreateTemplateTupleDesc(1);
340 93 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
341 : TEXTOID, -1, 0);
342 93 : TupleDescFinalize(tupdesc);
343 93 : return tupdesc;
344 : }
|