Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * wait.c
4 : * Implements WAIT FOR, which allows waiting for events such as
5 : * time passing or LSN having been replayed on replica.
6 : *
7 : * Portions Copyright (c) 2025, PostgreSQL Global Development Group
8 : *
9 : * IDENTIFICATION
10 : * src/backend/commands/wait.c
11 : *
12 : *-------------------------------------------------------------------------
13 : */
14 : #include "postgres.h"
15 :
16 : #include <math.h>
17 :
18 : #include "access/xlogrecovery.h"
19 : #include "access/xlogwait.h"
20 : #include "commands/defrem.h"
21 : #include "commands/wait.h"
22 : #include "executor/executor.h"
23 : #include "parser/parse_node.h"
24 : #include "storage/proc.h"
25 : #include "utils/builtins.h"
26 : #include "utils/guc.h"
27 : #include "utils/pg_lsn.h"
28 : #include "utils/snapmgr.h"
29 :
30 :
31 : void
32 52 : ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
33 : {
34 : XLogRecPtr lsn;
35 52 : int64 timeout = 0;
36 : WaitLSNResult waitLSNResult;
37 52 : bool throw = true;
38 : TupleDesc tupdesc;
39 : TupOutputState *tstate;
40 52 : const char *result = "<unset>";
41 52 : bool timeout_specified = false;
42 52 : bool no_throw_specified = false;
43 :
44 : /* Parse and validate the mandatory LSN */
45 52 : lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
46 : CStringGetDatum(stmt->lsn_literal)));
47 :
48 118 : foreach_node(DefElem, defel, stmt->options)
49 : {
50 42 : if (strcmp(defel->defname, "timeout") == 0)
51 : {
52 : char *timeout_str;
53 : const char *hintmsg;
54 : double result;
55 :
56 24 : if (timeout_specified)
57 2 : errorConflictingDefElem(defel, pstate);
58 22 : timeout_specified = true;
59 :
60 22 : timeout_str = defGetString(defel);
61 :
62 22 : if (!parse_real(timeout_str, &result, GUC_UNIT_MS, &hintmsg))
63 : {
64 2 : ereport(ERROR,
65 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
66 : errmsg("invalid timeout value: \"%s\"", timeout_str),
67 : hintmsg ? errhint("%s", _(hintmsg)) : 0);
68 : }
69 :
70 : /*
71 : * Get rid of any fractional part in the input. This is so we
72 : * don't fail on just-out-of-range values that would round into
73 : * range.
74 : */
75 20 : result = rint(result);
76 :
77 : /* Range check */
78 20 : if (unlikely(isnan(result) || !FLOAT8_FITS_IN_INT64(result)))
79 0 : ereport(ERROR,
80 : errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
81 : errmsg("timeout value is out of range"));
82 :
83 20 : if (result < 0)
84 2 : ereport(ERROR,
85 : errcode(ERRCODE_INVALID_PARAMETER_VALUE),
86 : errmsg("timeout cannot be negative"));
87 :
88 18 : timeout = (int64) result;
89 : }
90 18 : else if (strcmp(defel->defname, "no_throw") == 0)
91 : {
92 14 : if (no_throw_specified)
93 2 : errorConflictingDefElem(defel, pstate);
94 :
95 12 : no_throw_specified = true;
96 :
97 12 : throw = !defGetBoolean(defel);
98 : }
99 : else
100 : {
101 4 : ereport(ERROR,
102 : errcode(ERRCODE_SYNTAX_ERROR),
103 : errmsg("option \"%s\" not recognized",
104 : defel->defname),
105 : parser_errposition(pstate, defel->location));
106 : }
107 : }
108 :
109 : /*
110 : * We are going to wait for the LSN replay. We should first care that we
111 : * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
112 : * Otherwise, our snapshot could prevent the replay of WAL records
113 : * implying a kind of self-deadlock. This is the reason why WAIT FOR is a
114 : * command, not a procedure or function.
115 : *
116 : * At first, we should check there is no active snapshot. According to
117 : * PlannedStmtRequiresSnapshot(), even in an atomic context, CallStmt is
118 : * processed with a snapshot. Thankfully, we can pop this snapshot,
119 : * because PortalRunUtility() can tolerate this.
120 : */
121 38 : if (ActiveSnapshotSet())
122 2 : PopActiveSnapshot();
123 :
124 : /*
125 : * At second, invalidate a catalog snapshot if any. And we should be done
126 : * with the preparation.
127 : */
128 38 : InvalidateCatalogSnapshot();
129 :
130 : /* Give up if there is still an active or registered snapshot. */
131 38 : if (HaveRegisteredOrActiveSnapshot())
132 4 : ereport(ERROR,
133 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
134 : errmsg("WAIT FOR must be only called without an active or registered snapshot"),
135 : errdetail("WAIT FOR cannot be executed from a function or a procedure or within a transaction with an isolation level higher than READ COMMITTED."));
136 :
137 : /*
138 : * As the result we should hold no snapshot, and correspondingly our xmin
139 : * should be unset.
140 : */
141 : Assert(MyProc->xmin == InvalidTransactionId);
142 :
143 34 : waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout);
144 :
145 : /*
146 : * Process the result of WaitForLSN(). Throw appropriate error if needed.
147 : */
148 34 : switch (waitLSNResult)
149 : {
150 22 : case WAIT_LSN_RESULT_SUCCESS:
151 : /* Nothing to do on success */
152 22 : result = "success";
153 22 : break;
154 :
155 6 : case WAIT_LSN_RESULT_TIMEOUT:
156 6 : if (throw)
157 2 : ereport(ERROR,
158 : errcode(ERRCODE_QUERY_CANCELED),
159 : errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X",
160 : LSN_FORMAT_ARGS(lsn),
161 : LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
162 : else
163 4 : result = "timeout";
164 4 : break;
165 :
166 6 : case WAIT_LSN_RESULT_NOT_IN_RECOVERY:
167 6 : if (throw)
168 : {
169 4 : if (PromoteIsTriggered())
170 : {
171 2 : ereport(ERROR,
172 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
173 : errmsg("recovery is not in progress"),
174 : errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.",
175 : LSN_FORMAT_ARGS(lsn),
176 : LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
177 : }
178 : else
179 2 : ereport(ERROR,
180 : errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
181 : errmsg("recovery is not in progress"),
182 : errhint("Waiting for the replay LSN can only be executed during recovery."));
183 : }
184 : else
185 2 : result = "not in recovery";
186 2 : break;
187 : }
188 :
189 : /* need a tuple descriptor representing a single TEXT column */
190 28 : tupdesc = WaitStmtResultDesc(stmt);
191 :
192 : /* prepare for projection of tuples */
193 28 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
194 :
195 : /* Send it */
196 28 : do_text_output_oneline(tstate, result);
197 :
198 28 : end_tup_output(tstate);
199 28 : }
200 :
201 : TupleDesc
202 80 : WaitStmtResultDesc(WaitStmt *stmt)
203 : {
204 : TupleDesc tupdesc;
205 :
206 : /* Need a tuple descriptor representing a single TEXT column */
207 80 : tupdesc = CreateTemplateTupleDesc(1);
208 80 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
209 : TEXTOID, -1, 0);
210 80 : return tupdesc;
211 : }
|