Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pqmq.c
4 : * Use the frontend/backend protocol for communication over a shm_mq
5 : *
6 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : * src/backend/libpq/pqmq.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 :
14 : #include "postgres.h"
15 :
16 : #include "access/parallel.h"
17 : #include "libpq/libpq.h"
18 : #include "libpq/pqformat.h"
19 : #include "libpq/pqmq.h"
20 : #include "miscadmin.h"
21 : #include "pgstat.h"
22 : #include "replication/logicalworker.h"
23 : #include "storage/latch.h"
24 : #include "tcop/tcopprot.h"
25 : #include "utils/builtins.h"
26 :
27 : static shm_mq_handle *pq_mq_handle = NULL;
28 : static bool pq_mq_busy = false;
29 : static pid_t pq_mq_parallel_leader_pid = 0;
30 : static ProcNumber pq_mq_parallel_leader_proc_number = INVALID_PROC_NUMBER;
31 :
32 : static void pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg);
33 : static void mq_comm_reset(void);
34 : static int mq_flush(void);
35 : static int mq_flush_if_writable(void);
36 : static bool mq_is_send_pending(void);
37 : static int mq_putmessage(char msgtype, const char *s, size_t len);
38 : static void mq_putmessage_noblock(char msgtype, const char *s, size_t len);
39 :
40 : static const PQcommMethods PqCommMqMethods = {
41 : .comm_reset = mq_comm_reset,
42 : .flush = mq_flush,
43 : .flush_if_writable = mq_flush_if_writable,
44 : .is_send_pending = mq_is_send_pending,
45 : .putmessage = mq_putmessage,
46 : .putmessage_noblock = mq_putmessage_noblock
47 : };
48 :
49 : /*
50 : * Arrange to redirect frontend/backend protocol messages to a shared-memory
51 : * message queue.
52 : */
53 : void
54 1495 : pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
55 : {
56 1495 : PqCommMethods = &PqCommMqMethods;
57 1495 : pq_mq_handle = mqh;
58 1495 : whereToSendOutput = DestRemote;
59 1495 : FrontendProtocol = PG_PROTOCOL_LATEST;
60 1495 : on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
61 1495 : }
62 :
63 : /*
64 : * When the DSM that contains our shm_mq goes away, we need to stop sending
65 : * messages to it.
66 : */
67 : static void
68 1495 : pq_cleanup_redirect_to_shm_mq(dsm_segment *seg, Datum arg)
69 : {
70 1495 : if (pq_mq_handle != NULL)
71 : {
72 1495 : pfree(pq_mq_handle);
73 1495 : pq_mq_handle = NULL;
74 : }
75 1495 : whereToSendOutput = DestNone;
76 1495 : }
77 :
78 : /*
79 : * Arrange to SendProcSignal() to the parallel leader each time we transmit
80 : * message data via the shm_mq.
81 : */
82 : void
83 1495 : pq_set_parallel_leader(pid_t pid, ProcNumber procNumber)
84 : {
85 : Assert(PqCommMethods == &PqCommMqMethods);
86 1495 : pq_mq_parallel_leader_pid = pid;
87 1495 : pq_mq_parallel_leader_proc_number = procNumber;
88 1495 : }
89 :
/*
 * comm_reset method: this implementation has no state to reset.
 */
static void
mq_comm_reset(void)
{
	/* Intentionally empty. */
	return;
}
95 :
/*
 * flush method: there is no flushing work to do here, so always report
 * success (zero).
 */
static int
mq_flush(void)
{
	return 0;					/* nothing to flush */
}
102 :
/*
 * flush_if_writable method: as with mq_flush(), there is nothing to do, so
 * always report success (zero).
 */
static int
mq_flush_if_writable(void)
{
	return 0;					/* nothing to flush */
}
109 :
/*
 * is_send_pending method: report whether unsent output is pending.
 *
 * There is never anything pending in this implementation, so the answer is
 * always false.
 */
