LCOV - code coverage report
Current view: top level - src/bin/pg_dump - parallel.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 267 325 82.2 %
Date: 2020-05-31 23:07:13 Functions: 29 32 90.6 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * parallel.c
       4             :  *
       5             :  *  Parallel support for pg_dump and pg_restore
       6             :  *
       7             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
       8             :  * Portions Copyright (c) 1994, Regents of the University of California
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *      src/bin/pg_dump/parallel.c
      12             :  *
      13             :  *-------------------------------------------------------------------------
      14             :  */
      15             : 
      16             : /*
      17             :  * Parallel operation works like this:
      18             :  *
      19             :  * The original, master process calls ParallelBackupStart(), which forks off
      20             :  * the desired number of worker processes, which each enter WaitForCommands().
      21             :  *
      22             :  * The master process dispatches an individual work item to one of the worker
      23             :  * processes in DispatchJobForTocEntry().  We send a command string such as
      24             :  * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
      25             :  * The worker process receives and decodes the command and passes it to the
      26             :  * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
      27             :  * which are routines of the current archive format.  That routine performs
      28             :  * the required action (dump or restore) and returns an integer status code.
      29             :  * This is passed back to the master where we pass it to the
      30             :  * ParallelCompletionPtr callback function that was passed to
      31             :  * DispatchJobForTocEntry().  The callback function does state updating
      32             :  * for the master control logic in pg_backup_archiver.c.
      33             :  *
      34             :  * In principle additional archive-format-specific information might be needed
      35             :  * in commands or worker status responses, but so far that hasn't proved
      36             :  * necessary, since workers have full copies of the ArchiveHandle/TocEntry
      37             :  * data structures.  Remember that we have forked off the workers only after
      38             :  * we have read in the catalog.  That's why our worker processes can also
      39             :  * access the catalog information.  (In the Windows case, the workers are
      40             :  * threads in the same process.  To avoid problems, they work with cloned
      41             :  * copies of the Archive data structure; see RunWorker().)
      42             :  *
      43             :  * In the master process, the workerStatus field for each worker has one of
      44             :  * the following values:
      45             :  *      WRKR_NOT_STARTED: we've not yet forked this worker
      46             :  *      WRKR_IDLE: it's waiting for a command
      47             :  *      WRKR_WORKING: it's working on a command
      48             :  *      WRKR_TERMINATED: process ended
      49             :  * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
      50             :  * state, and must be NULL in other states.
      51             :  */
      52             : 
      53             : #include "postgres_fe.h"
      54             : 
      55             : #ifndef WIN32
      56             : #include <sys/wait.h>
      57             : #include <signal.h>
      58             : #include <unistd.h>
      59             : #include <fcntl.h>
      60             : #endif
      61             : #ifdef HAVE_SYS_SELECT_H
      62             : #include <sys/select.h>
      63             : #endif
      64             : 
      65             : #include "fe_utils/string_utils.h"
      66             : #include "parallel.h"
      67             : #include "pg_backup_utils.h"
      68             : #include "port/pg_bswap.h"
      69             : 
      70             : /* Mnemonic macros for indexing the fd array returned by pipe(2) */
      71             : #define PIPE_READ                           0
      72             : #define PIPE_WRITE                          1
      73             : 
      74             : #define NO_SLOT (-1)            /* Failure result for GetIdleWorker() */
      75             : 
      76             : /* Worker process statuses */
      77             : typedef enum
      78             : {
      79             :     WRKR_NOT_STARTED = 0,
      80             :     WRKR_IDLE,
      81             :     WRKR_WORKING,
      82             :     WRKR_TERMINATED
      83             : } T_WorkerStatus;
      84             : 
      85             : #define WORKER_IS_RUNNING(workerStatus) \
      86             :     ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
      87             : 
      88             : /*
      89             :  * Private per-parallel-worker state (typedef for this is in parallel.h).
      90             :  *
      91             :  * Much of this is valid only in the master process (or, on Windows, should
      92             :  * be touched only by the master thread).  But the AH field should be touched
      93             :  * only by workers.  The pipe descriptors are valid everywhere.
      94             :  */
      95             : struct ParallelSlot
      96             : {
      97             :     T_WorkerStatus workerStatus;    /* see enum above */
      98             : 
      99             :     /* These fields are valid if workerStatus == WRKR_WORKING: */
     100             :     ParallelCompletionPtr callback; /* function to call on completion */
     101             :     void       *callback_data;  /* passthrough data for it */
     102             : 
     103             :     ArchiveHandle *AH;          /* Archive data worker is using */
     104             : 
     105             :     int         pipeRead;       /* master's end of the pipes */
     106             :     int         pipeWrite;
     107             :     int         pipeRevRead;    /* child's end of the pipes */
     108             :     int         pipeRevWrite;
     109             : 
     110             :     /* Child process/thread identity info: */
     111             : #ifdef WIN32
     112             :     uintptr_t   hThread;
     113             :     unsigned int threadId;
     114             : #else
     115             :     pid_t       pid;
     116             : #endif
     117             : };
     118             : 
     119             : #ifdef WIN32
     120             : 
     121             : /*
     122             :  * Structure to hold info passed by _beginthreadex() to the function it calls
     123             :  * via its single allowed argument.
     124             :  */
     125             : typedef struct
     126             : {
     127             :     ArchiveHandle *AH;          /* master database connection */
     128             :     ParallelSlot *slot;         /* this worker's parallel slot */
     129             : } WorkerInfo;
     130             : 
     131             : /* Windows implementation of pipe access */
     132             : static int  pgpipe(int handles[2]);
     133             : static int  piperead(int s, char *buf, int len);
     134             : #define pipewrite(a,b,c)    send(a,b,c,0)
     135             : 
     136             : #else                           /* !WIN32 */
     137             : 
     138             : /* Non-Windows implementation of pipe access */
     139             : #define pgpipe(a)           pipe(a)
     140             : #define piperead(a,b,c)     read(a,b,c)
     141             : #define pipewrite(a,b,c)    write(a,b,c)
     142             : 
     143             : #endif                          /* WIN32 */
     144             : 
     145             : /*
     146             :  * State info for archive_close_connection() shutdown callback.
     147             :  */
     148             : typedef struct ShutdownInformation
     149             : {
     150             :     ParallelState *pstate;
     151             :     Archive    *AHX;
     152             : } ShutdownInformation;
     153             : 
     154             : static ShutdownInformation shutdown_info;
     155             : 
     156             : /*
     157             :  * State info for signal handling.
     158             :  * We assume signal_info initializes to zeroes.
     159             :  *
     160             :  * On Unix, myAH is the master DB connection in the master process, and the
     161             :  * worker's own connection in worker processes.  On Windows, we have only one
     162             :  * instance of signal_info, so myAH is the master connection and the worker
     163             :  * connections must be dug out of pstate->parallelSlot[].
     164             :  */
     165             : typedef struct DumpSignalInformation
     166             : {
     167             :     ArchiveHandle *myAH;        /* database connection to issue cancel for */
     168             :     ParallelState *pstate;      /* parallel state, if any */
     169             :     bool        handler_set;    /* signal handler set up in this process? */
     170             : #ifndef WIN32
     171             :     bool        am_worker;      /* am I a worker process? */
     172             : #endif
     173             : } DumpSignalInformation;
     174             : 
     175             : static volatile DumpSignalInformation signal_info;
     176             : 
     177             : #ifdef WIN32
     178             : static CRITICAL_SECTION signal_info_lock;
     179             : #endif
     180             : 
     181             : /*
     182             :  * Write a simple string to stderr --- must be safe in a signal handler.
     183             :  * We ignore the write() result since there's not much we could do about it.
     184             :  * Certain compilers make that harder than it ought to be.
     185             :  */
     186             : #define write_stderr(str) \
     187             :     do { \
     188             :         const char *str_ = (str); \
     189             :         int     rc_; \
     190             :         rc_ = write(fileno(stderr), str_, strlen(str_)); \
     191             :         (void) rc_; \
     192             :     } while (0)
     193             : 
     194             : 
     195             : #ifdef WIN32
     196             : /* file-scope variables */
     197             : static DWORD tls_index;
     198             : 
     199             : /* globally visible variables (needed by exit_nicely) */
     200             : bool        parallel_init_done = false;
     201             : DWORD       mainThreadId;
     202             : #endif                          /* WIN32 */
     203             : 
     204             : /* Local function prototypes */
     205             : static ParallelSlot *GetMyPSlot(ParallelState *pstate);
     206             : static void archive_close_connection(int code, void *arg);
     207             : static void ShutdownWorkersHard(ParallelState *pstate);
     208             : static void WaitForTerminatingWorkers(ParallelState *pstate);
     209             : static void setup_cancel_handler(void);
     210             : static void set_cancel_pstate(ParallelState *pstate);
     211             : static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
     212             : static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
     213             : static int  GetIdleWorker(ParallelState *pstate);
     214             : static bool HasEveryWorkerTerminated(ParallelState *pstate);
     215             : static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
     216             : static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
     217             : static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
     218             :                             bool do_wait);
     219             : static char *getMessageFromMaster(int pipefd[2]);
     220             : static void sendMessageToMaster(int pipefd[2], const char *str);
     221             : static int  select_loop(int maxFd, fd_set *workerset);
     222             : static char *getMessageFromWorker(ParallelState *pstate,
     223             :                                   bool do_wait, int *worker);
     224             : static void sendMessageToWorker(ParallelState *pstate,
     225             :                                 int worker, const char *str);
     226             : static char *readMessageFromPipe(int fd);
     227             : 
     228             : #define messageStartsWith(msg, prefix) \
     229             :     (strncmp(msg, prefix, strlen(prefix)) == 0)
     230             : 
     231             : 
     232             : /*
     233             :  * Shutdown callback to clean up socket access
     234             :  */
     235             : #ifdef WIN32
     236             : static void
     237             : shutdown_parallel_dump_utils(int code, void *unused)
     238             : {
     239             :     /* Call the cleanup function only from the main thread */
     240             :     if (mainThreadId == GetCurrentThreadId())
     241             :         WSACleanup();
     242             : }
     243             : #endif
     244             : 
     245             : /*
     246             :  * Initialize parallel dump support --- should be called early in process
     247             :  * startup.  (Currently, this is called whether or not we intend parallel
     248             :  * activity.)
     249             :  */
     250             : void
     251         306 : init_parallel_dump_utils(void)
     252             : {
     253             : #ifdef WIN32
     254             :     if (!parallel_init_done)
     255             :     {
     256             :         WSADATA     wsaData;
     257             :         int         err;
     258             : 
     259             :         /* Prepare for threaded operation */
     260             :         tls_index = TlsAlloc();
     261             :         mainThreadId = GetCurrentThreadId();
     262             : 
     263             :         /* Initialize socket access */
     264             :         err = WSAStartup(MAKEWORD(2, 2), &wsaData);
     265             :         if (err != 0)
     266             :         {
     267             :             pg_log_error("WSAStartup failed: %d", err);
     268             :             exit_nicely(1);
     269             :         }
     270             :         /* ... and arrange to shut it down at exit */
     271             :         on_exit_nicely(shutdown_parallel_dump_utils, NULL);
     272             :         parallel_init_done = true;
     273             :     }
     274             : #endif
     275         306 : }
     276             : 
     277             : /*
     278             :  * Find the ParallelSlot for the current worker process or thread.
     279             :  *
     280             :  * Returns NULL if no matching slot is found (this implies we're the master).
     281             :  */
     282             : static ParallelSlot *
     283           0 : GetMyPSlot(ParallelState *pstate)
     284             : {
     285             :     int         i;
     286             : 
     287           0 :     for (i = 0; i < pstate->numWorkers; i++)
     288             :     {
     289             : #ifdef WIN32
     290             :         if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
     291             : #else
     292           0 :         if (pstate->parallelSlot[i].pid == getpid())
     293             : #endif
     294           0 :             return &(pstate->parallelSlot[i]);
     295             :     }
     296             : 
     297           0 :     return NULL;
     298             : }
     299             : 
     300             : /*
     301             :  * A thread-local version of getLocalPQExpBuffer().
     302             :  *
     303             :  * Non-reentrant but reduces memory leakage: we'll consume one buffer per
     304             :  * thread, which is much better than one per fmtId/fmtQualifiedId call.
     305             :  */
     306             : #ifdef WIN32
     307             : static PQExpBuffer
     308             : getThreadLocalPQExpBuffer(void)
     309             : {
     310             :     /*
     311             :      * The Tls code goes awry if we use a static var, so we provide for both
     312             :      * static and auto, and omit any use of the static var when using Tls. We
     313             :      * rely on TlsGetValue() to return 0 if the value is not yet set.
     314             :      */
     315             :     static PQExpBuffer s_id_return = NULL;
     316             :     PQExpBuffer id_return;
     317             : 
     318             :     if (parallel_init_done)
     319             :         id_return = (PQExpBuffer) TlsGetValue(tls_index);
     320             :     else
     321             :         id_return = s_id_return;
     322             : 
     323             :     if (id_return)              /* first time through? */
     324             :     {
     325             :         /* same buffer, just wipe contents */
     326             :         resetPQExpBuffer(id_return);
     327             :     }
     328             :     else
     329             :     {
     330             :         /* new buffer */
     331             :         id_return = createPQExpBuffer();
     332             :         if (parallel_init_done)
     333             :             TlsSetValue(tls_index, id_return);
     334             :         else
     335             :             s_id_return = id_return;
     336             :     }
     337             : 
     338             :     return id_return;
     339             : }
     340             : #endif                          /* WIN32 */
     341             : 
     342             : /*
     343             :  * pg_dump and pg_restore call this to register the cleanup handler
     344             :  * as soon as they've created the ArchiveHandle.
     345             :  */
     346             : void
     347         208 : on_exit_close_archive(Archive *AHX)
     348             : {
     349         208 :     shutdown_info.AHX = AHX;
     350         208 :     on_exit_nicely(archive_close_connection, &shutdown_info);
     351         208 : }
     352             : 
     353             : /*
     354             :  * on_exit_nicely handler for shutting down database connections and
     355             :  * worker processes cleanly.
     356             :  */
     357             : static void
     358         172 : archive_close_connection(int code, void *arg)
     359             : {
     360         172 :     ShutdownInformation *si = (ShutdownInformation *) arg;
     361             : 
     362         172 :     if (si->pstate)
     363             :     {
     364             :         /* In parallel mode, must figure out who we are */
     365           0 :         ParallelSlot *slot = GetMyPSlot(si->pstate);
     366             : 
     367           0 :         if (!slot)
     368             :         {
     369             :             /*
     370             :              * We're the master.  Forcibly shut down workers, then close our
     371             :              * own database connection, if any.
     372             :              */
     373           0 :             ShutdownWorkersHard(si->pstate);
     374             : 
     375           0 :             if (si->AHX)
     376           0 :                 DisconnectDatabase(si->AHX);
     377             :         }
     378             :         else
     379             :         {
     380             :             /*
     381             :              * We're a worker.  Shut down our own DB connection if any.  On
     382             :              * Windows, we also have to close our communication sockets, to
     383             :              * emulate what will happen on Unix when the worker process exits.
     384             :              * (Without this, if this is a premature exit, the master would
     385             :              * fail to detect it because there would be no EOF condition on
     386             :              * the other end of the pipe.)
     387             :              */
     388           0 :             if (slot->AH)
     389           0 :                 DisconnectDatabase(&(slot->AH->public));
     390             : 
     391             : #ifdef WIN32
     392             :             closesocket(slot->pipeRevRead);
     393             :             closesocket(slot->pipeRevWrite);
     394             : #endif
     395             :         }
     396             :     }
     397             :     else
     398             :     {
     399             :         /* Non-parallel operation: just kill the master DB connection */
     400         172 :         if (si->AHX)
     401         172 :             DisconnectDatabase(si->AHX);
     402             :     }
     403         172 : }
     404             : 
     405             : /*
     406             :  * Forcibly shut down any remaining workers, waiting for them to finish.
     407             :  *
     408             :  * Note that we don't expect to come here during normal exit (the workers
     409             :  * should be long gone, and the ParallelState too).  We're only here in a
     410             :  * fatal() situation, so intervening to cancel active commands is
     411             :  * appropriate.
     412             :  */
     413             : static void
     414           0 : ShutdownWorkersHard(ParallelState *pstate)
     415             : {
     416             :     int         i;
     417             : 
     418             :     /*
     419             :      * Close our write end of the sockets so that any workers waiting for
     420             :      * commands know they can exit.  (Note: some of the pipeWrite fields might
     421             :      * still be zero, if we failed to initialize all the workers.  Hence, just
     422             :      * ignore errors here.)
     423             :      */
     424           0 :     for (i = 0; i < pstate->numWorkers; i++)
     425           0 :         closesocket(pstate->parallelSlot[i].pipeWrite);
     426             : 
     427             :     /*
     428             :      * Force early termination of any commands currently in progress.
     429             :      */
     430             : #ifndef WIN32
     431             :     /* On non-Windows, send SIGTERM to each worker process. */
     432           0 :     for (i = 0; i < pstate->numWorkers; i++)
     433             :     {
     434           0 :         pid_t       pid = pstate->parallelSlot[i].pid;
     435             : 
     436           0 :         if (pid != 0)
     437           0 :             kill(pid, SIGTERM);
     438             :     }
     439             : #else
     440             : 
     441             :     /*
     442             :      * On Windows, send query cancels directly to the workers' backends.  Use
     443             :      * a critical section to ensure worker threads don't change state.
     444             :      */
     445             :     EnterCriticalSection(&signal_info_lock);
     446             :     for (i = 0; i < pstate->numWorkers; i++)
     447             :     {
     448             :         ArchiveHandle *AH = pstate->parallelSlot[i].AH;
     449             :         char        errbuf[1];
     450             : 
     451             :         if (AH != NULL && AH->connCancel != NULL)
     452             :             (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
     453             :     }
     454             :     LeaveCriticalSection(&signal_info_lock);
     455             : #endif
     456             : 
     457             :     /* Now wait for them to terminate. */
     458           0 :     WaitForTerminatingWorkers(pstate);
     459           0 : }
     460             : 
     461             : /*
     462             :  * Wait for all workers to terminate.
     463             :  */
     464             : static void
     465          12 : WaitForTerminatingWorkers(ParallelState *pstate)
     466             : {
     467          36 :     while (!HasEveryWorkerTerminated(pstate))
     468             :     {
     469          24 :         ParallelSlot *slot = NULL;
     470             :         int         j;
     471             : 
     472             : #ifndef WIN32
     473             :         /* On non-Windows, use wait() to wait for next worker to end */
     474             :         int         status;
     475          24 :         pid_t       pid = wait(&status);
     476             : 
     477             :         /* Find dead worker's slot, and clear the PID field */
     478          36 :         for (j = 0; j < pstate->numWorkers; j++)
     479             :         {
     480          36 :             slot = &(pstate->parallelSlot[j]);
     481          36 :             if (slot->pid == pid)
     482             :             {
     483          24 :                 slot->pid = 0;
     484          24 :                 break;
     485             :             }
     486             :         }
     487             : #else                           /* WIN32 */
     488             :         /* On Windows, we must use WaitForMultipleObjects() */
     489             :         HANDLE     *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
     490             :         int         nrun = 0;
     491             :         DWORD       ret;
     492             :         uintptr_t   hThread;
     493             : 
     494             :         for (j = 0; j < pstate->numWorkers; j++)
     495             :         {
     496             :             if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
     497             :             {
     498             :                 lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
     499             :                 nrun++;
     500             :             }
     501             :         }
     502             :         ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
     503             :         Assert(ret != WAIT_FAILED);
     504             :         hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
     505             :         free(lpHandles);
     506             : 
     507             :         /* Find dead worker's slot, and clear the hThread field */
     508             :         for (j = 0; j < pstate->numWorkers; j++)
     509             :         {
     510             :             slot = &(pstate->parallelSlot[j]);
     511             :             if (slot->hThread == hThread)
     512             :             {
     513             :                 /* For cleanliness, close handles for dead threads */
     514             :                 CloseHandle((HANDLE) slot->hThread);
     515             :                 slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
     516             :                 break;
     517             :             }
     518             :         }
     519             : #endif                          /* WIN32 */
     520             : 
     521             :         /* On all platforms, update workerStatus and te[] as well */
     522             :         Assert(j < pstate->numWorkers);
     523          24 :         slot->workerStatus = WRKR_TERMINATED;
     524          24 :         pstate->te[j] = NULL;
     525             :     }
     526          12 : }
     527             : 
     528             : 
     529             : /*
     530             :  * Code for responding to cancel interrupts (SIGINT, control-C, etc)
     531             :  *
     532             :  * This doesn't quite belong in this module, but it needs access to the
     533             :  * ParallelState data, so there's not really a better place either.
     534             :  *
     535             :  * When we get a cancel interrupt, we could just die, but in pg_restore that
     536             :  * could leave a SQL command (e.g., CREATE INDEX on a large table) running
     537             :  * for a long time.  Instead, we try to send a cancel request and then die.
     538             :  * pg_dump probably doesn't really need this, but we might as well use it
     539             :  * there too.  Note that sending the cancel directly from the signal handler
     540             :  * is safe because PQcancel() is written to make it so.
     541             :  *
     542             :  * In parallel operation on Unix, each process is responsible for canceling
     543             :  * its own connection (this must be so because nobody else has access to it).
     544             :  * Furthermore, the master process should attempt to forward its signal to
     545             :  * each child.  In simple manual use of pg_dump/pg_restore, forwarding isn't
     546             :  * needed because typing control-C at the console would deliver SIGINT to
     547             :  * every member of the terminal process group --- but in other scenarios it
     548             :  * might be that only the master gets signaled.
     549             :  *
     550             :  * On Windows, the cancel handler runs in a separate thread, because that's
     551             :  * how SetConsoleCtrlHandler works.  We make it stop worker threads, send
     552             :  * cancels on all active connections, and then return FALSE, which will allow
     553             :  * the process to die.  For safety's sake, we use a critical section to
     554             :  * protect the PGcancel structures against being changed while the signal
     555             :  * thread runs.
     556             :  */
     557             : 
     558             : #ifndef WIN32
     559             : 
     560             : /*
     561             :  * Signal handler (Unix only)
     562             :  */
     563             : static void
     564           0 : sigTermHandler(SIGNAL_ARGS)
     565             : {
     566             :     int         i;
     567             :     char        errbuf[1];
     568             : 
     569             :     /*
     570             :      * Some platforms allow delivery of new signals to interrupt an active
     571             :      * signal handler.  That could muck up our attempt to send PQcancel, so
     572             :      * disable the signals that setup_cancel_handler enabled.
     573             :      */
     574           0 :     pqsignal(SIGINT, SIG_IGN);
     575           0 :     pqsignal(SIGTERM, SIG_IGN);
     576           0 :     pqsignal(SIGQUIT, SIG_IGN);
     577             : 
     578             :     /*
     579             :      * If we're in the master, forward signal to all workers.  (It seems best
     580             :      * to do this before PQcancel; killing the master transaction will result
     581             :      * in invalid-snapshot errors from active workers, which maybe we can
     582             :      * quiet by killing workers first.)  Ignore any errors.
     583             :      */
     584           0 :     if (signal_info.pstate != NULL)
     585             :     {
     586           0 :         for (i = 0; i < signal_info.pstate->numWorkers; i++)
     587             :         {
     588           0 :             pid_t       pid = signal_info.pstate->parallelSlot[i].pid;
     589             : 
     590           0 :             if (pid != 0)
     591           0 :                 kill(pid, SIGTERM);
     592             :         }
     593             :     }
     594             : 
     595             :     /*
     596             :      * Send QueryCancel if we have a connection to send to.  Ignore errors,
     597             :      * there's not much we can do about them anyway.
     598             :      */
     599           0 :     if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
     600           0 :         (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
     601             : 
     602             :     /*
     603             :      * Report we're quitting, using nothing more complicated than write(2).
     604             :      * When in parallel operation, only the master process should do this.
     605             :      */
     606           0 :     if (!signal_info.am_worker)
     607             :     {
     608           0 :         if (progname)
     609             :         {
     610           0 :             write_stderr(progname);
     611           0 :             write_stderr(": ");
     612             :         }
     613           0 :         write_stderr("terminated by user\n");
     614             :     }
     615             : 
     616             :     /*
     617             :      * And die, using _exit() not exit() because the latter will invoke atexit
     618             :      * handlers that can fail if we interrupted related code.
     619             :      */
     620           0 :     _exit(1);
     621             : }
     622             : 
     623             : /*
     624             :  * Enable cancel interrupt handler, if not already done.
     625             :  */
     626             : static void
     627         478 : setup_cancel_handler(void)
     628             : {
     629             :     /*
     630             :      * When forking, signal_info.handler_set will propagate into the new
     631             :      * process, but that's fine because the signal handler state does too.
     632             :      */
     633         478 :     if (!signal_info.handler_set)
     634             :     {
     635         186 :         signal_info.handler_set = true;
     636             : 
     637         186 :         pqsignal(SIGINT, sigTermHandler);
     638         186 :         pqsignal(SIGTERM, sigTermHandler);
     639         186 :         pqsignal(SIGQUIT, sigTermHandler);
     640             :     }
     641         478 : }
     642             : 
     643             : #else                           /* WIN32 */
     644             : 
     645             : /*
     646             :  * Console interrupt handler --- runs in a newly-started thread.
     647             :  *
     648             :  * After stopping other threads and sending cancel requests on all open
     649             :  * connections, we return FALSE which will allow the default ExitProcess()
     650             :  * action to be taken.
     651             :  */
     652             : static BOOL WINAPI
     653             : consoleHandler(DWORD dwCtrlType)
     654             : {
     655             :     int         i;
     656             :     char        errbuf[1];
     657             : 
     658             :     if (dwCtrlType == CTRL_C_EVENT ||
     659             :         dwCtrlType == CTRL_BREAK_EVENT)
     660             :     {
     661             :         /* Critical section prevents changing data we look at here */
     662             :         EnterCriticalSection(&signal_info_lock);
     663             : 
     664             :         /*
     665             :          * If in parallel mode, stop worker threads and send QueryCancel to
     666             :          * their connected backends.  The main point of stopping the worker
     667             :          * threads is to keep them from reporting the query cancels as errors,
     668             :          * which would clutter the user's screen.  We needn't stop the master
     669             :          * thread since it won't be doing much anyway.  Do this before
     670             :          * canceling the main transaction, else we might get invalid-snapshot
     671             :          * errors reported before we can stop the workers.  Ignore errors,
     672             :          * there's not much we can do about them anyway.
     673             :          */
     674             :         if (signal_info.pstate != NULL)
     675             :         {
     676             :             for (i = 0; i < signal_info.pstate->numWorkers; i++)
     677             :             {
     678             :                 ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
     679             :                 ArchiveHandle *AH = slot->AH;
     680             :                 HANDLE      hThread = (HANDLE) slot->hThread;
     681             : 
     682             :                 /*
     683             :                  * Using TerminateThread here may leave some resources leaked,
     684             :                  * but it doesn't matter since we're about to end the whole
     685             :                  * process.
     686             :                  */
     687             :                 if (hThread != INVALID_HANDLE_VALUE)
     688             :                     TerminateThread(hThread, 0);
     689             : 
     690             :                 if (AH != NULL && AH->connCancel != NULL)
     691             :                     (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
     692             :             }
     693             :         }
     694             : 
     695             :         /*
     696             :          * Send QueryCancel to master connection, if enabled.  Ignore errors,
     697             :          * there's not much we can do about them anyway.
     698             :          */
     699             :         if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
     700             :             (void) PQcancel(signal_info.myAH->connCancel,
     701             :                             errbuf, sizeof(errbuf));
     702             : 
     703             :         LeaveCriticalSection(&signal_info_lock);
     704             : 
     705             :         /*
     706             :          * Report we're quitting, using nothing more complicated than
     707             :          * write(2).  (We might be able to get away with using pg_log_*()
     708             :          * here, but since we terminated other threads uncleanly above, it
     709             :          * seems better to assume as little as possible.)
     710             :          */
     711             :         if (progname)
     712             :         {
     713             :             write_stderr(progname);
     714             :             write_stderr(": ");
     715             :         }
     716             :         write_stderr("terminated by user\n");
     717             :     }
     718             : 
     719             :     /* Always return FALSE to allow signal handling to continue */
     720             :     return FALSE;
     721             : }
     722             : 
     723             : /*
     724             :  * Enable cancel interrupt handler, if not already done.
     725             :  */
     726             : static void
     727             : setup_cancel_handler(void)
     728             : {
     729             :     if (!signal_info.handler_set)
     730             :     {
     731             :         signal_info.handler_set = true;
     732             : 
     733             :         InitializeCriticalSection(&signal_info_lock);
     734             : 
     735             :         SetConsoleCtrlHandler(consoleHandler, TRUE);
     736             :     }
     737             : }
     738             : 
     739             : #endif                          /* WIN32 */
     740             : 
     741             : 
     742             : /*
     743             :  * set_archive_cancel_info
     744             :  *
     745             :  * Fill AH->connCancel with cancellation info for the specified database
     746             :  * connection; or clear it if conn is NULL.
     747             :  */
     748             : void
     749         478 : set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
     750             : {
     751             :     PGcancel   *oldConnCancel;
     752             : 
     753             :     /*
     754             :      * Activate the interrupt handler if we didn't yet in this process.  On
     755             :      * Windows, this also initializes signal_info_lock; therefore it's
     756             :      * important that this happen at least once before we fork off any
     757             :      * threads.
     758             :      */
     759         478 :     setup_cancel_handler();
     760             : 
     761             :     /*
     762             :      * On Unix, we assume that storing a pointer value is atomic with respect
     763             :      * to any possible signal interrupt.  On Windows, use a critical section.
     764             :      */
     765             : 
     766             : #ifdef WIN32
     767             :     EnterCriticalSection(&signal_info_lock);
     768             : #endif
     769             : 
     770             :     /* Free the old one if we have one */
     771         478 :     oldConnCancel = AH->connCancel;
     772             :     /* be sure interrupt handler doesn't use pointer while freeing */
     773         478 :     AH->connCancel = NULL;
     774             : 
     775         478 :     if (oldConnCancel != NULL)
     776         248 :         PQfreeCancel(oldConnCancel);
     777             : 
     778             :     /* Set the new one if specified */
     779         478 :     if (conn)
     780         248 :         AH->connCancel = PQgetCancel(conn);
     781             : 
     782             :     /*
     783             :      * On Unix, there's only ever one active ArchiveHandle per process, so we
     784             :      * can just set signal_info.myAH unconditionally.  On Windows, do that
     785             :      * only in the main thread; worker threads have to make sure their
     786             :      * ArchiveHandle appears in the pstate data, which is dealt with in
     787             :      * RunWorker().
     788             :      */
     789             : #ifndef WIN32
     790         478 :     signal_info.myAH = AH;
     791             : #else
     792             :     if (mainThreadId == GetCurrentThreadId())
     793             :         signal_info.myAH = AH;
     794             : #endif
     795             : 
     796             : #ifdef WIN32
     797             :     LeaveCriticalSection(&signal_info_lock);
     798             : #endif
     799         478 : }
     800             : 
     801             : /*
     802             :  * set_cancel_pstate
     803             :  *
     804             :  * Set signal_info.pstate to point to the specified ParallelState, if any.
     805             :  * We need this mainly to have an interlock against Windows signal thread.
     806             :  */
     807             : static void
     808          24 : set_cancel_pstate(ParallelState *pstate)
     809             : {
     810             : #ifdef WIN32
     811             :     EnterCriticalSection(&signal_info_lock);
     812             : #endif
     813             : 
     814          24 :     signal_info.pstate = pstate;
     815             : 
     816             : #ifdef WIN32
     817             :     LeaveCriticalSection(&signal_info_lock);
     818             : #endif
     819          24 : }
     820             : 
     821             : /*
     822             :  * set_cancel_slot_archive
     823             :  *
     824             :  * Set ParallelSlot's AH field to point to the specified archive, if any.
     825             :  * We need this mainly to have an interlock against Windows signal thread.
     826             :  */
     827             : static void
     828          48 : set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
     829             : {
     830             : #ifdef WIN32
     831             :     EnterCriticalSection(&signal_info_lock);
     832             : #endif
     833             : 
     834          48 :     slot->AH = AH;
     835             : 
     836             : #ifdef WIN32
     837             :     LeaveCriticalSection(&signal_info_lock);
     838             : #endif
     839          48 : }
     840             : 
     841             : 
     842             : /*
     843             :  * This function is called by both Unix and Windows variants to set up
     844             :  * and run a worker process.  Caller should exit the process (or thread)
     845             :  * upon return.
     846             :  */
     847             : static void
     848          24 : RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
     849             : {
     850             :     int         pipefd[2];
     851             : 
     852             :     /* fetch child ends of pipes */
     853          24 :     pipefd[PIPE_READ] = slot->pipeRevRead;
     854          24 :     pipefd[PIPE_WRITE] = slot->pipeRevWrite;
     855             : 
     856             :     /*
     857             :      * Clone the archive so that we have our own state to work with, and in
     858             :      * particular our own database connection.
     859             :      *
     860             :      * We clone on Unix as well as Windows, even though technically we don't
     861             :      * need to because fork() gives us a copy in our own address space
     862             :      * already.  But CloneArchive resets the state information and also clones
     863             :      * the database connection which both seem kinda helpful.
     864             :      */
     865          24 :     AH = CloneArchive(AH);
     866             : 
     867             :     /* Remember cloned archive where signal handler can find it */
     868          24 :     set_cancel_slot_archive(slot, AH);
     869             : 
     870             :     /*
     871             :      * Call the setup worker function that's defined in the ArchiveHandle.
     872             :      */
     873          24 :     (AH->SetupWorkerPtr) ((Archive *) AH);
     874             : 
     875             :     /*
     876             :      * Execute commands until done.
     877             :      */
     878          24 :     WaitForCommands(AH, pipefd);
     879             : 
     880             :     /*
     881             :      * Disconnect from database and clean up.
     882             :      */
     883          24 :     set_cancel_slot_archive(slot, NULL);
     884          24 :     DisconnectDatabase(&(AH->public));
     885          24 :     DeCloneArchive(AH);
     886          24 : }
     887             : 
     888             : /*
     889             :  * Thread base function for Windows
     890             :  */
     891             : #ifdef WIN32
     892             : static unsigned __stdcall
     893             : init_spawned_worker_win32(WorkerInfo *wi)
     894             : {
     895             :     ArchiveHandle *AH = wi->AH;
     896             :     ParallelSlot *slot = wi->slot;
     897             : 
     898             :     /* Don't need WorkerInfo anymore */
     899             :     free(wi);
     900             : 
     901             :     /* Run the worker ... */
     902             :     RunWorker(AH, slot);
     903             : 
     904             :     /* Exit the thread */
     905             :     _endthreadex(0);
     906             :     return 0;
     907             : }
     908             : #endif                          /* WIN32 */
     909             : 
     910             : /*
     911             :  * This function starts a parallel dump or restore by spawning off the worker
     912             :  * processes.  For Windows, it creates a number of threads; on Unix the
     913             :  * workers are created with fork().
     914             :  */
     915             : ParallelState *
     916          16 : ParallelBackupStart(ArchiveHandle *AH)
     917             : {
     918             :     ParallelState *pstate;
     919             :     int         i;
     920             : 
     921             :     Assert(AH->public.numWorkers > 0);
     922             : 
     923          16 :     pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
     924             : 
     925          16 :     pstate->numWorkers = AH->public.numWorkers;
     926          16 :     pstate->te = NULL;
     927          16 :     pstate->parallelSlot = NULL;
     928             : 
     929          16 :     if (AH->public.numWorkers == 1)
     930           4 :         return pstate;
     931             : 
     932             :     /* Create status arrays, being sure to initialize all fields to 0 */
     933          12 :     pstate->te = (TocEntry **)
     934          12 :         pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
     935          12 :     pstate->parallelSlot = (ParallelSlot *)
     936          12 :         pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
     937             : 
     938             : #ifdef WIN32
     939             :     /* Make fmtId() and fmtQualifiedId() use thread-local storage */
     940             :     getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
     941             : #endif
     942             : 
     943             :     /*
     944             :      * Set the pstate in shutdown_info, to tell the exit handler that it must
     945             :      * clean up workers as well as the main database connection.  But we don't
     946             :      * set this in signal_info yet, because we don't want child processes to
     947             :      * inherit non-NULL signal_info.pstate.
     948             :      */
     949          12 :     shutdown_info.pstate = pstate;
     950             : 
     951             :     /*
     952             :      * Temporarily disable query cancellation on the master connection.  This
     953             :      * ensures that child processes won't inherit valid AH->connCancel
     954             :      * settings and thus won't try to issue cancels against the master's
     955             :      * connection.  No harm is done if we fail while it's disabled, because
     956             :      * the master connection is idle at this point anyway.
     957             :      */
     958          12 :     set_archive_cancel_info(AH, NULL);
     959             : 
     960             :     /* Ensure stdio state is quiesced before forking */
     961          12 :     fflush(NULL);
     962             : 
     963             :     /* Create desired number of workers */
     964          36 :     for (i = 0; i < pstate->numWorkers; i++)
     965             :     {
     966             : #ifdef WIN32
     967             :         WorkerInfo *wi;
     968             :         uintptr_t   handle;
     969             : #else
     970             :         pid_t       pid;
     971             : #endif
     972          24 :         ParallelSlot *slot = &(pstate->parallelSlot[i]);
     973             :         int         pipeMW[2],
     974             :                     pipeWM[2];
     975             : 
     976             :         /* Create communication pipes for this worker */
     977          24 :         if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
     978           0 :             fatal("could not create communication channels: %m");
     979             : 
     980             :         /* master's ends of the pipes */
     981          24 :         slot->pipeRead = pipeWM[PIPE_READ];
     982          24 :         slot->pipeWrite = pipeMW[PIPE_WRITE];
     983             :         /* child's ends of the pipes */
     984          24 :         slot->pipeRevRead = pipeMW[PIPE_READ];
     985          24 :         slot->pipeRevWrite = pipeWM[PIPE_WRITE];
     986             : 
     987             : #ifdef WIN32
     988             :         /* Create transient structure to pass args to worker function */
     989             :         wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
     990             : 
     991             :         wi->AH = AH;
     992             :         wi->slot = slot;
     993             : 
     994             :         handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
     995             :                                 wi, 0, &(slot->threadId));
     996             :         slot->hThread = handle;
     997             :         slot->workerStatus = WRKR_IDLE;
     998             : #else                           /* !WIN32 */
     999          24 :         pid = fork();
    1000          48 :         if (pid == 0)
    1001             :         {
    1002             :             /* we are the worker */
    1003             :             int         j;
    1004             : 
    1005             :             /* this is needed for GetMyPSlot() */
    1006          24 :             slot->pid = getpid();
    1007             : 
    1008             :             /* instruct signal handler that we're in a worker now */
    1009          24 :             signal_info.am_worker = true;
    1010             : 
    1011             :             /* close read end of Worker -> Master */
    1012          24 :             closesocket(pipeWM[PIPE_READ]);
    1013             :             /* close write end of Master -> Worker */
    1014          24 :             closesocket(pipeMW[PIPE_WRITE]);
    1015             : 
    1016             :             /*
    1017             :              * Close all inherited fds for communication of the master with
    1018             :              * previously-forked workers.
    1019             :              */
    1020          36 :             for (j = 0; j < i; j++)
    1021             :             {
    1022          12 :                 closesocket(pstate->parallelSlot[j].pipeRead);
    1023          12 :                 closesocket(pstate->parallelSlot[j].pipeWrite);
    1024             :             }
    1025             : 
    1026             :             /* Run the worker ... */
    1027          24 :             RunWorker(AH, slot);
    1028             : 
    1029             :             /* We can just exit(0) when done */
    1030          24 :             exit(0);
    1031             :         }
    1032          24 :         else if (pid < 0)
    1033             :         {
    1034             :             /* fork failed */
    1035           0 :             fatal("could not create worker process: %m");
    1036             :         }
    1037             : 
    1038             :         /* In Master after successful fork */
    1039          24 :         slot->pid = pid;
    1040          24 :         slot->workerStatus = WRKR_IDLE;
    1041             : 
    1042             :         /* close read end of Master -> Worker */
    1043          24 :         closesocket(pipeMW[PIPE_READ]);
    1044             :         /* close write end of Worker -> Master */
    1045          24 :         closesocket(pipeWM[PIPE_WRITE]);
    1046             : #endif                          /* WIN32 */
    1047             :     }
    1048             : 
    1049             :     /*
    1050             :      * Having forked off the workers, disable SIGPIPE so that master isn't
    1051             :      * killed if it tries to send a command to a dead worker.  We don't want
    1052             :      * the workers to inherit this setting, though.
    1053             :      */
    1054             : #ifndef WIN32
    1055          12 :     pqsignal(SIGPIPE, SIG_IGN);
    1056             : #endif
    1057             : 
    1058             :     /*
    1059             :      * Re-establish query cancellation on the master connection.
    1060             :      */
    1061          12 :     set_archive_cancel_info(AH, AH->connection);
    1062             : 
    1063             :     /*
    1064             :      * Tell the cancel signal handler to forward signals to worker processes,
    1065             :      * too.  (As with query cancel, we did not need this earlier because the
    1066             :      * workers have not yet been given anything to do; if we die before this
    1067             :      * point, any already-started workers will see EOF and quit promptly.)
    1068             :      */
    1069          12 :     set_cancel_pstate(pstate);
    1070             : 
    1071          12 :     return pstate;
    1072             : }
    1073             : 
    1074             : /*
    1075             :  * Close down a parallel dump or restore.
    1076             :  */
    1077             : void
    1078          16 : ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
    1079             : {
    1080             :     int         i;
    1081             : 
    1082             :     /* No work if non-parallel */
    1083          16 :     if (pstate->numWorkers == 1)
    1084           4 :         return;
    1085             : 
    1086             :     /* There should not be any unfinished jobs */
    1087             :     Assert(IsEveryWorkerIdle(pstate));
    1088             : 
    1089             :     /* Close the sockets so that the workers know they can exit */
    1090          36 :     for (i = 0; i < pstate->numWorkers; i++)
    1091             :     {
    1092          24 :         closesocket(pstate->parallelSlot[i].pipeRead);
    1093          24 :         closesocket(pstate->parallelSlot[i].pipeWrite);
    1094             :     }
    1095             : 
    1096             :     /* Wait for them to exit */
    1097          12 :     WaitForTerminatingWorkers(pstate);
    1098             : 
    1099             :     /*
    1100             :      * Unlink pstate from shutdown_info, so the exit handler will not try to
    1101             :      * use it; and likewise unlink from signal_info.
    1102             :      */
    1103          12 :     shutdown_info.pstate = NULL;
    1104          12 :     set_cancel_pstate(NULL);
    1105             : 
    1106             :     /* Release state (mere neatnik-ism, since we're about to terminate) */
    1107          12 :     free(pstate->te);
    1108          12 :     free(pstate->parallelSlot);
    1109          12 :     free(pstate);
    1110             : }
    1111             : 
    1112             : /*
    1113             :  * These next four functions handle construction and parsing of the command
    1114             :  * strings and response strings for parallel workers.
    1115             :  *
    1116             :  * Currently, these can be the same regardless of which archive format we are
    1117             :  * processing.  In future, we might want to let format modules override these
    1118             :  * functions to add format-specific data to a command or response.
    1119             :  */
    1120             : 
    1121             : /*
    1122             :  * buildWorkerCommand: format a command string to send to a worker.
    1123             :  *
    1124             :  * The string is built in the caller-supplied buffer of size buflen.
    1125             :  */
    1126             : static void
    1127          44 : buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
    1128             :                    char *buf, int buflen)
    1129             : {
    1130          44 :     if (act == ACT_DUMP)
    1131          40 :         snprintf(buf, buflen, "DUMP %d", te->dumpId);
    1132           4 :     else if (act == ACT_RESTORE)
    1133           4 :         snprintf(buf, buflen, "RESTORE %d", te->dumpId);
    1134             :     else
    1135             :         Assert(false);
    1136          44 : }
    1137             : 
    1138             : /*
    1139             :  * parseWorkerCommand: interpret a command string in a worker.
    1140             :  */
    1141             : static void
    1142          44 : parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
    1143             :                    const char *msg)
    1144             : {
    1145             :     DumpId      dumpId;
    1146             :     int         nBytes;
    1147             : 
    1148          44 :     if (messageStartsWith(msg, "DUMP "))
    1149             :     {
    1150          40 :         *act = ACT_DUMP;
    1151          40 :         sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
    1152             :         Assert(nBytes == strlen(msg));
    1153          40 :         *te = getTocEntryByDumpId(AH, dumpId);
    1154             :         Assert(*te != NULL);
    1155             :     }
    1156           4 :     else if (messageStartsWith(msg, "RESTORE "))
    1157             :     {
    1158           4 :         *act = ACT_RESTORE;
    1159           4 :         sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
    1160             :         Assert(nBytes == strlen(msg));
    1161           4 :         *te = getTocEntryByDumpId(AH, dumpId);
    1162             :         Assert(*te != NULL);
    1163             :     }
    1164             :     else
    1165           0 :         fatal("unrecognized command received from master: \"%s\"",
    1166             :               msg);
    1167          44 : }
    1168             : 
    1169             : /*
    1170             :  * buildWorkerResponse: format a response string to send to the master.
    1171             :  *
    1172             :  * The string is built in the caller-supplied buffer of size buflen.
    1173             :  */
    1174             : static void
    1175          44 : buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
    1176             :                     char *buf, int buflen)
    1177             : {
    1178          44 :     snprintf(buf, buflen, "OK %d %d %d",
    1179             :              te->dumpId,
    1180             :              status,
    1181             :              status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
    1182          44 : }
    1183             : 
    1184             : /*
    1185             :  * parseWorkerResponse: parse the status message returned by a worker.
    1186             :  *
    1187             :  * Returns the integer status code, and may update fields of AH and/or te.
    1188             :  */
    1189             : static int
    1190          44 : parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
    1191             :                     const char *msg)
    1192             : {
    1193             :     DumpId      dumpId;
    1194             :     int         nBytes,
    1195             :                 n_errors;
    1196          44 :     int         status = 0;
    1197             : 
    1198          44 :     if (messageStartsWith(msg, "OK "))
    1199             :     {
    1200          44 :         sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
    1201             : 
    1202             :         Assert(dumpId == te->dumpId);
    1203             :         Assert(nBytes == strlen(msg));
    1204             : 
    1205          44 :         AH->public.n_errors += n_errors;
    1206             :     }
    1207             :     else
    1208           0 :         fatal("invalid message received from worker: \"%s\"",
    1209             :               msg);
    1210             : 
    1211          44 :     return status;
    1212             : }
    1213             : 
    1214             : /*
    1215             :  * Dispatch a job to some free worker.
    1216             :  *
    1217             :  * te is the TocEntry to be processed, act is the action to be taken on it.
    1218             :  * callback is the function to call on completion of the job.
    1219             :  *
    1220             :  * If no worker is currently available, this will block, and previously
    1221             :  * registered callback functions may be called.
    1222             :  */
    1223             : void
    1224          44 : DispatchJobForTocEntry(ArchiveHandle *AH,
    1225             :                        ParallelState *pstate,
    1226             :                        TocEntry *te,
    1227             :                        T_Action act,
    1228             :                        ParallelCompletionPtr callback,
    1229             :                        void *callback_data)
    1230             : {
    1231             :     int         worker;
    1232             :     char        buf[256];
    1233             : 
    1234             :     /* Get a worker, waiting if none are idle */
    1235          72 :     while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
    1236          28 :         WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
    1237             : 
    1238             :     /* Construct and send command string */
    1239          44 :     buildWorkerCommand(AH, te, act, buf, sizeof(buf));
    1240             : 
    1241          44 :     sendMessageToWorker(pstate, worker, buf);
    1242             : 
    1243             :     /* Remember worker is busy, and which TocEntry it's working on */
    1244          44 :     pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
    1245          44 :     pstate->parallelSlot[worker].callback = callback;
    1246          44 :     pstate->parallelSlot[worker].callback_data = callback_data;
    1247          44 :     pstate->te[worker] = te;
    1248          44 : }
    1249             : 
    1250             : /*
    1251             :  * Find an idle worker and return its slot number.
    1252             :  * Return NO_SLOT if none are idle.
    1253             :  */
    1254             : static int
    1255         104 : GetIdleWorker(ParallelState *pstate)
    1256             : {
    1257             :     int         i;
    1258             : 
    1259         252 :     for (i = 0; i < pstate->numWorkers; i++)
    1260             :     {
    1261         196 :         if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
    1262          48 :             return i;
    1263             :     }
    1264          56 :     return NO_SLOT;
    1265             : }
    1266             : 
    1267             : /*
    1268             :  * Return true iff no worker is running.
    1269             :  */
    1270             : static bool
    1271          36 : HasEveryWorkerTerminated(ParallelState *pstate)
    1272             : {
    1273             :     int         i;
    1274             : 
    1275          68 :     for (i = 0; i < pstate->numWorkers; i++)
    1276             :     {
    1277          56 :         if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
    1278          24 :             return false;
    1279             :     }
    1280          12 :     return true;
    1281             : }
    1282             : 
    1283             : /*
    1284             :  * Return true iff every worker is in the WRKR_IDLE state.
    1285             :  */
    1286             : bool
    1287          36 : IsEveryWorkerIdle(ParallelState *pstate)
    1288             : {
    1289             :     int         i;
    1290             : 
    1291          78 :     for (i = 0; i < pstate->numWorkers; i++)
    1292             :     {
    1293          58 :         if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
    1294          16 :             return false;
    1295             :     }
    1296          20 :     return true;
    1297             : }
    1298             : 
    1299             : /*
    1300             :  * Acquire lock on a table to be dumped by a worker process.
    1301             :  *
    1302             :  * The master process is already holding an ACCESS SHARE lock.  Ordinarily
    1303             :  * it's no problem for a worker to get one too, but if anything else besides
    1304             :  * pg_dump is running, there's a possible deadlock:
    1305             :  *
    1306             :  * 1) Master dumps the schema and locks all tables in ACCESS SHARE mode.
    1307             :  * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
    1308             :  *    because the master holds a conflicting ACCESS SHARE lock).
    1309             :  * 3) A worker process also requests an ACCESS SHARE lock to read the table.
    1310             :  *    The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
    1311             :  * 4) Now we have a deadlock, since the master is effectively waiting for
    1312             :  *    the worker.  The server cannot detect that, however.
    1313             :  *
    1314             :  * To prevent an infinite wait, prior to touching a table in a worker, request
    1315             :  * a lock in ACCESS SHARE mode but with NOWAIT.  If we don't get the lock,
    1316             :  * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
    1317             :  * so we have a deadlock.  We must fail the backup in that case.
    1318             :  */
    1319             : static void
    1320          40 : lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
    1321             : {
    1322             :     const char *qualId;
    1323             :     PQExpBuffer query;
    1324             :     PGresult   *res;
    1325             : 
    1326             :     /* Nothing to do for BLOBS */
    1327          40 :     if (strcmp(te->desc, "BLOBS") == 0)
    1328           2 :         return;
    1329             : 
    1330          38 :     query = createPQExpBuffer();
    1331             : 
    1332          38 :     qualId = fmtQualifiedId(te->namespace, te->tag);
    1333             : 
    1334          38 :     appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
    1335             :                       qualId);
    1336             : 
    1337          38 :     res = PQexec(AH->connection, query->data);
    1338             : 
    1339          38 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
    1340           0 :         fatal("could not obtain lock on relation \"%s\"\n"
    1341             :               "This usually means that someone requested an ACCESS EXCLUSIVE lock "
    1342             :               "on the table after the pg_dump parent process had gotten the "
    1343             :               "initial ACCESS SHARE lock on the table.", qualId);
    1344             : 
    1345          38 :     PQclear(res);
    1346          38 :     destroyPQExpBuffer(query);
    1347             : }
    1348             : 
    1349             : /*
    1350             :  * WaitForCommands: main routine for a worker process.
    1351             :  *
    1352             :  * Read and execute commands from the master until we see EOF on the pipe.
    1353             :  */
    1354             : static void
    1355          24 : WaitForCommands(ArchiveHandle *AH, int pipefd[2])
    1356             : {
    1357             :     char       *command;
    1358             :     TocEntry   *te;
    1359             :     T_Action    act;
    1360          24 :     int         status = 0;
    1361             :     char        buf[256];
    1362             : 
    1363             :     for (;;)
    1364             :     {
    1365          68 :         if (!(command = getMessageFromMaster(pipefd)))
    1366             :         {
    1367             :             /* EOF, so done */
    1368          24 :             return;
    1369             :         }
    1370             : 
    1371             :         /* Decode the command */
    1372          44 :         parseWorkerCommand(AH, &te, &act, command);
    1373             : 
    1374          44 :         if (act == ACT_DUMP)
    1375             :         {
    1376             :             /* Acquire lock on this table within the worker's session */
    1377          40 :             lockTableForWorker(AH, te);
    1378             : 
    1379             :             /* Perform the dump command */
    1380          40 :             status = (AH->WorkerJobDumpPtr) (AH, te);
    1381             :         }
    1382           4 :         else if (act == ACT_RESTORE)
    1383             :         {
    1384             :             /* Perform the restore command */
    1385           4 :             status = (AH->WorkerJobRestorePtr) (AH, te);
    1386             :         }
    1387             :         else
    1388             :             Assert(false);
    1389             : 
    1390             :         /* Return status to master */
    1391          44 :         buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
    1392             : 
    1393          44 :         sendMessageToMaster(pipefd, buf);
    1394             : 
    1395             :         /* command was pg_malloc'd and we are responsible for free()ing it. */
    1396          44 :         free(command);
    1397             :     }
    1398             : }
    1399             : 
    1400             : /*
    1401             :  * Check for status messages from workers.
    1402             :  *
    1403             :  * If do_wait is true, wait to get a status message; otherwise, just return
    1404             :  * immediately if there is none available.
    1405             :  *
    1406             :  * When we get a status message, we pass the status code to the callback
    1407             :  * function that was specified to DispatchJobForTocEntry, then reset the
    1408             :  * worker status to IDLE.
    1409             :  *
    1410             :  * Returns true if we collected a status message, else false.
    1411             :  *
    1412             :  * XXX is it worth checking for more than one status message per call?
    1413             :  * It seems somewhat unlikely that multiple workers would finish at exactly
    1414             :  * the same time.
    1415             :  */
    1416             : static bool
    1417          84 : ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
    1418             : {
    1419             :     int         worker;
    1420             :     char       *msg;
    1421             : 
    1422             :     /* Try to collect a status message */
    1423          84 :     msg = getMessageFromWorker(pstate, do_wait, &worker);
    1424             : 
    1425          84 :     if (!msg)
    1426             :     {
    1427             :         /* If do_wait is true, we must have detected EOF on some socket */
    1428          40 :         if (do_wait)
    1429           0 :             fatal("a worker process died unexpectedly");
    1430          40 :         return false;
    1431             :     }
    1432             : 
    1433             :     /* Process it and update our idea of the worker's status */
    1434          44 :     if (messageStartsWith(msg, "OK "))
    1435             :     {
    1436          44 :         ParallelSlot *slot = &pstate->parallelSlot[worker];
    1437          44 :         TocEntry   *te = pstate->te[worker];
    1438             :         int         status;
    1439             : 
    1440          44 :         status = parseWorkerResponse(AH, te, msg);
    1441          44 :         slot->callback(AH, te, status, slot->callback_data);
    1442          44 :         slot->workerStatus = WRKR_IDLE;
    1443          44 :         pstate->te[worker] = NULL;
    1444             :     }
    1445             :     else
    1446           0 :         fatal("invalid message received from worker: \"%s\"",
    1447             :               msg);
    1448             : 
    1449             :     /* Free the string returned from getMessageFromWorker */
    1450          44 :     free(msg);
    1451             : 
    1452          44 :     return true;
    1453             : }
    1454             : 
    1455             : /*
    1456             :  * Check for status results from workers, waiting if necessary.
    1457             :  *
    1458             :  * Available wait modes are:
    1459             :  * WFW_NO_WAIT: reap any available status, but don't block
    1460             :  * WFW_GOT_STATUS: wait for at least one more worker to finish
    1461             :  * WFW_ONE_IDLE: wait for at least one worker to be idle
    1462             :  * WFW_ALL_IDLE: wait for all workers to be idle
    1463             :  *
    1464             :  * Any received results are passed to the callback specified to
    1465             :  * DispatchJobForTocEntry.
    1466             :  *
    1467             :  * This function is executed in the master process.
    1468             :  */
    1469             : void
    1470          44 : WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
    1471             : {
    1472          44 :     bool        do_wait = false;
    1473             : 
    1474             :     /*
    1475             :      * In GOT_STATUS mode, always block waiting for a message, since we can't
    1476             :      * return till we get something.  In other modes, we don't block the first
    1477             :      * time through the loop.
    1478             :      */
    1479          44 :     if (mode == WFW_GOT_STATUS)
    1480             :     {
    1481             :         /* Assert that caller knows what it's doing */
    1482             :         Assert(!IsEveryWorkerIdle(pstate));
    1483           4 :         do_wait = true;
    1484             :     }
    1485             : 
    1486             :     for (;;)
    1487             :     {
    1488             :         /*
    1489             :          * Check for status messages, even if we don't need to block.  We do
    1490             :          * not try very hard to reap all available messages, though, since
    1491             :          * there's unlikely to be more than one.
    1492             :          */
    1493          84 :         if (ListenToWorkers(AH, pstate, do_wait))
    1494             :         {
    1495             :             /*
    1496             :              * If we got a message, we are done by definition for GOT_STATUS
    1497             :              * mode, and we can also be certain that there's at least one idle
    1498             :              * worker.  So we're done in all but ALL_IDLE mode.
    1499             :              */
    1500          44 :             if (mode != WFW_ALL_IDLE)
    1501          32 :                 return;
    1502             :         }
    1503             : 
    1504             :         /* Check whether we must wait for new status messages */
    1505          52 :         switch (mode)
    1506             :         {
    1507           0 :             case WFW_NO_WAIT:
    1508           0 :                 return;         /* never wait */
    1509           0 :             case WFW_GOT_STATUS:
    1510             :                 Assert(false);  /* can't get here, because we waited */
    1511           0 :                 break;
    1512          32 :             case WFW_ONE_IDLE:
    1513          32 :                 if (GetIdleWorker(pstate) != NO_SLOT)
    1514           4 :                     return;
    1515          28 :                 break;
    1516          20 :             case WFW_ALL_IDLE:
    1517          20 :                 if (IsEveryWorkerIdle(pstate))
    1518           8 :                     return;
    1519          12 :                 break;
    1520             :         }
    1521             : 
    1522             :         /* Loop back, and this time wait for something to happen */
    1523          40 :         do_wait = true;
    1524             :     }
    1525             : }
    1526             : 
    1527             : /*
    1528             :  * Read one command message from the master, blocking if necessary
    1529             :  * until one is available, and return it as a malloc'd string.
    1530             :  * On EOF, return NULL.
    1531             :  *
    1532             :  * This function is executed in worker processes.
    1533             :  */
    1534             : static char *
    1535          68 : getMessageFromMaster(int pipefd[2])
    1536             : {
    1537          68 :     return readMessageFromPipe(pipefd[PIPE_READ]);
    1538             : }
    1539             : 
    1540             : /*
    1541             :  * Send a status message to the master.
    1542             :  *
    1543             :  * This function is executed in worker processes.
    1544             :  */
    1545             : static void
    1546          44 : sendMessageToMaster(int pipefd[2], const char *str)
    1547             : {
    1548          44 :     int         len = strlen(str) + 1;
    1549             : 
    1550          44 :     if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
    1551           0 :         fatal("could not write to the communication channel: %m");
    1552          44 : }
    1553             : 
    1554             : /*
    1555             :  * Wait until some descriptor in "workerset" becomes readable.
    1556             :  * Returns -1 on error, else the number of readable descriptors.
    1557             :  */
    1558             : static int
    1559          44 : select_loop(int maxFd, fd_set *workerset)
    1560             : {
    1561             :     int         i;
    1562          44 :     fd_set      saveSet = *workerset;
    1563             : 
    1564             :     for (;;)
    1565             :     {
    1566          44 :         *workerset = saveSet;
    1567          44 :         i = select(maxFd + 1, workerset, NULL, NULL, NULL);
    1568             : 
    1569             : #ifndef WIN32
    1570          44 :         if (i < 0 && errno == EINTR)
    1571           0 :             continue;
    1572             : #else
    1573             :         if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
    1574             :             continue;
    1575             : #endif
    1576          44 :         break;
    1577             :     }
    1578             : 
    1579          44 :     return i;
    1580             : }
    1581             : 
    1582             : 
    1583             : /*
    1584             :  * Check for messages from worker processes.
    1585             :  *
    1586             :  * If a message is available, return it as a malloc'd string, and put the
    1587             :  * index of the sending worker in *worker.
    1588             :  *
    1589             :  * If nothing is available, wait if "do_wait" is true, else return NULL.
    1590             :  *
    1591             :  * If we detect EOF on any socket, we'll return NULL.  It's not great that
    1592             :  * that's hard to distinguish from the no-data-available case, but for now
    1593             :  * our one caller is okay with that.
    1594             :  *
    1595             :  * This function is executed in the master process.
    1596             :  */
    1597             : static char *
    1598          84 : getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
    1599             : {
    1600             :     int         i;
    1601             :     fd_set      workerset;
    1602          84 :     int         maxFd = -1;
    1603          84 :     struct timeval nowait = {0, 0};
    1604             : 
    1605             :     /* construct bitmap of socket descriptors for select() */
    1606          84 :     FD_ZERO(&workerset);
    1607         252 :     for (i = 0; i < pstate->numWorkers; i++)
    1608             :     {
    1609         168 :         if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
    1610           0 :             continue;
    1611         168 :         FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
    1612         168 :         if (pstate->parallelSlot[i].pipeRead > maxFd)
    1613         168 :             maxFd = pstate->parallelSlot[i].pipeRead;
    1614             :     }
    1615             : 
    1616          84 :     if (do_wait)
    1617             :     {
    1618          44 :         i = select_loop(maxFd, &workerset);
    1619             :         Assert(i != 0);
    1620             :     }
    1621             :     else
    1622             :     {
    1623          40 :         if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
    1624          40 :             return NULL;
    1625             :     }
    1626             : 
    1627          44 :     if (i < 0)
    1628           0 :         fatal("select() failed: %m");
    1629             : 
    1630          76 :     for (i = 0; i < pstate->numWorkers; i++)
    1631             :     {
    1632             :         char       *msg;
    1633             : 
    1634          76 :         if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
    1635           0 :             continue;
    1636          76 :         if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
    1637          32 :             continue;
    1638             : 
    1639             :         /*
    1640             :          * Read the message if any.  If the socket is ready because of EOF,
    1641             :          * we'll return NULL instead (and the socket will stay ready, so the
    1642             :          * condition will persist).
    1643             :          *
    1644             :          * Note: because this is a blocking read, we'll wait if only part of
    1645             :          * the message is available.  Waiting a long time would be bad, but
    1646             :          * since worker status messages are short and are always sent in one
    1647             :          * operation, it shouldn't be a problem in practice.
    1648             :          */
    1649          44 :         msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
    1650          44 :         *worker = i;
    1651          44 :         return msg;
    1652             :     }
    1653             :     Assert(false);
    1654           0 :     return NULL;
    1655             : }
    1656             : 
    1657             : /*
    1658             :  * Send a command message to the specified worker process.
    1659             :  *
    1660             :  * This function is executed in the master process.
    1661             :  */
    1662             : static void
    1663          44 : sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
    1664             : {
    1665          44 :     int         len = strlen(str) + 1;
    1666             : 
    1667          44 :     if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
    1668             :     {
    1669           0 :         fatal("could not write to the communication channel: %m");
    1670             :     }
    1671          44 : }
    1672             : 
    1673             : /*
    1674             :  * Read one message from the specified pipe (fd), blocking if necessary
    1675             :  * until one is available, and return it as a malloc'd string.
    1676             :  * On EOF, return NULL.
    1677             :  *
    1678             :  * A "message" on the channel is just a null-terminated string.
    1679             :  */
    1680             : static char *
    1681         112 : readMessageFromPipe(int fd)
    1682             : {
    1683             :     char       *msg;
    1684             :     int         msgsize,
    1685             :                 bufsize;
    1686             :     int         ret;
    1687             : 
    1688             :     /*
    1689             :      * In theory, if we let piperead() read multiple bytes, it might give us
    1690             :      * back fragments of multiple messages.  (That can't actually occur, since
    1691             :      * neither master nor workers send more than one message without waiting
    1692             :      * for a reply, but we don't wish to assume that here.)  For simplicity,
    1693             :      * read a byte at a time until we get the terminating '\0'.  This method
    1694             :      * is a bit inefficient, but since this is only used for relatively short
    1695             :      * command and status strings, it shouldn't matter.
    1696             :      */
    1697         112 :     bufsize = 64;               /* could be any number */
    1698         112 :     msg = (char *) pg_malloc(bufsize);
    1699         112 :     msgsize = 0;
    1700             :     for (;;)
    1701             :     {
    1702         892 :         Assert(msgsize < bufsize);
    1703        1004 :         ret = piperead(fd, msg + msgsize, 1);
    1704        1004 :         if (ret <= 0)
    1705          24 :             break;              /* error or connection closure */
    1706             : 
    1707             :         Assert(ret == 1);
    1708             : 
    1709         980 :         if (msg[msgsize] == '\0')
    1710          88 :             return msg;         /* collected whole message */
    1711             : 
    1712         892 :         msgsize++;
    1713         892 :         if (msgsize == bufsize) /* enlarge buffer if needed */
    1714             :         {
    1715           0 :             bufsize += 16;      /* could be any number */
    1716           0 :             msg = (char *) pg_realloc(msg, bufsize);
    1717             :         }
    1718             :     }
    1719             : 
    1720             :     /* Other end has closed the connection */
    1721          24 :     pg_free(msg);
    1722          24 :     return NULL;
    1723             : }
    1724             : 
    1725             : #ifdef WIN32
    1726             : 
    1727             : /*
    1728             :  * This is a replacement version of pipe(2) for Windows which allows the pipe
    1729             :  * handles to be used in select().
    1730             :  *
    1731             :  * Reads and writes on the pipe must go through piperead()/pipewrite().
    1732             :  *
    1733             :  * For consistency with Unix we declare the returned handles as "int".
    1734             :  * This is okay even on WIN64 because system handles are not more than
    1735             :  * 32 bits wide, but we do have to do some casting.
    1736             :  */
    1737             : static int
    1738             : pgpipe(int handles[2])
    1739             : {
    1740             :     pgsocket    s,
    1741             :                 tmp_sock;
    1742             :     struct sockaddr_in serv_addr;
    1743             :     int         len = sizeof(serv_addr);
    1744             : 
    1745             :     /* We have to use the Unix socket invalid file descriptor value here. */
    1746             :     handles[0] = handles[1] = -1;
    1747             : 
    1748             :     /*
    1749             :      * setup listen socket
    1750             :      */
    1751             :     if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
    1752             :     {
    1753             :         pg_log_error("pgpipe: could not create socket: error code %d",
    1754             :                      WSAGetLastError());
    1755             :         return -1;
    1756             :     }
    1757             : 
    1758             :     memset((void *) &serv_addr, 0, sizeof(serv_addr));
    1759             :     serv_addr.sin_family = AF_INET;
    1760             :     serv_addr.sin_port = pg_hton16(0);
    1761             :     serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
    1762             :     if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
    1763             :     {
    1764             :         pg_log_error("pgpipe: could not bind: error code %d",
    1765             :                      WSAGetLastError());
    1766             :         closesocket(s);
    1767             :         return -1;
    1768             :     }
    1769             :     if (listen(s, 1) == SOCKET_ERROR)
    1770             :     {
    1771             :         pg_log_error("pgpipe: could not listen: error code %d",
    1772             :                      WSAGetLastError());
    1773             :         closesocket(s);
    1774             :         return -1;
    1775             :     }
    1776             :     if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
    1777             :     {
    1778             :         pg_log_error("pgpipe: getsockname() failed: error code %d",
    1779             :                      WSAGetLastError());
    1780             :         closesocket(s);
    1781             :         return -1;
    1782             :     }
    1783             : 
    1784             :     /*
    1785             :      * setup pipe handles
    1786             :      */
    1787             :     if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
    1788             :     {
    1789             :         pg_log_error("pgpipe: could not create second socket: error code %d",
    1790             :                      WSAGetLastError());
    1791             :         closesocket(s);
    1792             :         return -1;
    1793             :     }
    1794             :     handles[1] = (int) tmp_sock;
    1795             : 
    1796             :     if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
    1797             :     {
    1798             :         pg_log_error("pgpipe: could not connect socket: error code %d",
    1799             :                      WSAGetLastError());
    1800             :         closesocket(handles[1]);
    1801             :         handles[1] = -1;
    1802             :         closesocket(s);
    1803             :         return -1;
    1804             :     }
    1805             :     if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
    1806             :     {
    1807             :         pg_log_error("pgpipe: could not accept connection: error code %d",
    1808             :                      WSAGetLastError());
    1809             :         closesocket(handles[1]);
    1810             :         handles[1] = -1;
    1811             :         closesocket(s);
    1812             :         return -1;
    1813             :     }
    1814             :     handles[0] = (int) tmp_sock;
    1815             : 
    1816             :     closesocket(s);
    1817             :     return 0;
    1818             : }
    1819             : 
    1820             : /*
    1821             :  * Windows implementation of reading from a pipe.
    1822             :  */
    1823             : static int
    1824             : piperead(int s, char *buf, int len)
    1825             : {
    1826             :     int         ret = recv(s, buf, len, 0);
    1827             : 
    1828             :     if (ret < 0 && WSAGetLastError() == WSAECONNRESET)
    1829             :     {
    1830             :         /* EOF on the pipe! */
    1831             :         ret = 0;
    1832             :     }
    1833             :     return ret;
    1834             : }
    1835             : 
    1836             : #endif                          /* WIN32 */

Generated by: LCOV version 1.13