Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * parallel.c
4 : *
5 : * Parallel support for pg_dump and pg_restore
6 : *
7 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8 : * Portions Copyright (c) 1994, Regents of the University of California
9 : *
10 : * IDENTIFICATION
11 : * src/bin/pg_dump/parallel.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : /*
17 : * Parallel operation works like this:
18 : *
19 : * The original, leader process calls ParallelBackupStart(), which forks off
20 : * the desired number of worker processes, which each enter WaitForCommands().
21 : *
22 : * The leader process dispatches an individual work item to one of the worker
23 : * processes in DispatchJobForTocEntry(). We send a command string such as
24 : * "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
25 : * The worker process receives and decodes the command and passes it to the
26 : * routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
27 : * which are routines of the current archive format. That routine performs
28 : * the required action (dump or restore) and returns an integer status code.
29 : * This is passed back to the leader where we pass it to the
30 : * ParallelCompletionPtr callback function that was passed to
31 : * DispatchJobForTocEntry(). The callback function does state updating
32 : * for the leader control logic in pg_backup_archiver.c.
33 : *
34 : * In principle additional archive-format-specific information might be needed
35 : * in commands or worker status responses, but so far that hasn't proved
36 : * necessary, since workers have full copies of the ArchiveHandle/TocEntry
37 : * data structures. Remember that we have forked off the workers only after
38 : * we have read in the catalog. That's why our worker processes can also
39 : * access the catalog information. (In the Windows case, the workers are
40 : * threads in the same process. To avoid problems, they work with cloned
41 : * copies of the Archive data structure; see RunWorker().)
42 : *
43 : * In the leader process, the workerStatus field for each worker has one of
44 : * the following values:
45 : * WRKR_NOT_STARTED: we've not yet forked this worker
46 : * WRKR_IDLE: it's waiting for a command
47 : * WRKR_WORKING: it's working on a command
48 : * WRKR_TERMINATED: process ended
49 : * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING
50 : * state, and must be NULL in other states.
51 : */
52 :
#include "postgres_fe.h"

#ifndef WIN32
#include <errno.h>
#include <sys/select.h>
#include <sys/wait.h>
#include <signal.h>
#include <unistd.h>
#include <fcntl.h>
#endif

#include "fe_utils/string_utils.h"
#include "parallel.h"
#include "pg_backup_utils.h"
#ifdef WIN32
#include "port/pg_bswap.h"
#endif
69 :
/* Symbolic names for the two slots of the fd array filled in by pipe(2) */
#define PIPE_READ	0
#define PIPE_WRITE	1

/* Failure result for GetIdleWorker() */
#define NO_SLOT		(-1)
75 :
/* States a worker can be in, as recorded in its slot's workerStatus field */
typedef enum
{
	WRKR_NOT_STARTED = 0,
	WRKR_IDLE = 1,
	WRKR_WORKING = 2,
	WRKR_TERMINATED = 3,
} T_WorkerStatus;

/* True for a worker that is either idle or busy with a command */
#define WORKER_IS_RUNNING(workerStatus) \
	(!((workerStatus) != WRKR_IDLE && (workerStatus) != WRKR_WORKING))
87 :
88 : /*
89 : * Private per-parallel-worker state (typedef for this is in parallel.h).
90 : *
91 : * Much of this is valid only in the leader process (or, on Windows, should
92 : * be touched only by the leader thread). But the AH field should be touched
93 : * only by workers. The pipe descriptors are valid everywhere.
94 : */
95 : struct ParallelSlot
96 : {
97 : T_WorkerStatus workerStatus; /* see enum above */
98 :
99 : /* These fields are valid if workerStatus == WRKR_WORKING: */
100 : ParallelCompletionPtr callback; /* function to call on completion */
101 : void *callback_data; /* passthrough data for it */
102 :
103 : ArchiveHandle *AH; /* Archive data worker is using */
104 :
105 : int pipeRead; /* leader's end of the pipes */
106 : int pipeWrite;
107 : int pipeRevRead; /* child's end of the pipes */
108 : int pipeRevWrite;
109 :
110 : /* Child process/thread identity info: */
111 : #ifdef WIN32
112 : uintptr_t hThread;
113 : unsigned int threadId;
114 : #else
115 : pid_t pid;
116 : #endif
117 : };
118 :
#ifdef WIN32

/*
 * Argument bundle for a new worker thread.  _beginthreadex() hands the
 * thread function just one argument, so everything it needs is packed here.
 */
typedef struct
{
	ArchiveHandle *AH;			/* leader database connection */
	ParallelSlot *slot;			/* parallel slot assigned to this worker */
} WorkerInfo;

/* Pipe access, Windows flavor: built on recv()/send() */
static int	pgpipe(int handles[2]);
#define piperead(a,b,c)		recv(a,b,c,0)
#define pipewrite(a,b,c)	send(a,b,c,0)

#else							/* !WIN32 */

/* Pipe access everywhere else: straight pipe()/read()/write() */
#define pgpipe(a)			pipe(a)
#define piperead(a,b,c)		read(a,b,c)
#define pipewrite(a,b,c)	write(a,b,c)

#endif							/* WIN32 */
144 :
145 : /*
146 : * State info for archive_close_connection() shutdown callback.
147 : */
148 : typedef struct ShutdownInformation
149 : {
150 : ParallelState *pstate;
151 : Archive *AHX;
152 : } ShutdownInformation;
153 :
154 : static ShutdownInformation shutdown_info;
155 :
156 : /*
157 : * State info for signal handling.
158 : * We assume signal_info initializes to zeroes.
159 : *
160 : * On Unix, myAH is the leader DB connection in the leader process, and the
161 : * worker's own connection in worker processes. On Windows, we have only one
162 : * instance of signal_info, so myAH is the leader connection and the worker
163 : * connections must be dug out of pstate->parallelSlot[].
164 : */
165 : typedef struct DumpSignalInformation
166 : {
167 : ArchiveHandle *myAH; /* database connection to issue cancel for */
168 : ParallelState *pstate; /* parallel state, if any */
169 : bool handler_set; /* signal handler set up in this process? */
170 : #ifndef WIN32
171 : bool am_worker; /* am I a worker process? */
172 : #endif
173 : } DumpSignalInformation;
174 :
175 : static volatile DumpSignalInformation signal_info;
176 :
177 : #ifdef WIN32
178 : static CRITICAL_SECTION signal_info_lock;
179 : #endif
180 :
/*
 * Emit a plain string on stderr using only write(), so that this can be
 * used from inside a signal handler.  The write() result is deliberately
 * discarded (nothing useful could be done on failure); it is parked in a
 * variable first because some compilers complain otherwise.
 */
#define write_stderr(str) \
	do { \
		const char *text_ = (str); \
		int		written_ = write(fileno(stderr), text_, strlen(text_)); \
		(void) written_; \
	} while (0)
193 :
194 :
#ifdef WIN32
/* Private to this file: index value obtained from TlsAlloc() */
static DWORD tls_index;

/* Visible outside this file because exit_nicely needs them */
bool		parallel_init_done = false;
DWORD		mainThreadId;
#endif							/* WIN32 */
203 :
204 : /* Local function prototypes */
205 : static ParallelSlot *GetMyPSlot(ParallelState *pstate);
206 : static void archive_close_connection(int code, void *arg);
207 : static void ShutdownWorkersHard(ParallelState *pstate);
208 : static void WaitForTerminatingWorkers(ParallelState *pstate);
209 : static void set_cancel_handler(void);
210 : static void set_cancel_pstate(ParallelState *pstate);
211 : static void set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH);
212 : static void RunWorker(ArchiveHandle *AH, ParallelSlot *slot);
213 : static int GetIdleWorker(ParallelState *pstate);
214 : static bool HasEveryWorkerTerminated(ParallelState *pstate);
215 : static void lockTableForWorker(ArchiveHandle *AH, TocEntry *te);
216 : static void WaitForCommands(ArchiveHandle *AH, int pipefd[2]);
217 : static bool ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate,
218 : bool do_wait);
219 : static char *getMessageFromLeader(int pipefd[2]);
220 : static void sendMessageToLeader(int pipefd[2], const char *str);
221 : static int select_loop(int maxFd, fd_set *workerset);
222 : static char *getMessageFromWorker(ParallelState *pstate,
223 : bool do_wait, int *worker);
224 : static void sendMessageToWorker(ParallelState *pstate,
225 : int worker, const char *str);
226 : static char *readMessageFromPipe(int fd);
227 :
/* Does the string msg begin with the string prefix? */
#define messageStartsWith(msg, prefix) \
	(!strncmp(msg, prefix, strlen(prefix)))
