Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * libpq-be-fe-helpers.h
4 : * Helper functions for using libpq in extensions
5 : *
6 : * Code built directly into the backend is not allowed to link to libpq
7 : * directly. Extension code is allowed to use libpq however. However, libpq
8 : * used in extensions has to be careful not to block inside libpq, otherwise
9 : * interrupts will not be processed, leading to issues like unresolvable
10 : * deadlocks. Backend code also needs to take care to acquire/release an
11 : * external fd for the connection, otherwise fd.c's accounting of fd's is
12 : * broken.
13 : *
14 : * This file provides helper functions to make it easier to comply with these
15 : * rules. It is a header only library as it needs to be linked into each
16 : * extension using libpq, and it seems too small to be worth adding a
17 : * dedicated static library for.
18 : *
19 : * TODO: For historical reasons the connections established here are not put
20 : * into non-blocking mode. That can lead to blocking even when only the async
21 : * libpq functions are used. This should be fixed.
22 : *
23 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
24 : * Portions Copyright (c) 1994, Regents of the University of California
25 : *
26 : * src/include/libpq/libpq-be-fe-helpers.h
27 : *
28 : *-------------------------------------------------------------------------
29 : */
30 : #ifndef LIBPQ_BE_FE_HELPERS_H
31 : #define LIBPQ_BE_FE_HELPERS_H
32 :
33 : /*
34 : * Despite the name, BUILDING_DLL is set only when building code directly part
35 : * of the backend. Which also is where libpq isn't allowed to be
36 : * used. Obviously this doesn't protect against libpq-fe.h getting included
37 : * otherwise, but perhaps still protects against a few mistakes...
38 : */
39 : #ifdef BUILDING_DLL
40 : #error "libpq may not be used code directly built into the backend"
41 : #endif
42 :
43 : #include "libpq-fe.h"
44 : #include "miscadmin.h"
45 : #include "storage/fd.h"
46 : #include "storage/latch.h"
47 : #include "utils/timestamp.h"
48 : #include "utils/wait_event.h"
49 :
50 :
51 : static inline void libpqsrv_connect_prepare(void);
52 : static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
53 : static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
54 : static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
55 :
56 :
57 : /*
58 : * PQconnectdb() wrapper that reserves a file descriptor and processes
59 : * interrupts during connection establishment.
60 : *
61 : * Throws an error if AcquireExternalFD() fails, but does not throw if
62 : * connection establishment itself fails. Callers need to use PQstatus() to
63 : * check if connection establishment succeeded.
64 : */
65 : static inline PGconn *
66 42 : libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
67 : {
68 42 : PGconn *conn = NULL;
69 :
70 42 : libpqsrv_connect_prepare();
71 :
72 42 : conn = PQconnectStart(conninfo);
73 :
74 42 : libpqsrv_connect_internal(conn, wait_event_info);
75 :
76 42 : return conn;
77 : }
78 :
79 : /*
80 : * Like libpqsrv_connect(), except that this is a wrapper for
81 : * PQconnectdbParams().
82 : */
83 : static inline PGconn *
84 130 : libpqsrv_connect_params(const char *const *keywords,
85 : const char *const *values,
86 : int expand_dbname,
87 : uint32 wait_event_info)
88 : {
89 130 : PGconn *conn = NULL;
90 :
91 130 : libpqsrv_connect_prepare();
92 :
93 130 : conn = PQconnectStartParams(keywords, values, expand_dbname);
94 :
95 130 : libpqsrv_connect_internal(conn, wait_event_info);
96 :
97 130 : return conn;
98 : }
99 :
100 : /*
101 : * PQfinish() wrapper that additionally releases the reserved file descriptor.
102 : *
103 : * It is allowed to call this with a NULL pgconn iff NULL was returned by
104 : * libpqsrv_connect*.
105 : */
106 : static inline void
107 164 : libpqsrv_disconnect(PGconn *conn)
108 : {
109 : /*
110 : * If no connection was established, we haven't reserved an FD for it (or
111 : * already released it). This rule makes it easier to write PG_CATCH()
112 : * handlers for this facility's users.
113 : *
114 : * See also libpqsrv_connect_internal().
115 : */
116 164 : if (conn == NULL)
117 4 : return;
118 :
119 160 : ReleaseExternalFD();
120 160 : PQfinish(conn);
121 : }
122 :
123 :
124 : /* internal helper functions follow */
125 :
126 :
127 : /*
128 : * Helper function for all connection establishment functions.
129 : */
130 : static inline void
131 172 : libpqsrv_connect_prepare(void)
132 : {
133 : /*
134 : * We must obey fd.c's limit on non-virtual file descriptors. Assume that
135 : * a PGconn represents one long-lived FD. (Doing this here also ensures
136 : * that VFDs are closed if needed to make room.)
137 : */
138 172 : if (!AcquireExternalFD())
139 : {
140 : #ifndef WIN32 /* can't write #if within ereport() macro */
141 0 : ereport(ERROR,
142 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
143 : errmsg("could not establish connection"),
144 : errdetail("There are too many open files on the local server."),
145 : errhint("Raise the server's \"max_files_per_process\" and/or \"ulimit -n\" limits.")));
146 : #else
147 : ereport(ERROR,
148 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
149 : errmsg("could not establish connection"),
150 : errdetail("There are too many open files on the local server."),
151 : errhint("Raise the server's \"max_files_per_process\" setting.")));
152 : #endif
153 : }
154 172 : }
155 :
156 : /*
157 : * Helper function for all connection establishment functions.
158 : */
159 : static inline void
160 172 : libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
161 : {
162 : /*
163 : * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do
164 : * that here.
165 : */
166 172 : if (conn == NULL)
167 : {
168 0 : ReleaseExternalFD();
169 0 : return;
170 : }
171 :
172 : /*
173 : * Can't wait without a socket. Note that we don't want to close the libpq
174 : * connection yet, so callers can emit a useful error.
175 : */
176 172 : if (PQstatus(conn) == CONNECTION_BAD)
177 4 : return;
178 :
179 : /*
180 : * WaitLatchOrSocket() can conceivably fail, handle that case here instead
181 : * of requiring all callers to do so.
182 : */
183 168 : PG_TRY();
184 : {
185 : PostgresPollingStatusType status;
186 :
187 : /*
188 : * Poll connection until we have OK or FAILED status.
189 : *
190 : * Per spec for PQconnectPoll, first wait till socket is write-ready.
191 : */
192 168 : status = PGRES_POLLING_WRITING;
193 506 : while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED)
194 : {
195 : int io_flag;
196 : int rc;
197 :
198 338 : if (status == PGRES_POLLING_READING)
199 168 : io_flag = WL_SOCKET_READABLE;
200 : #ifdef WIN32
201 :
202 : /*
203 : * Windows needs a different test while waiting for
204 : * connection-made
205 : */
206 : else if (PQstatus(conn) == CONNECTION_STARTED)
207 : io_flag = WL_SOCKET_CONNECTED;
208 : #endif
209 : else
210 170 : io_flag = WL_SOCKET_WRITEABLE;
211 :
212 338 : rc = WaitLatchOrSocket(MyLatch,
213 : WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
214 : PQsocket(conn),
215 : 0,
216 : wait_event_info);
217 :
218 : /* Interrupted? */
219 338 : if (rc & WL_LATCH_SET)
220 : {
221 2 : ResetLatch(MyLatch);
222 2 : CHECK_FOR_INTERRUPTS();
223 : }
224 :
225 : /* If socket is ready, advance the libpq state machine */
226 338 : if (rc & io_flag)
227 336 : status = PQconnectPoll(conn);
228 : }
229 : }
230 0 : PG_CATCH();
231 : {
232 : /*
233 : * If an error is thrown here, the callers won't call
234 : * libpqsrv_disconnect() with a conn, so release resources
235 : * immediately.
236 : */
237 0 : ReleaseExternalFD();
238 0 : PQfinish(conn);
239 :
240 0 : PG_RE_THROW();
241 : }
242 168 : PG_END_TRY();
243 : }
244 :
245 : /*
246 : * PQexec() wrapper that processes interrupts.
247 : *
248 : * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
249 : * interrupts while pushing the query text to the server. Consider that
250 : * setting if query strings can be long relative to TCP buffer size.
251 : *
252 : * This has the preconditions of PQsendQuery(), not those of PQexec(). Most
253 : * notably, PQexec() would silently discard any prior query results.
254 : */
255 : static inline PGresult *
256 124 : libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
257 : {
258 124 : if (!PQsendQuery(conn, query))
259 0 : return NULL;
260 124 : return libpqsrv_get_result_last(conn, wait_event_info);
261 : }
262 :
263 : /*
264 : * PQexecParams() wrapper that processes interrupts.
265 : *
266 : * See notes at libpqsrv_exec().
267 : */
268 : static inline PGresult *
269 : libpqsrv_exec_params(PGconn *conn,
270 : const char *command,
271 : int nParams,
272 : const Oid *paramTypes,
273 : const char *const *paramValues,
274 : const int *paramLengths,
275 : const int *paramFormats,
276 : int resultFormat,
277 : uint32 wait_event_info)
278 : {
279 : if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
280 : paramLengths, paramFormats, resultFormat))
281 : return NULL;
282 : return libpqsrv_get_result_last(conn, wait_event_info);
283 : }
284 :
285 : /*
286 : * Like PQexec(), loop over PQgetResult() until it returns NULL or another
287 : * terminal state. Return the last non-NULL result or the terminal state.
288 : */
289 : static inline PGresult *
290 15916 : libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
291 : {
292 15916 : PGresult *volatile lastResult = NULL;
293 :
294 : /* In what follows, do not leak any PGresults on an error. */
295 15916 : PG_TRY();
296 : {
297 : for (;;)
298 15914 : {
299 : /* Wait for, and collect, the next PGresult. */
300 : PGresult *result;
301 :
302 31830 : result = libpqsrv_get_result(conn, wait_event_info);
303 31828 : if (result == NULL)
304 15910 : break; /* query is complete, or failure */
305 :
306 : /*
307 : * Emulate PQexec()'s behavior of returning the last result when
308 : * there are many.
309 : */
310 15918 : PQclear(lastResult);
311 15918 : lastResult = result;
312 :
313 15918 : if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
314 15918 : PQresultStatus(lastResult) == PGRES_COPY_OUT ||
315 15918 : PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
316 15918 : PQstatus(conn) == CONNECTION_BAD)
317 : break;
318 : }
319 : }
320 2 : PG_CATCH();
321 : {
322 2 : PQclear(lastResult);
323 2 : PG_RE_THROW();
324 : }
325 15914 : PG_END_TRY();
326 :
327 15914 : return lastResult;
328 : }
329 :
330 : /*
331 : * Perform the equivalent of PQgetResult(), but watch for interrupts.
332 : */
333 : static inline PGresult *
334 32194 : libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
335 : {
336 : /*
337 : * Collect data until PQgetResult is ready to get the result without
338 : * blocking.
339 : */
340 47960 : while (PQisBusy(conn))
341 : {
342 : int rc;
343 :
344 15772 : rc = WaitLatchOrSocket(MyLatch,
345 : WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
346 : WL_SOCKET_READABLE,
347 : PQsocket(conn),
348 : 0,
349 : wait_event_info);
350 :
351 : /* Interrupted? */
352 15772 : if (rc & WL_LATCH_SET)
353 : {
354 4 : ResetLatch(MyLatch);
355 4 : CHECK_FOR_INTERRUPTS();
356 : }
357 :
358 : /* Consume whatever data is available from the socket */
359 15770 : if (PQconsumeInput(conn) == 0)
360 : {
361 : /* trouble; expect PQgetResult() to return NULL */
362 4 : break;
363 : }
364 : }
365 :
366 : /* Now we can collect and return the next PGresult */
367 32192 : return PQgetResult(conn);
368 : }
369 :
370 : /*
371 : * Submit a cancel request to the given connection, waiting only until
372 : * the given time.
373 : *
374 : * We sleep interruptibly until we receive confirmation that the cancel
375 : * request has been accepted, and if it is, return NULL; if the cancel
376 : * request fails, return an error message string (which is not to be
377 : * freed).
378 : *
379 : * For other problems (to wit: OOM when strdup'ing an error message from
380 : * libpq), this function can ereport(ERROR).
381 : *
382 : * Note: this function leaks a string's worth of memory when reporting
383 : * libpq errors. Make sure to call it in a transient memory context.
384 : */
385 : static inline const char *
386 4 : libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
387 : {
388 : PGcancelConn *cancel_conn;
389 4 : const char *error = NULL;
390 :
391 4 : cancel_conn = PQcancelCreate(conn);
392 4 : if (cancel_conn == NULL)
393 0 : return "out of memory";
394 :
395 : /* In what follows, do not leak any PGcancelConn on any errors. */
396 :
397 4 : PG_TRY();
398 : {
399 4 : if (!PQcancelStart(cancel_conn))
400 : {
401 0 : error = pchomp(PQcancelErrorMessage(cancel_conn));
402 0 : goto exit;
403 : }
404 :
405 : for (;;)
406 4 : {
407 : PostgresPollingStatusType pollres;
408 : TimestampTz now;
409 : long cur_timeout;
410 8 : int waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
411 :
412 8 : pollres = PQcancelPoll(cancel_conn);
413 8 : if (pollres == PGRES_POLLING_OK)
414 4 : break; /* success! */
415 :
416 : /* If timeout has expired, give up, else get sleep time. */
417 4 : now = GetCurrentTimestamp();
418 4 : cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
419 4 : if (cur_timeout <= 0)
420 : {
421 0 : error = "cancel request timed out";
422 0 : break;
423 : }
424 :
425 4 : switch (pollres)
426 : {
427 4 : case PGRES_POLLING_READING:
428 4 : waitEvents |= WL_SOCKET_READABLE;
429 4 : break;
430 0 : case PGRES_POLLING_WRITING:
431 0 : waitEvents |= WL_SOCKET_WRITEABLE;
432 0 : break;
433 0 : default:
434 0 : error = pchomp(PQcancelErrorMessage(cancel_conn));
435 0 : goto exit;
436 : }
437 :
438 : /* Sleep until there's something to do */
439 4 : WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
440 : cur_timeout, PG_WAIT_CLIENT);
441 :
442 4 : ResetLatch(MyLatch);
443 :
444 4 : CHECK_FOR_INTERRUPTS();
445 : }
446 4 : exit: ;
447 : }
448 0 : PG_FINALLY();
449 : {
450 4 : PQcancelFinish(cancel_conn);
451 : }
452 4 : PG_END_TRY();
453 :
454 4 : return error;
455 : }
456 :
457 : #endif /* LIBPQ_BE_FE_HELPERS_H */
|