LCOV - code coverage report
Current view: top level - src/bin/pg_dump - parallel.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 82.3 % 328 270
Test Date: 2026-03-03 14:15:12 Functions: 90.9 % 33 30
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * parallel.c
       4              :  *
       5              :  *  Parallel support for pg_dump and pg_restore
       6              :  *
       7              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       8              :  * Portions Copyright (c) 1994, Regents of the University of California
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *      src/bin/pg_dump/parallel.c
      12              :  *
      13              :  *-------------------------------------------------------------------------
      14              :  */
      15              : 
      16              : /*
      17              :  * Parallel operation works like this:
      18              :  *
      19              :  * The original, leader process calls ParallelBackupStart(), which forks off
      20              :  * the desired number of worker processes, which each enter WaitForCommands().
      21              :  *
      22              :  * The leader process dispatches an individual work item to one of the worker
      23              :  * processes in DispatchJobForTocEntry().  We send a command string such as
      24              :  * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
      25              :  * The worker process receives and decodes the command and passes it to the
      26              :  * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
      27              :  * which are routines of the current archive format.  That routine performs
      28              :  * the required action (dump or restore) and returns an integer status code.
      29              :  * This is passed back to the leader where we pass it to the
      30              :  * ParallelCompletionPtr callback function that was passed to
      31              :  * DispatchJobForTocEntry().  The callback function does state updating
      32              :  * for the leader control logic in pg_backup_archiver.c.
      33              :  *
      34              :  * In principle additional archive-format-specific information might be needed
      35              :  * in commands or worker status responses, but so far that hasn't proved
      36              :  * necessary, since workers have full copies of the ArchiveHandle/TocEntry
      37              :  * data structures.  Remember that we have forked off the workers only after
      38              :  * we have read in the catalog.  That's why our worker processes can also
      39              :  * access the catalog information.  (In the Windows case, the workers are
      40              :  * threads in the same process.  To avoid problems, they work with cloned
      41              :  * copies of the Archive data structure; see RunWorker().)
      42              :  *
      43              :  * In the leader process, the workerStatus field for each worker has one of
      44              :  * the following values:
      45              :  *      WRKR_NOT_STARTED: we've not yet forked this worker
      46              :  *      WRKR_IDLE: it's waiting for a command
      47              :  *      WRKR_WORKING: it's working on a command
      48              :  *      WRKR_TERMINATED: process ended
      49              :  * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
      50              :  * state, and must be NULL in other states.
      51              :  */
      52              : 
      53              : #include "postgres_fe.h"
      54              : 
      55              : #ifndef WIN32
      56              : #include <sys/select.h>
      57              : #include <sys/wait.h>
      58              : #include <signal.h>
      59              : #include <unistd.h>
      60              : #include <fcntl.h>
      61              : #endif
      62              : 
      63              : #include "fe_utils/string_utils.h"
      64              : #include "parallel.h"
      65              : #include "pg_backup_utils.h"
      66              : #ifdef WIN32
      67              : #include "port/pg_bswap.h"
      68              : #endif
      69              : 
      70              : /* Mnemonic macros for indexing the fd array returned by pipe(2) */
      71              : #define PIPE_READ                           0   /* element 0 is the read end */
      72              : #define PIPE_WRITE                          1   /* element 1 is the write end */
      73              : 
      74              : #define NO_SLOT (-1)            /* Failure result for GetIdleWorker(); never a valid slot index */
      75              : 
      76              : /* Worker process statuses, as tracked by the leader; WORKER_IS_RUNNING() below tests the two live states */
      77              : typedef enum
      78              : {
      79              :     WRKR_NOT_STARTED = 0,       /* we've not yet forked this worker */
      80              :     WRKR_IDLE,                  /* it's waiting for a command */
      81              :     WRKR_WORKING,               /* it's working on a command */
      82              :     WRKR_TERMINATED,            /* process ended */
      83              : } T_WorkerStatus;
      84              : 
      85              : #define WORKER_IS_RUNNING(workerStatus) \
      86              :     ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
      87              : 
      88              : /*
      89              :  * Private per-parallel-worker state (typedef for this is in parallel.h).
      90              :  *
      91              :  * Much of this is valid only in the leader process (or, on Windows, should
      92              :  * be touched only by the leader thread).  But the AH field should be touched
      93              :  * only by workers.  The pipe descriptors are valid everywhere.
      94              :  */
      95              : struct ParallelSlot
      96              : {
      97              :     T_WorkerStatus workerStatus;    /* see enum above */
      98              : 
      99              :     /* These fields are valid if workerStatus == WRKR_WORKING: */
     100              :     ParallelCompletionPtr callback; /* function to call on completion */
     101              :     void       *callback_data;  /* passthrough data for it */
     102              : 
     103              :     ArchiveHandle *AH;          /* Archive data worker is using (on Windows the leader also reads it to send cancels) */
     104              : 
     105              :     int         pipeRead;       /* leader's end of the pipes */
     106              :     int         pipeWrite;      /* closed by the leader to tell waiting workers they can exit */
     107              :     int         pipeRevRead;    /* child's end of the pipes */
     108              :     int         pipeRevWrite;   /* on Windows the worker closes both child ends itself at exit */
     109              : 
     110              :     /* Child process/thread identity info: */
     111              : #ifdef WIN32
     112              :     uintptr_t   hThread;        /* thread handle, waited on in WaitForTerminatingWorkers() */
     113              :     unsigned int threadId;      /* compared with GetCurrentThreadId() in GetMyPSlot() */
     114              : #else
     115              :     pid_t       pid;            /* worker PID; reset to 0 once the worker has been reaped */
     116              : #endif
     117              : };
     118              : 
     119              : #ifdef WIN32
     120              : 
     121              : /*
     122              :  * Structure to hold info passed by _beginthreadex() to the function it calls
     123              :  * via its single allowed argument.
     124              :  */
     125              : typedef struct
     126              : {
     127              :     ArchiveHandle *AH;          /* leader database connection */
     128              :     ParallelSlot *slot;         /* this worker's parallel slot */
     129              : } WorkerInfo;
     130              : 
     131              : /* Windows implementation of pipe access: the "pipes" are sockets, hence recv()/send() */
     132              : static int  pgpipe(int handles[2]); /* defined elsewhere in this file */
     133              : #define piperead(a,b,c)     recv(a,b,c,0)
     134              : #define pipewrite(a,b,c)    send(a,b,c,0)
     135              : 
     136              : #else                           /* !WIN32 */
     137              : 
     138              : /* Non-Windows implementation of pipe access: plain pipe(2) with read()/write() */
     139              : #define pgpipe(a)           pipe(a)
     140              : #define piperead(a,b,c)     read(a,b,c)
     141              : #define pipewrite(a,b,c)    write(a,b,c)
     142              : 
     143              : #endif                          /* WIN32 */
     144              : 
     145              : /*
     146              :  * State info for archive_close_connection() shutdown callback.
     147              :  */
     148              : typedef struct ShutdownInformation
     149              : {
     150              :     ParallelState *pstate;      /* non-NULL means parallel mode is active */
     151              :     Archive    *AHX;            /* leader's archive, set by on_exit_close_archive() */
     152              : } ShutdownInformation;
     153              : 
     154              : static ShutdownInformation shutdown_info;   /* handed to the callback as its "arg" */
     155              : 
     156              : /*
     157              :  * State info for signal handling.
     158              :  * We assume signal_info initializes to zeroes.
     159              :  *
     160              :  * On Unix, myAH is the leader DB connection in the leader process, and the
     161              :  * worker's own connection in worker processes.  On Windows, we have only one
     162              :  * instance of signal_info, so myAH is the leader connection and the worker
     163              :  * connections must be dug out of pstate->parallelSlot[].
     164              :  */
     165              : typedef struct DumpSignalInformation
     166              : {
     167              :     ArchiveHandle *myAH;        /* database connection to issue cancel for */
     168              :     ParallelState *pstate;      /* parallel state, if any */
     169              :     bool        handler_set;    /* signal handler set up in this process? */
     170              : #ifndef WIN32
     171              :     bool        am_worker;      /* am I a worker process? */
     172              : #endif
     173              : } DumpSignalInformation;
     174              : 
     175              : static volatile DumpSignalInformation signal_info;  /* examined from sigTermHandler() on Unix */
     176              : 
     177              : #ifdef WIN32
     178              : static CRITICAL_SECTION signal_info_lock;   /* held while sending cancels, see ShutdownWorkersHard() */
     179              : #endif
     180              : 
     181              : /*
     182              :  * Write a simple string to stderr --- must be safe in a signal handler.
     183              :  * We ignore the write() result since there's not much we could do about it.
     184              :  * Certain compilers make that harder than it ought to be, hence the rc_ dummy.
     185              :  */
     186              : #define write_stderr(str) \
     187              :     do { \
     188              :         const char *str_ = (str); /* evaluate the argument only once */ \
     189              :         int     rc_; \
     190              :         rc_ = write(fileno(stderr), str_, strlen(str_)); \
     191              :         (void) rc_; /* result is deliberately discarded */ \
     192              :     } while (0)
     193              : 
     194              : 
     195              : #ifdef WIN32
     196              : /* file-scope variables */
     197              : static DWORD tls_index;         /* TLS slot from TlsAlloc(), used by getThreadLocalPQExpBuffer() */
     198              : 
     199              : /* globally visible variables (needed by exit_nicely) */
     200              : bool        parallel_init_done = false; /* set once by init_parallel_dump_utils() */
     201              : DWORD       mainThreadId;       /* ID of the thread that ran init_parallel_dump_utils() */
     202              : #endif                          /* WIN32 */
     203              : 
     204              : /* Local function prototypes */
     205              : static ParallelSlot *GetMyPSlot(ParallelState *pstate);
     206              : static void archive_close_connection(int code, void *arg);
     207              : static void ShutdownWorkersHard(ParallelState *pstate);
     208              : static void WaitForTerminatingWorkers(ParallelState *pstate);
     209              : static void set_cancel_handler(void);
     210              : static void set_cancel_pstate(ParallelState *pstate);
     211              : static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
     212              : static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
     213              : static int  GetIdleWorker(ParallelState *pstate);   /* returns NO_SLOT on failure */
     214              : static bool HasEveryWorkerTerminated(ParallelState *pstate);
     215              : static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
     216              : static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
     217              : static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
     218              :                             bool do_wait);
     219              : static char *getMessageFromLeader(int pipefd[2]);
     220              : static void sendMessageToLeader(int pipefd[2], const char *str);
     221              : static int  select_loop(int maxFd, fd_set *workerset);
     222              : static char *getMessageFromWorker(ParallelState *pstate,
     223              :                                   bool do_wait, int *worker);
     224              : static void sendMessageToWorker(ParallelState *pstate,
     225              :                                 int worker, const char *str);
     226              : static char *readMessageFromPipe(int fd);
     227              : /* True if msg begins with prefix.  Note that prefix is evaluated twice. */
     228              : #define messageStartsWith(msg, prefix) \
     229              :     (strncmp(msg, prefix, strlen(prefix)) == 0)
     230              : 
     231              : 
     232              : /*
     233              :  * Initialize parallel dump support --- should be called early in process
     234              :  * startup.  (Currently, this is called whether or not we intend parallel
     235              :  * activity.)  All of the work is Windows-only; elsewhere this is a no-op.
     236              :  */
     237              : void
     238          498 : init_parallel_dump_utils(void)
     239              : {
     240              : #ifdef WIN32
     241              :     if (!parallel_init_done)    /* do the setup at most once */
     242              :     {
     243              :         WSADATA     wsaData;
     244              :         int         err;
     245              : 
     246              :         /* Prepare for threaded operation */
     247              :         tls_index = TlsAlloc(); /* NOTE(review): result is not checked for failure */
     248              :         mainThreadId = GetCurrentThreadId();
     249              : 
     250              :         /* Initialize socket access */
     251              :         err = WSAStartup(MAKEWORD(2, 2), &wsaData);    /* ask for Winsock 2.2 */
     252              :         if (err != 0)
     253              :             pg_fatal("%s() failed: error code %d", "WSAStartup", err);
     254              : 
     255              :         parallel_init_done = true;
     256              :     }
     257              : #endif
     258          498 : }
     259              : 
     260              : /*
     261              :  * Find the ParallelSlot for the current worker process or thread.
     262              :  *
     263              :  * Returns NULL if no matching slot is found (this implies we're the leader).
     264              :  */
     265              : static ParallelSlot *
     266            0 : GetMyPSlot(ParallelState *pstate)
     267              : {
     268              :     int         i;
     269              : 
     270            0 :     for (i = 0; i < pstate->numWorkers; i++)    /* linear scan over all slots */
     271              :     {
     272              : #ifdef WIN32
     273              :         if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
     274              : #else
     275            0 :         if (pstate->parallelSlot[i].pid == getpid())
     276              : #endif
     277            0 :             return &(pstate->parallelSlot[i]);  /* points into pstate, not a copy */
     278              :     }
     279              : 
     280            0 :     return NULL;                /* our thread/process ID matches no slot */
     281              : }
     282              : 
     283              : /*
     284              :  * A thread-local version of getLocalPQExpBuffer().
     285              :  *
     286              :  * Non-reentrant but reduces memory leakage: we'll consume one buffer per
     287              :  * thread, which is much better than one per fmtId/fmtQualifiedId call.
     288              :  */
     289              : #ifdef WIN32
     290              : static PQExpBuffer
     291              : getThreadLocalPQExpBuffer(void)
     292              : {
     293              :     /*
     294              :      * The Tls code goes awry if we use a static var, so we provide for both
     295              :      * static and auto, and omit any use of the static var when using Tls. We
     296              :      * rely on TlsGetValue() to return 0 if the value is not yet set.
     297              :      */
     298              :     static PQExpBuffer s_id_return = NULL;
     299              :     PQExpBuffer id_return;
     300              : 
     301              :     if (parallel_init_done)     /* TLS slot allocated: use this thread's buffer */
     302              :         id_return = (PQExpBuffer) TlsGetValue(tls_index);
     303              :     else                        /* before init: fall back to the static buffer */
     304              :         id_return = s_id_return;
     305              : 
     306              :     if (id_return)              /* already have a buffer from an earlier call? */
     307              :     {
     308              :         /* same buffer, just wipe contents */
     309              :         resetPQExpBuffer(id_return);
     310              :     }
     311              :     else
     312              :     {
     313              :         /* new buffer */
     314              :         id_return = createPQExpBuffer();
     315              :         if (parallel_init_done)
     316              :             TlsSetValue(tls_index, id_return);
     317              :         else
     318              :             s_id_return = id_return;
     319              :     }
     320              : 
     321              :     return id_return;           /* reused (after reset) by the next call */
     322              : }
     323              : #endif                          /* WIN32 */
     324              : 
     325              : /*
     326              :  * pg_dump and pg_restore call this to register the cleanup handler
     327              :  * as soon as they've created the ArchiveHandle.
     328              :  */
     329              : void
     330          348 : on_exit_close_archive(Archive *AHX)
     331              : {
     332          348 :     shutdown_info.AHX = AHX;    /* archive the callback will disconnect */
     333          348 :     on_exit_nicely(archive_close_connection, &shutdown_info);   /* callback receives &shutdown_info as arg */
     334          348 : }
     335              : 
     336              : /*
     337              :  * Update the archive handle in the on_exit callback registered by
     338              :  * on_exit_close_archive(). When pg_restore processes a pg_dumpall archive
     339              :  * containing multiple databases, each database is restored from a separate
     340              :  * archive. After closing one archive and opening the next, we update the
     341              :  * shutdown_info to reference the new archive handle so the cleanup callback
     342              :  * will close the correct archive on exit.
     343              :  */
     344              : void
     345           69 : replace_on_exit_close_archive(Archive *AHX)
     346              : {
     347           69 :     shutdown_info.AHX = AHX;    /* only swaps the pointer; no callback is registered here */
     348           69 : }
     349              : 
     350              : /*
     351              :  * on_exit_nicely handler for shutting down database connections and worker
     352              :  * processes cleanly.  "code" is ignored; "arg" is the ShutdownInformation.
     353              :  */
     354              : static void
     355          276 : archive_close_connection(int code, void *arg)
     356              : {
     357          276 :     ShutdownInformation *si = (ShutdownInformation *) arg;
     358              : 
     359          276 :     if (si->pstate)
     360              :     {
     361              :         /* In parallel mode, must figure out who we are (leader or a worker) */
     362            0 :         ParallelSlot *slot = GetMyPSlot(si->pstate);
     363              : 
     364            0 :         if (!slot)              /* no slot of our own means we're the leader */
     365              :         {
     366              :             /*
     367              :              * We're the leader.  Forcibly shut down workers, then close our
     368              :              * own database connection, if any.
     369              :              */
     370            0 :             ShutdownWorkersHard(si->pstate);
     371              : 
     372            0 :             if (si->AHX)
     373            0 :                 DisconnectDatabase(si->AHX);
     374              :         }
     375              :         else
     376              :         {
     377              :             /*
     378              :              * We're a worker.  Shut down our own DB connection if any.  On
     379              :              * Windows, we also have to close our communication sockets, to
     380              :              * emulate what will happen on Unix when the worker process exits.
     381              :              * (Without this, if this is a premature exit, the leader would
     382              :              * fail to detect it because there would be no EOF condition on
     383              :              * the other end of the pipe.)
     384              :              */
     385            0 :             if (slot->AH)
     386            0 :                 DisconnectDatabase(&(slot->AH->public));
     387              : 
     388              : #ifdef WIN32
     389              :             closesocket(slot->pipeRevRead);
     390              :             closesocket(slot->pipeRevWrite);
     391              : #endif
     392              :         }
     393              :     }
     394              :     else
     395              :     {
     396              :         /* Non-parallel operation: just kill the leader DB connection */
     397          276 :         if (si->AHX)
     398          276 :             DisconnectDatabase(si->AHX);
     399              :     }
     400          276 : }
     401              : 
     402              : /*
     403              :  * Forcibly shut down any remaining workers, waiting for them to finish.
     404              :  *
     405              :  * Note that we don't expect to come here during normal exit (the workers
     406              :  * should be long gone, and the ParallelState too).  We're only here in a
     407              :  * pg_fatal() situation, so intervening to cancel active commands is
     408              :  * appropriate.
     409              :  */
     410              : static void
     411            0 : ShutdownWorkersHard(ParallelState *pstate)
     412              : {
     413              :     int         i;
     414              : 
     415              :     /*
     416              :      * Close our write end of the sockets so that any workers waiting for
     417              :      * commands know they can exit.  (Note: some of the pipeWrite fields might
     418              :      * still be zero, if we failed to initialize all the workers.  Hence, just
     419              :      * ignore errors here.)
     420              :      */
     421            0 :     for (i = 0; i < pstate->numWorkers; i++)
     422            0 :         closesocket(pstate->parallelSlot[i].pipeWrite); /* used on all platforms; presumably maps to close() off Windows -- TODO confirm */
     423              : 
     424              :     /*
     425              :      * Force early termination of any commands currently in progress.
     426              :      */
     427              : #ifndef WIN32
     428              :     /* On non-Windows, send SIGTERM to each worker process. */
     429            0 :     for (i = 0; i < pstate->numWorkers; i++)
     430              :     {
     431            0 :         pid_t       pid = pstate->parallelSlot[i].pid;
     432              : 
     433            0 :         if (pid != 0)           /* zero: already reaped, or presumably never forked */
     434            0 :             kill(pid, SIGTERM); /* result ignored */
     435              :     }
     436              : #else
     437              : 
     438              :     /*
     439              :      * On Windows, send query cancels directly to the workers' backends.  Use
     440              :      * a critical section to ensure worker threads don't change state.
     441              :      */
     442              :     EnterCriticalSection(&signal_info_lock);
     443              :     for (i = 0; i < pstate->numWorkers; i++)
     444              :     {
     445              :         ArchiveHandle *AH = pstate->parallelSlot[i].AH;
     446              :         char        errbuf[1];  /* minimal buffer: any cancel error text is discarded */
     447              : 
     448              :         if (AH != NULL && AH->connCancel != NULL)
     449              :             (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
     450              :     }
     451              :     LeaveCriticalSection(&signal_info_lock);
     452              : #endif
     453              : 
     454              :     /* Now wait for them to terminate. */
     455            0 :     WaitForTerminatingWorkers(pstate);
     456            0 : }
     457              : 
     458              : /*
     459              :  * Wait for all workers to terminate, marking each slot WRKR_TERMINATED as its worker ends.
     460              :  */
     461              : static void
     462           12 : WaitForTerminatingWorkers(ParallelState *pstate)
     463              : {
     464           38 :     while (!HasEveryWorkerTerminated(pstate))   /* one iteration per worker that ends */
     465              :     {
     466           26 :         ParallelSlot *slot = NULL;
     467              :         int         j;
     468              : 
     469              : #ifndef WIN32
     470              :         /* On non-Windows, use wait() to wait for next worker to end */
     471              :         int         status;     /* exit status is collected but not examined */
     472           26 :         pid_t       pid = wait(&status);    /* NOTE(review): a -1 (error) return is not handled here */
     473              : 
     474              :         /* Find dead worker's slot, and clear the PID field */
     475           42 :         for (j = 0; j < pstate->numWorkers; j++)
     476              :         {
     477           42 :             slot = &(pstate->parallelSlot[j]);
     478           42 :             if (slot->pid == pid)
     479              :             {
     480           26 :                 slot->pid = 0;  /* so later kill() loops skip this slot */
     481           26 :                 break;
     482              :             }
     483              :         }
     484              : #else                           /* WIN32 */
     485              :         /* On Windows, we must use WaitForMultipleObjects() */
     486              :         HANDLE     *lpHandles = pg_malloc_array(HANDLE, pstate->numWorkers);
     487              :         int         nrun = 0;   /* number of handles collected below */
     488              :         DWORD       ret;
     489              :         uintptr_t   hThread;
     490              : 
     491              :         for (j = 0; j < pstate->numWorkers; j++)
     492              :         {
     493              :             if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
     494              :             {
     495              :                 lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
     496              :                 nrun++;
     497              :             }
     498              :         }
     499              :         ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE); /* false: return when any one handle is signaled */
     500              :         Assert(ret != WAIT_FAILED);
     501              :         hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];   /* offset from WAIT_OBJECT_0 is the signaled handle's index */
     502              :         free(lpHandles);
     503              : 
     504              :         /* Find dead worker's slot, and clear the hThread field */
     505              :         for (j = 0; j < pstate->numWorkers; j++)
     506              :         {
     507              :             slot = &(pstate->parallelSlot[j]);
     508              :             if (slot->hThread == hThread)
     509              :             {
     510              :                 /* For cleanliness, close handles for dead threads */
     511              :                 CloseHandle((HANDLE) slot->hThread);
     512              :                 slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
     513              :                 break;
     514              :             }
     515              :         }
     516              : #endif                          /* WIN32 */
     517              : 
     518              :         /* On all platforms, update workerStatus and te[] as well */
     519              :         Assert(j < pstate->numWorkers); /* a slot must have matched; otherwise j is past the last slot for te[j] below */
     520           26 :         slot->workerStatus = WRKR_TERMINATED;
     521           26 :         pstate->te[j] = NULL;   /* te[] must be NULL outside WRKR_WORKING state */
     522              :     }
     523           12 : }
     524              : 
     525              : 
     526              : /*
     527              :  * Code for responding to cancel interrupts (SIGINT, control-C, etc)
     528              :  *
     529              :  * This doesn't quite belong in this module, but it needs access to the
     530              :  * ParallelState data, so there's not really a better place either.
     531              :  *
     532              :  * When we get a cancel interrupt, we could just die, but in pg_restore that
     533              :  * could leave a SQL command (e.g., CREATE INDEX on a large table) running
     534              :  * for a long time.  Instead, we try to send a cancel request and then die.
     535              :  * pg_dump probably doesn't really need this, but we might as well use it
     536              :  * there too.  Note that sending the cancel directly from the signal handler
     537              :  * is safe because PQcancel() is written to make it so.
     538              :  *
     539              :  * In parallel operation on Unix, each process is responsible for canceling
     540              :  * its own connection (this must be so because nobody else has access to it).
     541              :  * Furthermore, the leader process should attempt to forward its signal to
     542              :  * each child.  In simple manual use of pg_dump/pg_restore, forwarding isn't
     543              :  * needed because typing control-C at the console would deliver SIGINT to
     544              :  * every member of the terminal process group --- but in other scenarios it
     545              :  * might be that only the leader gets signaled.
     546              :  *
     547              :  * On Windows, the cancel handler runs in a separate thread, because that's
     548              :  * how SetConsoleCtrlHandler works.  We make it stop worker threads, send
     549              :  * cancels on all active connections, and then return FALSE, which will allow
     550              :  * the process to die.  For safety's sake, we use a critical section to
     551              :  * protect the PGcancel structures against being changed while the signal
     552              :  * thread runs.
     553              :  */
     554              : 
     555              : #ifndef WIN32
     556              : 
     557              : /*
     558              :  * Signal handler (Unix only): signal workers, cancel our query, then _exit(1)
     559              :  */
     560              : static void
     561            0 : sigTermHandler(SIGNAL_ARGS)
     562              : {
     563              :     int         i;
     564              :     char        errbuf[1];  /* PQcancel's error text is discarded */
     565              : 
     566              :     /*
     567              :      * Some platforms allow delivery of new signals to interrupt an active
     568              :      * signal handler.  That could muck up our attempt to send PQcancel, so
     569              :      * disable the signals that set_cancel_handler enabled.
     570              :      */
     571            0 :     pqsignal(SIGINT, SIG_IGN);
     572            0 :     pqsignal(SIGTERM, SIG_IGN);
     573            0 :     pqsignal(SIGQUIT, SIG_IGN);
     574              : 
     575              :     /*
     576              :      * If we're in the leader, forward signal to all workers.  (It seems best
     577              :      * to do this before PQcancel; killing the leader transaction will result
     578              :      * in invalid-snapshot errors from active workers, which maybe we can
     579              :      * quiet by killing workers first.)  Ignore any errors.
     580              :      */
     581            0 :     if (signal_info.pstate != NULL)
     582              :     {
     583            0 :         for (i = 0; i < signal_info.pstate->numWorkers; i++)
     584              :         {
     585            0 :             pid_t       pid = signal_info.pstate->parallelSlot[i].pid;
     586              : 
     587            0 :             if (pid != 0)   /* zero pid: slot has no worker recorded */
     588            0 :                 kill(pid, SIGTERM);
     589              :         }
     590              :     }
     591              : 
     592              :     /*
     593              :      * Send QueryCancel if we have a connection to send to.  Ignore errors,
     594              :      * there's not much we can do about them anyway.
     595              :      */
     596            0 :     if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
     597            0 :         (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
     598              : 
     599              :     /*
     600              :      * Report we're quitting, using nothing more complicated than write(2).
     601              :      * When in parallel operation, only the leader process should do this.
     602              :      */
     603            0 :     if (!signal_info.am_worker)
     604              :     {
     605            0 :         if (progname)
     606              :         {
     607            0 :             write_stderr(progname);
     608            0 :             write_stderr(": ");
     609              :         }
     610            0 :         write_stderr("terminated by user\n");
     611              :     }
     612              : 
     613              :     /*
     614              :      * And die, using _exit() not exit() because the latter will invoke atexit
     615              :      * handlers that can fail if we interrupted related code.
     616              :      */
     617            0 :     _exit(1);
     618              : }
     619              : 
     620              : /*
     621              :  * Install sigTermHandler for SIGINT/SIGTERM/SIGQUIT, unless already done.
     622              :  */
     623              : static void
     624          743 : set_cancel_handler(void)
     625              : {
     626              :     /*
     627              :      * When forking, signal_info.handler_set will propagate into the new
     628              :      * process, but that's fine because the signal handler state does too.
     629              :      */
     630          743 :     if (!signal_info.handler_set)
     631              :     {
     632          300 :         signal_info.handler_set = true;
     633              : 
     634          300 :         pqsignal(SIGINT, sigTermHandler);
     635          300 :         pqsignal(SIGTERM, sigTermHandler);
     636          300 :         pqsignal(SIGQUIT, sigTermHandler);
     637              :     }
     638          743 : }
     639              : 
     640              : #else                           /* WIN32 */
     641              : 
     642              : /*
     643              :  * Console interrupt handler --- runs in a newly-started thread.
     644              :  *
     645              :  * After stopping other threads and sending cancel requests on all open
     646              :  * connections, we return FALSE which will allow the default ExitProcess()
     647              :  * action to be taken.
     648              :  */
     649              : static BOOL WINAPI
     650              : consoleHandler(DWORD dwCtrlType)
     651              : {
     652              :     int         i;
     653              :     char        errbuf[1];  /* PQcancel's error text is discarded */
     654              : 
     655              :     if (dwCtrlType == CTRL_C_EVENT ||
     656              :         dwCtrlType == CTRL_BREAK_EVENT)
     657              :     {
     658              :         /* Critical section prevents changing data we look at here */
     659              :         EnterCriticalSection(&signal_info_lock);
     660              : 
     661              :         /*
     662              :          * If in parallel mode, stop worker threads and send QueryCancel to
     663              :          * their connected backends.  The main point of stopping the worker
     664              :          * threads is to keep them from reporting the query cancels as errors,
     665              :          * which would clutter the user's screen.  We needn't stop the leader
     666              :          * thread since it won't be doing much anyway.  Do this before
     667              :          * canceling the main transaction, else we might get invalid-snapshot
     668              :          * errors reported before we can stop the workers.  Ignore errors,
     669              :          * there's not much we can do about them anyway.
     670              :          */
     671              :         if (signal_info.pstate != NULL)
     672              :         {
     673              :             for (i = 0; i < signal_info.pstate->numWorkers; i++)
     674              :             {
     675              :                 ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
     676              :                 ArchiveHandle *AH = slot->AH;
     677              :                 HANDLE      hThread = (HANDLE) slot->hThread;
     678              : 
     679              :                 /*
     680              :                  * Using TerminateThread here may leave some resources leaked,
     681              :                  * but it doesn't matter since we're about to end the whole
     682              :                  * process.
     683              :                  */
     684              :                 if (hThread != INVALID_HANDLE_VALUE)
     685              :                     TerminateThread(hThread, 0);
     686              : 
     687              :                 if (AH != NULL && AH->connCancel != NULL)
     688              :                     (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
     689              :             }
     690              :         }
     691              : 
     692              :         /*
     693              :          * Send QueryCancel to leader connection, if enabled.  Ignore errors,
     694              :          * there's not much we can do about them anyway.
     695              :          */
     696              :         if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
     697              :             (void) PQcancel(signal_info.myAH->connCancel,
     698              :                             errbuf, sizeof(errbuf));
     699              : 
     700              :         LeaveCriticalSection(&signal_info_lock);
     701              : 
     702              :         /*
     703              :          * Report we're quitting, using nothing more complicated than
     704              :          * write(2).  (We might be able to get away with using pg_log_*()
     705              :          * here, but since we terminated other threads uncleanly above, it
     706              :          * seems better to assume as little as possible.)
     707              :          */
     708              :         if (progname)
     709              :         {
     710              :             write_stderr(progname);
     711              :             write_stderr(": ");
     712              :         }
     713              :         write_stderr("terminated by user\n");
     714              :     }
     715              : 
     716              :     /* Return FALSE for every dwCtrlType, so default handling proceeds */
     717              :     return FALSE;
     718              : }
     719              : 
     720              : /*
     721              :  * Set up signal_info_lock and register consoleHandler, if not already done.
     722              :  */
     723              : static void
     724              : set_cancel_handler(void)
     725              : {
     726              :     if (!signal_info.handler_set)
     727              :     {
     728              :         signal_info.handler_set = true;
     729              : 
     730              :         InitializeCriticalSection(&signal_info_lock);
     731              : 
     732              :         SetConsoleCtrlHandler(consoleHandler, TRUE);
     733              :     }
     734              : }
     735              : 
     736              : #endif                          /* WIN32 */
     737              : 
     738              : 
     739              : /*
     740              :  * set_archive_cancel_info
     741              :  *
     742              :  * Fill AH->connCancel with cancellation info for the specified database
     743              :  * connection; or clear it if conn is NULL.
     744              :  */
     745              : void
     746          743 : set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
     747              : {
     748              :     PGcancel   *oldConnCancel;
     749              : 
     750              :     /*
     751              :      * Activate the interrupt handler if we didn't yet in this process.  On
     752              :      * Windows, this also initializes signal_info_lock; therefore it's
     753              :      * important that this happen at least once before we fork off any
     754              :      * threads.
     755              :      */
     756          743 :     set_cancel_handler();
     757              : 
     758              :     /*
     759              :      * On Unix, we assume that storing a pointer value is atomic with respect
     760              :      * to any possible signal interrupt.  On Windows, use a critical section.
     761              :      */
     762              : 
     763              : #ifdef WIN32
     764              :     EnterCriticalSection(&signal_info_lock);
     765              : #endif
     766              : 
     767              :     /* Detach the old PGcancel (if any) from AH before freeing it */
     768          743 :     oldConnCancel = AH->connCancel;
     769              :     /* be sure interrupt handler doesn't use pointer while freeing */
     770          743 :     AH->connCancel = NULL;
     771              : 
     772          743 :     if (oldConnCancel != NULL)
     773          397 :         PQfreeCancel(oldConnCancel);
     774              : 
     775              :     /* Obtain a fresh PGcancel if a connection was specified */
     776          743 :     if (conn)
     777          399 :         AH->connCancel = PQgetCancel(conn);
     778              : 
     779              :     /*
     780              :      * On Unix, there's only ever one active ArchiveHandle per process, so we
     781              :      * can just set signal_info.myAH unconditionally.  On Windows, do that
     782              :      * only in the main thread; worker threads have to make sure their
     783              :      * ArchiveHandle appears in the pstate data, which is dealt with in
     784              :      * RunWorker().
     785              :      */
     786              : #ifndef WIN32
     787          743 :     signal_info.myAH = AH;
     788              : #else
     789              :     if (mainThreadId == GetCurrentThreadId())
     790              :         signal_info.myAH = AH;
     791              : #endif
     792              : 
     793              : #ifdef WIN32
     794              :     LeaveCriticalSection(&signal_info_lock);
     795              : #endif
     796          743 : }
     797              : 
     798              : /*
     799              :  * set_cancel_pstate
     800              :  *
     801              :  * Set signal_info.pstate to the specified ParallelState (NULL unlinks it).
     802              :  * We need this mainly to have an interlock against Windows signal thread.
     803              :  */
     804              : static void
     805           24 : set_cancel_pstate(ParallelState *pstate)
     806              : {
     807              : #ifdef WIN32
     808              :     EnterCriticalSection(&signal_info_lock);
     809              : #endif
     810              : 
     811           24 :     signal_info.pstate = pstate;
     812              : 
     813              : #ifdef WIN32
     814              :     LeaveCriticalSection(&signal_info_lock);
     815              : #endif
     816           24 : }
     817              : 
     818              : /*
     819              :  * set_cancel_slot_archive
     820              :  *
     821              :  * Set the ParallelSlot's AH field to the specified archive (NULL clears it).
     822              :  * We need this mainly to have an interlock against Windows signal thread.
     823              :  */
     824              : static void
     825           52 : set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
     826              : {
     827              : #ifdef WIN32
     828              :     EnterCriticalSection(&signal_info_lock);
     829              : #endif
     830              : 
     831           52 :     slot->AH = AH;
     832              : 
     833              : #ifdef WIN32
     834              :     LeaveCriticalSection(&signal_info_lock);
     835              : #endif
     836           52 : }
     837              : 
     838              : 
     839              : /*
     840              :  * This function is called by both Unix and Windows variants to set up
     841              :  * and run a worker process.  Caller should exit the process (or thread)
     842              :  * upon return.
     843              :  */
     844              : static void
     845           26 : RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
     846              : {
     847              :     int         pipefd[2];
     848              : 
     849              :     /* fetch the worker's ends of the pipes, as stored in the slot */
     850           26 :     pipefd[PIPE_READ] = slot->pipeRevRead;
     851           26 :     pipefd[PIPE_WRITE] = slot->pipeRevWrite;
     852              : 
     853              :     /*
     854              :      * Clone the archive so that we have our own state to work with, and in
     855              :      * particular our own database connection.
     856              :      *
     857              :      * We clone on Unix as well as Windows, even though technically we don't
     858              :      * need to because fork() gives us a copy in our own address space
     859              :      * already.  But CloneArchive resets the state information and also clones
     860              :      * the database connection which both seem kinda helpful.
     861              :      */
     862           26 :     AH = CloneArchive(AH);
     863              : 
     864              :     /* Remember cloned archive where signal handler can find it */
     865           26 :     set_cancel_slot_archive(slot, AH);
     866              : 
     867              :     /*
     868              :      * Call the setup worker function that's defined in the ArchiveHandle.
     869              :      */
     870           26 :     (AH->SetupWorkerPtr) ((Archive *) AH);
     871              : 
     872              :     /*
     873              :      * Execute commands until done.
     874              :      */
     875           26 :     WaitForCommands(AH, pipefd);
     876              : 
     877              :     /*
     878              :      * Unhook the clone from the slot, then disconnect and clean up.
     879              :      */
     880           26 :     set_cancel_slot_archive(slot, NULL);
     881           26 :     DisconnectDatabase(&(AH->public));
     882           26 :     DeCloneArchive(AH);
     883           26 : }
     884              : 
     885              : /*
     886              :  * Thread base function for Windows; frees the WorkerInfo it is passed
     887              :  */
     888              : #ifdef WIN32
     889              : static unsigned __stdcall
     890              : init_spawned_worker_win32(WorkerInfo *wi)
     891              : {
     892              :     ArchiveHandle *AH = wi->AH;
     893              :     ParallelSlot *slot = wi->slot;
     894              : 
     895              :     /* Fields are copied out above; release the leader-allocated struct */
     896              :     free(wi);
     897              : 
     898              :     /* Run the worker ... */
     899              :     RunWorker(AH, slot);
     900              : 
     901              :     /* Exit the thread */
     902              :     _endthreadex(0);
     903              :     return 0;
     904              : }
     905              : #endif                          /* WIN32 */
     906              : 
     907              : /*
     908              :  * This function starts a parallel dump or restore by spawning off the worker
     909              :  * processes.  For Windows, it creates a number of threads; on Unix the
     910              :  * workers are created with fork().
     911              :  */
     912              : ParallelState *
     913           61 : ParallelBackupStart(ArchiveHandle *AH)
     914              : {
     915              :     ParallelState *pstate;
     916              :     int         i;
     917              : 
     918              :     Assert(AH->public.numWorkers > 0);
     919              : 
     920           61 :     pstate = pg_malloc_object(ParallelState);
     921              : 
     922           61 :     pstate->numWorkers = AH->public.numWorkers;
     923           61 :     pstate->te = NULL;
     924           61 :     pstate->parallelSlot = NULL;
     925              : 
     926           61 :     if (AH->public.numWorkers == 1)
     927           49 :         return pstate;      /* single worker: nothing to spawn */
     928              : 
     929              :     /* Create status arrays, being sure to initialize all fields to 0 */
     930           12 :     pstate->te =
     931           12 :         pg_malloc0_array(TocEntry *, pstate->numWorkers);
     932           12 :     pstate->parallelSlot =
     933           12 :         pg_malloc0_array(ParallelSlot, pstate->numWorkers);
     934              : 
     935              : #ifdef WIN32
     936              :     /* Make fmtId() and fmtQualifiedId() use thread-local storage */
     937              :     getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
     938              : #endif
     939              : 
     940              :     /*
     941              :      * Set the pstate in shutdown_info, to tell the exit handler that it must
     942              :      * clean up workers as well as the main database connection.  But we don't
     943              :      * set this in signal_info yet, because we don't want child processes to
     944              :      * inherit non-NULL signal_info.pstate.
     945              :      */
     946           12 :     shutdown_info.pstate = pstate;
     947              : 
     948              :     /*
     949              :      * Temporarily disable query cancellation on the leader connection.  This
     950              :      * ensures that child processes won't inherit valid AH->connCancel
     951              :      * settings and thus won't try to issue cancels against the leader's
     952              :      * connection.  No harm is done if we fail while it's disabled, because
     953              :      * the leader connection is idle at this point anyway.
     954              :      */
     955           12 :     set_archive_cancel_info(AH, NULL);
     956              : 
     957              :     /* Ensure stdio state is quiesced before forking */
     958           12 :     fflush(NULL);
     959              : 
     960              :     /* Create desired number of workers */
     961           38 :     for (i = 0; i < pstate->numWorkers; i++)
     962              :     {
     963              : #ifdef WIN32
     964              :         WorkerInfo *wi;
     965              :         uintptr_t   handle;
     966              : #else
     967              :         pid_t       pid;
     968              : #endif
     969           26 :         ParallelSlot *slot = &(pstate->parallelSlot[i]);
     970              :         int         pipeMW[2],
     971              :                     pipeWM[2];
     972              : 
     973              :         /* Create pipes: pipeMW is leader -> worker, pipeWM is worker -> leader */
     974           26 :         if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
     975            0 :             pg_fatal("could not create communication channels: %m");
     976              : 
     977              :         /* leader's ends of the pipes */
     978           26 :         slot->pipeRead = pipeWM[PIPE_READ];
     979           26 :         slot->pipeWrite = pipeMW[PIPE_WRITE];
     980              :         /* child's ends of the pipes */
     981           26 :         slot->pipeRevRead = pipeMW[PIPE_READ];
     982           26 :         slot->pipeRevWrite = pipeWM[PIPE_WRITE];
     983              : 
     984              : #ifdef WIN32
     985              :         /* Create transient structure to pass args to worker function */
     986              :         wi = pg_malloc_object(WorkerInfo);
     987              : 
     988              :         wi->AH = AH;
     989              :         wi->slot = slot;
     990              : 
     991              :         handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
     992              :                                 wi, 0, &(slot->threadId));
     993              :         slot->hThread = handle; /* NOTE(review): stored without a failure check */
     994              :         slot->workerStatus = WRKR_IDLE;
     995              : #else                           /* !WIN32 */
     996           26 :         pid = fork();
     997           52 :         if (pid == 0)
     998              :         {
     999              :             /* we are the worker */
    1000              :             int         j;
    1001              : 
    1002              :             /* this is needed for GetMyPSlot() */
    1003           26 :             slot->pid = getpid();
    1004              : 
    1005              :             /* instruct signal handler that we're in a worker now */
    1006           26 :             signal_info.am_worker = true;
    1007              : 
    1008              :             /* close read end of Worker -> Leader */
    1009           26 :             closesocket(pipeWM[PIPE_READ]);
    1010              :             /* close write end of Leader -> Worker */
    1011           26 :             closesocket(pipeMW[PIPE_WRITE]);
    1012              : 
    1013              :             /*
    1014              :              * Close all inherited fds for communication of the leader with
    1015              :              * previously-forked workers.
    1016              :              */
    1017           42 :             for (j = 0; j < i; j++)
    1018              :             {
    1019           16 :                 closesocket(pstate->parallelSlot[j].pipeRead);
    1020           16 :                 closesocket(pstate->parallelSlot[j].pipeWrite);
    1021              :             }
    1022              : 
    1023              :             /* Run the worker ... */
    1024           26 :             RunWorker(AH, slot);
    1025              : 
    1026              :             /* We can just exit(0) when done */
    1027           26 :             exit(0);
    1028              :         }
    1029           26 :         else if (pid < 0)
    1030              :         {
    1031              :             /* fork failed */
    1032            0 :             pg_fatal("could not create worker process: %m");
    1033              :         }
    1034              : 
    1035              :         /* In Leader after successful fork */
    1036           26 :         slot->pid = pid;
    1037           26 :         slot->workerStatus = WRKR_IDLE;
    1038              : 
    1039              :         /* close read end of Leader -> Worker */
    1040           26 :         closesocket(pipeMW[PIPE_READ]);
    1041              :         /* close write end of Worker -> Leader */
    1042           26 :         closesocket(pipeWM[PIPE_WRITE]);
    1043              : #endif                          /* WIN32 */
    1044              :     }
    1045              : 
    1046              :     /*
    1047              :      * Having forked off the workers, disable SIGPIPE so that leader isn't
    1048              :      * killed if it tries to send a command to a dead worker.  We don't want
    1049              :      * the workers to inherit this setting, though.
    1050              :      */
    1051              : #ifndef WIN32
    1052           12 :     pqsignal(SIGPIPE, SIG_IGN);
    1053              : #endif
    1054              : 
    1055              :     /*
    1056              :      * Re-establish query cancellation on the leader connection.
    1057              :      */
    1058           12 :     set_archive_cancel_info(AH, AH->connection);
    1059              : 
    1060              :     /*
    1061              :      * Tell the cancel signal handler to forward signals to worker processes,
    1062              :      * too.  (As with query cancel, we did not need this earlier because the
    1063              :      * workers have not yet been given anything to do; if we die before this
    1064              :      * point, any already-started workers will see EOF and quit promptly.)
    1065              :      */
    1066           12 :     set_cancel_pstate(pstate);
    1067              : 
    1068           12 :     return pstate;
    1069              : }
    1070              : 
    1071              : /*
    1072              :  * Close down a parallel dump or restore: stop the workers, then free pstate.
    1073              :  */
    1074              : void
    1075           61 : ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
    1076              : {
    1077              :     int         i;
    1078              : 
    1079              :     /* No work if non-parallel */
    1080           61 :     if (pstate->numWorkers == 1)
    1081           49 :         return;             /* NOTE(review): pstate is not freed on this path */
    1082              : 
    1083              :     /* There should not be any unfinished jobs */
    1084              :     Assert(IsEveryWorkerIdle(pstate));
    1085              : 
    1086              :     /* Close the leader's pipe ends so that the workers know they can exit */
    1087           38 :     for (i = 0; i < pstate->numWorkers; i++)
    1088              :     {
    1089           26 :         closesocket(pstate->parallelSlot[i].pipeRead);
    1090           26 :         closesocket(pstate->parallelSlot[i].pipeWrite);
    1091              :     }
    1092              : 
    1093              :     /* Wait for them to exit */
    1094           12 :     WaitForTerminatingWorkers(pstate);
    1095              : 
    1096              :     /*
    1097              :      * Unlink pstate from shutdown_info, so the exit handler will not try to
    1098              :      * use it; and likewise unlink from signal_info.
    1099              :      */
    1100           12 :     shutdown_info.pstate = NULL;
    1101           12 :     set_cancel_pstate(NULL);
    1102              : 
    1103              :     /* Release state (mere neatnik-ism, since we're about to terminate) */
    1104           12 :     free(pstate->te);
    1105           12 :     free(pstate->parallelSlot);
    1106           12 :     free(pstate);
    1107              : }
    1108              : 
    1109              : /*
    1110              :  * These next four functions handle construction and parsing of the command
    1111              :  * strings and response strings for parallel workers.
    1112              :  *
    1113              :  * Currently, these can be the same regardless of which archive format we are
    1114              :  * processing.  In future, we might want to let format modules override these
    1115              :  * functions to add format-specific data to a command or response.
    1116              :  */
    1117              : 
    1118              : /*
    1119              :  * buildWorkerCommand: format a command string to send to a worker.
    1120              :  * The command is "DUMP n" or "RESTORE n", where n is te->dumpId.
    1121              :  * The string is built in the caller-supplied buffer of size buflen.
    1122              :  */
    1123              : static void
    1124          115 : buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
    1125              :                    char *buf, int buflen)
    1126              : {
    1127          115 :     if (act == ACT_DUMP)
    1128           69 :         snprintf(buf, buflen, "DUMP %d", te->dumpId);
    1129           46 :     else if (act == ACT_RESTORE)
    1130           46 :         snprintf(buf, buflen, "RESTORE %d", te->dumpId);
    1131              :     else
    1132              :         Assert(false);      /* any other action is unexpected here */
    1133          115 : }
    1134              : 
    1135              : /*
    1136              :  * parseWorkerCommand: decode a leader's command into *act and *te (worker side).
    1137              :  */
    1138              : static void
    1139          115 : parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
    1140              :                    const char *msg)
    1141              : {
    1142              :     DumpId      dumpId;
    1143              :     int         nBytes;     /* set by %n; NOTE(review): sscanf results are only Assert-checked */
    1144              : 
    1145          115 :     if (messageStartsWith(msg, "DUMP "))
    1146              :     {
    1147           69 :         *act = ACT_DUMP;
    1148           69 :         sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
    1149              :         Assert(nBytes == strlen(msg));
    1150           69 :         *te = getTocEntryByDumpId(AH, dumpId);
    1151              :         Assert(*te != NULL);
    1152              :     }
    1153           46 :     else if (messageStartsWith(msg, "RESTORE "))
    1154              :     {
    1155           46 :         *act = ACT_RESTORE;
    1156           46 :         sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
    1157              :         Assert(nBytes == strlen(msg));
    1158           46 :         *te = getTocEntryByDumpId(AH, dumpId);
    1159              :         Assert(*te != NULL);
    1160              :     }
    1161              :     else
    1162            0 :         pg_fatal("unrecognized command received from leader: \"%s\"",
    1163              :                  msg);
    1164          115 : }
    1165              : 
    1166              : /*
    1167              :  * buildWorkerResponse: format a response string to send to the leader.
    1168              :  * Format: "OK dumpId status nerr"; nerr is 0 unless WORKER_IGNORED_ERRORS.
    1169              :  * The string is built in the caller-supplied buffer of size buflen.
    1170              :  */
    1171              : static void
    1172          115 : buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
    1173              :                     char *buf, int buflen)
    1174              : {
    1175          115 :     snprintf(buf, buflen, "OK %d %d %d",
    1176              :              te->dumpId,
    1177              :              status,
    1178              :              status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
    1179          115 : }
    1180              : 
    1181              : /*
    1182              :  * parseWorkerResponse: parse the status message returned by a worker.
    1183              :  *
    1184              :  * Returns the integer status code, and may update fields of AH and/or te.
    1185              :  */
    1186              : static int
    1187          115 : parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
    1188              :                     const char *msg)
    1189              : {
    1190              :     DumpId      dumpId;
    1191              :     int         nBytes,
    1192              :                 n_errors;
    1193          115 :     int         status = 0;
    1194              : 
    1195          115 :     if (messageStartsWith(msg, "OK "))
    1196              :     {
    1197          115 :         sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
    1198              : 
    1199              :         Assert(dumpId == te->dumpId);
    1200              :         Assert(nBytes == strlen(msg));
    1201              : 
    1202          115 :         AH->public.n_errors += n_errors;
    1203              :     }
    1204              :     else
    1205            0 :         pg_fatal("invalid message received from worker: \"%s\"",
    1206              :                  msg);
    1207              : 
    1208          115 :     return status;
    1209              : }
    1210              : 
    1211              : /*
    1212              :  * Dispatch a job to some free worker.
    1213              :  *
    1214              :  * te is the TocEntry to be processed, act is the action to be taken on it.
    1215              :  * callback is the function to call on completion of the job.
    1216              :  *
    1217              :  * If no worker is currently available, this will block, and previously
    1218              :  * registered callback functions may be called.
    1219              :  */
    1220              : void
    1221          115 : DispatchJobForTocEntry(ArchiveHandle *AH,
    1222              :                        ParallelState *pstate,
    1223              :                        TocEntry *te,
    1224              :                        T_Action act,
    1225              :                        ParallelCompletionPtr callback,
    1226              :                        void *callback_data)
    1227              : {
    1228              :     int         worker;
    1229              :     char        buf[256];
    1230              : 
    1231              :     /* Get a worker, waiting if none are idle */
    1232          169 :     while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
    1233           54 :         WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
    1234              : 
    1235              :     /* Construct and send command string */
    1236          115 :     buildWorkerCommand(AH, te, act, buf, sizeof(buf));
    1237              : 
    1238          115 :     sendMessageToWorker(pstate, worker, buf);
    1239              : 
    1240              :     /* Remember worker is busy, and which TocEntry it's working on */
    1241          115 :     pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
    1242          115 :     pstate->parallelSlot[worker].callback = callback;
    1243          115 :     pstate->parallelSlot[worker].callback_data = callback_data;
    1244          115 :     pstate->te[worker] = te;
    1245          115 : }
    1246              : 
    1247              : /*
    1248              :  * Find an idle worker and return its slot number.
    1249              :  * Return NO_SLOT if none are idle.
    1250              :  */
    1251              : static int
    1252          259 : GetIdleWorker(ParallelState *pstate)
    1253              : {
    1254              :     int         i;
    1255              : 
    1256          625 :     for (i = 0; i < pstate->numWorkers; i++)
    1257              :     {
    1258          493 :         if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
    1259          127 :             return i;
    1260              :     }
    1261          132 :     return NO_SLOT;
    1262              : }
    1263              : 
    1264              : /*
    1265              :  * Return true iff no worker is running.
    1266              :  */
    1267              : static bool
    1268           38 : HasEveryWorkerTerminated(ParallelState *pstate)
    1269              : {
    1270              :     int         i;
    1271              : 
    1272           75 :     for (i = 0; i < pstate->numWorkers; i++)
    1273              :     {
    1274           63 :         if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
    1275           26 :             return false;
    1276              :     }
    1277           12 :     return true;
    1278              : }
    1279              : 
    1280              : /*
    1281              :  * Return true iff every worker is in the WRKR_IDLE state.
    1282              :  */
    1283              : bool
    1284           45 : IsEveryWorkerIdle(ParallelState *pstate)
    1285              : {
    1286              :     int         i;
    1287              : 
    1288          101 :     for (i = 0; i < pstate->numWorkers; i++)
    1289              :     {
    1290           81 :         if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
    1291           25 :             return false;
    1292              :     }
    1293           20 :     return true;
    1294              : }
    1295              : 
    1296              : /*
    1297              :  * Acquire lock on a table to be dumped by a worker process.
    1298              :  *
    1299              :  * The leader process is already holding an ACCESS SHARE lock.  Ordinarily
    1300              :  * it's no problem for a worker to get one too, but if anything else besides
    1301              :  * pg_dump is running, there's a possible deadlock:
    1302              :  *
    1303              :  * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
    1304              :  * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
    1305              :  *    because the leader holds a conflicting ACCESS SHARE lock).
    1306              :  * 3) A worker process also requests an ACCESS SHARE lock to read the table.
    1307              :  *    The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
    1308              :  * 4) Now we have a deadlock, since the leader is effectively waiting for
    1309              :  *    the worker.  The server cannot detect that, however.
    1310              :  *
    1311              :  * To prevent an infinite wait, prior to touching a table in a worker, request
    1312              :  * a lock in ACCESS SHARE mode but with NOWAIT.  If we don't get the lock,
    1313              :  * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
    1314              :  * so we have a deadlock.  We must fail the backup in that case.
    1315              :  */
    1316              : static void
    1317           69 : lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
    1318              : {
    1319              :     const char *qualId;
    1320              :     PQExpBuffer query;
    1321              :     PGresult   *res;
    1322              : 
    1323              :     /* Nothing to do for BLOBS */
    1324           69 :     if (strcmp(te->desc, "BLOBS") == 0)
    1325            4 :         return;
    1326              : 
    1327           65 :     query = createPQExpBuffer();
    1328              : 
    1329           65 :     qualId = fmtQualifiedId(te->namespace, te->tag);
    1330              : 
    1331           65 :     appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
    1332              :                       qualId);
    1333              : 
    1334           65 :     res = PQexec(AH->connection, query->data);
    1335              : 
    1336           65 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
    1337            0 :         pg_fatal("could not obtain lock on relation \"%s\"\n"
    1338              :                  "This usually means that someone requested an ACCESS EXCLUSIVE lock "
    1339              :                  "on the table after the pg_dump parent process had gotten the "
    1340              :                  "initial ACCESS SHARE lock on the table.", qualId);
    1341              : 
    1342           65 :     PQclear(res);
    1343           65 :     destroyPQExpBuffer(query);
    1344              : }
    1345              : 
    1346              : /*
    1347              :  * WaitForCommands: main routine for a worker process.
    1348              :  *
    1349              :  * Read and execute commands from the leader until we see EOF on the pipe.
    1350              :  */
    1351              : static void
    1352           26 : WaitForCommands(ArchiveHandle *AH, int pipefd[2])
    1353              : {
    1354              :     char       *command;
    1355              :     TocEntry   *te;
    1356              :     T_Action    act;
    1357           26 :     int         status = 0;
    1358              :     char        buf[256];
    1359              : 
    1360              :     for (;;)
    1361              :     {
    1362          141 :         if (!(command = getMessageFromLeader(pipefd)))
    1363              :         {
    1364              :             /* EOF, so done */
    1365           26 :             return;
    1366              :         }
    1367              : 
    1368              :         /* Decode the command */
    1369          115 :         parseWorkerCommand(AH, &te, &act, command);
    1370              : 
    1371          115 :         if (act == ACT_DUMP)
    1372              :         {
    1373              :             /* Acquire lock on this table within the worker's session */
    1374           69 :             lockTableForWorker(AH, te);
    1375              : 
    1376              :             /* Perform the dump command */
    1377           69 :             status = (AH->WorkerJobDumpPtr) (AH, te);
    1378              :         }
    1379           46 :         else if (act == ACT_RESTORE)
    1380              :         {
    1381              :             /* Perform the restore command */
    1382           46 :             status = (AH->WorkerJobRestorePtr) (AH, te);
    1383              :         }
    1384              :         else
    1385              :             Assert(false);
    1386              : 
    1387              :         /* Return status to leader */
    1388          115 :         buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
    1389              : 
    1390          115 :         sendMessageToLeader(pipefd, buf);
    1391              : 
    1392              :         /* command was pg_malloc'd and we are responsible for free()ing it. */
    1393          115 :         free(command);
    1394              :     }
    1395              : }
    1396              : 
    1397              : /*
    1398              :  * Check for status messages from workers.
    1399              :  *
    1400              :  * If do_wait is true, wait to get a status message; otherwise, just return
    1401              :  * immediately if there is none available.
    1402              :  *
    1403              :  * When we get a status message, we pass the status code to the callback
    1404              :  * function that was specified to DispatchJobForTocEntry, then reset the
    1405              :  * worker status to IDLE.
    1406              :  *
    1407              :  * Returns true if we collected a status message, else false.
    1408              :  *
    1409              :  * XXX is it worth checking for more than one status message per call?
    1410              :  * It seems somewhat unlikely that multiple workers would finish at exactly
    1411              :  * the same time.
    1412              :  */
    1413              : static bool
    1414          211 : ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
    1415              : {
    1416              :     int         worker;
    1417              :     char       *msg;
    1418              : 
    1419              :     /* Try to collect a status message */
    1420          211 :     msg = getMessageFromWorker(pstate, do_wait, &worker);
    1421              : 
    1422          211 :     if (!msg)
    1423              :     {
    1424              :         /* If do_wait is true, we must have detected EOF on some socket */
    1425           96 :         if (do_wait)
    1426            0 :             pg_fatal("a worker process died unexpectedly");
    1427           96 :         return false;
    1428              :     }
    1429              : 
    1430              :     /* Process it and update our idea of the worker's status */
    1431          115 :     if (messageStartsWith(msg, "OK "))
    1432              :     {
    1433          115 :         ParallelSlot *slot = &pstate->parallelSlot[worker];
    1434          115 :         TocEntry   *te = pstate->te[worker];
    1435              :         int         status;
    1436              : 
    1437          115 :         status = parseWorkerResponse(AH, te, msg);
    1438          115 :         slot->callback(AH, te, status, slot->callback_data);
    1439          115 :         slot->workerStatus = WRKR_IDLE;
    1440          115 :         pstate->te[worker] = NULL;
    1441              :     }
    1442              :     else
    1443            0 :         pg_fatal("invalid message received from worker: \"%s\"",
    1444              :                  msg);
    1445              : 
    1446              :     /* Free the string returned from getMessageFromWorker */
    1447          115 :     free(msg);
    1448              : 
    1449          115 :     return true;
    1450              : }
    1451              : 
    1452              : /*
    1453              :  * Check for status results from workers, waiting if necessary.
    1454              :  *
    1455              :  * Available wait modes are:
    1456              :  * WFW_NO_WAIT: reap any available status, but don't block
    1457              :  * WFW_GOT_STATUS: wait for at least one more worker to finish
    1458              :  * WFW_ONE_IDLE: wait for at least one worker to be idle
    1459              :  * WFW_ALL_IDLE: wait for all workers to be idle
    1460              :  *
    1461              :  * Any received results are passed to the callback specified to
    1462              :  * DispatchJobForTocEntry.
    1463              :  *
    1464              :  * This function is executed in the leader process.
    1465              :  */
    1466              : void
    1467          120 : WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
    1468              : {
    1469          120 :     bool        do_wait = false;
    1470              : 
    1471              :     /*
    1472              :      * In GOT_STATUS mode, always block waiting for a message, since we can't
    1473              :      * return till we get something.  In other modes, we don't block the first
    1474              :      * time through the loop.
    1475              :      */
    1476          120 :     if (mode == WFW_GOT_STATUS)
    1477              :     {
    1478              :         /* Assert that caller knows what it's doing */
    1479              :         Assert(!IsEveryWorkerIdle(pstate));
    1480           12 :         do_wait = true;
    1481              :     }
    1482              : 
    1483              :     for (;;)
    1484              :     {
    1485              :         /*
    1486              :          * Check for status messages, even if we don't need to block.  We do
    1487              :          * not try very hard to reap all available messages, though, since
    1488              :          * there's unlikely to be more than one.
    1489              :          */
    1490          211 :         if (ListenToWorkers(AH, pstate, do_wait))
    1491              :         {
    1492              :             /*
    1493              :              * If we got a message, we are done by definition for GOT_STATUS
    1494              :              * mode, and we can also be certain that there's at least one idle
    1495              :              * worker.  So we're done in all but ALL_IDLE mode.
    1496              :              */
    1497          115 :             if (mode != WFW_ALL_IDLE)
    1498          100 :                 return;
    1499              :         }
    1500              : 
    1501              :         /* Check whether we must wait for new status messages */
    1502          111 :         switch (mode)
    1503              :         {
    1504            0 :             case WFW_NO_WAIT:
    1505            0 :                 return;         /* never wait */
    1506            0 :             case WFW_GOT_STATUS:
    1507              :                 Assert(false);  /* can't get here, because we waited */
    1508            0 :                 break;
    1509           90 :             case WFW_ONE_IDLE:
    1510           90 :                 if (GetIdleWorker(pstate) != NO_SLOT)
    1511           12 :                     return;
    1512           78 :                 break;
    1513           21 :             case WFW_ALL_IDLE:
    1514           21 :                 if (IsEveryWorkerIdle(pstate))
    1515            8 :                     return;
    1516           13 :                 break;
    1517              :         }
    1518              : 
    1519              :         /* Loop back, and this time wait for something to happen */
    1520           91 :         do_wait = true;
    1521              :     }
    1522              : }
    1523              : 
    1524              : /*
    1525              :  * Read one command message from the leader, blocking if necessary
    1526              :  * until one is available, and return it as a malloc'd string.
    1527              :  * On EOF, return NULL.
    1528              :  *
    1529              :  * This function is executed in worker processes.
    1530              :  */
    1531              : static char *
    1532          141 : getMessageFromLeader(int pipefd[2])
    1533              : {
    1534          141 :     return readMessageFromPipe(pipefd[PIPE_READ]);
    1535              : }
    1536              : 
    1537              : /*
    1538              :  * Send a status message to the leader.
    1539              :  *
    1540              :  * This function is executed in worker processes.
    1541              :  */
    1542              : static void
    1543          115 : sendMessageToLeader(int pipefd[2], const char *str)
    1544              : {
    1545          115 :     int         len = strlen(str) + 1;
    1546              : 
    1547          115 :     if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
    1548            0 :         pg_fatal("could not write to the communication channel: %m");
    1549          115 : }
    1550              : 
    1551              : /*
    1552              :  * Wait until some descriptor in "workerset" becomes readable.
    1553              :  * Returns -1 on error, else the number of readable descriptors.
    1554              :  */
    1555              : static int
    1556          103 : select_loop(int maxFd, fd_set *workerset)
    1557              : {
    1558              :     int         i;
    1559          103 :     fd_set      saveSet = *workerset;
    1560              : 
    1561              :     for (;;)
    1562              :     {
    1563          103 :         *workerset = saveSet;
    1564          103 :         i = select(maxFd + 1, workerset, NULL, NULL, NULL);
    1565              : 
    1566              : #ifndef WIN32
    1567          103 :         if (i < 0 && errno == EINTR)
    1568            0 :             continue;
    1569              : #else
    1570              :         if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
    1571              :             continue;
    1572              : #endif
    1573          103 :         break;
    1574              :     }
    1575              : 
    1576          103 :     return i;
    1577              : }
    1578              : 
    1579              : 
    1580              : /*
    1581              :  * Check for messages from worker processes.
    1582              :  *
    1583              :  * If a message is available, return it as a malloc'd string, and put the
    1584              :  * index of the sending worker in *worker.
    1585              :  *
    1586              :  * If nothing is available, wait if "do_wait" is true, else return NULL.
    1587              :  *
    1588              :  * If we detect EOF on any socket, we'll return NULL.  It's not great that
    1589              :  * that's hard to distinguish from the no-data-available case, but for now
    1590              :  * our one caller is okay with that.
    1591              :  *
    1592              :  * This function is executed in the leader process.
    1593              :  */
    1594              : static char *
    1595          211 : getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
    1596              : {
    1597              :     int         i;
    1598              :     fd_set      workerset;
    1599          211 :     int         maxFd = -1;
    1600          211 :     struct timeval nowait = {0, 0};
    1601              : 
    1602              :     /* construct bitmap of socket descriptors for select() */
    1603         3587 :     FD_ZERO(&workerset);
    1604          715 :     for (i = 0; i < pstate->numWorkers; i++)
    1605              :     {
    1606          504 :         if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
    1607            0 :             continue;
    1608          504 :         FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
    1609          504 :         if (pstate->parallelSlot[i].pipeRead > maxFd)
    1610          504 :             maxFd = pstate->parallelSlot[i].pipeRead;
    1611              :     }
    1612              : 
    1613          211 :     if (do_wait)
    1614              :     {
    1615          103 :         i = select_loop(maxFd, &workerset);
    1616              :         Assert(i != 0);
    1617              :     }
    1618              :     else
    1619              :     {
    1620          108 :         if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
    1621           96 :             return NULL;
    1622              :     }
    1623              : 
    1624          115 :     if (i < 0)
    1625            0 :         pg_fatal("%s() failed: %m", "select");
    1626              : 
    1627          172 :     for (i = 0; i < pstate->numWorkers; i++)
    1628              :     {
    1629              :         char       *msg;
    1630              : 
    1631          172 :         if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
    1632            0 :             continue;
    1633          172 :         if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
    1634           57 :             continue;
    1635              : 
    1636              :         /*
    1637              :          * Read the message if any.  If the socket is ready because of EOF,
    1638              :          * we'll return NULL instead (and the socket will stay ready, so the
    1639              :          * condition will persist).
    1640              :          *
    1641              :          * Note: because this is a blocking read, we'll wait if only part of
    1642              :          * the message is available.  Waiting a long time would be bad, but
    1643              :          * since worker status messages are short and are always sent in one
    1644              :          * operation, it shouldn't be a problem in practice.
    1645              :          */
    1646          115 :         msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
    1647          115 :         *worker = i;
    1648          115 :         return msg;
    1649              :     }
    1650              :     Assert(false);
    1651            0 :     return NULL;
    1652              : }
    1653              : 
    1654              : /*
    1655              :  * Send a command message to the specified worker process.
    1656              :  *
    1657              :  * This function is executed in the leader process.
    1658              :  */
    1659              : static void
    1660          115 : sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
    1661              : {
    1662          115 :     int         len = strlen(str) + 1;
    1663              : 
    1664          115 :     if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
    1665              :     {
    1666            0 :         pg_fatal("could not write to the communication channel: %m");
    1667              :     }
    1668          115 : }
    1669              : 
    1670              : /*
    1671              :  * Read one message from the specified pipe (fd), blocking if necessary
    1672              :  * until one is available, and return it as a malloc'd string.
    1673              :  * On EOF, return NULL.
    1674              :  *
    1675              :  * A "message" on the channel is just a null-terminated string.
    1676              :  */
    1677              : static char *
    1678          256 : readMessageFromPipe(int fd)
    1679              : {
    1680              :     char       *msg;
    1681              :     int         msgsize,
    1682              :                 bufsize;
    1683              :     int         ret;
    1684              : 
    1685              :     /*
    1686              :      * In theory, if we let piperead() read multiple bytes, it might give us
    1687              :      * back fragments of multiple messages.  (That can't actually occur, since
    1688              :      * neither leader nor workers send more than one message without waiting
    1689              :      * for a reply, but we don't wish to assume that here.)  For simplicity,
    1690              :      * read a byte at a time until we get the terminating '\0'.  This method
    1691              :      * is a bit inefficient, but since this is only used for relatively short
    1692              :      * command and status strings, it shouldn't matter.
    1693              :      */
    1694          256 :     bufsize = 64;               /* could be any number */
    1695          256 :     msg = (char *) pg_malloc(bufsize);
    1696          256 :     msgsize = 0;
    1697              :     for (;;)
    1698              :     {
    1699         2438 :         Assert(msgsize < bufsize);
    1700         2694 :         ret = piperead(fd, msg + msgsize, 1);
    1701         2694 :         if (ret <= 0)
    1702           26 :             break;              /* error or connection closure */
    1703              : 
    1704              :         Assert(ret == 1);
    1705              : 
    1706         2668 :         if (msg[msgsize] == '\0')
    1707          230 :             return msg;         /* collected whole message */
    1708              : 
    1709         2438 :         msgsize++;
    1710         2438 :         if (msgsize == bufsize) /* enlarge buffer if needed */
    1711              :         {
    1712            0 :             bufsize += 16;      /* could be any number */
    1713            0 :             msg = (char *) pg_realloc(msg, bufsize);
    1714              :         }
    1715              :     }
    1716              : 
    1717              :     /* Other end has closed the connection */
    1718           26 :     pg_free(msg);
    1719           26 :     return NULL;
    1720              : }
    1721              : 
    1722              : #ifdef WIN32
    1723              : 
    1724              : /*
    1725              :  * This is a replacement version of pipe(2) for Windows which allows the pipe
    1726              :  * handles to be used in select().
    1727              :  *
    1728              :  * Reads and writes on the pipe must go through piperead()/pipewrite().
    1729              :  *
    1730              :  * For consistency with Unix we declare the returned handles as "int".
    1731              :  * This is okay even on WIN64 because system handles are not more than
    1732              :  * 32 bits wide, but we do have to do some casting.
    1733              :  */
    1734              : static int
    1735              : pgpipe(int handles[2])      /* out: [0] = accept()ed end, [1] = connect()ing end */
    1736              : {
    1737              :     pgsocket    s,          /* temporary listening socket; closed on every path before returning */
    1738              :                 tmp_sock;   /* receives each newly created socket before it is stored in handles[] */
    1739              :     struct sockaddr_in serv_addr;   /* listener address; also passed to accept() as the address buffer */
    1740              :     int         len = sizeof(serv_addr);    /* in/out address length for getsockname() and accept() */
    1741              :     /* Returns 0 on success; on any failure an error is logged, both handles are left at -1, and -1 is returned. */
    1742              :     /* We have to use the Unix socket invalid file descriptor value here. */
    1743              :     handles[0] = handles[1] = -1;
    1744              : 
    1745              :     /*
    1746              :      * Set up a temporary listening socket bound to the loopback address.
    1747              :      */
    1748              :     if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
    1749              :     {
    1750              :         pg_log_error("pgpipe: could not create socket: error code %d",
    1751              :                      WSAGetLastError());
    1752              :         return -1;
    1753              :     }
    1754              :     /* Port 0 is requested here; getsockname() below reads the address actually bound back into serv_addr. */
    1755              :     memset(&serv_addr, 0, sizeof(serv_addr));
    1756              :     serv_addr.sin_family = AF_INET;
    1757              :     serv_addr.sin_port = pg_hton16(0);
    1758              :     serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
    1759              :     if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
    1760              :     {
    1761              :         pg_log_error("pgpipe: could not bind: error code %d",
    1762              :                      WSAGetLastError());
    1763              :         closesocket(s);
    1764              :         return -1;
    1765              :     }
    1766              :     if (listen(s, 1) == SOCKET_ERROR)   /* backlog of 1: a single connection is made below */
    1767              :     {
    1768              :         pg_log_error("pgpipe: could not listen: error code %d",
    1769              :                      WSAGetLastError());
    1770              :         closesocket(s);
    1771              :         return -1;
    1772              :     }
    1773              :     if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
    1774              :     {
    1775              :         pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",
    1776              :                      WSAGetLastError());
    1777              :         closesocket(s);
    1778              :         return -1;
    1779              :     }
    1780              : 
    1781              :     /*
    1782              :      * Create the two pipe ends: connect() a new socket to the listener, then accept() that connection.
    1783              :      */
    1784              :     if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
    1785              :     {
    1786              :         pg_log_error("pgpipe: could not create second socket: error code %d",
    1787              :                      WSAGetLastError());
    1788              :         closesocket(s);
    1789              :         return -1;
    1790              :     }
    1791              :     handles[1] = (int) tmp_sock;
    1792              :     /* Connect to the listener address that getsockname() stored in serv_addr. */
    1793              :     if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
    1794              :     {
    1795              :         pg_log_error("pgpipe: could not connect socket: error code %d",
    1796              :                      WSAGetLastError());
    1797              :         closesocket(handles[1]);
    1798              :         handles[1] = -1;    /* restore the "invalid" marker so the caller never sees a closed socket */
    1799              :         closesocket(s);
    1800              :         return -1;
    1801              :     }
    1802              :     if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
    1803              :     {
    1804              :         pg_log_error("pgpipe: could not accept connection: error code %d",
    1805              :                      WSAGetLastError());
    1806              :         closesocket(handles[1]);
    1807              :         handles[1] = -1;    /* restore the "invalid" marker so the caller never sees a closed socket */
    1808              :         closesocket(s);
    1809              :         return -1;
    1810              :     }
    1811              :     handles[0] = (int) tmp_sock;
    1812              : 
    1813              :     closesocket(s);     /* both ends exist now; the listening socket is not used again */
    1814              :     return 0;
    1815              : }
    1816              : 
    1817              : #endif                          /* WIN32 */
        

Generated by: LCOV version 2.0-1