LCOV - code coverage report
Current view: top level - src/bin/pg_upgrade - task.c (source / functions) Coverage Total Hit
Test: PostgreSQL 20devel Lines: 90.8 % 130 118
Test Date: 2026-07-03 19:57:34 Functions: 100.0 % 9 9
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 75.7 % 70 53

             Branch data     Line data    Source code
       1                 :             : /*
       2                 :             :  * task.c
       3                 :             :  *      framework for parallelizing pg_upgrade's once-in-each-database tasks
       4                 :             :  *
       5                 :             :  * This framework provides an efficient way of running the various
       6                 :             :  * once-in-each-database tasks required by pg_upgrade.  Specifically, it
       7                 :             :  * parallelizes these tasks by managing a set of slots that follow a simple
       8                 :             :  * state machine and by using libpq's asynchronous APIs to establish the
       9                 :             :  * connections and run the queries.  Callers simply need to create a callback
      10                 :             :  * function and build/execute an UpgradeTask.  A simple example follows:
      11                 :             :  *
      12                 :             :  *      static void
      13                 :             :  *      my_process_cb(DbInfo *dbinfo, PGresult *res, void *arg)
      14                 :             :  *      {
      15                 :             :  *          for (int i = 0; i < PQntuples(res); i++)
      16                 :             :  *          {
      17                 :             :  *              ... process results ...
      18                 :             :  *          }
      19                 :             :  *      }
      20                 :             :  *
      21                 :             :  *      void
      22                 :             :  *      my_task(ClusterInfo *cluster)
      23                 :             :  *      {
      24                 :             :  *          UpgradeTask *task = upgrade_task_create();
      25                 :             :  *
      26                 :             :  *          upgrade_task_add_step(task,
      27                 :             :  *                                "... query text ...",
      28                 :             :  *                                my_process_cb,
      29                 :             :  *                                true,     // let the task free the PGresult
      30                 :             :  *                                NULL);    // "arg" pointer for callback
      31                 :             :  *          upgrade_task_run(task, cluster);
      32                 :             :  *          upgrade_task_free(task);
      33                 :             :  *      }
      34                 :             :  *
      35                 :             :  * Note that multiple steps can be added to a given task.  When there are
      36                 :             :  * multiple steps, the task will run all of the steps consecutively in the same
      37                 :             :  * database connection before freeing the connection and moving on.  In other
      38                 :             :  * words, it only ever initiates one connection to each database in the
      39                 :             :  * cluster for a given run.
      40                 :             :  *
      41                 :             :  * Copyright (c) 2024-2026, PostgreSQL Global Development Group
      42                 :             :  * src/bin/pg_upgrade/task.c
      43                 :             :  */
      44                 :             : 
      45                 :             : #include "postgres_fe.h"
      46                 :             : 
      47                 :             : #include "common/connect.h"
      48                 :             : #include "fe_utils/string_utils.h"
      49                 :             : #include "pg_upgrade.h"
      50                 :             : 
      51                 :             : /*
      52                 :             :  * dbs_complete stores the number of databases that we have completed
      53                 :             :  * processing.  When this value equals the number of databases in the cluster,
      54                 :             :  * the task is finished.
      55                 :             :  */
      56                 :             : static int  dbs_complete;
      57                 :             : 
      58                 :             : /*
      59                 :             :  * dbs_processing stores the index of the next database in the cluster's array
      60                 :             :  * of databases that will be picked up for processing.  It will always be
      61                 :             :  * greater than or equal to dbs_complete.
      62                 :             :  */
      63                 :             : static int  dbs_processing;
      64                 :             : 
      65                 :             : /*
      66                 :             :  * This struct stores the information for a single step of a task.  Note that
      67                 :             :  * the query string is stored in the "queries" PQExpBuffer for the UpgradeTask.
      68                 :             :  * All steps in a task are run in a single connection before moving on to the
      69                 :             :  * next database (which requires a new connection).
      70                 :             :  */
      71                 :             : typedef struct UpgradeTaskStep
      72                 :             : {
      73                 :             :     UpgradeTaskProcessCB process_cb;    /* processes the results of the query */
      74                 :             :     bool        free_result;    /* should we free the result? */
      75                 :             :     void       *arg;            /* pointer passed to process_cb */
      76                 :             : } UpgradeTaskStep;
      77                 :             : 
      78                 :             : /*
      79                 :             :  * This struct is a thin wrapper around an array of steps, i.e.,
      80                 :             :  * UpgradeTaskStep, plus a PQExpBuffer for all the query strings.
      81                 :             :  */
      82                 :             : struct UpgradeTask
      83                 :             : {
      84                 :             :     UpgradeTaskStep *steps;
      85                 :             :     int         num_steps;
      86                 :             :     PQExpBuffer queries;
      87                 :             : };
      88                 :             : 
      89                 :             : /*
      90                 :             :  * The different states for a parallel slot.
      91                 :             :  */
      92                 :             : typedef enum UpgradeTaskSlotState
      93                 :             : {
      94                 :             :     FREE,                       /* slot available for use in a new database */
      95                 :             :     CONNECTING,                 /* waiting for connection to be established */
      96                 :             :     RUNNING_QUERIES,            /* running/processing queries in the task */
      97                 :             : } UpgradeTaskSlotState;
      98                 :             : 
      99                 :             : /*
     100                 :             :  * We maintain an array of user_opts.jobs slots to execute the task.
     101                 :             :  */
     102                 :             : typedef struct UpgradeTaskSlot
     103                 :             : {
     104                 :             :     UpgradeTaskSlotState state; /* state of the slot */
     105                 :             :     int         db_idx;         /* index of the database assigned to slot */
     106                 :             :     int         step_idx;       /* index of the current step of task */
     107                 :             :     PGconn     *conn;           /* current connection managed by slot */
     108                 :             :     bool        ready;          /* slot is ready for processing */
     109                 :             :     bool        select_mode;    /* select() mode: true->read, false->write */
     110                 :             :     int         sock;           /* file descriptor for connection's socket */
     111                 :             : } UpgradeTaskSlot;
     112                 :             : 
     113                 :             : /*
     114                 :             :  * Initializes an UpgradeTask.
     115                 :             :  */
     116                 :             : UpgradeTask *
     117                 :         100 : upgrade_task_create(void)
     118                 :             : {
     119                 :         100 :     UpgradeTask *task = pg_malloc0_object(UpgradeTask);
     120                 :             : 
     121                 :         100 :     task->queries = createPQExpBuffer();
     122                 :             : 
     123                 :             :     /* All tasks must first set a secure search_path. */
     124                 :         100 :     upgrade_task_add_step(task, ALWAYS_SECURE_SEARCH_PATH_SQL, NULL, true, NULL);
     125                 :             : 
     126                 :         100 :     return task;
     127                 :             : }
     128                 :             : 
     129                 :             : /*
     130                 :             :  * Frees all storage associated with an UpgradeTask.
     131                 :             :  */
     132                 :             : void
     133                 :         100 : upgrade_task_free(UpgradeTask *task)
     134                 :             : {
     135                 :         100 :     destroyPQExpBuffer(task->queries);
     136                 :         100 :     pg_free(task->steps);
     137                 :         100 :     pg_free(task);
     138                 :         100 : }
     139                 :             : 
     140                 :             : /*
     141                 :             :  * Adds a step to an UpgradeTask.  The steps will be executed in each database
     142                 :             :  * in the order in which they are added.
     143                 :             :  *
     144                 :             :  *  task: task object that must have been initialized via upgrade_task_create()
     145                 :             :  *  query: the query text
     146                 :             :  *  process_cb: function that processes the results of the query
     147                 :             :  *  free_result: should we free the PGresult, or leave it to the caller?
     148                 :             :  *  arg: pointer to task-specific data that is passed to each callback
     149                 :             :  */
     150                 :             : void
     151                 :         232 : upgrade_task_add_step(UpgradeTask *task, const char *query,
     152                 :             :                       UpgradeTaskProcessCB process_cb, bool free_result,
     153                 :             :                       void *arg)
     154                 :             : {
     155                 :             :     UpgradeTaskStep *new_step;
     156                 :             : 
     157                 :         232 :     task->steps = pg_realloc_array(task->steps, UpgradeTaskStep,
     158                 :             :                                    ++task->num_steps);
     159                 :             : 
     160                 :         232 :     new_step = &task->steps[task->num_steps - 1];
     161                 :         232 :     new_step->process_cb = process_cb;
     162                 :         232 :     new_step->free_result = free_result;
     163                 :         232 :     new_step->arg = arg;
     164                 :             : 
     165                 :         232 :     appendPQExpBuffer(task->queries, "%s;", query);
     166                 :         232 : }
     167                 :             : 
     168                 :             : /*
     169                 :             :  * Build a connection string for the slot's current database and asynchronously
     170                 :             :  * start a new connection, but do not wait for the connection to be
     171                 :             :  * established.
     172                 :             :  */
     173                 :             : static void
     174                 :         296 : start_conn(const ClusterInfo *cluster, UpgradeTaskSlot *slot)
     175                 :             : {
     176                 :             :     PQExpBufferData conn_opts;
     177                 :         296 :     DbInfo     *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
     178                 :             : 
     179                 :             :     /* Build connection string with proper quoting */
     180                 :         296 :     initPQExpBuffer(&conn_opts);
     181                 :         296 :     appendPQExpBufferStr(&conn_opts, "dbname=");
     182                 :         296 :     appendConnStrVal(&conn_opts, dbinfo->db_name);
     183                 :         296 :     appendPQExpBufferStr(&conn_opts, " user=");
     184                 :         296 :     appendConnStrVal(&conn_opts, os_info.user);
     185                 :         296 :     appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
     186         [ +  - ]:         296 :     if (cluster->sockdir)
     187                 :             :     {
     188                 :         296 :         appendPQExpBufferStr(&conn_opts, " host=");
     189                 :         296 :         appendConnStrVal(&conn_opts, cluster->sockdir);
     190                 :             :     }
     191         [ -  + ]:         296 :     if (!protocol_negotiation_supported(cluster))
     192                 :           0 :         appendPQExpBufferStr(&conn_opts, " max_protocol_version=3.0");
     193                 :             : 
     194                 :         296 :     slot->conn = PQconnectStart(conn_opts.data);
     195                 :             : 
     196         [ -  + ]:         296 :     if (!slot->conn)
     197                 :           0 :         pg_fatal("out of memory");
     198                 :             : 
     199                 :         296 :     termPQExpBuffer(&conn_opts);
     200                 :         296 : }
     201                 :             : 
     202                 :             : /*
     203                 :             :  * Run the process_cb callback function to process the result of a query, and
     204                 :             :  * free the result if the caller indicated we should do so.
     205                 :             :  */
     206                 :             : static void
     207                 :         692 : process_query_result(const ClusterInfo *cluster, UpgradeTaskSlot *slot,
     208                 :             :                      const UpgradeTask *task)
     209                 :             : {
     210                 :         692 :     UpgradeTaskStep *steps = &task->steps[slot->step_idx];
     211                 :         692 :     UpgradeTaskProcessCB process_cb = steps->process_cb;
     212                 :         692 :     DbInfo     *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
     213                 :         692 :     PGresult   *res = PQgetResult(slot->conn);
     214                 :             : 
     215   [ +  -  -  + ]:        1384 :     if (PQstatus(slot->conn) == CONNECTION_BAD ||
     216         [ -  - ]:         692 :         (PQresultStatus(res) != PGRES_TUPLES_OK &&
     217                 :           0 :          PQresultStatus(res) != PGRES_COMMAND_OK))
     218                 :           0 :         pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
     219                 :             : 
     220                 :             :     /*
     221                 :             :      * We assume that a NULL process_cb callback function means there's
     222                 :             :      * nothing to process.  This is primarily intended for the initial step in
     223                 :             :      * every task that sets a safe search_path.
     224                 :             :      */
     225         [ +  + ]:         692 :     if (process_cb)
     226                 :         396 :         (*process_cb) (dbinfo, res, steps->arg);
     227                 :             : 
     228         [ +  + ]:         692 :     if (steps->free_result)
     229                 :         640 :         PQclear(res);
     230                 :         692 : }
     231                 :             : 
     232                 :             : /*
     233                 :             :  * Advances the state machine for a given slot as necessary.
     234                 :             :  */
     235                 :             : static void
     236                 :        1320 : process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
     237                 :             : {
     238                 :             :     PostgresPollingStatusType status;
     239                 :             : 
     240         [ -  + ]:        1320 :     if (!slot->ready)
     241                 :           0 :         return;
     242                 :             : 
     243   [ +  +  +  - ]:        1320 :     switch (slot->state)
     244                 :             :     {
     245                 :         396 :         case FREE:
     246                 :             : 
     247                 :             :             /*
     248                 :             :              * If all of the databases in the cluster have been processed or
     249                 :             :              * are currently being processed by other slots, we are done.
     250                 :             :              */
     251         [ +  + ]:         396 :             if (dbs_processing >= cluster->dbarr.ndbs)
     252                 :         100 :                 return;
     253                 :             : 
     254                 :             :             /*
     255                 :             :              * Claim the next database in the cluster's array and initiate a
     256                 :             :              * new connection.
     257                 :             :              */
     258                 :         296 :             slot->db_idx = dbs_processing++;
     259                 :         296 :             slot->state = CONNECTING;
     260                 :         296 :             start_conn(cluster, slot);
     261                 :             : 
     262                 :         296 :             return;
     263                 :             : 
     264                 :         592 :         case CONNECTING:
     265                 :             : 
     266                 :             :             /* Check for connection failure. */
     267                 :         592 :             status = PQconnectPoll(slot->conn);
     268         [ -  + ]:         592 :             if (status == PGRES_POLLING_FAILED)
     269                 :           0 :                 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
     270                 :             : 
     271                 :             :             /* Check whether the connection is still establishing. */
     272         [ +  + ]:         592 :             if (status != PGRES_POLLING_OK)
     273                 :             :             {
     274                 :         296 :                 slot->select_mode = (status == PGRES_POLLING_READING);
     275                 :         296 :                 return;
     276                 :             :             }
     277                 :             : 
     278                 :             :             /*
     279                 :             :              * Move on to running/processing the queries in the task.
     280                 :             :              */
     281                 :         296 :             slot->state = RUNNING_QUERIES;
     282                 :         296 :             slot->select_mode = true;    /* wait until ready for reading */
     283         [ -  + ]:         296 :             if (!PQsendQuery(slot->conn, task->queries->data))
     284                 :           0 :                 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
     285                 :             : 
     286                 :         296 :             return;
     287                 :             : 
     288                 :         332 :         case RUNNING_QUERIES:
     289                 :             : 
     290                 :             :             /*
     291                 :             :              * Consume any available data and clear the read-ready indicator
     292                 :             :              * for the connection.
     293                 :             :              */
     294         [ -  + ]:         332 :             if (!PQconsumeInput(slot->conn))
     295                 :           0 :                 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
     296                 :             : 
     297                 :             :             /*
     298                 :             :              * Process any results that are ready so that we can free up this
     299                 :             :              * slot for another database as soon as possible.
     300                 :             :              */
     301         [ +  + ]:        1024 :             for (; slot->step_idx < task->num_steps; slot->step_idx++)
     302                 :             :             {
     303                 :             :                 /* If no more results are available yet, move on. */
     304         [ +  + ]:         728 :                 if (PQisBusy(slot->conn))
     305                 :          36 :                     return;
     306                 :             : 
     307                 :         692 :                 process_query_result(cluster, slot, task);
     308                 :             :             }
     309                 :             : 
     310                 :             :             /*
     311                 :             :              * If we just finished processing the result of the last step in
     312                 :             :              * the task, free the slot.  We recursively call this function on
     313                 :             :              * the newly-freed slot so that we can start initiating the next
     314                 :             :              * connection immediately instead of waiting for the next loop
     315                 :             :              * through the slots.
     316                 :             :              */
     317                 :         296 :             dbs_complete++;
     318                 :         296 :             PQfinish(slot->conn);
     319                 :         296 :             memset(slot, 0, sizeof(UpgradeTaskSlot));
     320                 :         296 :             slot->ready = true;
     321                 :             : 
     322                 :         296 :             process_slot(cluster, slot, task);
     323                 :             : 
     324                 :         296 :             return;
     325                 :             :     }
     326                 :             : }
     327                 :             : 
     328                 :             : /*
     329                 :             :  * Returns -1 on error, else the number of ready descriptors.
     330                 :             :  */
     331                 :             : static int
     332                 :        1024 : select_loop(int maxFd, fd_set *input, fd_set *output)
     333                 :             : {
     334                 :        1024 :     fd_set      save_input = *input;
     335                 :        1024 :     fd_set      save_output = *output;
     336                 :             : 
     337         [ +  + ]:        1024 :     if (maxFd == 0)
     338                 :         100 :         return 0;
     339                 :             : 
     340                 :             :     for (;;)
     341                 :           0 :     {
     342                 :             :         int         i;
     343                 :             : 
     344                 :         924 :         *input = save_input;
     345                 :         924 :         *output = save_output;
     346                 :             : 
     347                 :         924 :         i = select(maxFd + 1, input, output, NULL, NULL);
     348                 :             : 
     349                 :             : #ifndef WIN32
     350   [ -  +  -  - ]:         924 :         if (i < 0 && errno == EINTR)
     351                 :           0 :             continue;
     352                 :             : #else
     353                 :             :         if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
     354                 :             :             continue;
     355                 :             : #endif
     356                 :         924 :         return i;
     357                 :             :     }
     358                 :             : }
     359                 :             : 
     360                 :             : /*
     361                 :             :  * Wait on the slots to either finish connecting or to receive query results if
     362                 :             :  * possible.  This avoids a tight loop in upgrade_task_run().
     363                 :             :  */
     364                 :             : static void
     365                 :        1024 : wait_on_slots(UpgradeTaskSlot *slots, int numslots)
     366                 :             : {
     367                 :             :     fd_set      input;
     368                 :             :     fd_set      output;
     369                 :        1024 :     int         maxFd = 0;
     370                 :             : 
     371         [ +  + ]:       17408 :     FD_ZERO(&input);
     372         [ +  + ]:       17408 :     FD_ZERO(&output);
     373                 :             : 
     374         [ +  + ]:        2048 :     for (int i = 0; i < numslots; i++)
     375                 :             :     {
     376                 :             :         /*
     377                 :             :          * We assume the previous call to process_slot() handled everything
     378                 :             :          * that was marked ready in the previous call to wait_on_slots(), if
     379                 :             :          * any.
     380                 :             :          */
     381                 :        1024 :         slots[i].ready = false;
     382                 :             : 
     383                 :             :         /*
     384                 :             :          * This function should only ever see free slots as we are finishing
     385                 :             :          * processing the last few databases, at which point we don't have any
     386                 :             :          * databases left for them to process.  We'll never use these slots
     387                 :             :          * again, so we can safely ignore them.
     388                 :             :          */
     389         [ +  + ]:        1024 :         if (slots[i].state == FREE)
     390                 :         100 :             continue;
     391                 :             : 
     392                 :             :         /*
     393                 :             :          * Add the socket to the set.
     394                 :             :          */
     395                 :         924 :         slots[i].sock = PQsocket(slots[i].conn);
     396         [ -  + ]:         924 :         if (slots[i].sock < 0)
     397                 :           0 :             pg_fatal("invalid socket");
     398   [ +  +  +  + ]:         924 :         FD_SET(slots[i].sock, slots[i].select_mode ? &input : &output);
     399                 :         924 :         maxFd = Max(maxFd, slots[i].sock);
     400                 :             :     }
     401                 :             : 
     402                 :             :     /*
     403                 :             :      * If we found socket(s) to wait on, wait.
     404                 :             :      */
     405         [ -  + ]:        1024 :     if (select_loop(maxFd, &input, &output) == -1)
     406                 :           0 :         pg_fatal("%s() failed: %m", "select");
     407                 :             : 
     408                 :             :     /*
     409                 :             :      * Mark which sockets appear to be ready.
     410                 :             :      */
     411         [ +  + ]:        2048 :     for (int i = 0; i < numslots; i++)
     412         [ +  + ]:        1420 :         slots[i].ready |= (FD_ISSET(slots[i].sock, &input) ||
     413         [ +  + ]:        1420 :                            FD_ISSET(slots[i].sock, &output));
     414                 :        1024 : }
     415                 :             : 
     416                 :             : /*
     417                 :             :  * Runs all the steps of the task in every database in the cluster using
     418                 :             :  * user_opts.jobs parallel slots.
     419                 :             :  */
     420                 :             : void
     421                 :         100 : upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster)
     422                 :             : {
     423                 :         100 :     int         jobs = Max(1, user_opts.jobs);
     424                 :         100 :     UpgradeTaskSlot *slots = pg_malloc0_array(UpgradeTaskSlot, jobs);
     425                 :             : 
     426                 :         100 :     dbs_complete = 0;
     427                 :         100 :     dbs_processing = 0;
     428                 :             : 
     429                 :             :     /*
     430                 :             :      * Process every slot the first time round.
     431                 :             :      */
     432         [ +  + ]:         200 :     for (int i = 0; i < jobs; i++)
     433                 :         100 :         slots[i].ready = true;
     434                 :             : 
     435         [ +  + ]:        1124 :     while (dbs_complete < cluster->dbarr.ndbs)
     436                 :             :     {
     437         [ +  + ]:        2048 :         for (int i = 0; i < jobs; i++)
     438                 :        1024 :             process_slot(cluster, &slots[i], task);
     439                 :             : 
     440                 :        1024 :         wait_on_slots(slots, jobs);
     441                 :             :     }
     442                 :             : 
     443                 :         100 :     pg_free(slots);
     444                 :         100 : }
        

Generated by: LCOV version 2.0-1