LCOV - code coverage report
Current view: top level - src/bin/pg_upgrade - task.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 118 129 91.5 %
Date: 2025-07-02 01:17:55 Functions: 9 9 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * task.c
       3             :  *      framework for parallelizing pg_upgrade's once-in-each-database tasks
       4             :  *
       5             :  * This framework provides an efficient way of running the various
       6             :  * once-in-each-database tasks required by pg_upgrade.  Specifically, it
       7             :  * parallelizes these tasks by managing a set of slots that follow a simple
       8             :  * state machine and by using libpq's asynchronous APIs to establish the
       9             :  * connections and run the queries.  Callers simply need to create a callback
      10             :  * function and build/execute an UpgradeTask.  A simple example follows:
      11             :  *
      12             :  *      static void
      13             :  *      my_process_cb(DbInfo *dbinfo, PGresult *res, void *arg)
      14             :  *      {
      15             :  *          for (int i = 0; i < PQntuples(res); i++)
      16             :  *          {
      17             :  *              ... process results ...
      18             :  *          }
      19             :  *      }
      20             :  *
      21             :  *      void
      22             :  *      my_task(ClusterInfo *cluster)
      23             :  *      {
      24             :  *          UpgradeTask *task = upgrade_task_create();
      25             :  *
      26             :  *          upgrade_task_add_step(task,
      27             :  *                                "... query text ...",
      28             :  *                                my_process_cb,
      29             :  *                                true,     // let the task free the PGresult
      30             :  *                                NULL);    // "arg" pointer for callback
      31             :  *          upgrade_task_run(task, cluster);
      32             :  *          upgrade_task_free(task);
      33             :  *      }
      34             :  *
      35             :  * Note that multiple steps can be added to a given task.  When there are
      36             :  * multiple steps, the task will run all of the steps consecutively in the same
      37             :  * database connection before freeing the connection and moving on.  In other
      38             :  * words, it only ever initiates one connection to each database in the
      39             :  * cluster for a given run.
      40             :  *
      41             :  * Copyright (c) 2024-2025, PostgreSQL Global Development Group
      42             :  * src/bin/pg_upgrade/task.c
      43             :  */
      44             : 
      45             : #include "postgres_fe.h"
      46             : 
      47             : #include "common/connect.h"
      48             : #include "fe_utils/string_utils.h"
      49             : #include "pg_upgrade.h"
      50             : 
      51             : /*
      52             :  * dbs_complete stores the number of databases that we have completed
      53             :  * processing.  When this value equals the number of databases in the cluster,
      54             :  * the task is finished.
      55             :  */
      56             : static int  dbs_complete;
      57             : 
      58             : /*
      59             :  * dbs_processing stores the index of the next database in the cluster's array
      60             :  * of databases that will be picked up for processing.  It will always be
      61             :  * greater than or equal to dbs_complete.
      62             :  */
      63             : static int  dbs_processing;
      64             : 
      65             : /*
      66             :  * This struct stores the information for a single step of a task.  Note that
      67             :  * the query string is stored in the "queries" PQExpBuffer for the UpgradeTask.
      68             :  * All steps in a task are run in a single connection before moving on to the
      69             :  * next database (which requires a new connection).
      70             :  */
      71             : typedef struct UpgradeTaskStep
      72             : {
      73             :     UpgradeTaskProcessCB process_cb;    /* processes the results of the query */
      74             :     bool        free_result;    /* should we free the result? */
      75             :     void       *arg;            /* pointer passed to process_cb */
      76             : } UpgradeTaskStep;
      77             : 
      78             : /*
      79             :  * This struct is a thin wrapper around an array of steps, i.e.,
      80             :  * UpgradeTaskStep, plus a PQExpBuffer for all the query strings.
      81             :  */
      82             : struct UpgradeTask
      83             : {
      84             :     UpgradeTaskStep *steps;
      85             :     int         num_steps;
      86             :     PQExpBuffer queries;
      87             : };
      88             : 
      89             : /*
      90             :  * The different states for a parallel slot.
      91             :  */
      92             : typedef enum UpgradeTaskSlotState
      93             : {
      94             :     FREE,                       /* slot available for use in a new database */
      95             :     CONNECTING,                 /* waiting for connection to be established */
      96             :     RUNNING_QUERIES,            /* running/processing queries in the task */
      97             : } UpgradeTaskSlotState;
      98             : 
      99             : /*
     100             :  * We maintain an array of user_opts.jobs slots to execute the task.
     101             :  */
     102             : typedef struct UpgradeTaskSlot
     103             : {
     104             :     UpgradeTaskSlotState state; /* state of the slot */
     105             :     int         db_idx;         /* index of the database assigned to slot */
     106             :     int         step_idx;       /* index of the current step of task */
     107             :     PGconn     *conn;           /* current connection managed by slot */
     108             :     bool        ready;          /* slot is ready for processing */
     109             :     bool        select_mode;    /* select() mode: true->read, false->write */
     110             :     int         sock;           /* file descriptor for connection's socket */
     111             : } UpgradeTaskSlot;
     112             : 
     113             : /*
     114             :  * Initializes an UpgradeTask.
     115             :  */
     116             : UpgradeTask *
     117         162 : upgrade_task_create(void)
     118             : {
     119         162 :     UpgradeTask *task = pg_malloc0(sizeof(UpgradeTask));
     120             : 
     121         162 :     task->queries = createPQExpBuffer();
     122             : 
     123             :     /* All tasks must first set a secure search_path. */
     124         162 :     upgrade_task_add_step(task, ALWAYS_SECURE_SEARCH_PATH_SQL, NULL, true, NULL);
     125             : 
     126         162 :     return task;
     127             : }
     128             : 
     129             : /*
     130             :  * Frees all storage associated with an UpgradeTask.
     131             :  */
     132             : void
     133         162 : upgrade_task_free(UpgradeTask *task)
     134             : {
     135         162 :     destroyPQExpBuffer(task->queries);
     136         162 :     pg_free(task->steps);
     137         162 :     pg_free(task);
     138         162 : }
     139             : 
     140             : /*
     141             :  * Adds a step to an UpgradeTask.  The steps will be executed in each database
     142             :  * in the order in which they are added.
     143             :  *
     144             :  *  task: task object that must have been initialized via upgrade_task_create()
     145             :  *  query: the query text
     146             :  *  process_cb: function that processes the results of the query
     147             :  *  free_result: should we free the PGresult, or leave it to the caller?
     148             :  *  arg: pointer to task-specific data that is passed to each callback
     149             :  */
     150             : void
     151         376 : upgrade_task_add_step(UpgradeTask *task, const char *query,
     152             :                       UpgradeTaskProcessCB process_cb, bool free_result,
     153             :                       void *arg)
     154             : {
     155             :     UpgradeTaskStep *new_step;
     156             : 
     157         752 :     task->steps = pg_realloc(task->steps,
     158         376 :                              ++task->num_steps * sizeof(UpgradeTaskStep));
     159             : 
     160         376 :     new_step = &task->steps[task->num_steps - 1];
     161         376 :     new_step->process_cb = process_cb;
     162         376 :     new_step->free_result = free_result;
     163         376 :     new_step->arg = arg;
     164             : 
     165         376 :     appendPQExpBuffer(task->queries, "%s;", query);
     166         376 : }
     167             : 
     168             : /*
     169             :  * Build a connection string for the slot's current database and asynchronously
     170             :  * start a new connection, but do not wait for the connection to be
     171             :  * established.
     172             :  */
     173             : static void
     174         460 : start_conn(const ClusterInfo *cluster, UpgradeTaskSlot *slot)
     175             : {
     176             :     PQExpBufferData conn_opts;
     177         460 :     DbInfo     *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
     178             : 
     179             :     /* Build connection string with proper quoting */
     180         460 :     initPQExpBuffer(&conn_opts);
     181         460 :     appendPQExpBufferStr(&conn_opts, "dbname=");
     182         460 :     appendConnStrVal(&conn_opts, dbinfo->db_name);
     183         460 :     appendPQExpBufferStr(&conn_opts, " user=");
     184         460 :     appendConnStrVal(&conn_opts, os_info.user);
     185         460 :     appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
     186         460 :     if (cluster->sockdir)
     187             :     {
     188         460 :         appendPQExpBufferStr(&conn_opts, " host=");
     189         460 :         appendConnStrVal(&conn_opts, cluster->sockdir);
     190             :     }
     191             : 
     192         460 :     slot->conn = PQconnectStart(conn_opts.data);
     193             : 
     194         460 :     if (!slot->conn)
     195           0 :         pg_fatal("out of memory");
     196             : 
     197         460 :     termPQExpBuffer(&conn_opts);
     198         460 : }
     199             : 
     200             : /*
     201             :  * Run the process_cb callback function to process the result of a query, and
     202             :  * free the result if the caller indicated we should do so.
     203             :  */
     204             : static void
     205        1076 : process_query_result(const ClusterInfo *cluster, UpgradeTaskSlot *slot,
     206             :                      const UpgradeTask *task)
     207             : {
     208        1076 :     UpgradeTaskStep *steps = &task->steps[slot->step_idx];
     209        1076 :     UpgradeTaskProcessCB process_cb = steps->process_cb;
     210        1076 :     DbInfo     *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
     211        1076 :     PGresult   *res = PQgetResult(slot->conn);
     212             : 
     213        2152 :     if (PQstatus(slot->conn) == CONNECTION_BAD ||
     214        1076 :         (PQresultStatus(res) != PGRES_TUPLES_OK &&
     215           0 :          PQresultStatus(res) != PGRES_COMMAND_OK))
     216           0 :         pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
     217             : 
     218             :     /*
     219             :      * We assume that a NULL process_cb callback function means there's
     220             :      * nothing to process.  This is primarily intended for the initial step in
     221             :      * every task that sets a safe search_path.
     222             :      */
     223        1076 :     if (process_cb)
     224         616 :         (*process_cb) (dbinfo, res, steps->arg);
     225             : 
     226        1076 :     if (steps->free_result)
     227         994 :         PQclear(res);
     228        1076 : }
     229             : 
     230             : /*
     231             :  * Advances the state machine for a given slot as necessary.
     232             :  */
     233             : static void
     234        2080 : process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
     235             : {
     236             :     PostgresPollingStatusType status;
     237             : 
     238        2080 :     if (!slot->ready)
     239           0 :         return;
     240             : 
     241        2080 :     switch (slot->state)
     242             :     {
     243         622 :         case FREE:
     244             : 
     245             :             /*
     246             :              * If all of the databases in the cluster have been processed or
     247             :              * are currently being processed by other slots, we are done.
     248             :              */
     249         622 :             if (dbs_processing >= cluster->dbarr.ndbs)
     250         162 :                 return;
     251             : 
     252             :             /*
     253             :              * Claim the next database in the cluster's array and initiate a
     254             :              * new connection.
     255             :              */
     256         460 :             slot->db_idx = dbs_processing++;
     257         460 :             slot->state = CONNECTING;
     258         460 :             start_conn(cluster, slot);
     259             : 
     260         460 :             return;
     261             : 
     262         920 :         case CONNECTING:
     263             : 
     264             :             /* Check for connection failure. */
     265         920 :             status = PQconnectPoll(slot->conn);
     266         920 :             if (status == PGRES_POLLING_FAILED)
     267           0 :                 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
     268             : 
     269             :             /* Check whether the connection is still establishing. */
     270         920 :             if (status != PGRES_POLLING_OK)
     271             :             {
     272         460 :                 slot->select_mode = (status == PGRES_POLLING_READING);
     273         460 :                 return;
     274             :             }
     275             : 
     276             :             /*
     277             :              * Move on to running/processing the queries in the task.
     278             :              */
     279         460 :             slot->state = RUNNING_QUERIES;
     280         460 :             slot->select_mode = true;    /* wait until ready for reading */
     281         460 :             if (!PQsendQuery(slot->conn, task->queries->data))
     282           0 :                 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
     283             : 
     284         460 :             return;
     285             : 
     286         538 :         case RUNNING_QUERIES:
     287             : 
     288             :             /*
     289             :              * Consume any available data and clear the read-ready indicator
     290             :              * for the connection.
     291             :              */
     292         538 :             if (!PQconsumeInput(slot->conn))
     293           0 :                 pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
     294             : 
     295             :             /*
     296             :              * Process any results that are ready so that we can free up this
     297             :              * slot for another database as soon as possible.
     298             :              */
     299        1614 :             for (; slot->step_idx < task->num_steps; slot->step_idx++)
     300             :             {
     301             :                 /* If no more results are available yet, move on. */
     302        1154 :                 if (PQisBusy(slot->conn))
     303          78 :                     return;
     304             : 
     305        1076 :                 process_query_result(cluster, slot, task);
     306             :             }
     307             : 
     308             :             /*
     309             :              * If we just finished processing the result of the last step in
     310             :              * the task, free the slot.  We recursively call this function on
     311             :              * the newly-freed slot so that we can start initiating the next
     312             :              * connection immediately instead of waiting for the next loop
     313             :              * through the slots.
     314             :              */
     315         460 :             dbs_complete++;
     316         460 :             PQfinish(slot->conn);
     317         460 :             memset(slot, 0, sizeof(UpgradeTaskSlot));
     318         460 :             slot->ready = true;
     319             : 
     320         460 :             process_slot(cluster, slot, task);
     321             : 
     322         460 :             return;
     323             :     }
     324             : }
     325             : 
     326             : /*
     327             :  * Returns -1 on error, else the number of ready descriptors.
     328             :  */
     329             : static int
     330        1620 : select_loop(int maxFd, fd_set *input, fd_set *output)
     331             : {
     332        1620 :     fd_set      save_input = *input;
     333        1620 :     fd_set      save_output = *output;
     334             : 
     335        1620 :     if (maxFd == 0)
     336         162 :         return 0;
     337             : 
     338             :     for (;;)
     339           0 :     {
     340             :         int         i;
     341             : 
     342        1458 :         *input = save_input;
     343        1458 :         *output = save_output;
     344             : 
     345        1458 :         i = select(maxFd + 1, input, output, NULL, NULL);
     346             : 
     347             : #ifndef WIN32
     348        1458 :         if (i < 0 && errno == EINTR)
     349           0 :             continue;
     350             : #else
     351             :         if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR)
     352             :             continue;
     353             : #endif
     354        1458 :         return i;
     355             :     }
     356             : }
     357             : 
     358             : /*
     359             :  * Wait on the slots to either finish connecting or to receive query results if
     360             :  * possible.  This avoids a tight loop in upgrade_task_run().
     361             :  */
     362             : static void
     363        1620 : wait_on_slots(UpgradeTaskSlot *slots, int numslots)
     364             : {
     365             :     fd_set      input;
     366             :     fd_set      output;
     367        1620 :     int         maxFd = 0;
     368             : 
     369       27540 :     FD_ZERO(&input);
     370       27540 :     FD_ZERO(&output);
     371             : 
     372        3240 :     for (int i = 0; i < numslots; i++)
     373             :     {
     374             :         /*
     375             :          * We assume the previous call to process_slot() handled everything
     376             :          * that was marked ready in the previous call to wait_on_slots(), if
     377             :          * any.
     378             :          */
     379        1620 :         slots[i].ready = false;
     380             : 
     381             :         /*
     382             :          * This function should only ever see free slots as we are finishing
     383             :          * processing the last few databases, at which point we don't have any
     384             :          * databases left for them to process.  We'll never use these slots
     385             :          * again, so we can safely ignore them.
     386             :          */
     387        1620 :         if (slots[i].state == FREE)
     388         162 :             continue;
     389             : 
     390             :         /*
     391             :          * Add the socket to the set.
     392             :          */
     393        1458 :         slots[i].sock = PQsocket(slots[i].conn);
     394        1458 :         if (slots[i].sock < 0)
     395           0 :             pg_fatal("invalid socket");
     396        1458 :         FD_SET(slots[i].sock, slots[i].select_mode ? &input : &output);
     397        1458 :         maxFd = Max(maxFd, slots[i].sock);
     398             :     }
     399             : 
     400             :     /*
     401             :      * If we found socket(s) to wait on, wait.
     402             :      */
     403        1620 :     if (select_loop(maxFd, &input, &output) == -1)
     404           0 :         pg_fatal("%s() failed: %m", "select");
     405             : 
     406             :     /*
     407             :      * Mark which sockets appear to be ready.
     408             :      */
     409        3240 :     for (int i = 0; i < numslots; i++)
     410        2242 :         slots[i].ready |= (FD_ISSET(slots[i].sock, &input) ||
     411         622 :                            FD_ISSET(slots[i].sock, &output));
     412        1620 : }
     413             : 
     414             : /*
     415             :  * Runs all the steps of the task in every database in the cluster using
     416             :  * user_opts.jobs parallel slots.
     417             :  */
     418             : void
     419         162 : upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster)
     420             : {
     421         162 :     int         jobs = Max(1, user_opts.jobs);
     422         162 :     UpgradeTaskSlot *slots = pg_malloc0(sizeof(UpgradeTaskSlot) * jobs);
     423             : 
     424         162 :     dbs_complete = 0;
     425         162 :     dbs_processing = 0;
     426             : 
     427             :     /*
     428             :      * Process every slot the first time round.
     429             :      */
     430         324 :     for (int i = 0; i < jobs; i++)
     431         162 :         slots[i].ready = true;
     432             : 
     433        1782 :     while (dbs_complete < cluster->dbarr.ndbs)
     434             :     {
     435        3240 :         for (int i = 0; i < jobs; i++)
     436        1620 :             process_slot(cluster, &slots[i], task);
     437             : 
     438        1620 :         wait_on_slots(slots, jobs);
     439             :     }
     440             : 
     441         162 :     pg_free(slots);
     442         162 : }

Generated by: LCOV version 1.16