Line data Source code
1 : /*
2 : * task.c
3 : * framework for parallelizing pg_upgrade's once-in-each-database tasks
4 : *
5 : * This framework provides an efficient way of running the various
6 : * once-in-each-database tasks required by pg_upgrade. Specifically, it
7 : * parallelizes these tasks by managing a set of slots that follow a simple
8 : * state machine and by using libpq's asynchronous APIs to establish the
9 : * connections and run the queries. Callers simply need to create a callback
10 : * function and build/execute an UpgradeTask. A simple example follows:
11 : *
12 : * static void
13 : * my_process_cb(DbInfo *dbinfo, PGresult *res, void *arg)
14 : * {
15 : * for (int i = 0; i < PQntuples(res); i++)
16 : * {
17 : * ... process results ...
18 : * }
19 : * }
20 : *
21 : * void
22 : * my_task(ClusterInfo *cluster)
23 : * {
24 : * UpgradeTask *task = upgrade_task_create();
25 : *
26 : * upgrade_task_add_step(task,
27 : * "... query text ...",
28 : * my_process_cb,
29 : * true, // let the task free the PGresult
30 : * NULL); // "arg" pointer for callback
31 : * upgrade_task_run(task, cluster);
32 : * upgrade_task_free(task);
33 : * }
34 : *
35 : * Note that multiple steps can be added to a given task. When there are
36 : * multiple steps, the task will run all of the steps consecutively in the same
37 : * database connection before freeing the connection and moving on. In other
38 : * words, it only ever initiates one connection to each database in the
39 : * cluster for a given run.
40 : *
41 : * Copyright (c) 2024-2025, PostgreSQL Global Development Group
42 : * src/bin/pg_upgrade/task.c
43 : */
44 :
45 : #include "postgres_fe.h"
46 :
47 : #include "common/connect.h"
48 : #include "fe_utils/string_utils.h"
49 : #include "pg_upgrade.h"
50 :
/*
 * Number of databases for which every step of the current task has been
 * processed.  upgrade_task_run() resets this to zero and keeps looping until
 * it reaches the number of databases in the cluster.
 */
static int	dbs_complete = 0;

/*
 * Index, within the cluster's database array, of the next database that a
 * free slot will claim.  A database is claimed before it can be completed, so
 * this value is never smaller than dbs_complete.
 */
static int	dbs_processing = 0;
64 :
65 : /*
66 : * This struct stores the information for a single step of a task. Note that
67 : * the query string is stored in the "queries" PQExpBuffer for the UpgradeTask.
68 : * All steps in a task are run in a single connection before moving on to the
69 : * next database (which requires a new connection).
70 : */
71 : typedef struct UpgradeTaskStep
72 : {
73 : UpgradeTaskProcessCB process_cb; /* processes the results of the query */
74 : bool free_result; /* should we free the result? */
75 : void *arg; /* pointer passed to process_cb */
76 : } UpgradeTaskStep;
77 :
78 : /*
79 : * This struct is a thin wrapper around an array of steps, i.e.,
80 : * UpgradeTaskStep, plus a PQExpBuffer for all the query strings.
81 : */
82 : struct UpgradeTask
83 : {
84 : UpgradeTaskStep *steps;
85 : int num_steps;
86 : PQExpBuffer queries;
87 : };
88 :
/*
 * States a parallel slot moves through.
 *
 * FREE has to be zero: slots are allocated zero-filled and are reset with
 * memset() once a database has been finished, and both are expected to leave
 * the slot in the FREE state.
 */
