LCOV - code coverage report
Current view: top level - src/include/libpq - libpq-be-fe-helpers.h (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 95 115 82.6 %
Date: 2025-08-09 08:18:06 Functions: 10 10 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * libpq-be-fe-helpers.h
       4             :  *    Helper functions for using libpq in extensions
       5             :  *
       6             :  * Code built directly into the backend is not allowed to link to libpq
       7             :  * directly. Extension code is allowed to use libpq however. However, libpq
       8             :  * used in extensions has to be careful not to block inside libpq, otherwise
       9             :  * interrupts will not be processed, leading to issues like unresolvable
      10             :  * deadlocks. Backend code also needs to take care to acquire/release an
      11             :  * external fd for the connection, otherwise fd.c's accounting of fd's is
      12             :  * broken.
      13             :  *
      14             :  * This file provides helper functions to make it easier to comply with these
      15             :  * rules. It is a header only library as it needs to be linked into each
      16             :  * extension using libpq, and it seems too small to be worth adding a
      17             :  * dedicated static library for.
      18             :  *
      19             :  * TODO: For historical reasons the connections established here are not put
      20             :  * into non-blocking mode. That can lead to blocking even when only the async
      21             :  * libpq functions are used. This should be fixed.
      22             :  *
      23             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
      24             :  * Portions Copyright (c) 1994, Regents of the University of California
      25             :  *
      26             :  * src/include/libpq/libpq-be-fe-helpers.h
      27             :  *
      28             :  *-------------------------------------------------------------------------
      29             :  */
      30             : #ifndef LIBPQ_BE_FE_HELPERS_H
      31             : #define LIBPQ_BE_FE_HELPERS_H
      32             : 
      33             : #include "libpq/libpq-be-fe.h"
      34             : #include "miscadmin.h"
      35             : #include "storage/fd.h"
      36             : #include "storage/latch.h"
      37             : #include "utils/timestamp.h"
      38             : #include "utils/wait_event.h"
      39             : 
      40             : 
      41             : static inline void libpqsrv_connect_prepare(void);
      42             : static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
      43             : static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
      44             : static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
      45             : 
      46             : 
      47             : /*
      48             :  * PQconnectdb() wrapper that reserves a file descriptor and processes
      49             :  * interrupts during connection establishment.
      50             :  *
      51             :  * Throws an error if AcquireExternalFD() fails, but does not throw if
      52             :  * connection establishment itself fails. Callers need to use PQstatus() to
      53             :  * check if connection establishment succeeded.
      54             :  */
      55             : static inline PGconn *
      56          44 : libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
      57             : {
      58          44 :     PGconn     *conn = NULL;
      59             : 
      60          44 :     libpqsrv_connect_prepare();
      61             : 
      62          44 :     conn = PQconnectStart(conninfo);
      63             : 
      64          44 :     libpqsrv_connect_internal(conn, wait_event_info);
      65             : 
      66          44 :     return conn;
      67             : }
      68             : 
      69             : /*
      70             :  * Like libpqsrv_connect(), except that this is a wrapper for
      71             :  * PQconnectdbParams().
      72             :   */
      73             : static inline PGconn *
      74        1920 : libpqsrv_connect_params(const char *const *keywords,
      75             :                         const char *const *values,
      76             :                         int expand_dbname,
      77             :                         uint32 wait_event_info)
      78             : {
      79        1920 :     PGconn     *conn = NULL;
      80             : 
      81        1920 :     libpqsrv_connect_prepare();
      82             : 
      83        1920 :     conn = PQconnectStartParams(keywords, values, expand_dbname);
      84             : 
      85        1920 :     libpqsrv_connect_internal(conn, wait_event_info);
      86             : 
      87        1918 :     return conn;
      88             : }
      89             : 
      90             : /*
      91             :  * PQfinish() wrapper that additionally releases the reserved file descriptor.
      92             :  *
      93             :  * It is allowed to call this with a NULL pgconn iff NULL was returned by
      94             :  * libpqsrv_connect*.
      95             :  */
      96             : static inline void
      97        1946 : libpqsrv_disconnect(PGconn *conn)
      98             : {
      99             :     /*
     100             :      * If no connection was established, we haven't reserved an FD for it (or
     101             :      * already released it). This rule makes it easier to write PG_CATCH()
     102             :      * handlers for this facility's users.
     103             :      *
     104             :      * See also libpqsrv_connect_internal().
     105             :      */
     106        1946 :     if (conn == NULL)
     107           4 :         return;
     108             : 
     109        1942 :     ReleaseExternalFD();
     110        1942 :     PQfinish(conn);
     111             : }
     112             : 
     113             : 
     114             : /* internal helper functions follow */
     115             : 
     116             : 
     117             : /*
     118             :  * Helper function for all connection establishment functions.
     119             :  */
     120             : static inline void
     121        1964 : libpqsrv_connect_prepare(void)
     122             : {
     123             :     /*
     124             :      * We must obey fd.c's limit on non-virtual file descriptors.  Assume that
     125             :      * a PGconn represents one long-lived FD.  (Doing this here also ensures
     126             :      * that VFDs are closed if needed to make room.)
     127             :      */
     128        1964 :     if (!AcquireExternalFD())
     129             :     {
     130             : #ifndef WIN32                   /* can't write #if within ereport() macro */
     131           0 :         ereport(ERROR,
     132             :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     133             :                  errmsg("could not establish connection"),
     134             :                  errdetail("There are too many open files on the local server."),
     135             :                  errhint("Raise the server's \"max_files_per_process\" and/or \"ulimit -n\" limits.")));
     136             : #else
     137             :         ereport(ERROR,
     138             :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     139             :                  errmsg("could not establish connection"),
     140             :                  errdetail("There are too many open files on the local server."),
     141             :                  errhint("Raise the server's \"max_files_per_process\" setting.")));
     142             : #endif
     143             :     }
     144        1964 : }
     145             : 
     146             : /*
     147             :  * Helper function for all connection establishment functions.
     148             :  */
     149             : static inline void
     150        1964 : libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
     151             : {
     152             :     /*
     153             :      * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do
     154             :      * that here.
     155             :      */
     156        1964 :     if (conn == NULL)
     157             :     {
     158           0 :         ReleaseExternalFD();
     159           0 :         return;
     160             :     }
     161             : 
     162             :     /*
     163             :      * Can't wait without a socket. Note that we don't want to close the libpq
     164             :      * connection yet, so callers can emit a useful error.
     165             :      */
     166        1964 :     if (PQstatus(conn) == CONNECTION_BAD)
     167          84 :         return;
     168             : 
     169             :     /*
     170             :      * WaitLatchOrSocket() can conceivably fail, handle that case here instead
     171             :      * of requiring all callers to do so.
     172             :      */
     173        1880 :     PG_TRY();
     174             :     {
     175             :         PostgresPollingStatusType status;
     176             : 
     177             :         /*
     178             :          * Poll connection until we have OK or FAILED status.
     179             :          *
     180             :          * Per spec for PQconnectPoll, first wait till socket is write-ready.
     181             :          */
     182        1880 :         status = PGRES_POLLING_WRITING;
     183        8916 :         while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED)
     184             :         {
     185             :             int         io_flag;
     186             :             int         rc;
     187             : 
     188        5158 :             if (status == PGRES_POLLING_READING)
     189        1910 :                 io_flag = WL_SOCKET_READABLE;
     190             : #ifdef WIN32
     191             : 
     192             :             /*
     193             :              * Windows needs a different test while waiting for
     194             :              * connection-made
     195             :              */
     196             :             else if (PQstatus(conn) == CONNECTION_STARTED)
     197             :                 io_flag = WL_SOCKET_CONNECTED;
     198             : #endif
     199             :             else
     200        3248 :                 io_flag = WL_SOCKET_WRITEABLE;
     201             : 
     202        5158 :             rc = WaitLatchOrSocket(MyLatch,
     203             :                                    WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
     204             :                                    PQsocket(conn),
     205             :                                    0,
     206             :                                    wait_event_info);
     207             : 
     208             :             /* Interrupted? */
     209        5158 :             if (rc & WL_LATCH_SET)
     210             :             {
     211        1372 :                 ResetLatch(MyLatch);
     212        1372 :                 CHECK_FOR_INTERRUPTS();
     213             :             }
     214             : 
     215             :             /* If socket is ready, advance the libpq state machine */
     216        5156 :             if (rc & io_flag)
     217        3786 :                 status = PQconnectPoll(conn);
     218             :         }
     219             :     }
     220           0 :     PG_CATCH();
     221             :     {
     222             :         /*
     223             :          * If an error is thrown here, the callers won't call
     224             :          * libpqsrv_disconnect() with a conn, so release resources
     225             :          * immediately.
     226             :          */
     227           0 :         ReleaseExternalFD();
     228           0 :         PQfinish(conn);
     229             : 
     230           0 :         PG_RE_THROW();
     231             :     }
     232        1878 :     PG_END_TRY();
     233             : }
     234             : 
     235             : /*
     236             :  * PQexec() wrapper that processes interrupts.
     237             :  *
     238             :  * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
     239             :  * interrupts while pushing the query text to the server.  Consider that
     240             :  * setting if query strings can be long relative to TCP buffer size.
     241             :  *
     242             :  * This has the preconditions of PQsendQuery(), not those of PQexec().  Most
     243             :  * notably, PQexec() would silently discard any prior query results.
     244             :  */
     245             : static inline PGresult *
     246        7656 : libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
     247             : {
     248        7656 :     if (!PQsendQuery(conn, query))
     249           0 :         return NULL;
     250        7656 :     return libpqsrv_get_result_last(conn, wait_event_info);
     251             : }
     252             : 
     253             : /*
     254             :  * PQexecParams() wrapper that processes interrupts.
     255             :  *
     256             :  * See notes at libpqsrv_exec().
     257             :  */
     258             : static inline PGresult *
     259             : libpqsrv_exec_params(PGconn *conn,
     260             :                      const char *command,
     261             :                      int nParams,
     262             :                      const Oid *paramTypes,
     263             :                      const char *const *paramValues,
     264             :                      const int *paramLengths,
     265             :                      const int *paramFormats,
     266             :                      int resultFormat,
     267             :                      uint32 wait_event_info)
     268             : {
     269             :     if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
     270             :                            paramLengths, paramFormats, resultFormat))
     271             :         return NULL;
     272             :     return libpqsrv_get_result_last(conn, wait_event_info);
     273             : }
     274             : 
     275             : /*
     276             :  * Like PQexec(), loop over PQgetResult() until it returns NULL or another
     277             :  * terminal state.  Return the last non-NULL result or the terminal state.
     278             :  */
     279             : static inline PGresult *
     280       23852 : libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
     281             : {
     282       23852 :     PGresult   *lastResult = NULL;
     283             : 
     284             :     for (;;)
     285       22418 :     {
     286             :         /* Wait for, and collect, the next PGresult. */
     287             :         PGresult   *result;
     288             : 
     289       46270 :         result = libpqsrv_get_result(conn, wait_event_info);
     290       46268 :         if (result == NULL)
     291       22414 :             break;              /* query is complete, or failure */
     292             : 
     293             :         /*
     294             :          * Emulate PQexec()'s behavior of returning the last result when there
     295             :          * are many.
     296             :          */
     297       23854 :         PQclear(lastResult);
     298       23854 :         lastResult = result;
     299             : 
     300       47708 :         if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
     301       47326 :             PQresultStatus(lastResult) == PGRES_COPY_OUT ||
     302       45894 :             PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
     303       22422 :             PQstatus(conn) == CONNECTION_BAD)
     304             :             break;
     305             :     }
     306       23850 :     return lastResult;
     307             : }
     308             : 
     309             : /*
     310             :  * Perform the equivalent of PQgetResult(), but watch for interrupts.
     311             :  */
     312             : static inline PGresult *
     313       48764 : libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
     314             : {
     315             :     /*
     316             :      * Collect data until PQgetResult is ready to get the result without
     317             :      * blocking.
     318             :      */
     319       72524 :     while (PQisBusy(conn))
     320             :     {
     321             :         int         rc;
     322             : 
     323       23836 :         rc = WaitLatchOrSocket(MyLatch,
     324             :                                WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
     325             :                                WL_SOCKET_READABLE,
     326             :                                PQsocket(conn),
     327             :                                0,
     328             :                                wait_event_info);
     329             : 
     330             :         /* Interrupted? */
     331       23836 :         if (rc & WL_LATCH_SET)
     332             :         {
     333          20 :             ResetLatch(MyLatch);
     334          20 :             CHECK_FOR_INTERRUPTS();
     335             :         }
     336             : 
     337             :         /* Consume whatever data is available from the socket */
     338       23834 :         if (PQconsumeInput(conn) == 0)
     339             :         {
     340             :             /* trouble; expect PQgetResult() to return NULL */
     341          74 :             break;
     342             :         }
     343             :     }
     344             : 
     345             :     /* Now we can collect and return the next PGresult */
     346       48762 :     return PQgetResult(conn);
     347             : }
     348             : 
     349             : /*
     350             :  * Submit a cancel request to the given connection, waiting only until
     351             :  * the given time.
     352             :  *
     353             :  * We sleep interruptibly until we receive confirmation that the cancel
     354             :  * request has been accepted, and if it is, return NULL; if the cancel
     355             :  * request fails, return an error message string (which is not to be
     356             :  * freed).
     357             :  *
     358             :  * For other problems (to wit: OOM when strdup'ing an error message from
     359             :  * libpq), this function can ereport(ERROR).
     360             :  *
     361             :  * Note: this function leaks a string's worth of memory when reporting
     362             :  * libpq errors.  Make sure to call it in a transient memory context.
     363             :  */
     364             : static inline const char *
     365           6 : libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
     366             : {
     367             :     PGcancelConn *cancel_conn;
     368           6 :     const char *error = NULL;
     369             : 
     370           6 :     cancel_conn = PQcancelCreate(conn);
     371           6 :     if (cancel_conn == NULL)
     372           0 :         return "out of memory";
     373             : 
     374             :     /* In what follows, do not leak any PGcancelConn on any errors. */
     375             : 
     376           6 :     PG_TRY();
     377             :     {
     378           6 :         if (!PQcancelStart(cancel_conn))
     379             :         {
     380           0 :             error = pchomp(PQcancelErrorMessage(cancel_conn));
     381           0 :             goto exit;
     382             :         }
     383             : 
     384             :         for (;;)
     385           6 :         {
     386             :             PostgresPollingStatusType pollres;
     387             :             TimestampTz now;
     388             :             long        cur_timeout;
     389          12 :             int         waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
     390             : 
     391          12 :             pollres = PQcancelPoll(cancel_conn);
     392          12 :             if (pollres == PGRES_POLLING_OK)
     393           6 :                 break;          /* success! */
     394             : 
     395             :             /* If timeout has expired, give up, else get sleep time. */
     396           6 :             now = GetCurrentTimestamp();
     397           6 :             cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
     398           6 :             if (cur_timeout <= 0)
     399             :             {
     400           0 :                 error = "cancel request timed out";
     401           0 :                 break;
     402             :             }
     403             : 
     404           6 :             switch (pollres)
     405             :             {
     406           6 :                 case PGRES_POLLING_READING:
     407           6 :                     waitEvents |= WL_SOCKET_READABLE;
     408           6 :                     break;
     409           0 :                 case PGRES_POLLING_WRITING:
     410           0 :                     waitEvents |= WL_SOCKET_WRITEABLE;
     411           0 :                     break;
     412           0 :                 default:
     413           0 :                     error = pchomp(PQcancelErrorMessage(cancel_conn));
     414           0 :                     goto exit;
     415             :             }
     416             : 
     417             :             /* Sleep until there's something to do */
     418           6 :             WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
     419             :                               cur_timeout, PG_WAIT_CLIENT);
     420             : 
     421           6 :             ResetLatch(MyLatch);
     422             : 
     423           6 :             CHECK_FOR_INTERRUPTS();
     424             :         }
     425           6 : exit:   ;
     426             :     }
     427           0 :     PG_FINALLY();
     428             :     {
     429           6 :         PQcancelFinish(cancel_conn);
     430             :     }
     431           6 :     PG_END_TRY();
     432             : 
     433           6 :     return error;
     434             : }
     435             : 
     436             : /*
     437             :  * libpqsrv_notice_receiver
     438             :  *
     439             :  * Custom notice receiver for libpq connections.
     440             :  *
     441             :  * This function is intended to be set via PQsetNoticeReceiver() so that
     442             :  * NOTICE, WARNING, and similar messages from the connection are reported via
     443             :  * ereport(), instead of being printed to stderr.
     444             :  *
     445             :  * Because this will be called from libpq with a "real" (not wrapped)
     446             :  * PGresult, we need to temporarily ignore libpq-be-fe.h's wrapper macros
     447             :  * for PGresult and also PQresultErrorMessage, and put back the wrappers
     448             :  * afterwards.  That's not pretty, but there seems no better alternative.
     449             :  */
     450             : #undef PGresult
     451             : #undef PQresultErrorMessage
     452             : 
     453             : static inline void
     454          16 : libpqsrv_notice_receiver(void *arg, const PGresult *res)
     455             : {
     456             :     const char *message;
     457             :     int         len;
     458          16 :     const char *prefix = (const char *) arg;
     459             : 
     460             :     /*
     461             :      * Trim the trailing newline from the message text returned from
     462             :      * PQresultErrorMessage(), as it always includes one, to produce cleaner
     463             :      * log output.
     464             :      */
     465          16 :     message = PQresultErrorMessage(res);
     466          16 :     len = strlen(message);
     467          16 :     if (len > 0 && message[len - 1] == '\n')
     468          16 :         len--;
     469             : 
     470          16 :     ereport(LOG,
     471             :             errmsg_internal("%s: %.*s", prefix, len, message));
     472          16 : }
     473             : 
     474             : #define PGresult libpqsrv_PGresult
     475             : #define PQresultErrorMessage libpqsrv_PQresultErrorMessage
     476             : 
     477             : #endif                          /* LIBPQ_BE_FE_HELPERS_H */

Generated by: LCOV version 1.16