LCOV - code coverage report
Current view: top level - src/bin/scripts - scripts_parallel.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 69 82 84.1 %
Date: 2020-06-01 10:07:15 Functions: 6 6 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  *  scripts_parallel.c
       4             :  *      Parallel support for bin/scripts/
       5             :  *
       6             :  *
       7             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * src/bin/scripts/scripts_parallel.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #ifdef WIN32
      16             : #define FD_SETSIZE 1024         /* must set before winsock2.h is included */
      17             : #endif
      18             : 
      19             : #include "postgres_fe.h"
      20             : 
      21             : #ifdef HAVE_SYS_SELECT_H
      22             : #include <sys/select.h>
      23             : #endif
      24             : 
      25             : #include "common.h"
      26             : #include "common/logging.h"
      27             : #include "fe_utils/cancel.h"
      28             : #include "scripts_parallel.h"
      29             : 
      30             : static void init_slot(ParallelSlot *slot, PGconn *conn);
      31             : static int  select_loop(int maxFd, fd_set *workerset, bool *aborting);
      32             : 
      33             : static void
      34         178 : init_slot(ParallelSlot *slot, PGconn *conn)
      35             : {
      36         178 :     slot->connection = conn;
      37             :     /* Initially assume connection is idle */
      38         178 :     slot->isFree = true;
      39         178 : }
      40             : 
      41             : /*
      42             :  * Loop on select() until a descriptor from the given set becomes readable.
      43             :  *
      44             :  * If we get a cancel request while we're waiting, we forego all further
      45             :  * processing and set the *aborting flag to true.  The return value must be
      46             :  * ignored in this case.  Otherwise, *aborting is set to false.
      47             :  */
      48             : static int
      49        9458 : select_loop(int maxFd, fd_set *workerset, bool *aborting)
      50             : {
      51             :     int         i;
      52        9458 :     fd_set      saveSet = *workerset;
      53             : 
      54        9458 :     if (CancelRequested)
      55             :     {
      56           0 :         *aborting = true;
      57           0 :         return -1;
      58             :     }
      59             :     else
      60        9458 :         *aborting = false;
      61             : 
      62             :     for (;;)
      63           0 :     {
      64             :         /*
      65             :          * On Windows, we need to check once in a while for cancel requests;
      66             :          * on other platforms we rely on select() returning when interrupted.
      67             :          */
      68             :         struct timeval *tvp;
      69             : #ifdef WIN32
      70             :         struct timeval tv = {0, 1000000};
      71             : 
      72             :         tvp = &tv;
      73             : #else
      74        9458 :         tvp = NULL;
      75             : #endif
      76             : 
      77        9458 :         *workerset = saveSet;
      78        9458 :         i = select(maxFd + 1, workerset, NULL, NULL, tvp);
      79             : 
      80             : #ifdef WIN32
      81             :         if (i == SOCKET_ERROR)
      82             :         {
      83             :             i = -1;
      84             : 
      85             :             if (WSAGetLastError() == WSAEINTR)
      86             :                 errno = EINTR;
      87             :         }
      88             : #endif
      89             : 
      90        9458 :         if (i < 0 && errno == EINTR)
      91           0 :             continue;           /* ignore this */
      92        9458 :         if (i < 0 || CancelRequested)
      93           0 :             *aborting = true;   /* but not this */
      94        9458 :         if (i == 0)
      95           0 :             continue;           /* timeout (Win32 only) */
      96        9458 :         break;
      97             :     }
      98             : 
      99        9458 :     return i;
     100             : }
     101             : 
     102             : /*
     103             :  * ParallelSlotsGetIdle
     104             :  *      Return a connection slot that is ready to execute a command.
     105             :  *
     106             :  * This returns the first slot we find that is marked isFree, if one is;
     107             :  * otherwise, we loop on select() until one socket becomes available.  When
     108             :  * this happens, we read the whole set and mark as free all sockets that
     109             :  * become available.  If an error occurs, NULL is returned.
     110             :  */
     111             : ParallelSlot *
     112        9636 : ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
     113             : {
     114             :     int         i;
     115        9636 :     int         firstFree = -1;
     116             : 
     117             :     /*
     118             :      * Look for any connection currently free.  If there is one, mark it as
     119             :      * taken and let the caller know the slot to use.
     120             :      */
     121       19250 :     for (i = 0; i < numslots; i++)
     122             :     {
     123        9792 :         if (slots[i].isFree)
     124             :         {
     125         178 :             slots[i].isFree = false;
     126         178 :             return slots + i;
     127             :         }
     128             :     }
     129             : 
     130             :     /*
     131             :      * No free slot found, so wait until one of the connections has finished
     132             :      * its task and return the available slot.
     133             :      */
     134       18916 :     while (firstFree < 0)
     135             :     {
     136             :         fd_set      slotset;
     137        9458 :         int         maxFd = 0;
     138             :         bool        aborting;
     139             : 
     140             :         /* We must reconstruct the fd_set for each call to select_loop */
     141        9458 :         FD_ZERO(&slotset);
     142             : 
     143       19064 :         for (i = 0; i < numslots; i++)
     144             :         {
     145        9606 :             int         sock = PQsocket(slots[i].connection);
     146             : 
     147             :             /*
     148             :              * We don't really expect any connections to lose their sockets
     149             :              * after startup, but just in case, cope by ignoring them.
     150             :              */
     151        9606 :             if (sock < 0)
     152           0 :                 continue;
     153             : 
     154        9606 :             FD_SET(sock, &slotset);
     155        9606 :             if (sock > maxFd)
     156        9606 :                 maxFd = sock;
     157             :         }
     158             : 
     159        9458 :         SetCancelConn(slots->connection);
     160        9458 :         i = select_loop(maxFd, &slotset, &aborting);
     161        9458 :         ResetCancelConn();
     162             : 
     163        9458 :         if (aborting)
     164             :         {
     165             :             /*
     166             :              * We set the cancel-receiving connection to the one in the zeroth
     167             :              * slot above, so fetch the error from there.
     168             :              */
     169           0 :             consumeQueryResult(slots->connection);
     170           0 :             return NULL;
     171             :         }
     172             :         Assert(i != 0);
     173             : 
     174       19064 :         for (i = 0; i < numslots; i++)
     175             :         {
     176        9606 :             int         sock = PQsocket(slots[i].connection);
     177             : 
     178        9606 :             if (sock >= 0 && FD_ISSET(sock, &slotset))
     179             :             {
     180             :                 /* select() says input is available, so consume it */
     181        9458 :                 PQconsumeInput(slots[i].connection);
     182             :             }
     183             : 
     184             :             /* Collect result(s) as long as any are available */
     185       19064 :             while (!PQisBusy(slots[i].connection))
     186             :             {
     187       18916 :                 PGresult   *result = PQgetResult(slots[i].connection);
     188             : 
     189       18916 :                 if (result != NULL)
     190             :                 {
     191             :                     /* Check and discard the command result */
     192        9458 :                     if (!processQueryResult(slots[i].connection, result))
     193           0 :                         return NULL;
     194             :                 }
     195             :                 else
     196             :                 {
     197             :                     /* This connection has become idle */
     198        9458 :                     slots[i].isFree = true;
     199        9458 :                     if (firstFree < 0)
     200        9458 :                         firstFree = i;
     201        9458 :                     break;
     202             :                 }
     203             :             }
     204             :         }
     205             :     }
     206             : 
     207        9458 :     slots[firstFree].isFree = false;
     208        9458 :     return slots + firstFree;
     209             : }
     210             : 
     211             : /*
     212             :  * ParallelSlotsSetup
     213             :  *      Prepare a set of parallel slots to use on a given database.
     214             :  *
     215             :  * This creates and initializes a set of connections to the database
     216             :  * using the information given by the caller, marking all parallel slots
     217             :  * as free and ready to use.  "conn" is an initial connection set up
     218             :  * by the caller and is associated with the first slot in the parallel
     219             :  * set.
     220             :  */
     221             : ParallelSlot *
     222         170 : ParallelSlotsSetup(const char *dbname, const char *host, const char *port,
     223             :                    const char *username, bool prompt_password,
     224             :                    const char *progname, bool echo,
     225             :                    PGconn *conn, int numslots)
     226             : {
     227             :     ParallelSlot *slots;
     228             :     int         i;
     229             : 
     230             :     Assert(conn != NULL);
     231             : 
     232         170 :     slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots);
     233         170 :     init_slot(slots, conn);
     234         170 :     if (numslots > 1)
     235             :     {
     236          16 :         for (i = 1; i < numslots; i++)
     237             :         {
     238           8 :             conn = connectDatabase(dbname, host, port, username, prompt_password,
     239             :                                    progname, echo, false, true);
     240             : 
     241             :             /*
     242             :              * Fail and exit immediately if trying to use a socket in an
     243             :              * unsupported range.  POSIX requires open(2) to use the lowest
     244             :              * unused file descriptor and the hint given relies on that.
     245             :              */
     246           8 :             if (PQsocket(conn) >= FD_SETSIZE)
     247             :             {
     248           0 :                 pg_log_fatal("too many jobs for this platform -- try %d", i);
     249           0 :                 exit(1);
     250             :             }
     251             : 
     252           8 :             init_slot(slots + i, conn);
     253             :         }
     254             :     }
     255             : 
     256         170 :     return slots;
     257             : }
     258             : 
     259             : /*
     260             :  * ParallelSlotsTerminate
     261             :  *      Clean up a set of parallel slots
     262             :  *
     263             :  * Iterate through all connections in a given set of ParallelSlots and
     264             :  * terminate all connections.
     265             :  */
     266             : void
     267         170 : ParallelSlotsTerminate(ParallelSlot *slots, int numslots)
     268             : {
     269             :     int         i;
     270             : 
     271         348 :     for (i = 0; i < numslots; i++)
     272             :     {
     273         178 :         PGconn     *conn = slots[i].connection;
     274             : 
     275         178 :         if (conn == NULL)
     276           0 :             continue;
     277             : 
     278         178 :         disconnectDatabase(conn);
     279             :     }
     280         170 : }
     281             : 
     282             : /*
     283             :  * ParallelSlotsWaitCompletion
     284             :  *
     285             :  * Wait for all connections to finish, returning false if at least one
     286             :  * error has been found on the way.
     287             :  */
     288             : bool
     289         170 : ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
     290             : {
     291             :     int         i;
     292             : 
     293         342 :     for (i = 0; i < numslots; i++)
     294             :     {
     295         178 :         if (!consumeQueryResult((slots + i)->connection))
     296           6 :             return false;
     297             :     }
     298             : 
     299         164 :     return true;
     300             : }

Generated by: LCOV version 1.13