Line data Source code
1 : /*
2 : * parallel.c
3 : *
4 : * multi-process support
5 : *
6 : * Copyright (c) 2010-2024, PostgreSQL Global Development Group
7 : * src/bin/pg_upgrade/parallel.c
8 : */
9 :
10 : #include "postgres_fe.h"
11 :
12 : #include <sys/wait.h>
13 : #ifdef WIN32
14 : #include <io.h>
15 : #endif
16 :
17 : #include "pg_upgrade.h"
18 :
/*
 * Count of workers that have been started but not yet collected by
 * reap_child().  It is bumped just before a worker is launched and dropped
 * again once reap_child() has retired one.
 */
static int	parallel_jobs;

#ifdef WIN32
/* everything a win32_exec_prog() worker thread needs to run its command */
typedef struct
{
	char	   *log_file;
	char	   *opt_log_file;
	char	   *cmd;
} exec_thread_arg;

/* everything a win32_transfer_all_new_dbs() worker thread needs */
typedef struct
{
	DbInfoArr  *old_db_arr;
	DbInfoArr  *new_db_arr;
	char	   *old_pgdata;
	char	   *new_pgdata;
	char	   *old_tablespace;
} transfer_thread_arg;

/*
 * Handles of all active worker threads.  The live entries are kept packed at
 * the front of the array, without gaps/zeros, so that the array can be handed
 * straight to WaitForMultipleObjects().  That is also why the per-thread
 * argument structs are kept in separate arrays below rather than next to the
 * handles.
 */
static HANDLE *thread_handles;

/* per-slot argument structs, indexed in step with thread_handles */
static exec_thread_arg **exec_thread_args;
static transfer_thread_arg **transfer_thread_args;

/*
 * Whichever of the two argument arrays is currently in use, so that
 * reap_child() can shuffle slots without knowing the worker type.
 */
static void **cur_thread_args;

/* thread entry points */
DWORD		win32_exec_prog(exec_thread_arg *args);
DWORD		win32_transfer_all_new_dbs(transfer_thread_arg *args);
#endif
54 :
55 : /*
56 : * parallel_exec_prog
57 : *
58 : * This has the same API as exec_prog, except it does parallel execution,
59 : * and therefore must throw errors and doesn't return an error status.
60 : */
61 : void
62 38 : parallel_exec_prog(const char *log_file, const char *opt_log_file,
63 : const char *fmt,...)
64 : {
65 : va_list args;
66 : char cmd[MAX_STRING];
67 :
68 : #ifndef WIN32
69 : pid_t child;
70 : #else
71 : HANDLE child;
72 : exec_thread_arg *new_arg;
73 : #endif
74 :
75 38 : va_start(args, fmt);
76 38 : vsnprintf(cmd, sizeof(cmd), fmt, args);
77 38 : va_end(args);
78 :
79 38 : if (user_opts.jobs <= 1)
80 : /* exit_on_error must be true to allow jobs */
81 38 : exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
82 : else
83 : {
84 : /* parallel */
85 : #ifdef WIN32
86 : if (thread_handles == NULL)
87 : thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
88 :
89 : if (exec_thread_args == NULL)
90 : {
91 : int i;
92 :
93 : exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
94 :
95 : /*
96 : * For safety and performance, we keep the args allocated during
97 : * the entire life of the process, and we don't free the args in a
98 : * thread different from the one that allocated it.
99 : */
100 : for (i = 0; i < user_opts.jobs; i++)
101 : exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
102 : }
103 :
104 : cur_thread_args = (void **) exec_thread_args;
105 : #endif
106 : /* harvest any dead children */
107 0 : while (reap_child(false) == true)
108 : ;
109 :
110 : /* must we wait for a dead child? */
111 0 : if (parallel_jobs >= user_opts.jobs)
112 0 : reap_child(true);
113 :
114 : /* set this before we start the job */
115 0 : parallel_jobs++;
116 :
117 : /* Ensure stdio state is quiesced before forking */
118 0 : fflush(NULL);
119 :
120 : #ifndef WIN32
121 0 : child = fork();
122 0 : if (child == 0)
123 : /* use _exit to skip atexit() functions */
124 0 : _exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
125 0 : else if (child < 0)
126 : /* fork failed */
127 0 : pg_fatal("could not create worker process: %m");
128 : #else
129 : /* empty array element are always at the end */
130 : new_arg = exec_thread_args[parallel_jobs - 1];
131 :
132 : /* Can only pass one pointer into the function, so use a struct */
133 : pg_free(new_arg->log_file);
134 : new_arg->log_file = pg_strdup(log_file);
135 : pg_free(new_arg->opt_log_file);
136 : new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
137 : pg_free(new_arg->cmd);
138 : new_arg->cmd = pg_strdup(cmd);
139 :
140 : child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
141 : new_arg, 0, NULL);
142 : if (child == 0)
143 : pg_fatal("could not create worker thread: %m");
144 :
145 : thread_handles[parallel_jobs - 1] = child;
146 : #endif
147 : }
148 38 : }
149 :
150 :
#ifdef WIN32
/*
 *	win32_exec_prog
 *
 *	Entry point of a worker thread started by parallel_exec_prog().  Runs the
 *	command described by *args through exec_prog().
 *
 *	The value returned becomes the thread's exit code: it is the logical
 *	negation of exec_prog()'s result, so a true result yields 0.  reap_child()
 *	treats any non-zero exit code as an abnormal exit.
 */
