LCOV - code coverage report
Current view: top level - src/fe_utils - parallel_slot.c (source / functions) Hit Total Coverage
Test: PostgreSQL 15devel Lines: 137 153 89.5 %
Date: 2021-09-17 15:07:27 Functions: 14 14 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  *  parallel_slot.c
       4             :  *      Parallel support for front-end parallel database connections
       5             :  *
       6             :  *
       7             :  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * src/fe_utils/parallel_slot.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #ifdef WIN32
      16             : #define FD_SETSIZE 1024         /* must set before winsock2.h is included */
      17             : #endif
      18             : 
      19             : #include "postgres_fe.h"
      20             : 
      21             : #ifdef HAVE_SYS_SELECT_H
      22             : #include <sys/select.h>
      23             : #endif
      24             : 
      25             : #include "common/logging.h"
      26             : #include "fe_utils/cancel.h"
      27             : #include "fe_utils/parallel_slot.h"
      28             : #include "fe_utils/query_utils.h"
      29             : 
      30             : #define ERRCODE_UNDEFINED_TABLE  "42P01"
      31             : 
      32             : static int  select_loop(int maxFd, fd_set *workerset);
      33             : static bool processQueryResult(ParallelSlot *slot, PGresult *result);
      34             : 
      35             : /*
      36             :  * Process (and delete) a query result.  Returns true if there's no problem,
      37             :  * false otherwise. It's up to the handler to decide what constitutes a
      38             :  * problem.
      39             :  */
      40             : static bool
      41       15938 : processQueryResult(ParallelSlot *slot, PGresult *result)
      42             : {
      43             :     Assert(slot->handler != NULL);
      44             : 
      45             :     /* On failure, the handler should return NULL after freeing the result */
      46       15938 :     if (!slot->handler(result, slot->connection, slot->handler_context))
      47          14 :         return false;
      48             : 
      49             :     /* Ok, we have to free it ourself */
      50       15924 :     PQclear(result);
      51       15924 :     return true;
      52             : }
      53             : 
      54             : /*
      55             :  * Consume all the results generated for the given connection until
      56             :  * nothing remains.  If at least one error is encountered, return false.
      57             :  * Note that this will block if the connection is busy.
      58             :  */
      59             : static bool
      60         214 : consumeQueryResult(ParallelSlot *slot)
      61             : {
      62         214 :     bool        ok = true;
      63             :     PGresult   *result;
      64             : 
      65         214 :     SetCancelConn(slot->connection);
      66         430 :     while ((result = PQgetResult(slot->connection)) != NULL)
      67             :     {
      68         216 :         if (!processQueryResult(slot, result))
      69          14 :             ok = false;
      70             :     }
      71         214 :     ResetCancelConn();
      72         214 :     return ok;
      73             : }
      74             : 
      75             : /*
      76             :  * Wait until a file descriptor from the given set becomes readable.
      77             :  *
      78             :  * Returns the number of ready descriptors, or -1 on failure (including
      79             :  * getting a cancel request).
      80             :  */
      81             : static int
      82       15814 : select_loop(int maxFd, fd_set *workerset)
      83             : {
      84             :     int         i;
      85       15814 :     fd_set      saveSet = *workerset;
      86             : 
      87       15814 :     if (CancelRequested)
      88           0 :         return -1;
      89             : 
      90             :     for (;;)
      91           0 :     {
      92             :         /*
      93             :          * On Windows, we need to check once in a while for cancel requests;
      94             :          * on other platforms we rely on select() returning when interrupted.
      95             :          */
      96             :         struct timeval *tvp;
      97             : #ifdef WIN32
      98             :         struct timeval tv = {0, 1000000};
      99             : 
     100             :         tvp = &tv;
     101             : #else
     102       15814 :         tvp = NULL;
     103             : #endif
     104             : 
     105       15814 :         *workerset = saveSet;
     106       15814 :         i = select(maxFd + 1, workerset, NULL, NULL, tvp);
     107             : 
     108             : #ifdef WIN32
     109             :         if (i == SOCKET_ERROR)
     110             :         {
     111             :             i = -1;
     112             : 
     113             :             if (WSAGetLastError() == WSAEINTR)
     114             :                 errno = EINTR;
     115             :         }
     116             : #endif
     117             : 
     118       15814 :         if (i < 0 && errno == EINTR)
     119           0 :             continue;           /* ignore this */
     120       15814 :         if (i < 0 || CancelRequested)
     121           0 :             return -1;          /* but not this */
     122       15814 :         if (i == 0)
     123           0 :             continue;           /* timeout (Win32 only) */
     124       15814 :         break;
     125             :     }
     126             : 
     127       15814 :     return i;
     128             : }
     129             : 
     130             : /*
     131             :  * Return the offset of a suitable idle slot, or -1 if none are available.  If
     132             :  * the given dbname is not null, only idle slots connected to the given
     133             :  * database are considered suitable, otherwise all idle connected slots are
     134             :  * considered suitable.
     135             :  */
     136             : static int
     137       31750 : find_matching_idle_slot(const ParallelSlotArray *sa, const char *dbname)
     138             : {
     139             :     int         i;
     140             : 
     141       47838 :     for (i = 0; i < sa->numslots; i++)
     142             :     {
     143       31988 :         if (sa->slots[i].inUse)
     144       16052 :             continue;
     145             : 
     146       15936 :         if (sa->slots[i].connection == NULL)
     147          14 :             continue;
     148             : 
     149       15922 :         if (dbname == NULL ||
     150       11200 :             strcmp(PQdb(sa->slots[i].connection), dbname) == 0)
     151       15900 :             return i;
     152             :     }
     153       15850 :     return -1;
     154             : }
     155             : 
     156             : /*
     157             :  * Return the offset of the first slot without a database connection, or -1 if
     158             :  * all slots are connected.
     159             :  */
     160             : static int
     161       16050 : find_unconnected_slot(const ParallelSlotArray *sa)
     162             : {
     163             :     int         i;
     164             : 
     165       32042 :     for (i = 0; i < sa->numslots; i++)
     166             :     {
     167       16206 :         if (sa->slots[i].inUse)
     168       15970 :             continue;
     169             : 
     170         236 :         if (sa->slots[i].connection == NULL)
     171         214 :             return i;
     172             :     }
     173             : 
     174       15836 :     return -1;
     175             : }
     176             : 
     177             : /*
     178             :  * Return the offset of the first idle slot, or -1 if all slots are busy.
     179             :  */
     180             : static int
     181       15836 : find_any_idle_slot(const ParallelSlotArray *sa)
     182             : {
     183             :     int         i;
     184             : 
     185       31798 :     for (i = 0; i < sa->numslots; i++)
     186       15984 :         if (!sa->slots[i].inUse)
     187          22 :             return i;
     188             : 
     189       15814 :     return -1;
     190             : }
     191             : 
     192             : /*
     193             :  * Wait for any slot's connection to have query results, consume the results,
     194             :  * and update the slot's status as appropriate.  Returns true on success,
     195             :  * false on cancellation, on error, or if no slots are connected.
     196             :  */
     197             : static bool
     198       15814 : wait_on_slots(ParallelSlotArray *sa)
     199             : {
     200             :     int         i;
     201             :     fd_set      slotset;
     202       15814 :     int         maxFd = 0;
     203       15814 :     PGconn     *cancelconn = NULL;
     204             : 
     205             :     /* We must reconstruct the fd_set for each call to select_loop */
     206       15814 :     FD_ZERO(&slotset);
     207             : 
     208       31776 :     for (i = 0; i < sa->numslots; i++)
     209             :     {
     210             :         int         sock;
     211             : 
     212             :         /* We shouldn't get here if we still have slots without connections */
     213             :         Assert(sa->slots[i].connection != NULL);
     214             : 
     215       15962 :         sock = PQsocket(sa->slots[i].connection);
     216             : 
     217             :         /*
     218             :          * We don't really expect any connections to lose their sockets after
     219             :          * startup, but just in case, cope by ignoring them.
     220             :          */
     221       15962 :         if (sock < 0)
     222           0 :             continue;
     223             : 
     224             :         /* Keep track of the first valid connection we see. */
     225       15962 :         if (cancelconn == NULL)
     226       15814 :             cancelconn = sa->slots[i].connection;
     227             : 
     228       15962 :         FD_SET(sock, &slotset);
     229       15962 :         if (sock > maxFd)
     230       15962 :             maxFd = sock;
     231             :     }
     232             : 
     233             :     /*
     234             :      * If we get this far with no valid connections, processing cannot
     235             :      * continue.
     236             :      */
     237       15814 :     if (cancelconn == NULL)
     238           0 :         return false;
     239             : 
     240       15814 :     SetCancelConn(sa->slots->connection);
     241       15814 :     i = select_loop(maxFd, &slotset);
     242       15814 :     ResetCancelConn();
     243             : 
     244             :     /* failure? */
     245       15814 :     if (i < 0)
     246           0 :         return false;
     247             : 
     248       31776 :     for (i = 0; i < sa->numslots; i++)
     249             :     {
     250             :         int         sock;
     251             : 
     252       15962 :         sock = PQsocket(sa->slots[i].connection);
     253             : 
     254       15962 :         if (sock >= 0 && FD_ISSET(sock, &slotset))
     255             :         {
     256             :             /* select() says input is available, so consume it */
     257       15814 :             PQconsumeInput(sa->slots[i].connection);
     258             :         }
     259             : 
     260             :         /* Collect result(s) as long as any are available */
     261       31684 :         while (!PQisBusy(sa->slots[i].connection))
     262             :         {
     263       31444 :             PGresult   *result = PQgetResult(sa->slots[i].connection);
     264             : 
     265       31444 :             if (result != NULL)
     266             :             {
     267             :                 /* Handle and discard the command result */
     268       15722 :                 if (!processQueryResult(&sa->slots[i], result))
     269           0 :                     return false;
     270             :             }
     271             :             else
     272             :             {
     273             :                 /* This connection has become idle */
     274       15722 :                 sa->slots[i].inUse = false;
     275       15722 :                 ParallelSlotClearHandler(&sa->slots[i]);
     276       15722 :                 break;
     277             :             }
     278             :         }
     279             :     }
     280       15814 :     return true;
     281             : }
     282             : 
     283             : /*
     284             :  * Open a new database connection using the stored connection parameters and
     285             :  * optionally a given dbname if not null, execute the stored initial command if
     286             :  * any, and associate the new connection with the given slot.
     287             :  */
     288             : static void
     289          36 : connect_slot(ParallelSlotArray *sa, int slotno, const char *dbname)
     290             : {
     291             :     const char *old_override;
     292          36 :     ParallelSlot *slot = &sa->slots[slotno];
     293             : 
     294          36 :     old_override = sa->cparams->override_dbname;
     295          36 :     if (dbname)
     296          28 :         sa->cparams->override_dbname = dbname;
     297          36 :     slot->connection = connectDatabase(sa->cparams, sa->progname, sa->echo, false, true);
     298          36 :     sa->cparams->override_dbname = old_override;
     299             : 
     300          36 :     if (PQsocket(slot->connection) >= FD_SETSIZE)
     301             :     {
     302           0 :         pg_log_fatal("too many jobs for this platform");
     303           0 :         exit(1);
     304             :     }
     305             : 
     306             :     /* Setup the connection using the supplied command, if any. */
     307          36 :     if (sa->initcmd)
     308           0 :         executeCommand(slot->connection, sa->initcmd, sa->echo);
     309          36 : }
     310             : 
     311             : /*
     312             :  * ParallelSlotsGetIdle
     313             :  *      Return a connection slot that is ready to execute a command.
     314             :  *
     315             :  * The slot returned is chosen as follows:
     316             :  *
     317             :  * If any idle slot already has an open connection, and if either dbname is
     318             :  * null or the existing connection is to the given database, that slot will be
     319             :  * returned allowing the connection to be reused.
     320             :  *
     321             :  * Otherwise, if any idle slot is not yet connected to any database, the slot
     322             :  * will be returned with it's connection opened using the stored cparams and
     323             :  * optionally the given dbname if not null.
     324             :  *
     325             :  * Otherwise, if any idle slot exists, an idle slot will be chosen and returned
     326             :  * after having it's connection disconnected and reconnected using the stored
     327             :  * cparams and optionally the given dbname if not null.
     328             :  *
     329             :  * Otherwise, if any slots have connections that are busy, we loop on select()
     330             :  * until one socket becomes available.  When this happens, we read the whole
     331             :  * set and mark as free all sockets that become available.  We then select a
     332             :  * slot using the same rules as above.
     333             :  *
     334             :  * Otherwise, we cannot return a slot, which is an error, and NULL is returned.
     335             :  *
     336             :  * For any connection created, if the stored initcmd is not null, it will be
     337             :  * executed as a command on the newly formed connection before the slot is
     338             :  * returned.
     339             :  *
     340             :  * If an error occurs, NULL is returned.
     341             :  */
     342             : ParallelSlot *
     343       31750 : ParallelSlotsGetIdle(ParallelSlotArray *sa, const char *dbname)
     344             : {
     345             :     int         offset;
     346             : 
     347             :     Assert(sa);
     348             :     Assert(sa->numslots > 0);
     349             : 
     350             :     while (1)
     351             :     {
     352             :         /* First choice: a slot already connected to the desired database. */
     353       31750 :         offset = find_matching_idle_slot(sa, dbname);
     354       31750 :         if (offset >= 0)
     355             :         {
     356       15900 :             sa->slots[offset].inUse = true;
     357       15900 :             return &sa->slots[offset];
     358             :         }
     359             : 
     360             :         /* Second choice: a slot not connected to any database. */
     361       15850 :         offset = find_unconnected_slot(sa);
     362       15850 :         if (offset >= 0)
     363             :         {
     364          14 :             connect_slot(sa, offset, dbname);
     365          14 :             sa->slots[offset].inUse = true;
     366          14 :             return &sa->slots[offset];
     367             :         }
     368             : 
     369             :         /* Third choice: a slot connected to the wrong database. */
     370       15836 :         offset = find_any_idle_slot(sa);
     371       15836 :         if (offset >= 0)
     372             :         {
     373          22 :             disconnectDatabase(sa->slots[offset].connection);
     374          22 :             sa->slots[offset].connection = NULL;
     375          22 :             connect_slot(sa, offset, dbname);
     376          22 :             sa->slots[offset].inUse = true;
     377          22 :             return &sa->slots[offset];
     378             :         }
     379             : 
     380             :         /*
     381             :          * Fourth choice: block until one or more slots become available. If
     382             :          * any slots hit a fatal error, we'll find out about that here and
     383             :          * return NULL.
     384             :          */
     385       15814 :         if (!wait_on_slots(sa))
     386           0 :             return NULL;
     387             :     }
     388             : }
     389             : 
     390             : /*
     391             :  * ParallelSlotsSetup
     392             :  *      Prepare a set of parallel slots but do not connect to any database.
     393             :  *
     394             :  * This creates and initializes a set of slots, marking all parallel slots as
     395             :  * free and ready to use.  Establishing connections is delayed until requesting
     396             :  * a free slot.  The cparams, progname, echo, and initcmd are stored for later
     397             :  * use and must remain valid for the lifetime of the returned array.
     398             :  */
     399             : ParallelSlotArray *
     400         206 : ParallelSlotsSetup(int numslots, ConnParams *cparams, const char *progname,
     401             :                    bool echo, const char *initcmd)
     402             : {
     403             :     ParallelSlotArray *sa;
     404             : 
     405             :     Assert(numslots > 0);
     406             :     Assert(cparams != NULL);
     407             :     Assert(progname != NULL);
     408             : 
     409         206 :     sa = (ParallelSlotArray *) palloc0(offsetof(ParallelSlotArray, slots) +
     410         206 :                                        numslots * sizeof(ParallelSlot));
     411             : 
     412         206 :     sa->numslots = numslots;
     413         206 :     sa->cparams = cparams;
     414         206 :     sa->progname = progname;
     415         206 :     sa->echo = echo;
     416         206 :     sa->initcmd = initcmd;
     417             : 
     418         206 :     return sa;
     419             : }
     420             : 
     421             : /*
     422             :  * ParallelSlotsAdoptConn
     423             :  *      Assign an open connection to the slots array for reuse.
     424             :  *
     425             :  * This turns over ownership of an open connection to a slots array.  The
     426             :  * caller should not further use or close the connection.  All the connection's
     427             :  * parameters (user, host, port, etc.) except possibly dbname should match
     428             :  * those of the slots array's cparams, as given in ParallelSlotsSetup.  If
     429             :  * these parameters differ, subsequent behavior is undefined.
     430             :  */
     431             : void
     432         200 : ParallelSlotsAdoptConn(ParallelSlotArray *sa, PGconn *conn)
     433             : {
     434             :     int         offset;
     435             : 
     436         200 :     offset = find_unconnected_slot(sa);
     437         200 :     if (offset >= 0)
     438         200 :         sa->slots[offset].connection = conn;
     439             :     else
     440           0 :         disconnectDatabase(conn);
     441         200 : }
     442             : 
     443             : /*
     444             :  * ParallelSlotsTerminate
     445             :  *      Clean up a set of parallel slots
     446             :  *
     447             :  * Iterate through all connections in a given set of ParallelSlots and
     448             :  * terminate all connections.
     449             :  */
     450             : void
     451         206 : ParallelSlotsTerminate(ParallelSlotArray *sa)
     452             : {
     453             :     int         i;
     454             : 
     455         420 :     for (i = 0; i < sa->numslots; i++)
     456             :     {
     457         214 :         PGconn     *conn = sa->slots[i].connection;
     458             : 
     459         214 :         if (conn == NULL)
     460           0 :             continue;
     461             : 
     462         214 :         disconnectDatabase(conn);
     463             :     }
     464         206 : }
     465             : 
     466             : /*
     467             :  * ParallelSlotsWaitCompletion
     468             :  *
     469             :  * Wait for all connections to finish, returning false if at least one
     470             :  * error has been found on the way.
     471             :  */
     472             : bool
     473         206 : ParallelSlotsWaitCompletion(ParallelSlotArray *sa)
     474             : {
     475             :     int         i;
     476             : 
     477         406 :     for (i = 0; i < sa->numslots; i++)
     478             :     {
     479         214 :         if (sa->slots[i].connection == NULL)
     480           0 :             continue;
     481         214 :         if (!consumeQueryResult(&sa->slots[i]))
     482          14 :             return false;
     483             :     }
     484             : 
     485         192 :     return true;
     486             : }
     487             : 
     488             : /*
     489             :  * TableCommandResultHandler
     490             :  *
     491             :  * ParallelSlotResultHandler for results of commands (not queries) against
     492             :  * tables.
     493             :  *
     494             :  * Requires that the result status is either PGRES_COMMAND_OK or an error about
     495             :  * a missing table.  This is useful for utilities that compile a list of tables
     496             :  * to process and then run commands (vacuum, reindex, or whatever) against
     497             :  * those tables, as there is a race condition between the time the list is
     498             :  * compiled and the time the command attempts to open the table.
     499             :  *
     500             :  * For missing tables, logs an error but allows processing to continue.
     501             :  *
     502             :  * For all other errors, logs an error and terminates further processing.
     503             :  *
     504             :  * res: PGresult from the query executed on the slot's connection
     505             :  * conn: connection belonging to the slot
     506             :  * context: unused
     507             :  */
     508             : bool
     509        4732 : TableCommandResultHandler(PGresult *res, PGconn *conn, void *context)
     510             : {
     511             :     Assert(res != NULL);
     512             :     Assert(conn != NULL);
     513             : 
     514             :     /*
     515             :      * If it's an error, report it.  Errors about a missing table are harmless
     516             :      * so we continue processing; but die for other errors.
     517             :      */
     518        4732 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     519             :     {
     520          14 :         char       *sqlState = PQresultErrorField(res, PG_DIAG_SQLSTATE);
     521             : 
     522          14 :         pg_log_error("processing of database \"%s\" failed: %s",
     523             :                      PQdb(conn), PQerrorMessage(conn));
     524             : 
     525          14 :         if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
     526             :         {
     527          14 :             PQclear(res);
     528          14 :             return false;
     529             :         }
     530             :     }
     531             : 
     532        4718 :     return true;
     533             : }

Generated by: LCOV version 1.13