Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * parallel.c
4 : *
5 : * Parallel support for pg_dump and pg_restore
6 : *
7 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/bin/pg_dump/parallel.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : /*
17 : * Parallel operation works like this:
18 : *
19 : * The original, leader process calls ParallelBackupStart(), which forks off
20 : * the desired number of worker processes, which each enter WaitForCommands().
21 : *
22 : * The leader process dispatches an individual work item to one of the worker
23 : * processes in DispatchJobForTocEntry(). We send a command string such as
24 : * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25 : * The worker process receives and decodes the command and passes it to the
26 : * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27 : * which are routines of the current archive format. That routine performs
28 : * the required action (dump or restore) and returns an integer status code.
29 : * This is passed back to the leader where we pass it to the
30 : * ParallelCompletionPtr callback function that was passed to
31 : * DispatchJobForTocEntry(). The callback function does state updating
32 : * for the leader control logic in pg_backup_archiver.c.
33 : *
34 : * In principle additional archive-format-specific information might be needed
35 : * in commands or worker status responses, but so far that hasn't proved
36 : * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37 : * data structures. Remember that we have forked off the workers only after
38 : * we have read in the catalog. That's why our worker processes can also
39 : * access the catalog information. (In the Windows case, the workers are
40 : * threads in the same process. To avoid problems, they work with cloned
41 : * copies of the Archive data structure; see RunWorker().)
42 : *
43 : * In the leader process, the workerStatus field for each worker has one of
44 : * the following values:
45 : * WRKR_NOT_STARTED: we've not yet forked this worker
46 : * WRKR_IDLE: it's waiting for a command
47 : * WRKR_WORKING: it's working on a command
48 : * WRKR_TERMINATED: process ended
49 : * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
50 : * state, and must be NULL in other states.
51 : */
52 :
53 : #include "postgres_fe.h"
54 :
55 : #ifndef WIN32
56 : #include <sys/select.h>
57 : #include <sys/wait.h>
58 : #include <signal.h>
59 : #include <unistd.h>
60 : #include <fcntl.h>
61 : #endif
62 :
63 : #include "fe_utils/string_utils.h"
64 : #include "parallel.h"
65 : #include "pg_backup_utils.h"
66 : #ifdef WIN32
67 : #include "port/pg_bswap.h"
68 : #endif
69 :
70 : /* Mnemonic macros for indexing the fd array returned by pipe(2) */
71 : #define PIPE_READ 0
72 : #define PIPE_WRITE 1
73 :
74 : #define NO_SLOT (-1) /* Failure result for GetIdleWorker() */
75 :
76 : /* Worker process statuses */
77 : typedef enum
78 : {
79 : WRKR_NOT_STARTED = 0,
80 : WRKR_IDLE,
81 : WRKR_WORKING,
82 : WRKR_TERMINATED,
83 : } T_WorkerStatus;
84 :
85 : #define WORKER_IS_RUNNING(workerStatus) \
86 : ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
87 :
88 : /*
89 : * Private per-parallel-worker state (typedef for this is in parallel.h).
90 : *
91 : * Much of this is valid only in the leader process (or, on Windows, should
92 : * be touched only by the leader thread). But the AH field should be touched
93 : * only by workers. The pipe descriptors are valid everywhere.
94 : */
95 : struct ParallelSlot
96 : {
97 : T_WorkerStatus workerStatus; /* see enum above */
98 :
99 : /* These fields are valid if workerStatus == WRKR_WORKING: */
100 : ParallelCompletionPtr callback; /* function to call on completion */
101 : void *callback_data; /* passthrough data for it */
102 :
103 : ArchiveHandle *AH; /* Archive data worker is using */
104 :
105 : int pipeRead; /* leader's end of the pipes */
106 : int pipeWrite;
107 : int pipeRevRead; /* child's end of the pipes */
108 : int pipeRevWrite;
109 :
110 : /* Child process/thread identity info: */
111 : #ifdef WIN32
112 : uintptr_t hThread;
113 : unsigned int threadId;
114 : #else
115 : pid_t pid;
116 : #endif
117 : };
118 :
119 : #ifdef WIN32
120 :
121 : /*
122 : * Structure to hold info passed by _beginthreadex() to the function it calls
123 : * via its single allowed argument.
124 : */
125 : typedef struct
126 : {
127 : ArchiveHandle *AH; /* leader database connection */
128 : ParallelSlot *slot; /* this worker's parallel slot */
129 : } WorkerInfo;
130 :
131 : /* Windows implementation of pipe access */
132 : static int pgpipe(int handles[2]);
133 : #define piperead(a,b,c) recv(a,b,c,0)
134 : #define pipewrite(a,b,c) send(a,b,c,0)
135 :
136 : #else /* !WIN32 */
137 :
138 : /* Non-Windows implementation of pipe access */
139 : #define pgpipe(a) pipe(a)
140 : #define piperead(a,b,c) read(a,b,c)
141 : #define pipewrite(a,b,c) write(a,b,c)
142 :
143 : #endif /* WIN32 */
144 :
145 : /*
146 : * State info for archive_close_connection() shutdown callback.
147 : */
148 : typedef struct ShutdownInformation
149 : {
150 : ParallelState *pstate;
151 : Archive *AHX;
152 : } ShutdownInformation;
153 :
154 : static ShutdownInformation shutdown_info;
155 :
156 : /*
157 : * State info for signal handling.
158 : * We assume signal_info initializes to zeroes.
159 : *
160 : * On Unix, myAH is the leader DB connection in the leader process, and the
161 : * worker's own connection in worker processes. On Windows, we have only one
162 : * instance of signal_info, so myAH is the leader connection and the worker
163 : * connections must be dug out of pstate->parallelSlot[].
164 : */
165 : typedef struct DumpSignalInformation
166 : {
167 : ArchiveHandle *myAH; /* database connection to issue cancel for */
168 : ParallelState *pstate; /* parallel state, if any */
169 : bool handler_set; /* signal handler set up in this process? */
170 : #ifndef WIN32
171 : bool am_worker; /* am I a worker process? */
172 : #endif
173 : } DumpSignalInformation;
174 :
175 : static volatile DumpSignalInformation signal_info;
176 :
177 : #ifdef WIN32
178 : static CRITICAL_SECTION signal_info_lock;
179 : #endif
180 :
181 : /*
182 : * Write a simple string to stderr --- must be safe in a signal handler.
183 : * We ignore the write() result since there's not much we could do about it.
184 : * Certain compilers make that harder than it ought to be.
185 : */
186 : #define write_stderr(str) \
187 : do { \
188 : const char *str_ = (str); \
189 : int rc_; \
190 : rc_ = write(fileno(stderr), str_, strlen(str_)); \
191 : (void) rc_; \
192 : } while (0)
193 :
194 :
195 : #ifdef WIN32
196 : /* file-scope variables */
197 : static DWORD tls_index;
198 :
199 : /* globally visible variables (needed by exit_nicely) */
200 : bool parallel_init_done = false;
201 : DWORD mainThreadId;
202 : #endif /* WIN32 */
203 :
204 : /* Local function prototypes */
205 : static ParallelSlot *GetMyPSlot(ParallelState *pstate);
206 : static void archive_close_connection(int code, void *arg);
207 : static void ShutdownWorkersHard(ParallelState *pstate);
208 : static void WaitForTerminatingWorkers(ParallelState *pstate);
209 : static void set_cancel_handler(void);
210 : static void set_cancel_pstate(ParallelState *pstate);
211 : static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
212 : static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
213 : static int GetIdleWorker(ParallelState *pstate);
214 : static bool HasEveryWorkerTerminated(ParallelState *pstate);
215 : static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
216 : static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
217 : static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
218 : bool do_wait);
219 : static char *getMessageFromLeader(int pipefd[2]);
220 : static void sendMessageToLeader(int pipefd[2], const char *str);
221 : static int select_loop(int maxFd, fd_set *workerset);
222 : static char *getMessageFromWorker(ParallelState *pstate,
223 : bool do_wait, int *worker);
224 : static void sendMessageToWorker(ParallelState *pstate,
225 : int worker, const char *str);
226 : static char *readMessageFromPipe(int fd);
227 :
228 : #define messageStartsWith(msg, prefix) \
229 : (strncmp(msg, prefix, strlen(prefix)) == 0)
230 :
231 :
232 : /*
233 : * Initialize parallel dump support --- should be called early in process
234 : * startup. (Currently, this is called whether or not we intend parallel
235 : * activity.)
236 : */
237 : void
238 614 : init_parallel_dump_utils(void)
239 : {
240 : #ifdef WIN32
241 : if (!parallel_init_done)
242 : {
243 : WSADATA wsaData;
244 : int err;
245 :
246 : /* Prepare for threaded operation */
247 : tls_index = TlsAlloc();
248 : mainThreadId = GetCurrentThreadId();
249 :
250 : /* Initialize socket access */
251 : err = WSAStartup(MAKEWORD(2, 2), &wsaData);
252 : if (err != 0)
253 : pg_fatal("%s() failed: error code %d", "WSAStartup", err);
254 :
255 : parallel_init_done = true;
256 : }
257 : #endif
258 614 : }
259 :
260 : /*
261 : * Find the ParallelSlot for the current worker process or thread.
262 : *
263 : * Returns NULL if no matching slot is found (this implies we're the leader).
264 : */
265 : static ParallelSlot *
266 0 : GetMyPSlot(ParallelState *pstate)
267 : {
268 : int i;
269 :
270 0 : for (i = 0; i < pstate->numWorkers; i++)
271 : {
272 : #ifdef WIN32
273 : if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
274 : #else
275 0 : if (pstate->parallelSlot[i].pid == getpid())
276 : #endif
277 0 : return &(pstate->parallelSlot[i]);
278 : }
279 :
280 0 : return NULL;
281 : }
282 :
283 : /*
284 : * A thread-local version of getLocalPQExpBuffer().
285 : *
286 : * Non-reentrant but reduces memory leakage: we'll consume one buffer per
287 : * thread, which is much better than one per fmtId/fmtQualifiedId call.
288 : */
289 : #ifdef WIN32
290 : static PQExpBuffer
291 : getThreadLocalPQExpBuffer(void)
292 : {
293 : /*
294 : * The Tls code goes awry if we use a static var, so we provide for both
295 : * static and auto, and omit any use of the static var when using Tls. We
296 : * rely on TlsGetValue() to return 0 if the value is not yet set.
297 : */
298 : static PQExpBuffer s_id_return = NULL;
299 : PQExpBuffer id_return;
300 :
301 : if (parallel_init_done)
302 : id_return = (PQExpBuffer) TlsGetValue(tls_index);
303 : else
304 : id_return = s_id_return;
305 :
306 : if (id_return) /* first time through? */
307 : {
308 : /* same buffer, just wipe contents */
309 : resetPQExpBuffer(id_return);
310 : }
311 : else
312 : {
313 : /* new buffer */
314 : id_return = createPQExpBuffer();
315 : if (parallel_init_done)
316 : TlsSetValue(tls_index, id_return);
317 : else
318 : s_id_return = id_return;
319 : }
320 :
321 : return id_return;
322 : }
323 : #endif /* WIN32 */
324 :
325 : /*
326 : * pg_dump and pg_restore call this to register the cleanup handler
327 : * as soon as they've created the ArchiveHandle.
328 : */
329 : void
330 422 : on_exit_close_archive(Archive *AHX)
331 : {
332 422 : shutdown_info.AHX = AHX;
333 422 : on_exit_nicely(archive_close_connection, &shutdown_info);
334 422 : }
335 :
336 : /*
337 : * on_exit_nicely handler for shutting down database connections and
338 : * worker processes cleanly.
339 : */
340 : static void
341 342 : archive_close_connection(int code, void *arg)
342 : {
343 342 : ShutdownInformation *si = (ShutdownInformation *) arg;
344 :
345 342 : if (si->pstate)
346 : {
347 : /* In parallel mode, must figure out who we are */
348 0 : ParallelSlot *slot = GetMyPSlot(si->pstate);
349 :
350 0 : if (!slot)
351 : {
352 : /*
353 : * We're the leader. Forcibly shut down workers, then close our
354 : * own database connection, if any.
355 : */
356 0 : ShutdownWorkersHard(si->pstate);
357 :
358 0 : if (si->AHX)
359 0 : DisconnectDatabase(si->AHX);
360 : }
361 : else
362 : {
363 : /*
364 : * We're a worker. Shut down our own DB connection if any. On
365 : * Windows, we also have to close our communication sockets, to
366 : * emulate what will happen on Unix when the worker process exits.
367 : * (Without this, if this is a premature exit, the leader would
368 : * fail to detect it because there would be no EOF condition on
369 : * the other end of the pipe.)
370 : */
371 0 : if (slot->AH)
372 0 : DisconnectDatabase(&(slot->AH->public));
373 :
374 : #ifdef WIN32
375 : closesocket(slot->pipeRevRead);
376 : closesocket(slot->pipeRevWrite);
377 : #endif
378 : }
379 : }
380 : else
381 : {
382 : /* Non-parallel operation: just kill the leader DB connection */
383 342 : if (si->AHX)
384 342 : DisconnectDatabase(si->AHX);
385 : }
386 342 : }
387 :
388 : /*
389 : * Forcibly shut down any remaining workers, waiting for them to finish.
390 : *
391 : * Note that we don't expect to come here during normal exit (the workers
392 : * should be long gone, and the ParallelState too). We're only here in a
393 : * pg_fatal() situation, so intervening to cancel active commands is
394 : * appropriate.
395 : */
396 : static void
397 0 : ShutdownWorkersHard(ParallelState *pstate)
398 : {
399 : int i;
400 :
401 : /*
402 : * Close our write end of the sockets so that any workers waiting for
403 : * commands know they can exit. (Note: some of the pipeWrite fields might
404 : * still be zero, if we failed to initialize all the workers. Hence, just
405 : * ignore errors here.)
406 : */
407 0 : for (i = 0; i < pstate->numWorkers; i++)
408 0 : closesocket(pstate->parallelSlot[i].pipeWrite);
409 :
410 : /*
411 : * Force early termination of any commands currently in progress.
412 : */
413 : #ifndef WIN32
414 : /* On non-Windows, send SIGTERM to each worker process. */
415 0 : for (i = 0; i < pstate->numWorkers; i++)
416 : {
417 0 : pid_t pid = pstate->parallelSlot[i].pid;
418 :
419 0 : if (pid != 0)
420 0 : kill(pid, SIGTERM);
421 : }
422 : #else
423 :
424 : /*
425 : * On Windows, send query cancels directly to the workers' backends. Use
426 : * a critical section to ensure worker threads don't change state.
427 : */
428 : EnterCriticalSection(&signal_info_lock);
429 : for (i = 0; i < pstate->numWorkers; i++)
430 : {
431 : ArchiveHandle *AH = pstate->parallelSlot[i].AH;
432 : char errbuf[1];
433 :
434 : if (AH != NULL && AH->connCancel != NULL)
435 : (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
436 : }
437 : LeaveCriticalSection(&signal_info_lock);
438 : #endif
439 :
440 : /* Now wait for them to terminate. */
441 0 : WaitForTerminatingWorkers(pstate);
442 0 : }
443 :
444 : /*
445 : * Wait for all workers to terminate.
446 : */
447 : static void
448 24 : WaitForTerminatingWorkers(ParallelState *pstate)
449 : {
450 76 : while (!HasEveryWorkerTerminated(pstate))
451 : {
452 52 : ParallelSlot *slot = NULL;
453 : int j;
454 :
455 : #ifndef WIN32
456 : /* On non-Windows, use wait() to wait for next worker to end */
457 : int status;
458 52 : pid_t pid = wait(&status);
459 :
460 : /* Find dead worker's slot, and clear the PID field */
461 84 : for (j = 0; j < pstate->numWorkers; j++)
462 : {
463 84 : slot = &(pstate->parallelSlot[j]);
464 84 : if (slot->pid == pid)
465 : {
466 52 : slot->pid = 0;
467 52 : break;
468 : }
469 : }
470 : #else /* WIN32 */
471 : /* On Windows, we must use WaitForMultipleObjects() */
472 : HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
473 : int nrun = 0;
474 : DWORD ret;
475 : uintptr_t hThread;
476 :
477 : for (j = 0; j < pstate->numWorkers; j++)
478 : {
479 : if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
480 : {
481 : lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
482 : nrun++;
483 : }
484 : }
485 : ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
486 : Assert(ret != WAIT_FAILED);
487 : hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
488 : free(lpHandles);
489 :
490 : /* Find dead worker's slot, and clear the hThread field */
491 : for (j = 0; j < pstate->numWorkers; j++)
492 : {
493 : slot = &(pstate->parallelSlot[j]);
494 : if (slot->hThread == hThread)
495 : {
496 : /* For cleanliness, close handles for dead threads */
497 : CloseHandle((HANDLE) slot->hThread);
498 : slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
499 : break;
500 : }
501 : }
502 : #endif /* WIN32 */
503 :
504 : /* On all platforms, update workerStatus and te[] as well */
505 : Assert(j < pstate->numWorkers);
506 52 : slot->workerStatus = WRKR_TERMINATED;
507 52 : pstate->te[j] = NULL;
508 : }
509 24 : }
510 :
511 :
512 : /*
513 : * Code for responding to cancel interrupts (SIGINT, control-C, etc)
514 : *
515 : * This doesn't quite belong in this module, but it needs access to the
516 : * ParallelState data, so there's not really a better place either.
517 : *
518 : * When we get a cancel interrupt, we could just die, but in pg_restore that
519 : * could leave a SQL command (e.g., CREATE INDEX on a large table) running
520 : * for a long time. Instead, we try to send a cancel request and then die.
521 : * pg_dump probably doesn't really need this, but we might as well use it
522 : * there too. Note that sending the cancel directly from the signal handler
523 : * is safe because PQcancel() is written to make it so.
524 : *
525 : * In parallel operation on Unix, each process is responsible for canceling
526 : * its own connection (this must be so because nobody else has access to it).
527 : * Furthermore, the leader process should attempt to forward its signal to
528 : * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
529 : * needed because typing control-C at the console would deliver SIGINT to
530 : * every member of the terminal process group --- but in other scenarios it
531 : * might be that only the leader gets signaled.
532 : *
533 : * On Windows, the cancel handler runs in a separate thread, because that's
534 : * how SetConsoleCtrlHandler works. We make it stop worker threads, send
535 : * cancels on all active connections, and then return FALSE, which will allow
536 : * the process to die. For safety's sake, we use a critical section to
537 : * protect the PGcancel structures against being changed while the signal
538 : * thread runs.
539 : */
540 :
541 : #ifndef WIN32
542 :
543 : /*
544 : * Signal handler (Unix only)
545 : */
546 : static void
547 0 : sigTermHandler(SIGNAL_ARGS)
548 : {
549 : int i;
550 : char errbuf[1];
551 :
552 : /*
553 : * Some platforms allow delivery of new signals to interrupt an active
554 : * signal handler. That could muck up our attempt to send PQcancel, so
555 : * disable the signals that set_cancel_handler enabled.
556 : */
557 0 : pqsignal(SIGINT, SIG_IGN);
558 0 : pqsignal(SIGTERM, SIG_IGN);
559 0 : pqsignal(SIGQUIT, SIG_IGN);
560 :
561 : /*
562 : * If we're in the leader, forward signal to all workers. (It seems best
563 : * to do this before PQcancel; killing the leader transaction will result
564 : * in invalid-snapshot errors from active workers, which maybe we can
565 : * quiet by killing workers first.) Ignore any errors.
566 : */
567 0 : if (signal_info.pstate != NULL)
568 : {
569 0 : for (i = 0; i < signal_info.pstate->numWorkers; i++)
570 : {
571 0 : pid_t pid = signal_info.pstate->parallelSlot[i].pid;
572 :
573 0 : if (pid != 0)
574 0 : kill(pid, SIGTERM);
575 : }
576 : }
577 :
578 : /*
579 : * Send QueryCancel if we have a connection to send to. Ignore errors,
580 : * there's not much we can do about them anyway.
581 : */
582 0 : if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
583 0 : (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
584 :
585 : /*
586 : * Report we're quitting, using nothing more complicated than write(2).
587 : * When in parallel operation, only the leader process should do this.
588 : */
589 0 : if (!signal_info.am_worker)
590 : {
591 0 : if (progname)
592 : {
593 0 : write_stderr(progname);
594 0 : write_stderr(": ");
595 : }
596 0 : write_stderr("terminated by user\n");
597 : }
598 :
599 : /*
600 : * And die, using _exit() not exit() because the latter will invoke atexit
601 : * handlers that can fail if we interrupted related code.
602 : */
603 0 : _exit(1);
604 : }
605 :
606 : /*
607 : * Enable cancel interrupt handler, if not already done.
608 : */
609 : static void
610 946 : set_cancel_handler(void)
611 : {
612 : /*
613 : * When forking, signal_info.handler_set will propagate into the new
614 : * process, but that's fine because the signal handler state does too.
615 : */
616 946 : if (!signal_info.handler_set)
617 : {
618 370 : signal_info.handler_set = true;
619 :
620 370 : pqsignal(SIGINT, sigTermHandler);
621 370 : pqsignal(SIGTERM, sigTermHandler);
622 370 : pqsignal(SIGQUIT, sigTermHandler);
623 : }
624 946 : }
625 :
626 : #else /* WIN32 */
627 :
628 : /*
629 : * Console interrupt handler --- runs in a newly-started thread.
630 : *
631 : * After stopping other threads and sending cancel requests on all open
632 : * connections, we return FALSE which will allow the default ExitProcess()
633 : * action to be taken.
634 : */
635 : static BOOL WINAPI
636 : consoleHandler(DWORD dwCtrlType)
637 : {
638 : int i;
639 : char errbuf[1];
640 :
641 : if (dwCtrlType == CTRL_C_EVENT ||
642 : dwCtrlType == CTRL_BREAK_EVENT)
643 : {
644 : /* Critical section prevents changing data we look at here */
645 : EnterCriticalSection(&signal_info_lock);
646 :
647 : /*
648 : * If in parallel mode, stop worker threads and send QueryCancel to
649 : * their connected backends. The main point of stopping the worker
650 : * threads is to keep them from reporting the query cancels as errors,
651 : * which would clutter the user's screen. We needn't stop the leader
652 : * thread since it won't be doing much anyway. Do this before
653 : * canceling the main transaction, else we might get invalid-snapshot
654 : * errors reported before we can stop the workers. Ignore errors,
655 : * there's not much we can do about them anyway.
656 : */
657 : if (signal_info.pstate != NULL)
658 : {
659 : for (i = 0; i < signal_info.pstate->numWorkers; i++)
660 : {
661 : ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
662 : ArchiveHandle *AH = slot->AH;
663 : HANDLE hThread = (HANDLE) slot->hThread;
664 :
665 : /*
666 : * Using TerminateThread here may leave some resources leaked,
667 : * but it doesn't matter since we're about to end the whole
668 : * process.
669 : */
670 : if (hThread != INVALID_HANDLE_VALUE)
671 : TerminateThread(hThread, 0);
672 :
673 : if (AH != NULL && AH->connCancel != NULL)
674 : (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
675 : }
676 : }
677 :
678 : /*
679 : * Send QueryCancel to leader connection, if enabled. Ignore errors,
680 : * there's not much we can do about them anyway.
681 : */
682 : if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
683 : (void) PQcancel(signal_info.myAH->connCancel,
684 : errbuf, sizeof(errbuf));
685 :
686 : LeaveCriticalSection(&signal_info_lock);
687 :
688 : /*
689 : * Report we're quitting, using nothing more complicated than
690 : * write(2). (We might be able to get away with using pg_log_*()
691 : * here, but since we terminated other threads uncleanly above, it
692 : * seems better to assume as little as possible.)
693 : */
694 : if (progname)
695 : {
696 : write_stderr(progname);
697 : write_stderr(": ");
698 : }
699 : write_stderr("terminated by user\n");
700 : }
701 :
702 : /* Always return FALSE to allow signal handling to continue */
703 : return FALSE;
704 : }
705 :
706 : /*
707 : * Enable cancel interrupt handler, if not already done.
708 : */
709 : static void
710 : set_cancel_handler(void)
711 : {
712 : if (!signal_info.handler_set)
713 : {
714 : signal_info.handler_set = true;
715 :
716 : InitializeCriticalSection(&signal_info_lock);
717 :
718 : SetConsoleCtrlHandler(consoleHandler, TRUE);
719 : }
720 : }
721 :
722 : #endif /* WIN32 */
723 :
724 :
725 : /*
726 : * set_archive_cancel_info
727 : *
728 : * Fill AH->connCancel with cancellation info for the specified database
729 : * connection; or clear it if conn is NULL.
730 : */
731 : void
732 946 : set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
733 : {
734 : PGcancel *oldConnCancel;
735 :
736 : /*
737 : * Activate the interrupt handler if we didn't yet in this process. On
738 : * Windows, this also initializes signal_info_lock; therefore it's
739 : * important that this happen at least once before we fork off any
740 : * threads.
741 : */
742 946 : set_cancel_handler();
743 :
744 : /*
745 : * On Unix, we assume that storing a pointer value is atomic with respect
746 : * to any possible signal interrupt. On Windows, use a critical section.
747 : */
748 :
749 : #ifdef WIN32
750 : EnterCriticalSection(&signal_info_lock);
751 : #endif
752 :
753 : /* Free the old one if we have one */
754 946 : oldConnCancel = AH->connCancel;
755 : /* be sure interrupt handler doesn't use pointer while freeing */
756 946 : AH->connCancel = NULL;
757 :
758 946 : if (oldConnCancel != NULL)
759 484 : PQfreeCancel(oldConnCancel);
760 :
761 : /* Set the new one if specified */
762 946 : if (conn)
763 488 : AH->connCancel = PQgetCancel(conn);
764 :
765 : /*
766 : * On Unix, there's only ever one active ArchiveHandle per process, so we
767 : * can just set signal_info.myAH unconditionally. On Windows, do that
768 : * only in the main thread; worker threads have to make sure their
769 : * ArchiveHandle appears in the pstate data, which is dealt with in
770 : * RunWorker().
771 : */
772 : #ifndef WIN32
773 946 : signal_info.myAH = AH;
774 : #else
775 : if (mainThreadId == GetCurrentThreadId())
776 : signal_info.myAH = AH;
777 : #endif
778 :
779 : #ifdef WIN32
780 : LeaveCriticalSection(&signal_info_lock);
781 : #endif
782 946 : }
783 :
784 : /*
785 : * set_cancel_pstate
786 : *
787 : * Set signal_info.pstate to point to the specified ParallelState, if any.
788 : * We need this mainly to have an interlock against Windows signal thread.
789 : */
790 : static void
791 48 : set_cancel_pstate(ParallelState *pstate)
792 : {
793 : #ifdef WIN32
794 : EnterCriticalSection(&signal_info_lock);
795 : #endif
796 :
797 48 : signal_info.pstate = pstate;
798 :
799 : #ifdef WIN32
800 : LeaveCriticalSection(&signal_info_lock);
801 : #endif
802 48 : }
803 :
804 : /*
805 : * set_cancel_slot_archive
806 : *
807 : * Set ParallelSlot's AH field to point to the specified archive, if any.
808 : * We need this mainly to have an interlock against Windows signal thread.
809 : */
810 : static void
811 104 : set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
812 : {
813 : #ifdef WIN32
814 : EnterCriticalSection(&signal_info_lock);
815 : #endif
816 :
817 104 : slot->AH = AH;
818 :
819 : #ifdef WIN32
820 : LeaveCriticalSection(&signal_info_lock);
821 : #endif
822 104 : }
823 :
824 :
825 : /*
826 : * This function is called by both Unix and Windows variants to set up
827 : * and run a worker process. Caller should exit the process (or thread)
828 : * upon return.
829 : */
830 : static void
831 52 : RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
832 : {
833 : int pipefd[2];
834 :
835 : /* fetch child ends of pipes */
836 52 : pipefd[PIPE_READ] = slot->pipeRevRead;
837 52 : pipefd[PIPE_WRITE] = slot->pipeRevWrite;
838 :
839 : /*
840 : * Clone the archive so that we have our own state to work with, and in
841 : * particular our own database connection.
842 : *
843 : * We clone on Unix as well as Windows, even though technically we don't
844 : * need to because fork() gives us a copy in our own address space
845 : * already. But CloneArchive resets the state information and also clones
846 : * the database connection which both seem kinda helpful.
847 : */
848 52 : AH = CloneArchive(AH);
849 :
850 : /* Remember cloned archive where signal handler can find it */
851 52 : set_cancel_slot_archive(slot, AH);
852 :
853 : /*
854 : * Call the setup worker function that's defined in the ArchiveHandle.
855 : */
856 52 : (AH->SetupWorkerPtr) ((Archive *) AH);
857 :
858 : /*
859 : * Execute commands until done.
860 : */
861 52 : WaitForCommands(AH, pipefd);
862 :
863 : /*
864 : * Disconnect from database and clean up.
865 : */
866 52 : set_cancel_slot_archive(slot, NULL);
867 52 : DisconnectDatabase(&(AH->public));
868 52 : DeCloneArchive(AH);
869 52 : }
870 :
871 : /*
872 : * Thread base function for Windows
873 : */
874 : #ifdef WIN32
875 : static unsigned __stdcall
876 : init_spawned_worker_win32(WorkerInfo *wi)
877 : {
878 : ArchiveHandle *AH = wi->AH;
879 : ParallelSlot *slot = wi->slot;
880 :
881 : /* Don't need WorkerInfo anymore */
882 : free(wi);
883 :
884 : /* Run the worker ... */
885 : RunWorker(AH, slot);
886 :
887 : /* Exit the thread */
888 : _endthreadex(0);
889 : return 0;
890 : }
891 : #endif /* WIN32 */
892 :
893 : /*
894 : * This function starts a parallel dump or restore by spawning off the worker
895 : * processes. For Windows, it creates a number of threads; on Unix the
896 : * workers are created with fork().
897 : */
898 : ParallelState *
899 28 : ParallelBackupStart(ArchiveHandle *AH)
900 : {
901 : ParallelState *pstate;
902 : int i;
903 :
904 : Assert(AH->public.numWorkers > 0);
905 :
906 28 : pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
907 :
908 28 : pstate->numWorkers = AH->public.numWorkers;
909 28 : pstate->te = NULL;
910 28 : pstate->parallelSlot = NULL;
911 :
912 28 : if (AH->public.numWorkers == 1)
913 4 : return pstate;
914 :
915 : /* Create status arrays, being sure to initialize all fields to 0 */
916 24 : pstate->te = (TocEntry **)
917 24 : pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
918 24 : pstate->parallelSlot = (ParallelSlot *)
919 24 : pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
920 :
921 : #ifdef WIN32
922 : /* Make fmtId() and fmtQualifiedId() use thread-local storage */
923 : getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
924 : #endif
925 :
926 : /*
927 : * Set the pstate in shutdown_info, to tell the exit handler that it must
928 : * clean up workers as well as the main database connection. But we don't
929 : * set this in signal_info yet, because we don't want child processes to
930 : * inherit non-NULL signal_info.pstate.
931 : */
932 24 : shutdown_info.pstate = pstate;
933 :
934 : /*
935 : * Temporarily disable query cancellation on the leader connection. This
936 : * ensures that child processes won't inherit valid AH->connCancel
937 : * settings and thus won't try to issue cancels against the leader's
938 : * connection. No harm is done if we fail while it's disabled, because
939 : * the leader connection is idle at this point anyway.
940 : */
941 24 : set_archive_cancel_info(AH, NULL);
942 :
943 : /* Ensure stdio state is quiesced before forking */
944 24 : fflush(NULL);
945 :
946 : /* Create desired number of workers */
947 76 : for (i = 0; i < pstate->numWorkers; i++)
948 : {
949 : #ifdef WIN32
950 : WorkerInfo *wi;
951 : uintptr_t handle;
952 : #else
953 : pid_t pid;
954 : #endif
955 52 : ParallelSlot *slot = &(pstate->parallelSlot[i]);
956 : int pipeMW[2],
957 : pipeWM[2];
958 :
959 : /* Create communication pipes for this worker */
960 52 : if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
961 0 : pg_fatal("could not create communication channels: %m");
962 :
963 : /* leader's ends of the pipes */
964 52 : slot->pipeRead = pipeWM[PIPE_READ];
965 52 : slot->pipeWrite = pipeMW[PIPE_WRITE];
966 : /* child's ends of the pipes */
967 52 : slot->pipeRevRead = pipeMW[PIPE_READ];
968 52 : slot->pipeRevWrite = pipeWM[PIPE_WRITE];
969 :
970 : #ifdef WIN32
971 : /* Create transient structure to pass args to worker function */
972 : wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
973 :
974 : wi->AH = AH;
975 : wi->slot = slot;
976 :
977 : handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
978 : wi, 0, &(slot->threadId));
979 : slot->hThread = handle;
980 : slot->workerStatus = WRKR_IDLE;
981 : #else /* !WIN32 */
982 52 : pid = fork();
983 104 : if (pid == 0)
984 : {
985 : /* we are the worker */
986 : int j;
987 :
988 : /* this is needed for GetMyPSlot() */
989 52 : slot->pid = getpid();
990 :
991 : /* instruct signal handler that we're in a worker now */
992 52 : signal_info.am_worker = true;
993 :
994 : /* close read end of Worker -> Leader */
995 52 : closesocket(pipeWM[PIPE_READ]);
996 : /* close write end of Leader -> Worker */
997 52 : closesocket(pipeMW[PIPE_WRITE]);
998 :
999 : /*
1000 : * Close all inherited fds for communication of the leader with
1001 : * previously-forked workers.
1002 : */
1003 84 : for (j = 0; j < i; j++)
1004 : {
1005 32 : closesocket(pstate->parallelSlot[j].pipeRead);
1006 32 : closesocket(pstate->parallelSlot[j].pipeWrite);
1007 : }
1008 :
1009 : /* Run the worker ... */
1010 52 : RunWorker(AH, slot);
1011 :
1012 : /* We can just exit(0) when done */
1013 52 : exit(0);
1014 : }
1015 52 : else if (pid < 0)
1016 : {
1017 : /* fork failed */
1018 0 : pg_fatal("could not create worker process: %m");
1019 : }
1020 :
1021 : /* In Leader after successful fork */
1022 52 : slot->pid = pid;
1023 52 : slot->workerStatus = WRKR_IDLE;
1024 :
1025 : /* close read end of Leader -> Worker */
1026 52 : closesocket(pipeMW[PIPE_READ]);
1027 : /* close write end of Worker -> Leader */
1028 52 : closesocket(pipeWM[PIPE_WRITE]);
1029 : #endif /* WIN32 */
1030 : }
1031 :
1032 : /*
1033 : * Having forked off the workers, disable SIGPIPE so that leader isn't
1034 : * killed if it tries to send a command to a dead worker. We don't want
1035 : * the workers to inherit this setting, though.
1036 : */
1037 : #ifndef WIN32
1038 24 : pqsignal(SIGPIPE, SIG_IGN);
1039 : #endif
1040 :
1041 : /*
1042 : * Re-establish query cancellation on the leader connection.
1043 : */
1044 24 : set_archive_cancel_info(AH, AH->connection);
1045 :
1046 : /*
1047 : * Tell the cancel signal handler to forward signals to worker processes,
1048 : * too. (As with query cancel, we did not need this earlier because the
1049 : * workers have not yet been given anything to do; if we die before this
1050 : * point, any already-started workers will see EOF and quit promptly.)
1051 : */
1052 24 : set_cancel_pstate(pstate);
1053 :
1054 24 : return pstate;
1055 : }
1056 :
1057 : /*
1058 : * Close down a parallel dump or restore.
1059 : */
1060 : void
1061 28 : ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1062 : {
1063 : int i;
1064 :
1065 : /* No work if non-parallel */
1066 28 : if (pstate->numWorkers == 1)
1067 4 : return;
1068 :
1069 : /* There should not be any unfinished jobs */
1070 : Assert(IsEveryWorkerIdle(pstate));
1071 :
1072 : /* Close the sockets so that the workers know they can exit */
1073 76 : for (i = 0; i < pstate->numWorkers; i++)
1074 : {
1075 52 : closesocket(pstate->parallelSlot[i].pipeRead);
1076 52 : closesocket(pstate->parallelSlot[i].pipeWrite);
1077 : }
1078 :
1079 : /* Wait for them to exit */
1080 24 : WaitForTerminatingWorkers(pstate);
1081 :
1082 : /*
1083 : * Unlink pstate from shutdown_info, so the exit handler will not try to
1084 : * use it; and likewise unlink from signal_info.
1085 : */
1086 24 : shutdown_info.pstate = NULL;
1087 24 : set_cancel_pstate(NULL);
1088 :
1089 : /* Release state (mere neatnik-ism, since we're about to terminate) */
1090 24 : free(pstate->te);
1091 24 : free(pstate->parallelSlot);
1092 24 : free(pstate);
1093 : }
1094 :
1095 : /*
1096 : * These next four functions handle construction and parsing of the command
1097 : * strings and response strings for parallel workers.
1098 : *
1099 : * Currently, these can be the same regardless of which archive format we are
1100 : * processing. In future, we might want to let format modules override these
1101 : * functions to add format-specific data to a command or response.
1102 : */
1103 :
1104 : /*
1105 : * buildWorkerCommand: format a command string to send to a worker.
1106 : *
1107 : * The string is built in the caller-supplied buffer of size buflen.
1108 : */
1109 : static void
1110 328 : buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
1111 : char *buf, int buflen)
1112 : {
1113 328 : if (act == ACT_DUMP)
1114 236 : snprintf(buf, buflen, "DUMP %d", te->dumpId);
1115 92 : else if (act == ACT_RESTORE)
1116 92 : snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1117 : else
1118 : Assert(false);
1119 328 : }
1120 :
1121 : /*
1122 : * parseWorkerCommand: interpret a command string in a worker.
1123 : */
1124 : static void
1125 328 : parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
1126 : const char *msg)
1127 : {
1128 : DumpId dumpId;
1129 : int nBytes;
1130 :
1131 328 : if (messageStartsWith(msg, "DUMP "))
1132 : {
1133 236 : *act = ACT_DUMP;
1134 236 : sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1135 : Assert(nBytes == strlen(msg));
1136 236 : *te = getTocEntryByDumpId(AH, dumpId);
1137 : Assert(*te != NULL);
1138 : }
1139 92 : else if (messageStartsWith(msg, "RESTORE "))
1140 : {
1141 92 : *act = ACT_RESTORE;
1142 92 : sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1143 : Assert(nBytes == strlen(msg));
1144 92 : *te = getTocEntryByDumpId(AH, dumpId);
1145 : Assert(*te != NULL);
1146 : }
1147 : else
1148 0 : pg_fatal("unrecognized command received from leader: \"%s\"",
1149 : msg);
1150 328 : }
1151 :
1152 : /*
1153 : * buildWorkerResponse: format a response string to send to the leader.
1154 : *
1155 : * The string is built in the caller-supplied buffer of size buflen.
1156 : */
1157 : static void
1158 328 : buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
1159 : char *buf, int buflen)
1160 : {
1161 328 : snprintf(buf, buflen, "OK %d %d %d",
1162 : te->dumpId,
1163 : status,
1164 : status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1165 328 : }
1166 :
1167 : /*
1168 : * parseWorkerResponse: parse the status message returned by a worker.
1169 : *
1170 : * Returns the integer status code, and may update fields of AH and/or te.
1171 : */
1172 : static int
1173 328 : parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
1174 : const char *msg)
1175 : {
1176 : DumpId dumpId;
1177 : int nBytes,
1178 : n_errors;
1179 328 : int status = 0;
1180 :
1181 328 : if (messageStartsWith(msg, "OK "))
1182 : {
1183 328 : sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1184 :
1185 : Assert(dumpId == te->dumpId);
1186 : Assert(nBytes == strlen(msg));
1187 :
1188 328 : AH->public.n_errors += n_errors;
1189 : }
1190 : else
1191 0 : pg_fatal("invalid message received from worker: \"%s\"",
1192 : msg);
1193 :
1194 328 : return status;
1195 : }
1196 :
1197 : /*
1198 : * Dispatch a job to some free worker.
1199 : *
1200 : * te is the TocEntry to be processed, act is the action to be taken on it.
1201 : * callback is the function to call on completion of the job.
1202 : *
1203 : * If no worker is currently available, this will block, and previously
1204 : * registered callback functions may be called.
1205 : */
1206 : void
1207 328 : DispatchJobForTocEntry(ArchiveHandle *AH,
1208 : ParallelState *pstate,
1209 : TocEntry *te,
1210 : T_Action act,
1211 : ParallelCompletionPtr callback,
1212 : void *callback_data)
1213 : {
1214 : int worker;
1215 : char buf[256];
1216 :
1217 : /* Get a worker, waiting if none are idle */
1218 534 : while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1219 206 : WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1220 :
1221 : /* Construct and send command string */
1222 328 : buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1223 :
1224 328 : sendMessageToWorker(pstate, worker, buf);
1225 :
1226 : /* Remember worker is busy, and which TocEntry it's working on */
1227 328 : pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1228 328 : pstate->parallelSlot[worker].callback = callback;
1229 328 : pstate->parallelSlot[worker].callback_data = callback_data;
1230 328 : pstate->te[worker] = te;
1231 328 : }
1232 :
1233 : /*
1234 : * Find an idle worker and return its slot number.
1235 : * Return NO_SLOT if none are idle.
1236 : */
1237 : static int
1238 832 : GetIdleWorker(ParallelState *pstate)
1239 : {
1240 : int i;
1241 :
1242 2012 : for (i = 0; i < pstate->numWorkers; i++)
1243 : {
1244 1532 : if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1245 352 : return i;
1246 : }
1247 480 : return NO_SLOT;
1248 : }
1249 :
1250 : /*
1251 : * Return true iff no worker is running.
1252 : */
1253 : static bool
1254 76 : HasEveryWorkerTerminated(ParallelState *pstate)
1255 : {
1256 : int i;
1257 :
1258 148 : for (i = 0; i < pstate->numWorkers; i++)
1259 : {
1260 124 : if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1261 52 : return false;
1262 : }
1263 24 : return true;
1264 : }
1265 :
1266 : /*
1267 : * Return true iff every worker is in the WRKR_IDLE state.
1268 : */
1269 : bool
1270 94 : IsEveryWorkerIdle(ParallelState *pstate)
1271 : {
1272 : int i;
1273 :
1274 200 : for (i = 0; i < pstate->numWorkers; i++)
1275 : {
1276 160 : if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1277 54 : return false;
1278 : }
1279 40 : return true;
1280 : }
1281 :
1282 : /*
1283 : * Acquire lock on a table to be dumped by a worker process.
1284 : *
1285 : * The leader process is already holding an ACCESS SHARE lock. Ordinarily
1286 : * it's no problem for a worker to get one too, but if anything else besides
1287 : * pg_dump is running, there's a possible deadlock:
1288 : *
1289 : * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
1290 : * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1291 : * because the leader holds a conflicting ACCESS SHARE lock).
1292 : * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1293 : * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1294 : * 4) Now we have a deadlock, since the leader is effectively waiting for
1295 : * the worker. The server cannot detect that, however.
1296 : *
1297 : * To prevent an infinite wait, prior to touching a table in a worker, request
1298 : * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
1299 : * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1300 : * so we have a deadlock. We must fail the backup in that case.
1301 : */
1302 : static void
1303 236 : lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1304 : {
1305 : const char *qualId;
1306 : PQExpBuffer query;
1307 : PGresult *res;
1308 :
1309 : /* Nothing to do for BLOBS */
1310 236 : if (strcmp(te->desc, "BLOBS") == 0)
1311 12 : return;
1312 :
1313 224 : query = createPQExpBuffer();
1314 :
1315 224 : qualId = fmtQualifiedId(te->namespace, te->tag);
1316 :
1317 224 : appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1318 : qualId);
1319 :
1320 224 : res = PQexec(AH->connection, query->data);
1321 :
1322 224 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1323 0 : pg_fatal("could not obtain lock on relation \"%s\"\n"
1324 : "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1325 : "on the table after the pg_dump parent process had gotten the "
1326 : "initial ACCESS SHARE lock on the table.", qualId);
1327 :
1328 224 : PQclear(res);
1329 224 : destroyPQExpBuffer(query);
1330 : }
1331 :
1332 : /*
1333 : * WaitForCommands: main routine for a worker process.
1334 : *
1335 : * Read and execute commands from the leader until we see EOF on the pipe.
1336 : */
1337 : static void
1338 52 : WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1339 : {
1340 : char *command;
1341 : TocEntry *te;
1342 : T_Action act;
1343 52 : int status = 0;
1344 : char buf[256];
1345 :
1346 : for (;;)
1347 : {
1348 380 : if (!(command = getMessageFromLeader(pipefd)))
1349 : {
1350 : /* EOF, so done */
1351 52 : return;
1352 : }
1353 :
1354 : /* Decode the command */
1355 328 : parseWorkerCommand(AH, &te, &act, command);
1356 :
1357 328 : if (act == ACT_DUMP)
1358 : {
1359 : /* Acquire lock on this table within the worker's session */
1360 236 : lockTableForWorker(AH, te);
1361 :
1362 : /* Perform the dump command */
1363 236 : status = (AH->WorkerJobDumpPtr) (AH, te);
1364 : }
1365 92 : else if (act == ACT_RESTORE)
1366 : {
1367 : /* Perform the restore command */
1368 92 : status = (AH->WorkerJobRestorePtr) (AH, te);
1369 : }
1370 : else
1371 : Assert(false);
1372 :
1373 : /* Return status to leader */
1374 328 : buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1375 :
1376 328 : sendMessageToLeader(pipefd, buf);
1377 :
1378 : /* command was pg_malloc'd and we are responsible for free()ing it. */
1379 328 : free(command);
1380 : }
1381 : }
1382 :
1383 : /*
1384 : * Check for status messages from workers.
1385 : *
1386 : * If do_wait is true, wait to get a status message; otherwise, just return
1387 : * immediately if there is none available.
1388 : *
1389 : * When we get a status message, we pass the status code to the callback
1390 : * function that was specified to DispatchJobForTocEntry, then reset the
1391 : * worker status to IDLE.
1392 : *
1393 : * Returns true if we collected a status message, else false.
1394 : *
1395 : * XXX is it worth checking for more than one status message per call?
1396 : * It seems somewhat unlikely that multiple workers would finish at exactly
1397 : * the same time.
1398 : */
1399 : static bool
1400 642 : ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1401 : {
1402 : int worker;
1403 : char *msg;
1404 :
1405 : /* Try to collect a status message */
1406 642 : msg = getMessageFromWorker(pstate, do_wait, &worker);
1407 :
1408 642 : if (!msg)
1409 : {
1410 : /* If do_wait is true, we must have detected EOF on some socket */
1411 314 : if (do_wait)
1412 0 : pg_fatal("a worker process died unexpectedly");
1413 314 : return false;
1414 : }
1415 :
1416 : /* Process it and update our idea of the worker's status */
1417 328 : if (messageStartsWith(msg, "OK "))
1418 : {
1419 328 : ParallelSlot *slot = &pstate->parallelSlot[worker];
1420 328 : TocEntry *te = pstate->te[worker];
1421 : int status;
1422 :
1423 328 : status = parseWorkerResponse(AH, te, msg);
1424 328 : slot->callback(AH, te, status, slot->callback_data);
1425 328 : slot->workerStatus = WRKR_IDLE;
1426 328 : pstate->te[worker] = NULL;
1427 : }
1428 : else
1429 0 : pg_fatal("invalid message received from worker: \"%s\"",
1430 : msg);
1431 :
1432 : /* Free the string returned from getMessageFromWorker */
1433 328 : free(msg);
1434 :
1435 328 : return true;
1436 : }
1437 :
/*
 * Check for status results from workers, waiting if necessary.
 *
 * Available wait modes are:
 * WFW_NO_WAIT: reap any available status, but don't block
 * WFW_GOT_STATUS: wait for at least one more worker to finish
 * WFW_ONE_IDLE: wait for at least one worker to be idle
 * WFW_ALL_IDLE: wait for all workers to be idle
 *
 * Any received results are passed to the callback specified to
 * DispatchJobForTocEntry.
 *
 * This function is executed in the leader process.
 *
 * Each pass of the loop collects at most one status message (through
 * ListenToWorkers) and then decides, based on "mode", whether to return or
 * go around again.  Only the first pass is non-blocking, except in
 * GOT_STATUS mode, which blocks from the start.
 */