typedef enum UpgradeTaskSlotState
{
	/* slot is idle and may pick up a new database */
	FREE = 0,

	/* connection has been started but is not yet established */
	CONNECTING = 1,

	/* the task's queries were sent; results are being collected */
	RUNNING_QUERIES = 2,
} UpgradeTaskSlotState;
98 :
99 : /*
100 : * We maintain an array of user_opts.jobs slots to execute the task.
101 : */
102 : typedef struct UpgradeTaskSlot
103 : {
104 : UpgradeTaskSlotState state; /* state of the slot */
105 : int db_idx; /* index of the database assigned to slot */
106 : int step_idx; /* index of the current step of task */
107 : PGconn *conn; /* current connection managed by slot */
108 : bool ready; /* slot is ready for processing */
109 : bool select_mode; /* select() mode: true->read, false->write */
110 : int sock; /* file descriptor for connection's socket */
111 : } UpgradeTaskSlot;
112 :
113 : /*
114 : * Initializes an UpgradeTask.
115 : */
116 : UpgradeTask *
117 82 : upgrade_task_create(void)
118 : {
119 82 : UpgradeTask *task = pg_malloc0(sizeof(UpgradeTask));
120 :
121 82 : task->queries = createPQExpBuffer();
122 :
123 : /* All tasks must first set a secure search_path. */
124 82 : upgrade_task_add_step(task, ALWAYS_SECURE_SEARCH_PATH_SQL, NULL, true, NULL);
125 :
126 82 : return task;
127 : }
128 :
129 : /*
130 : * Frees all storage associated with an UpgradeTask.
131 : */
132 : void
133 82 : upgrade_task_free(UpgradeTask *task)
134 : {
135 82 : destroyPQExpBuffer(task->queries);
136 82 : pg_free(task->steps);
137 82 : pg_free(task);
138 82 : }
139 :
140 : /*
141 : * Adds a step to an UpgradeTask. The steps will be executed in each database
142 : * in the order in which they are added.
143 : *
144 : * task: task object that must have been initialized via upgrade_task_create()
145 : * query: the query text
146 : * process_cb: function that processes the results of the query
147 : * free_result: should we free the PGresult, or leave it to the caller?
148 : * arg: pointer to task-specific data that is passed to each callback
149 : */
150 : void
151 192 : upgrade_task_add_step(UpgradeTask *task, const char *query,
152 : UpgradeTaskProcessCB process_cb, bool free_result,
153 : void *arg)
154 : {
155 : UpgradeTaskStep *new_step;
156 :
157 384 : task->steps = pg_realloc(task->steps,
158 192 : ++task->num_steps * sizeof(UpgradeTaskStep));
159 :
160 192 : new_step = &task->steps[task->num_steps - 1];
161 192 : new_step->process_cb = process_cb;
162 192 : new_step->free_result = free_result;
163 192 : new_step->arg = arg;
164 :
165 192 : appendPQExpBuffer(task->queries, "%s;", query);
166 192 : }
167 :
168 : /*
169 : * Build a connection string for the slot's current database and asynchronously
170 : * start a new connection, but do not wait for the connection to be
171 : * established.
172 : */
173 : static void
174 244 : start_conn(const ClusterInfo *cluster, UpgradeTaskSlot *slot)
175 : {
176 : PQExpBufferData conn_opts;
177 244 : DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
178 :
179 : /* Build connection string with proper quoting */
180 244 : initPQExpBuffer(&conn_opts);
181 244 : appendPQExpBufferStr(&conn_opts, "dbname=");
182 244 : appendConnStrVal(&conn_opts, dbinfo->db_name);
183 244 : appendPQExpBufferStr(&conn_opts, " user=");
184 244 : appendConnStrVal(&conn_opts, os_info.user);
185 244 : appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
186 244 : if (cluster->sockdir)
187 : {
188 244 : appendPQExpBufferStr(&conn_opts, " host=");
189 244 : appendConnStrVal(&conn_opts, cluster->sockdir);
190 : }
191 :
192 244 : slot->conn = PQconnectStart(conn_opts.data);
193 :
194 244 : if (!slot->conn)
195 0 : pg_fatal("failed to create connection with connection string: \"%s\"",
196 : conn_opts.data);
197 :
198 244 : termPQExpBuffer(&conn_opts);
199 244 : }
200 :
201 : /*
202 : * Run the process_cb callback function to process the result of a query, and
203 : * free the result if the caller indicated we should do so.
204 : */
205 : static void
206 576 : process_query_result(const ClusterInfo *cluster, UpgradeTaskSlot *slot,
207 : const UpgradeTask *task)
208 : {
209 576 : UpgradeTaskStep *steps = &task->steps[slot->step_idx];
210 576 : UpgradeTaskProcessCB process_cb = steps->process_cb;
211 576 : DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx];
212 576 : PGresult *res = PQgetResult(slot->conn);
213 :
214 1152 : if (PQstatus(slot->conn) == CONNECTION_BAD ||
215 576 : (PQresultStatus(res) != PGRES_TUPLES_OK &&
216 0 : PQresultStatus(res) != PGRES_COMMAND_OK))
217 0 : pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
218 :
219 : /*
220 : * We assume that a NULL process_cb callback function means there's
221 : * nothing to process. This is primarily intended for the initial step in
222 : * every task that sets a safe search_path.
223 : */
224 576 : if (process_cb)
225 332 : (*process_cb) (dbinfo, res, steps->arg);
226 :
227 576 : if (steps->free_result)
228 528 : PQclear(res);
229 576 : }
230 :
231 : /*
232 : * Advances the state machine for a given slot as necessary.
233 : */
234 : static void
235 1130 : process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task)
236 : {
237 : PostgresPollingStatusType status;
238 :
239 1130 : if (!slot->ready)
240 0 : return;
241 :
242 1130 : switch (slot->state)
243 : {
244 326 : case FREE:
245 :
246 : /*
247 : * If all of the databases in the cluster have been processed or
248 : * are currently being processed by other slots, we are done.
249 : */
250 326 : if (dbs_processing >= cluster->dbarr.ndbs)
251 82 : return;
252 :
253 : /*
254 : * Claim the next database in the cluster's array and initiate a
255 : * new connection.
256 : */
257 244 : slot->db_idx = dbs_processing++;
258 244 : slot->state = CONNECTING;
259 244 : start_conn(cluster, slot);
260 :
261 244 : return;
262 :
263 488 : case CONNECTING:
264 :
265 : /* Check for connection failure. */
266 488 : status = PQconnectPoll(slot->conn);
267 488 : if (status == PGRES_POLLING_FAILED)
268 0 : pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
269 :
270 : /* Check whether the connection is still establishing. */
271 488 : if (status != PGRES_POLLING_OK)
272 : {
273 244 : slot->select_mode = (status == PGRES_POLLING_READING);
274 244 : return;
275 : }
276 :
277 : /*
278 : * Move on to running/processing the queries in the task.
279 : */
280 244 : slot->state = RUNNING_QUERIES;
281 244 : slot->select_mode = true; /* wait until ready for reading */
282 244 : if (!PQsendQuery(slot->conn, task->queries->data))
283 0 : pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
284 :
285 244 : return;
286 :
287 316 : case RUNNING_QUERIES:
288 :
289 : /*
290 : * Consume any available data and clear the read-ready indicator
291 : * for the connection.
292 : */
293 316 : if (!PQconsumeInput(slot->conn))
294 0 : pg_fatal("connection failure: %s", PQerrorMessage(slot->conn));
295 :
296 : /*
297 : * Process any results that are ready so that we can free up this
298 : * slot for another database as soon as possible.
299 : */
300 892 : for (; slot->step_idx < task->num_steps; slot->step_idx++)
301 : {
302 : /* If no more results are available yet, move on. */
303 648 : if (PQisBusy(slot->conn))
304 72 : return;
305 :
306 576 : process_query_result(cluster, slot, task);
307 : }
308 :
309 : /*
310 : * If we just finished processing the result of the last step in
311 : * the task, free the slot. We recursively call this function on
312 : * the newly-freed slot so that we can start initiating the next
313 : * connection immediately instead of waiting for the next loop
314 : * through the slots.
315 : */
316 244 : dbs_complete++;
317 244 : PQfinish(slot->conn);
318 244 : memset(slot, 0, sizeof(UpgradeTaskSlot));
319 244 : slot->ready = true;
320 :
321 244 : process_slot(cluster, slot, task);
322 :
323 244 : return;
324 : }
325 : }
326 :
/*
 * Waits in select() until at least one descriptor in the given sets is ready,
 * restarting the call whenever it is interrupted.
 *
 * maxFd is the highest descriptor present in the sets.  A value of 0 is taken
 * to mean that there is nothing to wait on, in which case the function
 * returns 0 immediately and leaves both sets untouched.
 *
 * Returns -1 on error, else the number of ready descriptors.
 */
