LCOV - code coverage report
Current view: top level - src/include/libpq - libpq-be-fe-helpers.h (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 92 112 82.1 %
Date: 2024-11-21 08:14:44 Functions: 9 9 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * libpq-be-fe-helpers.h
       4             :  *    Helper functions for using libpq in extensions
       5             :  *
       6             :  * Code built directly into the backend is not allowed to link to libpq
       7             :  * directly. Extension code is allowed to use libpq, though. However, code
       8             :  * using libpq in extensions has to be careful not to block inside libpq, otherwise
       9             :  * interrupts will not be processed, leading to issues like unresolvable
      10             :  * deadlocks. Backend code also needs to take care to acquire/release an
      11             :  * external fd for the connection, otherwise fd.c's accounting of fd's is
      12             :  * broken.
      13             :  *
      14             :  * This file provides helper functions to make it easier to comply with these
      15             :  * rules. It is a header only library as it needs to be linked into each
      16             :  * extension using libpq, and it seems too small to be worth adding a
      17             :  * dedicated static library for.
      18             :  *
      19             :  * TODO: For historical reasons the connections established here are not put
      20             :  * into non-blocking mode. That can lead to blocking even when only the async
      21             :  * libpq functions are used. This should be fixed.
      22             :  *
      23             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
      24             :  * Portions Copyright (c) 1994, Regents of the University of California
      25             :  *
      26             :  * src/include/libpq/libpq-be-fe-helpers.h
      27             :  *
      28             :  *-------------------------------------------------------------------------
      29             :  */
      30             : #ifndef LIBPQ_BE_FE_HELPERS_H
      31             : #define LIBPQ_BE_FE_HELPERS_H
      32             : 
      33             : /*
      34             :  * Despite the name, BUILDING_DLL is set only when building code directly part
      35             :  * of the backend, which is also where libpq isn't allowed to be
      36             :  * used. Obviously this doesn't protect against libpq-fe.h getting included
      37             :  * otherwise, but perhaps still protects against a few mistakes...
      38             :  */
      39             : #ifdef BUILDING_DLL
      40             : #error "libpq may not be used code directly built into the backend"
      41             : #endif
      42             : 
      43             : #include "libpq-fe.h"
      44             : #include "miscadmin.h"
      45             : #include "storage/fd.h"
      46             : #include "storage/latch.h"
      47             : #include "utils/timestamp.h"
      48             : #include "utils/wait_event.h"
      49             : 
      50             : 
      51             : static inline void libpqsrv_connect_prepare(void);
      52             : static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
      53             : static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
      54             : static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);
      55             : 
      56             : 
      57             : /*
      58             :  * PQconnectdb() wrapper that reserves a file descriptor and processes
      59             :  * interrupts during connection establishment.
      60             :  *
      61             :  * Throws an error if AcquireExternalFD() fails, but does not throw if
      62             :  * connection establishment itself fails. Callers need to use PQstatus() to
      63             :  * check if connection establishment succeeded.
      64             :  */
      65             : static inline PGconn *
      66          42 : libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
      67             : {
      68          42 :     PGconn     *conn = NULL;
      69             : 
      70          42 :     libpqsrv_connect_prepare();
      71             : 
      72          42 :     conn = PQconnectStart(conninfo);
      73             : 
      74          42 :     libpqsrv_connect_internal(conn, wait_event_info);
      75             : 
      76          42 :     return conn;
      77             : }
      78             : 
      79             : /*
      80             :  * Like libpqsrv_connect(), except that this is a wrapper for
      81             :  * PQconnectdbParams().
      82             :  */
      83             : static inline PGconn *
      84         130 : libpqsrv_connect_params(const char *const *keywords,
      85             :                         const char *const *values,
      86             :                         int expand_dbname,
      87             :                         uint32 wait_event_info)
      88             : {
      89         130 :     PGconn     *conn = NULL;
      90             : 
      91         130 :     libpqsrv_connect_prepare();
      92             : 
      93         130 :     conn = PQconnectStartParams(keywords, values, expand_dbname);
      94             : 
      95         130 :     libpqsrv_connect_internal(conn, wait_event_info);
      96             : 
      97         130 :     return conn;
      98             : }
      99             : 
     100             : /*
     101             :  * PQfinish() wrapper that additionally releases the reserved file descriptor.
     102             :  *
     103             :  * It is allowed to call this with a NULL PGconn if and only if NULL was
     104             :  * returned by libpqsrv_connect*().
     105             :  */
     106             : static inline void
     107         164 : libpqsrv_disconnect(PGconn *conn)
     108             : {
     109             :     /*
     110             :      * If no connection was established, we haven't reserved an FD for it (or
     111             :      * already released it). This rule makes it easier to write PG_CATCH()
     112             :      * handlers for this facility's users.
     113             :      *
     114             :      * See also libpqsrv_connect_internal().
     115             :      */
     116         164 :     if (conn == NULL)
     117           4 :         return;
     118             : 
     119         160 :     ReleaseExternalFD();
     120         160 :     PQfinish(conn);
     121             : }
     122             : 
     123             : 
     124             : /* internal helper functions follow */
     125             : 
     126             : 
     127             : /*
     128             :  * Helper for all connection establishment functions: reserves the external FD.
     129             :  */
     130             : static inline void
     131         172 : libpqsrv_connect_prepare(void)
     132             : {
     133             :     /*
     134             :      * We must obey fd.c's limit on non-virtual file descriptors.  Assume that
     135             :      * a PGconn represents one long-lived FD.  (Doing this here also ensures
     136             :      * that VFDs are closed if needed to make room.)
     137             :      */
     138         172 :     if (!AcquireExternalFD())
     139             :     {
     140             : #ifndef WIN32                   /* can't write #if within ereport() macro */
     141           0 :         ereport(ERROR,
     142             :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     143             :                  errmsg("could not establish connection"),
     144             :                  errdetail("There are too many open files on the local server."),
     145             :                  errhint("Raise the server's \"max_files_per_process\" and/or \"ulimit -n\" limits.")));
     146             : #else
     147             :         ereport(ERROR,
     148             :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     149             :                  errmsg("could not establish connection"),
     150             :                  errdetail("There are too many open files on the local server."),
     151             :                  errhint("Raise the server's \"max_files_per_process\" setting.")));
     152             : #endif
     153             :     }
     154         172 : }
     155             : 
     156             : /*
     157             :  * Helper for all connection establishment functions: polls until connected or failed.
     158             :  */
     159             : static inline void
     160         172 : libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
     161             : {
     162             :     /*
     163             :      * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do
     164             :      * that here.
     165             :      */
     166         172 :     if (conn == NULL)
     167             :     {
     168           0 :         ReleaseExternalFD();
     169           0 :         return;
     170             :     }
     171             : 
     172             :     /*
     173             :      * Can't wait without a socket. Note that we don't want to close the libpq
     174             :      * connection yet, so callers can emit a useful error.
     175             :      */
     176         172 :     if (PQstatus(conn) == CONNECTION_BAD)
     177           4 :         return;
     178             : 
     179             :     /*
     180             :      * WaitLatchOrSocket() can conceivably fail, handle that case here instead
     181             :      * of requiring all callers to do so.
     182             :      */
     183         168 :     PG_TRY();
     184             :     {
     185             :         PostgresPollingStatusType status;
     186             : 
     187             :         /*
     188             :          * Poll connection until we have OK or FAILED status.
     189             :          *
     190             :          * Per spec for PQconnectPoll, first wait till socket is write-ready.
     191             :          */
     192         168 :         status = PGRES_POLLING_WRITING;
     193         506 :         while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED)
     194             :         {
     195             :             int         io_flag;
     196             :             int         rc;
     197             : 
     198         338 :             if (status == PGRES_POLLING_READING)
     199         168 :                 io_flag = WL_SOCKET_READABLE;
     200             : #ifdef WIN32
     201             : 
     202             :             /*
     203             :              * Windows needs a different test while waiting for
     204             :              * connection-made
     205             :              */
     206             :             else if (PQstatus(conn) == CONNECTION_STARTED)
     207             :                 io_flag = WL_SOCKET_CONNECTED;
     208             : #endif
     209             :             else
     210         170 :                 io_flag = WL_SOCKET_WRITEABLE;
     211             : 
     212         338 :             rc = WaitLatchOrSocket(MyLatch,
     213             :                                    WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
     214             :                                    PQsocket(conn),
     215             :                                    0,
     216             :                                    wait_event_info);
     217             : 
     218             :             /* Interrupted? */
     219         338 :             if (rc & WL_LATCH_SET)
     220             :             {
     221           2 :                 ResetLatch(MyLatch);
     222           2 :                 CHECK_FOR_INTERRUPTS();
     223             :             }
     224             : 
     225             :             /* If socket is ready, advance the libpq state machine */
     226         338 :             if (rc & io_flag)
     227         336 :                 status = PQconnectPoll(conn);
     228             :         }
     229             :     }
     230           0 :     PG_CATCH();
     231             :     {
     232             :         /*
     233             :          * If an error is thrown here, the callers won't call
     234             :          * libpqsrv_disconnect() with a conn, so release resources
     235             :          * immediately.
     236             :          */
     237           0 :         ReleaseExternalFD();
     238           0 :         PQfinish(conn);
     239             : 
     240           0 :         PG_RE_THROW();
     241             :     }
     242         168 :     PG_END_TRY();
     243             : }
     244             : 
     245             : /*
     246             :  * PQexec() wrapper that processes interrupts.
     247             :  *
     248             :  * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
     249             :  * interrupts while pushing the query text to the server.  Consider that
     250             :  * setting if query strings can be long relative to TCP buffer size.
     251             :  *
     252             :  * This has the preconditions of PQsendQuery(), not those of PQexec().  Most
     253             :  * notably, PQexec() would silently discard any prior query results.
     254             :  */
     255             : static inline PGresult *
     256         124 : libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
     257             : {
     258         124 :     if (!PQsendQuery(conn, query))
     259           0 :         return NULL;
     260         124 :     return libpqsrv_get_result_last(conn, wait_event_info);
     261             : }
     262             : 
     263             : /*
     264             :  * PQexecParams() wrapper that processes interrupts.
     265             :  *
     266             :  * See notes at libpqsrv_exec().
     267             :  */
     268             : static inline PGresult *
     269             : libpqsrv_exec_params(PGconn *conn,
     270             :                      const char *command,
     271             :                      int nParams,
     272             :                      const Oid *paramTypes,
     273             :                      const char *const *paramValues,
     274             :                      const int *paramLengths,
     275             :                      const int *paramFormats,
     276             :                      int resultFormat,
     277             :                      uint32 wait_event_info)
     278             : {
     279             :     if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
     280             :                            paramLengths, paramFormats, resultFormat))
     281             :         return NULL;
     282             :     return libpqsrv_get_result_last(conn, wait_event_info);
     283             : }
     284             : 
     285             : /*
     286             :  * Like PQexec(), loop over PQgetResult() until it returns NULL or another
     287             :  * terminal state.  Return the last non-NULL result or the terminal state.
     288             :  */
     289             : static inline PGresult *
     290       15916 : libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
     291             : {
     292       15916 :     PGresult   *volatile lastResult = NULL;
     293             : 
     294             :     /* In what follows, do not leak any PGresults on an error. */
     295       15916 :     PG_TRY();
     296             :     {
     297             :         for (;;)
     298       15914 :         {
     299             :             /* Wait for, and collect, the next PGresult. */
     300             :             PGresult   *result;
     301             : 
     302       31830 :             result = libpqsrv_get_result(conn, wait_event_info);
     303       31828 :             if (result == NULL)
     304       15910 :                 break;          /* query is complete, or failure */
     305             : 
     306             :             /*
     307             :              * Emulate PQexec()'s behavior of returning the last result when
     308             :              * there are many.
     309             :              */
     310       15918 :             PQclear(lastResult);
     311       15918 :             lastResult = result;
     312             : 
     313       15918 :             if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
     314       15918 :                 PQresultStatus(lastResult) == PGRES_COPY_OUT ||
     315       15918 :                 PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
     316       15918 :                 PQstatus(conn) == CONNECTION_BAD)
     317             :                 break;
     318             :         }
     319             :     }
     320           2 :     PG_CATCH();
     321             :     {
     322           2 :         PQclear(lastResult);
     323           2 :         PG_RE_THROW();
     324             :     }
     325       15914 :     PG_END_TRY();
     326             : 
     327       15914 :     return lastResult;
     328             : }
     329             : 
     330             : /*
     331             :  * Perform the equivalent of PQgetResult(), but watch for interrupts.
     332             :  */
     333             : static inline PGresult *
     334       32194 : libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
     335             : {
     336             :     /*
     337             :      * Collect data until PQgetResult is ready to get the result without
     338             :      * blocking.
     339             :      */
     340       47960 :     while (PQisBusy(conn))
     341             :     {
     342             :         int         rc;
     343             : 
     344       15772 :         rc = WaitLatchOrSocket(MyLatch,
     345             :                                WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
     346             :                                WL_SOCKET_READABLE,
     347             :                                PQsocket(conn),
     348             :                                0,
     349             :                                wait_event_info);
     350             : 
     351             :         /* Interrupted? */
     352       15772 :         if (rc & WL_LATCH_SET)
     353             :         {
     354           4 :             ResetLatch(MyLatch);
     355           4 :             CHECK_FOR_INTERRUPTS();
     356             :         }
     357             : 
     358             :         /* Consume whatever data is available from the socket */
     359       15770 :         if (PQconsumeInput(conn) == 0)
     360             :         {
     361             :             /* trouble; expect PQgetResult() to return NULL */
     362           4 :             break;
     363             :         }
     364             :     }
     365             : 
     366             :     /* Now we can collect and return the next PGresult */
     367       32192 :     return PQgetResult(conn);
     368             : }
     369             : 
     370             : /*
     371             :  * Submit a cancel request to the given connection, waiting only until
     372             :  * the given time.
     373             :  *
     374             :  * We sleep interruptibly until we receive confirmation that the cancel
     375             :  * request has been accepted, and if it is, return NULL; if the cancel
     376             :  * request fails, return an error message string (which is not to be
     377             :  * freed).
     378             :  *
     379             :  * For other problems (to wit: OOM when strdup'ing an error message from
     380             :  * libpq), this function can ereport(ERROR).
     381             :  *
     382             :  * Note: this function leaks a string's worth of memory when reporting
     383             :  * libpq errors.  Make sure to call it in a transient memory context.
     384             :  */
     385             : static inline const char *
     386           4 : libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
     387             : {
     388             :     PGcancelConn *cancel_conn;
     389           4 :     const char *error = NULL;
     390             : 
     391           4 :     cancel_conn = PQcancelCreate(conn);
     392           4 :     if (cancel_conn == NULL)
     393           0 :         return "out of memory";
     394             : 
     395             :     /* In what follows, do not leak any PGcancelConn on any errors. */
     396             : 
     397           4 :     PG_TRY();
     398             :     {
     399           4 :         if (!PQcancelStart(cancel_conn))
     400             :         {
     401           0 :             error = pchomp(PQcancelErrorMessage(cancel_conn));
     402           0 :             goto exit;
     403             :         }
     404             : 
     405             :         for (;;)
     406           4 :         {
     407             :             PostgresPollingStatusType pollres;
     408             :             TimestampTz now;
     409             :             long        cur_timeout;
     410           8 :             int         waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;
     411             : 
     412           8 :             pollres = PQcancelPoll(cancel_conn);
     413           8 :             if (pollres == PGRES_POLLING_OK)
     414           4 :                 break;          /* success! */
     415             : 
     416             :             /* If timeout has expired, give up, else get sleep time. */
     417           4 :             now = GetCurrentTimestamp();
     418           4 :             cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
     419           4 :             if (cur_timeout <= 0)
     420             :             {
     421           0 :                 error = "cancel request timed out";
     422           0 :                 break;
     423             :             }
     424             : 
     425           4 :             switch (pollres)
     426             :             {
     427           4 :                 case PGRES_POLLING_READING:
     428           4 :                     waitEvents |= WL_SOCKET_READABLE;
     429           4 :                     break;
     430           0 :                 case PGRES_POLLING_WRITING:
     431           0 :                     waitEvents |= WL_SOCKET_WRITEABLE;
     432           0 :                     break;
     433           0 :                 default:
     434           0 :                     error = pchomp(PQcancelErrorMessage(cancel_conn));
     435           0 :                     goto exit;
     436             :             }
     437             : 
     438             :             /* Sleep until there's something to do */
     439           4 :             WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
     440             :                               cur_timeout, PG_WAIT_CLIENT);
     441             : 
     442           4 :             ResetLatch(MyLatch);
     443             : 
     444           4 :             CHECK_FOR_INTERRUPTS();
     445             :         }
     446           4 : exit:   ;
     447             :     }
     448           0 :     PG_FINALLY();
     449             :     {
     450           4 :         PQcancelFinish(cancel_conn);
     451             :     }
     452           4 :     PG_END_TRY();
     453             : 
     454           4 :     return error;
     455             : }
     456             : 
     457             : #endif                          /* LIBPQ_BE_FE_HELPERS_H */

Generated by: LCOV version 1.14