Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * parallel.c
4 : *
5 : * Parallel support for pg_dump and pg_restore
6 : *
7 : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/bin/pg_dump/parallel.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : /*
17 : * Parallel operation works like this:
18 : *
19 : * The original, leader process calls ParallelBackupStart(), which forks off
20 : * the desired number of worker processes, which each enter WaitForCommands().
21 : *
22 : * The leader process dispatches an individual work item to one of the worker
23 : * processes in DispatchJobForTocEntry(). We send a command string such as
24 : * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25 : * The worker process receives and decodes the command and passes it to the
26 : * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27 : * which are routines of the current archive format. That routine performs
28 : * the required action (dump or restore) and returns an integer status code.
29 : * This is passed back to the leader where we pass it to the
30 : * ParallelCompletionPtr callback function that was passed to
31 : * DispatchJobForTocEntry(). The callback function does state updating
32 : * for the leader control logic in pg_backup_archiver.c.
33 : *
34 : * In principle additional archive-format-specific information might be needed
35 : * in commands or worker status responses, but so far that hasn't proved
36 : * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37 : * data structures. Remember that we have forked off the workers only after
38 : * we have read in the catalog. That's why our worker processes can also
39 : * access the catalog information. (In the Windows case, the workers are
40 : * threads in the same process. To avoid problems, they work with cloned
41 : * copies of the Archive data structure; see RunWorker().)
42 : *
43 : * In the leader process, the workerStatus field for each worker has one of
44 : * the following values:
45 : * WRKR_NOT_STARTED: we've not yet forked this worker
46 : * WRKR_IDLE: it's waiting for a command
47 : * WRKR_WORKING: it's working on a command
48 : * WRKR_TERMINATED: process ended
49 : * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
50 : * state, and must be NULL in other states.
51 : */
52 :
53 : #include "postgres_fe.h"
54 :
55 : #ifndef WIN32
56 : #include <sys/select.h>
57 : #include <sys/wait.h>
58 : #include <signal.h>
59 : #include <unistd.h>
60 : #include <fcntl.h>
61 : #endif
62 :
63 : #include "fe_utils/string_utils.h"
64 : #include "parallel.h"
65 : #include "pg_backup_utils.h"
66 : #include "port/pg_bswap.h"
67 :
68 : /* Mnemonic macros for indexing the fd array returned by pipe(2) */
69 : #define PIPE_READ 0
70 : #define PIPE_WRITE 1
71 :
72 : #define NO_SLOT (-1) /* Failure result for GetIdleWorker() */
73 :
/*
 * Lifecycle states the leader tracks for each worker slot.
 */
typedef enum
{
    WRKR_NOT_STARTED = 0,       /* worker has not been forked yet */
    WRKR_IDLE,                  /* worker is waiting for a command */
    WRKR_WORKING,               /* worker is busy with a command */
    WRKR_TERMINATED,            /* worker process has ended */
} T_WorkerStatus;

/* True for the two states in which a worker exists: idle or working. */
#define WORKER_IS_RUNNING(workerStatus) \
    (!((workerStatus) != WRKR_IDLE && (workerStatus) != WRKR_WORKING))
85 :
86 : /*
87 : * Private per-parallel-worker state (typedef for this is in parallel.h).
88 : *
89 : * Much of this is valid only in the leader process (or, on Windows, should
90 : * be touched only by the leader thread). But the AH field should be touched
91 : * only by workers. The pipe descriptors are valid everywhere.
92 : */
93 : struct ParallelSlot
94 : {
95 : T_WorkerStatus workerStatus; /* see enum above */
96 :
97 : /* These fields are valid if workerStatus == WRKR_WORKING: */
98 : ParallelCompletionPtr callback; /* function to call on completion */
99 : void *callback_data; /* passthrough data for it */
100 :
101 : ArchiveHandle *AH; /* Archive data worker is using */
102 :
103 : int pipeRead; /* leader's end of the pipes */
104 : int pipeWrite;
105 : int pipeRevRead; /* child's end of the pipes */
106 : int pipeRevWrite;
107 :
108 : /* Child process/thread identity info: */
109 : #ifdef WIN32
110 : uintptr_t hThread;
111 : unsigned int threadId;
112 : #else
113 : pid_t pid;
114 : #endif
115 : };
116 :
117 : #ifdef WIN32
118 :
119 : /*
120 : * Structure to hold info passed by _beginthreadex() to the function it calls
121 : * via its single allowed argument.
122 : */
123 : typedef struct
124 : {
125 : ArchiveHandle *AH; /* leader database connection */
126 : ParallelSlot *slot; /* this worker's parallel slot */
127 : } WorkerInfo;
128 :
129 : /* Windows implementation of pipe access */
130 : static int pgpipe(int handles[2]);
131 : #define piperead(a,b,c) recv(a,b,c,0)
132 : #define pipewrite(a,b,c) send(a,b,c,0)
133 :
134 : #else /* !WIN32 */
135 :
136 : /* Non-Windows implementation of pipe access */
137 : #define pgpipe(a) pipe(a)
138 : #define piperead(a,b,c) read(a,b,c)
139 : #define pipewrite(a,b,c) write(a,b,c)
140 :
141 : #endif /* WIN32 */
142 :
143 : /*
144 : * State info for archive_close_connection() shutdown callback.
145 : */
146 : typedef struct ShutdownInformation
147 : {
148 : ParallelState *pstate;
149 : Archive *AHX;
150 : } ShutdownInformation;
151 :
152 : static ShutdownInformation shutdown_info;
153 :
154 : /*
155 : * State info for signal handling.
156 : * We assume signal_info initializes to zeroes.
157 : *
158 : * On Unix, myAH is the leader DB connection in the leader process, and the
159 : * worker's own connection in worker processes. On Windows, we have only one
160 : * instance of signal_info, so myAH is the leader connection and the worker
161 : * connections must be dug out of pstate->parallelSlot[].
162 : */
163 : typedef struct DumpSignalInformation
164 : {
165 : ArchiveHandle *myAH; /* database connection to issue cancel for */
166 : ParallelState *pstate; /* parallel state, if any */
167 : bool handler_set; /* signal handler set up in this process? */
168 : #ifndef WIN32
169 : bool am_worker; /* am I a worker process? */
170 : #endif
171 : } DumpSignalInformation;
172 :
173 : static volatile DumpSignalInformation signal_info;
174 :
175 : #ifdef WIN32
176 : static CRITICAL_SECTION signal_info_lock;
177 : #endif
178 :
/*
 * Emit a plain string on stderr in a way that is safe inside a signal
 * handler.  The result of write() is deliberately discarded, since nothing
 * useful could be done on failure; it is stored in a local first because
 * some compilers complain about an ignored result otherwise.
 */
#define write_stderr(str) \
    do { \
        const char *str_ = (str); \
        int         rc_ = write(fileno(stderr), str_, strlen(str_)); \
        (void) rc_; \
    } while (0)