DWORD
win32_exec_prog(exec_thread_arg *args)
{
	/* returning from here terminates the thread */
	return !exec_prog(args->log_file, args->opt_log_file, true, true,
					  "%s", args->cmd);
}
#endif
163 :
164 :
165 : /*
166 : * parallel_transfer_all_new_dbs
167 : *
168 : * This has the same API as transfer_all_new_dbs, except it does parallel execution
169 : * by transferring multiple tablespaces in parallel
170 : */
171 : void
172 6 : parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
173 : char *old_pgdata, char *new_pgdata,
174 : char *old_tablespace)
175 : {
176 : #ifndef WIN32
177 : pid_t child;
178 : #else
179 : HANDLE child;
180 : transfer_thread_arg *new_arg;
181 : #endif
182 :
183 6 : if (user_opts.jobs <= 1)
184 6 : transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
185 : else
186 : {
187 : /* parallel */
188 : #ifdef WIN32
189 : if (thread_handles == NULL)
190 : thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
191 :
192 : if (transfer_thread_args == NULL)
193 : {
194 : int i;
195 :
196 : transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
197 :
198 : /*
199 : * For safety and performance, we keep the args allocated during
200 : * the entire life of the process, and we don't free the args in a
201 : * thread different from the one that allocated it.
202 : */
203 : for (i = 0; i < user_opts.jobs; i++)
204 : transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
205 : }
206 :
207 : cur_thread_args = (void **) transfer_thread_args;
208 : #endif
209 : /* harvest any dead children */
210 0 : while (reap_child(false) == true)
211 : ;
212 :
213 : /* must we wait for a dead child? */
214 0 : if (parallel_jobs >= user_opts.jobs)
215 0 : reap_child(true);
216 :
217 : /* set this before we start the job */
218 0 : parallel_jobs++;
219 :
220 : /* Ensure stdio state is quiesced before forking */
221 0 : fflush(NULL);
222 :
223 : #ifndef WIN32
224 0 : child = fork();
225 0 : if (child == 0)
226 : {
227 0 : transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
228 : old_tablespace);
229 : /* if we take another exit path, it will be non-zero */
230 : /* use _exit to skip atexit() functions */
231 0 : _exit(0);
232 : }
233 0 : else if (child < 0)
234 : /* fork failed */
235 0 : pg_fatal("could not create worker process: %m");
236 : #else
237 : /* empty array element are always at the end */
238 : new_arg = transfer_thread_args[parallel_jobs - 1];
239 :
240 : /* Can only pass one pointer into the function, so use a struct */
241 : new_arg->old_db_arr = old_db_arr;
242 : new_arg->new_db_arr = new_db_arr;
243 : pg_free(new_arg->old_pgdata);
244 : new_arg->old_pgdata = pg_strdup(old_pgdata);
245 : pg_free(new_arg->new_pgdata);
246 : new_arg->new_pgdata = pg_strdup(new_pgdata);
247 : pg_free(new_arg->old_tablespace);
248 : new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
249 :
250 : child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
251 : new_arg, 0, NULL);
252 : if (child == 0)
253 : pg_fatal("could not create worker thread: %m");
254 :
255 : thread_handles[parallel_jobs - 1] = child;
256 : #endif
257 : }
258 6 : }
259 :
260 :
#ifdef WIN32
/*
 *	win32_transfer_all_new_dbs
 *
 *	Entry point of a worker thread started by
 *	parallel_transfer_all_new_dbs().  Unpacks *args and runs
 *	transfer_all_new_dbs() with them.
 *
 *	Always returns 0 as the thread's exit code once transfer_all_new_dbs()
 *	has returned.
 */
DWORD
win32_transfer_all_new_dbs(transfer_thread_arg *args)
{
	transfer_all_new_dbs(args->old_db_arr,
						 args->new_db_arr,
						 args->old_pgdata,
						 args->new_pgdata,
						 args->old_tablespace);

	/* returning from here terminates the thread */
	return 0;
}
#endif
272 :
273 :
274 : /*
275 : * collect status from a completed worker child
276 : */
277 : bool
278 14 : reap_child(bool wait_for_child)
279 : {
280 : #ifndef WIN32
281 : int work_status;
282 : pid_t child;
283 : #else
284 : int thread_num;
285 : DWORD res;
286 : #endif
287 :
288 14 : if (user_opts.jobs <= 1 || parallel_jobs == 0)
289 14 : return false;
290 :
291 : #ifndef WIN32
292 0 : child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
293 0 : if (child == (pid_t) -1)
294 0 : pg_fatal("%s() failed: %m", "waitpid");
295 0 : if (child == 0)
296 0 : return false; /* no children, or no dead children */
297 0 : if (work_status != 0)
298 0 : pg_fatal("child process exited abnormally: status %d", work_status);
299 : #else
300 : /* wait for one to finish */
301 : thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
302 : false, wait_for_child ? INFINITE : 0);
303 :
304 : if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
305 : return false;
306 :
307 : /* compute thread index in active_threads */
308 : thread_num -= WAIT_OBJECT_0;
309 :
310 : /* get the result */
311 : GetExitCodeThread(thread_handles[thread_num], &res);
312 : if (res != 0)
313 : pg_fatal("child worker exited abnormally: %m");
314 :
315 : /* dispose of handle to stop leaks */
316 : CloseHandle(thread_handles[thread_num]);
317 :
318 : /* Move last slot into dead child's position */
319 : if (thread_num != parallel_jobs - 1)
320 : {
321 : void *tmp_args;
322 :
323 : thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
324 :
325 : /*
326 : * Move last active thread arg struct into the now-dead slot, and the
327 : * now-dead slot to the end for reuse by the next thread. Though the
328 : * thread struct is in use by another thread, we can safely swap the
329 : * struct pointers within the array.
330 : */
331 : tmp_args = cur_thread_args[thread_num];
332 : cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
333 : cur_thread_args[parallel_jobs - 1] = tmp_args;
334 : }
335 : #endif
336 :
337 : /* do this after job has been removed */
338 0 : parallel_jobs--;
339 :
340 0 : return true;
341 : }
|