LCOV - code coverage report
Current view: top level - src/bin/pg_upgrade - parallel.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 14 44 31.8 %
Date: 2025-08-31 10:17:55 Functions: 3 3 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  *  parallel.c
       3             :  *
       4             :  *  multi-process support
       5             :  *
       6             :  *  Copyright (c) 2010-2025, PostgreSQL Global Development Group
       7             :  *  src/bin/pg_upgrade/parallel.c
       8             :  */
       9             : 
      10             : #include "postgres_fe.h"
      11             : 
      12             : #include <sys/wait.h>
      13             : #ifdef WIN32
      14             : #include <io.h>
      15             : #endif
      16             : 
      17             : #include "pg_upgrade.h"
      18             : 
      19             : static int  parallel_jobs;  /* worker children started and not yet reaped */
      20             : 
      21             : #ifdef WIN32
      22             : /*
      23             :  *  Array holding the handles of all active threads.  There can't be any
      24             :  *  gaps/zeros so it can be passed to WaitForMultipleObjects(); that is also
      25             :  *  why the per-thread argument structs are kept in separate arrays (below).
      26             :  */
      27             : static HANDLE *thread_handles;
      28             : 
      29             : typedef struct
      30             : {
      31             :     char       *log_file;
      32             :     char       *opt_log_file;
      33             :     char       *cmd;
      34             : } exec_thread_arg;          /* strings win32_exec_prog() hands to exec_prog() */
      35             : 
      36             : typedef struct
      37             : {
      38             :     DbInfoArr  *old_db_arr;
      39             :     DbInfoArr  *new_db_arr;
      40             :     char       *old_pgdata;
      41             :     char       *new_pgdata;
      42             :     char       *old_tablespace;
      43             :     char       *new_tablespace;
      44             : } transfer_thread_arg;      /* arguments win32_transfer_all_new_dbs() hands to transfer_all_new_dbs() */
      45             : 
      46             : static exec_thread_arg **exec_thread_args;  /* user_opts.jobs slots, allocated on first use */
      47             : static transfer_thread_arg **transfer_thread_args;  /* user_opts.jobs slots, allocated on first use */
      48             : 
      49             : /* points at whichever args array was used last, so reap_child() can be used for all cases */
      50             : static void **cur_thread_args;
      51             : 
      52             : DWORD       win32_exec_prog(exec_thread_arg *args);
      53             : DWORD       win32_transfer_all_new_dbs(transfer_thread_arg *args);
      54             : #endif
      55             : 
      56             : /*
      57             :  *  parallel_exec_prog
      58             :  *
      59             :  *  Same API as exec_prog, except that with user_opts.jobs > 1 the command is run
      60             :  *  in a forked child (a thread on Windows); it throws errors and returns no status.
      61             :  */
      62             : void
      63         108 : parallel_exec_prog(const char *log_file, const char *opt_log_file,
      64             :                    const char *fmt,...)
      65             : {
      66             :     va_list     args;
      67             :     char        cmd[MAX_STRING];
      68             : 
      69             : #ifndef WIN32
      70             :     pid_t       child;
      71             : #else
      72             :     HANDLE      child;
      73             :     exec_thread_arg *new_arg;
      74             : #endif
      75             : 
      76         108 :     va_start(args, fmt);
      77         108 :     vsnprintf(cmd, sizeof(cmd), fmt, args);     /* NOTE(review): result discarded, truncation is not checked here */
      78         108 :     va_end(args);
      79             : 
      80         108 :     if (user_opts.jobs <= 1)
      81             :         /* exit_on_error must be true to allow jobs */
      82         108 :         exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
      83             :     else
      84             :     {
      85             :         /* parallel */
      86             : #ifdef WIN32
      87             :         if (thread_handles == NULL)
      88             :             thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
      89             : 
      90             :         if (exec_thread_args == NULL)
      91             :         {
      92             :             int         i;
      93             : 
      94             :             exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *));
      95             : 
      96             :             /*
      97             :              * For safety and performance, we keep the args allocated during
      98             :              * the entire life of the process, and we don't free the args in a
      99             :              * thread different from the one that allocated it.
     100             :              */
     101             :             for (i = 0; i < user_opts.jobs; i++)
     102             :                 exec_thread_args[i] = pg_malloc0(sizeof(exec_thread_arg));
     103             :         }
     104             : 
     105             :         cur_thread_args = (void **) exec_thread_args;
     106             : #endif
     107             :         /* harvest any children that have already finished, without blocking */
     108           0 :         while (reap_child(false) == true)
     109             :             ;
     110             : 
     111             :         /* all job slots in use?  then block until one child finishes */
     112           0 :         if (parallel_jobs >= user_opts.jobs)
     113           0 :             reap_child(true);
     114             : 
     115             :         /* set this before we start the job */
     116           0 :         parallel_jobs++;
     117             : 
     118             :         /* Ensure stdio state is quiesced before forking */
     119           0 :         fflush(NULL);
     120             : 
     121             : #ifndef WIN32
     122           0 :         child = fork();
     123           0 :         if (child == 0)
     124             :             /* use _exit to skip atexit() functions */
     125           0 :             _exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
     126           0 :         else if (child < 0)
     127             :             /* fork failed */
     128           0 :             pg_fatal("could not create worker process: %m");
     129             : #else
     130             :         /* empty array elements are always at the end */
     131             :         new_arg = exec_thread_args[parallel_jobs - 1];
     132             : 
     133             :         /* Can only pass one pointer into the function, so use a struct; old strings in the slot are freed first */
     134             :         pg_free(new_arg->log_file);
     135             :         new_arg->log_file = pg_strdup(log_file);
     136             :         pg_free(new_arg->opt_log_file);
     137             :         new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
     138             :         pg_free(new_arg->cmd);
     139             :         new_arg->cmd = pg_strdup(cmd);
     140             : 
     141             :         child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
     142             :                                         new_arg, 0, NULL);
     143             :         if (child == 0)
     144             :             pg_fatal("could not create worker thread: %m");
     145             : 
     146             :         thread_handles[parallel_jobs - 1] = child;
     147             : #endif
     148             :     }
     149         108 : }
     150             : 
     151             : /* Thread entry point: runs exec_prog() on the saved args; the negated result is the thread's exit code */
     152             : #ifdef WIN32
     153             : DWORD
     154             : win32_exec_prog(exec_thread_arg *args)
     155             : {
     156             :     int         ret;
     157             : 
     158             :     ret = !exec_prog(args->log_file, args->opt_log_file, true, true, "%s", args->cmd);
     159             : 
     160             :     /* returning terminates the thread; reap_child() reads this value as its exit code */
     161             :     return ret;
     162             : }
     163             : #endif
     164             : 
     165             : 
     166             : /*
     167             :  *  parallel_transfer_all_new_dbs
     168             :  *
     169             :  *  Same API as transfer_all_new_dbs.  With user_opts.jobs <= 1 it calls that function
     170             :  *  directly with NULL tablespaces; otherwise a forked child (thread on Windows) runs it.
     171             :  */
     172             : void
     173          16 : parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
     174             :                               char *old_pgdata, char *new_pgdata,
     175             :                               char *old_tablespace, char *new_tablespace)
     176             : {
     177             : #ifndef WIN32
     178             :     pid_t       child;
     179             : #else
     180             :     HANDLE      child;
     181             :     transfer_thread_arg *new_arg;
     182             : #endif
     183             : 
     184          16 :     if (user_opts.jobs <= 1)
     185          16 :         transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL, NULL);
     186             :     else
     187             :     {
     188             :         /* parallel */
     189             : #ifdef WIN32
     190             :         if (thread_handles == NULL)
     191             :             thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE));
     192             : 
     193             :         if (transfer_thread_args == NULL)
     194             :         {
     195             :             int         i;
     196             : 
     197             :             transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *));
     198             : 
     199             :             /*
     200             :              * For safety and performance, we keep the args allocated during
     201             :              * the entire life of the process, and we don't free the args in a
     202             :              * thread different from the one that allocated it.
     203             :              */
     204             :             for (i = 0; i < user_opts.jobs; i++)
     205             :                 transfer_thread_args[i] = pg_malloc0(sizeof(transfer_thread_arg));
     206             :         }
     207             : 
     208             :         cur_thread_args = (void **) transfer_thread_args;
     209             : #endif
     210             :         /* harvest any dead children */
     211           0 :         while (reap_child(false) == true)
     212             :             ;
     213             : 
     214             :         /* must we wait for a dead child? */
     215           0 :         if (parallel_jobs >= user_opts.jobs)
     216           0 :             reap_child(true);
     217             : 
     218             :         /* set this before we start the job */
     219           0 :         parallel_jobs++;
     220             : 
     221             :         /* Ensure stdio state is quiesced before forking */
     222           0 :         fflush(NULL);
     223             : 
     224             : #ifndef WIN32
     225           0 :         child = fork();
     226           0 :         if (child == 0)
     227             :         {
     228           0 :             transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
     229             :                                  old_tablespace, new_tablespace);
     230             :             /* if we take another exit path, it will be non-zero */
     231             :             /* use _exit to skip atexit() functions */
     232           0 :             _exit(0);
     233             :         }
     234           0 :         else if (child < 0)
     235             :             /* fork failed */
     236           0 :             pg_fatal("could not create worker process: %m");
     237             : #else
     238             :         /* empty array elements are always at the end */
     239             :         new_arg = transfer_thread_args[parallel_jobs - 1];
     240             :         /* Can only pass one pointer in, so use a struct; free each string the reused slot still holds */
     241             :         new_arg->old_db_arr = old_db_arr;
     242             :         new_arg->new_db_arr = new_db_arr;
     243             :         pg_free(new_arg->old_pgdata);
     244             :         new_arg->old_pgdata = pg_strdup(old_pgdata);
     245             :         pg_free(new_arg->new_pgdata);
     246             :         new_arg->new_pgdata = pg_strdup(new_pgdata);
     247             :         pg_free(new_arg->old_tablespace);
     248             :         new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
     249             :         pg_free(new_arg->new_tablespace);
     250             :         new_arg->new_tablespace = new_tablespace ? pg_strdup(new_tablespace) : NULL;
     251             : 
     252             :         child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
     253             :                                         new_arg, 0, NULL);
     254             :         if (child == 0)
     255             :             pg_fatal("could not create worker thread: %m");
     256             : 
     257             :         thread_handles[parallel_jobs - 1] = child;
     258             : #endif
     259             :     }
     260          16 : }
     261             : 
     262             : /* Thread entry point: unpacks the saved args and calls transfer_all_new_dbs(); always returns 0 */
     263             : #ifdef WIN32
     264             : DWORD
     265             : win32_transfer_all_new_dbs(transfer_thread_arg *args)
     266             : {
     267             :     transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata,
     268             :                          args->new_pgdata, args->old_tablespace,
     269             :                          args->new_tablespace);
     270             : 
     271             :     /* returning terminates the thread; 0 is the exit code reap_child() accepts */
     272             :     return 0;
     273             : }
     274             : #endif
     275             : 
     276             : /*
     277             :  *  collect status from a completed worker child; waits for one if wait_for_child.
     278             :  *  Returns true if a child was reaped, false if not; a nonzero child status goes to pg_fatal().
     279             :  */
     280             : bool
     281          36 : reap_child(bool wait_for_child)
     282             : {
     283             : #ifndef WIN32
     284             :     int         work_status;
     285             :     pid_t       child;
     286             : #else
     287             :     int         thread_num;
     288             :     DWORD       res;
     289             : #endif
     290             : 
     291          36 :     if (user_opts.jobs <= 1 || parallel_jobs == 0)
     292          36 :         return false;
     293             : 
     294             : #ifndef WIN32
     295           0 :     child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
     296           0 :     if (child == (pid_t) -1)
     297           0 :         pg_fatal("%s() failed: %m", "waitpid");
     298           0 :     if (child == 0)
     299           0 :         return false;           /* no children, or no dead children */
     300           0 :     if (work_status != 0)
     301           0 :         pg_fatal("child process exited abnormally: status %d", work_status);
     302             : #else
     303             :     /* wait for one to finish (zero timeout, i.e. just poll, unless wait_for_child) */
     304             :     thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
     305             :                                         false, wait_for_child ? INFINITE : 0);
     306             : 
     307             :     if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
     308             :         return false;           /* NOTE(review): WAIT_FAILED is treated like "nothing finished", not reported */
     309             : 
     310             :     /* compute thread index in thread_handles */
     311             :     thread_num -= WAIT_OBJECT_0;
     312             : 
     313             :     /* get the result */
     314             :     GetExitCodeThread(thread_handles[thread_num], &res);
     315             :     if (res != 0)
     316             :         pg_fatal("child worker exited abnormally: %m");
     317             : 
     318             :     /* dispose of handle to stop leaks */
     319             :     CloseHandle(thread_handles[thread_num]);
     320             : 
     321             :     /* Move last slot into dead child's position */
     322             :     if (thread_num != parallel_jobs - 1)
     323             :     {
     324             :         void       *tmp_args;
     325             : 
     326             :         thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
     327             : 
     328             :         /*
     329             :          * Move last active thread arg struct into the now-dead slot, and the
     330             :          * now-dead slot to the end for reuse by the next thread. Though the
     331             :          * thread struct is in use by another thread, we can safely swap the
     332             :          * struct pointers within the array.
     333             :          */
     334             :         tmp_args = cur_thread_args[thread_num];
     335             :         cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
     336             :         cur_thread_args[parallel_jobs - 1] = tmp_args;
     337             :     }
     338             : #endif
     339             : 
     340             :     /* do this after job has been removed */
     341           0 :     parallel_jobs--;
     342             : 
     343           0 :     return true;
     344             : }

Generated by: LCOV version 1.16