230 :
231 :
/*
 * Set up parallel dump support.  Meant to be called early during process
 * startup; at present it is called regardless of whether parallel activity
 * is intended.
 *
 * On Windows the first call allocates the thread-local-storage index,
 * records the main thread's ID and starts up Winsock; repeat calls do
 * nothing.  On other platforms there is nothing to do.
 */
void
init_parallel_dump_utils(void)
{
#ifdef WIN32
	WSADATA		wsaData;
	int			err;

	/* Already initialized?  Then we're done. */
	if (parallel_init_done)
		return;

	/* Prepare for threaded operation */
	tls_index = TlsAlloc();
	mainThreadId = GetCurrentThreadId();

	/* Initialize socket access */
	err = WSAStartup(MAKEWORD(2, 2), &wsaData);
	if (err != 0)
		pg_fatal("%s() failed: error code %d", "WSAStartup", err);

	parallel_init_done = true;
#endif
}
259 :
260 : /*
261 : * Find the ParallelSlot for the current worker process or thread.
262 : *
263 : * Returns NULL if no matching slot is found (this implies we're the leader).
264 : */
265 : static ParallelSlot *
266 0 : GetMyPSlot(ParallelState *pstate)
267 : {
268 : int i;
269 :
270 0 : for (i = 0; i < pstate->numWorkers; i++)
271 : {
272 : #ifdef WIN32
273 : if (pstate->parallelSlot[i].threadId == GetCurrentThreadId())
274 : #else
275 0 : if (pstate->parallelSlot[i].pid == getpid())
276 : #endif
277 0 : return &(pstate->parallelSlot[i]);
278 : }
279 :
280 0 : return NULL;
281 : }
282 :
/*
 * Thread-aware replacement for getLocalPQExpBuffer().
 *
 * Not reentrant, but it limits memory leakage: one buffer is consumed per
 * thread, which is far better than one per fmtId/fmtQualifiedId call.
 *
 * Returns an empty PQExpBuffer: either the one previously remembered
 * (reset to empty) or a newly created one, which is then remembered.
 */