void
WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
{
	bool		do_wait = false;

	/*
	 * In GOT_STATUS mode, always block waiting for a message, since we can't
	 * return till we get something.  In other modes, we don't block the first
	 * time through the loop.
	 */
	if (mode == WFW_GOT_STATUS)
	{
		/* Assert that caller knows what it's doing */
		Assert(!IsEveryWorkerIdle(pstate));
		do_wait = true;
	}

	for (;;)
	{
		/*
		 * Check for status messages, even if we don't need to block.  We do
		 * not try very hard to reap all available messages, though, since
		 * there's unlikely to be more than one.
		 */
		if (ListenToWorkers(AH, pstate, do_wait))
		{
			/*
			 * If we got a message, we are done by definition for GOT_STATUS
			 * mode, and we can also be certain that there's at least one idle
			 * worker.  So we're done in all but ALL_IDLE mode.
			 */
			if (mode != WFW_ALL_IDLE)
				return;
		}

		/* Check whether we must wait for new status messages */
		switch (mode)
		{
			case WFW_NO_WAIT:
				return;			/* never wait */
			case WFW_GOT_STATUS:
				/*
				 * With do_wait true, ListenToWorkers either returns true
				 * (handled above) or raises a fatal error, so this case is
				 * unreachable.
				 */
				Assert(false);	/* can't get here, because we waited */
				break;
			case WFW_ONE_IDLE:
				if (GetIdleWorker(pstate) != NO_SLOT)
					return;
				break;
			case WFW_ALL_IDLE:
				if (IsEveryWorkerIdle(pstate))
					return;
				break;
		}

		/* Loop back, and this time wait for something to happen */
		do_wait = true;
	}
}
1509 :
1510 : /*
1511 : * Read one command message from the leader, blocking if necessary
1512 : * until one is available, and return it as a malloc'd string.
1513 : * On EOF, return NULL.
1514 : *
1515 : * This function is executed in worker processes.
1516 : */
1517 : static char *
1518 380 : getMessageFromLeader(int pipefd[2])
1519 : {
1520 380 : return readMessageFromPipe(pipefd[PIPE_READ]);
1521 : }
1522 :
1523 : /*
1524 : * Send a status message to the leader.
1525 : *
1526 : * This function is executed in worker processes.
1527 : */
1528 : static void
1529 328 : sendMessageToLeader(int pipefd[2], const char *str)
1530 : {
1531 328 : int len = strlen(str) + 1;
1532 :
1533 328 : if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1534 0 : pg_fatal("could not write to the communication channel: %m");
1535 328 : }
1536 :
/*
 * Block until at least one descriptor in *workerset (none numbered above
 * maxFd) is readable.  The select() call is retried if it is interrupted by
 * a signal.
 *
 * On return *workerset holds the set reported by the last select() call.
 * Returns the number of ready descriptors, or -1 on error.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
	const fd_set wanted = *workerset;
	int			nready;

	for (;;)
	{
		/* select() overwrites the set, so start each attempt afresh */
		*workerset = wanted;
		nready = select(maxFd + 1, workerset, NULL, NULL, NULL);

