Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * parallel.c
4 : *
5 : * Parallel support for pg_dump and pg_restore
6 : *
7 : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/bin/pg_dump/parallel.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : /*
17 : * Parallel operation works like this:
18 : *
19 : * The original, leader process calls ParallelBackupStart(), which forks off
20 : * the desired number of worker processes, which each enter WaitForCommands().
21 : *
22 : * The leader process dispatches an individual work item to one of the worker
23 : * processes in DispatchJobForTocEntry(). We send a command string such as
24 : * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25 : * The worker process receives and decodes the command and passes it to the
26 : * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27 : * which are routines of the current archive format. That routine performs
28 : * the required action (dump or restore) and returns an integer status code.
29 : * This is passed back to the leader where we pass it to the
30 : * ParallelCompletionPtr callback function that was passed to
31 : * DispatchJobForTocEntry(). The callback function does state updating
32 : * for the leader control logic in pg_backup_archiver.c.
33 : *
34 : * In principle additional archive-format-specific information might be needed
35 : * in commands or worker status responses, but so far that hasn't proved
36 : * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37 : * data structures. Remember that we have forked off the workers only after
38 : * we have read in the catalog. That's why our worker processes can also
39 : * access the catalog information. (In the Windows case, the workers are
40 : * threads in the same process. To avoid problems, they work with cloned
41 : * copies of the Archive data structure; see RunWorker().)
42 : *
43 : * In the leader process, the workerStatus field for each worker has one of
44 : * the following values:
45 : * WRKR_NOT_STARTED: we've not yet forked this worker
46 : * WRKR_IDLE: it's waiting for a command
47 : * WRKR_WORKING: it's working on a command
48 : * WRKR_TERMINATED: process ended
49 : * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
50 : * state, and must be NULL in other states.
51 : */
52 :
53 : #include "postgres_fe.h"
54 :
55 : #ifndef WIN32
56 : #include <sys/select.h>
57 : #include <sys/wait.h>
58 : #include <signal.h>
59 : #include <unistd.h>
60 : #include <fcntl.h>
61 : #endif
62 :
63 : #include "fe_utils/string_utils.h"
64 : #include "parallel.h"
65 : #include "pg_backup_utils.h"
66 : #ifdef WIN32
67 : #include "port/pg_bswap.h"
68 : #endif
69 :
70 : /* Mnemonic macros for indexing the fd array returned by pipe(2) */
71 : #define PIPE_READ 0
72 : #define PIPE_WRITE 1
73 :
74 : #define NO_SLOT (-1) /* Failure result for GetIdleWorker() */
75 :
76 : /* Worker process statuses */
77 : typedef enum
78 : {
79 : WRKR_NOT_STARTED = 0,
80 : WRKR_IDLE,
81 : WRKR_WORKING,
82 : WRKR_TERMINATED,
83 : } T_WorkerStatus;
84 :
85 : #define WORKER_IS_RUNNING(workerStatus) \
86 : ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
87 :
88 : /*
89 : * Private per-parallel-worker state (typedef for this is in parallel.h).
90 : *
91 : * Much of this is valid only in the leader process (or, on Windows, should
92 : * be touched only by the leader thread). But the AH field should be touched
93 : * only by workers. The pipe descriptors are valid everywhere.
94 : */
95 : struct ParallelSlot
96 : {
97 : T_WorkerStatus workerStatus; /* see enum above */
98 :
99 : /* These fields are valid if workerStatus == WRKR_WORKING: */
100 : ParallelCompletionPtr callback; /* function to call on completion */
101 : void *callback_data; /* passthrough data for it */
102 :
103 : ArchiveHandle *AH; /* Archive data worker is using */
104 :
105 : int pipeRead; /* leader's end of the pipes */
106 : int pipeWrite;
107 : int pipeRevRead; /* child's end of the pipes */
108 : int pipeRevWrite;
109 :
110 : /* Child process/thread identity info: */
111 : #ifdef WIN32
112 : uintptr_t hThread;
113 : unsigned int threadId;
114 : #else
115 : pid_t pid;
116 : #endif
117 : };
118 :
119 : #ifdef WIN32
120 :
121 : /*
122 : * Structure to hold info passed by _beginthreadex() to the function it calls
123 : * via its single allowed argument.
124 : */
125 : typedef struct
126 : {
127 : ArchiveHandle *AH; /* leader database connection */
128 : ParallelSlot *slot; /* this worker's parallel slot */
129 : } WorkerInfo;
130 :
131 : /* Windows implementation of pipe access */
132 : static int pgpipe(int handles[2]);
133 : #define piperead(a,b,c) recv(a,b,c,0)
134 : #define pipewrite(a,b,c) send(a,b,c,0)
135 :
136 : #else /* !WIN32 */
137 :
138 : /* Non-Windows implementation of pipe access */
139 : #define pgpipe(a) pipe(a)
140 : #define piperead(a,b,c) read(a,b,c)
141 : #define pipewrite(a,b,c) write(a,b,c)
142 :
143 : #endif /* WIN32 */
144 :
145 : /*
146 : * State info for archive_close_connection() shutdown callback.
147 : */
148 : typedef struct ShutdownInformation
149 : {
150 : ParallelState *pstate;
151 : Archive *AHX;
152 : } ShutdownInformation;
153 :
154 : static ShutdownInformation shutdown_info;
155 :
156 : /*
157 : * State info for signal handling.
158 : * We assume signal_info initializes to zeroes.
159 : *
160 : * On Unix, myAH is the leader DB connection in the leader process, and the
161 : * worker's own connection in worker processes. On Windows, we have only one
162 : * instance of signal_info, so myAH is the leader connection and the worker
163 : * connections must be dug out of pstate->parallelSlot[].
164 : */
165 : typedef struct DumpSignalInformation
166 : {
167 : ArchiveHandle *myAH; /* database connection to issue cancel for */
168 : ParallelState *pstate; /* parallel state, if any */
169 : bool handler_set; /* signal handler set up in this process? */
170 : #ifndef WIN32
171 : bool am_worker; /* am I a worker process? */
172 : #endif
173 : } DumpSignalInformation;
174 :
175 : static volatile DumpSignalInformation signal_info;
176 :
177 : #ifdef WIN32
178 : static CRITICAL_SECTION signal_info_lock;
179 : #endif
180 :
181 : /*
182 : * Write a simple string to stderr --- must be safe in a signal handler.
183 : * We ignore the write() result since there's not much we could do about it.
184 : * Certain compilers make that harder than it ought to be.
185 : */
186 : #define write_stderr(str) \
187 : do { \
188 : const char *str_ = (str); \
189 : int rc_; \
190 : rc_ = write(fileno(stderr), str_, strlen(str_)); \
191 : (void) rc_; \
192 : } while (0)
193 :
194 :
195 : #ifdef WIN32
196 : /* file-scope variables */
197 : static DWORD tls_index;
198 :
199 : /* globally visible variables (needed by exit_nicely) */
200 : bool parallel_init_done = false;
201 : DWORD mainThreadId;
202 : #endif /* WIN32 */
203 :
204 : /* Local function prototypes */
205 : static ParallelSlot *GetMyPSlot(ParallelState *pstate);
206 : static void archive_close_connection(int code, void *arg);
207 : static void ShutdownWorkersHard(ParallelState *pstate);
208 : static void WaitForTerminatingWorkers(ParallelState *pstate);
209 : static void set_cancel_handler(void);
210 : static void set_cancel_pstate(ParallelState *pstate);
211 : static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
212 : static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
213 : static int GetIdleWorker(ParallelState *pstate);
214 : static bool HasEveryWorkerTerminated(ParallelState *pstate);
215 : static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
216 : static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
217 : static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
218 : bool do_wait);
219 : static char *getMessageFromLeader(int pipefd[2]);
220 : static void sendMessageToLeader(int pipefd[2], const char *str);
221 : static int select_loop(int maxFd, fd_set *workerset);
222 : static char *getMessageFromWorker(ParallelState *pstate,
223 : bool do_wait, int *worker);
224 : static void sendMessageToWorker(ParallelState *pstate,
225 : int worker, const char *str);
226 : static char *readMessageFromPipe(int fd);
227 :
228 : #define messageStartsWith(msg, prefix) \
229 : (strncmp(msg, prefix, strlen(prefix)) == 0)
230 :
231 :
232 : /*
233 : * Initialize parallel dump support --- should be called early in process
234 : * startup. (Currently, this is called whether or not we intend parallel
235 : * activity.)
236 : */
237 : void
238 498 : init_parallel_dump_utils(void)
239 : {
240 : #ifdef WIN32
241 : if (!parallel_init_done)
242 : {
243 : WSADATA wsaData;
244 : int err;
245 :
246 : /* Prepare for threaded operation */
247 : tls_index = TlsAlloc();
248 : mainThreadId = GetCurrentThreadId();
249 :
250 : /* Initialize socket access */
251 : err = WSAStartup(MAKEWORD(2, 2), &wsaData);
252 : if (err != 0)
253 : pg_fatal("%s() failed: error code %d", "WSAStartup", err);
254 :
255 : parallel_init_done = true;
256 : }
257 : #endif
258 498 : }
259 :
260 : /*
261 : * Find the ParallelSlot for the current worker process or thread.
262 : *
263 : * Returns NULL if no matching slot is found (this implies we're the leader).
264 : */
265 : static ParallelSlot *
266 0 : GetMyPSlot(ParallelState *pstate)
267 : {
268 : int i;
269 :
270 0 : for (i = 0; i < pstate->numWorkers; i++)
271 : {
272 : #ifdef WIN32
273 : if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
274 : #else
275 0 : if (pstate->parallelSlot[i].pid == getpid())
276 : #endif
277 0 : return &(pstate->parallelSlot[i]);
278 : }
279 :
280 0 : return NULL;
281 : }
282 :
283 : /*
284 : * A thread-local version of getLocalPQExpBuffer().
285 : *
286 : * Non-reentrant but reduces memory leakage: we'll consume one buffer per
287 : * thread, which is much better than one per fmtId/fmtQualifiedId call.
288 : */
289 : #ifdef WIN32
290 : static PQExpBuffer
291 : getThreadLocalPQExpBuffer(void)
292 : {
293 : /*
294 : * The Tls code goes awry if we use a static var, so we provide for both
295 : * static and auto, and omit any use of the static var when using Tls. We
296 : * rely on TlsGetValue() to return 0 if the value is not yet set.
297 : */
298 : static PQExpBuffer s_id_return = NULL;
299 : PQExpBuffer id_return;
300 :
301 : if (parallel_init_done)
302 : id_return = (PQExpBuffer) TlsGetValue(tls_index);
303 : else
304 : id_return = s_id_return;
305 :
306 : if (id_return) /* first time through? */
307 : {
308 : /* same buffer, just wipe contents */
309 : resetPQExpBuffer(id_return);
310 : }
311 : else
312 : {
313 : /* new buffer */
314 : id_return = createPQExpBuffer();
315 : if (parallel_init_done)
316 : TlsSetValue(tls_index, id_return);
317 : else
318 : s_id_return = id_return;
319 : }
320 :
321 : return id_return;
322 : }
323 : #endif /* WIN32 */
324 :
325 : /*
326 : * pg_dump and pg_restore call this to register the cleanup handler
327 : * as soon as they've created the ArchiveHandle.
328 : */
329 : void
330 348 : on_exit_close_archive(Archive *AHX)
331 : {
332 348 : shutdown_info.AHX = AHX;
333 348 : on_exit_nicely(archive_close_connection, &shutdown_info);
334 348 : }
335 :
336 : /*
337 : * Update the archive handle in the on_exit callback registered by
338 : * on_exit_close_archive(). When pg_restore processes a pg_dumpall archive
339 : * containing multiple databases, each database is restored from a separate
340 : * archive. After closing one archive and opening the next, we update the
341 : * shutdown_info to reference the new archive handle so the cleanup callback
342 : * will close the correct archive on exit.
343 : */
344 : void
345 69 : replace_on_exit_close_archive(Archive *AHX)
346 : {
347 69 : shutdown_info.AHX = AHX;
348 69 : }
349 :
350 : /*
351 : * on_exit_nicely handler for shutting down database connections and
352 : * worker processes cleanly.
353 : */
354 : static void
355 276 : archive_close_connection(int code, void *arg)
356 : {
357 276 : ShutdownInformation *si = (ShutdownInformation *) arg;
358 :
359 276 : if (si->pstate)
360 : {
361 : /* In parallel mode, must figure out who we are */
362 0 : ParallelSlot *slot = GetMyPSlot(si->pstate);
363 :
364 0 : if (!slot)
365 : {
366 : /*
367 : * We're the leader. Forcibly shut down workers, then close our
368 : * own database connection, if any.
369 : */
370 0 : ShutdownWorkersHard(si->pstate);
371 :
372 0 : if (si->AHX)
373 0 : DisconnectDatabase(si->AHX);
374 : }
375 : else
376 : {
377 : /*
378 : * We're a worker. Shut down our own DB connection if any. On
379 : * Windows, we also have to close our communication sockets, to
380 : * emulate what will happen on Unix when the worker process exits.
381 : * (Without this, if this is a premature exit, the leader would
382 : * fail to detect it because there would be no EOF condition on
383 : * the other end of the pipe.)
384 : */
385 0 : if (slot->AH)
386 0 : DisconnectDatabase(&(slot->AH->public));
387 :
388 : #ifdef WIN32
389 : closesocket(slot->pipeRevRead);
390 : closesocket(slot->pipeRevWrite);
391 : #endif
392 : }
393 : }
394 : else
395 : {
396 : /* Non-parallel operation: just kill the leader DB connection */
397 276 : if (si->AHX)
398 276 : DisconnectDatabase(si->AHX);
399 : }
400 276 : }
401 :
402 : /*
403 : * Forcibly shut down any remaining workers, waiting for them to finish.
404 : *
405 : * Note that we don't expect to come here during normal exit (the workers
406 : * should be long gone, and the ParallelState too). We're only here in a
407 : * pg_fatal() situation, so intervening to cancel active commands is
408 : * appropriate.
409 : */
410 : static void
411 0 : ShutdownWorkersHard(ParallelState *pstate)
412 : {
413 : int i;
414 :
415 : /*
416 : * Close our write end of the sockets so that any workers waiting for
417 : * commands know they can exit. (Note: some of the pipeWrite fields might
418 : * still be zero, if we failed to initialize all the workers. Hence, just
419 : * ignore errors here.)
420 : */
421 0 : for (i = 0; i < pstate->numWorkers; i++)
422 0 : closesocket(pstate->parallelSlot[i].pipeWrite);
423 :
424 : /*
425 : * Force early termination of any commands currently in progress.
426 : */
427 : #ifndef WIN32
428 : /* On non-Windows, send SIGTERM to each worker process. */
429 0 : for (i = 0; i < pstate->numWorkers; i++)
430 : {
431 0 : pid_t pid = pstate->parallelSlot[i].pid;
432 :
433 0 : if (pid != 0)
434 0 : kill(pid, SIGTERM);
435 : }
436 : #else
437 :
438 : /*
439 : * On Windows, send query cancels directly to the workers' backends. Use
440 : * a critical section to ensure worker threads don't change state.
441 : */
442 : EnterCriticalSection(&signal_info_lock);
443 : for (i = 0; i < pstate->numWorkers; i++)
444 : {
445 : ArchiveHandle *AH = pstate->parallelSlot[i].AH;
446 : char errbuf[1];
447 :
448 : if (AH != NULL && AH->connCancel != NULL)
449 : (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
450 : }
451 : LeaveCriticalSection(&signal_info_lock);
452 : #endif
453 :
454 : /* Now wait for them to terminate. */
455 0 : WaitForTerminatingWorkers(pstate);
456 0 : }
457 :
458 : /*
459 : * Wait for all workers to terminate.
460 : */
461 : static void
462 12 : WaitForTerminatingWorkers(ParallelState *pstate)
463 : {
464 38 : while (!HasEveryWorkerTerminated(pstate))
465 : {
466 26 : ParallelSlot *slot = NULL;
467 : int j;
468 :
469 : #ifndef WIN32
470 : /* On non-Windows, use wait() to wait for next worker to end */
471 : int status;
472 26 : pid_t pid = wait(&status);
473 :
474 : /* Find dead worker's slot, and clear the PID field */
475 42 : for (j = 0; j < pstate->numWorkers; j++)
476 : {
477 42 : slot = &(pstate->parallelSlot[j]);
478 42 : if (slot->pid == pid)
479 : {
480 26 : slot->pid = 0;
481 26 : break;
482 : }
483 : }
484 : #else /* WIN32 */
485 : /* On Windows, we must use WaitForMultipleObjects() */
486 : HANDLE *lpHandles = pg_malloc_array(HANDLE, pstate->numWorkers);
487 : int nrun = 0;
488 : DWORD ret;
489 : uintptr_t hThread;
490 :
491 : for (j = 0; j < pstate->numWorkers; j++)
492 : {
493 : if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
494 : {
495 : lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
496 : nrun++;
497 : }
498 : }
499 : ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
500 : Assert(ret != WAIT_FAILED);
501 : hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
502 : free(lpHandles);
503 :
504 : /* Find dead worker's slot, and clear the hThread field */
505 : for (j = 0; j < pstate->numWorkers; j++)
506 : {
507 : slot = &(pstate->parallelSlot[j]);
508 : if (slot->hThread == hThread)
509 : {
510 : /* For cleanliness, close handles for dead threads */
511 : CloseHandle((HANDLE) slot->hThread);
512 : slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
513 : break;
514 : }
515 : }
516 : #endif /* WIN32 */
517 :
518 : /* On all platforms, update workerStatus and te[] as well */
519 : Assert(j < pstate->numWorkers);
520 26 : slot->workerStatus = WRKR_TERMINATED;
521 26 : pstate->te[j] = NULL;
522 : }
523 12 : }
524 :
525 :
526 : /*
527 : * Code for responding to cancel interrupts (SIGINT, control-C, etc)
528 : *
529 : * This doesn't quite belong in this module, but it needs access to the
530 : * ParallelState data, so there's not really a better place either.
531 : *
532 : * When we get a cancel interrupt, we could just die, but in pg_restore that
533 : * could leave a SQL command (e.g., CREATE INDEX on a large table) running
534 : * for a long time. Instead, we try to send a cancel request and then die.
535 : * pg_dump probably doesn't really need this, but we might as well use it
536 : * there too. Note that sending the cancel directly from the signal handler
537 : * is safe because PQcancel() is written to make it so.
538 : *
539 : * In parallel operation on Unix, each process is responsible for canceling
540 : * its own connection (this must be so because nobody else has access to it).
541 : * Furthermore, the leader process should attempt to forward its signal to
542 : * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
543 : * needed because typing control-C at the console would deliver SIGINT to
544 : * every member of the terminal process group --- but in other scenarios it
545 : * might be that only the leader gets signaled.
546 : *
547 : * On Windows, the cancel handler runs in a separate thread, because that's
548 : * how SetConsoleCtrlHandler works. We make it stop worker threads, send
549 : * cancels on all active connections, and then return FALSE, which will allow
550 : * the process to die. For safety's sake, we use a critical section to
551 : * protect the PGcancel structures against being changed while the signal
552 : * thread runs.
553 : */
554 :
555 : #ifndef WIN32
556 :
557 : /*
558 : * Signal handler (Unix only)
559 : */
560 : static void
561 0 : sigTermHandler(SIGNAL_ARGS)
562 : {
563 : int i;
564 : char errbuf[1];
565 :
566 : /*
567 : * Some platforms allow delivery of new signals to interrupt an active
568 : * signal handler. That could muck up our attempt to send PQcancel, so
569 : * disable the signals that set_cancel_handler enabled.
570 : */
571 0 : pqsignal(SIGINT, SIG_IGN);
572 0 : pqsignal(SIGTERM, SIG_IGN);
573 0 : pqsignal(SIGQUIT, SIG_IGN);
574 :
575 : /*
576 : * If we're in the leader, forward signal to all workers. (It seems best
577 : * to do this before PQcancel; killing the leader transaction will result
578 : * in invalid-snapshot errors from active workers, which maybe we can
579 : * quiet by killing workers first.) Ignore any errors.
580 : */
581 0 : if (signal_info.pstate != NULL)
582 : {
583 0 : for (i = 0; i < signal_info.pstate->numWorkers; i++)
584 : {
585 0 : pid_t pid = signal_info.pstate->parallelSlot[i].pid;
586 :
587 0 : if (pid != 0)
588 0 : kill(pid, SIGTERM);
589 : }
590 : }
591 :
592 : /*
593 : * Send QueryCancel if we have a connection to send to. Ignore errors,
594 : * there's not much we can do about them anyway.
595 : */
596 0 : if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
597 0 : (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
598 :
599 : /*
600 : * Report we're quitting, using nothing more complicated than write(2).
601 : * When in parallel operation, only the leader process should do this.
602 : */
603 0 : if (!signal_info.am_worker)
604 : {
605 0 : if (progname)
606 : {
607 0 : write_stderr(progname);
608 0 : write_stderr(": ");
609 : }
610 0 : write_stderr("terminated by user\n");
611 : }
612 :
613 : /*
614 : * And die, using _exit() not exit() because the latter will invoke atexit
615 : * handlers that can fail if we interrupted related code.
616 : */
617 0 : _exit(1);
618 : }
619 :
620 : /*
621 : * Enable cancel interrupt handler, if not already done.
622 : */
623 : static void
624 743 : set_cancel_handler(void)
625 : {
626 : /*
627 : * When forking, signal_info.handler_set will propagate into the new
628 : * process, but that's fine because the signal handler state does too.
629 : */
630 743 : if (!signal_info.handler_set)
631 : {
632 300 : signal_info.handler_set = true;
633 :
634 300 : pqsignal(SIGINT, sigTermHandler);
635 300 : pqsignal(SIGTERM, sigTermHandler);
636 300 : pqsignal(SIGQUIT, sigTermHandler);
637 : }
638 743 : }
639 :
640 : #else /* WIN32 */
641 :
642 : /*
643 : * Console interrupt handler --- runs in a newly-started thread.
644 : *
645 : * After stopping other threads and sending cancel requests on all open
646 : * connections, we return FALSE which will allow the default ExitProcess()
647 : * action to be taken.
648 : */
649 : static BOOL WINAPI
650 : consoleHandler(DWORD dwCtrlType)
651 : {
652 : int i;
653 : char errbuf[1];
654 :
655 : if (dwCtrlType == CTRL_C_EVENT ||
656 : dwCtrlType == CTRL_BREAK_EVENT)
657 : {
658 : /* Critical section prevents changing data we look at here */
659 : EnterCriticalSection(&signal_info_lock);
660 :
661 : /*
662 : * If in parallel mode, stop worker threads and send QueryCancel to
663 : * their connected backends. The main point of stopping the worker
664 : * threads is to keep them from reporting the query cancels as errors,
665 : * which would clutter the user's screen. We needn't stop the leader
666 : * thread since it won't be doing much anyway. Do this before
667 : * canceling the main transaction, else we might get invalid-snapshot
668 : * errors reported before we can stop the workers. Ignore errors,
669 : * there's not much we can do about them anyway.
670 : */
671 : if (signal_info.pstate != NULL)
672 : {
673 : for (i = 0; i < signal_info.pstate->numWorkers; i++)
674 : {
675 : ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
676 : ArchiveHandle *AH = slot->AH;
677 : HANDLE hThread = (HANDLE) slot->hThread;
678 :
679 : /*
680 : * Using TerminateThread here may leave some resources leaked,
681 : * but it doesn't matter since we're about to end the whole
682 : * process.
683 : */
684 : if (hThread != INVALID_HANDLE_VALUE)
685 : TerminateThread(hThread, 0);
686 :
687 : if (AH != NULL && AH->connCancel != NULL)
688 : (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
689 : }
690 : }
691 :
692 : /*
693 : * Send QueryCancel to leader connection, if enabled. Ignore errors,
694 : * there's not much we can do about them anyway.
695 : */
696 : if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
697 : (void) PQcancel(signal_info.myAH->connCancel,
698 : errbuf, sizeof(errbuf));
699 :
700 : LeaveCriticalSection(&signal_info_lock);
701 :
702 : /*
703 : * Report we're quitting, using nothing more complicated than
704 : * write(2). (We might be able to get away with using pg_log_*()
705 : * here, but since we terminated other threads uncleanly above, it
706 : * seems better to assume as little as possible.)
707 : */
708 : if (progname)
709 : {
710 : write_stderr(progname);
711 : write_stderr(": ");
712 : }
713 : write_stderr("terminated by user\n");
714 : }
715 :
716 : /* Always return FALSE to allow signal handling to continue */
717 : return FALSE;
718 : }
719 :
720 : /*
721 : * Enable cancel interrupt handler, if not already done.
722 : */
723 : static void
724 : set_cancel_handler(void)
725 : {
726 : if (!signal_info.handler_set)
727 : {
728 : signal_info.handler_set = true;
729 :
730 : InitializeCriticalSection(&signal_info_lock);
731 :
732 : SetConsoleCtrlHandler(consoleHandler, TRUE);
733 : }
734 : }
735 :
736 : #endif /* WIN32 */
737 :
738 :
739 : /*
740 : * set_archive_cancel_info
741 : *
742 : * Fill AH->connCancel with cancellation info for the specified database
743 : * connection; or clear it if conn is NULL.
744 : */
745 : void
746 743 : set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
747 : {
748 : PGcancel *oldConnCancel;
749 :
750 : /*
751 : * Activate the interrupt handler if we didn't yet in this process. On
752 : * Windows, this also initializes signal_info_lock; therefore it's
753 : * important that this happen at least once before we fork off any
754 : * threads.
755 : */
756 743 : set_cancel_handler();
757 :
758 : /*
759 : * On Unix, we assume that storing a pointer value is atomic with respect
760 : * to any possible signal interrupt. On Windows, use a critical section.
761 : */
762 :
763 : #ifdef WIN32
764 : EnterCriticalSection(&signal_info_lock);
765 : #endif
766 :
767 : /* Free the old one if we have one */
768 743 : oldConnCancel = AH->connCancel;
769 : /* be sure interrupt handler doesn't use pointer while freeing */
770 743 : AH->connCancel = NULL;
771 :
772 743 : if (oldConnCancel != NULL)
773 397 : PQfreeCancel(oldConnCancel);
774 :
775 : /* Set the new one if specified */
776 743 : if (conn)
777 399 : AH->connCancel = PQgetCancel(conn);
778 :
779 : /*
780 : * On Unix, there's only ever one active ArchiveHandle per process, so we
781 : * can just set signal_info.myAH unconditionally. On Windows, do that
782 : * only in the main thread; worker threads have to make sure their
783 : * ArchiveHandle appears in the pstate data, which is dealt with in
784 : * RunWorker().
785 : */
786 : #ifndef WIN32
787 743 : signal_info.myAH = AH;
788 : #else
789 : if (mainThreadId == GetCurrentThreadId())
790 : signal_info.myAH = AH;
791 : #endif
792 :
793 : #ifdef WIN32
794 : LeaveCriticalSection(&signal_info_lock);
795 : #endif
796 743 : }
797 :
798 : /*
799 : * set_cancel_pstate
800 : *
801 : * Set signal_info.pstate to point to the specified ParallelState, if any.
802 : * We need this mainly to have an interlock against Windows signal thread.
803 : */
804 : static void
805 24 : set_cancel_pstate(ParallelState *pstate)
806 : {
807 : #ifdef WIN32
808 : EnterCriticalSection(&signal_info_lock);
809 : #endif
810 :
811 24 : signal_info.pstate = pstate;
812 :
813 : #ifdef WIN32
814 : LeaveCriticalSection(&signal_info_lock);
815 : #endif
816 24 : }
817 :
818 : /*
819 : * set_cancel_slot_archive
820 : *
821 : * Set ParallelSlot's AH field to point to the specified archive, if any.
822 : * We need this mainly to have an interlock against Windows signal thread.
823 : */
824 : static void
825 52 : set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
826 : {
827 : #ifdef WIN32
828 : EnterCriticalSection(&signal_info_lock);
829 : #endif
830 :
831 52 : slot->AH = AH;
832 :
833 : #ifdef WIN32
834 : LeaveCriticalSection(&signal_info_lock);
835 : #endif
836 52 : }
837 :
838 :
839 : /*
840 : * This function is called by both Unix and Windows variants to set up
841 : * and run a worker process. Caller should exit the process (or thread)
842 : * upon return.
843 : */
844 : static void
845 26 : RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
846 : {
847 : int pipefd[2];
848 :
849 : /* fetch child ends of pipes */
850 26 : pipefd[PIPE_READ] = slot->pipeRevRead;
851 26 : pipefd[PIPE_WRITE] = slot->pipeRevWrite;
852 :
853 : /*
854 : * Clone the archive so that we have our own state to work with, and in
855 : * particular our own database connection.
856 : *
857 : * We clone on Unix as well as Windows, even though technically we don't
858 : * need to because fork() gives us a copy in our own address space
859 : * already. But CloneArchive resets the state information and also clones
860 : * the database connection which both seem kinda helpful.
861 : */
862 26 : AH = CloneArchive(AH);
863 :
864 : /* Remember cloned archive where signal handler can find it */
865 26 : set_cancel_slot_archive(slot, AH);
866 :
867 : /*
868 : * Call the setup worker function that's defined in the ArchiveHandle.
869 : */
870 26 : (AH->SetupWorkerPtr) ((Archive *) AH);
871 :
872 : /*
873 : * Execute commands until done.
874 : */
875 26 : WaitForCommands(AH, pipefd);
876 :
877 : /*
878 : * Disconnect from database and clean up.
879 : */
880 26 : set_cancel_slot_archive(slot, NULL);
881 26 : DisconnectDatabase(&(AH->public));
882 26 : DeCloneArchive(AH);
883 26 : }
884 :
885 : /*
886 : * Thread base function for Windows
887 : */
888 : #ifdef WIN32
889 : static unsigned __stdcall
890 : init_spawned_worker_win32(WorkerInfo *wi)
891 : {
892 : ArchiveHandle *AH = wi->AH;
893 : ParallelSlot *slot = wi->slot;
894 :
895 : /* Don't need WorkerInfo anymore */
896 : free(wi);
897 :
898 : /* Run the worker ... */
899 : RunWorker(AH, slot);
900 :
901 : /* Exit the thread */
902 : _endthreadex(0);
903 : return 0;
904 : }
905 : #endif /* WIN32 */
906 :
907 : /*
908 : * This function starts a parallel dump or restore by spawning off the worker
909 : * processes. For Windows, it creates a number of threads; on Unix the
910 : * workers are created with fork().
911 : */
912 : ParallelState *
913 61 : ParallelBackupStart(ArchiveHandle *AH)
914 : {
915 : ParallelState *pstate;
916 : int i;
917 :
918 : Assert(AH->public.numWorkers > 0);
919 :
920 61 : pstate = pg_malloc_object(ParallelState);
921 :
922 61 : pstate->numWorkers = AH->public.numWorkers;
923 61 : pstate->te = NULL;
924 61 : pstate->parallelSlot = NULL;
925 :
926 61 : if (AH->public.numWorkers == 1)
927 49 : return pstate;
928 :
929 : /* Create status arrays, being sure to initialize all fields to 0 */
930 12 : pstate->te =
931 12 : pg_malloc0_array(TocEntry *, pstate->numWorkers);
932 12 : pstate->parallelSlot =
933 12 : pg_malloc0_array(ParallelSlot, pstate->numWorkers);
934 :
935 : #ifdef WIN32
936 : /* Make fmtId() and fmtQualifiedId() use thread-local storage */
937 : getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
938 : #endif
939 :
940 : /*
941 : * Set the pstate in shutdown_info, to tell the exit handler that it must
942 : * clean up workers as well as the main database connection. But we don't
943 : * set this in signal_info yet, because we don't want child processes to
944 : * inherit non-NULL signal_info.pstate.
945 : */
946 12 : shutdown_info.pstate = pstate;
947 :
948 : /*
949 : * Temporarily disable query cancellation on the leader connection. This
950 : * ensures that child processes won't inherit valid AH->connCancel
951 : * settings and thus won't try to issue cancels against the leader's
952 : * connection. No harm is done if we fail while it's disabled, because
953 : * the leader connection is idle at this point anyway.
954 : */
955 12 : set_archive_cancel_info(AH, NULL);
956 :
957 : /* Ensure stdio state is quiesced before forking */
958 12 : fflush(NULL);
959 :
960 : /* Create desired number of workers */
961 38 : for (i = 0; i < pstate->numWorkers; i++)
962 : {
963 : #ifdef WIN32
964 : WorkerInfo *wi;
965 : uintptr_t handle;
966 : #else
967 : pid_t pid;
968 : #endif
969 26 : ParallelSlot *slot = &(pstate->parallelSlot[i]);
970 : int pipeMW[2],
971 : pipeWM[2];
972 :
973 : /* Create communication pipes for this worker */
974 26 : if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
975 0 : pg_fatal("could not create communication channels: %m");
976 :
977 : /* leader's ends of the pipes */
978 26 : slot->pipeRead = pipeWM[PIPE_READ];
979 26 : slot->pipeWrite = pipeMW[PIPE_WRITE];
980 : /* child's ends of the pipes */
981 26 : slot->pipeRevRead = pipeMW[PIPE_READ];
982 26 : slot->pipeRevWrite = pipeWM[PIPE_WRITE];
983 :
984 : #ifdef WIN32
985 : /* Create transient structure to pass args to worker function */
986 : wi = pg_malloc_object(WorkerInfo);
987 :
988 : wi->AH = AH;
989 : wi->slot = slot;
990 :
991 : handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
992 : wi, 0, &(slot->threadId));
993 : slot->hThread = handle;
994 : slot->workerStatus = WRKR_IDLE;
995 : #else /* !WIN32 */
996 26 : pid = fork();
997 52 : if (pid == 0)
998 : {
999 : /* we are the worker */
1000 : int j;
1001 :
1002 : /* this is needed for GetMyPSlot() */
1003 26 : slot->pid = getpid();
1004 :
1005 : /* instruct signal handler that we're in a worker now */
1006 26 : signal_info.am_worker = true;
1007 :
1008 : /* close read end of Worker -> Leader */
1009 26 : closesocket(pipeWM[PIPE_READ]);
1010 : /* close write end of Leader -> Worker */
1011 26 : closesocket(pipeMW[PIPE_WRITE]);
1012 :
1013 : /*
1014 : * Close all inherited fds for communication of the leader with
1015 : * previously-forked workers.
1016 : */
1017 42 : for (j = 0; j < i; j++)
1018 : {
1019 16 : closesocket(pstate->parallelSlot[j].pipeRead);
1020 16 : closesocket(pstate->parallelSlot[j].pipeWrite);
1021 : }
1022 :
1023 : /* Run the worker ... */
1024 26 : RunWorker(AH, slot);
1025 :
1026 : /* We can just exit(0) when done */
1027 26 : exit(0);
1028 : }
1029 26 : else if (pid < 0)
1030 : {
1031 : /* fork failed */
1032 0 : pg_fatal("could not create worker process: %m");
1033 : }
1034 :
1035 : /* In Leader after successful fork */
1036 26 : slot->pid = pid;
1037 26 : slot->workerStatus = WRKR_IDLE;
1038 :
1039 : /* close read end of Leader -> Worker */
1040 26 : closesocket(pipeMW[PIPE_READ]);
1041 : /* close write end of Worker -> Leader */
1042 26 : closesocket(pipeWM[PIPE_WRITE]);
1043 : #endif /* WIN32 */
1044 : }
1045 :
1046 : /*
1047 : * Having forked off the workers, disable SIGPIPE so that leader isn't
1048 : * killed if it tries to send a command to a dead worker. We don't want
1049 : * the workers to inherit this setting, though.
1050 : */
1051 : #ifndef WIN32
1052 12 : pqsignal(SIGPIPE, SIG_IGN);
1053 : #endif
1054 :
1055 : /*
1056 : * Re-establish query cancellation on the leader connection.
1057 : */
1058 12 : set_archive_cancel_info(AH, AH->connection);
1059 :
1060 : /*
1061 : * Tell the cancel signal handler to forward signals to worker processes,
1062 : * too. (As with query cancel, we did not need this earlier because the
1063 : * workers have not yet been given anything to do; if we die before this
1064 : * point, any already-started workers will see EOF and quit promptly.)
1065 : */
1066 12 : set_cancel_pstate(pstate);
1067 :
1068 12 : return pstate;
1069 : }
1070 :
1071 : /*
1072 : * Close down a parallel dump or restore.
1073 : */
1074 : void
1075 61 : ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1076 : {
1077 : int i;
1078 :
1079 : /* No work if non-parallel */
1080 61 : if (pstate->numWorkers == 1)
1081 49 : return;
1082 :
1083 : /* There should not be any unfinished jobs */
1084 : Assert(IsEveryWorkerIdle(pstate));
1085 :
1086 : /* Close the sockets so that the workers know they can exit */
1087 38 : for (i = 0; i < pstate->numWorkers; i++)
1088 : {
1089 26 : closesocket(pstate->parallelSlot[i].pipeRead);
1090 26 : closesocket(pstate->parallelSlot[i].pipeWrite);
1091 : }
1092 :
1093 : /* Wait for them to exit */
1094 12 : WaitForTerminatingWorkers(pstate);
1095 :
1096 : /*
1097 : * Unlink pstate from shutdown_info, so the exit handler will not try to
1098 : * use it; and likewise unlink from signal_info.
1099 : */
1100 12 : shutdown_info.pstate = NULL;
1101 12 : set_cancel_pstate(NULL);
1102 :
1103 : /* Release state (mere neatnik-ism, since we're about to terminate) */
1104 12 : free(pstate->te);
1105 12 : free(pstate->parallelSlot);
1106 12 : free(pstate);
1107 : }
1108 :
1109 : /*
1110 : * These next four functions handle construction and parsing of the command
1111 : * strings and response strings for parallel workers.
1112 : *
1113 : * Currently, these can be the same regardless of which archive format we are
1114 : * processing. In future, we might want to let format modules override these
1115 : * functions to add format-specific data to a command or response.
1116 : */
1117 :
1118 : /*
1119 : * buildWorkerCommand: format a command string to send to a worker.
1120 : *
1121 : * The string is built in the caller-supplied buffer of size buflen.
1122 : */
1123 : static void
1124 115 : buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
1125 : char *buf, int buflen)
1126 : {
1127 115 : if (act == ACT_DUMP)
1128 69 : snprintf(buf, buflen, "DUMP %d", te->dumpId);
1129 46 : else if (act == ACT_RESTORE)
1130 46 : snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1131 : else
1132 : Assert(false);
1133 115 : }
1134 :
1135 : /*
1136 : * parseWorkerCommand: interpret a command string in a worker.
1137 : */
1138 : static void
1139 115 : parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
1140 : const char *msg)
1141 : {
1142 : DumpId dumpId;
1143 : int nBytes;
1144 :
1145 115 : if (messageStartsWith(msg, "DUMP "))
1146 : {
1147 69 : *act = ACT_DUMP;
1148 69 : sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1149 : Assert(nBytes == strlen(msg));
1150 69 : *te = getTocEntryByDumpId(AH, dumpId);
1151 : Assert(*te != NULL);
1152 : }
1153 46 : else if (messageStartsWith(msg, "RESTORE "))
1154 : {
1155 46 : *act = ACT_RESTORE;
1156 46 : sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1157 : Assert(nBytes == strlen(msg));
1158 46 : *te = getTocEntryByDumpId(AH, dumpId);
1159 : Assert(*te != NULL);
1160 : }
1161 : else
1162 0 : pg_fatal("unrecognized command received from leader: \"%s\"",
1163 : msg);
1164 115 : }
1165 :
1166 : /*
1167 : * buildWorkerResponse: format a response string to send to the leader.
1168 : *
1169 : * The string is built in the caller-supplied buffer of size buflen.
1170 : */
1171 : static void
1172 115 : buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
1173 : char *buf, int buflen)
1174 : {
1175 115 : snprintf(buf, buflen, "OK %d %d %d",
1176 : te->dumpId,
1177 : status,
1178 : status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1179 115 : }
1180 :
1181 : /*
1182 : * parseWorkerResponse: parse the status message returned by a worker.
1183 : *
1184 : * Returns the integer status code, and may update fields of AH and/or te.
1185 : */
1186 : static int
1187 115 : parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
1188 : const char *msg)
1189 : {
1190 : DumpId dumpId;
1191 : int nBytes,
1192 : n_errors;
1193 115 : int status = 0;
1194 :
1195 115 : if (messageStartsWith(msg, "OK "))
1196 : {
1197 115 : sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1198 :
1199 : Assert(dumpId == te->dumpId);
1200 : Assert(nBytes == strlen(msg));
1201 :
1202 115 : AH->public.n_errors += n_errors;
1203 : }
1204 : else
1205 0 : pg_fatal("invalid message received from worker: \"%s\"",
1206 : msg);
1207 :
1208 115 : return status;
1209 : }
1210 :
1211 : /*
1212 : * Dispatch a job to some free worker.
1213 : *
1214 : * te is the TocEntry to be processed, act is the action to be taken on it.
1215 : * callback is the function to call on completion of the job.
1216 : *
1217 : * If no worker is currently available, this will block, and previously
1218 : * registered callback functions may be called.
1219 : */
1220 : void
1221 115 : DispatchJobForTocEntry(ArchiveHandle *AH,
1222 : ParallelState *pstate,
1223 : TocEntry *te,
1224 : T_Action act,
1225 : ParallelCompletionPtr callback,
1226 : void *callback_data)
1227 : {
1228 : int worker;
1229 : char buf[256];
1230 :
1231 : /* Get a worker, waiting if none are idle */
1232 169 : while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1233 54 : WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1234 :
1235 : /* Construct and send command string */
1236 115 : buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1237 :
1238 115 : sendMessageToWorker(pstate, worker, buf);
1239 :
1240 : /* Remember worker is busy, and which TocEntry it's working on */
1241 115 : pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1242 115 : pstate->parallelSlot[worker].callback = callback;
1243 115 : pstate->parallelSlot[worker].callback_data = callback_data;
1244 115 : pstate->te[worker] = te;
1245 115 : }
1246 :
1247 : /*
1248 : * Find an idle worker and return its slot number.
1249 : * Return NO_SLOT if none are idle.
1250 : */
1251 : static int
1252 259 : GetIdleWorker(ParallelState *pstate)
1253 : {
1254 : int i;
1255 :
1256 625 : for (i = 0; i < pstate->numWorkers; i++)
1257 : {
1258 493 : if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1259 127 : return i;
1260 : }
1261 132 : return NO_SLOT;
1262 : }
1263 :
1264 : /*
1265 : * Return true iff no worker is running.
1266 : */
1267 : static bool
1268 38 : HasEveryWorkerTerminated(ParallelState *pstate)
1269 : {
1270 : int i;
1271 :
1272 75 : for (i = 0; i < pstate->numWorkers; i++)
1273 : {
1274 63 : if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1275 26 : return false;
1276 : }
1277 12 : return true;
1278 : }
1279 :
1280 : /*
1281 : * Return true iff every worker is in the WRKR_IDLE state.
1282 : */
1283 : bool
1284 45 : IsEveryWorkerIdle(ParallelState *pstate)
1285 : {
1286 : int i;
1287 :
1288 101 : for (i = 0; i < pstate->numWorkers; i++)
1289 : {
1290 81 : if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1291 25 : return false;
1292 : }
1293 20 : return true;
1294 : }
1295 :
1296 : /*
1297 : * Acquire lock on a table to be dumped by a worker process.
1298 : *
1299 : * The leader process is already holding an ACCESS SHARE lock. Ordinarily
1300 : * it's no problem for a worker to get one too, but if anything else besides
1301 : * pg_dump is running, there's a possible deadlock:
1302 : *
1303 : * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
1304 : * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1305 : * because the leader holds a conflicting ACCESS SHARE lock).
1306 : * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1307 : * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1308 : * 4) Now we have a deadlock, since the leader is effectively waiting for
1309 : * the worker. The server cannot detect that, however.
1310 : *
1311 : * To prevent an infinite wait, prior to touching a table in a worker, request
1312 : * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
1313 : * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1314 : * so we have a deadlock. We must fail the backup in that case.
1315 : */
1316 : static void
1317 69 : lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1318 : {
1319 : const char *qualId;
1320 : PQExpBuffer query;
1321 : PGresult *res;
1322 :
1323 : /* Nothing to do for BLOBS */
1324 69 : if (strcmp(te->desc, "BLOBS") == 0)
1325 4 : return;
1326 :
1327 65 : query = createPQExpBuffer();
1328 :
1329 65 : qualId = fmtQualifiedId(te->namespace, te->tag);
1330 :
1331 65 : appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1332 : qualId);
1333 :
1334 65 : res = PQexec(AH->connection, query->data);
1335 :
1336 65 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1337 0 : pg_fatal("could not obtain lock on relation \"%s\"\n"
1338 : "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1339 : "on the table after the pg_dump parent process had gotten the "
1340 : "initial ACCESS SHARE lock on the table.", qualId);
1341 :
1342 65 : PQclear(res);
1343 65 : destroyPQExpBuffer(query);
1344 : }
1345 :
1346 : /*
1347 : * WaitForCommands: main routine for a worker process.
1348 : *
1349 : * Read and execute commands from the leader until we see EOF on the pipe.
1350 : */
1351 : static void
1352 26 : WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1353 : {
1354 : char *command;
1355 : TocEntry *te;
1356 : T_Action act;
1357 26 : int status = 0;
1358 : char buf[256];
1359 :
1360 : for (;;)
1361 : {
1362 141 : if (!(command = getMessageFromLeader(pipefd)))
1363 : {
1364 : /* EOF, so done */
1365 26 : return;
1366 : }
1367 :
1368 : /* Decode the command */
1369 115 : parseWorkerCommand(AH, &te, &act, command);
1370 :
1371 115 : if (act == ACT_DUMP)
1372 : {
1373 : /* Acquire lock on this table within the worker's session */
1374 69 : lockTableForWorker(AH, te);
1375 :
1376 : /* Perform the dump command */
1377 69 : status = (AH->WorkerJobDumpPtr) (AH, te);
1378 : }
1379 46 : else if (act == ACT_RESTORE)
1380 : {
1381 : /* Perform the restore command */
1382 46 : status = (AH->WorkerJobRestorePtr) (AH, te);
1383 : }
1384 : else
1385 : Assert(false);
1386 :
1387 : /* Return status to leader */
1388 115 : buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1389 :
1390 115 : sendMessageToLeader(pipefd, buf);
1391 :
1392 : /* command was pg_malloc'd and we are responsible for free()ing it. */
1393 115 : free(command);
1394 : }
1395 : }
1396 :
1397 : /*
1398 : * Check for status messages from workers.
1399 : *
1400 : * If do_wait is true, wait to get a status message; otherwise, just return
1401 : * immediately if there is none available.
1402 : *
1403 : * When we get a status message, we pass the status code to the callback
1404 : * function that was specified to DispatchJobForTocEntry, then reset the
1405 : * worker status to IDLE.
1406 : *
1407 : * Returns true if we collected a status message, else false.
1408 : *
1409 : * XXX is it worth checking for more than one status message per call?
1410 : * It seems somewhat unlikely that multiple workers would finish at exactly
1411 : * the same time.
1412 : */
1413 : static bool
1414 211 : ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1415 : {
1416 : int worker;
1417 : char *msg;
1418 :
1419 : /* Try to collect a status message */
1420 211 : msg = getMessageFromWorker(pstate, do_wait, &worker);
1421 :
1422 211 : if (!msg)
1423 : {
1424 : /* If do_wait is true, we must have detected EOF on some socket */
1425 96 : if (do_wait)
1426 0 : pg_fatal("a worker process died unexpectedly");
1427 96 : return false;
1428 : }
1429 :
1430 : /* Process it and update our idea of the worker's status */
1431 115 : if (messageStartsWith(msg, "OK "))
1432 : {
1433 115 : ParallelSlot *slot = &pstate->parallelSlot[worker];
1434 115 : TocEntry *te = pstate->te[worker];
1435 : int status;
1436 :
1437 115 : status = parseWorkerResponse(AH, te, msg);
1438 115 : slot->callback(AH, te, status, slot->callback_data);
1439 115 : slot->workerStatus = WRKR_IDLE;
1440 115 : pstate->te[worker] = NULL;
1441 : }
1442 : else
1443 0 : pg_fatal("invalid message received from worker: \"%s\"",
1444 : msg);
1445 :
1446 : /* Free the string returned from getMessageFromWorker */
1447 115 : free(msg);
1448 :
1449 115 : return true;
1450 : }
1451 :
1452 : /*
1453 : * Check for status results from workers, waiting if necessary.
1454 : *
1455 : * Available wait modes are:
1456 : * WFW_NO_WAIT: reap any available status, but don't block
1457 : * WFW_GOT_STATUS: wait for at least one more worker to finish
1458 : * WFW_ONE_IDLE: wait for at least one worker to be idle
1459 : * WFW_ALL_IDLE: wait for all workers to be idle
1460 : *
1461 : * Any received results are passed to the callback specified to
1462 : * DispatchJobForTocEntry.
1463 : *
1464 : * This function is executed in the leader process.
1465 : */
1466 : void
1467 120 : WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
1468 : {
1469 120 : bool do_wait = false;
1470 :
1471 : /*
1472 : * In GOT_STATUS mode, always block waiting for a message, since we can't
1473 : * return till we get something. In other modes, we don't block the first
1474 : * time through the loop.
1475 : */
1476 120 : if (mode == WFW_GOT_STATUS)
1477 : {
1478 : /* Assert that caller knows what it's doing */
1479 : Assert(!IsEveryWorkerIdle(pstate));
1480 12 : do_wait = true;
1481 : }
1482 :
1483 : for (;;)
1484 : {
1485 : /*
1486 : * Check for status messages, even if we don't need to block. We do
1487 : * not try very hard to reap all available messages, though, since
1488 : * there's unlikely to be more than one.
1489 : */
1490 211 : if (ListenToWorkers(AH, pstate, do_wait))
1491 : {
1492 : /*
1493 : * If we got a message, we are done by definition for GOT_STATUS
1494 : * mode, and we can also be certain that there's at least one idle
1495 : * worker. So we're done in all but ALL_IDLE mode.
1496 : */
1497 115 : if (mode != WFW_ALL_IDLE)
1498 100 : return;
1499 : }
1500 :
1501 : /* Check whether we must wait for new status messages */
1502 111 : switch (mode)
1503 : {
1504 0 : case WFW_NO_WAIT:
1505 0 : return; /* never wait */
1506 0 : case WFW_GOT_STATUS:
1507 : Assert(false); /* can't get here, because we waited */
1508 0 : break;
1509 90 : case WFW_ONE_IDLE:
1510 90 : if (GetIdleWorker(pstate) != NO_SLOT)
1511 12 : return;
1512 78 : break;
1513 21 : case WFW_ALL_IDLE:
1514 21 : if (IsEveryWorkerIdle(pstate))
1515 8 : return;
1516 13 : break;
1517 : }
1518 :
1519 : /* Loop back, and this time wait for something to happen */
1520 91 : do_wait = true;
1521 : }
1522 : }
1523 :
1524 : /*
1525 : * Read one command message from the leader, blocking if necessary
1526 : * until one is available, and return it as a malloc'd string.
1527 : * On EOF, return NULL.
1528 : *
1529 : * This function is executed in worker processes.
1530 : */
1531 : static char *
1532 141 : getMessageFromLeader(int pipefd[2])
1533 : {
1534 141 : return readMessageFromPipe(pipefd[PIPE_READ]);
1535 : }
1536 :
1537 : /*
1538 : * Send a status message to the leader.
1539 : *
1540 : * This function is executed in worker processes.
1541 : */
1542 : static void
1543 115 : sendMessageToLeader(int pipefd[2], const char *str)
1544 : {
1545 115 : int len = strlen(str) + 1;
1546 :
1547 115 : if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1548 0 : pg_fatal("could not write to the communication channel: %m");
1549 115 : }
1550 :
1551 : /*
1552 : * Wait until some descriptor in "workerset" becomes readable.
1553 : * Returns -1 on error, else the number of readable descriptors.
1554 : */
1555 : static int
1556 103 : select_loop(int maxFd, fd_set *workerset)
1557 : {
1558 : int i;
1559 103 : fd_set saveSet = *workerset;
1560 :
1561 : for (;;)
1562 : {
1563 103 : *workerset = saveSet;
1564 103 : i = select(maxFd + 1, workerset, NULL, NULL, NULL);
1565 :
1566 : #ifndef WIN32
1567 103 : if (i < 0 && errno == EINTR)
1568 0 : continue;
1569 : #else
1570 : if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
1571 : continue;
1572 : #endif
1573 103 : break;
1574 : }
1575 :
1576 103 : return i;
1577 : }
1578 :
1579 :
1580 : /*
1581 : * Check for messages from worker processes.
1582 : *
1583 : * If a message is available, return it as a malloc'd string, and put the
1584 : * index of the sending worker in *worker.
1585 : *
1586 : * If nothing is available, wait if "do_wait" is true, else return NULL.
1587 : *
1588 : * If we detect EOF on any socket, we'll return NULL. It's not great that
1589 : * that's hard to distinguish from the no-data-available case, but for now
1590 : * our one caller is okay with that.
1591 : *
1592 : * This function is executed in the leader process.
1593 : */
1594 : static char *
1595 211 : getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1596 : {
1597 : int i;
1598 : fd_set workerset;
1599 211 : int maxFd = -1;
1600 211 : struct timeval nowait = {0, 0};
1601 :
1602 : /* construct bitmap of socket descriptors for select() */
1603 3587 : FD_ZERO(&workerset);
1604 715 : for (i = 0; i < pstate->numWorkers; i++)
1605 : {
1606 504 : if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1607 0 : continue;
1608 504 : FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1609 504 : if (pstate->parallelSlot[i].pipeRead > maxFd)
1610 504 : maxFd = pstate->parallelSlot[i].pipeRead;
1611 : }
1612 :
1613 211 : if (do_wait)
1614 : {
1615 103 : i = select_loop(maxFd, &workerset);
1616 : Assert(i != 0);
1617 : }
1618 : else
1619 : {
1620 108 : if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1621 96 : return NULL;
1622 : }
1623 :
1624 115 : if (i < 0)
1625 0 : pg_fatal("%s() failed: %m", "select");
1626 :
1627 172 : for (i = 0; i < pstate->numWorkers; i++)
1628 : {
1629 : char *msg;
1630 :
1631 172 : if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1632 0 : continue;
1633 172 : if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1634 57 : continue;
1635 :
1636 : /*
1637 : * Read the message if any. If the socket is ready because of EOF,
1638 : * we'll return NULL instead (and the socket will stay ready, so the
1639 : * condition will persist).
1640 : *
1641 : * Note: because this is a blocking read, we'll wait if only part of
1642 : * the message is available. Waiting a long time would be bad, but
1643 : * since worker status messages are short and are always sent in one
1644 : * operation, it shouldn't be a problem in practice.
1645 : */
1646 115 : msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1647 115 : *worker = i;
1648 115 : return msg;
1649 : }
1650 : Assert(false);
1651 0 : return NULL;
1652 : }
1653 :
1654 : /*
1655 : * Send a command message to the specified worker process.
1656 : *
1657 : * This function is executed in the leader process.
1658 : */
1659 : static void
1660 115 : sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1661 : {
1662 115 : int len = strlen(str) + 1;
1663 :
1664 115 : if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1665 : {
1666 0 : pg_fatal("could not write to the communication channel: %m");
1667 : }
1668 115 : }
1669 :
1670 : /*
1671 : * Read one message from the specified pipe (fd), blocking if necessary
1672 : * until one is available, and return it as a malloc'd string.
1673 : * On EOF, return NULL.
1674 : *
1675 : * A "message" on the channel is just a null-terminated string.
1676 : */
1677 : static char *
1678 256 : readMessageFromPipe(int fd)
1679 : {
1680 : char *msg;
1681 : int msgsize,
1682 : bufsize;
1683 : int ret;
1684 :
1685 : /*
1686 : * In theory, if we let piperead() read multiple bytes, it might give us
1687 : * back fragments of multiple messages. (That can't actually occur, since
1688 : * neither leader nor workers send more than one message without waiting
1689 : * for a reply, but we don't wish to assume that here.) For simplicity,
1690 : * read a byte at a time until we get the terminating '\0'. This method
1691 : * is a bit inefficient, but since this is only used for relatively short
1692 : * command and status strings, it shouldn't matter.
1693 : */
1694 256 : bufsize = 64; /* could be any number */
1695 256 : msg = (char *) pg_malloc(bufsize);
1696 256 : msgsize = 0;
1697 : for (;;)
1698 : {
1699 2438 : Assert(msgsize < bufsize);
1700 2694 : ret = piperead(fd, msg + msgsize, 1);
1701 2694 : if (ret <= 0)
1702 26 : break; /* error or connection closure */
1703 :
1704 : Assert(ret == 1);
1705 :
1706 2668 : if (msg[msgsize] == '\0')
1707 230 : return msg; /* collected whole message */
1708 :
1709 2438 : msgsize++;
1710 2438 : if (msgsize == bufsize) /* enlarge buffer if needed */
1711 : {
1712 0 : bufsize += 16; /* could be any number */
1713 0 : msg = (char *) pg_realloc(msg, bufsize);
1714 : }
1715 : }
1716 :
1717 : /* Other end has closed the connection */
1718 26 : pg_free(msg);
1719 26 : return NULL;
1720 : }
1721 :
1722 : #ifdef WIN32
1723 :
1724 : /*
1725 : * This is a replacement version of pipe(2) for Windows which allows the pipe
1726 : * handles to be used in select().
1727 : *
1728 : * Reads and writes on the pipe must go through piperead()/pipewrite().
1729 : *
1730 : * For consistency with Unix we declare the returned handles as "int".
1731 : * This is okay even on WIN64 because system handles are not more than
1732 : * 32 bits wide, but we do have to do some casting.
1733 : */
1734 : static int
1735 : pgpipe(int handles[2])
1736 : {
1737 : pgsocket s,
1738 : tmp_sock;
1739 : struct sockaddr_in serv_addr;
1740 : int len = sizeof(serv_addr);
1741 :
1742 : /* We have to use the Unix socket invalid file descriptor value here. */
1743 : handles[0] = handles[1] = -1;
1744 :
1745 : /*
1746 : * setup listen socket
1747 : */
1748 : if ((s = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1749 : {
1750 : pg_log_error("pgpipe: could not create socket: error code %d",
1751 : WSAGetLastError());
1752 : return -1;
1753 : }
1754 :
1755 : memset(&serv_addr, 0, sizeof(serv_addr));
1756 : serv_addr.sin_family = AF_INET;
1757 : serv_addr.sin_port = pg_hton16(0);
1758 : serv_addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);
1759 : if (bind(s, (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1760 : {
1761 : pg_log_error("pgpipe: could not bind: error code %d",
1762 : WSAGetLastError());
1763 : closesocket(s);
1764 : return -1;
1765 : }
1766 : if (listen(s, 1) == SOCKET_ERROR)
1767 : {
1768 : pg_log_error("pgpipe: could not listen: error code %d",
1769 : WSAGetLastError());
1770 : closesocket(s);
1771 : return -1;
1772 : }
1773 : if (getsockname(s, (SOCKADDR *) &serv_addr, &len) == SOCKET_ERROR)
1774 : {
1775 : pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",
1776 : WSAGetLastError());
1777 : closesocket(s);
1778 : return -1;
1779 : }
1780 :
1781 : /*
1782 : * setup pipe handles
1783 : */
1784 : if ((tmp_sock = socket(AF_INET, SOCK_STREAM, 0)) == PGINVALID_SOCKET)
1785 : {
1786 : pg_log_error("pgpipe: could not create second socket: error code %d",
1787 : WSAGetLastError());
1788 : closesocket(s);
1789 : return -1;
1790 : }
1791 : handles[1] = (int) tmp_sock;
1792 :
1793 : if (connect(handles[1], (SOCKADDR *) &serv_addr, len) == SOCKET_ERROR)
1794 : {
1795 : pg_log_error("pgpipe: could not connect socket: error code %d",
1796 : WSAGetLastError());
1797 : closesocket(handles[1]);
1798 : handles[1] = -1;
1799 : closesocket(s);
1800 : return -1;
1801 : }
1802 : if ((tmp_sock = accept(s, (SOCKADDR *) &serv_addr, &len)) == PGINVALID_SOCKET)
1803 : {
1804 : pg_log_error("pgpipe: could not accept connection: error code %d",
1805 : WSAGetLastError());
1806 : closesocket(handles[1]);
1807 : handles[1] = -1;
1808 : closesocket(s);
1809 : return -1;
1810 : }
1811 : handles[0] = (int) tmp_sock;
1812 :
1813 : closesocket(s);
1814 : return 0;
1815 : }
1816 :
1817 : #endif /* WIN32 */
|