#ifdef WIN32
static PQExpBuffer
getThreadLocalPQExpBuffer(void)
{
	/*
	 * The Tls code misbehaves if a static variable is used, so both a static
	 * and an auto variable are provided and the static one is never touched
	 * once Tls is in use.  TlsGetValue() is relied on to return 0 when no
	 * value has been stored yet.
	 */
	static PQExpBuffer s_id_return = NULL;
	PQExpBuffer buf;

	buf = parallel_init_done ?
		(PQExpBuffer) TlsGetValue(tls_index) : s_id_return;

	if (buf != NULL)
	{
		/* a buffer already exists: reuse it after wiping its contents */
		resetPQExpBuffer(buf);
		return buf;
	}

	/* nothing remembered yet: make a buffer and remember it */
	buf = createPQExpBuffer();
	if (parallel_init_done)
		TlsSetValue(tls_index, buf);
	else
		s_id_return = buf;

	return buf;
}
#endif							/* WIN32 */
324 :
325 : /*
326 : * pg_dump and pg_restore call this to register the cleanup handler
327 : * as soon as they've created the ArchiveHandle.
328 : */
329 : void
330 626 : on_exit_close_archive(Archive *AHX)
331 : {
332 626 : shutdown_info.AHX = AHX;
333 626 : on_exit_nicely(archive_close_connection, &shutdown_info);
334 626 : }
335 :
336 : /*
337 : * When pg_restore restores multiple databases, then update already added entry
338 : * into array for cleanup.
339 : */
340 : void
341 92 : replace_on_exit_close_archive(Archive *AHX)
342 : {
343 92 : shutdown_info.AHX = AHX;
344 92 : }
345 :
346 : /*
347 : * on_exit_nicely handler for shutting down database connections and
348 : * worker processes cleanly.
349 : */
350 : static void
351 502 : archive_close_connection(int code, void *arg)
352 : {
353 502 : ShutdownInformation *si = (ShutdownInformation *) arg;
354 :
355 502 : if (si->pstate)
356 : {
357 : /* In parallel mode, must figure out who we are */
358 0 : ParallelSlot *slot = GetMyPSlot(si->pstate);
359 :
360 0 : if (!slot)
361 : {
362 : /*
363 : * We're the leader. Forcibly shut down workers, then close our
364 : * own database connection, if any.
365 : */
366 0 : ShutdownWorkersHard(si->pstate);
367 :
368 0 : if (si->AHX)
369 0 : DisconnectDatabase(si->AHX);
370 : }
371 : else
372 : {
373 : /*
374 : * We're a worker. Shut down our own DB connection if any. On
375 : * Windows, we also have to close our communication sockets, to
376 : * emulate what will happen on Unix when the worker process exits.
377 : * (Without this, if this is a premature exit, the leader would
378 : * fail to detect it because there would be no EOF condition on
379 : * the other end of the pipe.)
380 : */
381 0 : if (slot->AH)
382 0 : DisconnectDatabase(&(slot->AH->public));
383 :
384 : #ifdef WIN32
385 : closesocket(slot->pipeRevRead);
386 : closesocket(slot->pipeRevWrite);
387 : #endif
388 : }
389 : }
390 : else
391 : {
392 : /* Non-parallel operation: just kill the leader DB connection */
393 502 : if (si->AHX)
394 502 : DisconnectDatabase(si->AHX);
395 : }
396 502 : }
397 :
398 : /*
399 : * Forcibly shut down any remaining workers, waiting for them to finish.
400 : *
401 : * Note that we don't expect to come here during normal exit (the workers
402 : * should be long gone, and the ParallelState too). We're only here in a
403 : * pg_fatal() situation, so intervening to cancel active commands is
404 : * appropriate.
405 : */
406 : static void
407 0 : ShutdownWorkersHard(ParallelState *pstate)
408 : {
409 : int i;
410 :
411 : /*
412 : * Close our write end of the sockets so that any workers waiting for
413 : * commands know they can exit. (Note: some of the pipeWrite fields might
414 : * still be zero, if we failed to initialize all the workers. Hence, just
415 : * ignore errors here.)
416 : */
417 0 : for (i = 0; i < pstate->numWorkers; i++)
418 0 : closesocket(pstate->parallelSlot[i].pipeWrite);
419 :
420 : /*
421 : * Force early termination of any commands currently in progress.
422 : */
423 : #ifndef WIN32
424 : /* On non-Windows, send SIGTERM to each worker process. */
425 0 : for (i = 0; i < pstate->numWorkers; i++)
426 : {
427 0 : pid_t pid = pstate->parallelSlot[i].pid;
428 :
429 0 : if (pid != 0)
430 0 : kill(pid, SIGTERM);
431 : }
432 : #else
433 :
434 : /*
435 : * On Windows, send query cancels directly to the workers' backends. Use
436 : * a critical section to ensure worker threads don't change state.
437 : */
438 : EnterCriticalSection(&signal_info_lock);
439 : for (i = 0; i < pstate->numWorkers; i++)
440 : {
441 : ArchiveHandle *AH = pstate->parallelSlot[i].AH;
442 : char errbuf[1];
443 :
444 : if (AH != NULL && AH->connCancel != NULL)
445 : (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
446 : }
447 : LeaveCriticalSection(&signal_info_lock);
448 : #endif
449 :
450 : /* Now wait for them to terminate. */
451 0 : WaitForTerminatingWorkers(pstate);
452 0 : }
453 :
454 : /*
455 : * Wait for all workers to terminate.
456 : */
457 : static void
458 28 : WaitForTerminatingWorkers(ParallelState *pstate)
459 : {
460 88 : while (!HasEveryWorkerTerminated(pstate))
461 : {
462 60 : ParallelSlot *slot = NULL;
463 : int j;
464 :
465 : #ifndef WIN32
466 : /* On non-Windows, use wait() to wait for next worker to end */
467 : int status;
468 60 : pid_t pid = wait(&status);
469 :
470 : /* Find dead worker's slot, and clear the PID field */
471 96 : for (j = 0; j < pstate->numWorkers; j++)
472 : {
473 96 : slot = &(pstate->parallelSlot[j]);
474 96 : if (slot->pid == pid)
475 : {
476 60 : slot->pid = 0;
477 60 : break;
478 : }
479 : }
480 : #else /* WIN32 */
481 : /* On Windows, we must use WaitForMultipleObjects() */
482 : HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers);
483 : int nrun = 0;
484 : DWORD ret;
485 : uintptr_t hThread;
486 :
487 : for (j = 0; j < pstate->numWorkers; j++)
488 : {
489 : if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
490 : {
491 : lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
492 : nrun++;
493 : }
494 : }
495 : ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE);
496 : Assert(ret != WAIT_FAILED);
497 : hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0];
498 : free(lpHandles);
499 :
500 : /* Find dead worker's slot, and clear the hThread field */
501 : for (j = 0; j < pstate->numWorkers; j++)
502 : {
503 : slot = &(pstate->parallelSlot[j]);
504 : if (slot->hThread == hThread)
505 : {
506 : /* For cleanliness, close handles for dead threads */
507 : CloseHandle((HANDLE) slot->hThread);
508 : slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE;
509 : break;
510 : }
511 : }
512 : #endif /* WIN32 */
513 :
514 : /* On all platforms, update workerStatus and te[] as well */
515 : Assert(j < pstate->numWorkers);
516 60 : slot->workerStatus = WRKR_TERMINATED;
517 60 : pstate->te[j] = NULL;
518 : }
519 28 : }
520 :
521 :
522 : /*
523 : * Code for responding to cancel interrupts (SIGINT, control-C, etc)
524 : *
525 : * This doesn't quite belong in this module, but it needs access to the
526 : * ParallelState data, so there's not really a better place either.
527 : *
528 : * When we get a cancel interrupt, we could just die, but in pg_restore that
529 : * could leave a SQL command (e.g., CREATE INDEX on a large table) running
530 : * for a long time. Instead, we try to send a cancel request and then die.
531 : * pg_dump probably doesn't really need this, but we might as well use it
532 : * there too. Note that sending the cancel directly from the signal handler
533 : * is safe because PQcancel() is written to make it so.
534 : *
535 : * In parallel operation on Unix, each process is responsible for canceling
536 : * its own connection (this must be so because nobody else has access to it).
537 : * Furthermore, the leader process should attempt to forward its signal to
538 : * each child. In simple manual use of pg_dump/pg_restore, forwarding isn't
539 : * needed because typing control-C at the console would deliver SIGINT to
540 : * every member of the terminal process group --- but in other scenarios it
541 : * might be that only the leader gets signaled.
542 : *
543 : * On Windows, the cancel handler runs in a separate thread, because that's
544 : * how SetConsoleCtrlHandler works. We make it stop worker threads, send
545 : * cancels on all active connections, and then return FALSE, which will allow
546 : * the process to die. For safety's sake, we use a critical section to
547 : * protect the PGcancel structures against being changed while the signal
548 : * thread runs.
549 : */
550 :
551 : #ifndef WIN32
552 :
553 : /*
554 : * Signal handler (Unix only)
555 : */
556 : static void
557 0 : sigTermHandler(SIGNAL_ARGS)
558 : {
559 : int i;
560 : char errbuf[1];
561 :
562 : /*
563 : * Some platforms allow delivery of new signals to interrupt an active
564 : * signal handler. That could muck up our attempt to send PQcancel, so
565 : * disable the signals that set_cancel_handler enabled.
566 : */
567 0 : pqsignal(SIGINT, SIG_IGN);
568 0 : pqsignal(SIGTERM, SIG_IGN);
569 0 : pqsignal(SIGQUIT, SIG_IGN);
570 :
571 : /*
572 : * If we're in the leader, forward signal to all workers. (It seems best
573 : * to do this before PQcancel; killing the leader transaction will result
574 : * in invalid-snapshot errors from active workers, which maybe we can
575 : * quiet by killing workers first.) Ignore any errors.
576 : */
577 0 : if (signal_info.pstate != NULL)
578 : {
579 0 : for (i = 0; i < signal_info.pstate->numWorkers; i++)
580 : {
581 0 : pid_t pid = signal_info.pstate->parallelSlot[i].pid;
582 :
583 0 : if (pid != 0)
584 0 : kill(pid, SIGTERM);
585 : }
586 : }
587 :
588 : /*
589 : * Send QueryCancel if we have a connection to send to. Ignore errors,
590 : * there's not much we can do about them anyway.
591 : */
592 0 : if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
593 0 : (void) PQcancel(signal_info.myAH->connCancel, errbuf, sizeof(errbuf));
594 :
595 : /*
596 : * Report we're quitting, using nothing more complicated than write(2).
597 : * When in parallel operation, only the leader process should do this.
598 : */
599 0 : if (!signal_info.am_worker)
600 : {
601 0 : if (progname)
602 : {
603 0 : write_stderr(progname);
604 0 : write_stderr(": ");
605 : }
606 0 : write_stderr("terminated by user\n");
607 : }
608 :
609 : /*
610 : * And die, using _exit() not exit() because the latter will invoke atexit
611 : * handlers that can fail if we interrupted related code.
612 : */
613 0 : _exit(1);
614 : }
615 :
616 : /*
617 : * Enable cancel interrupt handler, if not already done.
618 : */
619 : static void
620 1414 : set_cancel_handler(void)
621 : {
622 : /*
623 : * When forking, signal_info.handler_set will propagate into the new
624 : * process, but that's fine because the signal handler state does too.
625 : */
626 1414 : if (!signal_info.handler_set)
627 : {
628 560 : signal_info.handler_set = true;
629 :
630 560 : pqsignal(SIGINT, sigTermHandler);
631 560 : pqsignal(SIGTERM, sigTermHandler);
632 560 : pqsignal(SIGQUIT, sigTermHandler);
633 : }
634 1414 : }
635 :
636 : #else /* WIN32 */
637 :
638 : /*
639 : * Console interrupt handler --- runs in a newly-started thread.
640 : *
641 : * After stopping other threads and sending cancel requests on all open
642 : * connections, we return FALSE which will allow the default ExitProcess()
643 : * action to be taken.
644 : */
645 : static BOOL WINAPI
646 : consoleHandler(DWORD dwCtrlType)
647 : {
648 : int i;
649 : char errbuf[1];
650 :
651 : if (dwCtrlType == CTRL_C_EVENT ||
652 : dwCtrlType == CTRL_BREAK_EVENT)
653 : {
654 : /* Critical section prevents changing data we look at here */
655 : EnterCriticalSection(&signal_info_lock);
656 :
657 : /*
658 : * If in parallel mode, stop worker threads and send QueryCancel to
659 : * their connected backends. The main point of stopping the worker
660 : * threads is to keep them from reporting the query cancels as errors,
661 : * which would clutter the user's screen. We needn't stop the leader
662 : * thread since it won't be doing much anyway. Do this before
663 : * canceling the main transaction, else we might get invalid-snapshot
664 : * errors reported before we can stop the workers. Ignore errors,
665 : * there's not much we can do about them anyway.
666 : */
667 : if (signal_info.pstate != NULL)
668 : {
669 : for (i = 0; i < signal_info.pstate->numWorkers; i++)
670 : {
671 : ParallelSlot *slot = &(signal_info.pstate->parallelSlot[i]);
672 : ArchiveHandle *AH = slot->AH;
673 : HANDLE hThread = (HANDLE) slot->hThread;
674 :
675 : /*
676 : * Using TerminateThread here may leave some resources leaked,
677 : * but it doesn't matter since we're about to end the whole
678 : * process.
679 : */
680 : if (hThread != INVALID_HANDLE_VALUE)
681 : TerminateThread(hThread, 0);
682 :
683 : if (AH != NULL && AH->connCancel != NULL)
684 : (void) PQcancel(AH->connCancel, errbuf, sizeof(errbuf));
685 : }
686 : }
687 :
688 : /*
689 : * Send QueryCancel to leader connection, if enabled. Ignore errors,
690 : * there's not much we can do about them anyway.
691 : */
692 : if (signal_info.myAH != NULL && signal_info.myAH->connCancel != NULL)
693 : (void) PQcancel(signal_info.myAH->connCancel,
694 : errbuf, sizeof(errbuf));
695 :
696 : LeaveCriticalSection(&signal_info_lock);
697 :
698 : /*
699 : * Report we're quitting, using nothing more complicated than
700 : * write(2). (We might be able to get away with using pg_log_*()
701 : * here, but since we terminated other threads uncleanly above, it
702 : * seems better to assume as little as possible.)
703 : */
704 : if (progname)
705 : {
706 : write_stderr(progname);
707 : write_stderr(": ");
708 : }
709 : write_stderr("terminated by user\n");
710 : }
711 :
712 : /* Always return FALSE to allow signal handling to continue */
713 : return FALSE;
714 : }
715 :
716 : /*
717 : * Enable cancel interrupt handler, if not already done.
718 : */
719 : static void
720 : set_cancel_handler(void)
721 : {
722 : if (!signal_info.handler_set)
723 : {
724 : signal_info.handler_set = true;
725 :
726 : InitializeCriticalSection(&signal_info_lock);
727 :
728 : SetConsoleCtrlHandler(consoleHandler, TRUE);
729 : }
730 : }
731 :
732 : #endif /* WIN32 */
733 :
734 :
735 : /*
736 : * set_archive_cancel_info
737 : *
738 : * Fill AH->connCancel with cancellation info for the specified database
739 : * connection; or clear it if conn is NULL.
740 : */
741 : void
742 1414 : set_archive_cancel_info(ArchiveHandle *AH, PGconn *conn)
743 : {
744 : PGcancel *oldConnCancel;
745 :
746 : /*
747 : * Activate the interrupt handler if we didn't yet in this process. On
748 : * Windows, this also initializes signal_info_lock; therefore it's
749 : * important that this happen at least once before we fork off any
750 : * threads.
751 : */
752 1414 : set_cancel_handler();
753 :
754 : /*
755 : * On Unix, we assume that storing a pointer value is atomic with respect
756 : * to any possible signal interrupt. On Windows, use a critical section.
757 : */
758 :
759 : #ifdef WIN32
760 : EnterCriticalSection(&signal_info_lock);
761 : #endif
762 :
763 : /* Free the old one if we have one */
764 1414 : oldConnCancel = AH->connCancel;
765 : /* be sure interrupt handler doesn't use pointer while freeing */
766 1414 : AH->connCancel = NULL;
767 :
768 1414 : if (oldConnCancel != NULL)
769 746 : PQfreeCancel(oldConnCancel);
770 :
771 : /* Set the new one if specified */
772 1414 : if (conn)
773 750 : AH->connCancel = PQgetCancel(conn);
774 :
775 : /*
776 : * On Unix, there's only ever one active ArchiveHandle per process, so we
777 : * can just set signal_info.myAH unconditionally. On Windows, do that
778 : * only in the main thread; worker threads have to make sure their
779 : * ArchiveHandle appears in the pstate data, which is dealt with in
780 : * RunWorker().
781 : */
782 : #ifndef WIN32
783 1414 : signal_info.myAH = AH;
784 : #else
785 : if (mainThreadId == GetCurrentThreadId())
786 : signal_info.myAH = AH;
787 : #endif
788 :
789 : #ifdef WIN32
790 : LeaveCriticalSection(&signal_info_lock);
791 : #endif
792 1414 : }
793 :
794 : /*
795 : * set_cancel_pstate
796 : *
797 : * Set signal_info.pstate to point to the specified ParallelState, if any.
798 : * We need this mainly to have an interlock against Windows signal thread.
799 : */
800 : static void
801 56 : set_cancel_pstate(ParallelState *pstate)
802 : {
803 : #ifdef WIN32
804 : EnterCriticalSection(&signal_info_lock);
805 : #endif
806 :
807 56 : signal_info.pstate = pstate;
808 :
809 : #ifdef WIN32
810 : LeaveCriticalSection(&signal_info_lock);
811 : #endif
812 56 : }
813 :
814 : /*
815 : * set_cancel_slot_archive
816 : *
817 : * Set ParallelSlot's AH field to point to the specified archive, if any.
818 : * We need this mainly to have an interlock against Windows signal thread.
819 : */
820 : static void
821 120 : set_cancel_slot_archive(ParallelSlot *slot, ArchiveHandle *AH)
822 : {
823 : #ifdef WIN32
824 : EnterCriticalSection(&signal_info_lock);
825 : #endif
826 :
827 120 : slot->AH = AH;
828 :
829 : #ifdef WIN32
830 : LeaveCriticalSection(&signal_info_lock);
831 : #endif
832 120 : }
833 :
834 :
835 : /*
836 : * This function is called by both Unix and Windows variants to set up
837 : * and run a worker process. Caller should exit the process (or thread)
838 : * upon return.
839 : */
840 : static void
841 60 : RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
842 : {
843 : int pipefd[2];
844 :
845 : /* fetch child ends of pipes */
846 60 : pipefd[PIPE_READ] = slot->pipeRevRead;
847 60 : pipefd[PIPE_WRITE] = slot->pipeRevWrite;
848 :
849 : /*
850 : * Clone the archive so that we have our own state to work with, and in
851 : * particular our own database connection.
852 : *
853 : * We clone on Unix as well as Windows, even though technically we don't
854 : * need to because fork() gives us a copy in our own address space
855 : * already. But CloneArchive resets the state information and also clones
856 : * the database connection which both seem kinda helpful.
857 : */
858 60 : AH = CloneArchive(AH);
859 :
860 : /* Remember cloned archive where signal handler can find it */
861 60 : set_cancel_slot_archive(slot, AH);
862 :
863 : /*
864 : * Call the setup worker function that's defined in the ArchiveHandle.
865 : */
866 60 : (AH->SetupWorkerPtr) ((Archive *) AH);
867 :
868 : /*
869 : * Execute commands until done.
870 : */
871 60 : WaitForCommands(AH, pipefd);
872 :
873 : /*
874 : * Disconnect from database and clean up.
875 : */
876 60 : set_cancel_slot_archive(slot, NULL);
877 60 : DisconnectDatabase(&(AH->public));
878 60 : DeCloneArchive(AH);
879 60 : }
880 :
/*
 * Entry point of a worker thread on Windows.
 *
 * wi carries the archive handle and slot for this worker.  It is freed
 * here, after which the worker is run and the thread exits.
 */