#ifndef WIN32
		if (nready >= 0 || errno != EINTR)
			return nready;
#else
		if (nready != SOCKET_ERROR || WSAGetLastError() != WSAEINTR)
			return nready;
#endif
	}
}
1564 :
1565 :
/*
 * Check for messages from worker processes.
 *
 * If a message is available, return it as a malloc'd string, and put the
 * index of the sending worker in *worker.
 *
 * If nothing is available, wait if "do_wait" is true, else return NULL.
 *
 * If we detect EOF on any socket, we'll return NULL.  It's not great that
 * that's hard to distinguish from the no-data-available case, but for now
 * our one caller is okay with that.
 *
 * This function is executed in the leader process.
 *
 * Only workers for which WORKER_IS_RUNNING() holds are polled.  If several
 * pipes are readable at once, the lowest-numbered worker's message is
 * returned and the rest are left for later calls.
 */
static char *
getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
{
	int			i;
	fd_set		workerset;
	int			maxFd = -1;
	struct timeval nowait = {0, 0};

	/* construct bitmap of socket descriptors for select() */
	FD_ZERO(&workerset);
	for (i = 0; i < pstate->numWorkers; i++)
	{
		if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
			continue;
		FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
		if (pstate->parallelSlot[i].pipeRead > maxFd)
			maxFd = pstate->parallelSlot[i].pipeRead;
	}

	if (do_wait)
	{
		/* block until something is readable; select_loop retries on EINTR */
		i = select_loop(maxFd, &workerset);
		Assert(i != 0);
	}
	else
	{
		/* zero timeout: just poll, and report "nothing yet" as NULL */
		if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
			return NULL;
	}

	/* i holds select()'s result here; negative means it failed */
	if (i < 0)
		pg_fatal("%s() failed: %m", "select");

	/* i is reused as a slot index: find the first running, ready worker */
	for (i = 0; i < pstate->numWorkers; i++)
	{
		char	   *msg;

		if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
			continue;
		if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
			continue;

		/*
		 * Read the message if any.  If the socket is ready because of EOF,
		 * we'll return NULL instead (and the socket will stay ready, so the
		 * condition will persist).
		 *
		 * Note: because this is a blocking read, we'll wait if only part of
		 * the message is available.  Waiting a long time would be bad, but
		 * since worker status messages are short and are always sent in one
		 * operation, it shouldn't be a problem in practice.
		 */
		msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
		*worker = i;
		return msg;
	}
	/* select() reported readiness, but on no descriptor we asked about */
	Assert(false);
	return NULL;
}
1639 :
1640 : /*
1641 : * Send a command message to the specified worker process.
1642 : *
1643 : * This function is executed in the leader process.
1644 : */
1645 : static void
1646 328 : sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1647 : {
1648 328 : int len = strlen(str) + 1;
1649 :
1650 328 : if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1651 : {
1652 0 : pg_fatal("could not write to the communication channel: %m");
1653 : }
1654 328 : }
1655 :
1656 : /*
1657 : * Read one message from the specified pipe (fd), blocking if necessary
1658 : * until one is available, and return it as a malloc'd string.
1659 : * On EOF, return NULL.
1660 : *
1661 : * A "message" on the channel is just a null-terminated string.
1662 : */
1663 : static char *
1664 708 : readMessageFromPipe(int fd)
1665 : {
1666 : char *msg;
1667 : int msgsize,
1668 : bufsize;
1669 : int ret;
1670 :
1671 : /*
1672 : * In theory, if we let piperead() read multiple bytes, it might give us
1673 : * back fragments of multiple messages. (That can't actually occur, since
1674 : * neither leader nor workers send more than one message without waiting
1675 : * for a reply, but we don't wish to assume that here.) For simplicity,
1676 : * read a byte at a time until we get the terminating '\0'. This method
1677 : * is a bit inefficient, but since this is only used for relatively short
1678 : * command and status strings, it shouldn't matter.
1679 : */
1680 708 : bufsize = 64; /* could be any number */
1681 708 : msg = (char *) pg_malloc(bufsize);
1682 708 : msgsize = 0;
1683 : for (;;)
1684 : {
1685 6836 : Assert(msgsize < bufsize);
1686 7544 : ret = piperead(fd, msg + msgsize, 1);
1687 7544 : if (ret <= 0)
1688 52 : break; /* error or connection closure */
1689 :
1690 : Assert(ret == 1);
1691 :
1692 7492 : if (msg[msgsize] == '\0')
1693 656 : return msg; /* collected whole message */
1694 :
1695 6836 : msgsize++;
1696 6836 : if (msgsize == bufsize) /* enlarge buffer if needed */
1697 : {
1698 0 : bufsize += 16; /* could be any number */
1699 0 : msg = (char *) pg_realloc(msg, bufsize);
1700 : }
1701 : }
1702 :
1703 : /* Other end has closed the connection */
1704 52 : pg_free(msg);
1705 52 : return NULL;
1706 : }
1707 :
#ifdef WIN32

