LCOV - code coverage report
Current view: top level - src/bin/pg_upgrade - parallel.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 31.8 % 44 14
Test Date: 2026-03-03 13:15:30 Functions: 100.0 % 3 3
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*
       2              :  *  parallel.c
       3              :  *
       4              :  *  multi-process support
       5              :  *
       6              :  *  Copyright (c) 2010-2026, PostgreSQL Global Development Group
       7              :  *  src/bin/pg_upgrade/parallel.c
       8              :  */
       9              : 
      10              : #include "postgres_fe.h"
      11              : 
      12              : #include <sys/wait.h>
      13              : #ifdef WIN32
      14              : #include <io.h>
      15              : #endif
      16              : 
      17              : #include "pg_upgrade.h"
      18              : 
      19              : static int  parallel_jobs;  /* worker children started but not yet reaped */
      20              : 
      21              : #ifdef WIN32
      22              : /*
      23              :  *  Array holding the handles of all active threads.  There can't be any
      24              :  *  gaps/zeros so it can be passed to WaitForMultipleObjects(); the thread
      25              :  *  argument structs are therefore kept in separate arrays, declared below.
      26              :  */
      27              : static HANDLE *thread_handles;
      28              : 
      29              : typedef struct
      30              : {
      31              :     char       *log_file;       /* pg_strdup'd copy of the caller's string */
      32              :     char       *opt_log_file;   /* pg_strdup'd copy, or NULL */
      33              :     char       *cmd;            /* pg_strdup'd copy of the formatted command */
      34              : } exec_thread_arg;              /* argument bundle for win32_exec_prog() */
      35              : 
      36              : typedef struct
      37              : {
      38              :     DbInfoArr  *old_db_arr;     /* caller's pointer, stored as-is */
      39              :     DbInfoArr  *new_db_arr;     /* caller's pointer, stored as-is */
      40              :     char       *old_pgdata;     /* pg_strdup'd copy */
      41              :     char       *new_pgdata;     /* pg_strdup'd copy */
      42              :     char       *old_tablespace; /* pg_strdup'd copy, or NULL */
      43              :     char       *new_tablespace; /* pg_strdup'd copy, or NULL */
      44              : } transfer_thread_arg;          /* argument bundle for win32_transfer_all_new_dbs() */
      45              : 
      46              : static exec_thread_arg **exec_thread_args;
      47              : static transfer_thread_arg **transfer_thread_args;
      48              : 
      49              : /* whichever of the two arrays above is in use, so reap_child() can be used for all cases */
      50              : static void **cur_thread_args;
      51              : 
      52              : DWORD       win32_exec_prog(exec_thread_arg *args);
      53              : DWORD       win32_transfer_all_new_dbs(transfer_thread_arg *args);
      54              : #endif
      55              : 
      56              : /*
      57              :  *  parallel_exec_prog
      58              :  *  Same API as exec_prog(), except that with user_opts.jobs > 1 the command
      59              :  *  runs in a worker (forked child; thread on WIN32) collected by reap_child().
      60              :  *  It therefore must throw errors and doesn't return an error status.
      61              :  */
      62              : void
      63           57 : parallel_exec_prog(const char *log_file, const char *opt_log_file,
      64              :                    const char *fmt,...)
      65              : {
      66              :     va_list     args;
      67              :     char        cmd[MAX_STRING];
      68              : 
      69              : #ifndef WIN32
      70              :     pid_t       child;
      71              : #else
      72              :     HANDLE      child;
      73              :     exec_thread_arg *new_arg;
      74              : #endif
      75              :     /* NOTE(review): vsnprintf()'s result is not checked, so an over-long command is truncated to sizeof(cmd) without any error here */
      76           57 :     va_start(args, fmt);
      77           57 :     vsnprintf(cmd, sizeof(cmd), fmt, args);
      78           57 :     va_end(args);
      79              : 
      80           57 :     if (user_opts.jobs <= 1)
      81              :         /* serial case: run in-process; exit_on_error must be true to allow jobs */
      82           57 :         exec_prog(log_file, opt_log_file, true, true, "%s", cmd);
      83              :     else
      84              :     {
      85              :         /* parallel: hand the already-formatted command to a worker */
      86              : #ifdef WIN32
      87              :         if (thread_handles == NULL)
      88              :             thread_handles = pg_malloc_array(HANDLE, user_opts.jobs);
      89              : 
      90              :         if (exec_thread_args == NULL)
      91              :         {
      92              :             int         i;
      93              : 
      94              :             exec_thread_args = pg_malloc_array(exec_thread_arg *, user_opts.jobs);
      95              : 
      96              :             /*
      97              :              * For safety and performance, we keep the args allocated during
      98              :              * the entire life of the process, and we don't free the args in a
      99              :              * thread different from the one that allocated it.
     100              :              */
     101              :             for (i = 0; i < user_opts.jobs; i++)
     102              :                 exec_thread_args[i] = pg_malloc0_object(exec_thread_arg);
     103              :         }
     104              : 
     105              :         cur_thread_args = (void **) exec_thread_args;
     106              : #endif
     107              :         /* harvest any dead children, without blocking */
     108            0 :         while (reap_child(false) == true)
     109              :             ;
     110              : 
     111              :         /* all job slots in use?  then block until one child finishes */
     112            0 :         if (parallel_jobs >= user_opts.jobs)
     113            0 :             reap_child(true);
     114              : 
     115              :         /* set this before we start the job */
     116            0 :         parallel_jobs++;
     117              : 
     118              :         /* Ensure stdio state is quiesced before forking */
     119            0 :         fflush(NULL);
     120              : 
     121              : #ifndef WIN32
     122            0 :         child = fork();
     123            0 :         if (child == 0)
     124              :             /* use _exit to skip atexit() functions; status is 0 iff exec_prog() returned true */
     125            0 :             _exit(!exec_prog(log_file, opt_log_file, true, true, "%s", cmd));
     126            0 :         else if (child < 0)
     127              :             /* fork failed */
     128            0 :             pg_fatal("could not create worker process: %m");
     129              : #else
     130              :         /* empty array elements are always at the end, so this slot is free */
     131              :         new_arg = exec_thread_args[parallel_jobs - 1];
     132              : 
     133              :         /* Can only pass one pointer into the function, so use a struct; strings left from the slot's previous use are freed first */
     134              :         pg_free(new_arg->log_file);
     135              :         new_arg->log_file = pg_strdup(log_file);
     136              :         pg_free(new_arg->opt_log_file);
     137              :         new_arg->opt_log_file = opt_log_file ? pg_strdup(opt_log_file) : NULL;
     138              :         pg_free(new_arg->cmd);
     139              :         new_arg->cmd = pg_strdup(cmd);
     140              : 
     141              :         child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog,
     142              :                                         new_arg, 0, NULL);
     143              :         if (child == 0)
     144              :             pg_fatal("could not create worker thread: %m");
     145              : 
     146              :         thread_handles[parallel_jobs - 1] = child;
     147              : #endif
     148              :     }
     149           57 : }
     150              : 
     151              : 
     152              : #ifdef WIN32
     153              : DWORD
     154              : win32_exec_prog(exec_thread_arg *args)
     155              : {
     156              :     int         ret;
     157              : 
     158              :     ret = !exec_prog(args->log_file, args->opt_log_file, true, true, "%s", args->cmd);
     159              : 
     160              :     /* returning terminates the thread; ret is 0 iff exec_prog() returned true, and reap_child() treats nonzero as failure */
     161              :     return ret;
     162              : }
     163              : #endif
     164              : 
     165              : 
     166              : /*
     167              :  *  parallel_transfer_all_new_dbs
     168              :  *  Same API as transfer_all_new_dbs().  With user_opts.jobs <= 1 it is called
     169              :  *  directly (tablespace arguments passed as NULL); otherwise each call starts
     170              :  *  one worker for the given tablespace, so several can proceed in parallel.
     171              :  */
     172              : void
     173            9 : parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr,
     174              :                               char *old_pgdata, char *new_pgdata,
     175              :                               char *old_tablespace, char *new_tablespace)
     176              : {
     177              : #ifndef WIN32
     178              :     pid_t       child;
     179              : #else
     180              :     HANDLE      child;
     181              :     transfer_thread_arg *new_arg;
     182              : #endif
     183              : 
     184            9 :     if (user_opts.jobs <= 1)
     185            9 :         transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL, NULL);
     186              :     else
     187              :     {
     188              :         /* parallel: hand this transfer to a worker */
     189              : #ifdef WIN32
     190              :         if (thread_handles == NULL)
     191              :             thread_handles = pg_malloc_array(HANDLE, user_opts.jobs);
     192              : 
     193              :         if (transfer_thread_args == NULL)
     194              :         {
     195              :             int         i;
     196              : 
     197              :             transfer_thread_args = pg_malloc_array(transfer_thread_arg *, user_opts.jobs);
     198              : 
     199              :             /*
     200              :              * For safety and performance, we keep the args allocated during
     201              :              * the entire life of the process, and we don't free the args in a
     202              :              * thread different from the one that allocated it.
     203              :              */
     204              :             for (i = 0; i < user_opts.jobs; i++)
     205              :                 transfer_thread_args[i] = pg_malloc0_object(transfer_thread_arg);
     206              :         }
     207              : 
     208              :         cur_thread_args = (void **) transfer_thread_args;
     209              : #endif
     210              :         /* harvest any dead children, without blocking */
     211            0 :         while (reap_child(false) == true)
     212              :             ;
     213              : 
     214              :         /* all job slots in use?  then block until one child finishes */
     215            0 :         if (parallel_jobs >= user_opts.jobs)
     216            0 :             reap_child(true);
     217              : 
     218              :         /* set this before we start the job */
     219            0 :         parallel_jobs++;
     220              : 
     221              :         /* Ensure stdio state is quiesced before forking */
     222            0 :         fflush(NULL);
     223              : 
     224              : #ifndef WIN32
     225            0 :         child = fork();
     226            0 :         if (child == 0)
     227              :         {
     228            0 :             transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata,
     229              :                                  old_tablespace, new_tablespace);
     230              :             /* if we take another exit path, it will be non-zero */
     231              :             /* use _exit to skip atexit() functions */
     232            0 :             _exit(0);
     233              :         }
     234            0 :         else if (child < 0)
     235              :             /* fork failed */
     236            0 :             pg_fatal("could not create worker process: %m");
     237              : #else
     238              :         /* empty array elements are always at the end, so this slot is free */
     239              :         new_arg = transfer_thread_args[parallel_jobs - 1];
     240              : 
     241              :         /* Can only pass one pointer into the function, so use a struct; every string left from the slot's previous use is freed before being replaced */
     242              :         new_arg->old_db_arr = old_db_arr;
     243              :         new_arg->new_db_arr = new_db_arr;
     244              :         pg_free(new_arg->old_pgdata);
     245              :         new_arg->old_pgdata = pg_strdup(old_pgdata);
     246              :         pg_free(new_arg->new_pgdata);
     247              :         new_arg->new_pgdata = pg_strdup(new_pgdata);
     248              :         pg_free(new_arg->old_tablespace);
     249              :         new_arg->old_tablespace = old_tablespace ? pg_strdup(old_tablespace) : NULL;
     250              :         pg_free(new_arg->new_tablespace);
     251              :         new_arg->new_tablespace = new_tablespace ? pg_strdup(new_tablespace) : NULL;
     252              :         child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_transfer_all_new_dbs,
     253              :                                         new_arg, 0, NULL);
     254              :         if (child == 0)
     255              :             pg_fatal("could not create worker thread: %m");
     256              : 
     257              :         thread_handles[parallel_jobs - 1] = child;
     258              : #endif
     259              :     }
     260            9 : }
     261              : 
     262              : 
     263              : #ifdef WIN32
     264              : DWORD
     265              : win32_transfer_all_new_dbs(transfer_thread_arg *args)
     266              : {
     267              :     transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata,
     268              :                          args->new_pgdata, args->old_tablespace,
     269              :                          args->new_tablespace);
     270              : 
     271              :     /* returning terminates the thread; 0 is the value reap_child() accepts as a normal exit */
     272              :     return 0;
     273              : }
     274              : #endif
     275              : 
     276              : 
     277              : /*
     278              :  *  collect status from a completed worker child; returns true if one was reaped
     279              :  */
     280              : bool
     281           20 : reap_child(bool wait_for_child)
     282              : {
     283              : #ifndef WIN32
     284              :     int         work_status;
     285              :     pid_t       child;
     286              : #else
     287              :     int         thread_num;
     288              :     DWORD       res;
     289              : #endif
     290              :     /* nothing to reap when running serially or when no worker is outstanding */
     291           20 :     if (user_opts.jobs <= 1 || parallel_jobs == 0)
     292           20 :         return false;
     293              : 
     294              : #ifndef WIN32
     295            0 :     child = waitpid(-1, &work_status, wait_for_child ? 0 : WNOHANG);
     296            0 :     if (child == (pid_t) -1)
     297            0 :         pg_fatal("%s() failed: %m", "waitpid");
     298            0 :     if (child == 0)
     299            0 :         return false;           /* no children, or no dead children */
     300            0 :     if (work_status != 0)
     301            0 :         pg_fatal("child process exited abnormally: status %d", work_status);
     302              : #else
     303              :     /* wait for one to finish, or just poll when wait_for_child is false */
     304              :     thread_num = WaitForMultipleObjects(parallel_jobs, thread_handles,
     305              :                                         false, wait_for_child ? INFINITE : 0);
     306              :     /* NOTE(review): WAIT_FAILED is reported to the caller the same way as "nothing finished" */
     307              :     if (thread_num == WAIT_TIMEOUT || thread_num == WAIT_FAILED)
     308              :         return false;
     309              : 
     310              :     /* compute thread index in thread_handles */
     311              :     thread_num -= WAIT_OBJECT_0;
     312              : 
     313              :     /* get the result; NOTE(review): GetExitCodeThread()'s own return value is not checked */
     314              :     GetExitCodeThread(thread_handles[thread_num], &res);
     315              :     if (res != 0)
     316              :         pg_fatal("child worker exited abnormally: %m");
     317              : 
     318              :     /* dispose of handle to stop leaks */
     319              :     CloseHandle(thread_handles[thread_num]);
     320              : 
     321              :     /* Move last slot into dead child's position, keeping thread_handles free of gaps */
     322              :     if (thread_num != parallel_jobs - 1)
     323              :     {
     324              :         void       *tmp_args;
     325              : 
     326              :         thread_handles[thread_num] = thread_handles[parallel_jobs - 1];
     327              : 
     328              :         /*
     329              :          * Move last active thread arg struct into the now-dead slot, and the
     330              :          * now-dead slot to the end for reuse by the next thread. Though the
     331              :          * thread struct is in use by another thread, we can safely swap the
     332              :          * struct pointers within the array.
     333              :          */
     334              :         tmp_args = cur_thread_args[thread_num];
     335              :         cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1];
     336              :         cur_thread_args[parallel_jobs - 1] = tmp_args;
     337              :     }
     338              : #endif
     339              : 
     340              :     /* do this after job has been removed */
     341            0 :     parallel_jobs--;
     342              : 
     343            0 :     return true;
     344              : }
        

Generated by: LCOV version 2.0-1