#ifdef WIN32
static unsigned __stdcall
init_spawned_worker_win32(WorkerInfo *wi)
{
	/* Copy the arguments out; the WorkerInfo itself is no longer needed */
	WorkerInfo	args = *wi;

	free(wi);

	/* Run the worker ... */
	RunWorker(args.AH, args.slot);

	/* Exit the thread */
	_endthreadex(0);
	return 0;
}
#endif							/* WIN32 */
902 :
903 : /*
904 : * This function starts a parallel dump or restore by spawning off the worker
905 : * processes. For Windows, it creates a number of threads; on Unix the
906 : * workers are created with fork().
907 : */
908 : ParallelState *
909 108 : ParallelBackupStart(ArchiveHandle *AH)
910 : {
911 : ParallelState *pstate;
912 : int i;
913 :
914 : Assert(AH->public.numWorkers > 0);
915 :
916 108 : pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
917 :
918 108 : pstate->numWorkers = AH->public.numWorkers;
919 108 : pstate->te = NULL;
920 108 : pstate->parallelSlot = NULL;
921 :
922 108 : if (AH->public.numWorkers == 1)
923 80 : return pstate;
924 :
925 : /* Create status arrays, being sure to initialize all fields to 0 */
926 28 : pstate->te = (TocEntry **)
927 28 : pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
928 28 : pstate->parallelSlot = (ParallelSlot *)
929 28 : pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
930 :
931 : #ifdef WIN32
932 : /* Make fmtId() and fmtQualifiedId() use thread-local storage */
933 : getLocalPQExpBuffer = getThreadLocalPQExpBuffer;
934 : #endif
935 :
936 : /*
937 : * Set the pstate in shutdown_info, to tell the exit handler that it must
938 : * clean up workers as well as the main database connection. But we don't
939 : * set this in signal_info yet, because we don't want child processes to
940 : * inherit non-NULL signal_info.pstate.
941 : */
942 28 : shutdown_info.pstate = pstate;
943 :
944 : /*
945 : * Temporarily disable query cancellation on the leader connection. This
946 : * ensures that child processes won't inherit valid AH->connCancel
947 : * settings and thus won't try to issue cancels against the leader's
948 : * connection. No harm is done if we fail while it's disabled, because
949 : * the leader connection is idle at this point anyway.
950 : */
951 28 : set_archive_cancel_info(AH, NULL);
952 :
953 : /* Ensure stdio state is quiesced before forking */
954 28 : fflush(NULL);
955 :
956 : /* Create desired number of workers */
957 88 : for (i = 0; i < pstate->numWorkers; i++)
958 : {
959 : #ifdef WIN32
960 : WorkerInfo *wi;
961 : uintptr_t handle;
962 : #else
963 : pid_t pid;
964 : #endif
965 60 : ParallelSlot *slot = &(pstate->parallelSlot[i]);
966 : int pipeMW[2],
967 : pipeWM[2];
968 :
969 : /* Create communication pipes for this worker */
970 60 : if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
971 0 : pg_fatal("could not create communication channels: %m");
972 :
973 : /* leader's ends of the pipes */
974 60 : slot->pipeRead = pipeWM[PIPE_READ];
975 60 : slot->pipeWrite = pipeMW[PIPE_WRITE];
976 : /* child's ends of the pipes */
977 60 : slot->pipeRevRead = pipeMW[PIPE_READ];
978 60 : slot->pipeRevWrite = pipeWM[PIPE_WRITE];
979 :
980 : #ifdef WIN32
981 : /* Create transient structure to pass args to worker function */
982 : wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
983 :
984 : wi->AH = AH;
985 : wi->slot = slot;
986 :
987 : handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
988 : wi, 0, &(slot->threadId));
989 : slot->hThread = handle;
990 : slot->workerStatus = WRKR_IDLE;
991 : #else /* !WIN32 */
992 60 : pid = fork();
993 120 : if (pid == 0)
994 : {
995 : /* we are the worker */
996 : int j;
997 :
998 : /* this is needed for GetMyPSlot() */
999 60 : slot->pid = getpid();
1000 :
1001 : /* instruct signal handler that we're in a worker now */
1002 60 : signal_info.am_worker = true;
1003 :
1004 : /* close read end of Worker -> Leader */
1005 60 : closesocket(pipeWM[PIPE_READ]);
1006 : /* close write end of Leader -> Worker */
1007 60 : closesocket(pipeMW[PIPE_WRITE]);
1008 :
1009 : /*
1010 : * Close all inherited fds for communication of the leader with
1011 : * previously-forked workers.
1012 : */
1013 96 : for (j = 0; j < i; j++)
1014 : {
1015 36 : closesocket(pstate->parallelSlot[j].pipeRead);
1016 36 : closesocket(pstate->parallelSlot[j].pipeWrite);
1017 : }
1018 :
1019 : /* Run the worker ... */
1020 60 : RunWorker(AH, slot);
1021 :
1022 : /* We can just exit(0) when done */
1023 60 : exit(0);
1024 : }
1025 60 : else if (pid < 0)
1026 : {
1027 : /* fork failed */
1028 0 : pg_fatal("could not create worker process: %m");
1029 : }
1030 :
1031 : /* In Leader after successful fork */
1032 60 : slot->pid = pid;
1033 60 : slot->workerStatus = WRKR_IDLE;
1034 :
1035 : /* close read end of Leader -> Worker */
1036 60 : closesocket(pipeMW[PIPE_READ]);
1037 : /* close write end of Worker -> Leader */
1038 60 : closesocket(pipeWM[PIPE_WRITE]);
1039 : #endif /* WIN32 */
1040 : }
1041 :
1042 : /*
1043 : * Having forked off the workers, disable SIGPIPE so that leader isn't
1044 : * killed if it tries to send a command to a dead worker. We don't want
1045 : * the workers to inherit this setting, though.
1046 : */
1047 : #ifndef WIN32
1048 28 : pqsignal(SIGPIPE, SIG_IGN);
1049 : #endif
1050 :
1051 : /*
1052 : * Re-establish query cancellation on the leader connection.
1053 : */
1054 28 : set_archive_cancel_info(AH, AH->connection);
1055 :
1056 : /*
1057 : * Tell the cancel signal handler to forward signals to worker processes,
1058 : * too. (As with query cancel, we did not need this earlier because the
1059 : * workers have not yet been given anything to do; if we die before this
1060 : * point, any already-started workers will see EOF and quit promptly.)
1061 : */
1062 28 : set_cancel_pstate(pstate);
1063 :
1064 28 : return pstate;
1065 : }
1066 :
1067 : /*
1068 : * Close down a parallel dump or restore.
1069 : */
1070 : void
1071 108 : ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
1072 : {
1073 : int i;
1074 :
1075 : /* No work if non-parallel */
1076 108 : if (pstate->numWorkers == 1)
1077 80 : return;
1078 :
1079 : /* There should not be any unfinished jobs */
1080 : Assert(IsEveryWorkerIdle(pstate));
1081 :
1082 : /* Close the sockets so that the workers know they can exit */
1083 88 : for (i = 0; i < pstate->numWorkers; i++)
1084 : {
1085 60 : closesocket(pstate->parallelSlot[i].pipeRead);
1086 60 : closesocket(pstate->parallelSlot[i].pipeWrite);
1087 : }
1088 :
1089 : /* Wait for them to exit */
1090 28 : WaitForTerminatingWorkers(pstate);
1091 :
1092 : /*
1093 : * Unlink pstate from shutdown_info, so the exit handler will not try to
1094 : * use it; and likewise unlink from signal_info.
1095 : */
1096 28 : shutdown_info.pstate = NULL;
1097 28 : set_cancel_pstate(NULL);
1098 :
1099 : /* Release state (mere neatnik-ism, since we're about to terminate) */
1100 28 : free(pstate->te);
1101 28 : free(pstate->parallelSlot);
1102 28 : free(pstate);
1103 : }
1104 :
1105 : /*
1106 : * These next four functions handle construction and parsing of the command
1107 : * strings and response strings for parallel workers.
1108 : *
1109 : * Currently, these can be the same regardless of which archive format we are
1110 : * processing. In future, we might want to let format modules override these
1111 : * functions to add format-specific data to a command or response.
1112 : */
1113 :
1114 : /*
1115 : * buildWorkerCommand: format a command string to send to a worker.
1116 : *
1117 : * The string is built in the caller-supplied buffer of size buflen.
1118 : */
1119 : static void
1120 6036 : buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
1121 : char *buf, int buflen)
1122 : {
1123 6036 : if (act == ACT_DUMP)
1124 1498 : snprintf(buf, buflen, "DUMP %d", te->dumpId);
1125 4538 : else if (act == ACT_RESTORE)
1126 4538 : snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1127 : else
1128 : Assert(false);
1129 6036 : }
1130 :
1131 : /*
1132 : * parseWorkerCommand: interpret a command string in a worker.
1133 : */
1134 : static void
1135 6036 : parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
1136 : const char *msg)
1137 : {
1138 : DumpId dumpId;
1139 : int nBytes;
1140 :
1141 6036 : if (messageStartsWith(msg, "DUMP "))
1142 : {
1143 1498 : *act = ACT_DUMP;
1144 1498 : sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1145 : Assert(nBytes == strlen(msg));
1146 1498 : *te = getTocEntryByDumpId(AH, dumpId);
1147 : Assert(*te != NULL);
1148 : }
1149 4538 : else if (messageStartsWith(msg, "RESTORE "))
1150 : {
1151 4538 : *act = ACT_RESTORE;
1152 4538 : sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1153 : Assert(nBytes == strlen(msg));
1154 4538 : *te = getTocEntryByDumpId(AH, dumpId);
1155 : Assert(*te != NULL);
1156 : }
1157 : else
1158 0 : pg_fatal("unrecognized command received from leader: \"%s\"",
1159 : msg);
1160 6036 : }
1161 :
1162 : /*
1163 : * buildWorkerResponse: format a response string to send to the leader.
1164 : *
1165 : * The string is built in the caller-supplied buffer of size buflen.
1166 : */
1167 : static void
1168 6036 : buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
1169 : char *buf, int buflen)
1170 : {
1171 6036 : snprintf(buf, buflen, "OK %d %d %d",
1172 : te->dumpId,
1173 : status,
1174 : status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1175 6036 : }
1176 :
1177 : /*
1178 : * parseWorkerResponse: parse the status message returned by a worker.
1179 : *
1180 : * Returns the integer status code, and may update fields of AH and/or te.
1181 : */
1182 : static int
1183 6036 : parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
1184 : const char *msg)
1185 : {
1186 : DumpId dumpId;
1187 : int nBytes,
1188 : n_errors;
1189 6036 : int status = 0;
1190 :
1191 6036 : if (messageStartsWith(msg, "OK "))
1192 : {
1193 6036 : sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1194 :
1195 : Assert(dumpId == te->dumpId);
1196 : Assert(nBytes == strlen(msg));
1197 :
1198 6036 : AH->public.n_errors += n_errors;
1199 : }
1200 : else
1201 0 : pg_fatal("invalid message received from worker: \"%s\"",
1202 : msg);
1203 :
1204 6036 : return status;
1205 : }
1206 :
1207 : /*
1208 : * Dispatch a job to some free worker.
1209 : *
1210 : * te is the TocEntry to be processed, act is the action to be taken on it.
1211 : * callback is the function to call on completion of the job.
1212 : *
1213 : * If no worker is currently available, this will block, and previously
1214 : * registered callback functions may be called.
1215 : */
1216 : void
1217 6036 : DispatchJobForTocEntry(ArchiveHandle *AH,
1218 : ParallelState *pstate,
1219 : TocEntry *te,
1220 : T_Action act,
1221 : ParallelCompletionPtr callback,
1222 : void *callback_data)
1223 : {
1224 : int worker;
1225 : char buf[256];
1226 :
1227 : /* Get a worker, waiting if none are idle */
1228 7500 : while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
1229 1464 : WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
1230 :
1231 : /* Construct and send command string */
1232 6036 : buildWorkerCommand(AH, te, act, buf, sizeof(buf));
1233 :
1234 6036 : sendMessageToWorker(pstate, worker, buf);
1235 :
1236 : /* Remember worker is busy, and which TocEntry it's working on */
1237 6036 : pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
1238 6036 : pstate->parallelSlot[worker].callback = callback;
1239 6036 : pstate->parallelSlot[worker].callback_data = callback_data;
1240 6036 : pstate->te[worker] = te;
1241 6036 : }
1242 :
1243 : /*
1244 : * Find an idle worker and return its slot number.
1245 : * Return NO_SLOT if none are idle.
1246 : */
1247 : static int
1248 13300 : GetIdleWorker(ParallelState *pstate)
1249 : {
1250 : int i;
1251 :
1252 31076 : for (i = 0; i < pstate->numWorkers; i++)
1253 : {
1254 23846 : if (pstate->parallelSlot[i].workerStatus == WRKR_IDLE)
1255 6070 : return i;
1256 : }
1257 7230 : return NO_SLOT;
1258 : }
1259 :
1260 : /*
1261 : * Return true iff no worker is running.
1262 : */
1263 : static bool
1264 88 : HasEveryWorkerTerminated(ParallelState *pstate)
1265 : {
1266 : int i;
1267 :
1268 170 : for (i = 0; i < pstate->numWorkers; i++)
1269 : {
1270 142 : if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1271 60 : return false;
1272 : }
1273 28 : return true;
1274 : }
1275 :
1276 : /*
1277 : * Return true iff every worker is in the WRKR_IDLE state.
1278 : */
1279 : bool
1280 116 : IsEveryWorkerIdle(ParallelState *pstate)
1281 : {
1282 : int i;
1283 :
1284 254 : for (i = 0; i < pstate->numWorkers; i++)
1285 : {
1286 206 : if (pstate->parallelSlot[i].workerStatus != WRKR_IDLE)
1287 68 : return false;
1288 : }
1289 48 : return true;
1290 : }
1291 :
1292 : /*
1293 : * Acquire lock on a table to be dumped by a worker process.
1294 : *
1295 : * The leader process is already holding an ACCESS SHARE lock. Ordinarily
1296 : * it's no problem for a worker to get one too, but if anything else besides
1297 : * pg_dump is running, there's a possible deadlock:
1298 : *
1299 : * 1) Leader dumps the schema and locks all tables in ACCESS SHARE mode.
1300 : * 2) Another process requests an ACCESS EXCLUSIVE lock (which is not granted
1301 : * because the leader holds a conflicting ACCESS SHARE lock).
1302 : * 3) A worker process also requests an ACCESS SHARE lock to read the table.
1303 : * The worker is enqueued behind the ACCESS EXCLUSIVE lock request.
1304 : * 4) Now we have a deadlock, since the leader is effectively waiting for
1305 : * the worker. The server cannot detect that, however.
1306 : *
1307 : * To prevent an infinite wait, prior to touching a table in a worker, request
1308 : * a lock in ACCESS SHARE mode but with NOWAIT. If we don't get the lock,
1309 : * then we know that somebody else has requested an ACCESS EXCLUSIVE lock and
1310 : * so we have a deadlock. We must fail the backup in that case.
1311 : */
1312 : static void
1313 1498 : lockTableForWorker(ArchiveHandle *AH, TocEntry *te)
1314 : {
1315 : const char *qualId;
1316 : PQExpBuffer query;
1317 : PGresult *res;
1318 :
1319 : /* Nothing to do for BLOBS */
1320 1498 : if (strcmp(te->desc, "BLOBS") == 0)
1321 14 : return;
1322 :
1323 1484 : query = createPQExpBuffer();
1324 :
1325 1484 : qualId = fmtQualifiedId(te->namespace, te->tag);
1326 :
1327 1484 : appendPQExpBuffer(query, "LOCK TABLE %s IN ACCESS SHARE MODE NOWAIT",
1328 : qualId);
1329 :
1330 1484 : res = PQexec(AH->connection, query->data);
1331 :
1332 1484 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
1333 0 : pg_fatal("could not obtain lock on relation \"%s\"\n"
1334 : "This usually means that someone requested an ACCESS EXCLUSIVE lock "
1335 : "on the table after the pg_dump parent process had gotten the "
1336 : "initial ACCESS SHARE lock on the table.", qualId);
1337 :
1338 1484 : PQclear(res);
1339 1484 : destroyPQExpBuffer(query);
1340 : }
1341 :
1342 : /*
1343 : * WaitForCommands: main routine for a worker process.
1344 : *
1345 : * Read and execute commands from the leader until we see EOF on the pipe.
1346 : */
1347 : static void
1348 60 : WaitForCommands(ArchiveHandle *AH, int pipefd[2])
1349 : {
1350 : char *command;
1351 : TocEntry *te;
1352 : T_Action act;
1353 60 : int status = 0;
1354 : char buf[256];
1355 :
1356 : for (;;)
1357 : {
1358 6096 : if (!(command = getMessageFromLeader(pipefd)))
1359 : {
1360 : /* EOF, so done */
1361 60 : return;
1362 : }
1363 :
1364 : /* Decode the command */
1365 6036 : parseWorkerCommand(AH, &te, &act, command);
1366 :
1367 6036 : if (act == ACT_DUMP)
1368 : {
1369 : /* Acquire lock on this table within the worker's session */
1370 1498 : lockTableForWorker(AH, te);
1371 :
1372 : /* Perform the dump command */
1373 1498 : status = (AH->WorkerJobDumpPtr) (AH, te);
1374 : }
1375 4538 : else if (act == ACT_RESTORE)
1376 : {
1377 : /* Perform the restore command */
1378 4538 : status = (AH->WorkerJobRestorePtr) (AH, te);
1379 : }
1380 : else
1381 : Assert(false);
1382 :
1383 : /* Return status to leader */
1384 6036 : buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1385 :
1386 6036 : sendMessageToLeader(pipefd, buf);
1387 :
1388 : /* command was pg_malloc'd and we are responsible for free()ing it. */
1389 6036 : free(command);
1390 : }
1391 : }
1392 :
1393 : /*
1394 : * Check for status messages from workers.
1395 : *
1396 : * If do_wait is true, wait to get a status message; otherwise, just return
1397 : * immediately if there is none available.
1398 : *
1399 : * When we get a status message, we pass the status code to the callback
1400 : * function that was specified to DispatchJobForTocEntry, then reset the
1401 : * worker status to IDLE.
1402 : *
1403 : * Returns true if we collected a status message, else false.
1404 : *
1405 : * XXX is it worth checking for more than one status message per call?
1406 : * It seems somewhat unlikely that multiple workers would finish at exactly
1407 : * the same time.
1408 : */
1409 : static bool
1410 11854 : ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
1411 : {
1412 : int worker;
1413 : char *msg;
1414 :
1415 : /* Try to collect a status message */
1416 11854 : msg = getMessageFromWorker(pstate, do_wait, &worker);
1417 :
1418 11854 : if (!msg)
1419 : {
1420 : /* If do_wait is true, we must have detected EOF on some socket */
1421 5818 : if (do_wait)
1422 0 : pg_fatal("a worker process died unexpectedly");
1423 5818 : return false;
1424 : }
1425 :
1426 : /* Process it and update our idea of the worker's status */
1427 6036 : if (messageStartsWith(msg, "OK "))
1428 : {
1429 6036 : ParallelSlot *slot = &pstate->parallelSlot[worker];
1430 6036 : TocEntry *te = pstate->te[worker];
1431 : int status;
1432 :
1433 6036 : status = parseWorkerResponse(AH, te, msg);
1434 6036 : slot->callback(AH, te, status, slot->callback_data);
1435 6036 : slot->workerStatus = WRKR_IDLE;
1436 6036 : pstate->te[worker] = NULL;
1437 : }
1438 : else
1439 0 : pg_fatal("invalid message received from worker: \"%s\"",
1440 : msg);
1441 :
1442 : /* Free the string returned from getMessageFromWorker */
1443 6036 : free(msg);
1444 :
1445 6036 : return true;
1446 : }
1447 :
1448 : /*
1449 : * Check for status results from workers, waiting if necessary.
1450 : *
1451 : * Available wait modes are:
1452 : * WFW_NO_WAIT: reap any available status, but don't block
1453 : * WFW_GOT_STATUS: wait for at least one more worker to finish
1454 : * WFW_ONE_IDLE: wait for at least one worker to be idle
1455 : * WFW_ALL_IDLE: wait for all workers to be idle
1456 : *
1457 : * Any received results are passed to the callback specified to
1458 : * DispatchJobForTocEntry.
1459 : *
1460 : * This function is executed in the leader process.
1461 : */
1462 : void
1463 6054 : WaitForWorkers(ArchiveHandle *AH, ParallelState *pstate, WFW_WaitOption mode)
1464 : {
1465 6054 : bool do_wait = false;
1466 :
1467 : /*
1468 : * In GOT_STATUS mode, always block waiting for a message, since we can't
1469 : * return till we get something. In other modes, we don't block the first
1470 : * time through the loop.
1471 : */
1472 6054 : if (mode == WFW_GOT_STATUS)
1473 : {
1474 : /* Assert that caller knows what it's doing */
1475 : Assert(!IsEveryWorkerIdle(pstate));
1476 34 : do_wait = true;
1477 : }
1478 :
1479 : for (;;)
1480 : {
1481 : /*
1482 : * Check for status messages, even if we don't need to block. We do
1483 : * not try very hard to reap all available messages, though, since
1484 : * there's unlikely to be more than one.
1485 : */
1486 11854 : if (ListenToWorkers(AH, pstate, do_wait))
1487 : {
1488 : /*
1489 : * If we got a message, we are done by definition for GOT_STATUS
1490 : * mode, and we can also be certain that there's at least one idle
1491 : * worker. So we're done in all but ALL_IDLE mode.
1492 : */
1493 6036 : if (mode != WFW_ALL_IDLE)
1494 6002 : return;
1495 : }
1496 :
1497 : /* Check whether we must wait for new status messages */
1498 5852 : switch (mode)
1499 : {
1500 0 : case WFW_NO_WAIT:
1501 0 : return; /* never wait */
1502 0 : case WFW_GOT_STATUS:
1503 : Assert(false); /* can't get here, because we waited */
1504 0 : break;
1505 5800 : case WFW_ONE_IDLE:
1506 5800 : if (GetIdleWorker(pstate) != NO_SLOT)
1507 34 : return;
1508 5766 : break;
1509 52 : case WFW_ALL_IDLE:
1510 52 : if (IsEveryWorkerIdle(pstate))
1511 18 : return;
1512 34 : break;
1513 : }
1514 :
1515 : /* Loop back, and this time wait for something to happen */
1516 5800 : do_wait = true;
1517 : }
1518 : }
1519 :
1520 : /*
1521 : * Read one command message from the leader, blocking if necessary
1522 : * until one is available, and return it as a malloc'd string.
1523 : * On EOF, return NULL.
1524 : *
1525 : * This function is executed in worker processes.
1526 : */
1527 : static char *
1528 6096 : getMessageFromLeader(int pipefd[2])
1529 : {
1530 6096 : return readMessageFromPipe(pipefd[PIPE_READ]);
1531 : }
1532 :
1533 : /*
1534 : * Send a status message to the leader.
1535 : *
1536 : * This function is executed in worker processes.
1537 : */
1538 : static void
1539 6036 : sendMessageToLeader(int pipefd[2], const char *str)
1540 : {
1541 6036 : int len = strlen(str) + 1;
1542 :
1543 6036 : if (pipewrite(pipefd[PIPE_WRITE], str, len) != len)
1544 0 : pg_fatal("could not write to the communication channel: %m");
1545 6036 : }
1546 :
/*
 * select_loop: wait until some descriptor in "workerset" becomes readable.
 *
 * The select() call is repeated for as long as it fails with an
 * interrupted-call error; the requested descriptor set is restored before
 * each attempt.  Returns the final select() result: -1 on error, else the
 * number of readable descriptors.
 */
