LCOV - code coverage report
Current view: top level - src/bin/pg_upgrade - parallel.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13devel Lines: 14 44 31.8 %
Date: 2019-11-21 13:06:38 Functions: 3 3 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  *  parallel.c
       3             :  *
       4             :  *  multi-process support
       5             :  *
       6             :  *  Copyright (c) 2010-2019, PostgreSQL Global Development Group
       7             :  *  src/bin/pg_upgrade/parallel.c
       8             :  */
       9             : 
      10             : #include "postgres_fe.h"
      11             : 
      12             : #include <sys/wait.h>
      13             : #ifdef WIN32
      14             : #include <io.h>
      15             : #endif
      16             : 
      17             : #include "pg_upgrade.h"
      18             : 
      19             : static int  parallel_jobs;
      20             : 
      21             : #ifdef WIN32
      22             : /*
      23             :  *  Array holding the handles of all active threads.  There can't be any
      24             :  *  gaps/zeros so it can be passed to WaitForMultipleObjects().  The
      25             :  *  per-thread argument structs are kept in separate arrays for that reason.
      26             :  */
      27             : HANDLE     *thread_handles;
      28             : 
      29             : typedef struct
      30             : {
      31             :     char       *log_file;
      32             :     char       *opt_log_file;
      33             :     char       *cmd;
      34             : } exec_thread_arg;
      35             : 
      36             : typedef struct
      37             : {
      38             :     DbInfoArr  *old_db_arr;
      39             :     DbInfoArr  *new_db_arr;
      40             :     char       *old_pgdata;
      41             :     char       *new_pgdata;
      42             :     char       *old_tablespace;
      43             : } transfer_thread_arg;
      44             : 
      45             : exec_thread_arg **exec_thread_args;
      46             : transfer_thread_arg **transfer_thread_args;
      47             : 
      48             : /* track current thread_args struct so reap_child() can be used for all cases */
      49             : void      **cur_thread_args;
      50             : 
      51             : DWORD       win32_exec_prog(exec_thread_arg *args);
      52             : DWORD       win32_transfer_all_new_dbs(transfer_thread_arg *args);
      53             : #endif
      54             : 
      55             : /*
      56             :  *  parallel_exec_prog
      57             :  *
      58             :  *  This has the same API as exec_prog, except it does parallel execution,
      59             :  *  and therefore must throw errors and doesn't return an error status.
      60             :  */
      61             : void
      62          22 : parallel_exec_prog(const char *log_file, const char *opt_log_file,
      63             :                    const char *fmt,...)
      64             : {
      65             :     va_list     args;
      66             :     char        cmd[MAX_STRING];
      67             : 
      68             : #ifndef WIN32
      69             :     pid_t       child;
      70             : #else
      71             :     HANDLE      child;
      72             :     exec_thread_arg *new_arg;
      73             : #endif
      74             : 
      75          22 :     va_start(args, fmt);
      76          22 :     vsnprintf(cmd, sizeof(cmd), fmt, args);
      77          22 :     va_end(args);
      78             : 
      79          22 :     if (user_opts.jobs <= 1)
      80             :         /* exit_on_error must be true to allow jobs */
      81          22 :         exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
      82             :     else
      83             :     {
      84             :         /* parallel */
      85             : #ifdef WIN32
      86             :         if (thread_handles == NULL)
      87             :             thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
      88             : 
      89             :         if (exec_thread_args == NULL)
      90             :         {
      91             :             int         i;
      92             : 
      93             :             exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
      94             : 
      95             :             /*
      96             :              * For safety and performance, we keep the args allocated during
      97             :              * the entire life of the process, and we don't free the args in a
      98             :              * thread different from the one that allocated it.
      99             :              */
     100             :             for (i = 0; i < user_opts.jobs; i++)
     101             :                 exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
     102             :         }
     103             : 
     104             :         cur_thread_args = (void **) exec_thread_args;
     105             : #endif
     106             :         /* harvest any dead children */
     107           0 :         while (reap_child(false) == true)
     108             :             ;
     109             : 
     110             :         /* must we wait for a dead child? */
     111           0 :         if (parallel_jobs >= user_opts.jobs)
     112           0 :             reap_child(true);
     113             : 
     114             :         /* set this before we start the job */
     115           0 :         parallel_jobs++;
     116             : 
     117             :         /* Ensure stdio state is quiesced before forking */
     118           0 :         fflush(NULL);
     119             : 
     120             : #ifndef WIN32
     121           0 :         child = fork();
     122           0 :         if (child == 0)
     123             :             /* use _exit to skip atexit() functions */
     124           0 :             _exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
     125           0 :         else if (child < 0)
     126             :             /* fork failed */
     127           0 :             pg_fatal("could not create worker process: %s\n", strerror(errno));
     128             : #else
     129             :         /* empty array elements are always at the end */
     130             :         new_arg = exec_thread_args[parallel_jobs - 1];
     131             : 
     132             :         /* Can only pass one pointer into the function, so use a struct */
     133             :         if (new_arg->log_file)
     134             :             pg_free(new_arg->log_file);
     135             :         new_arg->log_file = pg_strdup(log_file);
     136             :         if (new_arg->opt_log_file)
     137             :             pg_free(new_arg->opt_log_file);
     138             :         new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
     139             :         if (new_arg->cmd)
     140             :             pg_free(new_arg->cmd);
     141             :         new_arg->cmd = pg_strdup(cmd);
     142             : 
     143             :         child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
     144             :                                         new_arg, 0, NULL);
     145             :         if (child == 0)
     146             :             pg_fatal("could not create worker thread: %s\n", strerror(errno));
     147             : 
     148             :         thread_handles[parallel_jobs - 1] = child;
     149             : #endif
     150             :     }
     151             : 
     152          22 :     return;
     153             : }
     154             : 
     155             : 
     156             : #ifdef WIN32
     157             : DWORD
     158             : win32_exec_prog(exec_thread_arg *args)
     159             : {
     160             :     int         ret;
     161             : 
     162             :     ret = !exec_prog(args->log_file, args->opt_log_file, true, true, "%s", args->cmd);
     163             : 
     164             :     /* terminates thread */
     165             :     return ret;
     166             : }
     167             : #endif
     168             : 
     169             : 
     170             : /*
     171             :  *  parallel_transfer_all_new_dbs
     172             :  *
     173             :  *  This has the same API as transfer_all_new_dbs, except that it transfers
     174             :  *  multiple tablespaces in parallel.
     175             :  */
     176             : void
     177           2 : parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
     178             :                               char *old_pgdata, char *new_pgdata,
     179             :                               char *old_tablespace)
     180             : {
     181             : #ifndef WIN32
     182             :     pid_t       child;
     183             : #else
     184             :     HANDLE      child;
     185             :     transfer_thread_arg *new_arg;
     186             : #endif
     187             : 
     188           2 :     if (user_opts.jobs <= 1)
     189           2 :         transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL);
     190             :     else
     191             :     {
     192             :         /* parallel */
     193             : #ifdef WIN32
     194             :         if (thread_handles == NULL)
     195             :             thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
     196             : 
     197             :         if (transfer_thread_args == NULL)
     198             :         {
     199             :             int         i;
     200             : 
     201             :             transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
     202             : 
     203             :             /*
     204             :              * For safety and performance, we keep the args allocated during
     205             :              * the entire life of the process, and we don't free the args in a
     206             :              * thread different from the one that allocated it.
     207             :              */
     208             :             for (i = 0; i < user_opts.jobs; i++)
     209             :                 transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
     210             :         }
     211             : 
     212             :         cur_thread_args = (void **) transfer_thread_args;
     213             : #endif
     214             :         /* harvest any dead children */
     215           0 :         while (reap_child(false) == true)
     216             :             ;
     217             : 
     218             :         /* must we wait for a dead child? */
     219           0 :         if (parallel_jobs >= user_opts.jobs)
     220           0 :             reap_child(true);
     221             : 
     222             :         /* set this before we start the job */
     223           0 :         parallel_jobs++;
     224             : 
     225             :         /* Ensure stdio state is quiesced before forking */
     226           0 :         fflush(NULL);
     227             : 
     228             : #ifndef WIN32
     229           0 :         child = fork();
     230           0 :         if (child == 0)
     231             :         {
     232           0 :             transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
     233             :                                  old_tablespace);
     234             :             /* if we take another exit path, it will be non-zero */
     235             :             /* use _exit to skip atexit() functions */
     236           0 :             _exit(0);
     237             :         }
     238           0 :         else if (child < 0)
     239             :             /* fork failed */
     240           0 :             pg_fatal("could not create worker process: %s\n", strerror(errno));
     241             : #else
     242             :         /* empty array elements are always at the end */
     243             :         new_arg = transfer_thread_args[parallel_jobs - 1];
     244             : 
     245             :         /* Can only pass one pointer into the function, so use a struct */
     246             :         new_arg->old_db_arr = old_db_arr;
     247             :         new_arg->new_db_arr = new_db_arr;
     248             :         if (new_arg->old_pgdata)
     249             :             pg_free(new_arg->old_pgdata);
     250             :         new_arg->old_pgdata = pg_strdup(old_pgdata);
     251             :         if (new_arg->new_pgdata)
     252             :             pg_free(new_arg->new_pgdata);
     253             :         new_arg->new_pgdata = pg_strdup(new_pgdata);
     254             :         if (new_arg->old_tablespace)
     255             :             pg_free(new_arg->old_tablespace);
     256             :         new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
     257             : 
     258             :         child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
     259             :                                         new_arg, 0, NULL);
     260             :         if (child == 0)
     261             :             pg_fatal("could not create worker thread: %s\n", strerror(errno));
     262             : 
     263             :         thread_handles[parallel_jobs - 1] = child;
     264             : #endif
     265             :     }
     266             : 
     267           2 :     return;
     268             : }
     269             : 
     270             : 
     271             : #ifdef WIN32
     272             : DWORD
     273             : win32_transfer_all_new_dbs(transfer_thread_arg *args)
     274             : {
     275             :     transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata,
     276             :                          args->new_pgdata, args->old_tablespace);
     277             : 
     278             :     /* terminates thread */
     279             :     return 0;
     280             : }
     281             : #endif
     282             : 
     283             : 
     284             : /*
     285             :  *  collect status from a completed worker child
     286             :  */
     287             : bool
     288           4 : reap_child(bool wait_for_child)
     289             : {
     290             : #ifndef WIN32
     291             :     int         work_status;
     292             :     pid_t       child;
     293             : #else
     294             :     int         thread_num;
     295             :     DWORD       res;
     296             : #endif
     297             : 
     298           4 :     if (user_opts.jobs <= 1 || parallel_jobs == 0)
     299           4 :         return false;
     300             : 
     301             : #ifndef WIN32
     302           0 :     child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
     303           0 :     if (child == (pid_t) -1)
     304           0 :         pg_fatal("waitpid() failed: %s\n", strerror(errno));
     305           0 :     if (child == 0)
     306           0 :         return false;           /* no children, or no dead children */
     307           0 :     if (work_status != 0)
     308           0 :         pg_fatal("child process exited abnormally: status %d\n", work_status);
     309             : #else
     310             :     /* wait for one to finish */
     311             :     thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
     312             :                                         false, wait_for_child ? INFINITE : 0);
     313             : 
     314             :     if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
     315             :         return false;
     316             : 
     317             :     /* compute thread index in thread_handles */
     318             :     thread_num -= WAIT_OBJECT_0;
     319             : 
     320             :     /* get the result */
     321             :     GetExitCodeThread(thread_handles[thread_num], &res);
     322             :     if (res != 0)
     323             :         pg_fatal("child worker exited abnormally: %s\n", strerror(errno));
     324             : 
     325             :     /* dispose of handle to stop leaks */
     326             :     CloseHandle(thread_handles[thread_num]);
     327             : 
     328             :     /* Move last slot into dead child's position */
     329             :     if (thread_num != parallel_jobs - 1)
     330             :     {
     331             :         void       *tmp_args;
     332             : 
     333             :         thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
     334             : 
     335             :         /*
     336             :          * Move last active thread arg struct into the now-dead slot, and the
     337             :          * now-dead slot to the end for reuse by the next thread. Though the
     338             :          * thread struct is in use by another thread, we can safely swap the
     339             :          * struct pointers within the array.
     340             :          */
     341             :         tmp_args = cur_thread_args[thread_num];
     342             :         cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
     343             :         cur_thread_args[parallel_jobs - 1] = tmp_args;
     344             :     }
     345             : #endif
     346             : 
     347             :     /* do this after job has been removed */
     348           0 :     parallel_jobs--;
     349             : 
     350           0 :     return true;
     351             : }

Generated by: LCOV version 1.13