191 :
192 :
193 : #ifdef WIN32
194 : /* file-scope variables */
195 : static DWORD tls_index;
196 :
197 : /* globally visible variables (needed by exit_nicely) */
198 : bool parallel_init_done = false;
199 : DWORD mainThreadId;
200 : #endif /* WIN32 */
201 :
202 : /* Local function prototypes */
203 : static ParallelSlot *GetMyPSlot(ParallelState *pstate);
204 : static void archive_close_connection(int code, void *arg);
205 : static void ShutdownWorkersHard(ParallelState *pstate);
206 : static void WaitForTerminatingWorkers(ParallelState *pstate);
207 : static void set_cancel_handler(void);
208 : static void set_cancel_pstate(ParallelState *pstate);
209 : static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
210 : static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
211 : static int GetIdleWorker(ParallelState *pstate);
212 : static bool HasEveryWorkerTerminated(ParallelState *pstate);
213 : static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
214 : static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
215 : static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
216 : bool do_wait);
217 : static char *getMessageFromLeader(int pipefd[2]);
218 : static void sendMessageToLeader(int pipefd[2], const char *str);
219 : static int select_loop(int maxFd, fd_set *workerset);
220 : static char *getMessageFromWorker(ParallelState *pstate,
221 : bool do_wait, int *worker);
222 : static void sendMessageToWorker(ParallelState *pstate,
223 : int worker, const char *str);
224 : static char *readMessageFromPipe(int fd);
225 :
226 : #define messageStartsWith(msg, prefix) \
227 : (strncmp(msg, prefix, strlen(prefix)) == 0)
228 :
229 :
/*
 * One-time setup for parallel dump/restore support.  Meant to be called
 * early during startup; at present it is called regardless of whether
 * parallel operation will actually be used.
 *
 * Only Windows needs any work here: a thread-local-storage index is
 * allocated, the main thread's ID is remembered, and Winsock is started.
 * Repeated calls are harmless.
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
    WSADATA     wsaData;
    int         err;

    /* Nothing to do if an earlier call already completed the setup */
    if (parallel_init_done)
        return;

    /* Prepare for threaded operation */
    tls_index = TlsAlloc();
    mainThreadId = GetCurrentThreadId();

    /* Initialize socket access */
    err = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (err != 0)
        pg_fatal("%s() failed: error code %d", "WSAStartup", err);

    parallel_init_done = true;
