LCOV - code coverage report
Current view: top level - src/include/libpq - libpq-be-fe-helpers.h (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 81.7 % 109 89
Test Date: 2026-05-25 02:16:27 Functions: 100.0 % 10 10
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * libpq-be-fe-helpers.h
       4              :  *    Helper functions for using libpq in extensions
       5              :  *
       6              :  * Code built directly into the backend is not allowed to link to libpq
       7              :  * directly. Extension code is allowed to use libpq; however, extensions
       8              :  * using libpq have to be careful not to block inside libpq, otherwise
       9              :  * interrupts will not be processed, leading to issues like unresolvable
      10              :  * deadlocks. Backend code also needs to take care to acquire/release an
      11              :  * external fd for the connection, otherwise fd.c's accounting of fds is
      12              :  * broken.
      13              :  *
      14              :  * This file provides helper functions to make it easier to comply with these
      15              :  * rules. It is a header-only library as it needs to be linked into each
      16              :  * extension using libpq, and it seems too small to be worth adding a
      17              :  * dedicated static library for.
      18              :  *
      19              :  * TODO: For historical reasons the connections established here are not put
      20              :  * into non-blocking mode. That can lead to blocking even when only the async
      21              :  * libpq functions are used. This should be fixed.
      22              :  *
      23              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
      24              :  * Portions Copyright (c) 1994, Regents of the University of California
      25              :  *
      26              :  * src/include/libpq/libpq-be-fe-helpers.h
      27              :  *
      28              :  *-------------------------------------------------------------------------
      29              :  */
      30              : #ifndef LIBPQ_BE_FE_HELPERS_H
      31              : #define LIBPQ_BE_FE_HELPERS_H
      32              : 
      33              : #include "libpq/libpq-be-fe.h"
      34              : #include "miscadmin.h"
      35              : #include "storage/fd.h"
      36              : #include "storage/latch.h"
      37              : #include "utils/timestamp.h"
      38              : #include "utils/wait_event.h"
      39              : 
      40              : 
      41              : static inline void libpqsrv_connect_prepare(void);
      42              : static inline void libpqsrv_connect_complete(PGconn *conn, uint32 wait_event_info);
      43              : static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
      44              : static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
      45              : 
      46              : /*
      47              :  * Start a connection using PQconnectStart().
      48              :  *
      49              :  * The returned connection has not yet completed its startup sequence.  Callers
      50              :  * may perform per-connection setup, such as installing a notice receiver,
      51              :  * before calling libpqsrv_connect_complete().
      52              :  *
      53              :  * Callers must call libpqsrv_connect_complete(), even if this function returns
      54              :  * NULL, because by then libpqsrv_connect_prepare() has reserved an external
      55              :  * FD that must be released.
      56              :  */
      57              : static inline PGconn *
      58           23 : libpqsrv_connect_start(const char *conninfo)
      59              : {
      60           23 :     libpqsrv_connect_prepare();
      61              : 
      62           23 :     return PQconnectStart(conninfo);
      63              : }
      64              : 
      65              : /*
      66              :  * PQconnectdb() wrapper that reserves a file descriptor and processes
      67              :  * interrupts during connection establishment.
      68              :  *
      69              :  * Throws an error if AcquireExternalFD() fails, but does not throw if
      70              :  * connection establishment itself fails. Callers need to use PQstatus() to
      71              :  * check if connection establishment succeeded.
      72              :  */
      73              : static inline PGconn *
      74              : libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
      75              : {
      76              :     PGconn     *conn;
      77              : 
      78              :     conn = libpqsrv_connect_start(conninfo);
      79              : 
      80              :     libpqsrv_connect_complete(conn, wait_event_info);
      81              : 
      82              :     return conn;
      83              : }
      84              : 
      85              : /*
      86              :  * Start a connection using PQconnectStartParams().
      87              :  *
      88              :  * See libpqsrv_connect_start() for the resource-lifetime rules.
      89              :  */
      90              : static inline PGconn *
      91         1124 : libpqsrv_connect_params_start(const char *const *keywords,
      92              :                               const char *const *values,
      93              :                               int expand_dbname)
      94              : {
      95         1124 :     libpqsrv_connect_prepare();
      96              : 
      97         1124 :     return PQconnectStartParams(keywords, values, expand_dbname);
      98              : }
      99              : 
     100              : /*
     101              :  * Like libpqsrv_connect(), except that this is a wrapper for
     102              :  * PQconnectdbParams().
     103              :  */
     104              : static inline PGconn *
     105              : libpqsrv_connect_params(const char *const *keywords,
     106              :                         const char *const *values,
     107              :                         int expand_dbname,
     108              :                         uint32 wait_event_info)
     109              : {
     110              :     PGconn     *conn;
     111              : 
     112              :     conn = libpqsrv_connect_params_start(keywords, values, expand_dbname);
     113              : 
     114              :     libpqsrv_connect_complete(conn, wait_event_info);
     115              : 
     116              :     return conn;
     117              : }
     118              : 
     119              : /*
     120              :  * PQfinish() wrapper that additionally releases the reserved file descriptor.
     121              :  *
     122              :  * It is allowed to call this with NULL only when the external FD reservation
     123              :  * has already been released, for example after calling
     124              :  * libpqsrv_connect_complete() with a NULL connection.
     125              :  */
     126              : static inline void
     127         1138 : libpqsrv_disconnect(PGconn *conn)
     128              : {
     129              :     /*
     130              :      * If no connection was established, we haven't reserved an FD for it (or
     131              :      * already released it). This rule makes it easier to write PG_CATCH()
     132              :      * handlers for this facility's users.
     133              :      *
     134              :      * See also libpqsrv_connect_complete().
     135              :      */
     136         1138 :     if (conn == NULL)
     137            2 :         return;
     138              : 
     139         1136 :     ReleaseExternalFD();
     140         1136 :     PQfinish(conn);
     141              : }
     142              : 
     143              : 
     144              : /* lower-level connection helper functions follow */
     145              : 
     146              : 
     147              : /*
     148              :  * Helper function for all connection establishment functions.
     149              :  */
     150              : static inline void
     151         1147 : libpqsrv_connect_prepare(void)
     152              : {
     153              :     /*
     154              :      * We must obey fd.c's limit on non-virtual file descriptors.  Assume that
     155              :      * a PGconn represents one long-lived FD.  (Doing this here also ensures
     156              :      * that VFDs are closed if needed to make room.)
     157              :      */
     158         1147 :     if (!AcquireExternalFD())
     159              :     {
     160              : #ifndef WIN32                   /* can't write #if within ereport() macro */
     161            0 :         ereport(ERROR,
     162              :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     163              :                  errmsg("could not establish connection"),
     164              :                  errdetail("There are too many open files on the local server."),
     165              :                  errhint("Raise the server's \"max_files_per_process\" and/or \"ulimit -n\" limits.")));
     166              : #else
     167              :         ereport(ERROR,
     168              :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     169              :                  errmsg("could not establish connection"),
     170              :                  errdetail("There are too many open files on the local server."),
     171              :                  errhint("Raise the server's \"max_files_per_process\" setting.")));
     172              : #endif
     173              :     }
     174         1147 : }
     175              : 
     176              : /*
     177              :  * Complete a connection started by libpqsrv_connect_start() or
     178              :  * libpqsrv_connect_params_start().
     179              :  */
     180              : static inline void
     181         1147 : libpqsrv_connect_complete(PGconn *conn, uint32 wait_event_info)
     182              : {
     183              :     /*
     184              :      * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do
     185              :      * that here.
     186              :      */
     187         1147 :     if (conn == NULL)
     188              :     {
     189            0 :         ReleaseExternalFD();
     190            0 :         return;
     191              :     }
     192              : 
     193              :     /*
     194              :      * Can't wait without a socket. Note that we don't want to close the libpq
     195              :      * connection yet, so callers can emit a useful error.
     196              :      */
     197         1147 :     if (PQstatus(conn) == CONNECTION_BAD)
     198           80 :         return;
     199              : 
     200              :     /*
     201              :      * WaitLatchOrSocket() can conceivably fail, handle that case here instead
     202              :      * of requiring all callers to do so.
     203              :      */
     204         1067 :     PG_TRY();
     205              :     {
     206              :         PostgresPollingStatusType status;
     207              : 
     208              :         /*
     209              :          * Poll connection until we have OK or FAILED status.
     210              :          *
     211              :          * Per spec for PQconnectPoll, first wait till socket is write-ready.
     212              :          */
     213         1067 :         status = PGRES_POLLING_WRITING;
     214         4984 :         while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED)
     215              :         {
     216              :             int         io_flag;
     217              :             int         rc;
     218              : 
     219         2851 :             if (status == PGRES_POLLING_READING)
     220         1083 :                 io_flag = WL_SOCKET_READABLE;
     221              : #ifdef WIN32
     222              : 
     223              :             /*
     224              :              * Windows needs a different test while waiting for
     225              :              * connection-made
     226              :              */
     227              :             else if (PQstatus(conn) == CONNECTION_STARTED)
     228              :                 io_flag = WL_SOCKET_CONNECTED;
     229              : #endif
     230              :             else
     231         1768 :                 io_flag = WL_SOCKET_WRITEABLE;
     232              : 
     233         2851 :             rc = WaitLatchOrSocket(MyLatch,
     234              :                                    WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
     235              :                                    PQsocket(conn),
     236              :                                    0,
     237              :                                    wait_event_info);
     238              : 
     239              :             /* Interrupted? */
     240         2851 :             if (rc & WL_LATCH_SET)
     241              :             {
     242          703 :                 ResetLatch(MyLatch);
     243          703 :                 CHECK_FOR_INTERRUPTS();
     244              :             }
     245              : 
     246              :             /* If socket is ready, advance the libpq state machine */
     247         2850 :             if (rc & io_flag)
     248         2148 :                 status = PQconnectPoll(conn);
     249              :         }
     250              :     }
     251            0 :     PG_CATCH();
     252              :     {
     253              :         /*
     254              :          * If an error is thrown here, the callers won't call
     255              :          * libpqsrv_disconnect() with a conn, so release resources
     256              :          * immediately.
     257              :          */
     258            0 :         ReleaseExternalFD();
     259            0 :         PQfinish(conn);
     260              : 
     261            0 :         PG_RE_THROW();
     262              :     }
     263         1066 :     PG_END_TRY();
     264              : }
     265              : 
     266              : /*
     267              :  * PQexec() wrapper that processes interrupts.
     268              :  *
     269              :  * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
     270              :  * interrupts while pushing the query text to the server.  Consider that
     271              :  * setting if query strings can be long relative to TCP buffer size.
     272              :  *
     273              :  * This has the preconditions of PQsendQuery(), not those of PQexec().  Most
     274              :  * notably, PQexec() would silently discard any prior query results.
     275              :  */
     276              : static inline PGresult *
     277         4364 : libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
     278              : {
     279         4364 :     if (!PQsendQuery(conn, query))
     280            0 :         return NULL;
     281         4364 :     return libpqsrv_get_result_last(conn, wait_event_info);
     282              : }
     283              : 
     284              : /*
     285              :  * PQexecParams() wrapper that processes interrupts.
     286              :  *
     287              :  * See notes at libpqsrv_exec().
     288              :  */
     289              : static inline PGresult *
     290              : libpqsrv_exec_params(PGconn *conn,
     291              :                      const char *command,
     292              :                      int nParams,
     293              :                      const Oid *paramTypes,
     294              :                      const char *const *paramValues,
     295              :                      const int *paramLengths,
     296              :                      const int *paramFormats,
     297              :                      int resultFormat,
     298              :                      uint32 wait_event_info)
     299              : {
     300              :     if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
     301              :                            paramLengths, paramFormats, resultFormat))
     302              :         return NULL;
     303              :     return libpqsrv_get_result_last(conn, wait_event_info);
     304              : }
     305              : 
     306              : /*
     307              :  * Like PQexec(), loop over PQgetResult() until it returns NULL or another
     308              :  * terminal state.  Return the last non-NULL result or the terminal state.
     309              :  */
     310              : static inline PGresult *
     311        12807 : libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
     312              : {
     313        12807 :     PGresult   *lastResult = NULL;
     314              : 
     315              :     for (;;)
     316        11979 :     {
     317              :         /* Wait for, and collect, the next PGresult. */
     318              :         PGresult   *result;
     319              : 
     320        24786 :         result = libpqsrv_get_result(conn, wait_event_info);
     321        24784 :         if (result == NULL)
     322        11973 :             break;              /* query is complete, or failure */
     323              : 
     324              :         /*
     325              :          * Emulate PQexec()'s behavior of returning the last result when there
     326              :          * are many.
     327              :          */
     328        12811 :         PQclear(lastResult);
     329        12811 :         lastResult = result;
     330              : 
     331        25622 :         if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
     332        25412 :             PQresultStatus(lastResult) == PGRES_COPY_OUT ||
     333        24585 :             PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
     334        11984 :             PQstatus(conn) == CONNECTION_BAD)
     335              :             break;
     336              :     }
     337        12805 :     return lastResult;
     338              : }
     339              : 
     340              : /*
     341              :  * Perform the equivalent of PQgetResult(), but watch for interrupts.
     342              :  */
     343              : static inline PGresult *
     344        26153 : libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
     345              : {
     346              :     /*
     347              :      * Collect data until PQgetResult is ready to get the result without
     348              :      * blocking.
     349              :      */
     350        38895 :     while (PQisBusy(conn))
     351              :     {
     352              :         int         rc;
     353              : 
     354        12793 :         rc = WaitLatchOrSocket(MyLatch,
     355              :                                WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
     356              :                                WL_SOCKET_READABLE,
     357              :                                PQsocket(conn),
     358              :                                0,
     359              :                                wait_event_info);
     360              : 
     361              :         /* Interrupted? */
     362        12793 :         if (rc & WL_LATCH_SET)
     363              :         {
     364           23 :             ResetLatch(MyLatch);
     365           23 :             CHECK_FOR_INTERRUPTS();
     366              :         }
     367              : 
     368              :         /* Consume whatever data is available from the socket */
     369        12791 :         if (PQconsumeInput(conn) == 0)
     370              :         {
     371              :             /* trouble; expect PQgetResult() to return NULL */
     372           49 :             break;
     373              :         }
     374              :     }
     375              : 
     376              :     /* Now we can collect and return the next PGresult */
     377        26151 :     return PQgetResult(conn);
     378              : }
     379              : 
     380              : /*
     381              :  * Submit a cancel request to the given connection, waiting only until
     382              :  * the given time.
     383              :  *
     384              :  * We sleep interruptibly until we receive confirmation that the cancel
     385              :  * request has been accepted, and if it is, return NULL; if the cancel
     386              :  * request fails, return an error message string (which is not to be
     387              :  * freed).
     388              :  *
     389              :  * For other problems (to wit: OOM when strdup'ing an error message from
     390              :  * libpq), this function can ereport(ERROR).
     391              :  *
     392              :  * Note: this function leaks a string's worth of memory when reporting
     393              :  * libpq errors.  Make sure to call it in a transient memory context.
     394              :  */
     395              : static inline const char *
     396            3 : libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
     397              : {
     398              :     PGcancelConn *cancel_conn;
     399            3 :     const char *error = NULL;
     400              : 
     401            3 :     cancel_conn = PQcancelCreate(conn);
     402            3 :     if (cancel_conn == NULL)
     403            0 :         return "out of memory";
     404              : 
     405              :     /* In what follows, do not leak any PGcancelConn on any errors. */
     406              : 
     407            3 :     PG_TRY();
     408              :     {
     409            3 :         if (!PQcancelStart(cancel_conn))
     410              :         {
     411            0 :             error = pchomp(PQcancelErrorMessage(cancel_conn));
     412            0 :             goto exit;
     413              :         }
     414              : 
     415              :         for (;;)
     416            3 :         {
     417              :             PostgresPollingStatusType pollres;
     418              :             TimestampTz now;
     419              :             long        cur_timeout;
     420            6 :             int         waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
     421              : 
     422            6 :             pollres = PQcancelPoll(cancel_conn);
     423            6 :             if (pollres == PGRES_POLLING_OK)
     424            3 :                 break;          /* success! */
     425              : 
     426              :             /* If timeout has expired, give up, else get sleep time. */
     427            3 :             now = GetCurrentTimestamp();
     428            3 :             cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
     429            3 :             if (cur_timeout <= 0)
     430              :             {
     431            0 :                 error = "cancel request timed out";
     432            0 :                 break;
     433              :             }
     434              : 
     435            3 :             switch (pollres)
     436              :             {
     437            3 :                 case PGRES_POLLING_READING:
     438            3 :                     waitEvents |= WL_SOCKET_READABLE;
     439            3 :                     break;
     440            0 :                 case PGRES_POLLING_WRITING:
     441            0 :                     waitEvents |= WL_SOCKET_WRITEABLE;
     442            0 :                     break;
     443            0 :                 default:
     444            0 :                     error = pchomp(PQcancelErrorMessage(cancel_conn));
     445            0 :                     goto exit;
     446              :             }
     447              : 
     448              :             /* Sleep until there's something to do */
     449            3 :             WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
     450              :                               cur_timeout, PG_WAIT_CLIENT);
     451              : 
     452            3 :             ResetLatch(MyLatch);
     453              : 
     454            3 :             CHECK_FOR_INTERRUPTS();
     455              :         }
     456            3 : exit:   ;
     457              :     }
     458            0 :     PG_FINALLY();
     459              :     {
     460            3 :         PQcancelFinish(cancel_conn);
     461              :     }
     462            3 :     PG_END_TRY();
     463              : 
     464            3 :     return error;
     465              : }
     466              : 
     467              : /*
     468              :  * libpqsrv_notice_receiver
     469              :  *
     470              :  * Custom notice receiver for libpq connections.
     471              :  *
     472              :  * This function is intended to be set via PQsetNoticeReceiver() so that
     473              :  * NOTICE, WARNING, and similar messages from the connection are reported via
     474              :  * ereport(), instead of being printed to stderr.
     475              :  *
     476              :  * Because this will be called from libpq with a "real" (not wrapped)
     477              :  * PGresult, we need to temporarily ignore libpq-be-fe.h's wrapper macros
     478              :  * for PGresult and also PQresultErrorMessage, and put back the wrappers
     479              :  * afterwards.  That's not pretty, but there seems no better alternative.
     480              :  */
     481              : #undef PGresult
     482              : #undef PQresultErrorMessage
     483              : 
     484              : static inline void
     485           18 : libpqsrv_notice_receiver(void *arg, const PGresult *res)
     486              : {
     487              :     const char *message;
     488              :     int         len;
     489           18 :     const char *prefix = (const char *) arg;
     490              : 
     491              :     /*
     492              :      * Trim the trailing newline from the message text returned from
     493              :      * PQresultErrorMessage(), as it always includes one, to produce cleaner
     494              :      * log output.
     495              :      */
     496           18 :     message = PQresultErrorMessage(res);
     497           18 :     len = strlen(message);
     498           18 :     if (len > 0 && message[len - 1] == '\n')
     499           18 :         len--;
     500              : 
     501           18 :     ereport(LOG,
     502              :             errmsg_internal("%s: %.*s", prefix, len, message));
     503           18 : }
     504              : 
     505              : #define PGresult libpqsrv_PGresult
     506              : #define PQresultErrorMessage libpqsrv_PQresultErrorMessage
     507              : 
     508              : #endif                          /* LIBPQ_BE_FE_HELPERS_H */
        

Generated by: LCOV version 2.0-1