LCOV - code coverage report
Current view: top level - src/include/libpq - libpq-be-fe-helpers.h (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 82.6 % 115 95
Test Date: 2026-02-17 17:20:33 Functions: 100.0 % 10 10
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * libpq-be-fe-helpers.h
       4              :  *    Helper functions for using libpq in extensions
       5              :  *
       6              :  * Code built directly into the backend is not allowed to link to libpq
       7              :  * directly. Extension code, however, is allowed to use libpq. Code that
       8              :  * does so has to be careful not to block inside libpq, otherwise
       9              :  * interrupts will not be processed, leading to issues like unresolvable
      10              :  * deadlocks. Backend code also needs to take care to acquire/release an
      11              :  * external fd for the connection, otherwise fd.c's accounting of fd's is
      12              :  * broken.
      13              :  *
      14              :  * This file provides helper functions to make it easier to comply with these
      15              :  * rules. It is a header only library as it needs to be linked into each
      16              :  * extension using libpq, and it seems too small to be worth adding a
      17              :  * dedicated static library for.
      18              :  *
      19              :  * TODO: For historical reasons the connections established here are not put
      20              :  * into non-blocking mode. That can lead to blocking even when only the async
      21              :  * libpq functions are used. This should be fixed.
      22              :  *
      23              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      24              :  * Portions Copyright (c) 1994, Regents of the University of California
      25              :  *
      26              :  * src/include/libpq/libpq-be-fe-helpers.h
      27              :  *
      28              :  *-------------------------------------------------------------------------
      29              :  */
      30              : #ifndef LIBPQ_BE_FE_HELPERS_H
      31              : #define LIBPQ_BE_FE_HELPERS_H
      32              : 
      33              : #include "libpq/libpq-be-fe.h"
      34              : #include "miscadmin.h"
      35              : #include "storage/fd.h"
      36              : #include "storage/latch.h"
      37              : #include "utils/timestamp.h"
      38              : #include "utils/wait_event.h"
      39              : 
      40              : 
      41              : static inline void libpqsrv_connect_prepare(void);
      42              : static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
      43              : static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
      44              : static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
      45              : 
      46              : 
      47              : /*
      48              :  * PQconnectdb() wrapper that reserves a file descriptor and processes
      49              :  * interrupts during connection establishment.
      50              :  *
      51              :  * Throws an error if AcquireExternalFD() fails, but does not throw if
      52              :  * connection establishment itself fails. Callers need to use PQstatus() to
      53              :  * check if connection establishment succeeded.
      54              :  */
      55              : static inline PGconn *
      56           22 : libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
      57              : {
      58           22 :     PGconn     *conn = NULL;
      59              : 
      60           22 :     libpqsrv_connect_prepare();
      61              : 
      62           22 :     conn = PQconnectStart(conninfo);
      63              : 
      64           22 :     libpqsrv_connect_internal(conn, wait_event_info);
      65              : 
      66           22 :     return conn;
      67              : }
      68              : 
      69              : /*
      70              :  * Like libpqsrv_connect(), except that this is a wrapper for
      71              :  * PQconnectdbParams().
      72              :  */
      73              : static inline PGconn *
      74         1033 : libpqsrv_connect_params(const char *const *keywords,
      75              :                         const char *const *values,
      76              :                         int expand_dbname,
      77              :                         uint32 wait_event_info)
      78              : {
      79         1033 :     PGconn     *conn = NULL;
      80              : 
      81         1033 :     libpqsrv_connect_prepare();
      82              : 
      83         1033 :     conn = PQconnectStartParams(keywords, values, expand_dbname);
      84              : 
      85         1033 :     libpqsrv_connect_internal(conn, wait_event_info);
      86              : 
      87         1032 :     return conn;
      88              : }
      89              : 
      90              : /*
      91              :  * PQfinish() wrapper that additionally releases the reserved file descriptor.
      92              :  *
      93              :  * It is allowed to call this with a NULL pgconn iff NULL was returned by
      94              :  * libpqsrv_connect*.
      95              :  */
      96              : static inline void
      97         1044 : libpqsrv_disconnect(PGconn *conn)
      98              : {
      99              :     /*
     100              :      * If no connection was established, we haven't reserved an FD for it (or
     101              :      * already released it). This rule makes it easier to write PG_CATCH()
     102              :      * handlers for this facility's users.
     103              :      *
     104              :      * See also libpqsrv_connect_internal().
     105              :      */
     106         1044 :     if (conn == NULL)
     107            2 :         return;
     108              : 
     109         1042 :     ReleaseExternalFD();
     110         1042 :     PQfinish(conn);
     111              : }
     112              : 
     113              : 
     114              : /* internal helper functions follow */
     115              : 
     116              : 
     117              : /*
     118              :  * Helper function for all connection establishment functions.
     119              :  */
     120              : static inline void
     121         1055 : libpqsrv_connect_prepare(void)
     122              : {
     123              :     /*
     124              :      * We must obey fd.c's limit on non-virtual file descriptors.  Assume that
     125              :      * a PGconn represents one long-lived FD.  (Doing this here also ensures
     126              :      * that VFDs are closed if needed to make room.)
     127              :      */
     128         1055 :     if (!AcquireExternalFD())
     129              :     {
     130              : #ifndef WIN32                   /* can't write #if within ereport() macro */
     131            0 :         ereport(ERROR,
     132              :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     133              :                  errmsg("could not establish connection"),
     134              :                  errdetail("There are too many open files on the local server."),
     135              :                  errhint("Raise the server's \"max_files_per_process\" and/or \"ulimit -n\" limits.")));
     136              : #else
     137              :         ereport(ERROR,
     138              :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     139              :                  errmsg("could not establish connection"),
     140              :                  errdetail("There are too many open files on the local server."),
     141              :                  errhint("Raise the server's \"max_files_per_process\" setting.")));
     142              : #endif
     143              :     }
     144         1055 : }
     145              : 
     146              : /*
     147              :  * Helper function for all connection establishment functions.
     148              :  */
     149              : static inline void
     150         1055 : libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
     151              : {
     152              :     /*
     153              :      * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do
     154              :      * that here.
     155              :      */
     156         1055 :     if (conn == NULL)
     157              :     {
     158            0 :         ReleaseExternalFD();
     159            0 :         return;
     160              :     }
     161              : 
     162              :     /*
     163              :      * Can't wait without a socket. Note that we don't want to close the libpq
     164              :      * connection yet, so callers can emit a useful error.
     165              :      */
     166         1055 :     if (PQstatus(conn) == CONNECTION_BAD)
     167           61 :         return;
     168              : 
     169              :     /*
     170              :      * WaitLatchOrSocket() can conceivably fail, handle that case here instead
     171              :      * of requiring all callers to do so.
     172              :      */
     173          994 :     PG_TRY();
     174              :     {
     175              :         PostgresPollingStatusType status;
     176              : 
     177              :         /*
     178              :          * Poll connection until we have OK or FAILED status.
     179              :          *
     180              :          * Per spec for PQconnectPoll, first wait till socket is write-ready.
     181              :          */
     182          994 :         status = PGRES_POLLING_WRITING;
     183         4665 :         while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED)
     184              :         {
     185              :             int         io_flag;
     186              :             int         rc;
     187              : 
     188         2678 :             if (status == PGRES_POLLING_READING)
     189         1009 :                 io_flag = WL_SOCKET_READABLE;
     190              : #ifdef WIN32
     191              : 
     192              :             /*
     193              :              * Windows needs a different test while waiting for
     194              :              * connection-made
     195              :              */
     196              :             else if (PQstatus(conn) == CONNECTION_STARTED)
     197              :                 io_flag = WL_SOCKET_CONNECTED;
     198              : #endif
     199              :             else
     200         1669 :                 io_flag = WL_SOCKET_WRITEABLE;
     201              : 
     202         2678 :             rc = WaitLatchOrSocket(MyLatch,
     203              :                                    WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
     204              :                                    PQsocket(conn),
     205              :                                    0,
     206              :                                    wait_event_info);
     207              : 
     208              :             /* Interrupted? */
     209         2678 :             if (rc & WL_LATCH_SET)
     210              :             {
     211          677 :                 ResetLatch(MyLatch);
     212          677 :                 CHECK_FOR_INTERRUPTS();
     213              :             }
     214              : 
     215              :             /* If socket is ready, advance the libpq state machine */
     216         2677 :             if (rc & io_flag)
     217         2001 :                 status = PQconnectPoll(conn);
     218              :         }
     219              :     }
     220            0 :     PG_CATCH();
     221              :     {
     222              :         /*
     223              :          * If an error is thrown here, the callers won't call
     224              :          * libpqsrv_disconnect() with a conn, so release resources
     225              :          * immediately.
     226              :          */
     227            0 :         ReleaseExternalFD();
     228            0 :         PQfinish(conn);
     229              : 
     230            0 :         PG_RE_THROW();
     231              :     }
     232          993 :     PG_END_TRY();
     233              : }
     234              : 
     235              : /*
     236              :  * PQexec() wrapper that processes interrupts.
     237              :  *
     238              :  * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
     239              :  * interrupts while pushing the query text to the server.  Consider that
     240              :  * setting if query strings can be long relative to TCP buffer size.
     241              :  *
     242              :  * This has the preconditions of PQsendQuery(), not those of PQexec().  Most
     243              :  * notably, PQexec() would silently discard any prior query results.
     244              :  */
     245              : static inline PGresult *
     246         4034 : libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
     247              : {
     248         4034 :     if (!PQsendQuery(conn, query))
     249            0 :         return NULL;
     250         4034 :     return libpqsrv_get_result_last(conn, wait_event_info);
     251              : }
     252              : 
     253              : /*
     254              :  * PQexecParams() wrapper that processes interrupts.
     255              :  *
     256              :  * See notes at libpqsrv_exec().
     257              :  */
     258              : static inline PGresult *
     259              : libpqsrv_exec_params(PGconn *conn,
     260              :                      const char *command,
     261              :                      int nParams,
     262              :                      const Oid *paramTypes,
     263              :                      const char *const *paramValues,
     264              :                      const int *paramLengths,
     265              :                      const int *paramFormats,
     266              :                      int resultFormat,
     267              :                      uint32 wait_event_info)
     268              : {
     269              :     if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
     270              :                            paramLengths, paramFormats, resultFormat))
     271              :         return NULL;
     272              :     return libpqsrv_get_result_last(conn, wait_event_info);
     273              : }
     274              : 
     275              : /*
     276              :  * Like PQexec(), loop over PQgetResult() until it returns NULL or another
     277              :  * terminal state.  Return the last non-NULL result or the terminal state.
     278              :  */
     279              : static inline PGresult *
     280        12319 : libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
     281              : {
     282        12319 :     PGresult   *lastResult = NULL;
     283              : 
     284              :     for (;;)
     285        11560 :     {
     286              :         /* Wait for, and collect, the next PGresult. */
     287              :         PGresult   *result;
     288              : 
     289        23879 :         result = libpqsrv_get_result(conn, wait_event_info);
     290        23877 :         if (result == NULL)
     291        11558 :             break;              /* query is complete, or failure */
     292              : 
     293              :         /*
     294              :          * Emulate PQexec()'s behavior of returning the last result when there
     295              :          * are many.
     296              :          */
     297        12319 :         PQclear(lastResult);
     298        12319 :         lastResult = result;
     299              : 
     300        24638 :         if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
     301        24442 :             PQresultStatus(lastResult) == PGRES_COPY_OUT ||
     302        23685 :             PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
     303        11562 :             PQstatus(conn) == CONNECTION_BAD)
     304              :             break;
     305              :     }
     306        12317 :     return lastResult;
     307              : }
     308              : 
     309              : /*
     310              :  * Perform the equivalent of PQgetResult(), but watch for interrupts.
     311              :  */
     312              : static inline PGresult *
     313        25164 : libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
     314              : {
     315              :     /*
     316              :      * Collect data until PQgetResult is ready to get the result without
     317              :      * blocking.
     318              :      */
     319        37407 :     while (PQisBusy(conn))
     320              :     {
     321              :         int         rc;
     322              : 
     323        12286 :         rc = WaitLatchOrSocket(MyLatch,
     324              :                                WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
     325              :                                WL_SOCKET_READABLE,
     326              :                                PQsocket(conn),
     327              :                                0,
     328              :                                wait_event_info);
     329              : 
     330              :         /* Interrupted? */
     331        12286 :         if (rc & WL_LATCH_SET)
     332              :         {
     333           17 :             ResetLatch(MyLatch);
     334           17 :             CHECK_FOR_INTERRUPTS();
     335              :         }
     336              : 
     337              :         /* Consume whatever data is available from the socket */
     338        12284 :         if (PQconsumeInput(conn) == 0)
     339              :         {
     340              :             /* trouble; expect PQgetResult() to return NULL */
     341           41 :             break;
     342              :         }
     343              :     }
     344              : 
     345              :     /* Now we can collect and return the next PGresult */
     346        25162 :     return PQgetResult(conn);
     347              : }
     348              : 
     349              : /*
     350              :  * Submit a cancel request to the given connection, waiting only until
     351              :  * the given time.
     352              :  *
     353              :  * We sleep interruptibly until we receive confirmation that the cancel
     354              :  * request has been accepted, and if it is, return NULL; if the cancel
     355              :  * request fails, return an error message string (which is not to be
     356              :  * freed).
     357              :  *
     358              :  * For other problems (to wit: OOM when strdup'ing an error message from
     359              :  * libpq), this function can ereport(ERROR).
     360              :  *
     361              :  * Note: this function leaks a string's worth of memory when reporting
     362              :  * libpq errors.  Make sure to call it in a transient memory context.
     363              :  */
     364              : static inline const char *
     365            2 : libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
     366              : {
     367              :     PGcancelConn *cancel_conn;
     368            2 :     const char *error = NULL;
     369              : 
     370            2 :     cancel_conn = PQcancelCreate(conn);
     371            2 :     if (cancel_conn == NULL)
     372            0 :         return "out of memory";
     373              : 
     374              :     /* In what follows, do not leak any PGcancelConn on any errors. */
     375              : 
     376            2 :     PG_TRY();
     377              :     {
     378            2 :         if (!PQcancelStart(cancel_conn))
     379              :         {
     380            0 :             error = pchomp(PQcancelErrorMessage(cancel_conn));
     381            0 :             goto exit;
     382              :         }
     383              : 
     384              :         for (;;)
     385            2 :         {
     386              :             PostgresPollingStatusType pollres;
     387              :             TimestampTz now;
     388              :             long        cur_timeout;
     389            4 :             int         waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
     390              : 
     391            4 :             pollres = PQcancelPoll(cancel_conn);
     392            4 :             if (pollres == PGRES_POLLING_OK)
     393            2 :                 break;          /* success! */
     394              : 
     395              :             /* If timeout has expired, give up, else get sleep time. */
     396            2 :             now = GetCurrentTimestamp();
     397            2 :             cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
     398            2 :             if (cur_timeout <= 0)
     399              :             {
     400            0 :                 error = "cancel request timed out";
     401            0 :                 break;
     402              :             }
     403              : 
     404            2 :             switch (pollres)
     405              :             {
     406            2 :                 case PGRES_POLLING_READING:
     407            2 :                     waitEvents |= WL_SOCKET_READABLE;
     408            2 :                     break;
     409            0 :                 case PGRES_POLLING_WRITING:
     410            0 :                     waitEvents |= WL_SOCKET_WRITEABLE;
     411            0 :                     break;
     412            0 :                 default:
     413            0 :                     error = pchomp(PQcancelErrorMessage(cancel_conn));
     414            0 :                     goto exit;
     415              :             }
     416              : 
     417              :             /* Sleep until there's something to do */
     418            2 :             WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
     419              :                               cur_timeout, PG_WAIT_CLIENT);
     420              : 
     421            2 :             ResetLatch(MyLatch);
     422              : 
     423            2 :             CHECK_FOR_INTERRUPTS();
     424              :         }
     425            2 : exit:   ;
     426              :     }
     427            0 :     PG_FINALLY();
     428              :     {
     429            2 :         PQcancelFinish(cancel_conn);
     430              :     }
     431            2 :     PG_END_TRY();
     432              : 
     433            2 :     return error;
     434              : }
     435              : 
     436              : /*
     437              :  * libpqsrv_notice_receiver
     438              :  *
     439              :  * Custom notice receiver for libpq connections.
     440              :  *
     441              :  * This function is intended to be set via PQsetNoticeReceiver() so that
     442              :  * NOTICE, WARNING, and similar messages from the connection are reported via
     443              :  * ereport(), instead of being printed to stderr.
     444              :  *
     445              :  * Because this will be called from libpq with a "real" (not wrapped)
     446              :  * PGresult, we need to temporarily ignore libpq-be-fe.h's wrapper macros
     447              :  * for PGresult and also PQresultErrorMessage, and put back the wrappers
     448              :  * afterwards.  That's not pretty, but there seems no better alternative.
     449              :  */
     450              : #undef PGresult
     451              : #undef PQresultErrorMessage
     452              : 
     453              : static inline void
     454           11 : libpqsrv_notice_receiver(void *arg, const PGresult *res)
     455              : {
     456              :     const char *message;
     457              :     int         len;
     458           11 :     const char *prefix = (const char *) arg;
     459              : 
     460              :     /*
     461              :      * Trim the trailing newline from the message text returned from
     462              :      * PQresultErrorMessage(), as it always includes one, to produce cleaner
     463              :      * log output.
     464              :      */
     465           11 :     message = PQresultErrorMessage(res);
     466           11 :     len = strlen(message);
     467           11 :     if (len > 0 && message[len - 1] == '\n')
     468           11 :         len--;
     469              : 
     470           11 :     ereport(LOG,
     471              :             errmsg_internal("%s: %.*s", prefix, len, message));
     472           11 : }
     473              : 
     474              : #define PGresult libpqsrv_PGresult
     475              : #define PQresultErrorMessage libpqsrv_PQresultErrorMessage
     476              : 
     477              : #endif                          /* LIBPQ_BE_FE_HELPERS_H */
        

Generated by: LCOV version 2.0-1