#endif
}
257 :
258 : /*
259 : * Find the ParallelSlot for the current worker process or thread.
260 : *
261 : * Returns NULL if no matching slot is found (this implies we're the leader).
262 : */
263 : static ParallelSlot *
264 0 : GetMyPSlot(ParallelState *pstate)
265 : {
266 : int i;
267 :
268 0 : for (i = 0; i < pstate->numWorkers; i++)
269 : {
270 : #ifdef WIN32
271 : if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
272 : #else
273 0 : if (pstate->parallelSlot[i].pid == getpid())
274 : #endif
275 0 : return &(pstate->parallelSlot[i]);
276 : }
277 :
278 0 : return NULL;
279 : }
280 :
281 : /*
282 : * A thread-local version of getLocalPQExpBuffer().
283 : *
284 : * Non-reentrant but reduces memory leakage: we'll consume one buffer per
285 : * thread, which is much better than one per fmtId/fmtQualifiedId call.
286 : */
287 : #ifdef WIN32
288 : static PQExpBuffer
289 : getThreadLocalPQExpBuffer(void)
290 : {
291 : /*
292 : * The Tls code goes awry if we use a static var, so we provide for both
293 : * static and auto, and omit any use of the static var when using Tls. We
294 : * rely on TlsGetValue() to return 0 if the value is not yet set.
295 : */
296 : static PQExpBuffer s_id_return = NULL;
297 : PQExpBuffer id_return;
298 :
299 : if (parallel_init_done)
300 : id_return = (PQExpBuffer) TlsGetValue(tls_index);
301 : else
302 : id_return = s_id_return;
303 :
304 : if (id_return) /* first time through? */
305 : {
306 : /* same buffer, just wipe contents */
307 : resetPQExpBuffer(id_return);
308 : }
309 : else
310 : {
311 : /* new buffer */
312 : id_return = createPQExpBuffer();
313 : if (parallel_init_done)
314 : TlsSetValue(tls_index, id_return);
315 : else
316 : s_id_return = id_return;
317 : }
318 :
319 : return id_return;
320 : }
321 : #endif /* WIN32 */
322 :
323 : /*
324 : * pg_dump and pg_restore call this to register the cleanup handler
325 : * as soon as they've created the ArchiveHandle.
326 : */
327 : void
328 422 : on_exit_close_archive(Archive *AHX)
329 : {
330 422 : shutdown_info.AHX = AHX;
331 422 : on_exit_nicely(archive_close_connection, &shutdown_info);
332 422 : }
333 :
334 : /*
335 : * on_exit_nicely handler for shutting down database connections and
336 : * worker processes cleanly.
337 : */
338 : static void
339 342 : archive_close_connection(int code, void *arg)
340 : {
341 342 : ShutdownInformation *si = (ShutdownInformation *) arg;
342 :
343 342 : if (si->pstate)
344 : {
345 : /* In parallel mode, must figure out who we are */
346 0 : ParallelSlot *slot = GetMyPSlot(si->pstate);
347 :
348 0 : if (!slot)
349 : {
350 : /*
351 : * We're the leader. Forcibly shut down workers, then close our
352 : * own database connection, if any.
353 : */
354 0 : ShutdownWorkersHard(si->pstate);
355 :
356 0 : if (si->AHX)
357 0 : DisconnectDatabase(si->AHX);
358 : }
359 : else
360 : {
361 : /*
362 : * We're a worker. Shut down our own DB connection if any. On
363 : * Windows, we also have to close our communication sockets, to
364 : * emulate what will happen on Unix when the worker process exits.
365 : * (Without this, if this is a premature exit, the leader would
366 : * fail to detect it because there would be no EOF condition on
367 : * the other end of the pipe.)
368 : */
369 0 : if (slot->AH)
370 0 : DisconnectDatabase(&(slot->AH->public));
371 :
372 : #ifdef WIN32
373 : closesocket(slot->pipeRevRead);
374 : closesocket(slot->pipeRevWrite);
375 : #endif
376 : }
377 : }
378 : else
379 : {
380 : /* Non-parallel operation: just kill the leader DB connection */
381 342 : if (si->AHX)
382 342 : DisconnectDatabase(si->AHX);
383 : }
384 342 : }
385 :
386 : /*
387 : * Forcibly shut down any remaining workers, waiting for them to finish.
388 : *
389 : * Note that we don't expect to come here during normal exit (the workers
390 : * should be long gone, and the ParallelState too). We're only here in a
391 : * pg_fatal() situation, so intervening to cancel active commands is
392 : * appropriate.
393 : */
394 : static void
395 0 : ShutdownWorkersHard(ParallelState *pstate)
396 : {
397 : int i;
398 :
399 : /*
400 : * Close our write end of the sockets so that any workers waiting for
401 : * commands know they can exit. (Note: some of the pipeWrite fields might
402 : * still be zero, if we failed to initialize all the workers. Hence, just
403 : * ignore errors here.)
404 : */
405 0 : for (i = 0; i < pstate->numWorkers; i++)
406 0 : closesocket(pstate->parallelSlot[i].pipeWrite);
407 :
408 : /*
409 : * Force early termination of any commands currently in progress.
410 : */
411 : #ifndef WIN32
412 : /* On non-Windows, send SIGTERM to each worker process. */
413 0 : for (i = 0; i < pstate->numWorkers; i++)
414 : {
415 0 : pid_t pid = pstate->parallelSlot[i].pid;
416 :
417 0 : if (pid != 0)
418 0 : kill(pid, SIGTERM);
419 : }
420 : #else
421 :
422 : /*
423 : * On Windows, send query cancels directly to the workers' backends. Use
424 : * a critical section to ensure worker threads don't change state.
425 : */
426 : EnterCriticalSection(&signal_info_lock);
427 : for (i = 0; i < pstate->numWorkers; i++)
428 : {
429 : ArchiveHandle *AH = pstate->parallelSlot[i].AH;
430 : char errbuf[1];
431 :
432 : if (AH != NULL && AH->connCancel != NULL)
433 : (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
434 : }
435 : LeaveCriticalSection(&signal_info_lock);
436 : #endif
437 :
438 : /* Now wait for them to terminate. */
439 0 : WaitForTerminatingWorkers(pstate);
440 0 : }
441 :
442 : /*
443 : * Wait for all workers to terminate.
444 : */
445 : static void
446 24 : WaitForTerminatingWorkers(ParallelState *pstate)
447 : {
448 76 : while (!HasEveryWorkerTerminated(pstate))
449 : {
450 52 : ParallelSlot *slot = NULL;
451 : int j;
452 :
453 : #ifndef WIN32
454 : /* On non-Windows, use wait() to wait for next worker to end */
455 : int status;
456 52 : pid_t pid = wait(&status);
457 :
458 : /* Find dead worker's slot, and clear the PID field */
459 84 : for (j = 0; j < pstate->numWorkers; j++)
460 : {
461 84 : slot = &(pstate->parallelSlot[j]);
462 84 : if (slot->pid == pid)
463 : {
464 52 : slot->pid = 0;
465 52 : break;
466 : }
467 : }
468 : #else /* WIN32 */
469 : /* On Windows, we must use WaitForMultipleObjects() */
470 : HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
471 : int nrun = 0;
472 : DWORD ret;
473 : uintptr_t hThread;
474 :
475 : for (j = 0; j < pstate->numWorkers; j++)
476 : {
477 : if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
478 : {
479 : lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
480 : nrun++;
481 : }
482 : }
483 : ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
484 : Assert(ret != WAIT_FAILED);
485 : hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
486 : free(lpHandles);
487 :
488 : /* Find dead worker's slot, and clear the hThread field */
489 : for (j = 0; j < pstate->numWorkers; j++)
490 : {
491 : slot = &(pstate->parallelSlot[j]);
492 : if (slot->hThread == hThread)
493 : {
494 : /* For cleanliness, close handles for dead threads */
495 : CloseHandle((HANDLE) slot->hThread);
496 : slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
497 : break;
498 : }
499 : }
500 : #endif /* WIN32 */
501 :
502 : /* On all platforms, update workerStatus and te[] as well */
503 : Assert(j < pstate->numWorkers);
504 52 : slot->workerStatus = WRKR_TERMINATED;
505 52 : pstate->te[j] = NULL;
506 : }
507 24 : }
508 :
509 :
510 : /*
511 : * Code for responding to cancel interrupts (SIGINT, control-C, etc)
512 : *
513 : * This doesn't quite belong in this module, but it needs access to the
514 : * ParallelState data, so there's not really a better place either.
515 : *
516 : * When we get a cancel interrupt, we could just die, but in pg_restore that
517 : * could leave a SQL command (e.g., CREATE INDEX on a large table) running
518 : * for a long time. Instead, we try to send a cancel request and then die.
519 : * pg_dump probably doesn't really need this, but we might as well use it
520 : * there too. Note that sending the cancel directly from the signal handler
521 : * is safe because PQcancel() is written to make it so.
522 : *
523 : * In parallel operation on Unix, each process is responsible for canceling
524 : * its own connection (this must be so because nobody else has access to it).
525 : * Furthermore, the leader process should attempt to forward its signal to
526 : * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
527 : * needed because typing control-C at the console would deliver SIGINT to
528 : * every member of the terminal process group --- but in other scenarios it
529 : * might be that only the leader gets signaled.
530 : *
531 : * On Windows, the cancel handler runs in a separate thread, because that's
532 : * how SetConsoleCtrlHandler works. We make it stop worker threads, send
533 : * cancels on all active connections, and then return FALSE, which will allow
534 : * the process to die. For safety's sake, we use a critical section to
535 : * protect the PGcancel structures against being changed while the signal
536 : * thread runs.
537 : */
538 :
539 : #ifndef WIN32
540 :
541 : /*
542 : * Signal handler (Unix only)
543 : */
544 : static void
545 0 : sigTermHandler(SIGNAL_ARGS)
546 : {
547 : int i;
548 : char errbuf[1];
549 :
550 : /*
551 : * Some platforms allow delivery of new signals to interrupt an active
552 : * signal handler. That could muck up our attempt to send PQcancel, so
553 : * disable the signals that set_cancel_handler enabled.
554 : */
555 0 : pqsignal(SIGINT, SIG_IGN);
556 0 : pqsignal(SIGTERM, SIG_IGN);
557 0 : pqsignal(SIGQUIT, SIG_IGN);
558 :
559 : /*
560 : * If we're in the leader, forward signal to all workers. (It seems best
561 : * to do this before PQcancel; killing the leader transaction will result
562 : * in invalid-snapshot errors from active workers, which maybe we can
563 : * quiet by killing workers first.) Ignore any errors.
564 : */
565 0 : if (signal_info.pstate != NULL)
566 : {
567 0 : for (i = 0; i < signal_info.pstate->numWorkers; i++)
568 : {
569 0 : pid_t pid = signal_info.pstate->parallelSlot[i].pid;
570 :
571 0 : if (pid != 0)
572 0 : kill(pid, SIGTERM);
573 : }
574 : }
575 :
576 : /*
577 : * Send QueryCancel if we have a connection to send to. Ignore errors,
578 : * there's not much we can do about them anyway.
579 : */
580 0 : if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
581 0 : (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
582 :
583 : /*
584 : * Report we're quitting, using nothing more complicated than write(2).
585 : * When in parallel operation, only the leader process should do this.
586 : */
587 0 : if (!signal_info.am_worker)
588 : {
589 0 : if (progname)
590 : {
591 0 : write_stderr(progname);
592 0 : write_stderr(": ");
593 : }
594 0 : write_stderr("terminated by user\n");
595 : }
596 :
597 : /*
598 : * And die, using _exit() not exit() because the latter will invoke atexit
599 : * handlers that can fail if we interrupted related code.
600 : */
601 0 : _exit(1);
602 : }
603 :
604 : /*
605 : * Enable cancel interrupt handler, if not already done.
606 : */
607 : static void
608 946 : set_cancel_handler(void)
609 : {
610 : /*
611 : * When forking, signal_info.handler_set will propagate into the new
612 : * process, but that's fine because the signal handler state does too.
613 : */
614 946 : if (!signal_info.handler_set)
615 : {
616 370 : signal_info.handler_set = true;
617 :
618 370 : pqsignal(SIGINT, sigTermHandler);
619 370 : pqsignal(SIGTERM, sigTermHandler);
620 370 : pqsignal(SIGQUIT, sigTermHandler);
621 : }
622 946 : }
623 :
624 : #else /* WIN32 */
625 :
626 : /*
627 : * Console interrupt handler --- runs in a newly-started thread.
628 : *
629 : * After stopping other threads and sending cancel requests on all open
630 : * connections, we return FALSE which will allow the default ExitProcess()
631 : * action to be taken.
632 : */
633 : static BOOL WINAPI
634 : consoleHandler(DWORD dwCtrlType)
635 : {
636 : int i;
637 : char errbuf[1];
638 :
639 : if (dwCtrlType == CTRL_C_EVENT ||
640 : dwCtrlType == CTRL_BREAK_EVENT)
641 : {
642 : /* Critical section prevents changing data we look at here */
643 : EnterCriticalSection(&signal_info_lock);
644 :
645 : /*
646 : * If in parallel mode, stop worker threads and send QueryCancel to
647 : * their connected backends. The main point of stopping the worker
648 : * threads is to keep them from reporting the query cancels as errors,
649 : * which would clutter the user's screen. We needn't stop the leader
650 : * thread since it won't be doing much anyway. Do this before
651 : * canceling the main transaction, else we might get invalid-snapshot
652 : * errors reported before we can stop the workers. Ignore errors,
653 : * there's not much we can do about them anyway.
654 : */
655 : if (signal_info.pstate != NULL)
656 : {
657 : for (i = 0; i < signal_info.pstate->numWorkers; i++)
658 : {
659 : ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
660 : ArchiveHandle *AH = slot->AH;
661 : HANDLE hThread = (HANDLE) slot->hThread;
662 :
663 : /*
664 : * Using TerminateThread here may leave some resources leaked,
665 : * but it doesn't matter since we're about to end the whole
666 : * process.
667 : */
668 : if (hThread != INVALID_HANDLE_VALUE)
669 : TerminateThread(hThread, 0);
670 :
671 : if (AH != NULL && AH->connCancel != NULL)
672 : (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
673 : }
674 : }
675 :
676 : /*
677 : * Send QueryCancel to leader connection, if enabled. Ignore errors,
678 : * there's not much we can do about them anyway.
679 : */
680 : if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
681 : (void) PQcancel(signal_info.myAH->connCancel,
682 : errbuf, sizeof(errbuf));
683 :
684 : LeaveCriticalSection(&signal_info_lock);
685 :
686 : /*
687 : * Report we're quitting, using nothing more complicated than
688 : * write(2). (We might be able to get away with using pg_log_*()
689 : * here, but since we terminated other threads uncleanly above, it
690 : * seems better to assume as little as possible.)
691 : */
692 : if (progname)
693 : {
694 : write_stderr(progname);
695 : write_stderr(": ");
696 : }
697 : write_stderr("terminated by user\n");
698 : }
699 :
700 : /* Always return FALSE to allow signal handling to continue */
701 : return FALSE;
702 : }
703 :
704 : /*
705 : * Enable cancel interrupt handler, if not already done.
706 : */
707 : static void
708 : set_cancel_handler(void)
709 : {
710 : if (!signal_info.handler_set)
711 : {
712 : signal_info.handler_set = true;
713 :
714 : InitializeCriticalSection(&signal_info_lock);
715 :
716 : SetConsoleCtrlHandler(consoleHandler, TRUE);
717 : }
718 : }
719 :
720 : #endif /* WIN32 */
721 :
722 :
723 : /*
724 : * set_archive_cancel_info
725 : *
726 : * Fill AH->connCancel with cancellation info for the specified database
727 : * connection; or clear it if conn is NULL.
728 : */
729 : void
730 946 : set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
731 : {
732 : PGcancel *oldConnCancel;
733 :
734 : /*
735 : * Activate the interrupt handler if we didn't yet in this process. On
736 : * Windows, this also initializes signal_info_lock; therefore it's
737 : * important that this happen at least once before we fork off any
738 : * threads.
739 : */
740 946 : set_cancel_handler();
741 :
742 : /*
743 : * On Unix, we assume that storing a pointer value is atomic with respect
744 : * to any possible signal interrupt. On Windows, use a critical section.
745 : */
746 :
747 : #ifdef WIN32
748 : EnterCriticalSection(&signal_info_lock);
749 : #endif
750 :
751 : /* Free the old one if we have one */
752 946 : oldConnCancel = AH->connCancel;
753 : /* be sure interrupt handler doesn't use pointer while freeing */
754 946 : AH->connCancel = NULL;
755 :
756 946 : if (oldConnCancel != NULL)
757 484 : PQfreeCancel(oldConnCancel);
758 :
759 : /* Set the new one if specified */
760 946 : if (conn)
761 488 : AH->connCancel = PQgetCancel(conn);
762 :
763 : /*
764 : * On Unix, there's only ever one active ArchiveHandle per process, so we
765 : * can just set signal_info.myAH unconditionally. On Windows, do that
766 : * only in the main thread; worker threads have to make sure their
767 : * ArchiveHandle appears in the pstate data, which is dealt with in
768 : * RunWorker().
769 : */
770 : #ifndef WIN32
771 946 : signal_info.myAH = AH;
772 : #else
773 : if (mainThreadId == GetCurrentThreadId())
774 : signal_info.myAH = AH;
775 : #endif
776 :
777 : #ifdef WIN32
778 : LeaveCriticalSection(&signal_info_lock);
779 : #endif
780 946 : }
781 :
782 : /*
783 : * set_cancel_pstate
784 : *
785 : * Set signal_info.pstate to point to the specified ParallelState, if any.
786 : * We need this mainly to have an interlock against Windows signal thread.
787 : */
788 : static void
789 48 : set_cancel_pstate(ParallelState *pstate)
790 : {
791 : #ifdef WIN32
792 : EnterCriticalSection(&signal_info_lock);
793 : #endif
794 :
795 48 : signal_info.pstate = pstate;
796 :
797 : #ifdef WIN32
798 : LeaveCriticalSection(&signal_info_lock);
799 : #endif
800 48 : }
801 :
802 : /*
803 : * set_cancel_slot_archive
804 : *
805 : * Set ParallelSlot's AH field to point to the specified archive, if any.
806 : * We need this mainly to have an interlock against Windows signal thread.
807 : */
808 : static void
809 104 : set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
810 : {
811 : #ifdef WIN32
812 : EnterCriticalSection(&signal_info_lock);
813 : #endif
814 :
815 104 : slot->AH = AH;
816 :
817 : #ifdef WIN32
818 : LeaveCriticalSection(&signal_info_lock);
819 : #endif
820 104 : }
821 :
822 :
823 : /*
824 : * This function is called by both Unix and Windows variants to set up
825 : * and run a worker process. Caller should exit the process (or thread)
826 : * upon return.
827 : */
828 : static void
829 52 : RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
830 : {
831 : int pipefd[2];
832 :
833 : /* fetch child ends of pipes */
834 52 : pipefd[PIPE_READ] = slot->pipeRevRead;
835 52 : pipefd[PIPE_WRITE] = slot->pipeRevWrite;
836 :
837 : /*
838 : * Clone the archive so that we have our own state to work with, and in
839 : * particular our own database connection.
840 : *
841 : * We clone on Unix as well as Windows, even though technically we don't
842 : * need to because fork() gives us a copy in our own address space
843 : * already. But CloneArchive resets the state information and also clones
844 : * the database connection which both seem kinda helpful.
845 : */
846 52 : AH = CloneArchive(AH);
847 :
848 : /* Remember cloned archive where signal handler can find it */
849 52 : set_cancel_slot_archive(slot, AH);
850 :
851 : /*
852 : * Call the setup worker function that's defined in the ArchiveHandle.
853 : */
854 52 : (AH->SetupWorkerPtr) ((Archive *) AH);
855 :
856 : /*
857 : * Execute commands until done.
858 : */
859 52 : WaitForCommands(AH, pipefd);
860 :
861 : /*
862 : * Disconnect from database and clean up.
863 : */
864 52 : set_cancel_slot_archive(slot, NULL);
865 52 : DisconnectDatabase(&(AH->public));
866 52 : DeCloneArchive(AH);
867 52 : }
868 :
869 : /*
870 : * Thread base function for Windows
871 : */
872 : #ifdef WIN32
873 : static unsigned __stdcall
874 : init_spawned_worker_win32(WorkerInfo *wi)
875 : {
876 : ArchiveHandle *AH = wi->AH;
877 : ParallelSlot *slot = wi->slot;
878 :
879 : /* Don't need WorkerInfo anymore */
880 : free(wi);
881 :
882 : /* Run the worker ... */
883 : RunWorker(AH, slot);
884 :
885 : /* Exit the thread */
886 : _endthreadex(0);
887 : return 0;
888 : }
889 : #endif /* WIN32 */
890 :
891 : /*
892 : * This function starts a parallel dump or restore by spawning off the worker
893 : * processes. For Windows, it creates a number of threads; on Unix the
894 : * workers are created with fork().
895 : */
896 : ParallelState *
897 28 : ParallelBackupStart(ArchiveHandle *AH)
898 : {
899 : ParallelState *pstate;
900 : int i;
901 :
902 : Assert(AH->public.numWorkers > 0);
903 :
904 28 : pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
905 :
906 28 : pstate->numWorkers = AH->public.numWorkers;
907 28 : pstate->te = NULL;
908 28 : pstate->parallelSlot = NULL;
909 :
910 28 : if (AH->public.numWorkers == 1)
911 4 : return pstate;
912 :
913 : /* Create status arrays, being sure to initialize all fields to 0 */
914 24 : pstate->te = (TocEntry **)
915 24 : pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
916 24 : pstate->parallelSlot = (ParallelSlot *)
917 24 : pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
918 :
919 : #ifdef WIN32
920 : /* Make fmtId() and fmtQualifiedId() use thread-local storage */
921 : getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
922 : #endif
923 :
924 : /*
925 : * Set the pstate in shutdown_info, to tell the exit handler that it must
926 : * clean up workers as well as the main database connection. But we don't
927 : * set this in signal_info yet, because we don't want child processes to
928 : * inherit non-NULL signal_info.pstate.
929 : */
930 24 : shutdown_info.pstate = pstate;
931 :
932 : /*
933 : * Temporarily disable query cancellation on the leader connection. This
934 : * ensures that child processes won't inherit valid AH->connCancel
935 : * settings and thus won't try to issue cancels against the leader's
936 : * connection. No harm is done if we fail while it's disabled, because
937 : * the leader connection is idle at this point anyway.
938 : */
939 24 : set_archive_cancel_info(AH, NULL);
940 :
941 : /* Ensure stdio state is quiesced before forking */
942 24 : fflush(NULL);
943 :
944 : /* Create desired number of workers */
945 76 : for (i = 0; i < pstate->numWorkers; i++)
946 : {
947 : #ifdef WIN32
948 : WorkerInfo *wi;
949 : uintptr_t handle;
950 : #else
951 : pid_t pid;
952 : #endif
953 52 : ParallelSlot *slot = &(pstate->parallelSlot[i]);
954 : int pipeMW[2],
955 : pipeWM[2];
956 :
957 : /* Create communication pipes for this worker */
958 52 : if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
959 0 : pg_fatal("could not create communication channels: %m");
960 :
961 : /* leader's ends of the pipes */
962 52 : slot->pipeRead = pipeWM[PIPE_READ];
963 52 : slot->pipeWrite = pipeMW[PIPE_WRITE];
964 : /* child's ends of the pipes */
965 52 : slot->pipeRevRead = pipeMW[PIPE_READ];
966 52 : slot->pipeRevWrite = pipeWM[PIPE_WRITE];
967 :
968 : #ifdef WIN32
969 : /* Create transient structure to pass args to worker function */
970 : wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
971 :
972 : wi->AH = AH;
973 : wi->slot = slot;
974 :
975 : handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
976 : wi, 0, &(slot->threadId));
977 : slot->hThread = handle;
978 : slot->workerStatus = WRKR_IDLE;
979 : #else /* !WIN32 */
980 52 : pid = fork();
981 104 : if (pid == 0)
982 : {
983 : /* we are the worker */
984 : int j;
985 :
986 : /* this is needed for GetMyPSlot() */
987 52 : slot->pid = getpid();
988 :
989 : /* instruct signal handler that we're in a worker now */
990 52 : signal_info.am_worker = true;
991 :
992 : /* close read end of Worker -> Leader */
993 52 : closesocket(pipeWM[PIPE_READ]);
994 : /* close write end of Leader -> Worker */
995 52 : closesocket(pipeMW[PIPE_WRITE]);
996 :
997 : /*
998 : * Close all inherited fds for communication of the leader with
999 : * previously-forked workers.
1000 : */
1001 84 : for (j = 0; j < i; j++)
1002 : {
1003 32 : closesocket(pstate->parallelSlot[j].pipeRead);
1004 32 : closesocket(pstate->parallelSlot[j].pipeWrite);
1005 : }
1006 :
1007 : /* Run the worker ... */
1008 52 : RunWorker(AH, slot);
1009 :
1010 : /* We can just exit(0) when done */
1011 52 : exit(0);
1012 : }
1013 52 : else if (pid < 0)
1014 : {
1015 : /* fork failed */
1016 0 : pg_fatal("could not create worker process: %m");
1017 : }
1018 :
1019 : /* In Leader after successful fork */
1020 52 : slot->pid = pid;
1021 52 : slot->workerStatus = WRKR_IDLE;
1022 :
1023 : /* close read end of Leader -> Worker */
1024 52 : closesocket(pipeMW[PIPE_READ]);
1025 : /* close write end of Worker -> Leader */
1026 52 : closesocket(pipeWM[PIPE_WRITE]);
1027 : #endif /* WIN32 */
1028 : }
1029 :
1030 : /*
1031 : * Having forked off the workers, disable SIGPIPE so that leader isn't
1032 : * killed if it tries to send a command to a dead worker. We don't want
1033 : * the workers to inherit this setting, though.
1034 : */
1035 : #ifndef WIN32
1036 24 : pqsignal(SIGPIPE, SIG_IGN);
1037 : #endif
1038 :
1039 : /*
1040 : * Re-establish query cancellation on the leader connection.
1041 : */
1042 24 : set_archive_cancel_info(AH, AH->connection);
1043 :
1044 : /*
1045 : * Tell the cancel signal handler to forward signals to worker processes,
1046 : * too. (As with query cancel, we did not need this earlier because the
1047 : * workers have not yet been given anything to do; if we die before this
1048 : * point, any already-started workers will see EOF and quit promptly.)
1049 : */
1050 24 : set_cancel_pstate(pstate);
1051 :
1052 24 : return pstate;
1053 : }
1054 :
1055 : /*
1056 : * Close down a parallel dump or restore.
1057 : */
1058 : void
1059 28 : ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1060 : {
1061 : int i;
1062 :
1063 : /* No work if non-parallel */
1064 28 : if (pstate->numWorkers == 1)
1065 4 : return;
1066 :
1067 : /* There should not be any unfinished jobs */
1068 : Assert(IsEveryWorkerIdle(pstate));
1069 :
1070 : /* Close the sockets so that the workers know they can exit */
1071 76 : for (i = 0; i < pstate->numWorkers; i++)
1072 : {
1073 52 : closesocket(pstate->parallelSlot[i].pipeRead);
1074 52 : closesocket(pstate->parallelSlot[i].pipeWrite);
1075 : }
1076 :
1077 : /* Wait for them to exit */
1078 24 : WaitForTerminatingWorkers(pstate);
1079 :
1080 : /*
1081 : * Unlink pstate from shutdown_info, so the exit handler will not try to
1082 : * use it; and likewise unlink from signal_info.
1083 : */
1084 24 : shutdown_info.pstate = NULL;
1085 24 : set_cancel_pstate(NULL);
1086 :
1087 : /* Release state (mere neatnik-ism, since we're about to terminate) */
1088 24 : free(pstate->te);
1089 24 : free(pstate->parallelSlot);
1090 24 : free(pstate);
1091 : }
1092 :
1093 : /*
1094 : * These next four functions handle construction and parsing of the command
1095 : * strings and response strings for parallel workers.
1096 : *
1097 : * Currently, these can be the same regardless of which archive format we are
1098 : * processing. In future, we might want to let format modules override these
1099 : * functions to add format-specific data to a command or response.
1100 : */
1101 :
1102 : /*
1103 : * buildWorkerCommand: format a command string to send to a worker.
1104 : *
1105 : * The string is built in the caller-supplied buffer of size buflen.
1106 : */
1107 : static void
1108 328 : buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
1109 : char *buf, int buflen)
1110 : {
1111 328 : if (act == ACT_DUMP)
1112 236 : snprintf(buf, buflen, "DUMP %d", te->dumpId);
1113 92 : else if (act == ACT_RESTORE)
1114 92 : snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1115 : else
1116 : Assert(false);
1117 328 : }
1118 :
1119 : /*
1120 : * parseWorkerCommand: interpret a command string in a worker.
1121 : */
1122 : static void
1123 328 : parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
1124 : const char *msg)
1125 : {
1126 : DumpId dumpId;
1127 : int nBytes;
1128 :
1129 328 : if (messageStartsWith(msg, "DUMP "))
1130 : {
1131 236 : *act = ACT_DUMP;
1132 236 : sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1133 : Assert(nBytes == strlen(msg));
1134 236 : *te = getTocEntryByDumpId(AH, dumpId);
1135 : Assert(*te != NULL);
1136 : }
1137 92 : else if (messageStartsWith(msg, "RESTORE "))
1138 : {
1139 92 : *act = ACT_RESTORE;
1140 92 : sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1141 : Assert(nBytes == strlen(msg));
1142 92 : *te = getTocEntryByDumpId(AH, dumpId);
1143 : Assert(*te != NULL);
1144 : }
1145 : else
1146 0 : pg_fatal("unrecognized command received from leader: \"%s\"",
1147 : msg);
1148 328 : }
1149 :
1150 : /*
1151 : * buildWorkerResponse: format a response string to send to the leader.
1152 : *
1153 : * The string is built in the caller-supplied buffer of size buflen.
1154 : */
1155 : static void
1156 328 : buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
1157 : char *buf, int buflen)
1158 : {
1159 328 : snprintf(buf, buflen, "OK %d %d %d",
1160 : te->dumpId,
1161 : status,
1162 : status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1163 328 : }
1164 :
1165 : /*
1166 : * parseWorkerResponse: parse the status message returned by a worker.
1167 : *
1168 : * Returns the integer status code, and may update fields of AH and/or te.
1169 : */
1170 : static int
1171 328 : parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
1172 : const char *msg)
1173 : {
1174 : DumpId dumpId;
1175 : int nBytes,
1176 : n_errors;
1177 328 : int status = 0;
1178 :
1179 328 : if (messageStartsWith(msg, "OK "))
1180 : {
1181 328 : sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1182 :
1183 : Assert(dumpId == te->dumpId);
1184 : Assert(nBytes == strlen(msg));
1185 :
1186 328 : AH->public.n_errors += n_errors;
1187 : }
1188 : else
1189 0 : pg_fatal("invalid message received from worker: \"%s\"",
1190 : msg);
1191 :
1192 328 : return status;
1193 : }
1194 :
1195 : /*
1196 : * Dispatch a job to some free worker.
1197 : *
1198 : * te is the TocEntry to be processed, act is the action to be taken on it.
1199 : * callback is the function to call on completion of the job.
1200 : *
1201 : * If no worker is currently available, this will block, and previously
1202 : * registered callback functions may be called.
1203 : */
1204 : void
1205 328 : DispatchJobForTocEntry(ArchiveHandle *AH,
1206 : ParallelState *pstate,
1207 : TocEntry *te,
1208 : T_Action act,
1209 : ParallelCompletionPtr callback,
1210 : void *callback_data)
1211 : {
1212 : int worker;
1213 : char buf[256];
1214 :
1215 : /* Get a worker, waiting if none are idle */
1216 534 : while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1217 206 : WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1218 :
1219 : /* Construct and send command string */
1220 328 : buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1221 :
1222 328 : sendMessageToWorker(pstate, worker, buf);
1223 :
1224 : /* Remember worker is busy, and which TocEntry it's working on */
1225 328 : pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1226 328 : pstate->parallelSlot[worker].callback = callback;
1227 328 : pstate->parallelSlot[worker].callback_data = callback_data;
1228 328 : pstate->te[worker] = te;
1229 328 : }
1230 :
1231 : /*
1232 : * Find an idle worker and return its slot number.
1233 : * Return NO_SLOT if none are idle.
1234 : */
1235 : static int
1236 832 : GetIdleWorker(ParallelState *pstate)
1237 : {
1238 : int i;
1239 :
1240 2006 : for (i = 0; i < pstate->numWorkers; i++)
1241 : {
1242 1528 : if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1243 354 : return i;
1244 : }
1245 478 : return NO_SLOT;
1246 : }
1247 :
1248 : /*
1249 : * Return true iff no worker is running.
1250 : */
1251 : static bool
1252 76 : HasEveryWorkerTerminated(ParallelState *pstate)
1253 : {
1254 : int i;
1255 :
1256 152 : for (i = 0; i < pstate->numWorkers; i++)
1257 : {
1258 128 : if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1259 52 : return false;
1260 : }
1261 24 : return true;
1262 : }
1263 :
1264 : /*
1265 : * Return true iff every worker is in the WRKR_IDLE state.
1266 : */
1267 : bool
1268 96 : IsEveryWorkerIdle(ParallelState *pstate)
1269 : {
1270 : int i;
1271 :
1272 210 : for (i = 0; i < pstate->numWorkers; i++)
1273 : {
1274 170 : if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1275 56 : return false;
1276 : }
1277 40 : return true;
1278 : }
1279 :
1280 : /*
1281 : * Acquire lock on a table to be dumped by a worker process.
1282 : *
1283 : * The leader process is already holding an ACCESS SHARE lock. Ordinarily
1284 : * it's no problem for a worker to get one too, but if anything else besides
1285 : * pg_dump is running, there's a possible deadlock:
1286 : *
1287 : * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
1288 : * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1289 : * because the leader holds a conflicting ACCESS SHARE lock).
1290 : * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1291 : * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1292 : * 4) Now we have a deadlock, since the leader is effectively waiting for
1293 : * the worker. The server cannot detect that, however.
1294 : *
1295 : * To prevent an infinite wait, prior to touching a table in a worker, request
1296 : * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
1297 : * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1298 : * so we have a deadlock. We must fail the backup in that case.
1299 : */
1300 : static void
1301 236 : lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1302 : {
1303 : const char *qualId;
1304 : PQExpBuffer query;
1305 : PGresult *res;
1306 :
1307 : /* Nothing to do for BLOBS */
1308 236 : if (strcmp(te->desc, "BLOBS") == 0)
1309 12 : return;
1310 :
1311 224 : query = createPQExpBuffer();
1312 :
1313 224 : qualId = fmtQualifiedId(te->namespace, te->tag);
1314 :
1315 224 : appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1316 : qualId);
1317 :
1318 224 : res = PQexec(AH->connection, query->data);
1319 :
1320 224 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1321 0 : pg_fatal("could not obtain lock on relation \"%s\"\n"
1322 : "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1323 : "on the table after the pg_dump parent process had gotten the "
1324 : "initial ACCESS SHARE lock on the table.", qualId);
1325 :
1326 224 : PQclear(res);
1327 224 : destroyPQExpBuffer(query);
1328 : }
1329 :
1330 : /*
1331 : * WaitForCommands: main routine for a worker process.
1332 : *
1333 : * Read and execute commands from the leader until we see EOF on the pipe.
1334 : */
1335 : static void
1336 52 : WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1337 : {
1338 : char *command;
1339 : TocEntry *te;
1340 : T_Action act;
1341 52 : int status = 0;
1342 : char buf[256];
1343 :
1344 : for (;;)
1345 : {
1346 380 : if (!(command = getMessageFromLeader(pipefd)))
1347 : {
1348 : /* EOF, so done */
1349 52 : return;
1350 : }
1351 :
1352 : /* Decode the command */
1353 328 : parseWorkerCommand(AH, &te, &act, command);
1354 :
1355 328 : if (act == ACT_DUMP)
1356 : {
1357 : /* Acquire lock on this table within the worker's session */
1358 236 : lockTableForWorker(AH, te);
1359 :
1360 : /* Perform the dump command */
1361 236 : status = (AH->WorkerJobDumpPtr) (AH, te);
1362 : }
1363 92 : else if (act == ACT_RESTORE)
1364 : {
1365 : /* Perform the restore command */
1366 92 : status = (AH->WorkerJobRestorePtr) (AH, te);
1367 : }
1368 : else
1369 : Assert(false);
1370 :
1371 : /* Return status to leader */
1372 328 : buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1373 :
1374 328 : sendMessageToLeader(pipefd, buf);
1375 :
1376 : /* command was pg_malloc'd and we are responsible for free()ing it. */
1377 328 : free(command);
1378 : }
1379 : }
1380 :
1381 : /*
1382 : * Check for status messages from workers.
1383 : *
1384 : * If do_wait is true, wait to get a status message; otherwise, just return
1385 : * immediately if there is none available.
1386 : *
1387 : * When we get a status message, we pass the status code to the callback
1388 : * function that was specified to DispatchJobForTocEntry, then reset the
1389 : * worker status to IDLE.
1390 : *
1391 : * Returns true if we collected a status message, else false.
1392 : *
1393 : * XXX is it worth checking for more than one status message per call?
1394 : * It seems somewhat unlikely that multiple workers would finish at exactly
1395 : * the same time.
1396 : */
1397 : static bool
1398 642 : ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1399 : {
1400 : int worker;
1401 : char *msg;
1402 :
1403 : /* Try to collect a status message */
1404 642 : msg = getMessageFromWorker(pstate, do_wait, &worker);
1405 :
1406 642 : if (!msg)
1407 : {
1408 : /* If do_wait is true, we must have detected EOF on some socket */
1409 314 : if (do_wait)
1410 0 : pg_fatal("a worker process died unexpectedly");
1411 314 : return false;
1412 : }
1413 :
1414 : /* Process it and update our idea of the worker's status */
1415 328 : if (messageStartsWith(msg, "OK "))
1416 : {
1417 328 : ParallelSlot *slot = &pstate->parallelSlot[worker];
1418 328 : TocEntry *te = pstate->te[worker];
1419 : int status;
1420 :
1421 328 : status = parseWorkerResponse(AH, te, msg);
1422 328 : slot->callback(AH, te, status, slot->callback_data);
1423 328 : slot->workerStatus = WRKR_IDLE;
1424 328 : pstate->te[worker] = NULL;
1425 : }
1426 : else
1427 0 : pg_fatal("invalid message received from worker: \"%s\"",
1428 : msg);
1429 :
1430 : /* Free the string returned from getMessageFromWorker */
1431 328 : free(msg);
1432 :
1433 328 : return true;
1434 : }
1435 :
1436 : /*
1437 : * Check for status results from workers, waiting if necessary.
1438 : *
1439 : * Available wait modes are:
1440 : * WFW_NO_WAIT: reap any available status, but don't block
1441 : * WFW_GOT_STATUS: wait for at least one more worker to finish
1442 : * WFW_ONE_IDLE: wait for at least one worker to be idle
1443 : * WFW_ALL_IDLE: wait for all workers to be idle
1444 : *
1445 : * Any received results are passed to the callback specified to
1446 : * DispatchJobForTocEntry.
1447 : *
1448 : * This function is executed in the leader process.
1449 : */
1450 : void
1451 340 : WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
1452 : {
1453 340 : bool do_wait = false;
1454 :
1455 : /*
1456 : * In GOT_STATUS mode, always block waiting for a message, since we can't
1457 : * return till we get something. In other modes, we don't block the first
1458 : * time through the loop.
1459 : */
1460 340 : if (mode == WFW_GOT_STATUS)
1461 : {
1462 : /* Assert that caller knows what it's doing */
1463 : Assert(!IsEveryWorkerIdle(pstate));
1464 26 : do_wait = true;
1465 : }
1466 :
1467 : for (;;)
1468 : {
1469 : /*
1470 : * Check for status messages, even if we don't need to block. We do
1471 : * not try very hard to reap all available messages, though, since
1472 : * there's unlikely to be more than one.
1473 : */
1474 642 : if (ListenToWorkers(AH, pstate, do_wait))
1475 : {
1476 : /*
1477 : * If we got a message, we are done by definition for GOT_STATUS
1478 : * mode, and we can also be certain that there's at least one idle
1479 : * worker. So we're done in all but ALL_IDLE mode.
1480 : */
1481 328 : if (mode != WFW_ALL_IDLE)
1482 298 : return;
1483 : }
1484 :
1485 : /* Check whether we must wait for new status messages */
1486 344 : switch (mode)
1487 : {
1488 0 : case WFW_NO_WAIT:
1489 0 : return; /* never wait */
1490 0 : case WFW_GOT_STATUS:
1491 : Assert(false); /* can't get here, because we waited */
1492 0 : break;
1493 298 : case WFW_ONE_IDLE:
1494 298 : if (GetIdleWorker(pstate) != NO_SLOT)
1495 26 : return;
1496 272 : break;
1497 46 : case WFW_ALL_IDLE:
1498 46 : if (IsEveryWorkerIdle(pstate))
1499 16 : return;
1500 30 : break;
1501 : }
1502 :
1503 : /* Loop back, and this time wait for something to happen */
1504 302 : do_wait = true;
1505 : }
1506 : }
1507 :
1508 : /*
1509 : * Read one command message from the leader, blocking if necessary
1510 : * until one is available, and return it as a malloc'd string.
1511 : * On EOF, return NULL.
1512 : *
1513 : * This function is executed in worker processes.
1514 : */
1515 : static char *
1516 380 : getMessageFromLeader(int pipefd[2])
1517 : {
1518 380 : return readMessageFromPipe(pipefd[PIPE_READ]);
1519 : }
1520 :
1521 : /*
1522 : * Send a status message to the leader.
1523 : *
1524 : * This function is executed in worker processes.
1525 : */
1526 : static void
1527 328 : sendMessageToLeader(int pipefd[2], const char *str)
1528 : {
1529 328 : int len = strlen(str) + 1;
1530 :
1531 328 : if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1532 0 : pg_fatal("could not write to the communication channel: %m");
1533 328 : }
1534 :
/*
 * select_loop: block until some descriptor in "workerset" is readable.
 *
 * maxFd is the highest descriptor in the set.  The call is retried, with the
 * caller's original set restored each time, whenever select() fails with
 * EINTR (WSAEINTR on Windows).
 *
 * Returns -1 on error, else the number of readable descriptors; on return
 * *workerset is as the final select() call left it.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
	const fd_set wanted = *workerset;
	int			nready;

	for (;;)
	{
		/* select() overwrites its argument, so start from a clean copy */
		*workerset = wanted;
		nready = select(maxFd + 1, workerset, NULL, NULL, NULL);