static int
select_loop(int maxFd, fd_set *input, fd_set *output)
{
	fd_set		orig_input;
	fd_set		orig_output;

	if (maxFd == 0)
		return 0;

	/* select() overwrites its sets, so keep pristine copies for retries. */
	orig_input = *input;
	orig_output = *output;

	for (;;)
	{
		int			nready;
		bool		interrupted;

		*input = orig_input;
		*output = orig_output;

		nready = select(maxFd + 1, input, output, NULL, NULL);

#ifndef WIN32
		interrupted = (nready < 0 && errno == EINTR);
#else
		interrupted = (nready == SOCKET_ERROR && WSAGetLastError() == WSAEINTR);
#endif

		if (!interrupted)
			return nready;
	}
}
358 :
359 : /*
360 : * Wait on the slots to either finish connecting or to receive query results if
361 : * possible. This avoids a tight loop in upgrade_task_run().
362 : */
363 : static void
364 886 : wait_on_slots(UpgradeTaskSlot *slots, int numslots)
365 : {
366 : fd_set input;
367 : fd_set output;
368 886 : int maxFd = 0;
369 :
370 886 : FD_ZERO(&input);
371 886 : FD_ZERO(&output);
372 :
373 1772 : for (int i = 0; i < numslots; i++)
374 : {
375 : /*
376 : * We assume the previous call to process_slot() handled everything
377 : * that was marked ready in the previous call to wait_on_slots(), if
378 : * any.
379 : */
380 886 : slots[i].ready = false;
381 :
382 : /*
383 : * This function should only ever see free slots as we are finishing
384 : * processing the last few databases, at which point we don't have any
385 : * databases left for them to process. We'll never use these slots
386 : * again, so we can safely ignore them.
387 : */
388 886 : if (slots[i].state == FREE)
389 82 : continue;
390 :
391 : /*
392 : * Add the socket to the set.
393 : */
394 804 : slots[i].sock = PQsocket(slots[i].conn);
395 804 : if (slots[i].sock < 0)
396 0 : pg_fatal("invalid socket");
397 804 : FD_SET(slots[i].sock, slots[i].select_mode ? &input : &output);
398 804 : maxFd = Max(maxFd, slots[i].sock);
399 : }
400 :
401 : /*
402 : * If we found socket(s) to wait on, wait.
403 : */
404 886 : if (select_loop(maxFd, &input, &output) == -1)
405 0 : pg_fatal("select() failed: %m");
406 :
407 : /*
408 : * Mark which sockets appear to be ready.
409 : */
410 1772 : for (int i = 0; i < numslots; i++)
411 1212 : slots[i].ready |= (FD_ISSET(slots[i].sock, &input) ||
412 326 : FD_ISSET(slots[i].sock, &output));
413 886 : }
414 :
415 : /*
416 : * Runs all the steps of the task in every database in the cluster using
417 : * user_opts.jobs parallel slots.
418 : */
419 : void
420 82 : upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster)
421 : {
422 82 : int jobs = Max(1, user_opts.jobs);
423 82 : UpgradeTaskSlot *slots = pg_malloc0(sizeof(UpgradeTaskSlot) * jobs);
424 :
425 82 : dbs_complete = 0;
426 82 : dbs_processing = 0;
427 :
428 : /*
429 : * Process every slot the first time round.
430 : */
431 164 : for (int i = 0; i < jobs; i++)
432 82 : slots[i].ready = true;
433 :
434 968 : while (dbs_complete < cluster->dbarr.ndbs)
435 : {
436 1772 : for (int i = 0; i < jobs; i++)
437 886 : process_slot(cluster, &slots[i], task);
438 :
439 886 : wait_on_slots(slots, jobs);
440 : }
441 :
442 82 : pg_free(slots);
443 82 : }
|