static int
select_loop(int maxFd, fd_set *workerset)
{
	fd_set		wanted = *workerset;
	int			nready;
	bool		interrupted;

	do
	{
		/* select() overwrites the set, so start from the saved copy */
		*workerset = wanted;
		nready = select(maxFd + 1, workerset, NULL, NULL, NULL);

#ifndef WIN32
		interrupted = (nready < 0 && errno == EINTR);
#else
		interrupted = (nready == SOCKET_ERROR && WSAGetLastError() == WSAEINTR);
#endif
	} while (interrupted);

	return nready;
}
1574 :
1575 :
1576 : /*
1577 : * Check for messages from worker processes.
1578 : *
1579 : * If a message is available, return it as a malloc'd string, and put the
1580 : * index of the sending worker in *worker.
1581 : *
1582 : * If nothing is available, wait if "do_wait" is true, else return NULL.
1583 : *
1584 : * If we detect EOF on any socket, we'll return NULL. It's not great that
1585 : * that's hard to distinguish from the no-data-available case, but for now
1586 : * our one caller is okay with that.
1587 : *
1588 : * This function is executed in the leader process.
1589 : */
1590 : static char *
1591 11854 : getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
1592 : {
1593 : int i;
1594 : fd_set workerset;
1595 11854 : int maxFd = -1;
1596 11854 : struct timeval nowait = {0, 0};
1597 :
1598 : /* construct bitmap of socket descriptors for select() */
1599 11854 : FD_ZERO(&workerset);
1600 35876 : for (i = 0; i < pstate->numWorkers; i++)
1601 : {
1602 24022 : if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1603 0 : continue;
1604 24022 : FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
1605 24022 : if (pstate->parallelSlot[i].pipeRead > maxFd)
1606 24022 : maxFd = pstate->parallelSlot[i].pipeRead;
1607 : }
1608 :
1609 11854 : if (do_wait)
1610 : {
1611 5834 : i = select_loop(maxFd, &workerset);
1612 : Assert(i != 0);
1613 : }
1614 : else
1615 : {
1616 6020 : if ((i = select(maxFd + 1, &workerset, NULL, NULL, &nowait)) == 0)
1617 5818 : return NULL;
1618 : }
1619 :
1620 6036 : if (i < 0)
1621 0 : pg_fatal("%s() failed: %m", "select");
1622 :
1623 9180 : for (i = 0; i < pstate->numWorkers; i++)
1624 : {
1625 : char *msg;
1626 :
1627 9180 : if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
1628 0 : continue;
1629 9180 : if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
1630 3144 : continue;
1631 :
1632 : /*
1633 : * Read the message if any. If the socket is ready because of EOF,
1634 : * we'll return NULL instead (and the socket will stay ready, so the
1635 : * condition will persist).
1636 : *
1637 : * Note: because this is a blocking read, we'll wait if only part of
1638 : * the message is available. Waiting a long time would be bad, but
1639 : * since worker status messages are short and are always sent in one
1640 : * operation, it shouldn't be a problem in practice.
1641 : */
1642 6036 : msg = readMessageFromPipe(pstate->parallelSlot[i].pipeRead);
1643 6036 : *worker = i;
1644 6036 : return msg;
1645 : }
1646 : Assert(false);
1647 0 : return NULL;
1648 : }
1649 :
1650 : /*
1651 : * Send a command message to the specified worker process.
1652 : *
1653 : * This function is executed in the leader process.
1654 : */
1655 : static void
1656 6036 : sendMessageToWorker(ParallelState *pstate, int worker, const char *str)
1657 : {
1658 6036 : int len = strlen(str) + 1;
1659 :
1660 6036 : if (pipewrite(pstate->parallelSlot[worker].pipeWrite, str, len) != len)
1661 : {
1662 0 : pg_fatal("could not write to the communication channel: %m");
1663 : }
1664 6036 : }
1665 :
/*
 * readMessageFromPipe: read one message from the given pipe (fd).
 *
 * Blocks until a complete message is available and returns it as a malloc'd
 * string.  Returns NULL if piperead() reports EOF or an error first.
 *
 * A "message" on the channel is simply a null-terminated string.
 */