static bool
mq_is_send_pending(void)
{
	return false;
}
116 :
117 : /*
118 : * Transmit a libpq protocol message to the shared memory message queue
119 : * selected via pq_mq_handle. We don't include a length word, because the
120 : * receiver will know the length of the message from shm_mq_receive().
121 : */
122 : static int
123 1490 : mq_putmessage(char msgtype, const char *s, size_t len)
124 : {
125 : shm_mq_iovec iov[2];
126 : shm_mq_result result;
127 :
128 : /*
129 : * If we're sending a message, and we have to wait because the queue is
130 : * full, and then we get interrupted, and that interrupt results in trying
131 : * to send another message, we respond by detaching the queue. There's no
132 : * way to return to the original context, but even if there were, just
133 : * queueing the message would amount to indefinitely postponing the
134 : * response to the interrupt. So we do this instead.
135 : */
136 1490 : if (pq_mq_busy)
137 : {
138 0 : if (pq_mq_handle != NULL)
139 : {
140 0 : shm_mq_detach(pq_mq_handle);
141 0 : pfree(pq_mq_handle);
142 0 : pq_mq_handle = NULL;
143 : }
144 0 : return EOF;
145 : }
146 :
147 : /*
148 : * If the message queue is already gone, just ignore the message. This
149 : * doesn't necessarily indicate a problem; for example, DEBUG messages can
150 : * be generated late in the shutdown sequence, after all DSMs have already
151 : * been detached.
152 : */
153 1490 : if (pq_mq_handle == NULL)
154 0 : return 0;
155 :
156 1490 : pq_mq_busy = true;
157 :
158 1490 : iov[0].data = &msgtype;
159 1490 : iov[0].len = 1;
160 1490 : iov[1].data = s;
161 1490 : iov[1].len = len;
162 :
163 : for (;;)
164 : {
165 : /*
166 : * Immediately notify the receiver by passing force_flush as true so
167 : * that the shared memory value is updated before we send the parallel
168 : * message signal right after this.
169 : */
170 5 : Assert(pq_mq_handle != NULL);
171 1495 : result = shm_mq_sendv(pq_mq_handle, iov, 2, true, true);
172 :
173 1495 : if (pq_mq_parallel_leader_pid != 0)
174 : {
175 1495 : if (IsLogicalParallelApplyWorker())
176 7 : SendProcSignal(pq_mq_parallel_leader_pid,
177 : PROCSIG_PARALLEL_APPLY_MESSAGE,
178 : pq_mq_parallel_leader_proc_number);
179 : else
180 : {
181 : Assert(IsParallelWorker());
182 1488 : SendProcSignal(pq_mq_parallel_leader_pid,
183 : PROCSIG_PARALLEL_MESSAGE,
184 : pq_mq_parallel_leader_proc_number);
185 : }
186 : }
187 :
188 1495 : if (result != SHM_MQ_WOULD_BLOCK)
189 1490 : break;
190 :
191 5 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
192 : WAIT_EVENT_MESSAGE_QUEUE_PUT_MESSAGE);
193 5 : ResetLatch(MyLatch);
194 5 : CHECK_FOR_INTERRUPTS();
195 : }
196 :
197 1490 : pq_mq_busy = false;
198 :
199 : Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED);
200 1490 : if (result != SHM_MQ_SUCCESS)
201 4 : return EOF;
202 1486 : return 0;
203 : }
204 :
205 : static void
206 0 : mq_putmessage_noblock(char msgtype, const char *s, size_t len)
207 : {
208 : /*
209 : * While the shm_mq machinery does support sending a message in
210 : * non-blocking mode, there's currently no way to try sending beginning to
211 : * send the message that doesn't also commit us to completing the
212 : * transmission. This could be improved in the future, but for now we
213 : * don't need it.
214 : */
215 0 : elog(ERROR, "not currently supported");
216 : }
217 :
218 : /*
219 : * Parse an ErrorResponse or NoticeResponse payload and populate an ErrorData
220 : * structure with the results.
221 : */
222 : void
223 9 : pq_parse_errornotice(StringInfo msg, ErrorData *edata)
224 : {
225 : /* Initialize edata with reasonable defaults. */
226 216 : MemSet(edata, 0, sizeof(ErrorData));
227 9 : edata->elevel = ERROR;
228 9 : edata->assoc_context = CurrentMemoryContext;
229 :
230 : /* Loop over fields and extract each one. */
231 : for (;;)
232 74 : {
233 83 : char code = pq_getmsgbyte(msg);
234 : const char *value;
235 :
236 83 : if (code == '\0')
237 : {
238 9 : pq_getmsgend(msg);
239 9 : break;
240 : }
241 74 : value = pq_getmsgrawstring(msg);
242 :
243 74 : switch (code)
244 : {
245 9 : case PG_DIAG_SEVERITY:
246 : /* ignore, trusting we'll get a nonlocalized version */
247 9 : break;
248 9 : case PG_DIAG_SEVERITY_NONLOCALIZED:
249 9 : if (strcmp(value, "DEBUG") == 0)
250 : {
251 : /*
252 : * We can't reconstruct the exact DEBUG level, but
253 : * presumably it was >= client_min_messages, so select
254 : * DEBUG1 to ensure we'll pass it on to the client.
255 : */
256 0 : edata->elevel = DEBUG1;
257 : }
258 9 : else if (strcmp(value, "LOG") == 0)
259 : {
260 : /*
261 : * It can't be LOG_SERVER_ONLY, or the worker wouldn't
262 : * have sent it to us; so LOG is the correct value.
263 : */
264 0 : edata->elevel = LOG;
265 : }
266 9 : else if (strcmp(value, "INFO") == 0)
267 0 : edata->elevel = INFO;
268 9 : else if (strcmp(value, "NOTICE") == 0)
269 0 : edata->elevel = NOTICE;
270 9 : else if (strcmp(value, "WARNING") == 0)
271 0 : edata->elevel = WARNING;
272 9 : else if (strcmp(value, "ERROR") == 0)
273 9 : edata->elevel = ERROR;
274 0 : else if (strcmp(value, "FATAL") == 0)
275 0 : edata->elevel = FATAL;
276 0 : else if (strcmp(value, "PANIC") == 0)
277 0 : edata->elevel = PANIC;
278 : else
279 0 : elog(ERROR, "unrecognized error severity: \"%s\"", value);
280 9 : break;
281 9 : case PG_DIAG_SQLSTATE:
282 9 : if (strlen(value) != 5)
283 0 : elog(ERROR, "invalid SQLSTATE: \"%s\"", value);
284 9 : edata->sqlerrcode = MAKE_SQLSTATE(value[0], value[1], value[2],
285 : value[3], value[4]);
286 9 : break;
287 9 : case PG_DIAG_MESSAGE_PRIMARY:
288 9 : edata->message = pstrdup(value);
289 9 : break;
290 2 : case PG_DIAG_MESSAGE_DETAIL:
291 2 : edata->detail = pstrdup(value);
292 2 : break;
293 3 : case PG_DIAG_MESSAGE_HINT:
294 3 : edata->hint = pstrdup(value);
295 3 : break;
296 0 : case PG_DIAG_STATEMENT_POSITION:
297 0 : edata->cursorpos = pg_strtoint32(value);
298 0 : break;
299 0 : case PG_DIAG_INTERNAL_POSITION:
300 0 : edata->internalpos = pg_strtoint32(value);
301 0 : break;
302 0 : case PG_DIAG_INTERNAL_QUERY:
303 0 : edata->internalquery = pstrdup(value);
304 0 : break;
305 6 : case PG_DIAG_CONTEXT:
306 6 : edata->context = pstrdup(value);
307 6 : break;
308 0 : case PG_DIAG_SCHEMA_NAME:
309 0 : edata->schema_name = pstrdup(value);
310 0 : break;
311 0 : case PG_DIAG_TABLE_NAME:
312 0 : edata->table_name = pstrdup(value);
313 0 : break;
314 0 : case PG_DIAG_COLUMN_NAME:
315 0 : edata->column_name = pstrdup(value);
316 0 : break;
317 0 : case PG_DIAG_DATATYPE_NAME:
318 0 : edata->datatype_name = pstrdup(value);
319 0 : break;
320 0 : case PG_DIAG_CONSTRAINT_NAME:
321 0 : edata->constraint_name = pstrdup(value);
322 0 : break;
323 9 : case PG_DIAG_SOURCE_FILE:
324 9 : edata->filename = pstrdup(value);
325 9 : break;
326 9 : case PG_DIAG_SOURCE_LINE:
327 9 : edata->lineno = pg_strtoint32(value);
328 9 : break;
329 9 : case PG_DIAG_SOURCE_FUNCTION:
330 9 : edata->funcname = pstrdup(value);
331 9 : break;
332 0 : default:
333 0 : elog(ERROR, "unrecognized error field code: %d", code);
334 : break;
335 : }
336 : }
337 9 : }
|