Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * wait.c
4 : * Implements WAIT FOR, which allows waiting for events such as
5 : * time passing or LSN having been replayed, flushed, or written.
6 : *
7 : * Portions Copyright (c) 2025-2026, PostgreSQL Global Development Group
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/wait.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 : #include "postgres.h"
15 :
16 : #include <math.h>
17 :
18 : #include "access/xlog.h"
19 : #include "access/xlogrecovery.h"
20 : #include "access/xlogwait.h"
21 : #include "commands/defrem.h"
22 : #include "commands/wait.h"
23 : #include "executor/executor.h"
24 : #include "parser/parse_node.h"
25 : #include "storage/proc.h"
26 : #include "utils/builtins.h"
27 : #include "utils/guc.h"
28 : #include "utils/pg_lsn.h"
29 : #include "utils/snapmgr.h"
30 :
31 :
32 : void
33 110 : ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
34 : {
35 : XLogRecPtr lsn;
36 110 : int64 timeout = 0;
37 : WaitLSNResult waitLSNResult;
38 110 : WaitLSNType lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY; /* default */
39 110 : bool throw = true;
40 : TupleDesc tupdesc;
41 : TupOutputState *tstate;
42 110 : const char *result = "<unset>";
43 110 : bool timeout_specified = false;
44 110 : bool no_throw_specified = false;
45 110 : bool mode_specified = false;
46 :
47 : /* Parse and validate the mandatory LSN */
48 110 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
49 : CStringGetDatum(stmt->lsn_literal)));
50 :
51 338 : foreach_node(DefElem, defel, stmt->options)
52 : {
53 154 : if (strcmp(defel->defname, "mode") == 0)
54 : {
55 : char *mode_str;
56 :
57 64 : if (mode_specified)
58 2 : errorConflictingDefElem(defel, pstate);
59 62 : mode_specified = true;
60 :
61 62 : mode_str = defGetString(defel);
62 :
63 62 : if (pg_strcasecmp(mode_str, "standby_replay") == 0)
64 8 : lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY;
65 54 : else if (pg_strcasecmp(mode_str, "standby_write") == 0)
66 18 : lsnType = WAIT_LSN_TYPE_STANDBY_WRITE;
67 36 : else if (pg_strcasecmp(mode_str, "standby_flush") == 0)
68 20 : lsnType = WAIT_LSN_TYPE_STANDBY_FLUSH;
69 16 : else if (pg_strcasecmp(mode_str, "primary_flush") == 0)
70 14 : lsnType = WAIT_LSN_TYPE_PRIMARY_FLUSH;
71 : else
72 2 : ereport(ERROR,
73 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
74 : errmsg("unrecognized value for %s option \"%s\": \"%s\"",
75 : "WAIT", defel->defname, mode_str),
76 : parser_errposition(pstate, defel->location)));
77 : }
78 90 : else if (strcmp(defel->defname, "timeout") == 0)
79 : {
80 : char *timeout_str;
81 : const char *hintmsg;
82 : double result;
83 :
84 72 : if (timeout_specified)
85 2 : errorConflictingDefElem(defel, pstate);
86 70 : timeout_specified = true;
87 :
88 70 : timeout_str = defGetString(defel);
89 :
90 70 : if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg))
91 : {
92 2 : ereport(ERROR,
93 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
94 : errmsg("invalid timeout value: \"%s\"", timeout_str),
95 : hintmsg ? errhint("%s", _(hintmsg)) : 0);
96 : }
97 :
98 : /*
99 : * Get rid of any fractional part in the input. This is so we
100 : * don't fail on just-out-of-range values that would round into
101 : * range.
102 : */
103 68 : result = rint(result);
104 :
105 : /* Range check */
106 68 : if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result)))
107 0 : ereport(ERROR,
108 : errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
109 : errmsg("timeout value is out of range"));
110 :
111 68 : if (result < 0)
112 2 : ereport(ERROR,
113 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
114 : errmsg("timeout cannot be negative"));
115 :
116 66 : timeout = (int64) result;
117 : }
118 18 : else if (strcmp(defel->defname, "no_throw") == 0)
119 : {
120 14 : if (no_throw_specified)
121 2 : errorConflictingDefElem(defel, pstate);
122 :
123 12 : no_throw_specified = true;
124 :
125 12 : throw = !defGetBoolean(defel);
126 : }
127 : else
128 : {
129 4 : ereport(ERROR,
130 : errcode(ERRCODE_SYNTAX_ERROR),
131 : errmsg("option \"%s\" not recognized",
132 : defel->defname),
133 : parser_errposition(pstate, defel->location));
134 : }
135 : }
136 :
137 : /*
138 : * We are going to wait for the LSN. We should first care that we don't
139 : * hold a snapshot and correspondingly our MyProc->xmin is invalid.
140 : * Otherwise, our snapshot could prevent the replay of WAL records
141 : * implying a kind of self-deadlock. This is the reason why WAIT FOR is a
142 : * command, not a procedure or function.
143 : *
144 : * At first, we should check there is no active snapshot. According to
145 : * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
146 : * processed with a snapshot. Thankfully, we can pop this snapshot,
147 : * because PortalRunUtility() can tolerate this.
148 : */
149 92 : if (ActiveSnapshotSet())
150 2 : PopActiveSnapshot();
151 :
152 : /*
153 : * At second, invalidate a catalog snapshot if any. And we should be done
154 : * with the preparation.
155 : */
156 92 : InvalidateCatalogSnapshot();
157 :
158 : /* Give up if there is still an active or registered snapshot. */
159 92 : if (HaveRegisteredOrActiveSnapshot())
160 4 : ereport(ERROR,
161 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
162 : errmsg("WAIT FOR must be called without an active or registered snapshot"),
163 : errdetail("WAIT FOR cannot be executed from a function or procedure, nor within a transaction with an isolation level higher than READ COMMITTED."));
164 :
165 : /*
166 : * As the result we should hold no snapshot, and correspondingly our xmin
167 : * should be unset.
168 : */
169 : Assert(MyProc->xmin == InvalidTransactionId);
170 :
171 : /*
172 : * Validate that the requested mode matches the current server state.
173 : * Primary modes can only be used on a primary.
174 : */
175 88 : if (lsnType == WAIT_LSN_TYPE_PRIMARY_FLUSH)
176 : {
177 14 : if (RecoveryInProgress())
178 2 : ereport(ERROR,
179 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
180 : errmsg("recovery is in progress"),
181 : errhint("Waiting for primary_flush can only be done on a primary server. "
182 : "Use standby_flush mode on a standby server.")));
183 : }
184 :
185 : /* Now wait for the LSN */
186 86 : waitLSNResult = WaitForLSN(lsnType, lsn, timeout);
187 :
188 : /*
189 : * Process the result of WaitForLSN(). Throw appropriate error if needed.
190 : */
191 86 : switch (waitLSNResult)
192 : {
193 70 : case WAIT_LSN_RESULT_SUCCESS:
194 : /* Nothing to do on success */
195 70 : result = "success";
196 70 : break;
197 :
198 6 : case WAIT_LSN_RESULT_TIMEOUT:
199 6 : if (throw)
200 : {
201 2 : XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType);
202 :
203 : switch (lsnType)
204 : {
205 2 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
206 2 : ereport(ERROR,
207 : errcode(ERRCODE_QUERY_CANCELED),
208 : errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current standby_replay LSN %X/%08X",
209 : LSN_FORMAT_ARGS(lsn),
210 : LSN_FORMAT_ARGS(currentLSN)));
211 : break;
212 :
213 0 : case WAIT_LSN_TYPE_STANDBY_WRITE:
214 0 : ereport(ERROR,
215 : errcode(ERRCODE_QUERY_CANCELED),
216 : errmsg("timed out while waiting for target LSN %X/%08X to be written; current standby_write LSN %X/%08X",
217 : LSN_FORMAT_ARGS(lsn),
218 : LSN_FORMAT_ARGS(currentLSN)));
219 : break;
220 :
221 0 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
222 0 : ereport(ERROR,
223 : errcode(ERRCODE_QUERY_CANCELED),
224 : errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current standby_flush LSN %X/%08X",
225 : LSN_FORMAT_ARGS(lsn),
226 : LSN_FORMAT_ARGS(currentLSN)));
227 : break;
228 :
229 0 : case WAIT_LSN_TYPE_PRIMARY_FLUSH:
230 0 : ereport(ERROR,
231 : errcode(ERRCODE_QUERY_CANCELED),
232 : errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current primary_flush LSN %X/%08X",
233 : LSN_FORMAT_ARGS(lsn),
234 : LSN_FORMAT_ARGS(currentLSN)));
235 : break;
236 :
237 0 : default:
238 0 : elog(ERROR, "unexpected wait LSN type %d", lsnType);
239 : }
240 : }
241 : else
242 4 : result = "timeout";
243 4 : break;
244 :
245 10 : case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
246 10 : if (throw)
247 : {
248 8 : if (PromoteIsTriggered())
249 : {
250 6 : XLogRecPtr currentLSN = GetCurrentLSNForWaitType(lsnType);
251 :
252 : switch (lsnType)
253 : {
254 2 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
255 2 : ereport(ERROR,
256 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
257 : errmsg("recovery is not in progress"),
258 : errdetail("Recovery ended before target LSN %X/%08X was replayed; last standby_replay LSN %X/%08X.",
259 : LSN_FORMAT_ARGS(lsn),
260 : LSN_FORMAT_ARGS(currentLSN)));
261 : break;
262 :
263 2 : case WAIT_LSN_TYPE_STANDBY_WRITE:
264 2 : ereport(ERROR,
265 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
266 : errmsg("recovery is not in progress"),
267 : errdetail("Recovery ended before target LSN %X/%08X was written; last standby_write LSN %X/%08X.",
268 : LSN_FORMAT_ARGS(lsn),
269 : LSN_FORMAT_ARGS(currentLSN)));
270 : break;
271 :
272 2 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
273 2 : ereport(ERROR,
274 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
275 : errmsg("recovery is not in progress"),
276 : errdetail("Recovery ended before target LSN %X/%08X was flushed; last standby_flush LSN %X/%08X.",
277 : LSN_FORMAT_ARGS(lsn),
278 : LSN_FORMAT_ARGS(currentLSN)));
279 : break;
280 :
281 0 : default:
282 0 : elog(ERROR, "unexpected wait LSN type %d", lsnType);
283 : }
284 : }
285 : else
286 : {
287 : switch (lsnType)
288 : {
289 0 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
290 0 : ereport(ERROR,
291 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
292 : errmsg("recovery is not in progress"),
293 : errhint("Waiting for the standby_replay LSN can only be executed during recovery."));
294 : break;
295 :
296 0 : case WAIT_LSN_TYPE_STANDBY_WRITE:
297 0 : ereport(ERROR,
298 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
299 : errmsg("recovery is not in progress"),
300 : errhint("Waiting for the standby_write LSN can only be executed during recovery."));
301 : break;
302 :
303 2 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
304 2 : ereport(ERROR,
305 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
306 : errmsg("recovery is not in progress"),
307 : errhint("Waiting for the standby_flush LSN can only be executed during recovery."));
308 : break;
309 :
310 0 : default:
311 0 : elog(ERROR, "unexpected wait LSN type %d", lsnType);
312 : }
313 : }
314 : }
315 : else
316 2 : result = "not in recovery";
317 2 : break;
318 : }
319 :
320 : /* need a tuple descriptor representing a single TEXT column */
321 76 : tupdesc = WaitStmtResultDesc(stmt);
322 :
323 : /* prepare for projection of tuples */
324 76 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
325 :
326 : /* Send it */
327 76 : do_text_output_oneline(tstate, result);
328 :
329 76 : end_tup_output(tstate);
330 76 : }
331 :
332 : TupleDesc
333 186 : WaitStmtResultDesc(WaitStmt *stmt)
334 : {
335 : TupleDesc tupdesc;
336 :
337 : /* Need a tuple descriptor representing a single TEXT column */
338 186 : tupdesc = CreateTemplateTupleDesc(1);
339 186 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
340 : TEXTOID, -1, 0);
341 186 : return tupdesc;
342 : }
|