static char *
readMessageFromPipe(int fd)
{
	int			capacity = 64;	/* could be any number */
	int			used = 0;
	int			nread;
	char	   *buf = (char *) pg_malloc(capacity);

	/*
	 * If piperead() were allowed to fetch several bytes at once, it could in
	 * theory hand us pieces of more than one message.  (In reality that
	 * cannot happen, because neither the leader nor a worker sends a second
	 * message before getting a reply, but we prefer not to rely on that
	 * here.)  So keep it simple and read a single byte at a time until the
	 * terminating '\0' shows up.  That is somewhat inefficient, but these
	 * are only short command and status strings, so it should not matter.
	 */
	while ((nread = piperead(fd, buf + used, 1)) > 0)
	{
		Assert(nread == 1);

		if (buf[used] == '\0')
			return buf;			/* collected whole message */

		used++;
		if (used == capacity)	/* enlarge buffer if needed */
		{
			capacity += 16;		/* could be any number */
			buf = (char *) pg_realloc(buf, capacity);
		}

		/* there must always be room for the next byte */
		Assert(used < capacity);
	}

	/* Error, or the other end has closed the connection */
	pg_free(buf);
	return NULL;
}
1717 :
#ifdef WIN32

/*
 * pgpipe: replacement for pipe(2) on Windows, producing handles that can be
 * used in select().
 *
 * The "pipe" is a pair of connected TCP sockets on the loopback address:
 * handles[1] is the connecting socket and handles[0] the accepted one.
 * Reads and writes on it must go through piperead()/pipewrite().
 *
 * For consistency with Unix the returned handles are declared as "int".
 * That is fine even on WIN64, since system handles are not more than 32
 * bits wide, but it does require some casting.
 *
 * Returns 0 on success.  On failure an error is logged, both handles are
 * left as -1, and -1 is returned.
 */