#ifndef WIN32
		if (nready >= 0 || errno != EINTR)
			return nready;
#else
		if (nready != SOCKET_ERROR || WSAGetLastError() != WSAEINTR)
			return nready;
#endif

		/* interrupted: go around and try again */
	}
}
1562 :
1563 :
1564 : /*
1565 : * Check for messages from worker processes.
1566 : *
1567 : * If a message is available, return it as a malloc'd string, and put the
1568 : * index of the sending worker in *worker.
1569 : *
1570 : * If nothing is available, wait if "do_wait" is true, else return NULL.
1571 : *
1572 : * If we detect EOF on any socket, we'll return NULL. It's not great that
1573 : * that's hard to distinguish from the no-data-available case, but for now
1574 : * our one caller is okay with that.
1575 : *
1576 : * This function is executed in the leader process.
1577 : */
1578 : static char *
1579 642 : getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1580 : {
1581 : int i;
1582 : fd_set workerset;
1583 642 : int maxFd = -1;
1584 642 : struct timeval nowait = {0, 0};
1585 :
1586 : /* construct bitmap of socket descriptors for select() */
1587 642 : FD_ZERO(&workerset);
1588 2102 : for (i = 0; i < pstate->numWorkers; i++)
1589 : {
1590 1460 : if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1591 0 : continue;
1592 1460 : FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1593 1460 : if (pstate->parallelSlot[i].pipeRead > maxFd)
1594 1460 : maxFd = pstate->parallelSlot[i].pipeRead;
1595 : }
1596 :
1597 642 : if (do_wait)
1598 : {
1599 328 : i = select_loop(maxFd, &workerset);
1600 : Assert(i != 0);
1601 : }
1602 : else
1603 : {
1604 314 : if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1605 314 : return NULL;
1606 : }
1607 :
1608 328 : if (i < 0)
1609 0 : pg_fatal("%s() failed: %m", "select");
1610 :
1611 450 : for (i = 0; i < pstate->numWorkers; i++)
1612 : {
1613 : char *msg;
1614 :
1615 450 : if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1616 0 : continue;
1617 450 : if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1618 122 : continue;
1619 :
1620 : /*
1621 : * Read the message if any. If the socket is ready because of EOF,
1622 : * we'll return NULL instead (and the socket will stay ready, so the
1623 : * condition will persist).
1624 : *
1625 : * Note: because this is a blocking read, we'll wait if only part of
1626 : * the message is available. Waiting a long time would be bad, but
1627 : * since worker status messages are short and are always sent in one
1628 : * operation, it shouldn't be a problem in practice.
1629 : */
1630 328 : msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1631 328 : *worker = i;
1632 328 : return msg;
1633 : }
1634 : Assert(false);
1635 0 : return NULL;
1636 : }
1637 :
1638 : /*
1639 : * Send a command message to the specified worker process.
1640 : *
1641 : * This function is executed in the leader process.
1642 : */
1643 : static void
1644 328 : sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1645 : {
1646 328 : int len = strlen(str) + 1;
1647 :
1648 328 : if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1649 : {
1650 0 : pg_fatal("could not write to the communication channel: %m");
1651 : }
1652 328 : }
1653 :
1654 : /*
1655 : * Read one message from the specified pipe (fd), blocking if necessary
1656 : * until one is available, and return it as a malloc'd string.
1657 : * On EOF, return NULL.
1658 : *
1659 : * A "message" on the channel is just a null-terminated string.
1660 : */
1661 : static char *
1662 708 : readMessageFromPipe(int fd)
1663 : {
1664 : char *msg;
1665 : int msgsize,
1666 : bufsize;
1667 : int ret;
1668 :
1669 : /*
1670 : * In theory, if we let piperead() read multiple bytes, it might give us
1671 : * back fragments of multiple messages. (That can't actually occur, since
1672 : * neither leader nor workers send more than one message without waiting
1673 : * for a reply, but we don't wish to assume that here.) For simplicity,
1674 : * read a byte at a time until we get the terminating '\0'. This method
1675 : * is a bit inefficient, but since this is only used for relatively short
1676 : * command and status strings, it shouldn't matter.
1677 : */
1678 708 : bufsize = 64; /* could be any number */
1679 708 : msg = (char *) pg_malloc(bufsize);
1680 708 : msgsize = 0;
1681 : for (;;)
1682 : {
1683 6836 : Assert(msgsize < bufsize);
1684 7544 : ret = piperead(fd, msg + msgsize, 1);
1685 7544 : if (ret <= 0)
1686 52 : break; /* error or connection closure */
1687 :
1688 : Assert(ret == 1);
1689 :
1690 7492 : if (msg[msgsize] == '\0')
1691 656 : return msg; /* collected whole message */
1692 :
1693 6836 : msgsize++;
1694 6836 : if (msgsize == bufsize) /* enlarge buffer if needed */
1695 : {
1696 0 : bufsize += 16; /* could be any number */
1697 0 : msg = (char *) pg_realloc(msg, bufsize);
1698 : }
1699 : }
1700 :
1701 : /* Other end has closed the connection */
1702 52 : pg_free(msg);
1703 52 : return NULL;
1704 : }
1705 :
1706 : #ifdef WIN32
1707 :
1708 : /*
1709 : * This is a replacement version of pipe(2) for Windows which allows the pipe
1710 : * handles to be used in select().
1711 : *
1712 : * Reads and writes on the pipe must go through piperead()/pipewrite().
1713 : *
1714 : * For consistency with Unix we declare the returned handles as "int".
1715 : * This is okay even on WIN64 because system handles are not more than
1716 : * 32 bits wide, but we do have to do some casting.
1717 : */
1718 : static int
1719 : pgpipe(int handles[2])
1720 : {
1721 : pgsocket s,
1722 : tmp_sock;
1723 : struct sockaddr_in serv_addr;
1724 : int len = sizeof(serv_addr);
1725 :
1726 : /* We have to use the Unix socket invalid file descriptor value here. */
1727 : handles[0] = handles[1] = -1;
1728 :
1729 : /*
1730 : * setup listen socket
1731 : */
1732 : if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1733 : {
1734 : pg_log_error("pgpipe: could not create socket: error code %d",
1735 : WSAGetLastError());
1736 : return -1;
1737 : }
1738 :
1739 : memset(&serv_addr, 0, sizeof(serv_addr));
1740 : serv_addr.sin_family = AF_INET;
1741 : serv_addr.sin_port = pg_hton16(0);
1742 : serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
1743 : if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1744 : {
1745 : pg_log_error("pgpipe: could not bind: error code %d",
1746 : WSAGetLastError());
1747 : closesocket(s);
1748 : return -1;
1749 : }
1750 : if (listen(s, 1) == SOCKET_ERROR)
1751 : {
1752 : pg_log_error("pgpipe: could not listen: error code %d",
1753 : WSAGetLastError());
1754 : closesocket(s);
1755 : return -1;
1756 : }
1757 : if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
1758 : {
1759 : pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",
1760 : WSAGetLastError());
1761 : closesocket(s);
1762 : return -1;
1763 : }
1764 :
1765 : /*
1766 : * setup pipe handles
1767 : */
1768 : if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1769 : {
1770 : pg_log_error("pgpipe: could not create second socket: error code %d",
1771 : WSAGetLastError());
1772 : closesocket(s);
1773 : return -1;
1774 : }
1775 : handles[1] = (int) tmp_sock;
1776 :
1777 : if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1778 : {
1779 : pg_log_error("pgpipe: could not connect socket: error code %d",
1780 : WSAGetLastError());
1781 : closesocket(handles[1]);
1782 : handles[1] = -1;
1783 : closesocket(s);
1784 : return -1;
1785 : }
1786 : if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
1787 : {
1788 : pg_log_error("pgpipe: could not accept connection: error code %d",
1789 : WSAGetLastError());
1790 : closesocket(handles[1]);
1791 : handles[1] = -1;
1792 : closesocket(s);
1793 : return -1;
1794 : }
1795 : handles[0] = (int) tmp_sock;
1796 :
1797 : closesocket(s);
1798 : return 0;
1799 : }
1800 :
1801 : #endif /* WIN32 */
|