LCOV - code coverage report
Current view: top level - src/bin/scripts - scripts_parallel.c (source / functions) Hit Total Coverage
Test: PostgreSQL 14devel Lines: 68 79 86.1 %
Date: 2020-11-27 12:05:55 Functions: 6 6 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  *  scripts_parallel.c
       4             :  *      Parallel support for bin/scripts/
       5             :  *
       6             :  *
       7             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * src/bin/scripts/scripts_parallel.c
      11             :  *
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #ifdef WIN32
      16             : #define FD_SETSIZE 1024         /* must set before winsock2.h is included */
      17             : #endif
      18             : 
      19             : #include "postgres_fe.h"
      20             : 
      21             : #ifdef HAVE_SYS_SELECT_H
      22             : #include <sys/select.h>
      23             : #endif
      24             : 
      25             : #include "common.h"
      26             : #include "common/logging.h"
      27             : #include "fe_utils/cancel.h"
      28             : #include "scripts_parallel.h"
      29             : 
      30             : static void init_slot(ParallelSlot *slot, PGconn *conn);   /* store conn in slot and mark the slot free */
      31             : static int  select_loop(int maxFd, fd_set *workerset);     /* wait for a readable socket; -1 on failure or cancel */
      32             : 
      33             : static void     /* init_slot: attach conn to the given slot and flag the slot as free */
      34         146 : init_slot(ParallelSlot *slot, PGconn *conn)
      35             : {
      36         146 :     slot->connection = conn;    /* the pointer is stored as-is; no copy is made */
      37             :     /* Initially assume connection is idle */
      38         146 :     slot->isFree = true;
      39         146 : }
      40             : 
      41             : /*
      42             :  * Wait until a file descriptor from the given set becomes readable.
      43             :  * maxFd + 1 is passed to select() as nfds; *workerset is the read set.
      44             :  * Returns the number of ready descriptors, or -1 on failure (including
      45             :  * getting a cancel request).  select() rewrites *workerset in place.
      46             :  */
      47             : static int
      48        4438 : select_loop(int maxFd, fd_set *workerset)
      49             : {
      50             :     int         i;
      51        4438 :     fd_set      saveSet = *workerset;   /* pristine copy, restored before every select() retry */
      52             : 
      53        4438 :     if (CancelRequested)    /* do not block at all if a cancel is already pending */
      54           0 :         return -1;
      55             : 
      56             :     for (;;)
      57           0 :     {
      58             :         /*
      59             :          * On Windows, we need to check once in a while for cancel requests;
      60             :          * on other platforms we rely on select() returning when interrupted.
      61             :          */
      62             :         struct timeval *tvp;
      63             : #ifdef WIN32
      64             :         struct timeval tv = {0, 1000000};   /* NOTE(review): tv_usec of 1000000 is a full second, outside 0..999999; {1, 0} is the normalized form -- confirm */
      65             : 
      66             :         tvp = &tv;
      67             : #else
      68        4438 :         tvp = NULL;     /* NULL timeout: block until a descriptor is ready or a signal arrives */
      69             : #endif
      70             : 
      71        4438 :         *workerset = saveSet;   /* undo whatever a previous select() call did to the set */
      72        4438 :         i = select(maxFd + 1, workerset, NULL, NULL, tvp);
      73             : 
      74             : #ifdef WIN32
      75             :         if (i == SOCKET_ERROR)  /* translate Winsock error reporting into the -1/errno form tested below */
      76             :         {
      77             :             i = -1;
      78             : 
      79             :             if (WSAGetLastError() == WSAEINTR)
      80             :                 errno = EINTR;
      81             :         }
      82             : #endif
      83             : 
      84        4438 :         if (i < 0 && errno == EINTR)
      85           0 :             continue;           /* ignore this */
      86        4438 :         if (i < 0 || CancelRequested)
      87           0 :             return -1;          /* but not this */
      88        4438 :         if (i == 0)
      89           0 :             continue;           /* timeout (Win32 only) */
      90        4438 :         break;      /* i > 0: at least one descriptor is readable */
      91             :     }
      92             : 
      93        4438 :     return i;
      94             : }
      95             : 
      96             : /*
      97             :  * ParallelSlotsGetIdle
      98             :  *      Return a connection slot that is ready to execute a command.
      99             :  *      The returned slot has isFree cleared before it is handed back.
     100             :  * This returns the first slot we find that is marked isFree, if one is;
     101             :  * otherwise, we loop on select() until one socket becomes available.  When
     102             :  * this happens, we read the whole set and mark as free all sockets that
     103             :  * become available.  If an error occurs, NULL is returned.
     104             :  */
     105             : ParallelSlot *
     106        4584 : ParallelSlotsGetIdle(ParallelSlot *slots, int numslots)
     107             : {
     108             :     int         i;
     109        4584 :     int         firstFree = -1;     /* index of the first slot seen idle after waiting; -1 = none yet */
     110             : 
     111             :     /*
     112             :      * Look for any connection currently free.  If there is one, mark it as
     113             :      * taken and let the caller know the slot to use.
     114             :      */
     115        9178 :     for (i = 0; i < numslots; i++)
     116             :     {
     117        4740 :         if (slots[i].isFree)
     118             :         {
     119         146 :             slots[i].isFree = false;
     120         146 :             return slots + i;
     121             :         }
     122             :     }
     123             : 
     124             :     /*
     125             :      * No free slot found, so wait until one of the connections has finished
     126             :      * its task and return the available slot.
     127             :      */
     128        8876 :     while (firstFree < 0)
     129             :     {
     130             :         fd_set      slotset;
     131        4438 :         int         maxFd = 0;  /* NOTE(review): if every socket is invalid, slotset stays empty and select_loop() waits on nothing -- confirm this cannot hang */
     132             : 
     133             :         /* We must reconstruct the fd_set for each call to select_loop */
     134        4438 :         FD_ZERO(&slotset);
     135             : 
     136        9024 :         for (i = 0; i < numslots; i++)
     137             :         {
     138        4586 :             int         sock = PQsocket(slots[i].connection);
     139             : 
     140             :             /*
     141             :              * We don't really expect any connections to lose their sockets
     142             :              * after startup, but just in case, cope by ignoring them.
     143             :              */
     144        4586 :             if (sock < 0)
     145           0 :                 continue;
     146             : 
     147        4586 :             FD_SET(sock, &slotset);
     148        4586 :             if (sock > maxFd)
     149        4586 :                 maxFd = sock;   /* track the highest descriptor for select()'s nfds argument */
     150             :         }
     151             : 
     152        4438 :         SetCancelConn(slots->connection);   /* only slots[0]'s connection is passed to SetCancelConn for this wait */
     153        4438 :         i = select_loop(maxFd, &slotset);
     154        4438 :         ResetCancelConn();
     155             : 
     156             :         /* failure? */
     157        4438 :         if (i < 0)
     158           0 :             return NULL;
     159             : 
     160        9024 :         for (i = 0; i < numslots; i++)  /* i is reused as the slot index; select_loop's count is no longer needed */
     161             :         {
     162        4586 :             int         sock = PQsocket(slots[i].connection);
     163             : 
     164        4586 :             if (sock >= 0 && FD_ISSET(sock, &slotset))
     165             :             {
     166             :                 /* select() says input is available, so consume it */
     167        4438 :                 PQconsumeInput(slots[i].connection);    /* NOTE(review): the return value is not checked */
     168             :             }
     169             : 
     170             :             /* Collect result(s) as long as any are available */
     171        9024 :             while (!PQisBusy(slots[i].connection))
     172             :             {
     173        8876 :                 PGresult   *result = PQgetResult(slots[i].connection);
     174             : 
     175        8876 :                 if (result != NULL)
     176             :                 {
     177             :                     /* Check and discard the command result */
     178        4438 :                     if (!processQueryResult(slots[i].connection, result))
     179           0 :                         return NULL;    /* a failed result aborts the whole wait; remaining slots are left untouched */
     180             :                 }
     181             :                 else
     182             :                 {
     183             :                     /* This connection has become idle */
     184        4438 :                     slots[i].isFree = true;
     185        4438 :                     if (firstFree < 0)
     186        4438 :                         firstFree = i;  /* remember only the first idle slot; later ones stay marked free */
     187        4438 :                     break;
     188             :                 }
     189             :             }
     190             :         }
     191             :     }
     192             : 
     193        4438 :     slots[firstFree].isFree = false;    /* claim the chosen slot for the caller */
     194        4438 :     return slots + firstFree;
     195             : }
     196             : 
     197             : /*
     198             :  * ParallelSlotsSetup
     199             :  *      Prepare a set of parallel slots to use on a given database.
     200             :  *      Returns a pg_malloc'd array of numslots slots.
     201             :  * This creates and initializes a set of connections to the database
     202             :  * using the information given by the caller, marking all parallel slots
     203             :  * as free and ready to use.  "conn" is an initial connection set up
     204             :  * by the caller and is associated with the first slot in the parallel
     205             :  * set.
     206             :  */
     207             : ParallelSlot *
     208         138 : ParallelSlotsSetup(const ConnParams *cparams,
     209             :                    const char *progname, bool echo,
     210             :                    PGconn *conn, int numslots)
     211             : {
     212             :     ParallelSlot *slots;
     213             :     int         i;
     214             : 
     215             :     Assert(conn != NULL);
     216             : 
     217         138 :     slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * numslots);
     218         138 :     init_slot(slots, conn);     /* slots[0] is written unconditionally, so numslots is assumed to be >= 1 -- TODO confirm callers */
     219         138 :     if (numslots > 1)           /* guard is redundant with the loop condition just below */
     220             :     {
     221          16 :         for (i = 1; i < numslots; i++)
     222             :         {
     223           8 :             conn = connectDatabase(cparams, progname, echo, false, true);   /* reuses the conn parameter for each extra connection */
     224             : 
     225             :             /*
     226             :              * Fail and exit immediately if trying to use a socket in an
     227             :              * unsupported range.  POSIX requires open(2) to use the lowest
     228             :              * unused file descriptor and the hint given relies on that.
     229             :              */
     230           8 :             if (PQsocket(conn) >= FD_SETSIZE)   /* NOTE(review): only connections opened here are checked, not the caller-supplied one in slots[0] */
     231             :             {
     232           0 :                 pg_log_fatal("too many jobs for this platform -- try %d", i);  /* i = number of slots already set up */
     233           0 :                 exit(1);
     234             :             }
     235             : 
     236           8 :             init_slot(slots + i, conn);
     237             :         }
     238             :     }
     239             : 
     240         138 :     return slots;
     241             : }
     242             : 
     243             : /*
     244             :  * ParallelSlotsTerminate
     245             :  *      Clean up a set of parallel slots
     246             :  *
     247             :  * Iterate through all connections in a given set of ParallelSlots and
     248             :  * terminate all connections.  The slots array itself is not freed here.
     249             :  */
     250             : void
     251         138 : ParallelSlotsTerminate(ParallelSlot *slots, int numslots)
     252             : {
     253             :     int         i;
     254             : 
     255         284 :     for (i = 0; i < numslots; i++)
     256             :     {
     257         146 :         PGconn     *conn = slots[i].connection;
     258             : 
     259         146 :         if (conn == NULL)       /* skip slots that hold no connection */
     260           0 :             continue;
     261             : 
     262         146 :         disconnectDatabase(conn);   /* NOTE(review): slots[i].connection is not reset afterwards */
     263             :     }
     264         138 : }
     265             : 
     266             : /*
     267             :  * ParallelSlotsWaitCompletion
     268             :  *
     269             :  * Wait for all connections to finish, returning false if at least one
     270             :  * error has been found on the way.  Stops at the first failing slot.
     271             :  */
     272             : bool
     273         138 : ParallelSlotsWaitCompletion(ParallelSlot *slots, int numslots)
     274             : {
     275             :     int         i;
     276             : 
     277         278 :     for (i = 0; i < numslots; i++)
     278             :     {
     279         146 :         if (!consumeQueryResult((slots + i)->connection))   /* slots after a failing one are not consumed */
     280           6 :             return false;
     281             :     }
     282             : 
     283         132 :     return true;    /* every connection was consumed without error; isFree flags are not touched here */
     284             : }

Generated by: LCOV version 1.13