static int
pgpipe(int handles[2])
{
	pgsocket	listener;
	pgsocket	peer;
	struct sockaddr_in addr;
	int			addrlen = sizeof(addr);

	/* We have to use the Unix socket invalid file descriptor value here. */
	handles[0] = handles[1] = -1;

	/*
	 * Set up a listening socket on an ephemeral loopback port.
	 */
	listener = socket(AF_INET, SOCK_STREAM, 0);
	if (listener == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not create socket: error code %d",
					 WSAGetLastError());
		return -1;
	}

	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	addr.sin_port = pg_hton16(0);
	addr.sin_addr.s_addr = pg_hton32(INADDR_LOOPBACK);

	if (bind(listener, (SOCKADDR *) &addr, addrlen) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not bind: error code %d",
					 WSAGetLastError());
		goto close_listener;
	}
	if (listen(listener, 1) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not listen: error code %d",
					 WSAGetLastError());
		goto close_listener;
	}
	/* Find out which port we were actually given */
	if (getsockname(listener, (SOCKADDR *) &addr, &addrlen) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: %s() failed: error code %d", "getsockname",
					 WSAGetLastError());
		goto close_listener;
	}

	/*
	 * Create the two connected sockets that serve as the pipe ends.
	 */
	peer = socket(AF_INET, SOCK_STREAM, 0);
	if (peer == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not create second socket: error code %d",
					 WSAGetLastError());
		goto close_listener;
	}
	handles[1] = (int) peer;

	if (connect(handles[1], (SOCKADDR *) &addr, addrlen) == SOCKET_ERROR)
	{
		pg_log_error("pgpipe: could not connect socket: error code %d",
					 WSAGetLastError());
		goto close_connector;
	}

	peer = accept(listener, (SOCKADDR *) &addr, &addrlen);
	if (peer == PGINVALID_SOCKET)
	{
		pg_log_error("pgpipe: could not accept connection: error code %d",
					 WSAGetLastError());
		goto close_connector;
	}
	handles[0] = (int) peer;

	/* The listening socket has served its purpose */
	closesocket(listener);
	return 0;

	/* Failure exits: release whatever had been created so far */
close_connector:
	closesocket(handles[1]);
	handles[1] = -1;
close_listener:
	closesocket(listener);
	return -1;
}

#endif							/* WIN32 */
|