LCOV - code coverage report
Current view: top level - src/fe_utils - parallel_slot.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 140 157 89.2 %
Date: 2025-01-18 03:14:54 Functions: 14 14 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  *  parallel_slot.c
       4             :  *      Support for parallel database connections in front-end programs
       5             :  *
       6             :  *
       7             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * src/fe_utils/parallel_slot.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #if defined(WIN32) && FD_SETSIZE < 1024
      16             : #error FD_SETSIZE needs to have been increased
      17             : #endif
      18             : 
      19             : #include "postgres_fe.h"
      20             : 
      21             : #include <sys/select.h>
      22             : 
      23             : #include "common/logging.h"
      24             : #include "fe_utils/cancel.h"
      25             : #include "fe_utils/parallel_slot.h"
      26             : #include "fe_utils/query_utils.h"
      27             : 
      28             : #define ERRCODE_UNDEFINED_TABLE  "42P01"
      29             : 
      30             : static int  select_loop(int maxFd, fd_set *workerset);
      31             : static bool processQueryResult(ParallelSlot *slot, PGresult *result);
      32             : 
      33             : /*
      34             :  * Process (and delete) a query result.  Returns true if there's no problem,
      35             :  * false otherwise. It's up to the handler to decide what constitutes a
      36             :  * problem.
      37             :  */
      38             : static bool
      39       22638 : processQueryResult(ParallelSlot *slot, PGresult *result)
      40             : {
      41             :     Assert(slot->handler != NULL);
      42             : 
      43             :     /* On failure, the handler should return false after freeing the result */
      44       22638 :     if (!slot->handler(result, slot->connection, slot->handler_context))
      45          12 :         return false;
      46             : 
      47             :     /* Ok, we have to free it ourselves */
      48       22626 :     PQclear(result);
      49       22626 :     return true;
      50             : }
      51             : 
      52             : /*
      53             :  * Consume all the results generated for the given connection until
      54             :  * nothing remains.  If at least one error is encountered, return false.
      55             :  * Note that this will block if the connection is busy.
      56             :  */
      57             : static bool
      58         406 : consumeQueryResult(ParallelSlot *slot)
      59             : {
      60         406 :     bool        ok = true;
      61             :     PGresult   *result;
      62             : 
      63         406 :     SetCancelConn(slot->connection);
      64         812 :     while ((result = PQgetResult(slot->connection)) != NULL)
      65             :     {
      66         406 :         if (!processQueryResult(slot, result))
      67          12 :             ok = false;
      68             :     }
      69         406 :     ResetCancelConn();
      70         406 :     return ok;
      71             : }
      72             : 
      73             : /*
      74             :  * Wait until a file descriptor from the given set becomes readable.
      75             :  *
      76             :  * Returns the number of ready descriptors, or -1 on failure (including
      77             :  * getting a cancel request).
      78             :  */
      79             : static int
      80       22338 : select_loop(int maxFd, fd_set *workerset)
      81             : {
      82             :     int         i;
      83       22338 :     fd_set      saveSet = *workerset;
      84             : 
      85       22338 :     if (CancelRequested)
      86           0 :         return -1;
      87             : 
      88             :     for (;;)
      89           0 :     {
      90             :         /*
      91             :          * On Windows, we need to check once in a while for cancel requests;
      92             :          * on other platforms we rely on select() returning when interrupted.
      93             :          */
      94             :         struct timeval *tvp;
      95             : #ifdef WIN32
      96             :         struct timeval tv = {0, 1000000};
      97             : 
      98             :         tvp = &tv;
      99             : #else
     100       22338 :         tvp = NULL;
     101             : #endif
     102             : 
     103       22338 :         *workerset = saveSet;
     104       22338 :         i = select(maxFd + 1, workerset, NULL, NULL, tvp);
     105             : 
     106             : #ifdef WIN32
     107             :         if (i == SOCKET_ERROR)
     108             :         {
     109             :             i = -1;
     110             : 
     111             :             if (WSAGetLastError() == WSAEINTR)
     112             :                 errno = EINTR;
     113             :         }
     114             : #endif
     115             : 
     116       22338 :         if (i < 0 && errno == EINTR)
     117           0 :             continue;           /* ignore this */
     118       22338 :         if (i < 0 || CancelRequested)
     119           0 :             return -1;          /* but not this */
     120       22338 :         if (i == 0)
     121           0 :             continue;           /* timeout (Win32 only) */
     122       22338 :         break;
     123             :     }
     124             : 
     125       22338 :     return i;
     126             : }
     127             : 
     128             : /*
     129             :  * Return the offset of a suitable idle slot, or -1 if none are available.  If
     130             :  * the given dbname is not null, only idle slots connected to the given
     131             :  * database are considered suitable, otherwise all idle connected slots are
     132             :  * considered suitable.
     133             :  */
     134             : static int
     135       44974 : find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
     136             : {
     137             :     int         i;
     138             : 
     139       67580 :     for (i = 0; i < sa->numslots; i++)
     140             :     {
     141       45200 :         if (sa->slots[i].inUse)
     142       22564 :             continue;
     143             : 
     144       22636 :         if (sa->slots[i].connection == NULL)
     145          14 :             continue;
     146             : 
     147       22622 :         if (dbname == NULL ||
     148       15558 :             strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
     149       22594 :             return i;
     150             :     }
     151       22380 :     return -1;
     152             : }
     153             : 
     154             : /*
     155             :  * Return the offset of the first slot without a database connection, or -1 if
     156             :  * all slots are connected.
     157             :  */
     158             : static int
     159       22664 : find_unconnected_slot(const ParallelSlotArray *sa)
     160             : {
     161             :     int         i;
     162             : 
     163       45180 :     for (i = 0; i < sa->numslots; i++)
     164             :     {
     165       22814 :         if (sa->slots[i].inUse)
     166       22488 :             continue;
     167             : 
     168         326 :         if (sa->slots[i].connection == NULL)
     169         298 :             return i;
     170             :     }
     171             : 
     172       22366 :     return -1;
     173             : }
     174             : 
     175             : /*
     176             :  * Return the offset of the first idle slot, or -1 if all slots are busy.
     177             :  */
     178             : static int
     179       22366 : find_any_idle_slot(const ParallelSlotArray *sa)
     180             : {
     181             :     int         i;
     182             : 
     183       44846 :     for (i = 0; i < sa->numslots; i++)
     184       22508 :         if (!sa->slots[i].inUse)
     185          28 :             return i;
     186             : 
     187       22338 :     return -1;
     188             : }
     189             : 
     190             : /*
     191             :  * Wait for any slot's connection to have query results, consume the results,
     192             :  * and update the slot's status as appropriate.  Returns true on success,
     193             :  * false on cancellation, on error, or if no slots are connected.
     194             :  */
     195             : static bool
     196       22338 : wait_on_slots(ParallelSlotArray *sa)
     197             : {
     198             :     int         i;
     199             :     fd_set      slotset;
     200       22338 :     int         maxFd = 0;
     201       22338 :     PGconn     *cancelconn = NULL;
     202             : 
     203             :     /* We must reconstruct the fd_set for each call to select_loop */
     204       22338 :     FD_ZERO(&slotset);
     205             : 
     206       44818 :     for (i = 0; i < sa->numslots; i++)
     207             :     {
     208             :         int         sock;
     209             : 
     210             :         /* We shouldn't get here if we still have slots without connections */
     211             :         Assert(sa->slots[i].connection != NULL);
     212             : 
     213       22480 :         sock = PQsocket(sa->slots[i].connection);
     214             : 
     215             :         /*
     216             :          * We don't really expect any connections to lose their sockets after
     217             :          * startup, but just in case, cope by ignoring them.
     218             :          */
     219       22480 :         if (sock < 0)
     220           0 :             continue;
     221             : 
     222             :         /* Keep track of the first valid connection we see. */
     223       22480 :         if (cancelconn == NULL)
     224       22338 :             cancelconn = sa->slots[i].connection;
     225             : 
     226       22480 :         FD_SET(sock, &slotset);
     227       22480 :         if (sock > maxFd)
     228       22480 :             maxFd = sock;
     229             :     }
     230             : 
     231             :     /*
     232             :      * If we get this far with no valid connections, processing cannot
     233             :      * continue.
     234             :      */
     235       22338 :     if (cancelconn == NULL)
     236           0 :         return false;
     237             : 
     238       22338 :     SetCancelConn(cancelconn);
     239       22338 :     i = select_loop(maxFd, &slotset);
     240       22338 :     ResetCancelConn();
     241             : 
     242             :     /* failure? */
     243       22338 :     if (i < 0)
     244           0 :         return false;
     245             : 
     246       44818 :     for (i = 0; i < sa->numslots; i++)
     247             :     {
     248             :         int         sock;
     249             : 
     250       22480 :         sock = PQsocket(sa->slots[i].connection);
     251             : 
     252       22480 :         if (sock >= 0 && FD_ISSET(sock, &slotset))
     253             :         {
     254             :             /* select() says input is available, so consume it */
     255       22340 :             PQconsumeInput(sa->slots[i].connection);
     256             :         }
     257             : 
     258             :         /* Collect result(s) as long as any are available */
     259       44712 :         while (!PQisBusy(sa->slots[i].connection))
     260             :         {
     261       44464 :             PGresult   *result = PQgetResult(sa->slots[i].connection);
     262             : 
     263       44464 :             if (result != NULL)
     264             :             {
     265             :                 /* Handle and discard the command result */
     266       22232 :                 if (!processQueryResult(&sa->slots[i], result))
     267           0 :                     return false;
     268             :             }
     269             :             else
     270             :             {
     271             :                 /* This connection has become idle */
     272       22232 :                 sa->slots[i].inUse = false;
     273       22232 :                 ParallelSlotClearHandler(&sa->slots[i]);
     274       22232 :                 break;
     275             :             }
     276             :         }
     277             :     }
     278       22338 :     return true;
     279             : }
     280             : 
     281             : /*
     282             :  * Open a new database connection using the stored connection parameters and
     283             :  * optionally a given dbname if not null, execute the stored initial command if
     284             :  * any, and associate the new connection with the given slot.
     285             :  */
     286             : static void
     287          42 : connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
     288             : {
     289             :     const char *old_override;
     290          42 :     ParallelSlot *slot = &sa->slots[slotno];
     291             : 
     292          42 :     old_override = sa->cparams->override_dbname;
     293          42 :     if (dbname)
     294          34 :         sa->cparams->override_dbname = dbname;
     295          42 :     slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
     296          42 :     sa->cparams->override_dbname = old_override;
     297             : 
     298             :     /*
     299             :      * POSIX defines FD_SETSIZE as the highest file descriptor acceptable to
     300             :      * FD_SET() and allied macros.  Windows defines it as a ceiling on the
     301             :      * count of file descriptors in the set, not a ceiling on the value of
     302             :      * each file descriptor; see
     303             :      * https://learn.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-select
     304             :      * and
     305             :      * https://learn.microsoft.com/en-us/windows/win32/api/winsock/ns-winsock-fd_set.
     306             :      * We can't ignore that, because Windows starts file descriptors at a
     307             :      * higher value, delays reuse, and skips values.  With less than ten
     308             :      * concurrent file descriptors, opened and closed rapidly, one can reach
     309             :      * file descriptor 1024.
     310             :      *
     311             :      * Doing a hard exit here is a bit grotty, but it doesn't seem worth
     312             :      * complicating the API to make it less grotty.
     313             :      */
     314             : #ifdef WIN32
     315             :     if (slotno >= FD_SETSIZE)
     316             :     {
     317             :         pg_log_error("too many jobs for this platform: %d", slotno);
     318             :         exit(1);
     319             :     }
     320             : #else
     321             :     {
     322          42 :         int         fd = PQsocket(slot->connection);
     323             : 
     324          42 :         if (fd >= FD_SETSIZE)
     325             :         {
     326           0 :             pg_log_error("socket file descriptor out of range for select(): %d",
     327             :                          fd);
     328           0 :             pg_log_error_hint("Try fewer jobs.");
     329           0 :             exit(1);
     330             :         }
     331             :     }
     332             : #endif
     333             : 
     334             :     /* Setup the connection using the supplied command, if any. */
     335          42 :     if (sa->initcmd)
     336           0 :         executeCommand(slot->connection, sa->initcmd, sa->echo);
     337          42 : }
     338             : 
     339             : /*
     340             :  * ParallelSlotsGetIdle
     341             :  *      Return a connection slot that is ready to execute a command.
     342             :  *
     343             :  * The slot returned is chosen as follows:
     344             :  *
     345             :  * If any idle slot already has an open connection, and if either dbname is
     346             :  * null or the existing connection is to the given database, that slot will be
     347             :  * returned allowing the connection to be reused.
     348             :  *
     349             :  * Otherwise, if any idle slot is not yet connected to any database, the slot
     350             :  * will be returned with its connection opened using the stored cparams and
     351             :  * optionally the given dbname if not null.
     352             :  *
     353             :  * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
     354             :  * after having its connection disconnected and reconnected using the stored
     355             :  * cparams and optionally the given dbname if not null.
     356             :  *
     357             :  * Otherwise, if any slots have connections that are busy, we loop on select()
     358             :  * until one socket becomes readable.  When this happens, we read the whole
     359             :  * set and mark as free all slots that have become idle.  We then select a
     360             :  * slot using the same rules as above.
     361             :  *
     362             :  * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
     363             :  *
     364             :  * For any connection created, if the stored initcmd is not null, it will be
     365             :  * executed as a command on the newly formed connection before the slot is
     366             :  * returned.
     367             :  *
     368             :  * If an error occurs, NULL is returned.
     369             :  */
     370             : ParallelSlot *
     371       44974 : ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
     372             : {
     373             :     int         offset;
     374             : 
     375             :     Assert(sa);
     376             :     Assert(sa->numslots > 0);
     377             : 
     378             :     while (1)
     379             :     {
     380             :         /* First choice: a slot already connected to the desired database. */
     381       44974 :         offset = find_matching_idle_slot(sa, dbname);
     382       44974 :         if (offset >= 0)
     383             :         {
     384       22594 :             sa->slots[offset].inUse = true;
     385       22594 :             return &sa->slots[offset];
     386             :         }
     387             : 
     388             :         /* Second choice: a slot not connected to any database. */
     389       22380 :         offset = find_unconnected_slot(sa);
     390       22380 :         if (offset >= 0)
     391             :         {
     392          14 :             connect_slot(sa, offset, dbname);
     393          14 :             sa->slots[offset].inUse = true;
     394          14 :             return &sa->slots[offset];
     395             :         }
     396             : 
     397             :         /* Third choice: a slot connected to the wrong database. */
     398       22366 :         offset = find_any_idle_slot(sa);
     399       22366 :         if (offset >= 0)
     400             :         {
     401          28 :             disconnectDatabase(sa->slots[offset].connection);
     402          28 :             sa->slots[offset].connection = NULL;
     403          28 :             connect_slot(sa, offset, dbname);
     404          28 :             sa->slots[offset].inUse = true;
     405          28 :             return &sa->slots[offset];
     406             :         }
     407             : 
     408             :         /*
     409             :          * Fourth choice: block until one or more slots become available. If
     410             :          * any slots hit a fatal error, we'll find out about that here and
     411             :          * return NULL.
     412             :          */
     413       22338 :         if (!wait_on_slots(sa))
     414           0 :             return NULL;
     415             :     }
     416             : }
     417             : 
     418             : /*
     419             :  * ParallelSlotsSetup
     420             :  *      Prepare a set of parallel slots but do not connect to any database.
     421             :  *
     422             :  * This creates and initializes a set of slots, marking all parallel slots as
     423             :  * free and ready to use.  Establishing connections is delayed until requesting
     424             :  * a free slot.  The cparams, progname, echo, and initcmd are stored for later
     425             :  * use and must remain valid for the lifetime of the returned array.
     426             :  */
     427             : ParallelSlotArray *
     428         290 : ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
     429             :                    bool echo, const char *initcmd)
     430             : {
     431             :     ParallelSlotArray *sa;
     432             : 
     433             :     Assert(numslots > 0);
     434             :     Assert(cparams != NULL);
     435             :     Assert(progname != NULL);
     436             : 
     437         290 :     sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
     438         290 :                                        numslots * sizeof(ParallelSlot));
     439             : 
     440         290 :     sa->numslots = numslots;
     441         290 :     sa->cparams = cparams;
     442         290 :     sa->progname = progname;
     443         290 :     sa->echo = echo;
     444         290 :     sa->initcmd = initcmd;
     445             : 
     446         290 :     return sa;
     447             : }
     448             : 
     449             : /*
     450             :  * ParallelSlotsAdoptConn
     451             :  *      Assign an open connection to the slots array for reuse.
     452             :  *
     453             :  * This turns over ownership of an open connection to a slots array.  The
     454             :  * caller should not further use or close the connection.  All the connection's
     455             :  * parameters (user, host, port, etc.) except possibly dbname should match
     456             :  * those of the slots array's cparams, as given in ParallelSlotsSetup.  If
     457             :  * these parameters differ, subsequent behavior is undefined.
     458             :  */
     459             : void
     460         284 : ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
     461             : {
     462             :     int         offset;
     463             : 
     464         284 :     offset = find_unconnected_slot(sa);
     465         284 :     if (offset >= 0)
     466         284 :         sa->slots[offset].connection = conn;
     467             :     else
     468           0 :         disconnectDatabase(conn);
     469         284 : }
     470             : 
     471             : /*
     472             :  * ParallelSlotsTerminate
     473             :  *      Clean up a set of parallel slots
     474             :  *
     475             :  * Iterate through all connections in a given set of ParallelSlots and
     476             :  * terminate all connections.
     477             :  */
     478             : void
     479         290 : ParallelSlotsTerminate(ParallelSlotArray *sa)
     480             : {
     481             :     int         i;
     482             : 
     483         588 :     for (i = 0; i < sa->numslots; i++)
     484             :     {
     485         298 :         PGconn     *conn = sa->slots[i].connection;
     486             : 
     487         298 :         if (conn == NULL)
     488           0 :             continue;
     489             : 
     490         298 :         disconnectDatabase(conn);
     491             :     }
     492         290 : }
     493             : 
     494             : /*
     495             :  * ParallelSlotsWaitCompletion
     496             :  *
     497             :  * Wait for all connections to finish, returning false if at least one
     498             :  * error has been found on the way.
     499             :  */
     500             : bool
     501         396 : ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
     502             : {
     503             :     int         i;
     504             : 
     505         790 :     for (i = 0; i < sa->numslots; i++)
     506             :     {
     507         406 :         if (sa->slots[i].connection == NULL)
     508           0 :             continue;
     509         406 :         if (!consumeQueryResult(&sa->slots[i]))
     510          12 :             return false;
     511             :         /* Mark connection as idle */
     512         394 :         sa->slots[i].inUse = false;
     513         394 :         ParallelSlotClearHandler(&sa->slots[i]);
     514             :     }
     515             : 
     516         384 :     return true;
     517             : }
     518             : 
     519             : /*
     520             :  * TableCommandResultHandler
     521             :  *
     522             :  * ParallelSlotResultHandler for results of commands (not queries) against
     523             :  * tables.
     524             :  *
     525             :  * Requires that the result status is either PGRES_COMMAND_OK or an error about
     526             :  * a missing table.  This is useful for utilities that compile a list of tables
     527             :  * to process and then run commands (vacuum, reindex, or whatever) against
     528             :  * those tables, as there is a race condition between the time the list is
     529             :  * compiled and the time the command attempts to open the table.
     530             :  *
     531             :  * For missing tables, logs an error but allows processing to continue.
     532             :  *
     533             :  * For all other errors, logs an error and terminates further processing.
     534             :  *
     535             :  * res: PGresult from the query executed on the slot's connection
     536             :  * conn: connection belonging to the slot
     537             :  * context: unused
     538             :  */
     539             : bool
     540        7074 : TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
     541             : {
     542             :     Assert(res != NULL);
     543             :     Assert(conn != NULL);
     544             : 
     545             :     /*
     546             :      * If it's an error, report it.  Errors about a missing table are harmless
     547             :      * so we continue processing; but die for other errors.
     548             :      */
     549        7074 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     550             :     {
     551          12 :         char       *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
     552             : 
     553          12 :         pg_log_error("processing of database \"%s\" failed: %s",
     554             :                      PQdb(conn), PQerrorMessage(conn));
     555             : 
     556          12 :         if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
     557             :         {
     558          12 :             PQclear(res);
     559          12 :             return false;
     560             :         }
     561             :     }
     562             : 
     563        7062 :     return true;
     564             : }

Generated by: LCOV version 1.14