/*
 * This is a replacement version of pipe(2) for Windows which allows the pipe
 * handles to be used in select().
 *
 * Reads and writes on the pipe must go through piperead()/pipewrite().
 *
 * For consistency with Unix we declare the returned handles as "int".
 * This is okay even on WIN64 because system handles are not more than
 * 32 bits wide, but we do have to do some casting.
 *
 * How it works:
 * - A listening TCP socket is bound to the loopback address on port 0,
 *   letting the system choose a free port.  getsockname() then reports
 *   that port.
 * - A second socket connects to the listener, and the listener accepts.
 * - The accepted socket is stored in handles[0] and the connecting socket
 *   in handles[1].  The listener is then closed.
 *
 * NOTE(review): this presumably relies on PIPE_READ == 0 and PIPE_WRITE == 1
 * so that handles[] can be used like a Unix pipe pair -- confirm.
 *
 * Returns 0 on success.  On failure, logs an error, closes any sockets
 * opened so far, leaves both handles[] entries at -1, and returns -1.
 */
static int
pgpipe(int handles[2])
{
	pgsocket	s,
				tmp_sock;
	struct sockaddr_in serv_addr;
	int			len = sizeof(serv_addr);

	/* We have to use the Unix socket invalid file descriptor value here. */
	handles[0] = handles[1] = -1;

	/*
	 * setup listen socket
	 */
	if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not create socket: error code %d",
					 WSAGetLastError());
		return -1;
	}

	/* port 0 lets the system pick a free port on the loopback address */
	memset(&serv_addr, 0, sizeof(serv_addr));
	serv_addr.sin_family = AF_INET;
	serv_addr.sin_port = pg_hton16(0);
	serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
	if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not bind: error code %d",
					 WSAGetLastError());
		closesocket(s);
		return -1;
	}
	if (listen(s, 1) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not listen: error code %d",
					 WSAGetLastError());
		closesocket(s);
		return -1;
	}
	/* find out which port was actually assigned, for the connect() below */
	if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",
					 WSAGetLastError());
		closesocket(s);
		return -1;
	}

	/*
	 * setup pipe handles
	 */
	if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not create second socket: error code %d",
					 WSAGetLastError());
		closesocket(s);
		return -1;
	}
	handles[1] = (int) tmp_sock;

	if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not connect socket: error code %d",
					 WSAGetLastError());
		closesocket(handles[1]);
		handles[1] = -1;
		closesocket(s);
		return -1;
	}
	/* the accepted socket becomes the other end of the "pipe" */
	if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not accept connection: error code %d",
					 WSAGetLastError());
		closesocket(handles[1]);
		handles[1] = -1;
		closesocket(s);
		return -1;
	}
	handles[0] = (int) tmp_sock;

	/* the listener is no longer needed once the pair is connected */
	closesocket(s);
	return 0;
}

#endif							/* WIN32 */
|